mqttjs / MQTT.js

The MQTT client for Node.js and the browser
Other
8.53k stars 1.41k forks source link

[Bug]: JavascriptException: Error: stream.push() after EOF, js engine: hermes #1914

Open sanduluca opened 1 month ago

sanduluca commented 1 month ago

MQTTjs Version

5.8.0

Broker

AWS

Environment

React Native (engine: Hermes)

Description

After releasing a new version of our React Native app with the MQTT feature, we started receiving crashes in Firebase Crashlytics. image

This is the only issue I found related to this problem https://github.com/nodejs/readable-stream/issues/207

Code ```tsx // mqtt.tsx import React, { createContext, useContext } from 'react'; import mqtt from 'mqtt'; import useMqttConnection, { MqttError, MqttStatus } from 'hooks/useMqttConnection'; import { useAppSelector } from 'store'; interface MqttContextValue { mqttClient: mqtt.MqttClient | null; mqttStatus: MqttStatus; mqttError: MqttError | null; subscribeToTopic: (topic: string, ops?: mqtt.IClientSubscribeOptions) => void; } // @ts-ignore const MqttContext = createContext({ mqttClient: {} }); export const MqttProvider = ({ children }: React.PropsWithChildren) => { const isAuthenticated = useAppSelector(state => state.auth.authenticated); const { mqttClient, mqttStatus, mqttError, setMqttError, setMqttStatus } = useMqttConnection(isAuthenticated); const subscribeToTopic = (topic: string, ops: mqtt.IClientSubscribeOptions = { qos: 1 }) => { if (!mqttClient) return; mqttClient.subscribe(topic, ops, error => { if (error) { setMqttStatus('Error'); setMqttError({ type: 'MqttTopic', msg: error.message, }); } }); }; return ( {children} ); }; export const useMqtt = () => useContext(MqttContext); ``` ```ts // useMqttConnection.ts import { useState, useEffect, useRef, useCallback } from 'react'; import mqtt, { Timer } from 'mqtt'; import BackgroundTimer from 'react-native-background-timer'; import { instance } from 'api'; export type MqttStatus = 'Connected' | 'Disconnected' | 'Offline' | 'Reconnecting' | 'Error'; export type MqttError = { type: string; msg: string }; interface WssDetails { signedUrl: string; clientId: string; validitySeconds: number; } // dont directly assign methods to timer object otherwise this throws: Cannot set property 'NaN' of undefined const timer: Timer = { clear: id => BackgroundTimer.clearInterval(id), // @ts-expect-error set: (func, time) => BackgroundTimer.setInterval(func, time), }; const getWssDetails = () => instance.get('/app/wss'); function useMqttConnection(isAuthenticated: boolean) { const [mqttStatus, setMqttStatus] = 
useState('Disconnected'); const [mqttError, setMqttError] = useState(null); const [mqttClient, setMqttClient] = useState(null); const [wssDetails, setWssDetails] = useState(null); const wssDetailsRef = useRef(wssDetails); const isFetchingWssDetails = useRef(false); wssDetailsRef.current = wssDetails; const hasWssDetails = !!wssDetailsRef.current; const doMqttConnection = isAuthenticated; const fetchWssDetails = useCallback(() => { isFetchingWssDetails.current = true; return getWssDetails() .then(r => { setWssDetails(r.data); }) .catch(() => { // in case we could not fetch initial details, retry every 25 seconds setTimeout(() => { if (!wssDetailsRef.current) { fetchWssDetails(); } }, 25000); }) .finally(() => { isFetchingWssDetails.current = false; }); }, []); useEffect(() => { if (!doMqttConnection) return; fetchWssDetails(); }, [doMqttConnection, fetchWssDetails]); useEffect(() => { if (!doMqttConnection || !hasWssDetails) return; const transformWsUrl = ( url: string, options: mqtt.IClientOptions, currentClient: mqtt.MqttClient, ) => { if (!isFetchingWssDetails.current) { fetchWssDetails(); } currentClient.options.clientId = wssDetailsRef.current!.clientId; return wssDetailsRef.current!.signedUrl; }; const client = mqtt .connect(wssDetailsRef.current!.signedUrl, { clientId: wssDetailsRef.current!.clientId, reconnectPeriod: 5000, queueQoSZero: true, resubscribe: true, clean: true, keepalive: 60, protocolVersion: 5, properties: { sessionExpiryInterval: 600, }, timerVariant: timer, transformWsUrl, }) .on('connect', () => { setMqttStatus('Connected'); }) .on('error', error => { setMqttError({ type: 'MqttGeneral', msg: error.message }); }) .on('disconnect', () => { setMqttStatus('Disconnected'); }) .on('offline', () => { setMqttStatus('Offline'); }) .on('reconnect', () => { setMqttStatus('Reconnecting'); }) .on('close', () => { setMqttStatus('Disconnected'); }); setMqttClient(client); return () => { client.end(); }; }, [doMqttConnection, fetchWssDetails, 
hasWssDetails]); return { mqttClient, mqttStatus, mqttError, setMqttStatus, setMqttError, }; } export default useMqttConnection; ```

Minimal Reproduction

Unfortunately we cannot reproduce this locally. We are only recording crashes in Firebase Crashlytics

Debug logs

