apache / pulsar-client-python

Apache Pulsar Python client library
https://pulsar.apache.org/
Apache License 2.0
51 stars 39 forks source link

Failed to reconnect producer: ProducerBusy #155

Open TCGOGOGO opened 10 months ago

TCGOGOGO commented 10 months ago

Descripition

The connection between client and server may be broken in some cases, for example, network fluctuations, so I was trying to have a retry logic when producer send messages, when I close the network connection forcely I get some logs like:

2023-10-07 17:06:20.322 ERROR [49156] D:\a\pulsar-client-cpp\pulsar-client-cpp\lib\ClientConnection:1234 | [src:58348 -> dest:32006] Connection closed with ConnectError
2023-10-07 17:06:20.323 INFO  [49156] D:\a\pulsar-client-cpp\pulsar-client-cpp\lib\HandlerBase:147 | [persistent://public/default/test-p-partition-1, test] Schedule reconnection in 0.1 s
2023-10-07 17:06:20.323 ERROR [49156] D:\a\pulsar-client-cpp\pulsar-client-cpp\lib\ClientConnection:1234 | [src:58345 -> dest:32006] Connection closed with ConnectError  
2023-10-07 17:06:20.323 ERROR [49156] D:\a\pulsar-client-cpp\pulsar-client-cpp\lib\ClientConnection:1234 | [src:58346 -> dest:32006] Connection closed with ConnectError  
2023-10-07 17:06:20.323 INFO  [49156] D:\a\pulsar-client-cpp\pulsar-client-cpp\lib\HandlerBase:147 | [persistent://public/default/test-p-partition-2, test] Schedule reconnection in 0.1 s
2023-10-07 17:06:20.324 ERROR [49156] D:\a\pulsar-client-cpp\pulsar-client-cpp\lib\ClientConnection:1234 | [src:58347 -> dest:32006] Connection closed with ConnectError  
2023-10-07 17:06:20.324 INFO  [49156] D:\a\pulsar-client-cpp\pulsar-client-cpp\lib\HandlerBase:147 | [persistent://public/default/test-p-partition-0, test] Schedule reconnection in 0.1 s
2023-10-07 17:06:20.324 INFO  [49156] D:\a\pulsar-client-cpp\pulsar-client-cpp\lib\HandlerBase:147 | [persistent://public/default/test-p-partition-3, test] Schedule reconnection in 0.1 s
2023-10-07 17:06:20.324 INFO  [49156] D:\a\pulsar-client-cpp\pulsar-client-cpp\lib\ClientConnection:263 | [src:58348 -> dest:32006] Destroyed connection
2023-10-07 17:06:20.324 INFO  [49156] D:\a\pulsar-client-cpp\pulsar-client-cpp\lib\ClientConnection:263 | [src:58345 -> dest:32006] Destroyed connection
2023-10-07 17:06:20.324 INFO  [49156] D:\a\pulsar-client-cpp\pulsar-client-cpp\lib\ClientConnection:263 | [src:58346 -> dest:32006] Destroyed connection
2023-10-07 17:06:20.324 INFO  [49156] D:\a\pulsar-client-cpp\pulsar-client-cpp\lib\ClientConnection:263 | [src:58347 -> dest:32006] Destroyed connection
2023-10-07 17:06:20.427 INFO  [49156] D:\a\pulsar-client-cpp\pulsar-client-cpp\lib\HandlerBase:72 | [persistent://public/default/test-p-partition-1, test] Getting connection from pool
2023-10-07 17:06:20.427 INFO  [49156] D:\a\pulsar-client-cpp\pulsar-client-cpp\lib\ConnectionPool:87 | Deleting stale connection from pool for pulsar://dest:32006 use_count: -1 @ 0000000000000000
2023-10-07 17:06:20.427 INFO  [49156] D:\a\pulsar-client-cpp\pulsar-client-cpp\lib\ClientConnection:184 | [<none> -> pulsar://dest:32006] Create ClientConnection, timeout=3000  
2023-10-07 17:06:20.427 INFO  [49156] D:\a\pulsar-client-cpp\pulsar-client-cpp\lib\ConnectionPool:106 | Created connection for pulsar://dest:32006
2023-10-07 17:06:20.427 INFO  [49156] D:\a\pulsar-client-cpp\pulsar-client-cpp\lib\HandlerBase:72 | [persistent://public/default/test-p-partition-2, test] Getting connection from pool   
2023-10-07 17:06:20.427 INFO  [49156] D:\a\pulsar-client-cpp\pulsar-client-cpp\lib\HandlerBase:72 | [persistent://public/default/test-p-partition-3, test] Getting connection from pool   
2023-10-07 17:06:20.428 INFO  [49156] D:\a\pulsar-client-cpp\pulsar-client-cpp\lib\HandlerBase:72 | [persistent://public/default/test-p-partition-0, test] Getting connection from pool

and then I saw the reconnction logic but got a lot of failure logs like:

2023-10-07 17:06:23.430 ERROR [49156] D:\a\pulsar-client-cpp\pulsar-client-cpp\lib\ClientConnection:1234 | [<none> -> pulsar://dest:32006] Connection closed with ConnectError   
2023-10-07 17:06:23.430 INFO  [49156] D:\a\pulsar-client-cpp\pulsar-client-cpp\lib\HandlerBase:147 | [persistent://public/default/test-p-partition-1, test] Schedule reconnection in 0.1 s
2023-10-07 17:06:23.430 INFO  [49156] D:\a\pulsar-client-cpp\pulsar-client-cpp\lib\HandlerBase:147 | [persistent://public/default/test-p-partition-2, test] Schedule reconnection in 0.1 s
2023-10-07 17:06:23.430 INFO  [49156] D:\a\pulsar-client-cpp\pulsar-client-cpp\lib\HandlerBase:147 | [persistent://public/default/test-p-partition-3, test] Schedule reconnection in 0.1 s
2023-10-07 17:06:23.430 INFO  [49156] D:\a\pulsar-client-cpp\pulsar-client-cpp\lib\HandlerBase:147 | [persistent://public/default/test-p-partition-0, test] Schedule reconnection in 0.1 s
2023-10-07 17:06:23.430 INFO  [49156] D:\a\pulsar-client-cpp\pulsar-client-cpp\lib\ClientConnection:263 | [<none> -> pulsar://dest:32006] Destroyed connection
2023-10-07 17:06:23.538 INFO  [49156] D:\a\pulsar-client-cpp\pulsar-client-cpp\lib\HandlerBase:72 | [persistent://public/default/test-p-partition-1, test] Getting connection from pool
2023-10-07 17:06:23.538 INFO  [49156] D:\a\pulsar-client-cpp\pulsar-client-cpp\lib\ConnectionPool:87 | Deleting stale connection from pool for pulsar://dest:32006 use_count: -1 @ 0000000000000000
2023-10-07 17:06:23.538 INFO  [49156] D:\a\pulsar-client-cpp\pulsar-client-cpp\lib\ClientConnection:184 | [<none> -> pulsar://dest:32006] Create ClientConnection, timeout=3000  
2023-10-07 17:06:23.539 INFO  [49156] D:\a\pulsar-client-cpp\pulsar-client-cpp\lib\ConnectionPool:106 | Created connection for pulsar://dest:32006
2023-10-07 17:06:23.539 INFO  [49156] D:\a\pulsar-client-cpp\pulsar-client-cpp\lib\HandlerBase:72 | [persistent://public/default/test-p-partition-0, test] Getting connection from pool   
2023-10-07 17:06:23.539 INFO  [49156] D:\a\pulsar-client-cpp\pulsar-client-cpp\lib\HandlerBase:72 | [persistent://public/default/test-p-partition-2, test] Getting connection from pool   
2023-10-07 17:06:23.540 INFO  [49156] D:\a\pulsar-client-cpp\pulsar-client-cpp\lib\HandlerBase:72 | [persistent://public/default/test-p-partition-3, test] Getting connection from pool   
2023-10-07 17:06:23.568 INFO  [49156] D:\a\pulsar-client-cpp\pulsar-client-cpp\lib\ClientConnection:382 | [10.10.10.131:58515 -> dest:32006] Connected to broker
2023-10-07 17:06:23.627 INFO  [49156] D:\a\pulsar-client-cpp\pulsar-client-cpp\lib\ConnectionPool:87 | Deleting stale connection from pool for pulsar://brokers-broker-1.brokers-broker-headless.pulsar.svc.cluster.local:6650 use_count: -1 @ 0000000000000000
2023-10-07 17:06:23.627 INFO  [49156] D:\a\pulsar-client-cpp\pulsar-client-cpp\lib\ClientConnection:184 | [<none> -> pulsar://dest:32006] Create ClientConnection, timeout=3000  
2023-10-07 17:06:23.627 INFO  [49156] D:\a\pulsar-client-cpp\pulsar-client-cpp\lib\ConnectionPool:106 | Created connection for pulsar://brokers-broker-1.brokers-broker-headless.pulsar.svc.cluster.local:6650
2023-10-07 17:06:23.632 INFO  [49156] D:\a\pulsar-client-cpp\pulsar-client-cpp\lib\ConnectionPool:87 | Deleting stale connection from pool for pulsar://brokers-broker-0.brokers-broker-headless.pulsar.svc.cluster.local:6650 use_count: -1 @ 0000000000000000
2023-10-07 17:06:23.632 INFO  [49156] D:\a\pulsar-client-cpp\pulsar-client-cpp\lib\ClientConnection:184 | [<none> -> pulsar://dest:32006] Create ClientConnection, timeout=3000  
2023-10-07 17:06:23.633 INFO  [49156] D:\a\pulsar-client-cpp\pulsar-client-cpp\lib\ConnectionPool:106 | Created connection for pulsar://brokers-broker-0.brokers-broker-headless.pulsar.svc.cluster.local:6650
2023-10-07 17:06:23.633 INFO  [49156] D:\a\pulsar-client-cpp\pulsar-client-cpp\lib\ConnectionPool:87 | Deleting stale connection from pool for pulsar://brokers-broker-2.brokers-broker-headless.pulsar.svc.cluster.local:6650 use_count: -1 @ 0000000000000000
2023-10-07 17:06:23.633 INFO  [49156] D:\a\pulsar-client-cpp\pulsar-client-cpp\lib\ClientConnection:184 | [<none> -> pulsar://dest:32006] Create ClientConnection, timeout=3000  
2023-10-07 17:06:23.633 INFO  [49156] D:\a\pulsar-client-cpp\pulsar-client-cpp\lib\ConnectionPool:106 | Created connection for pulsar://brokers-broker-2.brokers-broker-headless.pulsar.svc.cluster.local:6650
2023-10-07 17:06:23.656 INFO  [49156] D:\a\pulsar-client-cpp\pulsar-client-cpp\lib\ClientConnection:384 | [10.10.10.131:58516 -> dest:32006] Connected to broker through proxy. Logical broker: pulsar://brokers-broker-1.brokers-broker-headless.pulsar.svc.cluster.local:6650
2023-10-07 17:06:23.664 INFO  [49156] D:\a\pulsar-client-cpp\pulsar-client-cpp\lib\ClientConnection:384 | [10.10.10.131:58519 -> dest:32006] Connected to broker through proxy. Logical broker: pulsar://brokers-broker-2.brokers-broker-headless.pulsar.svc.cluster.local:6650
2023-10-07 17:06:23.673 INFO  [49156] D:\a\pulsar-client-cpp\pulsar-client-cpp\lib\ClientConnection:384 | [10.10.10.131:58518 -> dest:32006] Connected to broker through proxy. Logical broker: pulsar://brokers-broker-0.brokers-broker-headless.pulsar.svc.cluster.local:6650
2023-10-07 17:06:23.722 WARN  [49156] D:\a\pulsar-client-cpp\pulsar-client-cpp\lib\ClientConnection:1630 | [10.10.10.131:58516 -> dest:32006] Received error response from server: ProducerBusy (org.apache.pulsar.broker.service.BrokerServiceException$NamingException: Producer with name 'test' is already connected to topic) -- req_id: 4
2023-10-07 17:06:23.722 WARN  [49156] D:\a\pulsar-client-cpp\pulsar-client-cpp\lib\ProducerImpl:281 | [persistent://public/default/test-p-partition-1, test] Failed to reconnect producer: ProducerBusy

finally, after a long time reconnection, the client recovery

2023-10-07 17:07:12.180 INFO  [49156] D:\a\pulsar-client-cpp\pulsar-client-cpp\lib\HandlerBase:72 | [persistent://public/default/test-p-partition-1, test] Getting connection from pool
2023-10-07 17:07:12.196 INFO  [49156] D:\a\pulsar-client-cpp\pulsar-client-cpp\lib\HandlerBase:72 | [persistent://public/default/test-p-partition-3, test] Getting connection from pool
2023-10-07 17:07:12.196 INFO  [49156] D:\a\pulsar-client-cpp\pulsar-client-cpp\lib\HandlerBase:72 | [persistent://public/default/test-p-partition-0, test] Getting connection from pool   
2023-10-07 17:07:12.197 INFO  [49156] D:\a\pulsar-client-cpp\pulsar-client-cpp\lib\HandlerBase:72 | [persistent://public/default/test-p-partition-2, test] Getting connection from pool   
2023-10-07 17:07:12.237 INFO  [49156] D:\a\pulsar-client-cpp\pulsar-client-cpp\lib\ProducerImpl:209 | [persistent://public/default/test-p-partition-1, test] Created producer on broker [src:58516 -> dest:32006]
2023-10-07 17:07:12.253 INFO  [49156] D:\a\pulsar-client-cpp\pulsar-client-cpp\lib\ProducerImpl:209 | [persistent://public/default/test-p-partition-3, test] Created producer on broker [src:58518 -> dest:32006]
2023-10-07 17:07:12.253 INFO  [49156] D:\a\pulsar-client-cpp\pulsar-client-cpp\lib\ProducerImpl:209 | [persistent://public/default/test-p-partition-0, test] Created producer on broker [src:58518 -> dest:32006]
2023-10-07 17:07:12.256 INFO  [49156] D:\a\pulsar-client-cpp\pulsar-client-cpp\lib\ProducerImpl:209 | [persistent://public/default/test-p-partition-2, test] Created producer on broker [src:58519 -> dest:32006]

The entire reconntion from first try to final success cost nearly 60 seconds, which seems too long

Reproduce

Version

pulsar: 3.1.0.1 pulsar python sdk: 3.3.0

Step

  1. run pulsar server
  2. using python client producer to send message, sample code:
    
    import time
    import pulsar

client = pulsar.Client('pulsar server address', connection_timeout_ms=3000, operation_timeout_seconds=3)

producer = client.create_producer(topic='persistent topic name', send_timeout_millis=1000, producer_name='test')

def send_message(content): while True: try: print(f'connection: state: {producer.is_connected()}') msg_id = producer.send(content.encode('utf-8')) print(msg_id) time.sleep(5) break except Exception as e: print(e)

while True: send_message(input())


3. input something to make sure the connection is ok
4. close the network connection to the server manully
5. reconnect the network
## Expection
I believe this could not be the client issue, and please help triage if necessarily. The client reconnection is expected to be more quick and smooth
BewareMyPower commented 10 months ago

There is a similar fix (https://github.com/apache/pulsar/pull/21144) in core repo but I didn't look into this fix currently

TCGOGOGO commented 10 months ago

There is a similar fix (apache/pulsar#21144) in core repo but I didn't look into this fix currently

Thx Yunze for the information