Parsely / pykafka

Apache Kafka client for Python; high-level & low-level consumer/producer, with great performance.
http://pykafka.readthedocs.org/
Apache License 2.0

pykafka producer deadlock on nosetests' integration tests #1004


hoathenguyen85 commented 4 years ago

I posted a question at https://stackoverflow.com/questions/61800654/pykafka-deadlock-on-nosetests-integration-tests. This only happens on Python 3.6. We are using pykafka 2.7.0 against Kafka 1.0.0.

When running nosetests that exercise the following code:

   def publish(self, topic, message):
       topic = topic.lower()
       self._log.info('publish to topic ' + topic)

       # Cache one producer per topic; note these producers are never
       # stopped explicitly, so Producer.__del__ has to do it at GC time.
       if topic not in self.producer_map:
           k_topic = self.__messenger.topics[topic.encode()]
           self._log.info(k_topic)
           new_producer = k_topic.get_producer()
           self.producer_map[topic] = new_producer

       self.producer_map[topic].produce(message)

When the tests finish, the run hangs because these producers are trying to stop. I used gdb to look at where it hangs, and it shows this:

#9 Frame 0x7f4824fe7958, for file /usr/lib64/python3.6/threading.py, line 1072, in _wait_for_tstate_lock (self=<Thread(_target=<function at remote 0x7f4824ef4ae8>, _name='3: pykafka.OwnedBroker.queue_reader for broker 0', _args=(), _kwargs={}, _daemonic=True, _ident=139948465846016, _tstate_lock=<_thread.lock at remote 0x7f4824de78a0>, _started=<Event(_cond=<Condition(_lock=<_thread.lock at remote 0x7f4824de7670>, acquire=<built-in method acquire of _thread.lock object at remote 0x7f4824de7670>, release=<built-in method release of _thread.lock object at remote 0x7f4824de7670>, _waiters=<collections.deque at remote 0x7f4824ddc2b8>) at remote 0x7f4824de3128>, _flag=True) at remote 0x7f4824de31d0>, _is_stopped=False, _initialized=True, _stderr=<redacted>, <redacted>, <_io.TextIOWrapper at remote 0x7f48549b1708>)) at remote 0x7f48275474a8>)) at remote 0x7f4827547518>)) at remote 0x7f48275476d8>) at remote 0x7f4824de3160>, block=True, timeout=-1, lock=<_thread.lock at remote 0x7f4824de78a0>)
    elif lock.acquire(block, timeout):
#13 Frame 0x7f4824e045a0, for file /usr/lib64/python3.6/threading.py, line 1056, in join (self=<Thread(_target=<function at remote 0x7f4824ef4ae8>, _name='3: pykafka.OwnedBroker.queue_reader for broker 0', _args=(), _kwargs={}, _daemonic=True, _ident=139948465846016, _tstate_lock=<_thread.lock at remote 0x7f4824de78a0>, _started=<Event(_cond=<Condition(_lock=<_thread.lock at remote 0x7f4824de7670>, acquire=<built-in method acquire of _thread.lock object at remote 0x7f4824de7670>, release=<built-in method release of _thread.lock object at remote 0x7f4824de7670>, _waiters=<collections.deque at remote 0x7f4824ddc2b8>) at remote 0x7f4824de3128>, _flag=True) at remote 0x7f4824de31d0>, _is_stopped=False, _initialized=True, _stderr=<redacted>, <redacted>, <redacted>, <redacted>)) at remote 0x7f4827547518>)) at remote 0x7f48275476d8>) at remote 0x7f4824de3160>, timeout=None)
    self._wait_for_tstate_lock()
#17 (frame information optimized out)
#21 Frame 0x7f484d8bee08, for file /usr/local/lib/python3.6/site-packages/pykafka/producer.py, line 235, in __del__ (self=<Producer(_cluster=<Cluster(_seed_hosts='<redacted>:9092', _socket_timeout_ms=30000, _offsets_channel_socket_timeout_ms=10000, _handler=<ThreadingHandler at remote 0x7f4824e81da0>, _brokers={0: <Broker(_connection=<BrokerConnection(_buff=<bytearray at remote 0x7f4824fdb228>, host=b'<redacted>', port=9092, _handler=<...>, _socket=<socket at remote 0x7f4824dff048>, source_host='', source_port=0, _wrap_socket=<function at remote 0x7f4824dfba60>) at remote 0x7f4824fd5898>, _offsets_channel_connection=None, _id=0, _host=b'<redacted>', _port=9092, _source_host='', _source_port=0, _ssl_config=None, _handler=<...>, _req_handler=<RequestHandler(handler=<...>, shared=<Shared at remote 0x7f4824dd0048>, t=<Thread(_target=<function at remote 0x7f4824dfbae8>, _name="1: pykafka.RequestHandler.worker for b'<redacted>':9092", _args=(), _kwargs={}, _daemonic=True, _ident=139948381951744, _tstate_lock=<_thread.lock at remote 0x7f4824de7120>, _started=<Event(_cond=<Condition(_lock=<_thread.lock at remote 0x7f4824de71e8>, acquire=<built-in method acquire of _thread.lock object at remote 0x7f4824de71e8>, release=<built-in method release of _thread.lock object at remote 0x7f4824de71e8>, _waiters=<collections.deque at remote 0x7f4824e079a0>) at remote 0x7f4835420198>, _flag=True) at remote 0x7f4835424358>, _is_stopped=False, _initialized=True, _stderr=<redacted>, <redacted>, <redacted>, <_io.TextIOWrapper at remote 0x7f48549b1708>)) at remote 0x7f48275474a8>)) at remote 0x7f4827547518>)) at remote 0x7f48275476d8>) at remote 0x7f4835424940>) at remote 0x7f4824f62438>, _offsets_channel_req_handler=None, _socket_timeout_ms=30000, _offsets_channel_socket_timeout_ms=10000, _buffer_size=1048576, _req_handlers={}, _broker_version='0.9.0', _api_versions={0: <ApiVersionsSpec at remote 0x7f4824e88408>, 1: <ApiVersionsSpec at remote 0x7f4824e88458>, 2: <ApiVersionsSpec at remote 0x7f4824e884a8>, 3: <ApiVersionsSpec at remote 0x7f4824e884f8>, 4: <ApiVersionsSpec at remote 0x7f4824e88548>, 5: <ApiVersionsSpec at remote 0x7f4824e88598>, 6: <ApiVersionsSpec at remote 0x7f4824e885e8>, 7: <ApiVersionsSpec at remote 0x7f4824e88638>, 8: <ApiVersionsSpec at remote 0x7f4824e88688>, 9: <ApiVersionsSpec at remote 0x7f4824e886d8>, 10: <ApiVersionsSpec at remote 0x7f4824e88728>, 11: <ApiVersionsSpec at remote 0x7f4824e88778>, 12: <ApiVersionsSpec at remote 0x7f4824e887c8>, 13: <ApiVersionsSpec at remote 0x7f4824e88818>, 14: <ApiVersionsSpec at remote 0x7f4824e88868>, 15: <ApiVersionsSpec at remote 0x7f4824e888b8>, 16: <ApiVersionsSpec at remote 0x7f4824e88908>}) at remote 0x7f482500f4e0>}, _topics=<TopicDict(_cluster=<weakref at remote 0x7f4824f9c4f8>, _exclude_internal_topics=True) at remote 0x7f4824fcd888>, _source_address='', _source_host='', _source_port=0, _ssl_config=None, _zookeeper_connect=None, _max_connection_retries=3, _max_connection_retries_offset_mgr=8, _broker_version='0.9.0', _api_versions={...}, controller_broker=None) at remote 0x7f4824ed8198>, _protocol_version=0, _topic=<Topic(_name=b'<redacted>', _cluster=<...>, _partitions={0: <Partition(_id=0, _leader=<...>, _replicas=[<...>], _isr=[<...>], _topic=<weakref at remote 0x7f4824e0cdb8>) at remote 0x7f48353ffbe0>}) at remote 0x7f4824deedd8>, _partitioner=<RandomPartitioner(idx=0) at remote 0x7f48526baba8>, _compression=0, _max_retries=3, _retry_backoff_ms=100, _required_acks=1, _ack_timeout_ms=10000, _max_queued_messages=100000, 
_min_queued_messages=70000, _linger_ms=5000, _queue_empty_timeout_ms=0, _block_on_queue_full=True, _max_request_size=1000012, _synchronous=False, _worker_exception=None, _owned_brokers={0: <OwnedBroker(producer=<weakproxy at remote 0x7f4824e0ce08>, broker=<...>, lock=<_thread.RLock at remote 0x7f4824fd8420>, flush_ready=<Event(_cond=<Condition(_lock=<_thread.lock at remote 0x7f4824de7698>, acquire=<built-in method acquire of _thread.lock object at remote 0x7f4824de7698>, release=<built-in method release of _thread.lock object at remote 0x7f4824de7698>, _waiters=<collections.deque at remote 0x7f4824ddc118>) at remote 0x7f4824de3320>, _flag=False) at remote 0x7f4824de33c8>, has_message=<Event(_cond=<Condition(_lock=<_thread.lock at remote 0x7f4824de7710>, acquire=<built-in method acquire of _thread.lock object at remote 0x7f4824de7710>, release=<built-in method release of _thread.lock object at remote 0x7f4824de7710>, _waiters=<collections.deque at remote 0x7f4824ddc0b0>) at remote 0x7f4824de3240>, _flag=True) at remote 0x7f4824de3390>, slot_available=<Event(_cond=<Condition(_lock=<_thread.lock at remote 0x7f4824de7788>, acquire=<built-in method acquire of _thread.lock object at remote 0x7f4824de7788>, release=<built-in method release of _thread.lock object at remote 0x7f4824de7788>, _waiters=<collections.deque at remote 0x7f4824ddc180>) at remote 0x7f4824de3208>, _flag=True) at remote 0x7f4824de3278>, queue=<collections.deque at remote 0x7f4824ddc1e8>, messages_pending=2, running=False, _auto_start=True, _queue_reader_worker=<Thread(_target=<function at remote 0x7f4824ef4ae8>, _name='3: pykafka.OwnedBroker.queue_reader for broker 0', _args=(...), _kwargs={}, _daemonic=True, _ident=139948465846016, _tstate_lock=<_thread.lock at remote 0x7f4824de78a0>, _started=<Event(_cond=<Condition(_lock=<_thread.lock at remote 0x7f4824de7670>, acquire=<built-in method acquire of _thread.lock object at remote 0x7f4824de7670>, release=<built-in method release of _thread.lock object at remote 0x7f4824de7670>, _waiters=<collections.deque at remote 0x7f4824ddc2b8>) at remote 0x7f4824de3128>, _flag=True) at remote 0x7f4824de31d0>, _is_stopped=False, _initialized=True, _stderr=<...>) at remote 0x7f4824de3160>) at remote 0x7f4824e81cc0>}, _delivery_reports=<_DeliveryReportNone(queue=None) at remote 0x7f4824e814a8>, _pending_timeout_ms=5000, _auto_start=True, _serializer=None, _running=True, _update_lock=<_thread.lock at remote 0x7f4824de7620>) at remote 0x7f4824fdb208>)
    self.stop()
#30 Garbage-collecting
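
Reading the trace bottom-up: the producer is being garbage-collected (#30), its __del__ calls self.stop() (#21, producer.py line 235), and stop() joins the pykafka.OwnedBroker.queue_reader daemon thread, which blocks forever in _wait_for_tstate_lock (#13, #9). Here is a minimal standalone sketch of that pattern (my own names and structure, not pykafka code) that can hang at exit on CPython 3.6 for the same reason:

    import threading
    import time

    class Worker:
        """Toy stand-in for a producer that owns a daemon worker thread."""

        def __init__(self):
            self._thread = threading.Thread(target=self._run, daemon=True)
            self._thread.start()

        def _run(self):
            while True:  # queue_reader-style loop with no shutdown signal
                time.sleep(0.1)

        def __del__(self):
            # Joining the worker from __del__ mirrors Producer.stop() above.
            # If this runs during interpreter shutdown, the daemon thread has
            # already been frozen by CPython and never releases its
            # _tstate_lock, so join() blocks forever: that is the
            # _wait_for_tstate_lock frame in the trace.
            self._thread.join()

    w = Worker()  # never stopped explicitly; collected at interpreter exit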

What I do to work around this is to stop the producer right after it produces each message, which is not ideal because it wastes time when sending lots of messages.
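
A less wasteful alternative would be to keep the producer cache but stop every cached producer once, before the interpreter starts tearing down, so __del__ never has to. A sketch against the publish snippet above (close and publisher are my own names, not pykafka API):

    import atexit

    # `publisher` is assumed to be an instance of the class that owns
    # producer_map in the publish() snippet above.
    def close(publisher):
        # Stop each cached producer exactly once, draining queued messages.
        # Producer.stop() is the same call the traceback shows __del__
        # making; running it here lets the worker threads be joined while
        # they can still make progress.
        for producer in publisher.producer_map.values():
            producer.stop()
        publisher.producer_map.clear()

    # Call close() from the test suite's teardown, or register it to run
    # before interpreter shutdown freezes the daemon worker threads:
    atexit.register(close, publisher)

pykafka producers can also be used as context managers (with topic.get_producer() as producer: ...), which stops them when the block exits; that suits one-shot scripts better than a long-lived producer cache like this one.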