faust-streaming / faust

Python Stream Processing. A Faust fork
https://faust-streaming.github.io/faust/
Other

The persisted offset for changelog topic partition is higher than the last offset in that topic (highwater) #158

Open berimbolo13 opened 3 years ago

berimbolo13 commented 3 years ago

Steps to reproduce

I cannot reproduce it manually, but it sometimes happens in the production environment.

We have a single-node environment that runs the solution via docker-compose, and the Kafka cluster contains only one broker. During log cleanup (the broker log below shows segments being deleted after retention-time breaches), something went wrong: Kafka tried to delete the same segment of the lfa_validation_table changelog twice, then went down for about 5 minutes and restarted:

[2021-05-25 10:55:15,301] INFO [ProducerStateManager partition=faust_processing_app-cycle_stats_table-changelog-0] Writing producer snapshot at offset 613179 (kafka.log.ProducerStateManager)
[2021-05-25 10:55:15,301] INFO [Log partition=faust_processing_app-cycle_stats_table-changelog-0, dir=/kafka/kafka-logs] Rolled new log segment at offset 613179 in 0 ms. (kafka.log.Log)
[2021-05-25 10:55:15,302] INFO [Log partition=faust_processing_app-cycle_stats_table-changelog-0, dir=/kafka/kafka-logs] Scheduling segments for deletion List(LogSegment(baseOffset=613137, size=65912207, lastModifiedTime=1621940108000, largestTime=1621940108723)) (kafka.log.Log)
[2021-05-25 10:55:15,302] INFO [Log partition=faust_processing_app-cycle_stats_table-changelog-0, dir=/kafka/kafka-logs] Incrementing log start offset to 613179 (kafka.log.Log)
[2021-05-25 10:56:15,302] INFO [Log partition=faust_processing_app-cycle_stats_table-changelog-0, dir=/kafka/kafka-logs] Deleting segments List(LogSegment(baseOffset=613137, size=65912207, lastModifiedTime=1621940108000, largestTime=1621940108723)) (kafka.log.Log)
[2021-05-25 10:56:15,307] INFO Deleted log /kafka/kafka-logs/faust_processing_app-cycle_stats_table-changelog-0/00000000000000613137.log.deleted. (kafka.log.LogSegment)
[2021-05-25 10:56:15,307] INFO Deleted offset index /kafka/kafka-logs/faust_processing_app-cycle_stats_table-changelog-0/00000000000000613137.index.deleted. (kafka.log.LogSegment)
[2021-05-25 10:56:15,307] INFO Deleted time index /kafka/kafka-logs/faust_processing_app-cycle_stats_table-changelog-0/00000000000000613137.timeindex.deleted. (kafka.log.LogSegment)
[2021-05-25 10:57:23,529] INFO [GroupMetadataManager brokerId=1] Removed 0 expired offsets in 0 milliseconds. (kafka.coordinator.group.GroupMetadataManager)
[2021-05-25 11:00:15,372] INFO [Log partition=faust_processing_app-cycle_stats_table-changelog-0, dir=/kafka/kafka-logs] Found deletable segments with base offsets [613179] due to retention time 1000ms breach (kafka.log.Log)
[2021-05-25 11:00:15,372] INFO [ProducerStateManager partition=faust_processing_app-cycle_stats_table-changelog-0] Writing producer snapshot at offset 613221 (kafka.log.ProducerStateManager)
[2021-05-25 11:00:15,372] INFO [Log partition=faust_processing_app-cycle_stats_table-changelog-0, dir=/kafka/kafka-logs] Rolled new log segment at offset 613221 in 1 ms. (kafka.log.Log)
[2021-05-25 11:00:15,372] INFO [Log partition=faust_processing_app-cycle_stats_table-changelog-0, dir=/kafka/kafka-logs] Scheduling segments for deletion List(LogSegment(baseOffset=613179, size=65732273, lastModifiedTime=1621940408000, largestTime=1621940408695)) (kafka.log.Log)
[2021-05-25 11:00:15,372] INFO [Log partition=faust_processing_app-cycle_stats_table-changelog-0, dir=/kafka/kafka-logs] Incrementing log start offset to 613221 (kafka.log.Log)
[2021-05-25 11:01:15,372] INFO [Log partition=faust_processing_app-cycle_stats_table-changelog-0, dir=/kafka/kafka-logs] Deleting segments List(LogSegment(baseOffset=613179, size=65732273, lastModifiedTime=1621940408000, largestTime=1621940408695)) (kafka.log.Log)
[2021-05-25 11:01:15,378] INFO Deleted log /kafka/kafka-logs/faust_processing_app-cycle_stats_table-changelog-0/00000000000000613179.log.deleted. (kafka.log.LogSegment)
[2021-05-25 11:01:15,378] INFO Deleted offset index /kafka/kafka-logs/faust_processing_app-cycle_stats_table-changelog-0/00000000000000613179.index.deleted. (kafka.log.LogSegment)
[2021-05-25 11:01:15,379] INFO Deleted time index /kafka/kafka-logs/faust_processing_app-cycle_stats_table-changelog-0/00000000000000613179.timeindex.deleted. (kafka.log.LogSegment)
[2021-05-25 11:05:15,523] INFO [Log partition=faust_processing_app-cycle_stats_table-changelog-0, dir=/kafka/kafka-logs] Found deletable segments with base offsets [613221] due to retention time 1000ms breach (kafka.log.Log)
[2021-05-25 11:05:15,523] INFO [ProducerStateManager partition=faust_processing_app-cycle_stats_table-changelog-0] Writing producer snapshot at offset 613263 (kafka.log.ProducerStateManager)
[2021-05-25 11:05:15,523] INFO [Log partition=faust_processing_app-cycle_stats_table-changelog-0, dir=/kafka/kafka-logs] Rolled new log segment at offset 613263 in 0 ms. (kafka.log.Log)
[2021-05-25 11:05:15,524] INFO [Log partition=faust_processing_app-cycle_stats_table-changelog-0, dir=/kafka/kafka-logs] Scheduling segments for deletion List(LogSegment(baseOffset=613221, size=65722697, lastModifiedTime=1621940708000, largestTime=1621940708697)) (kafka.log.Log)
[2021-05-25 11:05:15,524] INFO [Log partition=faust_processing_app-cycle_stats_table-changelog-0, dir=/kafka/kafka-logs] Incrementing log start offset to 613263 (kafka.log.Log)
[2021-05-25 11:06:15,524] INFO [Log partition=faust_processing_app-cycle_stats_table-changelog-0, dir=/kafka/kafka-logs] Deleting segments List(LogSegment(baseOffset=613221, size=65722697, lastModifiedTime=1621940708000, largestTime=1621940708697)) (kafka.log.Log)
[2021-05-25 11:06:15,530] INFO Deleted log /kafka/kafka-logs/faust_processing_app-cycle_stats_table-changelog-0/00000000000000613221.log.deleted. (kafka.log.LogSegment)
[2021-05-25 11:06:15,530] INFO Deleted offset index /kafka/kafka-logs/faust_processing_app-cycle_stats_table-changelog-0/00000000000000613221.index.deleted. (kafka.log.LogSegment)
[2021-05-25 11:06:15,530] INFO Deleted time index /kafka/kafka-logs/faust_processing_app-cycle_stats_table-changelog-0/00000000000000613221.timeindex.deleted. (kafka.log.LogSegment)
[2021-05-25 11:07:23,529] INFO [GroupMetadataManager brokerId=1] Removed 0 expired offsets in 0 milliseconds. (kafka.coordinator.group.GroupMetadataManager)
[2021-05-25 11:07:51,512] INFO [Log partition=high.freq.analog.json-0, dir=/kafka/kafka-logs] Found deletable segments with base offsets [108871703] due to retention time 86400000ms breach (kafka.log.Log)
[2021-05-25 11:07:51,513] INFO [Log partition=high.freq.analog.json-0, dir=/kafka/kafka-logs] Scheduling segments for deletion List(LogSegment(baseOffset=108871703, size=1073634369, lastModifiedTime=1621854324000, largestTime=1621854321538)) (kafka.log.Log)
[2021-05-25 11:07:51,513] INFO [Log partition=high.freq.analog.json-0, dir=/kafka/kafka-logs] Incrementing log start offset to 108928787 (kafka.log.Log)
[2021-05-25 11:08:51,513] INFO [Log partition=high.freq.analog.json-0, dir=/kafka/kafka-logs] Deleting segments List(LogSegment(baseOffset=108871703, size=1073634369, lastModifiedTime=1621854324000, largestTime=1621854321538)) (kafka.log.Log)
[2021-05-25 11:08:51,627] INFO Deleted log /kafka/kafka-logs/high.freq.analog.json-0/00000000000108871703.log.deleted. (kafka.log.LogSegment)
[2021-05-25 11:08:51,628] INFO Deleted offset index /kafka/kafka-logs/high.freq.analog.json-0/00000000000108871703.index.deleted. (kafka.log.LogSegment)
[2021-05-25 11:08:51,628] INFO Deleted time index /kafka/kafka-logs/high.freq.analog.json-0/00000000000108871703.timeindex.deleted. (kafka.log.LogSegment)
[2021-05-25 11:08:56,366] INFO [ProducerStateManager partition=faust_processing_app-lfa_validation_table-changelog-0] Writing producer snapshot at offset 221435823 (kafka.log.ProducerStateManager)
[2021-05-25 11:08:56,366] INFO [Log partition=faust_processing_app-lfa_validation_table-changelog-0, dir=/kafka/kafka-logs] Rolled new log segment at offset 221435823 in 0 ms. (kafka.log.Log)
[2021-05-25 11:09:01,603] INFO [Log partition=faust_processing_app-lfa_validation_table-changelog-0, dir=/kafka/kafka-logs] Scheduling segments for deletion List(LogSegment(baseOffset=221150403, size=1073732969, lastModifiedTime=1621940936000, largestTime=1621940936363)) (kafka.log.Log)
[2021-05-25 11:10:01,603] INFO [Log partition=faust_processing_app-lfa_validation_table-changelog-0, dir=/kafka/kafka-logs] Deleting segments List(LogSegment(baseOffset=221150403, size=1073732969, lastModifiedTime=1621940936000, largestTime=1621940936363)) (kafka.log.Log)
[2021-05-25 11:10:01,656] INFO [Log partition=faust_processing_app-lfa_validation_table-changelog-0, dir=/kafka/kafka-logs] Found deletable segments with base offsets [221150403] due to retention time 60000ms breach (kafka.log.Log)
[2021-05-25 11:10:01,656] INFO [Log partition=faust_processing_app-lfa_validation_table-changelog-0, dir=/kafka/kafka-logs] Scheduling segments for deletion List(LogSegment(baseOffset=221150403, size=35116812, lastModifiedTime=1621940936000, largestTime=1621940936363)) (kafka.log.Log)
[2021-05-25 11:10:01,656] INFO [Log partition=faust_processing_app-lfa_validation_table-changelog-0, dir=/kafka/kafka-logs] Incrementing log start offset to 221435823 (kafka.log.Log)
[2021-05-25 11:10:01,711] INFO Deleted log /kafka/kafka-logs/faust_processing_app-lfa_validation_table-changelog-0/00000000000221150403.log.deleted. (kafka.log.LogSegment)
[2021-05-25 11:10:01,711] INFO Deleted offset index /kafka/kafka-logs/faust_processing_app-lfa_validation_table-changelog-0/00000000000221150403.index.deleted. (kafka.log.LogSegment)
[2021-05-25 11:10:01,712] INFO Deleted time index /kafka/kafka-logs/faust_processing_app-lfa_validation_table-changelog-0/00000000000221150403.timeindex.deleted. (kafka.log.LogSegment)
[2021-05-25 11:10:16,706] INFO [Log partition=faust_processing_app-cycle_stats_table-changelog-0, dir=/kafka/kafka-logs] Found deletable segments with base offsets [613263] due to retention time 1000ms breach (kafka.log.Log)
[2021-05-25 11:10:16,707] INFO [ProducerStateManager partition=faust_processing_app-cycle_stats_table-changelog-0] Writing producer snapshot at offset 613305 (kafka.log.ProducerStateManager)
[2021-05-25 11:10:16,707] INFO [Log partition=faust_processing_app-cycle_stats_table-changelog-0, dir=/kafka/kafka-logs] Rolled new log segment at offset 613305 in 0 ms. (kafka.log.Log)
[2021-05-25 11:10:16,707] INFO [Log partition=faust_processing_app-cycle_stats_table-changelog-0, dir=/kafka/kafka-logs] Scheduling segments for deletion List(LogSegment(baseOffset=613263, size=65725411, lastModifiedTime=1621941008000, largestTime=1621941008719)) (kafka.log.Log)
[2021-05-25 11:10:16,707] INFO [Log partition=faust_processing_app-cycle_stats_table-changelog-0, dir=/kafka/kafka-logs] Incrementing log start offset to 613305 (kafka.log.Log)
[2021-05-25 11:11:01,657] INFO [Log partition=faust_processing_app-lfa_validation_table-changelog-0, dir=/kafka/kafka-logs] Deleting segments List(LogSegment(baseOffset=221150403, size=35116812, lastModifiedTime=1621940936000, largestTime=1621940936363)) (kafka.log.Log)
[2021-05-25 11:11:01,660] INFO Deleted log /kafka/kafka-logs/faust_processing_app-lfa_validation_table-changelog-0/00000000000221150403.log.deleted. (kafka.log.LogSegment)
[2021-05-25 11:11:01,660] INFO Failed to delete offset index /kafka/kafka-logs/faust_processing_app-lfa_validation_table-changelog-0/00000000000221150403.index.deleted because it does not exist. (kafka.log.LogSegment)
[2021-05-25 11:11:01,660] INFO Failed to delete time index /kafka/kafka-logs/faust_processing_app-lfa_validation_table-changelog-0/00000000000221150403.timeindex.deleted because it does not exist. (kafka.log.LogSegment)
[2021-05-25 11:11:16,707] INFO [Log partition=faust_processing_app-cycle_stats_table-changelog-0, dir=/kafka/kafka-logs] Deleting segments List(LogSegment(baseOffset=613263, size=65725411, lastModifiedTime=1621941008000, largestTime=1621941008719)) (kafka.log.Log)
[2021-05-25 11:11:16,714] INFO Deleted log /kafka/kafka-logs/faust_processing_app-cycle_stats_table-changelog-0/00000000000000613263.log.deleted. (kafka.log.LogSegment)
[2021-05-25 11:11:16,714] INFO Deleted offset index /kafka/kafka-logs/faust_processing_app-cycle_stats_table-changelog-0/00000000000000613263.index.deleted. (kafka.log.LogSegment)
[2021-05-25 11:11:16,714] INFO Deleted time index /kafka/kafka-logs/faust_processing_app-cycle_stats_table-changelog-0/00000000000000613263.timeindex.deleted. (kafka.log.LogSegment)
waiting for kafka to be ready
[Configuring] 'inter.broker.listener.name' in '/opt/kafka/config/server.properties'
Excluding KAFKA_HOME from broker config
[Configuring] 'message.max.bytes' in '/opt/kafka/config/server.properties'
[Configuring] 'port' in '/opt/kafka/config/server.properties'
[Configuring] 'auto.create.topics.enable' in '/opt/kafka/config/server.properties'
[Configuring] 'advertised.listeners' in '/opt/kafka/config/server.properties'
[Configuring] 'listener.security.protocol.map' in '/opt/kafka/config/server.properties'
[Configuring] 'broker.id' in '/opt/kafka/config/server.properties'
Excluding KAFKA_VERSION from broker config
[Configuring] 'listeners' in '/opt/kafka/config/server.properties'
[Configuring] 'zookeeper.connect' in '/opt/kafka/config/server.properties'
[Configuring] 'log.dirs' in '/opt/kafka/config/server.properties'
[Configuring] 'offsets.topic.replication.factor' in '/opt/kafka/config/server.properties'
waiting for kafka to be ready
waiting for kafka to be ready
[2021-05-25 11:16:02,390] INFO Registered kafka:type=kafka.Log4jController MBean (kafka.utils.Log4jControllerRegistration$)
[2021-05-25 11:16:04,664] INFO Setting -D jdk.tls.rejectClientInitiatedRenegotiation=true to disable client-initiated TLS renegotiation (org.apache.zookeeper.common.X509Util)
[2021-05-25 11:16:05,057] INFO Registered signal handlers for TERM, INT, HUP (org.apache.kafka.common.utils.LoggingSignalHandler)
[2021-05-25 11:16:05,093] INFO starting (kafka.server.KafkaServer)
[2021-05-25 11:16:05,194] INFO Connecting to zookeeper on zookeeper:2181 (kafka.server.KafkaServer)
[2021-05-25 11:16:05,382] INFO [ZooKeeperClient Kafka server] Initializing a new session to zookeeper:2181. (kafka.zookeeper.ZooKeeperClient)
[2021-05-25 11:16:05,431] INFO Client environment:zookeeper.version=3.5.7-f0fdd52973d373ffd9c86b81d99842dc2c7f660e, built on 02/10/2020 11:30 GMT (org.apache.zookeeper.ZooKeeper)
[2021-05-25 11:16:05,431] INFO Client environment:host.name=39ba66c4306e (org.apache.zookeeper.ZooKeeper)
[2021-05-25 11:16:05,431] INFO Client environment:java.version=1.8.0_212 (org.apache.zookeeper.ZooKeeper)
[2021-05-25 11:16:05,431] INFO Client environment:java.vendor=IcedTea (org.apache.zookeeper.ZooKeeper)
[2021-05-25 11:16:05,431] INFO Client environment:java.home=/usr/lib/jvm/java-1.8-openjdk/jre (org.apache.zookeeper.ZooKeeper)

Faust restarted automatically but then hit the following exception:

[2021-05-25 11:12:01,631] [1] [INFO] Running lfa window validation with key = TT-011506, start = 2021-05-25 11:10:00, end = 2021-05-25 11:10:59.900000 
[2021-05-25 11:18:44,829] [1] [INFO] [^---Agent: proj[.]process_strict_rpm_values]: Starting... 
[2021-05-25 11:18:44,829] [1] [INFO] [^----OneForOneSupervisor: (1@0x7f895ee42d50)]: Starting... 
[2021-05-25 11:18:44,830] [1] [INFO] [^---Conductor]: Starting... 
[2021-05-25 11:18:44,830] [1] [INFO] [^--TableManager]: Starting... 
[2021-05-25 11:18:44,830] [1] [INFO] [^---Conductor]: Waiting for agents to start... 
[2021-05-25 11:18:44,831] [1] [INFO] [^---Conductor]: Waiting for tables to be registered... 
[2021-05-25 11:18:45,832] [1] [INFO] [^---Table: cycle_stats_table]: Starting... 
[2021-05-25 11:18:45,842] [1] [INFO] [^----Store: rocksdb:cycle_stats_table]: Starting... 
[2021-05-25 11:18:45,842] [1] [INFO] [^--Producer]: Creating topic 'faust_processing_app-cycle_stats_table-changelog' 
[2021-05-25 11:18:45,845] [1] [INFO] [^---Table: last_processed_data_quality_timestamp_values]: Starting... 
[2021-05-25 11:18:45,845] [1] [INFO] [^----Store: rocksdb:last_processed_data_quality_timestamp_values]: Starting... 
[2021-05-25 11:18:45,846] [1] [INFO] [^--Producer]: Creating topic 'faust_processing_app-last_processed_data_quality_timestamp_values-changelog' 
[2021-05-25 11:18:45,848] [1] [INFO] [^---Table: hfa_window_state_table]: Starting... 
[2021-05-25 11:18:45,848] [1] [INFO] [^----Store: rocksdb:hfa_window_state_table]: Starting... 
[2021-05-25 11:18:45,849] [1] [INFO] [^--Producer]: Creating topic 'faust_processing_app-hfa_window_state_table-changelog' 
[2021-05-25 11:18:45,852] [1] [INFO] [^---Table: lfa_validation_table]: Starting... 
[2021-05-25 11:18:45,852] [1] [INFO] [^----Store: rocksdb:lfa_validation_table]: Starting... 
[2021-05-25 11:18:45,852] [1] [INFO] [^--Producer]: Creating topic 'faust_processing_app-lfa_validation_table-changelog' 
[2021-05-25 11:18:45,855] [1] [INFO] [^---Table: digital_edges_table]: Starting... 
[2021-05-25 11:18:45,855] [1] [INFO] [^----Store: rocksdb:digital_edges_table]: Starting... 
[2021-05-25 11:18:45,856] [1] [INFO] [^--Producer]: Creating topic 'faust_processing_app-digital_edges_table-changelog' 
[2021-05-25 11:18:45,859] [1] [INFO] [^---Table: two_last_rpm_table]: Starting... 
[2021-05-25 11:18:45,859] [1] [INFO] [^----Store: rocksdb:two_last_rpm_table]: Starting... 
[2021-05-25 11:18:45,859] [1] [INFO] [^--Producer]: Creating topic 'faust_processing_app-two_last_rpm_table-changelog' 
[2021-05-25 11:18:45,862] [1] [INFO] [^---Recovery]: Starting... 
[2021-05-25 11:18:45,862] [1] [INFO] [^--Producer]: Creating topic 'faust_processing_app-hfa_window_state_table-changelog' 
[2021-05-25 11:18:45,865] [1] [INFO] [^--Producer]: Creating topic 'faust_processing_app-lfa_validation_table-changelog' 
[2021-05-25 11:18:45,872] [1] [INFO] [^--Producer]: Creating topic 'faust_processing_app-cycle_stats_table-changelog' 
[2021-05-25 11:18:45,879] [1] [INFO] [^--Producer]: Creating topic 'faust_processing_app-last_processed_data_quality_timestamp_values-changelog' 
[2021-05-25 11:18:45,890] [1] [INFO] [^--Producer]: Creating topic 'faust_processing_app-digital_edges_table-changelog' 
[2021-05-25 11:18:45,892] [1] [INFO] [^--Producer]: Creating topic 'faust_processing_app-two_last_rpm_table-changelog' 
[2021-05-25 11:18:45,894] [1] [INFO] [^--Producer]: Creating topic 'faust_processing_app-__assignor-__leader' 
[2021-05-25 11:18:45,897] [1] [INFO] Updating subscribed topics to: 
+Requested Subscription-------------------------------------------------------+
| topic name                                                                  |
+-----------------------------------------------------------------------------+
| faust_processing_app-__assignor-__leader                                    |
| faust_processing_app-cycle_stats_table-changelog                            |
| faust_processing_app-digital_edges_table-changelog                          |
| faust_processing_app-hfa_window_state_table-changelog                       |
| faust_processing_app-last_processed_data_quality_timestamp_values-changelog |
| faust_processing_app-lfa_validation_table-changelog                         |
| faust_processing_app-two_last_rpm_table-changelog                           |
| high.freq.analog.json                                                       |
| high.freq.digital.json                                                      |
| low.freq.analog.json                                                        |
+-----------------------------------------------------------------------------+ 
[2021-05-25 11:18:45,898] [1] [INFO] Subscribed to topic(s): 
+Final Subscription-----------------------------------------------------------+
| topic name                                                                  |
+-----------------------------------------------------------------------------+
| faust_processing_app-__assignor-__leader                                    |
| faust_processing_app-cycle_stats_table-changelog                            |
| faust_processing_app-digital_edges_table-changelog                          |
| faust_processing_app-hfa_window_state_table-changelog                       |
| faust_processing_app-last_processed_data_quality_timestamp_values-changelog |
| faust_processing_app-lfa_validation_table-changelog                         |
| faust_processing_app-two_last_rpm_table-changelog                           |
| high.freq.analog.json                                                       |
| high.freq.digital.json                                                      |
| low.freq.analog.json                                                        |
+-----------------------------------------------------------------------------+ 
[2021-05-25 11:18:45,913] [1] [INFO] Discovered coordinator 1 for group faust_processing_app 
[2021-05-25 11:18:45,913] [1] [INFO] Revoking previously assigned partitions set() for group faust_processing_app 
[2021-05-25 11:18:45,914] [1] [INFO] (Re-)joining group faust_processing_app 
[2021-05-25 11:18:45,923] [1] [INFO] Joined group 'faust_processing_app' (generation 5) with member_id faust-0.6.1-eade0099-d6bf-42c7-9158-0252b902cb0f 
[2021-05-25 11:18:45,923] [1] [INFO] Elected group leader -- performing partition assignments using faust 
[2021-05-25 11:18:45,935] [1] [INFO] Successfully synced group faust_processing_app with generation 5 
[2021-05-25 11:18:45,935] [1] [INFO] Setting newly assigned partitions 
+Topic Partition Set----------------------------------------------------------+------------+
| topic                                                                       | partitions |
+-----------------------------------------------------------------------------+------------+
| faust_processing_app-__assignor-__leader                                    | {0}        |
| faust_processing_app-cycle_stats_table-changelog                            | {0}        |
| faust_processing_app-digital_edges_table-changelog                          | {0}        |
| faust_processing_app-hfa_window_state_table-changelog                       | {0}        |
| faust_processing_app-last_processed_data_quality_timestamp_values-changelog | {0}        |
| faust_processing_app-lfa_validation_table-changelog                         | {0}        |
| faust_processing_app-two_last_rpm_table-changelog                           | {0}        |
| high.freq.analog.json                                                       | {0}        |
| high.freq.digital.json                                                      | {0}        |
| low.freq.analog.json                                                        | {0}        |
+-----------------------------------------------------------------------------+------------+ for group faust_processing_app 
[2021-05-25 11:18:45,936] [1] [INFO] Executing _on_partitions_assigned 
[2021-05-25 11:18:45,973] [1] [INFO] opening partition 0 for gen id 5 app id 5 
[2021-05-25 11:18:46,211] [1] [INFO] opening partition 0 for gen id 5 app id 5 
[2021-05-25 11:18:46,394] [1] [INFO] opening partition 0 for gen id 5 app id 5 
[2021-05-25 11:18:46,538] [1] [INFO] opening partition 0 for gen id 5 app id 5 
[2021-05-25 11:18:46,680] [1] [INFO] opening partition 0 for gen id 5 app id 5 
[2021-05-25 11:18:46,816] [1] [INFO] opening partition 0 for gen id 5 app id 5 
[2021-05-25 11:18:46,942] [1] [INFO] generation id 5 app consumers id 5 
[2021-05-25 11:18:47,626] [1] [INFO] [^---Recovery]: Highwater for active changelog partitions:
+Highwater - Active-----------------------------------------------------------+-----------+-----------+
| topic                                                                       | partition | highwater |
+-----------------------------------------------------------------------------+-----------+-----------+
| faust_processing_app-cycle_stats_table-changelog                            | 0         | 613304    |
| faust_processing_app-digital_edges_table-changelog                          | 0         | 5409638   |
| faust_processing_app-hfa_window_state_table-changelog                       | 0         | 119278    |
| faust_processing_app-last_processed_data_quality_timestamp_values-changelog | 0         | 4201717   |
| faust_processing_app-lfa_validation_table-changelog                         | 0         | 221443996 |
| faust_processing_app-two_last_rpm_table-changelog                           | 0         | 61506     |
+-----------------------------------------------------------------------------+-----------+-----------+ 
[2021-05-25 11:18:49,127] [1] [INFO] [^---Recovery]: active offsets at start of reading:
+Reading Starts At - Active---------------------------------------------------+-----------+-----------+
| topic                                                                       | partition | offset    |
+-----------------------------------------------------------------------------+-----------+-----------+
| faust_processing_app-cycle_stats_table-changelog                            | 0         | 613304    |
| faust_processing_app-digital_edges_table-changelog                          | 0         | 5409638   |
| faust_processing_app-hfa_window_state_table-changelog                       | 0         | 119278    |
| faust_processing_app-last_processed_data_quality_timestamp_values-changelog | 0         | 4201717   |
| faust_processing_app-lfa_validation_table-changelog                         | 0         | 221444246 |
| faust_processing_app-two_last_rpm_table-changelog                           | 0         | 61506     |
+-----------------------------------------------------------------------------+-----------+-----------+ 
[2021-05-25 11:18:49,127] [1] [ERROR] [^---Recovery]: Crashed reason=ConsistencyError("The persisted offset for changelog topic partition TP(topic='faust_processing_app-lfa_validation_table-changelog', partition=0) is higher\nthan the last offset in that topic (highwater) (221444246 > 221443996).\n\nMost likely you have removed data from the topics without\nremoving the RocksDB database file for this partition.\n") 
Traceback (most recent call last):
  File "/usr/local/lib/python3.7/site-packages/mode/services.py", line 802, in _execute_task
    await task
  File "/usr/local/lib/python3.7/site-packages/faust/tables/recovery.py", line 413, in _restart_recovery
    active_highwaters[tp],
faust.exceptions.ConsistencyError: The persisted offset for changelog topic partition TP(topic='faust_processing_app-lfa_validation_table-changelog', partition=0) is higher
than the last offset in that topic (highwater) (221444246 > 221443996).

Most likely you have removed data from the topics without
removing the RocksDB database file for this partition.

[2021-05-25 11:18:49,128] [1] [INFO] [^Worker]: Stopping... 
[2021-05-25 11:18:49,129] [1] [INFO] [^-App]: Stopping... 
[2021-05-25 11:18:49,129] [1] [INFO] [^---Fetcher]: Stopping... 
[2021-05-25 11:18:49,129] [1] [INFO] [^-App]: Wait for streams... 
[2021-05-25 11:18:49,129] [1] [INFO] [^--TableManager]: Stopping... 
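The check that fails here can be sketched in plain Python. This is a simplified model, not Faust's actual code in faust/tables/recovery.py: the function check_persisted_offsets and the local ConsistencyError class are hypothetical stand-ins. It compares the offset a table has recorded locally with the changelog highwater, using the numbers from the two tables above; for lfa_validation_table the persisted offset is 250 offsets ahead (221444246 - 221443996 = 250), while the other tables match exactly.

```python
from typing import Dict, Tuple

# A (topic, partition) key, mirroring the TP(topic=..., partition=...) in the log.
TopicPartition = Tuple[str, int]


class ConsistencyError(Exception):
    """Hypothetical stand-in for faust.exceptions.ConsistencyError."""


def check_persisted_offsets(
    persisted: Dict[TopicPartition, int],
    highwaters: Dict[TopicPartition, int],
) -> None:
    """Raise if any locally persisted changelog offset is ahead of the highwater."""
    for tp, offset in persisted.items():
        highwater = highwaters[tp]
        if offset > highwater:
            topic, partition = tp
            raise ConsistencyError(
                f"persisted offset for {topic}[{partition}] is higher than the "
                f"highwater ({offset} > {highwater}); gap = {offset - highwater}"
            )


if __name__ == "__main__":
    lfa = ("faust_processing_app-lfa_validation_table-changelog", 0)
    cycle = ("faust_processing_app-cycle_stats_table-changelog", 0)
    try:
        # Values taken from the "Highwater" and "Reading Starts At" tables.
        check_persisted_offsets(
            {cycle: 613304, lfa: 221444246},
            {cycle: 613304, lfa: 221443996},
        )
    except ConsistencyError as exc:
        print(exc)
```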

Expected behavior

Faust should start successfully.

Actual behavior

The offset persisted in RocksDB is greater than the last offset (highwater) in Kafka, so recovery crashes with ConsistencyError.

Notes

As I understand it, this can happen because there is only a single Kafka broker:

  1. Faust sends updates to the changelog topic.
  2. Faust updates the persisted offset in RocksDB.
  3. The Kafka broker fails before flushing the data to disk.
  4. After restarting, Kafka has lost the unflushed data.
  5. The persisted offset in RocksDB is now greater than the last offset in Kafka.
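The five steps above can be modeled with a toy in-memory broker to show how the persisted offset ends up ahead of the topic. This is purely illustrative: ToyBroker and simulate are hypothetical, and the model assumes the broker acknowledges writes before they are flushed to disk (by default Kafka does not fsync each write and relies on the OS page cache and replication for durability). Whether the real broker lost data this way is not confirmed in this thread.

```python
from dataclasses import dataclass, field
from typing import List, Tuple


@dataclass
class ToyBroker:
    """Single broker whose log has a durable (flushed) prefix and a volatile tail."""

    log: List[str] = field(default_factory=list)
    flushed_upto: int = 0  # number of records known to be on disk

    def append(self, record: str) -> int:
        """Append a record and return its offset; acked before any flush."""
        self.log.append(record)
        return len(self.log) - 1

    def flush(self) -> None:
        self.flushed_upto = len(self.log)

    def crash_and_restart(self) -> None:
        """Drop every record that was never flushed to disk."""
        del self.log[self.flushed_upto:]

    def last_offset(self) -> int:
        """Offset of the last record in the log (-1 if empty)."""
        return len(self.log) - 1


def simulate(flushed: int, unflushed: int) -> Tuple[int, int]:
    """Return (locally persisted offset, broker last offset) after a crash."""
    broker = ToyBroker()
    persisted_offset = -1  # what the table would record next to its local state
    # Steps 1-2: send changelog records and persist each acked offset locally.
    for i in range(flushed):
        persisted_offset = broker.append(f"update-{i}")
    broker.flush()
    for i in range(flushed, flushed + unflushed):
        persisted_offset = broker.append(f"update-{i}")
    # Steps 3-4: the broker dies before flushing the tail and loses it.
    broker.crash_and_restart()
    # Step 5: local state is now ahead of the topic.
    return persisted_offset, broker.last_offset()


if __name__ == "__main__":
    persisted, last = simulate(flushed=1000, unflushed=250)
    print(f"persisted={persisted} last_offset={last} gap={persisted - last}")
```

With a single broker there is no replica that could supply the lost tail, so on recovery the highwater Faust reads is lower than the offset it recorded locally.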

Has anyone else run into this issue?

mtjin96 commented 3 years ago

I have the same issue. Does anyone know how to fix it?

wbarnha commented 2 years ago

I've encountered this a few times when some topics in our Kafka broker behaved erratically. Our workaround was to delete the RocksDB files and let Faust rebuild them from the changelog topics. It's not an ideal fix, but it did fix our environment.
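A minimal sketch of that workaround, for the case where only some tables are affected (here lfa_validation_table, partition 0, per the error above). The helper wipe_table_partitions is hypothetical, and the on-disk layout it assumes (one "<table>-<partition>.db" directory per partition under a tables directory) must be checked against your app's actual data directory before deleting anything. This matches the hint in the ConsistencyError message about removing the RocksDB database file for the partition. Stop the worker first; on the next start Faust recovers the table from its changelog topic.

```python
import shutil
from pathlib import Path
from typing import Iterable, List


def wipe_table_partitions(
    tables_dir: Path, table: str, partitions: Iterable[int]
) -> List[Path]:
    """Delete local RocksDB directories so the worker rebuilds them on startup.

    ASSUMPTION: each partition's store is a directory named
    "<table>-<partition>.db" directly under tables_dir. Verify this layout
    first, and only run this while the Faust worker is stopped.
    """
    removed: List[Path] = []
    for partition in partitions:
        path = tables_dir / f"{table}-{partition}.db"
        if path.is_dir():  # skip partitions that have no local store
            shutil.rmtree(path)
            removed.append(path)
    return removed
```

Note that the rebuilt table only contains what survived in the changelog, so any updates the broker lost stay lost.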