confluentinc / confluent-kafka-python

Confluent's Kafka Python Client
http://docs.confluent.io/current/clients/confluent-kafka-python

Consumer disconnects after polling for about an hour #12

Closed mikesparr closed 8 years ago

mikesparr commented 8 years ago

Although I expect data to flow fairly constantly, I tested consumer polling after the topic reached EOF, and after about an hour it disconnected. I did not set the timeout parameter on the poll() method, so I expected it to keep polling indefinitely. Is there some ENV setting for librdkafka or something similar to prevent premature disconnects?

2016-07-03 18:34:26,177 - DEBUG - stream_consumer.pyc - rets.nnrmls.Property [1] reached end at offset 67
%3|1467592759.828|FAIL|app2#producer-2| queue3:9092/3: Receive failed: Disconnected
%3|1467592759.828|ERROR|app2#producer-2| queue3:9092/3: Receive failed: Disconnected
%3|1467592949.425|FAIL|app2#producer-2| queue2:9092/2: Receive failed: Disconnected
%3|1467592949.425|ERROR|app2#producer-2| queue2:9092/2: Receive failed: Disconnected
%3|1467593066.174|FAIL|app2#producer-2| queue1:9092/1: Receive failed: Disconnected
%3|1467593066.174|ERROR|app2#producer-2| queue1:9092/1: Receive failed: Disconnected
%3|1467593549.569|FAIL|app2#producer-2| queue2:9092/2: Receive failed: Disconnected
%3|1467593549.569|ERROR|app2#producer-2| queue2:9092/2: Receive failed: Disconnected
%3|1467593666.255|FAIL|app2#producer-2| queue4:9092/4: Receive failed: Disconnected
%3|1467593666.255|ERROR|app2#producer-2| queue4:9092/4: Receive failed: Disconnected
%3|1467593849.440|FAIL|app2#producer-2| queue3:9092/3: Receive failed: Disconnected
%3|1467593849.440|ERROR|app2#producer-2| queue3:9092/3: Receive failed: Disconnected
%3|1467594149.695|FAIL|app2#producer-2| queue2:9092/2: Receive failed: Disconnected
%3|1467594149.695|ERROR|app2#producer-2| queue2:9092/2: Receive failed: Disconnected
%3|1467594266.302|FAIL|app2#producer-2| queue1:9092/1: Receive failed: Disconnected
%3|1467594266.302|ERROR|app2#producer-2| queue1:9092/1: Receive failed: Disconnected
%3|1467594749.356|FAIL|app2#producer-2| queue4:9092/4: Receive failed: Disconnected
%3|1467594749.356|ERROR|app2#producer-2| queue4:9092/4: Receive failed: Disconnected
%3|1467594749.742|FAIL|app2#producer-2| queue3:9092/3: Receive failed: Disconnected
%3|1467594749.742|ERROR|app2#producer-2| queue3:9092/3: Receive failed: Disconnected
%3|1467594866.341|FAIL|app2#producer-2| queue2:9092/2: Receive failed: Disconnected
%3|1467594866.341|ERROR|app2#producer-2| queue2:9092/2: Receive failed: Disconnected
%3|1467595349.819|FAIL|app2#producer-2| queue1:9092/1: Receive failed: Disconnected
%3|1467595349.819|ERROR|app2#producer-2| queue1:9092/1: Receive failed: Disconnected
%3|1467595466.452|FAIL|app2#producer-2| queue2:9092/2: Receive failed: Disconnected
%3|1467595466.452|ERROR|app2#producer-2| queue2:9092/2: Receive failed: Disconnected
2016-07-03 19:25:06,541 - ERROR - stream_consumer.pyc - Error polling messages.

Implementation code:

class StreamConsumer:
...

    def consume(self, topics, auto_offset='latest', processor=_printer, *args):
        """Connects to topic and listens for messages, handing them off to processor"""

        logger = self.__get_logger()

        try: 
            running = True

            conf = {
                'bootstrap.servers': ','.join(map(str, self.config.get('hosts'))),
                'group.id': self.group_id,
                'default.topic.config': {'auto.offset.reset': auto_offset}
            }

            try:
                c = kafka.Consumer(**conf)
            except:
                logger.error( "Error creating Consumer with config [{}]".format(conf) )

            try:
                if auto_offset == 'earliest' or auto_offset == 'smallest':
                    c.subscribe(topics, on_assign=_on_assign)
                else:
                    c.subscribe(topics)
            except:
                logger.error( "Error subscribing to topics [{}]".format(topics) )
                c = None

            if c:
                logger.info( "Starting to poll topics [{}]...".format(topics) )
                while running:
                    try:
                        msg = c.poll()
                        if not msg.error():
                            processor(msg)
                        elif msg.error().code() == kafka.KafkaError._PARTITION_EOF:
                            # End of partition event
                            logger.debug( "{} [{}] reached end at offset {}".format(
                                            msg.topic(), msg.partition(), msg.offset()) )
                        else:
                            logger.debug( "Unknown error [{}]. Quitting...".format(msg.error()) )
                            raise kafka.KafkaException(msg.error())
                    except:
                        running = False
                        logger.error( "Error polling messages." )
                # end while running loop
            else:
                logger.error( "Consumer object missing. Nothing to do!" )

            try:
                c.close() # close connection
            except:
                logger.warn( "Could not close connection (c). Nothing to do!" )

        except Exception as e:
            logger.error( "Error consuming topics [{}]".format(topics) )
            logger.debug( traceback.format_exc() )

        return None

I leveraged the _on_assign syntax suggested in #11 (thanks again) for the case where I request --from-beginning upon startup.

def _on_assign(c, ps):
    """Resets the consumer offset"""
    for p in ps:
        # -2 is the logical "beginning" offset (confluent_kafka.OFFSET_BEGINNING)
        p.offset = -2
    c.assign(ps)

Any ideas why it disconnected after an hour of inactivity after EOF? Thanks!

edenhill commented 8 years ago

Hi,

the disconnect intervals (900 and 1200 seconds) look artificial, so I think you are seeing the broker's idle connection reaper in action, or alternatively some firewall/nat/etc device timing out the sessions.
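
One way to check how regular the disconnects are is to compute, per broker, the time between consecutive FAIL lines. The following is a minimal sketch, assuming the librdkafka log line format shown in this thread; the names `LINE_RE` and `disconnect_gaps` are hypothetical and not part of any library.

```python
import re
from collections import defaultdict

# Matches librdkafka lines such as:
# %3|1467592759.828|FAIL|app2#producer-2| queue3:9092/3: Receive failed: Disconnected
LINE_RE = re.compile(r"^%\d\|(?P<ts>\d+\.\d+)\|FAIL\|[^|]*\|\s*(?P<broker>\S+?):\s")


def disconnect_gaps(lines):
    """Return {broker: [seconds between consecutive FAIL lines]}."""
    seen = defaultdict(list)
    for line in lines:
        m = LINE_RE.match(line)
        if m:  # ERROR duplicates and application log lines are skipped
            seen[m.group("broker")].append(float(m.group("ts")))
    return {
        broker: [round(b - a, 1) for a, b in zip(stamps, stamps[1:])]
        for broker, stamps in seen.items()
    }


# Four FAIL lines copied from the first log excerpt in this thread.
sample = [
    "%3|1467593066.174|FAIL|app2#producer-2| queue1:9092/1: Receive failed: Disconnected",
    "%3|1467593849.440|FAIL|app2#producer-2| queue3:9092/3: Receive failed: Disconnected",
    "%3|1467594266.302|FAIL|app2#producer-2| queue1:9092/1: Receive failed: Disconnected",
    "%3|1467594749.742|FAIL|app2#producer-2| queue3:9092/3: Receive failed: Disconnected",
]
print(disconnect_gaps(sample))
```

For these four lines the gaps come out at about 1200.1 seconds for queue1 and 900.3 seconds for queue3, which is the kind of round interval that points at an idle timer rather than a random network fault.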

Since it only kills idle connections these logs can probably be ignored, there is even a log.connection.close=false client property that silences these logs for that reason.

But are you saying that the consumer stops working when this happens?
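
As a sketch of where the log.connection.close property would go, the config dict from the original post could be extended as follows. The helper name `build_consumer_conf` and the example host and group values are assumptions for illustration; the value is passed as the string "false", matching the `log.connection.close=false` form quoted above.

```python
def build_consumer_conf(hosts, group_id, auto_offset="latest"):
    """Build a config dict intended for confluent_kafka.Consumer(**conf).

    Hypothetical helper; only the property names come from the thread.
    """
    return {
        "bootstrap.servers": ",".join(map(str, hosts)),
        "group.id": group_id,
        "default.topic.config": {"auto.offset.reset": auto_offset},
        # Silence the "Receive failed: Disconnected" lines that are logged
        # when an idle connection is closed by the broker or a network device.
        "log.connection.close": "false",
    }


conf = build_consumer_conf(["queue1:9092", "queue2:9092"], "app2")
# c = confluent_kafka.Consumer(**conf)  # requires the client library
```

This only hides the log lines; it does not change when idle connections are closed.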

mikesparr commented 8 years ago

I need to investigate more because my consumers stopped. 2 of the 3 consumer scripts that use the same consumer class I shared in the ticket stopped, but the third is still running, which implies it is not NAT or the network.

I'll see if I can force it to repeat and root out cause but wondered what settings, if any, might play a role. Am I right that if I don't set a timeout parameter on poll() it will keep running? Any librdkafka settings I need to declare?

Thanks!


edenhill commented 8 years ago

Correct, if you don't set a timeout on poll() it will block indefinitely waiting for a new message. Do note however that callbacks will be served.

It is hard to say what the error might be, so I don't have any direct recommendations on configuration. But if you are able to reproduce this in a reasonably short time and think that the consumer is stopping, then I suggest you enable debug=fetch to debug the consumer fetcher. Be warned though, it will be quite noisy.
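
For comparison, a loop that passes a timeout to poll() gets control back periodically, since poll() returns None when the timeout expires. The sketch below assumes an object with a `poll(timeout=...)` method like the Consumer in this thread; the function name `poll_with_heartbeat` and the `max_idle_polls` parameter are hypothetical and exist only to make the idle periods visible and the loop testable.

```python
import logging

logger = logging.getLogger("stream_consumer")


def poll_with_heartbeat(consumer, processor, timeout=1.0, max_idle_polls=None):
    """Poll until max_idle_polls consecutive empty polls (None = forever).

    Returns the number of messages handed to processor.
    """
    handled = 0
    idle = 0
    while max_idle_polls is None or idle < max_idle_polls:
        msg = consumer.poll(timeout=timeout)  # None if nothing arrived in time
        if msg is None:
            idle += 1
            logger.debug("No message within %.1fs (idle polls: %d)", timeout, idle)
            continue
        idle = 0
        if msg.error() is not None:
            # EOF and other events are logged here, not treated as fatal.
            logger.debug("poll() returned an event/error: %s", msg.error())
            continue
        processor(msg)
        handled += 1
    return handled
```

To follow the debug suggestion above, the consumer config would additionally carry `'debug': 'fetch'`.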

mikesparr commented 8 years ago

The same issue occurred with the 3rd program, so all 3 of 3 closed their consumer about an hour after reaching EOF in their partitions. I cannot trust this client lib if it doesn't stay running like kafka-python, so I'd appreciate any and all advice on how to disable that "connection reaper".

2016-07-05 10:27:11,050 - DEBUG - stream_producer.pyc - Sending message to topic [etl.snip/] with key [snip/]
2016-07-05 10:27:11,050 - DEBUG - stream_consumer.py - rets.nnrmls.ListingEditEvent [0] reached end at offset 88
2016-07-05 10:27:11,050 - DEBUG - stream_consumer.py - rets.nnrmls.ListingEditEvent [4] reached end at offset 112
2016-07-05 10:27:11,050 - DEBUG - stream_consumer.py - rets.nnrmls.ListingEditEvent [3] reached end at offset 84
2016-07-05 10:27:11,050 - DEBUG - stream_consumer.py - rets.nnrmls.ListingEditEvent [2] reached end at offset 95
2016-07-05 10:27:11,050 - DEBUG - stream_consumer.py - rets.nnrmls.ListingEditEvent [1] reached end at offset 73
%3|1467736631.452|FAIL|app2#producer-2| queue4:9092/4: Receive failed: Disconnected
%3|1467736631.452|ERROR|app2#producer-2| queue4:9092/4: Receive failed: Disconnected
%3|1467736631.507|FAIL|app2#producer-2| queue2:9092/2: Receive failed: Disconnected
%3|1467736631.507|ERROR|app2#producer-2| queue2:9092/2: Receive failed: Disconnected
%3|1467736949.625|FAIL|app2#producer-2| queue1:9092/1: Receive failed: Disconnected
%3|1467736949.625|ERROR|app2#producer-2| queue1:9092/1: Receive failed: Disconnected
%3|1467737231.613|FAIL|app2#producer-2| queue3:9092/3: Receive failed: Disconnected
%3|1467737231.613|ERROR|app2#producer-2| queue3:9092/3: Receive failed: Disconnected
%3|1467737549.749|FAIL|app2#producer-2| queue2:9092/2: Receive failed: Disconnected
%3|1467737549.749|ERROR|app2#producer-2| queue2:9092/2: Receive failed: Disconnected
%3|1467737831.762|FAIL|app2#producer-2| queue3:9092/3: Receive failed: Disconnected
%3|1467737831.762|ERROR|app2#producer-2| queue3:9092/3: Receive failed: Disconnected
%3|1467738149.863|FAIL|app2#producer-2| queue2:9092/2: Receive failed: Disconnected
%3|1467738149.863|ERROR|app2#producer-2| queue2:9092/2: Receive failed: Disconnected
%3|1467738431.881|FAIL|app2#producer-2| queue3:9092/3: Receive failed: Disconnected
%3|1467738431.881|ERROR|app2#producer-2| queue3:9092/3: Receive failed: Disconnected
%3|1467738749.631|FAIL|app2#producer-2| queue1:9092/1: Receive failed: Disconnected
%3|1467738749.631|ERROR|app2#producer-2| queue1:9092/1: Receive failed: Disconnected
%3|1467739049.562|FAIL|app2#producer-2| queue4:9092/4: Receive failed: Disconnected
%3|1467739049.562|ERROR|app2#producer-2| queue4:9092/4: Receive failed: Disconnected
%3|1467739349.567|FAIL|app2#producer-2| queue2:9092/2: Receive failed: Disconnected
%3|1467739349.567|ERROR|app2#producer-2| queue2:9092/2: Receive failed: Disconnected
%3|1467739649.638|FAIL|app2#producer-2| queue3:9092/3: Receive failed: Disconnected
%3|1467739649.638|ERROR|app2#producer-2| queue3:9092/3: Receive failed: Disconnected
%3|1467739649.698|FAIL|app2#producer-2| queue4:9092/4: Receive failed: Disconnected
%3|1467739649.698|ERROR|app2#producer-2| queue4:9092/4: Receive failed: Disconnected
2016-07-05 11:35:13,839 - ERROR - stream_consumer.py - Error polling messages.

The script exits after these failures, like the other scripts, around an hour after the last EOF notice (end of offset).

edenhill commented 8 years ago

After reaching EOF is anything producing new messages to those partitions during that silent hour?

mikesparr commented 8 years ago

No, there is not. That is the issue here: I'm testing a tiny slice of data (2 days' worth), so once processing is done there is no new data unless the source publishes some. It's real estate MLS listing data, so at night I don't expect any new records for at least 7-8 hours, given the work hours of the real estate agents and when they input data into their MLS systems. We don't control when data will come in, so our extractor (E) polls their system every 5-15 minutes and only publishes if changes exist. A diff processor then compares each raw record to the existing one in the Cassandra DB cluster and generates a diff object (only the changed fields) in another topic. Another program then consumes those messages, transforms the source data to our schema, and publishes the transformed data to a topic that Logstash slurps up and indexes in Elasticsearch.

It's typical ETL, but there will be periods of many hours when we don't expect new data, so I need to prevent these timeouts from killing the programs.

edenhill commented 8 years ago

Okay, thanks for clarifying that. So from the app's perspective it will seem like the Consumer is doing absolutely nothing if it has reached the EOF for all consumed partitions and no new messages are being produced to those partitions. I.e., there are no new messages or events to return to the application.

You say that the application stops after about an hour. Can you explain exactly what "stop" means in this regard? poll() not returning? Exception thrown? Exiting? Something else?

mikesparr commented 8 years ago

I'm trying to figure out why it exits. It does actually exit with a status code, but I haven't figured out which yet. The only case in my consumer loop above that would stop the loop is an unknown exception, but I don't see one thrown in the logs, which is puzzling to me.

The 3 programs managed by Supervisor show "EXITED" and in tailing logs every one of those "EXITED" after those errors in logs above and process stopped. The start command for the programs is python -m appname.modulename -flags and STDOUT & STDERR captured by Supervisor.

[Image: screen_shot_2016-07-05_at_1_55_01_pm]

edenhill commented 8 years ago

Any chance stderr or stdout is captured from those scripts in some log? What about the exit code, that might give us some hints, is that available somewhere?

mikesparr commented 8 years ago

Checking...

mikesparr commented 8 years ago

The Supervisor log shows SIGTERM events. The programs were restarted again and again, so after X attempts Supervisor shows EXITED status.

2016-07-02 22:25:44,749 INFO daemonizing the supervisord process
2016-07-02 22:25:44,749 INFO supervisord started with pid 3065
2016-07-02 22:25:45,753 INFO spawned: 'trifecta-ui' with pid 3066
2016-07-02 22:25:45,755 INFO spawned: 'core-api' with pid 3067
2016-07-02 22:25:45,758 INFO spawned: 'trifecta-cli' with pid 3068
2016-07-02 22:25:45,760 INFO spawned: 'burrow' with pid 3069
2016-07-02 22:25:45,763 INFO spawned: 'photos-nnrmls' with pid 3070
2016-07-02 22:25:45,763 INFO spawned: 'transformer-nnrmls' with pid 3071
2016-07-02 22:25:45,764 INFO spawned: 'extractor-nnrmls' with pid 3077
2016-07-02 22:25:45,765 INFO spawned: 'diffs-nnrmls' with pid 3079
2016-07-02 22:25:45,766 INFO spawned: 'webhooks' with pid 3089
2016-07-02 22:25:46,750 INFO success: trifecta-ui entered RUNNING state, process has stayed up for > than 1 seconds (startsecs)
2016-07-02 22:25:46,753 INFO success: core-api entered RUNNING state, process has stayed up for > than 1 seconds (startsecs)
2016-07-02 22:25:46,756 INFO success: trifecta-cli entered RUNNING state, process has stayed up for > than 1 seconds (startsecs)
2016-07-02 22:25:46,758 INFO success: burrow entered RUNNING state, process has stayed up for > than 1 seconds (startsecs)
2016-07-02 22:25:46,764 INFO success: photos-nnrmls entered RUNNING state, process has stayed up for > than 1 seconds (startsecs)
2016-07-02 22:25:46,764 INFO success: transformer-nnrmls entered RUNNING state, process has stayed up for > than 1 seconds (startsecs)
2016-07-02 22:25:46,764 INFO success: extractor-nnrmls entered RUNNING state, process has stayed up for > than 1 seconds (startsecs)
2016-07-02 22:25:46,765 INFO success: diffs-nnrmls entered RUNNING state, process has stayed up for > than 1 seconds (startsecs)
2016-07-02 22:25:46,766 INFO success: webhooks entered RUNNING state, process has stayed up for > than 1 seconds (startsecs)
2016-07-03 09:02:52,563 INFO exited: diffs-nnrmls (exit status 0; expected)
2016-07-03 10:55:57,450 INFO stopped: transformer-nnrmls (terminated by SIGTERM)
2016-07-03 10:55:57,453 INFO stopped: photos-nnrmls (terminated by SIGTERM)
2016-07-03 10:55:58,471 INFO stopped: extractor-nnrmls (terminated by SIGTERM)
2016-07-03 11:09:20,885 INFO spawned: 'photos-nnrmls' with pid 17719
2016-07-03 11:09:20,886 INFO spawned: 'transformer-nnrmls' with pid 17720
2016-07-03 11:09:21,014 INFO spawned: 'extractor-nnrmls' with pid 17727
2016-07-03 11:09:21,075 INFO spawned: 'diffs-nnrmls' with pid 17731
2016-07-03 11:09:21,935 INFO success: photos-nnrmls entered RUNNING state, process has stayed up for > than 1 seconds (startsecs)
2016-07-03 11:09:21,935 INFO success: transformer-nnrmls entered RUNNING state, process has stayed up for > than 1 seconds (startsecs)
2016-07-03 11:09:22,014 INFO success: extractor-nnrmls entered RUNNING state, process has stayed up for > than 1 seconds (startsecs)
2016-07-03 11:09:22,102 INFO success: diffs-nnrmls entered RUNNING state, process has stayed up for > than 1 seconds (startsecs)
2016-07-03 12:04:08,150 INFO stopped: extractor-nnrmls (terminated by SIGTERM)
2016-07-03 16:44:43,335 INFO exited: diffs-nnrmls (exit status 0; expected)
2016-07-03 16:44:44,300 INFO exited: transformer-nnrmls (exit status 0; expected)
2016-07-03 16:44:45,600 INFO exited: photos-nnrmls (exit status 0; expected)
2016-07-03 16:55:40,359 INFO spawned: 'photos-nnrmls' with pid 23802
2016-07-03 16:55:40,360 INFO spawned: 'transformer-nnrmls' with pid 23803
2016-07-03 16:55:40,570 INFO spawned: 'extractor-nnrmls' with pid 23810
2016-07-03 16:55:40,628 INFO spawned: 'diffs-nnrmls' with pid 23814
2016-07-03 16:55:41,406 INFO success: photos-nnrmls entered RUNNING state, process has stayed up for > than 1 seconds (startsecs)
2016-07-03 16:55:41,406 INFO success: transformer-nnrmls entered RUNNING state, process has stayed up for > than 1 seconds (startsecs)
2016-07-03 16:55:41,682 INFO success: extractor-nnrmls entered RUNNING state, process has stayed up for > than 1 seconds (startsecs)
2016-07-03 16:55:41,682 INFO success: diffs-nnrmls entered RUNNING state, process has stayed up for > than 1 seconds (startsecs)
2016-07-03 17:03:44,794 INFO stopped: transformer-nnrmls (terminated by SIGTERM)
2016-07-03 17:03:45,796 INFO stopped: photos-nnrmls (terminated by SIGTERM)
2016-07-03 17:03:45,801 INFO stopped: extractor-nnrmls (terminated by SIGTERM)
2016-07-03 17:03:46,811 INFO stopped: diffs-nnrmls (terminated by SIGTERM)
2016-07-03 17:07:26,776 INFO spawned: 'photos-nnrmls' with pid 24752
2016-07-03 17:07:26,777 INFO spawned: 'transformer-nnrmls' with pid 24753
2016-07-03 17:07:26,909 INFO spawned: 'extractor-nnrmls' with pid 24760
2016-07-03 17:07:26,965 INFO spawned: 'diffs-nnrmls' with pid 24764
2016-07-03 17:07:27,852 INFO success: photos-nnrmls entered RUNNING state, process has stayed up for > than 1 seconds (startsecs)
2016-07-03 17:07:27,852 INFO success: transformer-nnrmls entered RUNNING state, process has stayed up for > than 1 seconds (startsecs)
2016-07-03 17:07:27,975 INFO success: extractor-nnrmls entered RUNNING state, process has stayed up for > than 1 seconds (startsecs)
2016-07-03 17:07:27,975 INFO success: diffs-nnrmls entered RUNNING state, process has stayed up for > than 1 seconds (startsecs)
2016-07-03 17:58:41,016 INFO exited: photos-nnrmls (exit status 0; expected)
2016-07-03 19:25:07,028 INFO exited: diffs-nnrmls (exit status 0; expected)
2016-07-05 11:35:14,156 INFO exited: transformer-nnrmls (exit status 0; expected)

mikesparr commented 8 years ago

Gist of my consumer class called by the 3 programs that exit:

The only time I expect the loop to end is on an unknown exception (not just EOF, which I check for), but I'm not seeing any caught unknown exception in the logs.

edenhill commented 8 years ago

Very weird. If the client crashed in the C code the signal would be SIGSEGV or SIGABRT, but SIGTERM is the default signal sent by the kill utility, which could indicate some deliberate kill. Neither the Python client itself nor the underlying librdkafka C library sends any signals.

Some random ideas:

mikesparr commented 8 years ago

After matching timestamps to other logs, it looks like the SIGTERM events might be my restarts of Supervisor. It looks like exit with status = 0 is when those scripts "EXIT" unexpectedly. I'm going to add the traceback debug log to that try block around c.poll(), let it run, and see if I can catch the actual exception and report.

Thanks so much for helping. It looks like something is thrown from c.poll(), because if it were a msg.error() I would have handled it or raised, and I'd see the debug statement in the logs before my raise kafka.KafkaException(msg.error()). It has to be something c.poll() is throwing itself, so stay tuned.

mikesparr commented 8 years ago

I believe the scripts exit because I call c.close() near the end (like a finally), and since I end the loop on exception, it runs and forces exit and close. Please take a peek at my gist link above, and if you have a better recommendation for where to place c.close() I'd appreciate it. Meanwhile I'll try to figure out what it's throwing...

                    except:
                        running = False
                        logger.error( "Error polling messages." )
                        logger.debug( traceback.format_exc() )
                # end while running loop
            else:
                logger.error( "Consumer object missing. Nothing to do!" )

            try:
                c.close() # close connection
            except:
                logger.warn( "Could not close connection (c). Nothing to do!" )

I think above exits 0 when I c.close() but that is just side effect of the thrown exception breaking my loop.

edenhill commented 8 years ago

All that is covered by a number of try/except, and each except: logs the exception, so it should end up in your logs if that was the case, right?

mikesparr commented 8 years ago

Correct. But the last log excerpt on failure is that one "Error polling messages", and regrettably I didn't have the traceback debug to print the stack trace. I just added the above and am redeploying, but I may have to wait an hour or two before it quits again (assuming it will when we want it to). Once I see the exit in Supervisor I'll tail the logs, see what actual exception c.poll() is throwing, and report.

I do have Monit (watchdog) also monitoring processes but haven't enabled it yet for this app so that is likely not culprit. I'll do some tests on c.close() to see if it forces exit for some reason. We'll figure it out. Thanks!

edenhill commented 8 years ago

There are a couple of cases where various errors can be returned by poll() if the connection(s) to the coordinator or partition leader broker goes down (internal error codes), or if the cluster is rebalancing itself (external error codes).

In the latter case, the external error codes coming from the brokers, we can't really know the exact error code that we'll see. So it might be better to not exit the consumer if an error is returned by poll(), there's a big chance that it will recover automatically once brokers are back up.
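
The advice above can be sketched as a loop that logs errors returned by poll() and keeps going instead of setting running = False. This is an illustration under stated assumptions, not the client's prescribed pattern: `handle_poll_result`, `consume_forever`, `keep_running`, and `backoff` are hypothetical names, and `eof_code` is expected to be `kafka.KafkaError._PARTITION_EOF` from the original code.

```python
import logging
import time

logger = logging.getLogger("stream_consumer")


def handle_poll_result(msg, processor, eof_code):
    """Classify one poll() result without raising for Kafka-level errors.

    Returns 'idle', 'message', 'eof', or 'error'.
    """
    if msg is None:
        return "idle"  # poll() timed out with nothing to deliver
    err = msg.error()
    if err is None:
        processor(msg)
        return "message"
    if err.code() == eof_code:
        logger.debug("%s [%s] reached end at offset %s",
                     msg.topic(), msg.partition(), msg.offset())
        return "eof"
    # Broker or connection errors: log and let the client try to recover.
    logger.warning("poll() returned error, continuing: %s", err)
    return "error"


def consume_forever(consumer, processor, eof_code,
                    keep_running=lambda: True, backoff=1.0):
    """Poll until keep_running() returns False; exceptions are logged, not fatal."""
    while keep_running():
        try:
            handle_poll_result(consumer.poll(timeout=1.0), processor, eof_code)
        except Exception:
            # Covers exceptions from poll() and from processor; the full
            # traceback is logged so the cause is visible afterwards.
            logger.exception("Unexpected exception while polling; continuing")
            time.sleep(backoff)
```

Catching `Exception` rather than using a bare `except:` leaves KeyboardInterrupt and SystemExit free to stop the process, and `logger.exception` records the traceback that was missing from the original logs.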

mikesparr commented 8 years ago

Okay, I will let this latest revision run its course and see if we can catch actual error in logs just to know. Next revision I'll remove the running = False and instead log it and continue. Would you keep the close() where I have it in that class as a final step when loop ends or since loop should never end, it's unnecessary because script will stop with some interrupt command?

edenhill commented 8 years ago

I would keep the close() to make sure that final offsets are committed, even in case of previous error.
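
One way to keep close() as the final step regardless of how the loop ends is a try/finally around the loop, combined with a stop flag set on SIGTERM (the signal seen in the Supervisor log). This is a minimal sketch: `StopFlag`, `run_and_close`, and `install_sigterm_handler` are hypothetical names, and it assumes the loop polls with a timeout so the flag is checked regularly.

```python
import signal


class StopFlag:
    """Set by a signal handler so the poll loop can exit cleanly."""

    def __init__(self):
        self.stopped = False

    def stop(self, signum=None, frame=None):
        self.stopped = True


def install_sigterm_handler(stop_flag):
    """Route SIGTERM to the flag; must be called from the main thread."""
    signal.signal(signal.SIGTERM, stop_flag.stop)


def run_and_close(consumer, loop, stop_flag):
    """Run loop(consumer, stop_flag) and always call consumer.close()."""
    try:
        loop(consumer, stop_flag)
    finally:
        # Runs on normal return and on exceptions alike.
        consumer.close()
```

A loop passed to `run_and_close` would typically be written as `while not stop_flag.stopped: ...`, so a Supervisor stop ends the loop and still reaches close().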

mikesparr commented 8 years ago

Of course, when I want it to fail it won't, but I'm seeing non-idle activity since it's daytime. I just shut off the extractor to force no new records in the other topics, so I should have some log data after dinner time. I'll check later and report. Thanks again.

mikesparr commented 8 years ago

18 hours and didn't stop. I decided to remove the end of loop on .poll() exception and if I see any more in logs I'll share but for now will close this issue. Thanks again for your quick response and hard work in providing the client lib.

edenhill commented 8 years ago

Glad to hear that!