twitter / hbc

A Java HTTP client for consuming Twitter's realtime Streaming API
https://developer.twitter.com/en/docs/tweets/filter-realtime/overview
Apache License 2.0

Shutdown problem #141

Closed Jakob-Bach closed 2 years ago

Jakob-Bach commented 9 years ago

Hi, I'm using hbc-twitter4j and noticed two problems with the shutdown process. If the stream delivers few messages (e.g. you track a nonsense string or a very unpopular hashtag), and especially if there are many threads consuming the message queue (depending on the ExecutorService passed to the Twitter4jStatusClient constructor), the stop() method of Twitter4jStatusClient does not work properly: the client thread fails to finish in time (see the log output), and the application may even keep running because of the background threads that consume the queue. The reasons are:

1) The queue-consuming threads started by BaseTwitter4jClient#process() use a blocking take() call without a timeout, so they may wait indefinitely before they check the isDone() flag, notice that the client has been shut down, and terminate. The problem gets worse with more queue-consuming threads, because each one needs a message from the queue before it can terminate. Unless you make the queue-consuming threads daemons, they keep the application alive while waiting for messages, even after main() (or whatever started and stopped the client) has finished.

2) The ClientBase Runnable uses AbstractProcessor#process(), or more precisely AbstractProcessor#processNextMessage(), to read from the stream. How the read is performed depends on the subclass in use, but it is also blocking. ClientBase#processConnectionData() therefore might not check the isDone() flag in time (if no new messages arrive on the stream), and ClientBase#stop(int) logs "Client thread failed to finish".

Both issues could be handled with a timeout: 1) In BaseTwitter4jClient#process() (or any subclass), do not consume the message queue with take(); use poll(long, TimeUnit) instead and call parseMessage(msg) only if msg != null.

while (!client.isDone()) {
    /*
     * Wait only a limited amount of time, so the isDone() check
     * is performed at regular intervals and does not have to
     * wait for a message to arrive.
     */
    try {
        String msg = AdvancedTwitter4jStatusClient.this.getMyMessageQueue().poll(AdvancedTwitter4jStatusClient.this.getConnectionCheckupInterval(), TimeUnit.MILLISECONDS);
        // poll() returns null if the timeout elapsed without a message.
        if (msg != null) {
            parseMessage(msg);
        }
    } catch (Exception e) {
        onException(e);
    }
}
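For anyone who wants to try the idea outside of hbc, here is a minimal, self-contained sketch. Everything in it (the class PollingConsumerDemo, the AtomicBoolean standing in for client.isDone(), the timeout values) is made up for illustration and is not hbc code. It starts a consumer on a queue that never receives a message and checks that the thread still terminates once the flag is set.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical stand-in for a queue-consuming worker; not part of hbc.
class PollingConsumerDemo {

    /**
     * Starts a consumer on an empty queue, signals shutdown, and returns
     * true if the consumer thread terminated within joinMillis.
     */
    static boolean stopsWithoutMessages(long pollMillis, long joinMillis) {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>();
        // Assumption: plays the role of client.isDone().
        AtomicBoolean done = new AtomicBoolean(false);

        Thread consumer = new Thread(() -> {
            while (!done.get()) {
                try {
                    // Returns null after pollMillis if nothing arrived,
                    // so the loop condition is re-checked regularly.
                    String msg = queue.poll(pollMillis, TimeUnit.MILLISECONDS);
                    if (msg != null) {
                        System.out.println("got " + msg);
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return;
                }
            }
        });
        consumer.start();

        done.set(true); // simulate stop() on a silent stream
        try {
            consumer.join(joinMillis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
        return !consumer.isAlive();
    }

    public static void main(String[] args) {
        System.out.println(stopsWithoutMessages(100, 2000));
    }
}
```

With take() in place of poll(), the same consumer would stay blocked until a message arrived, which is the behaviour described in 1).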

2) Wrap the result of AbstractProcessor#processNextMessage() in a Future inside the AbstractProcessor#process() method:

Callable<String> messageCallable = () -> {
    String message = processNextMessage();
    while (message == null) {
        message = processNextMessage();
    }
    return message;
};
try {
    // Either result returned and added to queue or timeout.
    Future<String> messageFuture = this.getReadMessageExectuor().submit(messageCallable);
    String message = messageFuture.get(this.getMessageReadTimeout(), TimeUnit.MILLISECONDS);
    return this.queue.offer(message, this.offerTimeoutMillis, TimeUnit.MILLISECONDS);
} catch (Exception e) {
    if (e instanceof TimeoutException) {
        // A TimeoutException is thrown if no new message arrived in time;
        // that is not a problem for us.
        return false;
    } else if (...) {
        ...
    }
}

Is there any reason why those issues are not implemented with a timeout?
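One thing worth checking with the Future-based variant: a timed-out get() does not stop the submitted task, and cancel(true) only interrupts it. The sketch below is a hedged illustration of that, not hbc code. TimedGetDemo, the latch, and the timeout values are all hypothetical, and the first task deliberately ignores interruption to mimic a read that cannot be interrupted. With a single-thread executor, the second task then never gets a thread.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical demo class; not part of hbc.
class TimedGetDemo {

    /**
     * Returns true if the second task timed out because the first task
     * still occupied the executor's only worker thread.
     */
    static boolean secondTaskStarves() {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        CountDownLatch release = new CountDownLatch(1);
        try {
            // Assumption: stands in for a read that never sees a message
            // and does not react to interruption.
            Future<String> first = executor.submit(() -> {
                while (release.getCount() > 0) {
                    try {
                        release.await();
                    } catch (InterruptedException ignored) {
                        // keep waiting, like a non-interruptible read
                    }
                }
                return "late";
            });
            try {
                first.get(100, TimeUnit.MILLISECONDS);
            } catch (TimeoutException expected) {
                // the timeout only affects the caller, not the task
            }
            first.cancel(true); // interrupts the worker; the task carries on

            Future<String> second = executor.submit(() -> "fresh");
            try {
                second.get(200, TimeUnit.MILLISECONDS);
                return false; // second task ran, so no starvation
            } catch (TimeoutException e) {
                return true; // still queued behind the first task
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } catch (ExecutionException e) {
            return false;
        } finally {
            release.countDown();    // let the first task finish
            executor.shutdownNow(); // clean up the worker thread
        }
    }

    public static void main(String[] args) {
        System.out.println(secondTaskStarves());
    }
}
```

Whether the real read in hbc reacts to interruption is not something this sketch can tell; it only shows the failure mode to look out for.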

kevinoliver commented 9 years ago

Thanks for the report. To be honest, none of our use cases have required clean shutdowns so we haven't focused on it at all. We'd be open to a pull request to fix it though.

YousefED commented 9 years ago

+1, @Jakob2014uafmx are you planning on submitting your changes as a PR or would you like me to?

Jakob-Bach commented 9 years ago

You can do a PR if you like (I do not plan to), but the 2nd fix above does not work as assumed. It has problems with streams that contain only a few tweets: the executor does not finish the old threads in that case (an explicit cancel() does not help either), so the new ones are queued. Because the I/O operation somewhere deep down in the code seems to be non-blocking, processNextMessage() returns fast, so a better way would be:

public boolean process() throws IOException, InterruptedException {
    long stopTime = System.currentTimeMillis() + this.getMessageReadTimeout();
    String msg = processNextMessage();
    while (msg == null && System.currentTimeMillis() < stopTime) {
        msg = processNextMessage();
    }
    if (msg != null) {
        return this.queue.offer(msg, offerTimeoutMillis, TimeUnit.MILLISECONDS);
    } else {
        return false;
    }
}

(By the way, if you use the StatsTracker, a timeout is counted as a dropped message.)

Regarding fix 1): of course it should be done in BaseTwitter4jClient itself and not by overriding. The unnecessary field "myMessageQueue" would then not be needed; I only used it because the original field is private.
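If the StatsTracker side effect matters, one possible variation is to report a timeout separately from a failed offer. The following is only a sketch under assumptions: DeadlineReadDemo, the Result enum, and the Supplier standing in for processNextMessage() are invented for illustration and do not exist in hbc. It also sleeps briefly between attempts so that the loop does not spin at full speed, which is a design choice of this sketch and not part of the code above.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;

// Hypothetical demo class; not part of hbc.
class DeadlineReadDemo {

    // Lets the caller tell a quiet stream apart from a full queue.
    enum Result { QUEUED, DROPPED, TIMED_OUT }

    /**
     * Calls reader (a stand-in for processNextMessage()) until it returns
     * a message or readTimeoutMillis has elapsed, then offers the message
     * to the queue.
     */
    static Result readWithDeadline(Supplier<String> reader,
                                   BlockingQueue<String> queue,
                                   long readTimeoutMillis,
                                   long offerTimeoutMillis) {
        long deadline = System.nanoTime()
                + TimeUnit.MILLISECONDS.toNanos(readTimeoutMillis);
        try {
            String msg = reader.get();
            while (msg == null && System.nanoTime() - deadline < 0) {
                Thread.sleep(1); // short pause instead of busy-waiting
                msg = reader.get();
            }
            if (msg == null) {
                return Result.TIMED_OUT; // nothing arrived; not a drop
            }
            return queue.offer(msg, offerTimeoutMillis, TimeUnit.MILLISECONDS)
                    ? Result.QUEUED
                    : Result.DROPPED;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return Result.TIMED_OUT;
        }
    }

    public static void main(String[] args) {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>();
        System.out.println(readWithDeadline(() -> null, queue, 50, 10));
        System.out.println(readWithDeadline(() -> "tweet", queue, 50, 10));
    }
}
```

A caller could then update drop statistics only for DROPPED and treat TIMED_OUT as a normal opportunity to re-check isDone().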

jayv commented 8 years ago

We (https://partners.twitter.com/livefyre) reconnect all the time to update the filter stream query in our processing pipeline, and because of this bug we keep running into HTTP 420 responses on connections with little traffic!! @kevinoliver what is the status on this, should we bump priority via our partner contact or ...?

kevinoliver commented 8 years ago

@jayv sorry, I haven't worked on this project in quite a while. sounds like your best bet is to escalate to your contact.

jayv commented 8 years ago

I've made a workaround based on v2.2.0 with minimal code changes. I have a test case (not included) showing that, with this fix, no hosebird-client-io-thread threads are left around after shutting down connections with no activity.

https://github.com/jayv/hbc/commit/e97f8402b7d083018805ac75c0366a216d20219f