salesforce / mirus

Mirus is a cross data-center data replication tool for Apache Kafka
BSD 3-Clause "New" or "Revised" License

restart task when commit failed #74

Closed YongGang closed 3 years ago

YongGang commented 3 years ago

When a task fails to commit offsets for an extended period of time (configurable via commit.failure.restart.ms), restart the task to re-establish the Kafka connection. According to Kafka's WorkerSourceTask class, each successful poll is followed by an offset commit. If the offsets are committed successfully, the commit method in SourceTask is called.

So here we compare the time of the task's last poll with the time of its last commit to detect commit failures, and throw an exception if the failure persists for longer than the configured period.
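
The poll/commit comparison described above can be sketched roughly as follows. This is a minimal illustration, not the code from this PR: the class `CommitFailureDetector`, its method names, and the injected clock are all hypothetical, and `restartMs` merely stands in for the value of commit.failure.restart.ms.

```java
import java.util.function.LongSupplier;

/** Hypothetical helper for illustration only; not the actual Mirus implementation. */
class CommitFailureDetector {
    private final long restartMs;      // assumed to hold the commit.failure.restart.ms value
    private final LongSupplier clock;  // injectable time source, so the logic is testable
    private long lastPollMs;
    private long lastCommitMs;

    CommitFailureDetector(long restartMs, LongSupplier clock) {
        this.restartMs = restartMs;
        this.clock = clock;
        long now = clock.getAsLong();
        this.lastPollMs = now;
        this.lastCommitMs = now;
    }

    /** Call after each successful poll. */
    synchronized void recordPoll() {
        lastPollMs = clock.getAsLong();
    }

    /** Call from the task's commit callback, i.e. once offsets were committed. */
    synchronized void recordCommit() {
        lastCommitMs = clock.getAsLong();
    }

    /** Throws if the last poll is more than restartMs ahead of the last commit. */
    synchronized void checkCommitFailure() {
        long lagMs = lastPollMs - lastCommitMs;
        if (lagMs > restartMs) {
            throw new IllegalStateException(
                "No successful offset commit for " + lagMs + " ms");
        }
    }
}
```

The methods are synchronized as a precaution, on the assumption that the commit callback may run on a different thread than poll. How the thrown exception leads to a task restart is outside the scope of this sketch.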

codecov-commenter commented 3 years ago

Codecov Report

Merging #74 (9941743) into master (4b48244) will increase coverage by 1.11%. The diff coverage is 100.00%.


@@             Coverage Diff              @@
##             master      #74      +/-   ##
============================================
+ Coverage     58.00%   59.12%   +1.11%     
- Complexity      177      185       +8     
============================================
  Files            28       28              
  Lines          1055     1079      +24     
  Branches         77       79       +2     
============================================
+ Hits            612      638      +26     
+ Misses          408      407       -1     
+ Partials         35       34       -1     
| Impacted Files | Coverage Δ |
|---|---|
| .../salesforce/mirus/config/TaskConfigDefinition.java | 92.30% <ø> (ø) |
| ...c/main/java/com/salesforce/mirus/KafkaMonitor.java | 69.93% <100.00%> (ø) |
| ...ain/java/com/salesforce/mirus/MirusSourceTask.java | 78.41% <100.00%> (+5.53%) :arrow_up: |
| ...java/com/salesforce/mirus/config/SourceConfig.java | 85.71% <100.00%> (ø) |
| ...alesforce/mirus/config/SourceConfigDefinition.java | 100.00% <100.00%> (ø) |
| ...n/java/com/salesforce/mirus/config/TaskConfig.java | 100.00% <100.00%> (ø) |


Legend: Δ = absolute <relative> (impact), ø = not affected, ? = missing data. Last update 4b48244...9941743.

YongGang commented 3 years ago

> I'm wondering what will happen when there is no new data for a long time, and then a sudden burst of continuous data. I think the task will fail, even though the next commit would likely be successful, because the first call to checkCommitFailure after data starts arriving will trigger the exception (since the async commit is unlikely to happen within that short time window). Do we need to reset the commit time when there is no data after a successful commit?

Updated. Yes, I had thought about this case, but I wrongly assumed that the offset commit was synchronous.
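
One way to handle the idle-then-burst case raised in this exchange is sketched below. It is an assumption about how such a reset could look, not the change that was actually pushed: `IdleAwareCommitTracker` and its methods are hypothetical names, and the sketch simplifies by treating every commit as covering all records polled so far.

```java
import java.util.function.LongSupplier;

/** Hypothetical sketch; the real fix in this PR may differ. */
class IdleAwareCommitTracker {
    private final long restartMs;      // assumed to hold the commit.failure.restart.ms value
    private final LongSupplier clock;  // injectable time source, so the logic is testable
    private long lastCommitMs;
    private boolean pending;           // true if records were polled since the last commit

    IdleAwareCommitTracker(long restartMs, LongSupplier clock) {
        this.restartMs = restartMs;
        this.clock = clock;
        this.lastCommitMs = clock.getAsLong();
    }

    /** Call after every poll, passing the number of records it returned. */
    synchronized void onPoll(int recordCount) {
        long now = clock.getAsLong();
        if (!pending) {
            // Nothing is waiting to be committed, so idle time must not
            // count as a commit failure: restart the window from now.
            lastCommitMs = now;
        } else if (now - lastCommitMs > restartMs) {
            throw new IllegalStateException(
                "No successful offset commit for " + (now - lastCommitMs) + " ms");
        }
        if (recordCount > 0) {
            pending = true;
        }
    }

    /** Call from the task's commit callback (simplification: clears all pending data). */
    synchronized void onCommit() {
        lastCommitMs = clock.getAsLong();
        pending = false;
    }
}
```

With this shape, a long quiet period followed by a burst only starts the failure window at the first poll that returns data, so an asynchronous commit has the full restartMs to arrive before the task is failed.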