geduldig / TwitterAPI

Minimal python wrapper for Twitter's REST and Streaming APIs
gracefully handling a stalled stream #210

alexge233 closed this issue 1 year ago

alexge233 commented 2 years ago

Hi,

I keep getting a "Twitter stream stalled" error caused by an operational disconnect. I handle this with an error decorator, which then tries to reconnect using retry and some back-off delay. However, when I do this, I eventually get "This stream is currently at the maximum allowed connection limit", which, as I understand it, means I've been creating new connections rather than killing and restarting the old one. What's the recommended way to kill and destroy the previous stream connection, and then reconnect from scratch?

geduldig commented 2 years ago

What is the exact request you are using? What is the error message and number you are getting back?

alexge233 commented 2 years ago

The error I initially get is Twitter stream stalled which I then try to fix by reconnecting.

The error I eventually get which I need help solving is This stream is currently at the maximum allowed connection limit. I suspect the reason is that instead of destroying the existing connection, I open a new one.

So far I've tried to delete my member reference to TwitterAPI when I try to reconnect.

My question is if there's a better way to deal with a failed connection. I had a quick look at the source code but couldn't figure it out.

alexge233 commented 2 years ago

This is my reconnect method. The class owns an instance of TwitterAPI and is a wrapper around TwitterAPI and other code. It calls start, shown right below.

def reconnect(self):
        del self.api
        self.api = TwitterAPI(self.consumer_key,
                              self.consumer_secret,
                              auth_type='oAuth2',
                              api_version='2')
        assert self.api is not None
        self.start()

This is my start method which I initially call:

def start(self):
        EXPANSIONS   = 'author_id'
        TWEET_FIELDS = 'author_id,created_at,entities,id,lang,public_metrics,source,text'
        USER_FIELDS  = 'created_at,description,location,name,username'

        r = self.api.request('tweets/search/stream', {
                             'expansions': EXPANSIONS,
                             'tweet.fields': TWEET_FIELDS,
                             'user.fields': USER_FIELDS },
                     hydrate_type=HydrateType.APPEND)

        if r.status_code != 200:
            logger.error(f'Stream.request_error {r.status_code} => {r.text}')

        for item in r:
            self.callback(item)

The callback captures data and deals with it accordingly.

Through the use of a decorator which can see what the exceptions are, I try to reconnect when I get a TwitterConnectionError shown below.

What usually happens is I get multiple errors of Stream Stalled due to operational reasons

The error decorator captures this and then tries to reconnect:

def __call__(self, f):
        __error__ = self
        @functools.wraps(f)
        def __impl(self, *args, **kwargs):
            try:
                return f(self, *args, **kwargs)
            except TwitterRequestError as e:
                logger.error(f"{__error__.__caller__}.request_error {e}")
                return e
            except TwitterConnectionError as e:
                logger.error(f"{__error__.__caller__}.connection_error {e}")
                self.reconnect()
            except Exception as e:
                logger.exception(f"{__error__.__caller__}.exception => {e}")
                return e
        return __impl

What eventually happens is (this is a guess), Twitter sees many open stream connections coming from me, and disconnects me with a different error 429 which says This stream is currently at the maximum allowed connection limit and then it has "connection_issue" : "TooManyConnections". Empirically I see this happen after I have received 7 "OperationalDisconnect" with Twitter Stream Stalled.

Hope the above helps.

geduldig commented 2 years ago

The stream will automatically disconnect when there is no data for 90 seconds. Probably this is occurring repeatedly, and so you are reconnecting too frequently. Twitter recommends that you increase (double) the interval before you reconnect with each attempt. Here is the relevant doc: https://developer.twitter.com/en/docs/twitter-api/tweets/filtered-stream/integrate/handling-disconnections

The 90 second rule is something built into TwitterAPI. I'm not sure if this will help you, but you can increase the 90 second timeout by setting an environment variable DEFAULT_STREAMING_TIMEOUT. This is 90 by default.

Let me know if none of this helps. Jonas

alexge233 commented 2 years ago

Hi Jonas, thanks for the advice.

I've added a delay and back-off to reconnecting using the retry module. I'll set the streaming timeout to 90 seconds as well. Is del self.api enough to invalidate the previous object?

The documentation seems to suggest I am trying to reconnect too fast, so I will add a delay between reconnects too and see what happens. The Twitter documentation you linked to says to use at least 1 min delay if I receive a 429 error.

I'll test it this week and report back for posterity.

geduldig commented 2 years ago

You don't need del self.api because the connection and disconnection happen inside the request method. You simply call request again after waiting the appropriate amount of time.

You probably do not need to change the streaming timeout from 90 seconds, but it is there if you decide to. That timeout is there just to ensure that request does actually disconnect after 90 seconds of no data in the stream. I've never tested how long one can keep the stream open with no data. It's possible 90 seconds is overly conservative. I don't know.

Yes, you probably are reconnecting after too short a wait. As that link describes, when 429 errors occur you are supposed to wait 1 minute. If that reconnect fails, double the wait to 2 minutes, and so on.

alexge233 commented 2 years ago

Yeah, it's starting to make a lot of sense now. I leave the stream open for days on end, so I suspect that during times of inactivity the 90-second timeout might be too low. I've made the changes you suggested.

This is a filtered stream, so I am not getting many tweets, which may be why it is timing out. I wonder if there's a more elegant way of handling such disconnects due to inactivity. So far it's been OK; I'll let it run until the end of the week and get back to you if it's fixed. Thank you very much for your help, it is greatly appreciated!

alexge233 commented 2 years ago

OK, I've gotten the Twitter Stream stalled error multiple times in the past week, but the Too many connections error isn't showing up any more. I've got a streaming timeout of 360 seconds and a reconnect delay of 60 with a back-off of 60. I'm closing this, as it seems to have stopped the issue from happening again for now! Thank you very much for your help!

alexge233 commented 1 year ago

Hi there!

Unfortunately I never really managed to solve this, and recently it has come back with a vengeance. I did some digging and found out that others also have it, and that it occurs with both Rust and Python clients: https://twittercommunity.com/t/rate-limit-on-tweets-stream-api/144389/23

I suspect the offending line is: https://github.com/geduldig/TwitterAPI/blob/079d28e8e06f5409c940bd6fcd8a784b0407a2b0/TwitterAPI/TwitterAPI.py#L222

The reason is that a with statement is needed there; otherwise the stream isn't released when the exception is thrown. Something like this:

with requests.get('http://httpbin.org/get', stream=True) as r:
    ...  # read from r here; r.close() runs when the block exits, releasing the connection

Looking into my logs, I see that the connection is never released. The Twitter API error then makes sense, Twitter sees too many connections from me, because I keep reconnecting using a new connection stream each time.

What I usually see is:

  1. first I get {'title': 'operational-disconnect', 'disconnect_type': 'OperationalDisconnect', 'detail': 'This stream has been disconnected for operational reasons.', 'type': 'https://api.twitter.com/2/problems/operational-disconnect'}
  2. then I will get the error Twittter connection_error => Twitter stream stalled
  3. then I will get Twittter connection_error => ('Connection broken: IncompleteRead(0 bytes read)', IncompleteRead(0 bytes read))
  4. and finally I get ERROR Twittter connection_error => HTTPSConnectionPool(host='api.twitter.com', port=443): Read timed out.

Those errors on their own appear to make sense to me at least. Here's where it gets interesting. The following error will spam over many minutes or even hours:

429 && {"title":"ConnectionException","detail":"This stream is currently at the maximum allowed connection limit.","connection_issue":"TooManyConnections","type":"https://api.twitter.com/2/problems/streaming-connection"}

This implies that I have too many open connections to Twitter, meaning that when I reconnect, the older ones don't close down. I know I haven't reached my Twitter cap (all I do is read, and I have 2 million per month), so I'm thinking it is the connection cap: https://developer.twitter.com/en/support/twitter-api/error-troubleshooting

That would mean I am sending more than 50 connection requests per 15-minute window.
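If the limit really is 50 connection requests per 15-minute window (the figure quoted above; treat it as an assumption and check the current docs), a client can count its own connection attempts and refuse to exceed the cap. Below is a minimal sketch using a hypothetical `ConnectionBudget` class, not part of TwitterAPI. Time is passed in explicitly so the logic can be tested without waiting.

```python
from collections import deque


class ConnectionBudget:
    """Hypothetical helper: count connection attempts in a sliding time window."""

    def __init__(self, limit=50, window=15 * 60):
        self.limit = limit        # max connection attempts allowed per window (assumed 50)
        self.window = window      # window length in seconds (assumed 15 minutes)
        self.attempts = deque()   # timestamps of recent attempts, oldest first

    def _expire(self, now):
        # Forget attempts that are at least `window` seconds old.
        while self.attempts and now - self.attempts[0] >= self.window:
            self.attempts.popleft()

    def wait_time(self, now):
        """Seconds to wait before another attempt fits in the window (0 = go now)."""
        self._expire(now)
        if len(self.attempts) < self.limit:
            return 0
        return self.window - (now - self.attempts[0])

    def record(self, now):
        """Remember that a connection attempt was made at time `now`."""
        self._expire(now)
        self.attempts.append(now)
```

In a real loop you would sleep for `budget.wait_time(time.monotonic())` and then call `budget.record(time.monotonic())` right before each `api.request(...)`, so a burst of failed reconnects cannot exceed the cap on its own.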

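Reading the requests documentation, stream=True is called out explicitly: when it is set, the connection is not released back to the pool until all the data is consumed or the response is closed manually: https://requests.readthedocs.io/en/latest/user/advanced/#body-content-workflow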

However, I can't see you setting stream=False or stream=True anywhere, which is the piece of the puzzle I'm missing. I'm willing to bet that the connection pool still has lingering connections, even though Twitter claims to have shut them down on their end. As a different solution, a try/finally block could be used instead of a with statement to shut the connection down manually.
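The try/finally idea can be written with the standard library's `contextlib.closing`, which behaves like calling `close()` in a `finally` block. This minimal sketch assumes the stream response object has a `close()` method. `requests.Response` does; check that the TwitterAPI response type you use provides one. `consume_stream` and `handle` are hypothetical names.

```python
from contextlib import closing


def consume_stream(response, handle):
    """Pass every item of a streaming response to `handle`, then close it.

    contextlib.closing calls response.close() when the block exits, whether the
    loop finishes normally or raises (e.g. a stalled-stream or IncompleteRead
    error), so releasing the connection does not depend on garbage collection.
    """
    with closing(response):
        for item in response:
            handle(item)
```

With TwitterAPI this would be called as `consume_stream(r, callback)` in place of a bare `for item in r:` loop. The exception still propagates to the caller, but only after the response has been closed.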

I have followed the previous solution of reconnecting, and it does work, but it spams my logs with multiple disconnection messages, even when I back off progressively between reconnects. I'm also worried about Twitter blacklisting me because of this.

Alternatively, should I be manually calling close on TwitterResponse when I detect this error, or should this be done within TwitterAPI?

I'm happy to try and write a patch if you don't have the time :-)

geduldig commented 1 year ago

Hello Alex, sorry this is still not resolved. I haven't seen the issue myself. I'll run the stream example overnight to see what occurs. If you can write a patch and show that it solves the issue, please do!

Jonas

geduldig commented 1 year ago

I've been running something similar to https://github.com/geduldig/TwitterAPI/blob/master/examples/v2/stream_tweets.py for the past several hours. No 429 errors yet, and the program has not stopped. I'll leave it running.

I do not call requests.get() inside TwitterAPI.request(). I use requests.Session(), and it is used in a with statement.

geduldig commented 1 year ago

I still cannot replicate your issue. Can you provide me with a complete and minimal code example that demonstrates the issue?

alexge233 commented 1 year ago

Hi,

I’ll upload the code and logs tomorrow. It takes a few days before the issue shows up. I’m also going to try to manually close the connection when the error is thrown, to see if that stops the subsequent error. Sorry for not replying earlier.

Cheers, Alex

On 7 Nov 2022, at 14:01, geduldig wrote:

I still cannot replicate your issue. Can you provide me with a complete and minimal code example that demonstrates the issue?

geduldig commented 1 year ago

I would be interested to know if you see this error with this example: https://github.com/geduldig/TwitterAPI/blob/master/examples/v2/stream_tweets.py

Would you mind trying that as well?

alexge233 commented 1 year ago

I'll run the example for a few days and see what happens. Normally it takes quite a bit of time, because the stream has to fail enough times before I start seeing the error. The code I'm actually using is the one above with a minor change: when the error decorator catches a connection error, it calls reconnect, which waits and then calls start.

Here's the reconnect code:

    @retry(delay=120, backoff=2)
    @error.TwitterError(caller='Stream')
    def reconnect(self):
        time.sleep(65)
        self.start()

Here's the actual streaming function:

    @error.TwitterError(caller='Stream')
    def start(self):
        EXPANSIONS   = 'author_id'
        TWEET_FIELDS = 'author_id,created_at,entities,id,lang,public_metrics,source,text'
        USER_FIELDS  = 'created_at,description,location,name,username'

        r = self.api.request('tweets/search/stream', {
                             'expansions': EXPANSIONS,
                             'tweet.fields': TWEET_FIELDS,
                             'user.fields': USER_FIELDS },
                     hydrate_type=HydrateType.APPEND)

        logger.info(f'[{r.status_code}] starting twitter streaming')
        if r.status_code != 200:
            logger.error(f'Stream.request_error {r.status_code} => {r.text}')

            if r.status_code == 429:
                time.sleep(60)
                raise TwitterConnectionError(
                    f'Stream.Operational Disconnect => {r.status_code} && {r.text}')

            else:
                time.sleep(60)
                raise TwitterConnectionError(
                    f'Stream.Unknown Disconnect => {r.status_code} && {r.text}')
        else:
            for item in r:
                self.callback(item)

The self.callback is simply a parser that does other voodoo not related to the stream. The decorator which captures the errors is below:

class TwitterError():
    def __init__(self, caller):
        self.__caller__ = caller

    def __call__(self, f):
        __error__ = self
        @functools.wraps(f)
        def __impl(self, *args, **kwargs):
            try:
                return f(self, *args, **kwargs)

            except TwitterRequestError as e:
                logger.error(f"Twitter request_error => {e}")
                return e

            except TwitterConnectionError as e:
                logger.error(f"Twittter connection_error => {e}")
                self.reconnect()

            except Exception as e:
                logger.exception(f"Twitter exception => {e}")
                return e

        return __impl

I run this for months on end, and I've set up the maximum number of allowed filtering rules. It runs on a VPS, and it consistently spams my logs. The way I got around it was, as you originally suggested, to back off and reconnect later. That kind of works, but not really, as it masks the problem.

My suspicion is that the underlying socket is never shut down properly, and every time I call reconnect a new one is opened. I'm inclined to add a self.api.close() in reconnect to see if that makes a difference. In the meantime I'll let your example run for a few days and see what happens. I'll let you know if something happens.
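One interaction in the code above is worth checking. Assuming `retry` is the PyPI `retry` package, its decorator re-invokes a function only when that function raises. `TwitterError` catches every exception and either returns it or calls `self.reconnect()` itself. The outer `@retry(delay=120, backoff=2)` layer therefore never sees a failure, and its growing delay is never applied. Each failure instead re-enters `reconnect` from inside an `except` block with fixed sleeps, so the call stack grows with every reconnect. The sketch below reproduces the interaction with two hypothetical stand-ins, `retry_on_error` and `swallow_errors`, rather than the real packages.

```python
import functools


def retry_on_error(tries):
    """Hypothetical stand-in for a retry decorator: re-call f only if it raises."""
    def decorator(f):
        @functools.wraps(f)
        def wrapper(*args, **kwargs):
            for attempt in range(tries):
                try:
                    return f(*args, **kwargs)
                except Exception:
                    if attempt == tries - 1:
                        raise       # out of attempts: let the error escape
        return wrapper
    return decorator


def swallow_errors(f):
    """Hypothetical stand-in for TwitterError: catch the exception and return it."""
    @functools.wraps(f)
    def wrapper(*args, **kwargs):
        try:
            return f(*args, **kwargs)
        except Exception as e:
            return e
    return wrapper


calls = []


@retry_on_error(tries=3)   # outer layer, like @retry(...)
@swallow_errors            # inner layer, like @error.TwitterError(...)
def connect():
    calls.append(1)
    raise ConnectionError('stream stalled')


result = connect()
print(len(calls), type(result).__name__)  # prints: 1 ConnectionError (retry never fired)
```

A growing delay only takes effect if the exception is allowed to propagate out of the decorated function, or if the recursion is replaced with a plain loop.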

geduldig commented 1 year ago

Can you please package your code into a single file so that I can run it? I shouldn't have to make any edits to run it. This way I can compare apples to apples.

alexge233 commented 1 year ago

Yes no problem, I’ll upload it in a bit.

On 8 Nov 2022, at 15:03, geduldig wrote:

Can you please package your code into a single file so that I can run it? I shouldn't have to make any edits to run it. This way I can compare apples to apples.

alexge233 commented 1 year ago

Right, here's a trimmed down version as requested. It does exactly what I do, but it is slimmed down so no decorators, RabbitMQ, or classes. It is based off of your examples, and it should be streaming exactly the same tweets. I've tested it locally and it runs ok for now.

import os
import json
from TwitterAPI import TwitterAPI
from TwitterAPI import TwitterOAuth
from TwitterAPI import TwitterRequestError
from TwitterAPI import TwitterConnectionError
from TwitterAPI import HydrateType

users = ["binance","monero","Forbes","coinbase","CoinMarketCap","krakenfx","markets","business","Financialtimes","FT","TechCrunch","WSJProAI","WSJ","WSJBusiness","wallstreetbets","WSBConsensus","BBGNewEconomy","nytimes","BBCBusiness","ReutersBiz","FinancialJuice","StockMKTNewz","breakingmkts"]
EXPANSIONS   = 'author_id'
TWEET_FIELDS = 'author_id,created_at,entities,id,lang,public_metrics,source,text'
USER_FIELDS  = 'created_at,description,location,name,username'

o = TwitterOAuth.read_file()
api = TwitterAPI(o.consumer_key, o.consumer_secret, auth_type='oAuth2', api_version='2')

try:
    r = api.request('tweets/search/stream', {
                'expansions': EXPANSIONS,
                'tweet.fields': TWEET_FIELDS,
                'user.fields': USER_FIELDS },hydrate_type=HydrateType.APPEND)

    for user in users:
        keys = f"from:{user} lang:en"
        r = api.request('tweets/search/stream/rules',{'add': [{'value': keys}]})
        print(f'[{r.status_code}] RULE ADDED: {r.text}')
        if r.status_code != 201: exit()

    # GET STREAM RULES
    r = api.request('tweets/search/stream/rules', method_override='GET')
    print(f'[{r.status_code}] RULES: {r.text}')
    if r.status_code != 200: exit()

    # START STREAM
    r = api.request('tweets/search/stream')
    print(f'[{r.status_code}] START...')
    if r.status_code != 200: exit()
    for item in r:
        print(item)

except TwitterRequestError as e:
    print(e.status_code)
    for msg in iter(e):
        print(msg)

except TwitterConnectionError as e:
    print(e)

except Exception as e:
    print(e)

geduldig commented 1 year ago

That's a bit too much code. We are trying to determine if there is an issue with streaming. Please either extract the minimal amount of code that exhibits the issue, or work from the streaming example I provided here: https://github.com/geduldig/TwitterAPI/blob/master/examples/v2/stream_tweets.py

alexge233 commented 1 year ago

I've made an edit in the previous post.

geduldig commented 1 year ago

Much appreciated.

geduldig commented 1 year ago

Looking at your test code, I see one change you need to make if you want this to run "forever."

You should put the entire try block inside a while True block, and add continue or break statements inside the except blocks.

I also added some code to inspect each returned item, in case the item is not a tweet.

import os
import json
from TwitterAPI import TwitterAPI
from TwitterAPI import TwitterOAuth
from TwitterAPI import TwitterRequestError
from TwitterAPI import TwitterConnectionError
from TwitterAPI import HydrateType

users = ["binance","monero","Forbes","coinbase","CoinMarketCap","krakenfx","markets","business","Financialtimes","FT","TechCrunch","WSJProAI","WSJ","WSJBusiness","wallstreetbets","WSBConsensus","BBGNewEconomy","nytimes","BBCBusiness","ReutersBiz","FinancialJuice","StockMKTNewz","breakingmkts"]
EXPANSIONS   = 'author_id'
TWEET_FIELDS = 'author_id,created_at,entities,id,lang,public_metrics,source,text'
USER_FIELDS  = 'created_at,description,location,name,username'

o = TwitterOAuth.read_file()
api = TwitterAPI(o.consumer_key, o.consumer_secret, auth_type='oAuth2', api_version='2')

# ADD STREAM RULES
for user in users:
    keys = f"from:{user} lang:en"
    r = api.request('tweets/search/stream/rules',{'add': [{'value': keys}]})
    print(f'[{r.status_code}] RULE ADDED: {r.text}')
    if r.status_code != 201: exit()

# GET STREAM RULES
r = api.request('tweets/search/stream/rules', method_override='GET')
print(f'[{r.status_code}] RULES: {r.text}')
if r.status_code != 200: exit()

while True:
    try:
        r = api.request(
                'tweets/search/stream', 
                {'expansions': EXPANSIONS, 'tweet.fields': TWEET_FIELDS, 'user.fields': USER_FIELDS },
                hydrate_type=HydrateType.APPEND)

        # START STREAM
        print(f'[{r.status_code}] START...')
        if r.status_code != 200: exit()
        for item in r:
            if 'data' in item:
                print(item['data']['text'])
            elif 'warning' in item:
                print(f'>>> WARNING - {item["warning"]}')
            else:
                print(f'>>> UNKNOWN ITEM - {item}')

    except TwitterRequestError as e:
        print(e.status_code)
        for msg in iter(e):
            print(f'>>> TWITTER REQUEST ERROR - {msg}')
        break

    except TwitterConnectionError as e:
        print(f'>>> TWITTER CONNECTION ERROR - {e}')
        continue

    except KeyboardInterrupt:
        break

    except Exception as e:
        print(f'>>> UNEXPECTED EXCEPTION - {e}')
        break

alexge233 commented 1 year ago

Thanks for your feedback! I’ll try it tomorrow. I’m wondering if the connection closes automatically once the stream data is consumed, and that’s what causes the errors. I’ll dig a bit deeper into requests to see if that’s the case. I’ll report back tomorrow. Did you get any errors whilst running this?

geduldig commented 1 year ago

It's been running continuously for a day with no errors.

alexge233 commented 1 year ago

OK, do you want to close the issue then? I'll make the change you suggested, and in case of an error I'll try to print the number of open connections. There's not much else I can think of.

geduldig commented 1 year ago

My code is still running. It disconnected once with the following error, then restarted without issue:

>>> TWITTER CONNECTION ERROR - ('Connection broken: IncompleteRead(0 bytes read)', IncompleteRead(0 bytes read))

Have you tried my code?

alexge233 commented 1 year ago

I’m migrating to a new VPS. Once it’s done, I’ll let it run and report back.

On 11 Nov 2022, at 18:55, geduldig wrote:

My code is still running. It disconnected once with the following error, then restarted without issue:

TWITTER CONNECTION ERROR - ('Connection broken: IncompleteRead(0 bytes read)', IncompleteRead(0 bytes read))

Have you tried my code?

alexge233 commented 1 year ago

Ok, it's live with the change you've suggested. I'll let you know of any errors.

alexge233 commented 1 year ago

Six days in, and I've just seen the first error:

{'title': 'operational-disconnect', 'disconnect_type': 'OperationalDisconnect', 'detail': 'This stream has been disconnected for operational reasons.', 'type': 'https://api.twitter.com/2/problems/operational-disconnect'}

I'm gonna try and add a close for the connection before I reconnect. I want to see if this will stop the error from popping up again.

geduldig commented 1 year ago

Where in the code did that come from? >>> UNKNOWN ITEM?

alexge233 commented 1 year ago

My error logger which captures the exceptions and errors raised

geduldig commented 1 year ago

Sounds like you are not using the code I gave you. We need to track down where the exception arose.

alexge233 commented 1 year ago

I am using your version of the code, but I also have error loggers capturing errors, because this script works in tandem with various other processes. This specific error comes from the following lines, and is captured from the response header and message:

        while True:
            r = self.api.request('tweets/search/stream', {
                                 'expansions': EXPANSIONS,
                                 'tweet.fields': TWEET_FIELDS,
                                 'user.fields': USER_FIELDS },
                         hydrate_type=HydrateType.APPEND)

            logger.info(f'[{r.status_code}] starting twitter streaming')
            if r.status_code != 200:
                logger.error(f'Stream.request_error {r.status_code} => {r.text}')

                if r.status_code == 429:
                    logger.error(
                        f'Stream.Operational Disconnect => {r.status_code} && {r.text}')

                else:
                    logger.error(
                        f'Stream.Unknown Disconnect => {r.status_code} && {r.text}')
            else:
                for item in r:
                    self.callback(item)

I'm inclined to dump the stream information on stdout when this happens

geduldig commented 1 year ago

Which line in the code above logged the error? Did you remove the try/except blocks?

alexge233 commented 1 year ago

Lines

if r.status_code == 429:
    logger.error(f'Stream.Operational Disconnect => {r.status_code} && {r.text}')

I'm catching the exceptions in a decorator. Basically, it is an error raised on Twitter's end, which sends me a 429. I've not seen anything else; it just randomly sends me this status code. This time it took 6 days before I saw it.

I've rerun the script, only now logger is set to DEBUG so I can see a lot more information. I'm hoping this will shed some light into what's going on.

geduldig commented 1 year ago

One more question. I don't see the try/except blocks in your code above. Did you remove them?

alexge233 commented 1 year ago

No, there’s a decorator that wraps the entire function in try/except. It just handles the exceptions outside the function, but I’m happy to put them back in if you think that will make a difference.

On 18 Nov 2022, at 12:35, geduldig wrote:

One more question. I don't see the try/except blocks in your code above. Did you remove them?

geduldig commented 1 year ago

If you are not using the code I provided, it makes it very difficult for me to help you debug this.

That said, it looks like when you call the request method Twitter is disconnecting due to some internal issue that is not immediately resolvable by Twitter. Then, you probably are trying to immediately reconnect multiple times and too frequently before the issue has been resolved. Instead you should employ a "back off" strategy. See https://developer.twitter.com/en/docs/twitter-api/tweets/filtered-stream/integrate/handling-disconnections

geduldig commented 1 year ago

I just got the 429 with my code. Give me a little time. I think I have something I can work with.

alexge233 commented 1 year ago

I’ve been using the progressive back-off for months now. It doesn’t seem to trigger the 429; the error appears randomly, often after days of no issues. I’m not even sure it’s something on the client end at this point. I’m really hoping that running the logger with DEBUG on will provide some more insight. If you want, I can copy an isolated minimal version of the error handling so you know what the differences are.

On 18 Nov 2022, at 17:23, geduldig wrote:

I just got the 429 with my code. Give me a little time. I think I have something I can work with.

geduldig commented 1 year ago

Here is what is happening in my code that causes the 429 error:

  1. While reading in the stream iterator loop, I get a read timeout error (urllib3.exceptions.ReadTimeoutError).
  2. This gets caught as a TwitterConnectionError, which sends execution to the start of the while loop.
  3. A stream re-connection is attempted with request(), which fails (requests.exceptions.ConnectionError)
  4. After many (over 4,000 in an interval of roughly a minute) re-connection attempts, request() returns a 429 error.

In my case, I think what is required is the 5 second doubling "back off" strategy when TwitterConnectionError is caught. This would occur in 2 and 3 above. For 4, I would use the 1 minute doubling "back off" strategy.
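The policy described above can be captured in one small function: a 5-second doubling wait for TwitterConnectionError-style failures and a 1-minute doubling wait for 429 responses. The function name, the `kind` strings, and the 320-second ceiling on connection-error waits are assumptions for this sketch. Twitter's disconnection guide suggests a similar ceiling for HTTP errors; check the current version of that doc.

```python
def reconnect_delay(kind, attempt):
    """Seconds to wait before the next reconnect (hypothetical helper).

    kind: 'connection' for TwitterConnectionError-style failures,
          'rate_limit' for HTTP 429 responses.
    attempt: number of consecutive failures of this kind so far (0-based).
    """
    if kind == 'connection':
        return min(5 * 2 ** attempt, 320)   # 5, 10, 20, ... capped at 320 s (assumed cap)
    if kind == 'rate_limit':
        return 60 * 2 ** attempt            # 60, 120, 240, ... as described above
    raise ValueError(f'unknown error kind: {kind!r}')


print([reconnect_delay('rate_limit', n) for n in range(4)])  # [60, 120, 240, 480]
```

`attempt` counts consecutive failures of the same kind and should go back to 0 once a connection succeeds. Otherwise the wait only ever grows.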

I will implement this in my code and test. If it works I'll publish the code, and you can either try it out or translate it into your code.

alexge233 commented 1 year ago

Hi there, did you have any time to write a patch? I got a trigger of the event that is different from yours: mine was ConnectionBroken IncompleteRead(0 bytes read), but I didn't see a TimeoutError.

geduldig commented 1 year ago

I'm trying to build a working example along the lines of the link above -- Twitter's doc on handling disconnections. The problem is disconnections take several days to occur. In fact, I got the error only today since my last posted comment. Unfortunately, there was a bug in my code, so I am re-running and waiting (probably another week) for a disconnect error.

I'm not sure if this requires a patch. It probably doesn't. I think what is required is simply following the guidelines in that link. Have you tried doing that yourself? If you haven't, then I can guarantee that your code won't be able to recover from a disconnect. My last comment includes the recipe for backing off.

I should have a working example to give you once I get all the bugs out of my code. It might take another week or two.

geduldig commented 1 year ago

One more thing. I just reread your code above, which has sleep(60). A fixed 60-second wait is incorrect for both cases: 429 and all other errors.

alexge233 commented 1 year ago

Hey, sorry for the late reply. First of all thank you very much for your help, no worries if you don't currently have the time for this.

Yes I've read the link as well as other links that relate to this. What mostly concerned me was the error I was getting with a message This stream is currently at the maximum allowed connection limit as this hints towards Twitter seeing many connections from me, when I only expected one. I get that they may occasionally disconnect for a variety of reasons. The sleep you've seen is only part of the code, there's a retry decorator I've been using with an exponential back-off. It works, because I could reconnect, but obviously it's not ideal.

I'm not concerned about disconnects that have to do with their service end, e.g., maintenance, etc. I'm concerned about disconnects because they see many connections from me, since that causes me to wait longer and longer times, meaning I may lose tweets I need to consume real-time or near real-time.

At this point I'm inclined to avoid the while loop, and treat each stream as a finite connection. Read as much, close it, wait for a minute, then start over, using some kind of event loop. If an error occurs, then the only thing that would change is the event loop would wait longer. I understand why that's not really a TwitterAPI issue, and that I should be taking care of it on my end. In any case I do appreciate any examples or hints you may have!
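Here is a minimal sketch of such a loop, under these assumptions. The TwitterAPI call is wrapped in a hypothetical `open_stream()` callable that returns `(status_code, response)`. The response is iterable and has a `close()` method. `delay_for(kind, attempt)` is whatever back-off policy you choose. Each response is closed before the next connection is opened, the wait grows with consecutive failures of the same kind, and the counters reset once a connection succeeds. `sleep` and `max_connects` are parameters so the loop can be tested without waiting.

```python
import time
from contextlib import closing


def stream_forever(open_stream, handle, delay_for, retry_on=(ConnectionError,),
                   sleep=time.sleep, max_connects=None):
    """Hypothetical reconnect loop: consume, close, back off on failure, repeat.

    open_stream() -> (status_code, response); response is iterable with close().
    delay_for(kind, attempt) -> seconds; kind is 'rate_limit', 'http' or 'connection'.
    retry_on: exception types treated as connection errors.
    max_connects: stop after this many connection attempts (None = forever).
    """
    failures = {'rate_limit': 0, 'http': 0, 'connection': 0}
    connects = 0
    while max_connects is None or connects < max_connects:
        connects += 1
        try:
            status, response = open_stream()
            with closing(response):                      # always release this connection
                if status == 200:
                    failures = dict.fromkeys(failures, 0)  # a good connection resets back-off
                    for item in response:
                        handle(item)
                    continue                             # stream ended cleanly: reconnect now
                kind = 'rate_limit' if status == 429 else 'http'
        except retry_on:
            kind = 'connection'
        sleep(delay_for(kind, failures[kind]))
        failures[kind] += 1
```

With TwitterAPI, `open_stream` would call `api.request('tweets/search/stream', ...)` and return `(r.status_code, r)`, and `retry_on` would be `(TwitterConnectionError,)`. Permanent errors such as 401 are treated like any other HTTP error here; you may prefer to stop on those instead.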

geduldig commented 1 year ago

I've been running my code for over a week and have seen a bunch of disconnects. The code is able to recover and reconnect. So far, I have not seen any 429s. Try running my code as is and see whether you get a 429. You shouldn't. The code prints tweets and errors to the console, and also logs errors to a file, which you can check periodically.

geduldig commented 1 year ago
from TwitterAPI import (TwitterAPI, 
                        TwitterOAuth, 
                        TwitterRequestError, 
                        TwitterConnectionError, 
                        HydrateType)

EXPANSIONS   = 'author_id'
USER_FIELDS  = 'created_at,description,location,name,username'
TWEET_FIELDS = 'author_id,created_at,entities,id,lang,public_metrics,source,text'

##
## SET UP LOGGING TO FILE AND TO CONSOLE
##

import logging
formatter = logging.Formatter('%(levelname)s %(asctime)s %(message)s', '%m/%d/%Y %I:%M:%S %p')
fh = logging.FileHandler('stream_forever.log')
sh = logging.StreamHandler()
fh.setFormatter(formatter)
sh.setFormatter(formatter)
logger = logging.getLogger()
logger.setLevel(logging.INFO)
logger.addHandler(fh)
logger.addHandler(sh)

##
## AUTHENTICATE WITH TWITTER 
##

o = TwitterOAuth.read_file()
api = TwitterAPI(o.consumer_key, o.consumer_secret, auth_type='oAuth2', api_version='2')

##
## STREAMING RULES 
##

# ADD RULES (NO HARM RE-ADDING, BUT ONCE IS ENOUGH)
# users = ["binance","monero","Forbes","coinbase","CoinMarketCap","krakenfx","markets",
#          "business","Financialtimes","FT","TechCrunch","WSJProAI","WSJ","WSJBusiness",
#          "wallstreetbets","WSBConsensus","BBGNewEconomy","nytimes","BBCBusiness",
#          "ReutersBiz","FinancialJuice","StockMKTNewz","breakingmkts"]
# for user in users:
#   keys = f"from:{user} lang:en"
#   r = api.request('tweets/search/stream/rules', {'add': [{'value': keys}]})
#   print(f'[{r.status_code}] RULE ADDED: {r.text}')
#   if r.status_code != 201: exit()

# PRINT RULES
r = api.request('tweets/search/stream/rules', method_override='GET')
print(f'\n[{r.status_code}] RULES:')
if r.status_code != 200: exit()
for item in r:
    print(item['value'])

##
## "BACK OFF" STRATEGY FOR DISCONNECTS AND RATE LIMITS
## HTTP 429 ERRORS: START AT 60 SECONDS, DOUBLING ON EACH CONSECUTIVE RETRY (UP TO 64x)
## ALL OTHER ERRORS: START AT 5 SECONDS, DOUBLING ON EACH CONSECUTIVE RETRY (UP TO 64x)
## REFER TO: https://developer.twitter.com/en/docs/twitter-api/tweets/filtered-stream/integrate/handling-disconnections
##

from time import sleep

backOffCount = 0

def backOff(interval):
    global backOffCount  # logger is only read here, so it needs no global declaration
    seconds = pow(2, backOffCount) * interval
    backOffCount = min(backOffCount + 1, 6)
    logger.info(f'Back off {seconds} seconds')
    sleep(seconds)

##
## STREAM FOREVER...
##

while True:
    try:
        # START STREAM
        logger.info('START STREAM...')
        r = api.request('tweets/search/stream', 
            {
                'expansions': EXPANSIONS, 
                'user.fields': USER_FIELDS,
                'tweet.fields': TWEET_FIELDS 
            },
            hydrate_type=HydrateType.APPEND)

        if r.status_code == 200:
            logger.info(f'[{r.status_code}] STREAM CONNECTED')
            backOffCount = 0
        else:
            logger.info(f'[{r.status_code}] FAILED TO CONNECT. REASON:\n{r.text}')
            backOff(60 if r.status_code == 429 else 5)            
            continue

        for item in r:
            if 'data' in item:
                print(item['data']['text'])
            elif 'errors' in item:
                logger.error(item['errors'])
            else:
                # PROBABLY SHOULD NEVER BRANCH TO HERE
                logger.warning(f'UNKNOWN ITEM TYPE: {item}')

    except TwitterConnectionError:
        backOff(5)            
        continue

    except TwitterRequestError:
        break

    except KeyboardInterrupt:
        break

    finally:
        logger.info('STREAM STOPPED')
alexge233 commented 1 year ago

I've been running the previous version of the code, and haven't seen any 429, only some disconnects, and a couple of 505 errors. I'll report back in a week. I'm running with debug on so all I see is this:

2022-11-29 19:17:56 oneshot messaging.twitter[1339838] INFO [200] starting twitter streaming
2022-12-01 16:17:42 oneshot root[1339838] WARNING <class 'str'> Twitter stream stalled
2022-12-01 16:17:42 oneshot messaging.error[1339838] ERROR Twittter connection_error => Twitter stream stalled
2022-12-01 16:18:47 oneshot urllib3.connectionpool[1339838] DEBUG Starting new HTTPS connection (1): api.twitter.com:443
2022-12-01 16:18:47 oneshot urllib3.connectionpool[1339838] DEBUG https://api.twitter.com:443 "GET /2/tweets/search/stream?expansions=author_id&tweet.fields=author_id%2Ccreated_at%2Centities%2Cid%2Clang%2Cpublic_metrics%2Csource%2Ctext&user.fields=created_at%2Cdescription%2Clocation%2Cname%2Cusername HTTP/1.1" 200 None
2022-12-01 16:18:47 oneshot messaging.twitter[1339838] INFO [200] starting twitter streaming
2022-12-02 03:37:09 oneshot root[1339838] WARNING <class 'urllib3.exceptions.ProtocolError'> ('Connection broken: IncompleteRead(0 bytes read)', IncompleteRead(0 bytes read))
2022-12-02 03:37:09 oneshot messaging.error[1339838] ERROR Twittter connection_error => ('Connection broken: IncompleteRead(0 bytes read)', IncompleteRead(0 bytes read))
2022-12-02 03:38:14 oneshot urllib3.connectionpool[1339838] DEBUG Starting new HTTPS connection (1): api.twitter.com:443
2022-12-02 03:38:15 oneshot urllib3.connectionpool[1339838] DEBUG https://api.twitter.com:443 "GET /2/tweets/search/stream?expansions=author_id&tweet.fields=author_id%2Ccreated_at%2Centities%2Cid%2Clang%2Cpublic_metrics%2Csource%2Ctext&user.fields=created_at%2Cdescription%2Clocation%2Cname%2Cusername HTTP/1.1" 200 None
2022-12-02 03:38:15 oneshot messaging.twitter[1339838] INFO [200] starting twitter streaming
2022-12-02 12:09:00 oneshot root[1339838] WARNING <class 'urllib3.exceptions.ProtocolError'> ('Connection broken: IncompleteRead(0 bytes read)', IncompleteRead(0 bytes read))
2022-12-02 12:09:00 oneshot messaging.error[1339838] ERROR Twittter connection_error => ('Connection broken: IncompleteRead(0 bytes read)', IncompleteRead(0 bytes read))
2022-12-02 12:10:05 oneshot urllib3.connectionpool[1339838] DEBUG Starting new HTTPS connection (1): api.twitter.com:443
2022-12-02 12:10:06 oneshot urllib3.connectionpool[1339838] DEBUG https://api.twitter.com:443 "GET /2/tweets/search/stream?expansions=author_id&tweet.fields=author_id%2Ccreated_at%2Centities%2Cid%2Clang%2Cpublic_metrics%2Csource%2Ctext&user.fields=created_at%2Cdescription%2Clocation%2Cname%2Cusername HTTP/1.1" 200 None
2022-12-02 12:10:06 oneshot messaging.twitter[1339838] INFO [200] starting twitter streaming
2022-12-02 16:19:12 oneshot root[1339838] WARNING <class 'str'> Twitter stream stalled
2022-12-02 16:19:12 oneshot messaging.error[1339838] ERROR Twittter connection_error => Twitter stream stalled
2022-12-02 16:20:17 oneshot urllib3.connectionpool[1339838] DEBUG Starting new HTTPS connection (1): api.twitter.com:443
2022-12-02 16:20:17 oneshot urllib3.connectionpool[1339838] DEBUG https://api.twitter.com:443 "GET /2/tweets/search/stream?expansions=author_id&tweet.fields=author_id%2Ccreated_at%2Centities%2Cid%2Clang%2Cpublic_metrics%2Csource%2Ctext&user.fields=created_at%2Cdescription%2Clocation%2Cname%2Cusername HTTP/1.1" 200 None
2022-12-02 16:20:17 oneshot messaging.twitter[1339838] INFO [200] starting twitter streaming

I think those are expected, since the connection will occasionally drop. I'm also glad to see that it disconnects and then reconnects after only about a minute.
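The roughly one-minute recovery can be checked directly from the log timestamps above. This small calculation pairs each logged disconnect with the next "starting twitter streaming" line; the pairs are copied by hand from the log.

```python
from datetime import datetime

FMT = "%Y-%m-%d %H:%M:%S"

# (disconnect logged, "starting twitter streaming" logged) pairs from the log above.
EVENTS = [
    ("2022-12-01 16:17:42", "2022-12-01 16:18:47"),
    ("2022-12-02 03:37:09", "2022-12-02 03:38:15"),
    ("2022-12-02 12:09:00", "2022-12-02 12:10:06"),
    ("2022-12-02 16:19:12", "2022-12-02 16:20:17"),
]


def reconnect_gaps(events):
    """Return the seconds between each disconnect and the following reconnect."""
    return [
        (datetime.strptime(up, FMT) - datetime.strptime(down, FMT)).total_seconds()
        for down, up in events
    ]
```

Each gap comes out at 65 or 66 seconds.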

geduldig commented 1 year ago

Yup, those disconnections are to be expected.

geduldig commented 1 year ago

I believe this has been solved. I added a new example stream_forever similar to the one above. I can't review your code, but let me know if the code I have posted has any issues and I will look into them. I have not seen any 429 errors, and all disconnects get handled and re-connect on their own.