confluentinc / confluent-kafka-python

Confluent's Kafka Python Client
http://docs.confluent.io/current/clients/confluent-kafka-python

Producer fails to send messages #264

Closed rohitgcs closed 6 years ago

rohitgcs commented 7 years ago

Description

The producer doesn't connect or send messages. Calls to produce() run without errors, but when I call flush() I get connection timed out errors.

How to reproduce

confluent_kafka.version() ('0.11.0', 720896)
confluent_kafka.libversion() ('0.11.0', 721151)
Kafka Broker 0.9.0.1

Full script:

import time
from confluent_kafka import Producer

p = Producer({'bootstrap.servers': 'localhost', 'client.id': 'test-producer', 'debug': 'broker', 'api.version.request': True})
# KAFKA_TOPIC is not defined in the posted script
for data in ['test' + str(i) for i in range(0, 10)]:
    print data
    p.produce(KAFKA_TOPIC, data.encode('utf-8'))
print "Before: ", len(p)
time.sleep(2)
print "After: ", len(p)
p.poll(0)

p.flush()

p = Producer({'bootstrap.servers': 'localhost', 'client.id': 'test-producer', 'debug': 'broker' ,'api.version.request': True})
%7|1507675267.835|BRKMAIN|test-producer#producer-7| [thrd::0/internal]: :0/internal: Enter main broker thread
%7|1507675267.835|BROKER|test-producer#producer-7| [thrd:app]: localhost:9092/bootstrap: Added new broker with NodeId -1
%7|1507675267.835|DESTROY|test-producer#producer-6| [thrd:app]: Terminating instance
%7|1507675267.835|DESTROY|test-producer#producer-6| [thrd:main]: Destroy internal
%7|1507675267.835|DESTROY|test-producer#producer-6| [thrd:main]: Removing all topics
%7|1507675267.835|TERMINATE|test-producer#producer-6| [thrd:localhost:9092/bootstrap]: localhost:9092/bootstrap: Handle is terminating: failed 0 request(s) in retry+outbuf
%7|1507675267.835|BROKERFAIL|test-producer#producer-6| [thrd:localhost:9092/bootstrap]: localhost:9092/bootstrap: failed: err: Local: Broker handle destroyed: (errno: Operation now in progress)
%7|1507675267.835|STATE|test-producer#producer-6| [thrd:localhost:9092/bootstrap]: localhost:9092/bootstrap: Broker changed state CONNECT -> DOWN
%7|1507675267.835|BRKMAIN|test-producer#producer-7| [thrd:localhost:9092/bootstrap]: localhost:9092/bootstrap: Enter main broker thread
%7|1507675267.835|CONNECT|test-producer#producer-7| [thrd:localhost:9092/bootstrap]: localhost:9092/bootstrap: broker in state INIT connecting
%7|1507675267.835|CONNECT|test-producer#producer-7| [thrd:localhost:9092/bootstrap]: localhost:9092/bootstrap: Connecting to ipv4#localhost:9092 (plaintext) with socket 8
%7|1507675267.835|STATE|test-producer#producer-7| [thrd:localhost:9092/bootstrap]: localhost:9092/bootstrap: Broker changed state INIT -> CONNECT
%7|1507675267.835|STATE|test-producer#producer-7| [thrd::0/internal]: :0/internal: Broker changed state INIT -> UP
%7|1507675267.935|TERMINATE|test-producer#producer-6| [thrd::0/internal]: :0/internal: Handle is terminating: failed 0 request(s) in retry+outbuf
%7|1507675267.935|BROKERFAIL|test-producer#producer-6| [thrd::0/internal]: :0/internal: failed: err: Local: Broker handle destroyed: (errno: Success)
%7|1507675267.935|STATE|test-producer#producer-6| [thrd::0/internal]: :0/internal: Broker changed state UP -> DOWN


edenhill commented 7 years ago

Could you provide some more logs? The provided log only covers 100 milliseconds of output.

Side note: len(Producer) counts messages queued for transmission, messages in transit, and messages that have been delivered or have failed delivery. That last group is only served and removed when you call poll() or flush().
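To illustrate the side note, here is a minimal sketch of a polling loop that keeps serving delivery reports until len() reaches zero. The helper name serve_delivery_reports and its timeout and poll_interval parameters are made up for this example; it only relies on len(producer) and producer.poll(), so it should work with a confluent_kafka.Producer, but treat it as an illustration rather than the recommended pattern.

```python
import time


def serve_delivery_reports(producer, timeout=10.0, poll_interval=0.1):
    """Poll until len(producer) drops to zero or the timeout expires.

    `producer` is assumed to behave like confluent_kafka.Producer:
    len() counts outstanding messages and delivery reports, and
    poll() serves the reports so they stop being counted.
    Returns whatever len(producer) reports when the loop ends.
    """
    deadline = time.monotonic() + timeout
    while len(producer) > 0 and time.monotonic() < deadline:
        # Serving delivery callbacks is what removes delivered or
        # failed messages from the count.
        producer.poll(poll_interval)
    return len(producer)
```

A non-zero return value after the timeout suggests messages are still queued or in flight, which usually points at a connectivity problem rather than at unserved delivery reports.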

Chloejay commented 4 years ago

I hit the same issue when rerunning Docker: localhost:9092/bootstrap: failed: err: Local: Broker handle destroyed: (errno: Operation now in progress)

Here are the logs:

%7|1601868269.297|BRKMAIN|driver.producer#producer-1| [thrd::0/internal]: :0/internal: Enter main broker thread
%7|1601868269.297|WAKEUPFD|driver.producer#producer-1| [thrd:app]: localhost:9092/bootstrap: Enabled low-latency ops queue wake-ups
%7|1601868269.297|BROKER|driver.producer#producer-1| [thrd:app]: localhost:9092/bootstrap: Added new broker with NodeId -1
%7|1601868269.297|BRKMAIN|driver.producer#producer-1| [thrd:localhost:9092/bootstrap]: localhost:9092/bootstrap: Enter main broker thread
%7|1601868269.297|CONNECT|driver.producer#producer-1| [thrd:app]: localhost:9092/bootstrap: Selected for cluster connection: bootstrap servers added (broker has 0 connection attempt(s))
%7|1601868269.298|CONNECT|driver.producer#producer-1| [thrd:localhost:9092/bootstrap]: localhost:9092/bootstrap: Received CONNECT op
%7|1601868269.298|STATE|driver.producer#producer-1| [thrd:localhost:9092/bootstrap]: localhost:9092/bootstrap: Broker changed state INIT -> TRY_CONNECT
%7|1601868269.298|BROADCAST|driver.producer#producer-1| [thrd:localhost:9092/bootstrap]: Broadcasting state change
%7|1601868269.298|CONNECT|driver.producer#producer-1| [thrd:localhost:9092/bootstrap]: localhost:9092/bootstrap: broker in state TRY_CONNECT connecting
%7|1601868269.298|STATE|driver.producer#producer-1| [thrd:localhost:9092/bootstrap]: localhost:9092/bootstrap: Broker changed state TRY_CONNECT -> CONNECT
%7|1601868269.298|BROADCAST|driver.producer#producer-1| [thrd:localhost:9092/bootstrap]: Broadcasting state change
%7|1601868269.298|INIT|driver.producer#producer-1| [thrd:app]: librdkafka v1.4.0 (0x10400ff) driver.producer#producer-1 initialized (builtin.features gzip,snappy,ssl,sasl,regex,lz4,sasl_gssapi,sasl_plain,sasl_scram,plugins,zstd,sasl_oauthbearer, STATIC_LINKING GCC GXX PKGCONFIG OSXLD LIBDL PLUGINS ZLIB SSL SASL_CYRUS ZSTD HDRHISTOGRAM SYSLOG SNAPPY SOCKEM SASL_SCRAM SASL_OAUTHBEARER CRC32C_HW, debug 0xfffff)
Flushing producer and exiting.
%7|1601868269.301|DESTROY|driver.producer#producer-1| [thrd:app]: Terminating instance (destroy flags none (0x0))
%7|1601868269.301|TERMINATE|driver.producer#producer-1| [thrd:app]: Interrupting timers
%7|1601868269.301|TERMINATE|driver.producer#producer-1| [thrd:app]: Sending TERMINATE to internal main thread
%7|1601868269.302|TERMINATE|driver.producer#producer-1| [thrd:app]: Joining internal main thread
%7|1601868269.302|TERMINATE|driver.producer#producer-1| [thrd:main]: Internal main thread terminating
%7|1601868269.302|DESTROY|driver.producer#producer-1| [thrd:main]: Destroy internal
%7|1601868269.302|BROADCAST|driver.producer#producer-1| [thrd:main]: Broadcasting state change
%7|1601868269.302|DESTROY|driver.producer#producer-1| [thrd:main]: Removing all topics
%7|1601868269.302|DESTROY|driver.producer#producer-1| [thrd:main]: Sending TERMINATE to localhost:9092/bootstrap
%7|1601868269.302|TERMINATE|driver.producer#producer-1| [thrd:main]: Purging reply queue
%7|1601868269.302|TERMINATE|driver.producer#producer-1| [thrd:main]: Decommissioning internal broker
%7|1601868269.302|TERMINATE|driver.producer#producer-1| [thrd:main]: Join 2 broker thread(s)
%7|1601868269.302|TERM|driver.producer#producer-1| [thrd::0/internal]: :0/internal: Received TERMINATE op in state INIT: 1 refcnts, 0 toppar(s), 0 active toppar(s), 0 outbufs, 0 waitresps, 0 retrybufs
%7|1601868269.302|BROKERFAIL|driver.producer#producer-1| [thrd::0/internal]: :0/internal: failed: err: Local: Broker handle destroyed: (errno: Undefined error: 0)
%7|1601868269.302|FAIL|driver.producer#producer-1| [thrd::0/internal]: :0/internal: Client is terminating (after 4ms in state INIT)
%7|1601868269.302|STATE|driver.producer#producer-1| [thrd::0/internal]: :0/internal: Broker changed state INIT -> DOWN
%7|1601868269.302|BROADCAST|driver.producer#producer-1| [thrd::0/internal]: Broadcasting state change
%7|1601868269.302|BUFQ|driver.producer#producer-1| [thrd::0/internal]: :0/internal: Purging bufq with 0 buffers
%7|1601868269.302|BUFQ|driver.producer#producer-1| [thrd::0/internal]: :0/internal: Purging bufq with 0 buffers
%7|1601868269.302|BUFQ|driver.producer#producer-1| [thrd::0/internal]: :0/internal: Updating 0 buffers on connection reset
%7|1601868269.302|TERMINATE|driver.producer#producer-1| [thrd::0/internal]: :0/internal: Handle is terminating in state DOWN: 1 refcnts (0x7fc682023728), 0 toppar(s), 0 active toppar(s), 0 outbufs, 0 waitresps, 0 retrybufs: failed 0 request(s) in retry+outbuf
%7|1601868269.302|BROKERFAIL|driver.producer#producer-1| [thrd::0/internal]: :0/internal: failed: err: Local: Broker handle destroyed: (errno: Undefined error: 0)
%7|1601868269.302|BUFQ|driver.producer#producer-1| [thrd::0/internal]: :0/internal: Purging bufq with 0 buffers
%7|1601868269.302|BUFQ|driver.producer#producer-1| [thrd::0/internal]: :0/internal: Purging bufq with 0 buffers
%7|1601868269.302|BUFQ|driver.producer#producer-1| [thrd::0/internal]: :0/internal: Updating 0 buffers on connection reset
%7|1601868269.306|CONNECT|driver.producer#producer-1| [thrd:localhost:9092/bootstrap]: localhost:9092/bootstrap: Connecting to ipv6#[::1]:9092 (plaintext) with socket 5
%7|1601868269.307|TERMINATE|driver.producer#producer-1| [thrd:localhost:9092/bootstrap]: localhost:9092/bootstrap: Handle is terminating in state CONNECT: 1 refcnts (0x7fc68181ad28), 0 toppar(s), 0 active toppar(s), 0 outbufs, 0 waitresps, 0 retrybufs: failed 0 request(s) in retry+outbuf
%7|1601868269.307|BROKERFAIL|driver.producer#producer-1| [thrd:localhost:9092/bootstrap]: localhost:9092/bootstrap: failed: err: Local: Broker handle destroyed: (errno: Operation now in progress)
%7|1601868269.307|STATE|driver.producer#producer-1| [thrd:localhost:9092/bootstrap]: localhost:9092/bootstrap: Broker changed state CONNECT -> DOWN
%7|1601868269.307|BROADCAST|driver.producer#producer-1| [thrd:localhost:9092/bootstrap]: Broadcasting state change
%7|1601868269.307|BUFQ|driver.producer#producer-1| [thrd:localhost:9092/bootstrap]: localhost:9092/bootstrap: Purging bufq with 0 buffers
%7|1601868269.307|BUFQ|driver.producer#producer-1| [thrd:localhost:9092/bootstrap]: localhost:9092/bootstrap: Purging bufq with 0 buffers
%7|1601868269.307|BUFQ|driver.producer#producer-1| [thrd:localhost:9092/bootstrap]: localhost:9092/bootstrap: Updating 0 buffers on connection reset
%7|1601868269.307|TERM|driver.producer#producer-1| [thrd:localhost:9092/bootstrap]: localhost:9092/bootstrap: Received TERMINATE op in state DOWN: 1 refcnts, 0 toppar(s), 0 active toppar(s), 0 outbufs, 0 waitresps, 0 retrybufs
%7|1601868269.307|BROKERFAIL|driver.producer#producer-1| [thrd:localhost:9092/bootstrap]: localhost:9092/bootstrap: failed: err: Local: Broker handle destroyed: (errno: Operation now in progress)
%7|1601868269.307|FAIL|driver.producer#producer-1| [thrd:localhost:9092/bootstrap]: localhost:9092/bootstrap: Client is terminating (after 0ms in state DOWN)
%7|1601868269.307|BUFQ|driver.producer#producer-1| [thrd:localhost:9092/bootstrap]: localhost:9092/bootstrap: Purging bufq with 0 buffers
%7|1601868269.307|BUFQ|driver.producer#producer-1| [thrd:localhost:9092/bootstrap]: localhost:9092/bootstrap: Purging bufq with 0 buffers
%7|1601868269.307|BUFQ|driver.producer#producer-1| [thrd:localhost:9092/bootstrap]: localhost:9092/bootstrap: Updating 0 buffers on connection reset
%7|1601868269.307|TERMINATE|driver.producer#producer-1| [thrd:main]: Internal main thread termination done
%7|1601868269.307|TERMINATE|driver.producer#producer-1| [thrd:app]: Destroying op queues
%7|1601868269.307|TERMINATE|driver.producer#producer-1| [thrd:app]: Termination done: freeing resources

This is the output of docker-compose ps:

            Name                         Command            State                 Ports              
------------------------------------------------------------------------------------------------------
kafka_docker_connect_1           /etc/confluent/docker/run   Up       0.0.0.0:8083->8083/tcp, 9092/tcp
kafka_docker_control-center_1    /etc/confluent/docker/run   Up       0.0.0.0:9021->9021/tcp          
kafka_docker_kafka_1             /etc/confluent/docker/run   Exit 1                                   
kafka_docker_ksql-server_1       /etc/confluent/docker/run   Up       0.0.0.0:8088->8088/tcp          
kafka_docker_schema-registry_1   /etc/confluent/docker/run   Exit 1                                   
kafka_docker_tools_1             /bin/sh                     Up       8083/tcp, 9092/tcp              
kafka_docker_zookeeper_1         /etc/confluent/docker/run   Up       2181/tcp, 2888/tcp, 3888/tcp  

It failed when running producer.py.
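The docker-compose ps output above lists kafka_docker_kafka_1 with state Exit 1, so it may be worth confirming that something is listening on the bootstrap address before looking at the producer. The following is a rough sketch using only the Python standard library; the helper name broker_port_open is made up for this example, and an open TCP port only shows that a listener exists, not that the broker is healthy.

```python
import socket


def broker_port_open(host="localhost", port=9092, timeout=2.0):
    """Return True if a TCP connection to host:port can be opened.

    This is only a reachability check: it does not speak the Kafka
    protocol, so a True result does not prove the broker is usable.
    """
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Covers refused connections, timeouts and name resolution errors.
        return False
```

Calling broker_port_open("localhost", 9092) before creating the producer gives a quick signal about whether the container is actually up.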

edenhill commented 4 years ago

Looks like you are not waiting for messages to be delivered before terminating the application. You need to use flush().
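As a rough illustration of that advice, the sketch below produces a batch of messages and then blocks in flush() before returning. The helper name produce_and_flush, the topic name in the usage comment, and the timeout value are made up for this example. It assumes a producer that behaves like confluent_kafka.Producer, where produce() accepts an on_delivery callback and flush() accepts a timeout; older client versions may not return a count from flush(), so the sketch falls back to len().

```python
def produce_and_flush(producer, topic, payloads, timeout=10.0):
    """Produce every payload, then wait for delivery before returning.

    Returns (remaining, failures): the number of messages still
    outstanding after flush(), and the errors reported by the
    delivery callback.
    """
    failures = []

    def on_delivery(err, msg):
        # Called from poll() or flush() once per produced message.
        if err is not None:
            failures.append(err)

    for payload in payloads:
        producer.produce(topic, payload, on_delivery=on_delivery)
        # Serve any delivery reports that are already available.
        producer.poll(0)

    # Block until outstanding messages are delivered or the timeout hits.
    remaining = producer.flush(timeout)
    if remaining is None:
        # Assumption: a client version whose flush() returns nothing.
        remaining = len(producer)
    return remaining, failures


# Hypothetical usage against a real broker:
#   from confluent_kafka import Producer
#   p = Producer({'bootstrap.servers': 'localhost:9092'})
#   remaining, failures = produce_and_flush(p, 'my-topic', [b'test0', b'test1'])
```

If remaining is non-zero or failures is non-empty, the application exited its send phase with undelivered messages, which matches the symptom in the logs above where the handle is destroyed a few milliseconds after creation.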

Chloejay commented 4 years ago

@edenhill thanks for the hint, solved it after your advice.