Fatal Exception: com.facebook.react.common.JavascriptException: Error: stream.push() after EOF, js engine: hermes, stack:
anonymous@3372:233
c@3372:1426
S@3375:2499
anonymous@3375:3938
_@3478:2534
dispatchEvent@135:5649
anonymous@154:3328
value@52:779
value@49:935
value@37:3897
anonymous@37:693
value@37:2528
value@37:664

       at com.facebook.react.modules.core.ExceptionsManagerModule.reportException(ExceptionsManagerModule.java:65)
       at java.lang.reflect.Method.invoke(Method.java)
       at com.facebook.react.bridge.JavaMethodWrapper.invoke(JavaMethodWrapper.java:372)
       at com.facebook.react.bridge.JavaModuleWrapper.invoke(JavaModuleWrapper.java:146)
       at com.facebook.jni.NativeRunnable.run(NativeRunnable.java)
       at android.os.Handler.handleCallback(Handler.java:996)
       at android.os.Handler.dispatchMessage(Handler.java:110)
       at com.facebook.react.bridge.queue.MessageQueueThreadHandler.dispatchMessage(MessageQueueThreadHandler.java:27)
       at android.os.Looper.loopOnce(Looper.java:210)
       at android.os.Looper.loop(Looper.java:302)
       at com.facebook.react.bridge.queue.MessageQueueThreadImpl$4.run(MessageQueueThreadImpl.java:233)
       at java.lang.Thread.run(Thread.java:1012)
robertsLando commented 1 month ago

@sanduluca what was the previous working version you were using? What are the steps to reproduce the issue? Does it happen randomly?

sanduluca commented 1 month ago

We were not using mqtt before this. We could not reproduce it. Yes, it seems to happen randomly. We have 2-5 crashes a day with 1000+ daily users.

robertsLando commented 1 month ago

It's hard based on that log to say what could trigger that, could you try to at least provide me a valid stack trace? Something that points me to the line that is triggering the issue.

IMO, given that we treat React Native as a browser and it is using wss, the error could be in these two places:

https://github.com/mqttjs/MQTT.js/blob/2da3b3401db1bb880801c7264a2a51dc08d50f62/src/lib/connect/ws.ts#L263

https://github.com/mqttjs/MQTT.js/blob/2da3b3401db1bb880801c7264a2a51dc08d50f62/src/lib/BufferedDuplex.ts#L59

I dunno if adding a check before the push could fix the issue

@mcollina any hint?

robertsLando commented 1 month ago

Try using version 5.9.1

sanduluca commented 2 weeks ago

OK, after releasing a few new updates to our users with the new mqtt version, we still receive JavascriptException: Error: stream.push() after EOF

// package.json
"mqtt": "^5.9.1",
// yarn.lock
mqtt-packet@^9.0.0:
  version "9.0.0"
  resolved "https://registry.yarnpkg.com/mqtt-packet/-/mqtt-packet-9.0.0.tgz#fd841854d8c0f1f5211b00de388c4ced45b59216"
  integrity sha512-8v+HkX+fwbodsWAZIZTI074XIoxVBOmPeggQuDFCGg1SqNcC+uoRMWu7J6QlJPqIUIJXmjNYYHxBBLr1Y/Df4w==
  dependencies:
    bl "^6.0.8"
    debug "^4.3.4"
    process-nextick-args "^2.0.1"

mqtt@^5.2.0, mqtt@^5.9.1:
  version "5.9.1"
  resolved "https://registry.yarnpkg.com/mqtt/-/mqtt-5.9.1.tgz#422ded61d432b995d931ae4d9c470684f33e3289"
  integrity sha512-FMENfSUMfCSUCnkuUVAL4U01795SUEfrX0NZ53HNr1r2VNpwKhR5Au9viq9WCFGtgrDAmsll4fkloqFCFgStYA==
  dependencies:
    "@types/readable-stream" "^4.0.5"
    "@types/ws" "^8.5.9"
    commist "^3.2.0"
    concat-stream "^2.0.0"
    debug "^4.3.4"
    help-me "^5.0.0"
    lru-cache "^10.0.1"
    minimist "^1.2.8"
    mqtt "^5.2.0"
    mqtt-packet "^9.0.0"
    number-allocator "^1.0.14"
    readable-stream "^4.4.2"
    reinterval "^1.1.0"
    rfdc "^1.3.0"
    split2 "^4.2.0"
    worker-timers "^7.1.4"
    ws "^8.17.1"

The error seems to come from onMessage

Fatal Exception: com.facebook.react.common.JavascriptException
Error: stream.push() after EOF, js engine: hermes, stack: anonymous@1:3472651 NodeError@1:3473631 readableAddChunk@1:3484528 anonymous@1:3488655 onMessage@1:3684762 dispatchEvent@1:265253 anonymous@1:287806 emit@1:123298 emit@1:121941 __callFunction@1:115290 anonymous@1:113595 __guard@1:114551 callFunctionReturnFlushedQueue@1:113553

Here: https://github.com/mqttjs/MQTT.js/blob/395e066b884727ed84a8ce4c3ee2cc96a0e38b04/src/lib/connect/ws.ts#L264

image image

robertsLando commented 2 weeks ago

@sanduluca I'm sorry, but I'm still not able to reproduce this issue. Could you try to fix this yourself and submit a PR? From what I know, that issue happens when writing to a stream that is destroyed, and the check is there now, so I have no clue what else could cause it :(

There is also another check here:

https://github.com/mqttjs/MQTT.js/blob/395e066b884727ed84a8ce4c3ee2cc96a0e38b04/src/lib/BufferedDuplex.ts#L59

The proxy could be a buffered duplex, so there are 2 streams to handle.

sanduluca commented 2 weeks ago

I am not able to reproduce it consistently either. I caught it myself for the first time yesterday. Here is a screenshot: cloud-photo-size-2-5373186271675023670-y

Anyway, I was looking into what we could do and found a possible solution. I opened a PR for this.