g8rswimmer / go-twitter

This is a Go library for Twitter v2 API integration.
MIT License

Connection Lost consistently On Filtered tweet stream #141

Closed: saminahbab closed this issue 2 years ago

saminahbab commented 2 years ago

I am trying to start a real-time, ongoing stream with one rule. Creating the rule does not return any errors, and the rule is registered.

My main stream code looks like this:

client := &twitter.Client{
    // Authorize is the caller's own twitter.Authorizer implementation (not shown).
    Authorizer: Authorize{
        Token: config.TWITTER_TOKEN,
    },
    Client: http.DefaultClient,
    Host:   "https://api.twitter.com",
}

options := twitter.TweetSearchStreamOpts{
    TweetFields: []twitter.TweetField{
        twitter.TweetFieldCreatedAt,
        twitter.TweetFieldEntities,
        twitter.TweetFieldAuthorID,
        twitter.TweetFieldText,
        twitter.TweetFieldID,
        twitter.TweetFieldConversationID,
    },
    UserFields: []twitter.UserField{
        twitter.UserFieldDescription,
        twitter.UserFieldURL,
        twitter.UserFieldName,
        twitter.UserFieldVerified,
    },
    Expansions: []twitter.Expansion{},
}

// Open the filtered stream; the rule was registered beforehand.
tweetStream, err := client.TweetSearchStream(ctx, options)
if err != nil {
    return nil, err
}

te.Stream = tweetStream

Here is the code to retrieve the tweets:

loop:
for {
    select {

    case tm := <-te.Stream.Tweets():
        for _, tweet := range tm.Raw.Tweets {
            message := ParseMessage(tweet)
            channel <- message
        }

    case sm := <-te.Stream.SystemMessages():
        smb, _ := json.Marshal(sm)
        log.Println("System Message")
        log.Println(string(smb)) // print the JSON text, not the raw bytes

    case strErr := <-te.Stream.Err():
        log.Println("Stream Error")
        log.Println("Error -> ", strErr)
        break loop // a bare break would only leave the select

    case shouldBreak := <-te.ShutdownChannel:
        // Graceful exit signal
        if shouldBreak {
            break loop
        }

    default:
        // Non-blocking: go straight to the connection check below.
    }

    if !te.Stream.Connection() {
        fmt.Println("connection lost")
        break
    }
}

The problem: within thirty seconds of starting the program, the stream reports "connection lost" and the loop exits. This happens every time. Have you seen similar behaviour before?
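A Go detail worth keeping in mind for loops like the one above: a bare `break` inside a `select` case exits only the `select`, not the enclosing `for`, so an error or shutdown case written that way does not stop the loop. The minimal sketch below is self-contained and does not use the library (a closed channel stands in for an always-ready stream channel; `countIterations` is a hypothetical helper); it compares a bare `break` with a labeled `break loop`.

```go
package main

import "fmt"

// countIterations runs a for/select loop whose only case is always ready
// (a closed channel). With labeled=false the case uses a bare break, which
// only leaves the select; with labeled=true it uses "break loop", which
// leaves the for loop. It returns how many iterations ran (at most maxIter).
func countIterations(labeled bool, maxIter int) int {
	done := make(chan struct{})
	close(done) // receiving from a closed channel never blocks

	iterations := 0
loop:
	for iterations < maxIter {
		iterations++
		select {
		case <-done:
			if labeled {
				break loop // exits the for loop
			}
			break // exits only the select; the for loop keeps going
		}
	}
	return iterations
}

func main() {
	fmt.Println(countIterations(false, 5)) // 5: the bare break never stopped the loop
	fmt.Println(countIterations(true, 5))  // 1: the labeled break stopped it immediately
}
```

An alternative to a label is to move the loop into its own function and `return` from the case.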

g8rswimmer commented 2 years ago

@saminahbab Do you have a timeout on the context that you are passing? I have not seen a disconnect after 30 seconds, but the streaming API documentation does say that the connection can be lost. https://developer.twitter.com/en/docs/tutorials/consuming-streaming-data

saminahbab commented 2 years ago

No, I do not have a timeout; I use context.Background(). I also just tried the example script provided with the library, using my token.

I get the same error

> go run .
Callout to tweet search stream callout
connection lost

So I am a bit confused about how to keep the connection persistent. Can you confirm that this does not happen for you with the example script?

saminahbab commented 2 years ago

I just checked, and TweetSampleStream works as expected. I will debug further today.

EDIT: I just found this:

2022/05/26 08:53:22 twitter callout status 429 ConnectionException:This stream is currently at the maximum allowed connection limit.

This happens even though my program exits every time it reports "connection lost". Could there be a bug that marks the connection as lost while it is somehow still active on Twitter's side, even after the program has closed?

BerndCzech commented 2 years ago

Hi there, I have the same "connection lost" issue and will have another look within the next week. Anyway, this is the best v2 API client I have come across. gj!

g8rswimmer commented 2 years ago

Something to remember is that Twitter's streaming APIs use long-lived HTTP connections. Those connections may be dropped, and they are also subject to rate limiting. Documentation here.

I believe the streaming API has a backfill option that allows the stream to pick up where it left off.

If you can provide some examples of what you are seeing, I can try to reproduce it; without something like that, we might end up with the old "it works on my machine".

saminahbab commented 2 years ago

@g8rswimmer As I mentioned in my earlier comment, the example script that I linked has the same "connection lost" problem. Would you expect it to be affected by the Twitter-side disconnections you mention? If you could confirm that the example script runs for you without being disconnected within a minute (it consistently is for me), that would be a good start.

But also, as @BerndCzech says, great job on this client; it has been nice to code with. The best one I have come across too!

EDIT: I meant this link: https://github.com/g8rswimmer/go-twitter/blob/master/v2/_examples/tweets/filtered-stream/tweet-search-stream/main.go

BerndCzech commented 2 years ago

@g8rswimmer you can clone my repo if you want. (No Bearer Token included ;) )

What I did:

WARN[2022-05-31T21:44:59+02:00] client got disconnected                      
WARN[2022-05-31T21:45:00+02:00] client got disconnected                      
INFO[2022-05-31T21:45:01+02:00] tweet: {"Raw":{"data":[{"id":"1531723395322327040","text":"Hi #Sarstedt"}]}}

WARN[2022-05-31T21:45:17+02:00] client got disconnected                      
WARN[2022-05-31T21:45:36+02:00] client got disconnected   
...                                 
WARN[2022-05-31T21:45:38+02:00] client got disconnected                      
WARN[2022-05-31T21:45:39+02:00] client got disconnected                      
INFO[2022-05-31T21:45:40+02:00] tweet: {"Raw":{"data":[{"id":"1531723561471287296","text":"Hi #Sarstedt again"}]}}

WARN[2022-05-31T21:45:57+02:00] client got disconnected                      
WARN[2022-05-31T21:46:00+02:00] client got disconnected                      
^CFATA[2022-05-31T21:46:01+02:00] shut downterminated streaming: context canceled 

Maybe you can make some sense out of it. Anyway, with the 1-second retry it works for me in practice.

Cheers,

saminahbab commented 2 years ago

Hello @g8rswimmer, can you confirm that the solution proposed above by @BerndCzech is the intended way to use the stream? I.e.:

time.Sleep(time.Second)
//tweetStream, err = client.TweetSearchStream(ctx, opts)
//if err != nil {
//  return errors.Errorf("tweet sample callout error: %v", err)
//}

So just sleep for a second, do not try to reconnect (that code is commented out), carry on with the loop, and tweets will keep arriving?

I ask because your sample code does this:

            default:
            }
            if tweetStream.Connection() == false {
                fmt.Println("connection lost")
                return
            }

which suggests that the reconnection does need to be handled somehow. If so, could you suggest a reasonable way to reconnect without hitting rate limits, and how to use the backfill?

Thank you for your time!

ghost commented 2 years ago

@g8rswimmer @BerndCzech The heartbeat timeout may be too short. According to the Twitter API v2 docs: "The endpoint provides a 20-second keep alive heartbeat (it will look like a new line character)."
So the constant keepAliveTO = 11 * time.Second seems too short: it's better to wait at least 20 seconds before deciding that the connection is lost. Setting keepAliveTO to more than 20 seconds, and reconnecting instead of simply breaking out of the loop, may help.

g8rswimmer commented 2 years ago

@calehh Of course, Twitter is not updating its documentation or making sure that it references the updated document. This was the doc that I used, which states a 10-second heartbeat.