HunterLarco / twitter-v2

An asynchronous client library for the Twitter REST and Streaming APIs
MIT License

Streams unreliable for collecting tweets from slow tweeting accounts #66

Open rctgamer3 opened 3 years ago

rctgamer3 commented 3 years ago

Prerequisites

Description

The streams part of twitter-v2 is unreliable. I've set it to monitor five twitter accounts that tweet about once every six hours.

Steps to Reproduce

  1. Watch several accounts that don't post often in a filtered stream.
  2. Start streaming tweets.
  3. The stream randomly quits within 30-45 minutes or less.

Expected behaviour:

Should be able to run indefinitely. twitter-v2 should not disconnect the stream just because it thinks the stream is unresponsive. The sample code by Twitter themselves does not have this behaviour.

Actual behaviour:

Streams randomly hit this._closeWithError.

    at Timeout.<anonymous> (/home/rctgamer3/project/node_modules/twitter-v2/src/TwitterStream.js:50:30)
    at listOnTimeout (internal/timers.js:554:17)
    at processTimers (internal/timers.js:497:7)

The stream times out and throws a TwitterError saying that the stream is unresponsive.

Reproduces how often: Always; the only variable is the time until the stream closes.

Versions

1.0.8

Additional Information

If you need any further information to triage things, let me know.

HunterLarco commented 3 years ago

@rctgamer3 I've been listening to a stream filtered on an abandoned twitter account for the last 24 hours and it has experienced no issues. To look into this further could you provide either of the following:

Note that you may actually just be running into network weather closing your connection (hence the unresponsive error). Check out this section of the readme to implement retry logic.

rctgamer3 commented 3 years ago

Oh, sorry, forgot about this. I'll try to get you the requested information.

gustawdaniel commented 3 years ago

This is the code to reproduce:

import TwitterStream from "twitter-v2/build/TwitterStream";

const Twitter = require('twitter-v2');

const client = new Twitter({
    bearer_token: process.env.TWITTER_API_BEARER_TOKEN,
});

// client.get('tweets/search/stream/rules').then(console.log);

const listenForever = async (
    streamFactory: () => TwitterStream,
    dataConsumer: (data: any) => void
): Promise<any> => {
    try {
        for await (const {data} of streamFactory()) {
            dataConsumer(data);
        }
        // The stream has been closed by Twitter. It is usually safe to reconnect.
        console.log('Stream disconnected healthily. Reconnecting.');
        return listenForever(streamFactory, dataConsumer);
    } catch (error) {
        // An error occurred so we reconnect to the stream. Note that we should
        // probably have retry logic here to prevent reconnection after a number of
        // closely timed failures (may indicate a problem that is not downstream).
        console.warn('Stream disconnected with error. Retrying.', error);
        return listenForever(streamFactory, dataConsumer);
    }
}

listenForever(
    () => client.stream('tweets/search/stream'),
    (data) => console.log(data)
).catch(console.error);

And these are the results:

{
  id: '1407654882488823808',
  text: 'GET https://t.co/qnn5a0VaXe for check rules of filtering messages on twitter.'
}
Stream disconnected with error. Retrying. TwitterError: Stream unresponsive
    at Timeout.<anonymous> (/media/nvme/pro/crypto/twitter-producer/node_modules/twitter-v2/build/TwitterStream.js:43:38)
    at listOnTimeout (node:internal/timers:556:17)
    at processTimers (node:internal/timers:499:7)
Stream disconnected with error. Retrying. TwitterError: Stream unresponsive
    at Timeout.<anonymous> (/media/nvme/pro/crypto/twitter-producer/node_modules/twitter-v2/build/TwitterStream.js:43:38)
    at listOnTimeout (node:internal/timers:556:17)
    at processTimers (node:internal/timers:499:7)

The first JSON appeared when I posted this tweet:

https://twitter.com/PreciseLabPL/status/1407654882488823808

The first error came a few minutes after my tweet, and the next error a few minutes after that.

There is a related topic:

https://twittercommunity.com/t/filtered-stream-disconnects-every-5-minutes-on-nodejs/153866/7

My rules:

https://api.twitter.com/2/tweets/search/stream/rules

{
    "data": [
        {
            "id": "1407650287389712386",
            "value": "from:PreciseLabPL OR from:elonmusk"
        }
    ],
    "meta": {
        "sent": "2021-06-23T11:00:28.629Z"
    }
}
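
The comment in the catch block of listenForever above notes that retry logic should guard against a run of closely timed failures. A minimal sketch of one way to do that, assuming exponential backoff with a cap on consecutive failures; backoffDelay, listenWithBackoff and all default values are hypothetical names and numbers chosen for illustration, not part of twitter-v2:

```typescript
// Hypothetical helper: delay before the n-th consecutive retry (0-based),
// doubling from baseMs and capped at maxMs.
function backoffDelay(attempt: number, baseMs = 1000, maxMs = 60000): number {
  return Math.min(maxMs, baseMs * 2 ** attempt);
}

const sleep = (ms: number): Promise<void> =>
  new Promise<void>((resolve) => setTimeout(resolve, ms));

// Iterative variant of listenForever: reconnects after every disconnect, but
// waits longer after each consecutive failure and gives up after too many.
async function listenWithBackoff<T>(
  streamFactory: () => AsyncIterable<T>,
  dataConsumer: (item: T) => void,
  maxConsecutiveFailures = 5
): Promise<void> {
  let failures = 0;
  while (true) {
    try {
      for await (const item of streamFactory()) {
        failures = 0; // a delivered item shows the connection is working
        dataConsumer(item);
      }
      failures = 0; // closed without error: reconnect right away
    } catch (error) {
      failures += 1;
      if (failures > maxConsecutiveFailures) {
        throw error; // likely a persistent problem, let the caller decide
      }
      await sleep(backoffDelay(failures - 1));
    }
  }
}
```

With the client from the code above it would be called as listenWithBackoff(() => client.stream('tweets/search/stream'), console.log). A loop is used instead of recursion so that a long-running process does not build up an ever-growing promise chain.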

gustawdaniel commented 3 years ago

A third error followed about 5 minutes later; during that time neither PreciseLabPL nor elonmusk tweeted.

gustawdaniel commented 3 years ago

Every 20 seconds Twitter sends an empty keep-alive line, which reaches

stream.on('data', (line) => {

Then

_refreshTimeout() {

runs, and because this._state equals 1 (State.STARTED), the timeout is cleared:

    if (this._timeout) {
        clearTimeout(this._timeout);
    }

But if Twitter does not send an empty message to the 'data' handler within the 30-second wait (this._wait), we get the error.

Here is a fragment from my monitoring (I added some console logs to the package):

on data line 200560 
_refreshTimeout run 200560
this._state this._wait 200561 1 30000
on data line 220565 
_refreshTimeout run 220565
this._state this._wait 220565 1 30000
on data line 240575 
_refreshTimeout run 240576
this._state this._wait 240576 1 30000
on data line 260587 
_refreshTimeout run 260587
this._state this._wait 260587 1 30000
on data line 280594 
_refreshTimeout run 280594
this._state this._wait 280595 1 30000
this._state timeout 310604 1
Stream disconnected with error. Retrying. TwitterError: Stream unresponsive
    at Timeout.<anonymous> (/media/nvme/pro/crypto/twitter-producer/node_modules/twitter-v2/build/TwitterStream.js:56:38)
    at listOnTimeout (node:internal/timers:556:17)
    at processTimers (node:internal/timers:499:7)
construct TwitterStream {
  _connect: [AsyncFunction (anonymous)],
  _close: [Function (anonymous)],
  _state: 0,
  _events: [
    DeferredPromise {
      resolve: [Function (anonymous)],
      reject: [Function (anonymous)],
      promise: [Promise]
    }
  ],
  _wait: 30000
}
_refreshTimeout run 311069
this._state this._wait 311070 1 30000
on data line 331073 
_refreshTimeout run 331073
this._state this._wait 331074 1 30000
on data line 351076 
_refreshTimeout run 351076
this._state this._wait 351077 1 30000

You can see that when the error is thrown, the time elapsed since the last _refreshTimeout is almost exactly 30 seconds:

310604-280595 = 30009
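
The behaviour traced above is that of a watchdog timer: every received line, including the empty keep-alive, restarts a countdown, and the stream is closed with an error if the countdown ever runs out. The following is a minimal sketch of that pattern, not the library's actual code; createWatchdog, Scheduler and the injectable scheduler are hypothetical names introduced here so the logic can be exercised without real timers:

```typescript
type Cancel = () => void;
type Scheduler = (fn: () => void, ms: number) => Cancel;

// Default scheduler backed by real timers.
const realScheduler: Scheduler = (fn, ms) => {
  const handle = setTimeout(fn, ms);
  return () => clearTimeout(handle);
};

// Calls onTimeout if refresh() is not called again within waitMs.
function createWatchdog(
  waitMs: number,
  onTimeout: () => void,
  schedule: Scheduler = realScheduler
) {
  let cancel: Cancel | null = null;

  // Cancels the pending countdown, if any.
  const stop = (): void => {
    if (cancel) {
      cancel();
      cancel = null;
    }
  };

  // Restarts the countdown; meant to be called for every line received.
  const refresh = (): void => {
    stop();
    cancel = schedule(() => {
      cancel = null;
      onTimeout();
    }, waitMs);
  };

  return { refresh, stop };
}

// Possible wiring (assumed, for illustration only):
//   const dog = createWatchdog(30000, () => closeWithError("Stream unresponsive"));
//   stream.on("data", () => dog.refresh());
```

With a 20-second keep-alive interval and a 30-second wait, a single missed or late keep-alive leaves only about 10 seconds of slack before the watchdog fires.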

I observed two errors:

  1. in 310 sec - about 5 min after start
  2. in 620 sec - about 10 min after start

I changed the timeout from 30 seconds to 120 seconds and got the same error again:

this._state this._wait 260591 1 120000
on data line 280600 
_refreshTimeout run 280601
this._state this._wait 280601 1 120000
this._state timeout 400682 1
Stream disconnected with error. Retrying. TwitterError: Stream unresponsive
    at Timeout.<anonymous> (/media/nvme/pro/crypto/twitter-producer/node_modules/twitter-v2/build/TwitterStream.js:56:38)
    at listOnTimeout (node:internal/timers:556:17)
    at processTimers (node:internal/timers:499:7)
construct TwitterStream {
  _connect: [AsyncFunction (anonymous)],
  _close: [Function (anonymous)],
  _state: 0,
  _events: [
    DeferredPromise {
      resolve: [Function (anonymous)],
      reject: [Function (anonymous)],
      promise: [Promise]
    }
  ],
  _wait: 120000
}
_refreshTimeout run 401117
this._state this._wait 401118 1 120000
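
To compare the two runs, the gap between the last timer refresh and the timeout can be computed from the log lines above. This is a small sketch using only numbers copied from those logs; the timestamps are assumed to be milliseconds since process start, and TimeoutRun and silentGapMs are hypothetical names:

```typescript
// Values copied from the two log fragments above.
interface TimeoutRun {
  waitMs: number; // configured this._wait
  lastRefreshMs: number; // last "this._state this._wait" line before the error
  timeoutMs: number; // "this._state timeout" line
}

const runs: TimeoutRun[] = [
  { waitMs: 30000, lastRefreshMs: 280595, timeoutMs: 310604 },
  { waitMs: 120000, lastRefreshMs: 280601, timeoutMs: 400682 },
];

// Time between the last timer refresh and the moment the timeout fired.
function silentGapMs(run: TimeoutRun): number {
  return run.timeoutMs - run.lastRefreshMs;
}

for (const run of runs) {
  console.log(
    `wait=${run.waitMs} ms, last refresh at ${run.lastRefreshMs} ms, gap=${silentGapMs(run)} ms`
  );
}
```

The gaps come out as 30009 ms and 120081 ms, so each matches the configured wait, while the last refresh happens at roughly 280.6 s in both runs. This suggests that a longer wait only delays the error, because no further data arrives after that point.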

My proposals:

  1. Rename the error from

Stream unresponsive

to

Refresh Timeout Exceeded

and describe it as a lack of response from Twitter.

  2. Close the connection with this.close(); instead of this._closeWithError(new TwitterError_1.default('Stream unresponsive'))

  3. Check the other events on streams.


Other events:

Docs:

https://nodejs.org/docs/latest-v16.x/api/stream.html

Class: stream.Readable
Events: 'close', 'data', 'end', 'error', 'pause', 'readable', 'resume'

Class: stream.Writable
Events: 'close', 'drain', 'error', 'finish', 'pipe', 'unpipe'
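
One way to check these other events is to log the lifecycle events of the readable side while the stream runs. This is a rough sketch using only Node's built-in stream module; logLifecycle and LIFECYCLE_EVENTS are hypothetical names, and 'data' and 'readable' are deliberately left out because attaching listeners for them changes how the stream flows:

```typescript
import { PassThrough, Readable } from "node:stream";

// Events that can be observed without affecting the stream's flowing mode.
const LIFECYCLE_EVENTS = ["close", "end", "error", "pause", "resume"] as const;

// Attaches one logging listener per lifecycle event.
function logLifecycle(stream: Readable, log: (line: string) => void): void {
  for (const event of LIFECYCLE_EVENTS) {
    stream.on(event, (...args: unknown[]) => {
      const first = args[0];
      const detail = first instanceof Error ? `: ${first.message}` : "";
      log(`stream event '${event}'${detail}`);
    });
  }
}

// Demonstration on a PassThrough, the same class as response.body in this thread.
const demo = new PassThrough();
logLifecycle(demo, (line) => console.log(line));
demo.emit("error", new Error("simulated failure"));
```

If the connection is dropped by the remote side, an 'end', 'close' or 'error' entry should show up in the log before the timeout fires, which would help separate a real disconnect from missing keep-alives.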

In our case we should have a readable stream:

https://github.com/node-fetch/node-fetch#streams

but ours is writable as well (note writable: true in the dump below).

This may be related to the pipe call:

const stream = response.body.pipe(split_1.default());
stream Stream {
  _events: [Object: null prototype] {
    end: [Function (anonymous)],
    unpipe: [Function: onunpipe],
    error: [Function: onerror],
    close: [Function: bound onceWrapper] { listener: [Function: onclose] },
    finish: [Function: bound onceWrapper] { listener: [Function: onfinish] }
  },
  _eventsCount: 5,
  _maxListeners: undefined,
  writable: true,
  readable: true,
  paused: false,
  autoDestroy: true,
  write: [Function (anonymous)],
  push: [Function (anonymous)],
  queue: [Function (anonymous)],
  end: [Function (anonymous)],
  destroy: [Function (anonymous)],
  pause: [Function (anonymous)],
  resume: [Function (anonymous)],
  [Symbol(kCapture)]: false
}

but response.body is

response.body PassThrough {
  _readableState: ReadableState {
    objectMode: false,
    highWaterMark: 16384,
    buffer: BufferList { head: null, tail: null, length: 0 },
    length: 0,
    pipes: [],
    flowing: null,
    ended: false,
    endEmitted: false,
    reading: false,
    constructed: true,
    sync: false,
    needReadable: false,
    emittedReadable: false,
    readableListening: false,
    resumeScheduled: false,
    errorEmitted: false,
    emitClose: true,
    autoDestroy: true,
    destroyed: false,
    errored: null,
    closed: false,
    closeEmitted: false,
    defaultEncoding: 'utf8',
    awaitDrainWriters: null,
    multiAwaitDrain: false,
    readingMore: false,
    decoder: null,
    encoding: null,
    [Symbol(kPaused)]: null
  },
  _events: [Object: null prototype] {
    prefinish: [Function: prefinish],
    unpipe: [Function: onunpipe],
    error: [ [Function: onerror], [Function (anonymous)] ],
    close: [Function: bound onceWrapper] { listener: [Function: onclose] },
    finish: [Function: bound onceWrapper] { listener: [Function: onfinish] }
  },
  _eventsCount: 5,
  _maxListeners: undefined,
  _writableState: WritableState {
    objectMode: false,
    highWaterMark: 16384,
    finalCalled: false,
    needDrain: false,
    ending: false,
    ended: false,
    finished: false,
    destroyed: false,
    decodeStrings: true,
    defaultEncoding: 'utf8',
    length: 0,
    writing: false,
    corked: 0,
    sync: true,
    bufferProcessing: false,
    onwrite: [Function: bound onwrite],
    writecb: null,
    writelen: 0,
    afterWriteTickInfo: null,
    buffered: [],
    bufferedIndex: 0,
    allBuffers: true,
    allNoop: true,
    pendingcb: 0,
    constructed: true,
    prefinished: false,
    errorEmitted: false,
    emitClose: true,
    autoDestroy: true,
    errored: null,
    closed: false,
    closeEmitted: false,
    [Symbol(kOnFinished)]: []
  },
  allowHalfOpen: true,
  [Symbol(kCapture)]: false,
  [Symbol(kCallback)]: null
}

gustawdaniel commented 3 years ago

There is a related issue:

https://github.com/twitterdev/Twitter-API-v2-sample-code/issues/34

Finally, I suggest stating in the README that this package can be used ONLY with Node 12 until the issue mentioned above is resolved, and that Twitter developers are aware of the bug and are working on a fix.


I can confirm that

nvm use 12

fixed the problem.
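
Until the README carries such a note, a project could warn at startup when it runs on a different Node major version. This is a minimal sketch based only on the observation in this thread that Node 12 works; nodeMajorVersion and the warning text are hypothetical:

```typescript
// Extracts the major version from a string such as "12.22.1".
function nodeMajorVersion(version: string): number {
  return Number.parseInt(version.split(".")[0], 10);
}

// process.versions.node has no leading "v", e.g. "12.22.1".
const major = nodeMajorVersion(process.versions.node);
if (major !== 12) {
  console.warn(
    `Running on Node ${major}: filtered streams were reported to time out on versions other than 12.`
  );
}
```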