robinhood / faust

Python Stream Processing

Standby workers hang after broker_max_poll_interval expires #517

Open mainTAP opened 4 years ago

mainTAP commented 4 years ago

Steps to reproduce

-run multiple workers printing from a topic with 1 partition

-trigger a rebalance by shutting down one of the workers (a standby worker takes over)

-after broker_max_poll_interval expires, the standby workers are no longer able to take over if the active worker fails

-trigger a rebalance again: standby workers that have been running longer than broker_max_poll_interval get stuck and never join the group
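The setup above can be sketched as a minimal Faust app. This is only a sketch: the topic and agent names are taken from the logs below, and the broker URL is a placeholder.

```python
import faust

# Sketch of the reproduction app: several identical workers consume a
# one-partition topic; only the worker assigned the partition prints,
# the others stand by.
app = faust.App(
    "kafka",                        # app id, matching the group name in the logs
    broker="kafka://x.x.x.x:9092",  # placeholder broker address
    store="memory://",
)

messages = app.topic("sdwan_messages")  # topic created with a single partition

@app.agent(messages)
async def mystream(stream):
    async for message in stream:
        print(message)
```

Start three workers (e.g. `faust -A app_module worker -l info`, where `app_module` is whatever module holds the app), kill the active one to force a rebalance, then repeat after broker_max_poll_interval has elapsed.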

Expected behavior

-one of the workers keeps printing the messages from the Kafka topic

-the other two workers stand by and take over if the active worker fails

[2020-01-29 15:13:04,213] [510] [WARNING] Heartbeat failed for group kafka because it is rebalancing
[2020-01-29 15:13:04,213] [510] [INFO] Revoking previously assigned partitions frozenset() for group kafka 
[2020-01-29 15:13:04,215] [510] [INFO] (Re-)joining group kafka 
[2020-01-29 15:13:04,219] [510] [INFO] Joined group 'kafka' (generation 104) with member_id faust-1.10.1-72376069-cc84-44f1-83c5-c22300e8f857 
[2020-01-29 15:13:04,224] [510] [INFO] Successfully synced group kafka with generation 104 
[2020-01-29 15:13:04,225] [510] [INFO] Setting newly assigned partitions set() for group kafka 
[2020-01-29 15:13:04,227] [510] [INFO] [^---Recovery]: Resuming flow... 
[2020-01-29 15:13:04,228] [510] [INFO] [^---Recovery]: Resuming streams with empty assignment 
[2020-01-29 15:13:04,228] [510] [INFO] [^---Recovery]: Worker ready

Actual behavior

-it works as expected until broker_max_poll_interval expires; after that, the standby workers get stuck once a rebalance is initiated:

[2020-01-29 15:13:12,238] [510] [WARNING] Heartbeat failed for group kafka because it is rebalancing 
[2020-01-29 15:13:12,239] [510] [INFO] Revoking previously assigned partitions frozenset() for group kafka

-the worker needs to be restarted before it can work correctly again

Full traceback

+ƒaµS† v1.10.1+-----------------------------------------------------------------------+
| id          | kafka                                                                 |
| transport   | [URL('kafka://x.x.x.x:9092'), URL('kafka://x.x.x.x:9092')]            |
| store       | memory:                                                               |
| web         | http://localhost:6066/                                                |
| log         | -stderr- (info)                                                       |
| pid         | 510                                                                   |
| platform    | CPython 3.6.6 (Linux x86_64)                                          |
| drivers     |                                                                       |
|   transport | aiokafka=1.1.3                                                        |
|   web       | aiohttp=3.6.2                                                         |
+-------------+-----------------------------------------------------------------------+
[2020-01-29 14:56:25,407] [510] [INFO] [^Worker]: Starting... 
[2020-01-29 14:56:25,412] [510] [INFO] [^-App]: Starting... 
[2020-01-29 14:56:25,413] [510] [INFO] [^--Monitor]: Starting... 
[2020-01-29 14:56:25,413] [510] [INFO] [^--Producer]: Starting... 
[2020-01-29 14:56:25,413] [510] [INFO] [^---ProducerBuffer]: Starting... 
[2020-01-29 14:56:25,424] [510] [INFO] [^--CacheBackend]: Starting... 
[2020-01-29 14:56:25,424] [510] [INFO] [^--Web]: Starting... 
[2020-01-29 14:56:25,424] [510] [INFO] [^---Server]: Starting... 
[2020-01-29 14:56:25,425] [510] [INFO] [^--Consumer]: Starting... 
[2020-01-29 14:56:25,426] [510] [INFO] [^---AIOKafkaConsumerThread]: Starting... 
[2020-01-29 14:56:25,433] [510] [INFO] [^--LeaderAssignor]: Starting... 
[2020-01-29 14:56:25,434] [510] [INFO] [^--Producer]: Creating topic 'kafka-__assignor-__leader' 
[2020-01-29 14:56:25,444] [510] [INFO] [^--ReplyConsumer]: Starting... 
[2020-01-29 14:56:25,444] [510] [INFO] [^--AgentManager]: Starting... 
[2020-01-29 14:56:25,444] [510] [INFO] [^---Agent: kafka_to_eedb.mystream]: Starting... 
[2020-01-29 14:56:25,450] [510] [INFO] [^----OneForOneSupervisor: (1@0x7fd0a6f70048)]: Starting... 
[2020-01-29 14:56:25,450] [510] [INFO] [^---Conductor]: Starting... 
[2020-01-29 14:56:25,451] [510] [INFO] [^--TableManager]: Starting... 
[2020-01-29 14:56:25,451] [510] [INFO] [^---Conductor]: Waiting for agents to start... 
[2020-01-29 14:56:25,452] [510] [INFO] [^---Conductor]: Waiting for tables to be registered... 

[2020-01-29 14:56:26,452] [510] [INFO] [^---Recovery]: Starting... 
[2020-01-29 14:56:26,453] [510] [INFO] [^--Producer]: Creating topic 'kafka-__assignor-__leader' 
[2020-01-29 14:56:26,457] [510] [INFO] Updating subscribed topics to: frozenset({'sdwan_messages', 'kafka-__assignor-__leader'}) 
[2020-01-29 14:56:26,458] [510] [INFO] Subscribed to topic(s): {'sdwan_messages', 'kafka-__assignor-__leader'} 
[2020-01-29 14:56:26,474] [510] [INFO] Discovered coordinator 2 for group kafka 
[2020-01-29 14:56:26,475] [510] [INFO] Revoking previously assigned partitions set() for group kafka 
[2020-01-29 14:56:26,477] [510] [INFO] (Re-)joining group kafka 
[2020-01-29 14:56:26,705] [510] [INFO] Joined group 'kafka' (generation 79) with member_id faust-1.10.1-72376069-cc84-44f1-83c5-c22300e8f857 
[2020-01-29 14:56:26,711] [510] [INFO] Successfully synced group kafka with generation 79 
[2020-01-29 14:56:26,711] [510] [INFO] Setting newly assigned partitions set() for group kafka 
[2020-01-29 14:56:26,715] [510] [INFO] [^---Recovery]: Resuming flow... 
[2020-01-29 14:56:26,716] [510] [INFO] [^---Recovery]: Resuming streams with empty assignment 
[2020-01-29 14:56:26,716] [510] [INFO] [^---Fetcher]: Starting... 
[2020-01-29 14:56:26,716] [510] [INFO] [^---Recovery]: Worker ready 
[2020-01-29 14:56:26,717] [510] [INFO] [^Worker]: Ready

[2020-01-29 15:12:54,150] [510] [WARNING] Heartbeat failed for group kafka because it is rebalancing 
[2020-01-29 15:12:54,150] [510] [INFO] Revoking previously assigned partitions frozenset() for group kafka 
[2020-01-29 15:12:54,152] [510] [INFO] (Re-)joining group kafka 
[2020-01-29 15:12:54,156] [510] [INFO] Joined group 'kafka' (generation 101) with member_id faust-1.10.1-72376069-cc84-44f1-83c5-c22300e8f857 
[2020-01-29 15:12:54,161] [510] [INFO] Successfully synced group kafka with generation 101 
[2020-01-29 15:12:54,162] [510] [INFO] Setting newly assigned partitions set() for group kafka 
[2020-01-29 15:12:54,164] [510] [INFO] [^---Recovery]: Resuming flow... 
[2020-01-29 15:12:54,165] [510] [INFO] [^---Recovery]: Resuming streams with empty assignment 
[2020-01-29 15:12:54,165] [510] [INFO] [^---Recovery]: Worker ready 

[2020-01-29 15:12:57,169] [510] [WARNING] Heartbeat failed for group kafka because it is rebalancing 
[2020-01-29 15:12:57,170] [510] [INFO] Revoking previously assigned partitions frozenset() for group kafka 
[2020-01-29 15:12:57,172] [510] [INFO] (Re-)joining group kafka 
[2020-01-29 15:12:57,175] [510] [INFO] Joined group 'kafka' (generation 102) with member_id faust-1.10.1-72376069-cc84-44f1-83c5-c22300e8f857 
[2020-01-29 15:12:57,181] [510] [INFO] Successfully synced group kafka with generation 102 
[2020-01-29 15:12:57,182] [510] [INFO] Setting newly assigned partitions set() for group kafka 
[2020-01-29 15:12:57,183] [510] [INFO] [^---Recovery]: Resuming flow... 
[2020-01-29 15:12:57,184] [510] [INFO] [^---Recovery]: Resuming streams with empty assignment 
[2020-01-29 15:12:57,184] [510] [INFO] [^---Recovery]: Worker ready

[2020-01-29 15:13:02,192] [510] [WARNING] Heartbeat failed for group kafka because it is rebalancing 
[2020-01-29 15:13:02,193] [510] [INFO] Revoking previously assigned partitions frozenset() for group kafka 
[2020-01-29 15:13:02,194] [510] [INFO] (Re-)joining group kafka 
[2020-01-29 15:13:02,198] [510] [INFO] Joined group 'kafka' (generation 103) with member_id faust-1.10.1-72376069-cc84-44f1-83c5-c22300e8f857 
[2020-01-29 15:13:02,204] [510] [INFO] Successfully synced group kafka with generation 103 
[2020-01-29 15:13:02,205] [510] [INFO] Setting newly assigned partitions set() for group kafka 
[2020-01-29 15:13:02,209] [510] [INFO] [^---Recovery]: Resuming flow... 
[2020-01-29 15:13:02,209] [510] [INFO] [^---Recovery]: Resuming streams with empty assignment 
[2020-01-29 15:13:02,209] [510] [INFO] [^---Recovery]: Worker ready

[2020-01-29 15:13:04,213] [510] [WARNING] Heartbeat failed for group kafka because it is rebalancing 
[2020-01-29 15:13:04,213] [510] [INFO] Revoking previously assigned partitions frozenset() for group kafka 
[2020-01-29 15:13:04,215] [510] [INFO] (Re-)joining group kafka 
[2020-01-29 15:13:04,219] [510] [INFO] Joined group 'kafka' (generation 104) with member_id faust-1.10.1-72376069-cc84-44f1-83c5-c22300e8f857 
[2020-01-29 15:13:04,224] [510] [INFO] Successfully synced group kafka with generation 104 
[2020-01-29 15:13:04,225] [510] [INFO] Setting newly assigned partitions set() for group kafka 
[2020-01-29 15:13:04,227] [510] [INFO] [^---Recovery]: Resuming flow... 
[2020-01-29 15:13:04,228] [510] [INFO] [^---Recovery]: Resuming streams with empty assignment 
[2020-01-29 15:13:04,228] [510] [INFO] [^---Recovery]: Worker ready 

[2020-01-29 15:13:12,238] [510] [WARNING] Heartbeat failed for group kafka because it is rebalancing 
[2020-01-29 15:13:12,239] [510] [INFO] Revoking previously assigned partitions frozenset() for group kafka

After around 1000 seconds, the worker hangs
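If broker_max_poll_interval defaults to 1000.0 seconds in this Faust version, that would line up with the ~1000 s delay before the hang. A hedged sketch (check the settings reference for your version) of lowering it so the hang window is reached faster:

```python
import faust

# Sketch: shrink broker_max_poll_interval so the standby hang can be
# reproduced in minutes rather than ~17 minutes. Values are illustrative.
app = faust.App(
    "kafka",
    broker="kafka://x.x.x.x:9092",   # placeholder
    broker_max_poll_interval=120.0,  # seconds; believed default is 1000.0
)
```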


mainTAP commented 4 years ago

This seems to be happening because the internal "kafka-__assignor-__leader" topic is created with only one partition. How can one set the number of partitions for this internal topic?

ask commented 4 years ago

The leader topic must have a single partition: whoever ends up with that partition is the leader. If it had two partitions, there would be two leaders.
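Why one partition implies exactly one leader can be shown with a toy assignment (illustrative only, not Faust's actual assignor): each partition goes to exactly one group member, so a one-partition topic is held by exactly one member.

```python
def assign_round_robin(members, partitions):
    """Toy round-robin partition assignment: each partition to one member."""
    assignment = {m: [] for m in members}
    for i, partition in enumerate(partitions):
        assignment[members[i % len(members)]].append(partition)
    return assignment

members = ["worker-a", "worker-b", "worker-c"]

# One partition: exactly one member holds it, so exactly one leader.
one = assign_round_robin(members, partitions=[0])
leaders = [m for m, held in one.items() if held]
print(leaders)  # exactly one member

# Two partitions: two members each hold one, i.e. two "leaders".
two = assign_round_robin(members, partitions=[0, 1])
print([m for m, held in two.items() if held])  # two members
```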

ask commented 4 years ago

Not sure why that code is there in aiokafka. The fetcher idle time is not going to be updated unless the fetcher is running, so this predicate will just hang forever.

Removing it and continuing the rebalance works just fine, so going with that.
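The failure mode described above can be illustrated with a self-contained asyncio sketch. This is not aiokafka's actual code, only the shape of the bug: the rebalance path waits on a predicate driven by the fetcher's idle timestamp, but the fetcher is stopped during the rebalance, so the predicate can never become true.

```python
import asyncio

async def wait_until_fetcher_idle(state: dict, timeout: float) -> str:
    """Wait for the fetcher to report being idle, as the rebalance path does."""
    async def predicate() -> None:
        # Only a *running* fetcher updates state["idle_since"]; during a
        # rebalance the fetcher is stopped, so this loop never exits.
        while state["idle_since"] is None:
            await asyncio.sleep(0.01)

    try:
        await asyncio.wait_for(predicate(), timeout)
        return "rebalance continued"
    except asyncio.TimeoutError:
        return "rebalance hung"

async def main() -> str:
    state = {"idle_since": None}  # fetcher stopped: value is never set
    return await wait_until_fetcher_idle(state, timeout=0.2)

print(asyncio.run(main()))  # prints "rebalance hung"
```

Removing the wait, as the fix does, lets the rebalance proceed unconditionally instead of blocking on a value nothing will ever update.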