faust-streaming / faust

Python Stream Processing. A Faust fork
https://faust-streaming.github.io/faust/

Off by one error fetching changelog #155

Closed forsberg closed 1 year ago

forsberg commented 3 years ago

Checklist

Steps to reproduce

Faust application that uses a Table with Tumbling Window.

Scale up from 4 to 6 workers, where the two newly started workers already have a RocksDB on disk but the offset committed in RocksDB is lower than the earliest available offset in Kafka, which triggers the need to read the full changelog.

Expected behavior

Full changelog is read into RocksDB and application starts processing data.

Actual behavior

Application gets stuck doing nothing, waiting for data on the changelog partition.

In the example below, the oldest offset for partition 9 as reported by Kafka metrics is 18457042, which is exactly one more than the offset Faust is trying to reset to. This makes me suspect an off-by-one error.

Inspecting the code in aiokafka that emits the "Fetch offset ... is out of range" message, I'm guessing that the offset reset strategy is left at its default, latest, which would lead to Faust not getting any messages and getting stuck.

2021-05-21 08:18:33,482 INFO ferrostream.authentication Configuring Kafka SASL Authentication using SASL username ferrostream-production-ferroaggregate
+ƒaµS† v0.6.4-+--------------------------------------------------------+
| id          | production-ferroaggregate                              |
| transport   | [URL('kafka://kafka-kafka-bootstrap.kafka.svc:9092')]  |
| store       | rocksdb:                                               |
| web         | http://localhost:6066/                                 |
| log         | -stderr- (info)                                        |
| pid         | 1                                                      |
| hostname    | production-ferroaggregate-4                            |
| platform    | CPython 3.8.9 (Linux x86_64)                           |
|        +    | Cython (GCC 8.3.0)                                     |
| drivers     |                                                        |
|   transport | aiokafka=0.7.0                                         |
|   web       | aiohttp=3.7.4.post0                                    |
| datadir     | /ferrostream/production-ferroaggregate-data            |
| appdir      | /ferrostream/production-ferroaggregate-data/v1         |
+-------------+--------------------------------------------------------+
[2021-05-21 08:18:33,727] [1] [INFO] [^Worker]: Starting... 
[2021-05-21 08:18:33,731] [1] [INFO] [^-App]: Starting... 
[2021-05-21 08:18:33,731] [1] [INFO] [^--PrometheusMonitor]: Starting... 
[2021-05-21 08:18:33,731] [1] [INFO] [^--Producer]: Starting... 
[2021-05-21 08:18:33,731] [1] [INFO] [^---ProducerBuffer]: Starting... 
[2021-05-21 08:18:33,754] [1] [INFO] Authenticated as ferrostream-production-ferroaggregate via PLAIN 
[2021-05-21 08:18:33,778] [1] [INFO] Authenticated as ferrostream-production-ferroaggregate via PLAIN 
[2021-05-21 08:18:33,784] [1] [INFO] [^--CacheBackend]: Starting... 
[2021-05-21 08:18:33,784] [1] [INFO] [^--Web]: Starting... 
[2021-05-21 08:18:33,785] [1] [INFO] [^---ServerThread]: Starting... 
[2021-05-21 08:18:33,786] [1] [INFO] [^--Consumer]: Starting... 
[2021-05-21 08:18:33,787] [1] [INFO] [^---AIOKafkaConsumerThread]: Starting... 
[2021-05-21 08:18:33,810] [1] [INFO] Authenticated as ferrostream-production-ferroaggregate via PLAIN 
[2021-05-21 08:18:33,830] [1] [INFO] Authenticated as ferrostream-production-ferroaggregate via PLAIN 
[2021-05-21 08:18:33,836] [1] [INFO] [^--LeaderAssignor]: Starting... 
[2021-05-21 08:18:33,836] [1] [INFO] [^--ReplyConsumer]: Starting... 
[2021-05-21 08:18:33,836] [1] [INFO] [^--AgentManager]: Starting... 
[2021-05-21 08:18:33,837] [1] [INFO] [^---Agent: ferrostrea[.]aggregate_inputdata]: Starting... 
[2021-05-21 08:18:33,840] [1] [INFO] [^----OneForOneSupervisor: (1@0x7f0c10a6ac40)]: Starting... 
[2021-05-21 08:18:33,841] [1] [INFO] [^---Conductor]: Starting... 
[2021-05-21 08:18:33,841] [1] [INFO] [^--TableManager]: Starting... 
[2021-05-21 08:18:33,842] [1] [INFO] [^---Conductor]: Waiting for agents to start... 
[2021-05-21 08:18:33,843] [1] [INFO] [^---Conductor]: Waiting for tables to be registered... 
[2021-05-21 08:18:34,842] [1] [INFO] [^---Table: inputdata-avg-1m]: Starting... 
[2021-05-21 08:18:34,852] [1] [INFO] [^----Store: rocksdb:inputdata-avg-1m]: Starting... 
[2021-05-21 08:18:34,853] [1] [INFO] [^--Producer]: Creating topic 'production-ferroaggregate-inputdata-avg-1m-changelog' 
[2021-05-21 08:18:34,882] [1] [INFO] Authenticated as ferrostream-production-ferroaggregate via PLAIN 
[2021-05-21 08:18:34,892] [1] [INFO] [^---Recovery]: Starting... 
[2021-05-21 08:18:34,892] [1] [INFO] [^--Producer]: Creating topic 'production-ferroaggregate-inputdata-avg-1m-changelog' 
[2021-05-21 08:18:34,922] [1] [INFO] Authenticated as ferrostream-production-ferroaggregate via PLAIN 
[2021-05-21 08:18:34,924] [1] [INFO] Updating subscribed topics to: 
+Requested Subscription--------------------------------+
| topic name                                           |
+------------------------------------------------------+
| inputdata-production                                 |
| production-ferroaggregate-inputdata-avg-1m-changelog |
+------------------------------------------------------+ 
[2021-05-21 08:18:34,925] [1] [INFO] Subscribed to topic(s): 
+Final Subscription------------------------------------+
| topic name                                           |
+------------------------------------------------------+
| inputdata-production                                 |
| production-ferroaggregate-inputdata-avg-1m-changelog |
+------------------------------------------------------+ 
[2021-05-21 08:18:34,943] [1] [INFO] Authenticated as ferrostream-production-ferroaggregate via PLAIN 
[2021-05-21 08:18:34,966] [1] [INFO] Authenticated as ferrostream-production-ferroaggregate via PLAIN 
[2021-05-21 08:18:34,990] [1] [INFO] Authenticated as ferrostream-production-ferroaggregate via PLAIN 
[2021-05-21 08:18:34,990] [1] [INFO] Discovered coordinator 1 for group production-ferroaggregate 
[2021-05-21 08:18:34,992] [1] [INFO] Revoking previously assigned partitions set() for group production-ferroaggregate 
[2021-05-21 08:18:34,993] [1] [INFO] (Re-)joining group production-ferroaggregate 
[2021-05-21 08:18:38,071] [1] [INFO] Joined group 'production-ferroaggregate' (generation 100) with member_id faust-0.6.4-a18e8a39-e539-4209-b81b-72dfb817154c 
[2021-05-21 08:18:38,083] [1] [INFO] Successfully synced group production-ferroaggregate with generation 100 
[2021-05-21 08:18:38,083] [1] [INFO] Setting newly assigned partitions set() for group production-ferroaggregate 
[2021-05-21 08:18:38,084] [1] [INFO] Executing _on_partitions_assigned 
[2021-05-21 08:18:38,112] [1] [INFO] generation id 100 app consumers id 100 
[2021-05-21 08:18:53,119] [1] [WARNING] Heartbeat failed for group production-ferroaggregate because it is rebalancing 
[2021-05-21 08:18:53,120] [1] [INFO] Revoking previously assigned partitions frozenset() for group production-ferroaggregate 
[2021-05-21 08:18:53,121] [1] [INFO] (Re-)joining group production-ferroaggregate 
[2021-05-21 08:18:53,385] [1] [INFO] Joined group 'production-ferroaggregate' (generation 101) with member_id faust-0.6.4-a18e8a39-e539-4209-b81b-72dfb817154c 
[2021-05-21 08:18:53,401] [1] [INFO] Successfully synced group production-ferroaggregate with generation 101 
[2021-05-21 08:18:53,402] [1] [INFO] Setting newly assigned partitions 
+Topic Partition Set-----------------------------------+------------+
| topic                                                | partitions |
+------------------------------------------------------+------------+
| inputdata-production                                 | {0, 9}     |
| production-ferroaggregate-inputdata-avg-1m-changelog | {0-4, 9}   |
+------------------------------------------------------+------------+ for group production-ferroaggregate 
[2021-05-21 08:18:53,403] [1] [INFO] Executing _on_partitions_assigned 
[2021-05-21 08:18:53,440] [1] [INFO] opening partition 0 for gen id 101 app id 101 
[2021-05-21 08:18:53,470] [1] [INFO] Authenticated as ferrostream-production-ferroaggregate via PLAIN 
[2021-05-21 08:18:53,822] [1] [INFO] opening partition 9 for gen id 101 app id 101 
[2021-05-21 08:18:53,844] [1] [INFO] generation id 101 app consumers id 101 
[2021-05-21 08:18:56,602] [1] [INFO] Timer Monitor.sampler woke up too late, with a drift of +1.757745940994937 runtime=1.8578983144834638e-05 sleeptime=2.757745940994937 
[2021-05-21 08:18:56,603] [1] [INFO] Timer livelock woke up too late, with a drift of +1.812735896994127 runtime=1.0554998880252242e-05 sleeptime=8.812735896994127 
[2021-05-21 08:18:56,603] [1] [INFO] Timer Recovery.stats woke up too late, with a drift of +1.708480691013392 runtime=6.540009053424001e-06 sleeptime=6.708480691013392 
[2021-05-21 08:19:26,665] [1] [INFO] [^---Recovery]: Highwater for active changelog partitions:
+Highwater - Active------------------------------------+-----------+-----------+
| topic                                                | partition | highwater |
+------------------------------------------------------+-----------+-----------+
| production-ferroaggregate-inputdata-avg-1m-changelog | 0         | 20950700  |
| 〃                                                   | 9         | 26584769  |
+------------------------------------------------------+-----------+-----------+ 
[2021-05-21 08:19:28,172] [1] [INFO] [^---Recovery]: active offsets at start of reading:
+Reading Starts At - Active----------------------------+-----------+----------+
| topic                                                | partition | offset   |
+------------------------------------------------------+-----------+----------+
| production-ferroaggregate-inputdata-avg-1m-changelog | 0         | 20815621 |
| 〃                                                   | 9         | 18457041 |
+------------------------------------------------------+-----------+----------+ 
[2021-05-21 08:19:29,677] [1] [INFO] [^---Recovery]: standby offsets at start of reading:
+Reading Starts At - Standby---------------------------+-----------+----------+
| topic                                                | partition | offset   |
+------------------------------------------------------+-----------+----------+
| production-ferroaggregate-inputdata-avg-1m-changelog | 1         | 20428183 |
| 〃                                                   | 2         | 19706521 |
| 〃                                                   | 3         | 17811662 |
| 〃                                                   | 4         | 18976318 |
+------------------------------------------------------+-----------+----------+ 
[2021-05-21 08:19:29,680] [1] [INFO] [^---Recovery]: Restoring state from changelog topics... 
[2021-05-21 08:19:29,680] [1] [INFO] [^---Recovery]: Resuming flow... 
[2021-05-21 08:19:29,681] [1] [INFO] [^---Fetcher]: Starting... 
[2021-05-21 08:19:29,835] [1] [INFO] Fetch offset 20815621 is out of range for partition TopicPartition(topic='production-ferroaggregate-inputdata-avg-1m-changelog', partition=0), resetting offset 
[2021-05-21 08:19:29,836] [1] [INFO] Fetch offset 18457041 is out of range for partition TopicPartition(topic='production-ferroaggregate-inputdata-avg-1m-changelog', partition=9), resetting offset 
[2021-05-21 08:20:01,512] [1] [WARNING] [^---Recovery]: No event received for active tp TopicPartition(topic='production-ferroaggregate-inputdata-avg-1m-changelog', partition=0) since recovery start (started 31.83 seconds ago) 
[2021-05-21 08:20:01,513] [1] [WARNING] [^---Recovery]: No event received for active tp TopicPartition(topic='production-ferroaggregate-inputdata-avg-1m-changelog', partition=9) since recovery start (started 31.83 seconds ago) 
[2021-05-21 08:20:06,613] [1] [WARNING] [^---Recovery]: No event received for active tp TopicPartition(topic='production-ferroaggregate-inputdata-avg-1m-changelog', partition=0) since recovery start (started 36.93 seconds ago) 
[2021-05-21 08:20:06,614] [1] [WARNING] [^---Recovery]: No event received for active tp TopicPartition(topic='production-ferroaggregate-inputdata-avg-1m-changelog', partition=9) since recovery start (started 36.93 seconds ago) 

The above message will repeat forever.
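To make the suspected failure mode concrete, here is a minimal sketch using a toy model of a partition. It is not aiokafka's real code: `PartitionLog` and `resolve_fetch_offset` are hypothetical names, and the assumption that an out-of-range fetch falls back to the configured reset strategy is just my reading of the log message.

```python
from dataclasses import dataclass


@dataclass
class PartitionLog:
    """Toy stand-in for a Kafka partition (hypothetical, not an aiokafka type).

    Offsets in [earliest, highwater) hold records; highwater is the next
    offset to be written.
    """
    earliest: int
    highwater: int


def resolve_fetch_offset(log: PartitionLog, requested: int, reset: str) -> int:
    """Return the position a consumer ends up at after asking for `requested`."""
    if log.earliest <= requested <= log.highwater:
        return requested  # in range, no reset needed
    if reset == "earliest":
        return log.earliest  # re-read everything still retained
    if reset == "latest":
        return log.highwater  # skip to the end, only new records arrive
    raise ValueError(f"unknown reset strategy: {reset!r}")


# Partition 9 from the log above; earliest offset taken from Kafka metrics.
p9 = PartitionLog(earliest=18457042, highwater=26584769)

for strategy in ("latest", "earliest"):
    position = resolve_fetch_offset(p9, 18457041, strategy)
    print(strategy, position, "records left to read:", p9.highwater - position)
```

If the reset really is to latest, the consumer lands on the highwater and has nothing to read until something new is written to the changelog, which would match the repeating "No event received" warnings.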

There is also a different behaviour that I think happens when one of the partitions can be fetched but not the other. In that case it gets stuck repeating the following message:

[2021-05-21 09:29:10,563] [1] [INFO] [^---Recovery]: Still fetching changelog topics for recovery, estimated time remaining 25.25 minutes (tota
+Remaining for active recovery-------------------------+-----------+-------------+-------------+-----------+
| topic                                                | partition | need offset | have offset | remaining |
+------------------------------------------------------+-----------+-------------+-------------+-----------+
| production-ferroaggregate-inputdata-avg-1m-changelog | 10        | 26878019    | 13027618    | 13850401  |
+------------------------------------------------------+-----------+-------------+-------------+-----------+

Versions

forsberg commented 3 years ago

Inspecting the code, line 692 seems very suspicious. I guess it tries to compensate for line 644.

In _seek_offsets we see that the code handles the case where earliest_offset is 0, but it does not handle the case where the earliest offset in Kafka is > 0 and the offset variable is set to one less than that.

Note: The same behaviour occurs when scaling up on an empty disk, i.e. with no previous RocksDB state.
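One way to cover the unhandled case would be to clamp every seek target to the earliest offset Kafka still has. The following is only a sketch of that idea, not Faust's actual `_seek_offsets` and not necessarily what any fix does: `clamp_seek_offsets` is a hypothetical helper, and plain `(topic, partition)` tuples stand in for `TopicPartition`.

```python
from typing import Dict, Tuple

# (topic, partition); a stand-in for the real TopicPartition type.
TP = Tuple[str, int]


def clamp_seek_offsets(
    offsets: Dict[TP, int], earliest: Dict[TP, int]
) -> Dict[TP, int]:
    """Return seek targets that are never below the earliest available offset.

    Assumption: `earliest` holds the first offset the broker still retains
    for each partition; partitions missing from it are treated as starting
    at 0.
    """
    clamped = {}
    for tp, offset in offsets.items():
        floor = max(earliest.get(tp, 0), 0)
        # Covers both offset == -1 (nothing read yet) and an offset that
        # sits just below a non-zero earliest offset.
        clamped[tp] = max(offset, floor)
    return clamped


changelog = "production-ferroaggregate-inputdata-avg-1m-changelog"
targets = clamp_seek_offsets(
    {(changelog, 9): 18457041},
    {(changelog, 9): 18457042},
)
print(targets)
```

With the partition 9 numbers from the report, the seek target moves from 18457041 up to 18457042, so the fetch would no longer be out of range.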

PJ-Schulz commented 3 years ago

Hello, I had a similar error and described it in #176. My application is trying to restore an offset that no longer exists due to the segment time.

Have you already found a solution to this problem?

dima-orca commented 2 years ago

Same issue here. @forsberg did your fix solve the issue?

wbarnha commented 2 years ago

The code from https://github.com/forsberg/faust-streaming/commit/26cb488add47759df68c99842674a54d72f322a1 has been merged in at this point by db6a3ae28ace1112132ef5dc07f4cf50afcdc427. I think this issue is safe to close unless someone can provide a reproducible example of it still persisting.