faust-streaming / faust

Python Stream Processing. A Faust fork
https://faust-streaming.github.io/faust/

Faust commits the wrong offset in case of a gap in acks #312

Closed ekerstens closed 2 years ago

ekerstens commented 2 years ago

Checklist

Steps to reproduce

Expected behavior

Faust should be able to identify a gap between the committed offset and the first acked message, to avoid skipping messages. Per the documentation in consumer._new_offset:

    # We iterate over it until we find a gap
    # then return the offset before that.
    # For example if acked[tp] is:
    #   1 2 3 4 5 6 7 8 9
    # the return value will be: 10
    # If acked[tp] is:
    #  34 35 36 40 41 42 43 44
    #          ^--- gap
    # the return value will be: 37

In the scenario above, every 5th message is never acked, so the committed offset should never advance past the first unacked message: there is always a gap.
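The rule quoted above can be illustrated with a minimal sketch. This is not Faust's actual consumer._new_offset implementation: the function name next_commit_offset and its signature are hypothetical, and the sketch assumes the Kafka convention that the committed offset is the offset of the next message to be processed.

```python
from typing import Iterable, Optional


def next_commit_offset(committed: int, acked: Iterable[int]) -> Optional[int]:
    """Return the new offset to commit, or None if it cannot advance.

    Hypothetical helper, not Faust's API. `committed` is assumed to be the
    offset of the next message to process (Kafka convention).
    """
    expected = committed
    for offset in sorted(acked):
        if offset < expected:
            continue  # already covered by an earlier commit
        if offset != expected:
            break  # gap: an earlier message is still unacked
        expected += 1
    return expected if expected != committed else None


print(next_commit_offset(1, range(1, 10)))  # 10
print(next_commit_offset(34, [34, 35, 36, 40, 41, 42, 43, 44]))  # 37
print(next_commit_offset(2785, [2786, 2787, 2788, 2789, 2791]))  # None
```

The last call mirrors the state seen in the logs of this report: with 2785 committed but never acked, the offset should stay where it is, whereas the logs show 2790 being committed.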

Actual behavior

Versions

Traceback/Logs

2022/05/31 17:15:19.639 WARNI module=mode.redirect.logging.write Message=1
2022/05/31 17:15:19.640 WARNI module=mode.redirect.logging.write Message=2781
2022/05/31 17:15:19.640 WARNI module=mode.redirect.logging.write Message=defaultdict(<class 'list'>, {})
2022/05/31 17:15:19.643 WARNI module=mode.redirect.logging.write Message=1
2022/05/31 17:15:19.643 WARNI module=mode.redirect.logging.write Message=2782
2022/05/31 17:15:19.643 WARNI module=mode.redirect.logging.write Message=defaultdict(<class 'list'>, {TopicPartition(topic='example_topic', partition=0): [2781]})
2022/05/31 17:15:19.644 INFO  module=faust.transport.drivers.aiokafka.consumer._commit_offsets Message=[^--Consumer]: COMMITTING OFFSETS:
┌Commit Offsets──────────────────────────────────────────────┬────────┐
│ TP                                                         │ Offset │
├────────────────────────────────────────────────────────────┼────────┤
│ TopicPartition(topic='example_topic', partition=0) │ 2783   │
└────────────────────────────────────────────────────────────┴────────┘
2022/05/31 17:15:19.646 WARNI module=mode.redirect.logging.write Message=1
2022/05/31 17:15:19.646 WARNI module=mode.redirect.logging.write Message=2783
2022/05/31 17:15:19.647 WARNI module=mode.redirect.logging.write Message=defaultdict(<class 'list'>, {TopicPartition(topic='example_topic', partition=0): []})
2022/05/31 17:15:19.648 WARNI module=mode.redirect.logging.write Message=1
2022/05/31 17:15:19.648 WARNI module=mode.redirect.logging.write Message=2784
2022/05/31 17:15:19.648 WARNI module=mode.redirect.logging.write Message=defaultdict(<class 'list'>, {TopicPartition(topic='example_topic', partition=0): [2783]})
2022/05/31 17:15:19.649 WARNI module=mode.redirect.logging.write Message=1
2022/05/31 17:15:19.650 WARNI module=mode.redirect.logging.write Message=2785
2022/05/31 17:15:19.650 WARNI module=mode.redirect.logging.write Message=defaultdict(<class 'list'>, {TopicPartition(topic='example_topic', partition=0): [2783, 2784]})
2022/05/31 17:15:19.651 WARNI module=mode.redirect.logging.write Message=1
2022/05/31 17:15:19.651 WARNI module=mode.redirect.logging.write Message=2786
2022/05/31 17:15:19.651 WARNI module=mode.redirect.logging.write Message=defaultdict(<class 'list'>, {TopicPartition(topic='example_topic', partition=0): [2783, 2784]})
2022/05/31 17:15:19.652 WARNI module=mode.redirect.logging.write Message=1
2022/05/31 17:15:19.653 WARNI module=mode.redirect.logging.write Message=2787
2022/05/31 17:15:19.653 WARNI module=mode.redirect.logging.write Message=defaultdict(<class 'list'>, {TopicPartition(topic='example_topic', partition=0): [2783, 2784, 2786]})
2022/05/31 17:15:19.824 WARNI module=mode.redirect.logging.write Message=1
2022/05/31 17:15:19.825 WARNI module=mode.redirect.logging.write Message=2788
2022/05/31 17:15:19.825 WARNI module=mode.redirect.logging.write Message=defaultdict(<class 'list'>, {TopicPartition(topic='example_topic', partition=0): [2783, 2784, 2786, 2787]})
2022/05/31 17:15:19.827 WARNI module=mode.redirect.logging.write Message=1
2022/05/31 17:15:19.827 WARNI module=mode.redirect.logging.write Message=2789
2022/05/31 17:15:19.827 WARNI module=mode.redirect.logging.write Message=defaultdict(<class 'list'>, {TopicPartition(topic='example_topic', partition=0): [2783, 2784, 2786, 2787, 2788]})
2022/05/31 17:15:19.829 WARNI module=mode.redirect.logging.write Message=1
2022/05/31 17:15:19.829 WARNI module=mode.redirect.logging.write Message=2790
2022/05/31 17:15:19.829 WARNI module=mode.redirect.logging.write Message=defaultdict(<class 'list'>, {TopicPartition(topic='example_topic', partition=0): [2783, 2784, 2786, 2787, 2788, 2789]})
2022/05/31 17:15:19.835 WARNI module=mode.redirect.logging.write Message=> test.py(222)test_event_loop2()
-> if self.counter == 30:
2022/05/31 17:15:19.835 WARNI module=mode.redirect.logging.write Message=(Pdb)
c
2022/05/31 17:15:50.917 WARNI module=mode.redirect.logging.write Message=1
2022/05/31 17:15:50.917 WARNI module=mode.redirect.logging.write Message=2791
2022/05/31 17:15:50.917 WARNI module=mode.redirect.logging.write Message=defaultdict(<class 'list'>, {TopicPartition(topic='example_topic', partition=0): [2783, 2784, 2786, 2787, 2788, 2789]})
2022/05/31 17:15:50.919 INFO  module=mode.timers.timers.tick Message=Timer Monitor.sampler woke up too late, with a drift of +31.028453718987294 runtime=1.8850085325539112e-05 sleeptime=32.02845371898729
2022/05/31 17:15:50.921 INFO  module=mode.timers.timers.tick Message=Timer vap_kafka_event.faust.app.VapFaustApp.monitor_reoccuring_stats woke up too late, with a drift of +30.052488100947812 runtime=0.0016850839601829648 sleeptime=40.05248810094781
2022/05/31 17:15:50.921 INFO  module=mode.timers.timers.tick Message=Timer Recovery.stats woke up too late, with a drift of +29.30016235099174 runtime=7.025082595646381e-06 sleeptime=34.30016235099174
2022/05/31 17:15:50.922 INFO  module=faust.transport.drivers.aiokafka.consumer._commit_offsets Message=[^--Consumer]: COMMITTING OFFSETS:
┌Commit Offsets──────────────────────────────────────────────┬────────┐
│ TP                                                         │ Offset │
├────────────────────────────────────────────────────────────┼────────┤
│ TopicPartition(topic='example_topic', partition=0) │ 2785   │
└────────────────────────────────────────────────────────────┴────────┘
2022/05/31 17:15:50.924 WARNI module=mode.redirect.logging.write Message=1
2022/05/31 17:15:50.924 WARNI module=mode.redirect.logging.write Message=2792
2022/05/31 17:15:50.924 WARNI module=mode.redirect.logging.write Message=defaultdict(<class 'list'>, {TopicPartition(topic='example_topic', partition=0): [2786, 2787, 2788, 2789, 2791]})
2022/05/31 17:15:50.925 INFO  module=mode.timers.timers.tick Message=Timer livelock woke up too late, with a drift of +27.104460972943343 runtime=0.0026684380136430264 sleeptime=34.10446097294334
2022/05/31 17:15:50.925 WARNI module=mode.redirect.logging.write Message=1
2022/05/31 17:15:50.925 WARNI module=mode.redirect.logging.write Message=2793
2022/05/31 17:15:50.926 WARNI module=mode.redirect.logging.write Message=defaultdict(<class 'list'>, {TopicPartition(topic='example_topic', partition=0): [2786, 2787, 2788, 2789, 2791, 2792]})
2022/05/31 17:15:50.927 WARNI module=mode.redirect.logging.write Message=1
2022/05/31 17:15:50.927 WARNI module=mode.redirect.logging.write Message=2794
2022/05/31 17:15:50.927 WARNI module=mode.redirect.logging.write Message=defaultdict(<class 'list'>, {TopicPartition(topic='example_topic', partition=0): [2786, 2787, 2788, 2789, 2791, 2792, 2793]})
2022/05/31 17:15:50.928 WARNI module=mode.redirect.logging.write Message=1
2022/05/31 17:15:50.928 WARNI module=mode.redirect.logging.write Message=2795
2022/05/31 17:15:50.928 WARNI module=mode.redirect.logging.write Message=defaultdict(<class 'list'>, {TopicPartition(topic='example_topic', partition=0): [2786, 2787, 2788, 2789, 2791, 2792, 2793, 2794]})
2022/05/31 17:15:50.929 WARNI module=mode.redirect.logging.write Message=1
2022/05/31 17:15:50.929 WARNI module=mode.redirect.logging.write Message=2796
2022/05/31 17:15:50.929 WARNI module=mode.redirect.logging.write Message=defaultdict(<class 'list'>, {TopicPartition(topic='example_topic', partition=0): [2786, 2787, 2788, 2789, 2791, 2792, 2793, 2794]})
2022/05/31 17:15:50.930 WARNI module=mode.redirect.logging.write Message=1
2022/05/31 17:15:50.931 WARNI module=mode.redirect.logging.write Message=2797
2022/05/31 17:15:50.931 WARNI module=mode.redirect.logging.write Message=defaultdict(<class 'list'>, {TopicPartition(topic='example_topic', partition=0): [2786, 2787, 2788, 2789, 2791, 2792, 2793, 2794, 2796]})
2022/05/31 17:15:50.932 WARNI module=mode.redirect.logging.write Message=1
2022/05/31 17:15:50.932 WARNI module=mode.redirect.logging.write Message=2798
2022/05/31 17:15:50.932 WARNI module=mode.redirect.logging.write Message=defaultdict(<class 'list'>, {TopicPartition(topic='example_topic', partition=0): [2786, 2787, 2788, 2789, 2791, 2792, 2793, 2794, 2796, 2797]})
2022/05/31 17:15:50.933 WARNI module=mode.redirect.logging.write Message=1
2022/05/31 17:15:50.933 WARNI module=mode.redirect.logging.write Message=2799
2022/05/31 17:15:50.933 WARNI module=mode.redirect.logging.write Message=defaultdict(<class 'list'>, {TopicPartition(topic='example_topic', partition=0): [2786, 2787, 2788, 2789, 2791, 2792, 2793, 2794, 2796, 2797, 2798]})
2022/05/31 17:15:50.934 WARNI module=mode.redirect.logging.write Message=1
2022/05/31 17:15:50.935 WARNI module=mode.redirect.logging.write Message=2800
2022/05/31 17:15:50.935 WARNI module=mode.redirect.logging.write Message=defaultdict(<class 'list'>, {TopicPartition(topic='example_topic', partition=0): [2786, 2787, 2788, 2789, 2791, 2792, 2793, 2794, 2796, 2797, 2798, 2799]})
2022/05/31 17:15:50.935 WARNI module=mode.redirect.logging.write Message=> test.py(222)test_event_loop2()
-> if self.counter == 30:
2022/05/31 17:15:50.935 WARNI module=mode.redirect.logging.write Message=(Pdb)
c
2022/05/31 17:16:10.286 INFO  module=mode.timers.timers.tick Message=Timer commit woke up too late, with a drift of +28.317679641931317 runtime=19.386381829041056 sleeptime=31.117679641931318
2022/05/31 17:16:10.286 WARNI module=mode.timers.timers.tick Message=Timer commit is overlapping (interval=2.8 runtime=19.386381829041056)
2022/05/31 17:16:10.288 WARNI module=mode.redirect.logging.write Message=1
2022/05/31 17:16:10.288 WARNI module=mode.redirect.logging.write Message=2801
2022/05/31 17:16:10.288 WARNI module=mode.redirect.logging.write Message=defaultdict(<class 'list'>, {TopicPartition(topic='example_topic', partition=0): [2786, 2787, 2788, 2789, 2791, 2792, 2793, 2794, 2796, 2797, 2798, 2799]})
2022/05/31 17:16:10.290 INFO  module=mode.timers.timers.tick Message=Timer Monitor.sampler woke up too late, with a drift of +18.39256372093223 runtime=1.7469050362706184e-05 sleeptime=19.39256372093223
2022/05/31 17:16:10.290 INFO  module=mode.timers.timers.tick Message=Timer Recovery.stats woke up too late, with a drift of +14.390604294952936 runtime=6.500980816781521e-06 sleeptime=19.390604294952936
2022/05/31 17:16:10.293 INFO  module=mode.timers.timers.tick Message=Timer vap_kafka_event.faust.app.VapFaustApp.monitor_reoccuring_stats woke up too late, with a drift of +9.391143504995853 runtime=0.003148776013404131 sleeptime=19.391143504995853
2022/05/31 17:16:10.294 WARNI module=mode.redirect.logging.write Message=1
2022/05/31 17:16:10.294 WARNI module=mode.redirect.logging.write Message=2802
2022/05/31 17:16:10.294 WARNI module=mode.redirect.logging.write Message=defaultdict(<class 'list'>, {TopicPartition(topic='example_topic', partition=0): [2786, 2787, 2788, 2789, 2791, 2792, 2793, 2794, 2796, 2797, 2798, 2799, 2801]})
2022/05/31 17:16:10.296 INFO  module=mode.timers.timers.tick Message=Timer livelock woke up too late, with a drift of +12.38731794198975 runtime=0.0053144460543990135 sleeptime=19.38731794198975
2022/05/31 17:16:10.296 WARNI module=mode.redirect.logging.write Message=1
2022/05/31 17:16:10.296 WARNI module=mode.redirect.logging.write Message=2803
2022/05/31 17:16:10.296 WARNI module=mode.redirect.logging.write Message=defaultdict(<class 'list'>, {TopicPartition(topic='example_topic', partition=0): [2786, 2787, 2788, 2789, 2791, 2792, 2793, 2794, 2796, 2797, 2798, 2799, 2801, 2802]})
2022/05/31 17:16:10.298 WARNI module=mode.redirect.logging.write Message=1
2022/05/31 17:16:10.298 WARNI module=mode.redirect.logging.write Message=2804
2022/05/31 17:16:10.298 WARNI module=mode.redirect.logging.write Message=defaultdict(<class 'list'>, {TopicPartition(topic='example_topic', partition=0): [2786, 2787, 2788, 2789, 2791, 2792, 2793, 2794, 2796, 2797, 2798, 2799, 2801, 2802, 2803]})
2022/05/31 17:16:10.299 WARNI module=mode.redirect.logging.write Message=1
2022/05/31 17:16:10.299 WARNI module=mode.redirect.logging.write Message=2805
2022/05/31 17:16:10.300 WARNI module=mode.redirect.logging.write Message=defaultdict(<class 'list'>, {TopicPartition(topic='example_topic', partition=0): [2786, 2787, 2788, 2789, 2791, 2792, 2793, 2794, 2796, 2797, 2798, 2799, 2801, 2802, 2803, 2804]})
2022/05/31 17:16:10.301 WARNI module=mode.redirect.logging.write Message=1
2022/05/31 17:16:10.301 WARNI module=mode.redirect.logging.write Message=2806
2022/05/31 17:16:10.301 WARNI module=mode.redirect.logging.write Message=defaultdict(<class 'list'>, {TopicPartition(topic='example_topic', partition=0): [2786, 2787, 2788, 2789, 2791, 2792, 2793, 2794, 2796, 2797, 2798, 2799, 2801, 2802, 2803, 2804]})
2022/05/31 17:16:10.303 WARNI module=mode.redirect.logging.write Message=1
2022/05/31 17:16:10.303 WARNI module=mode.redirect.logging.write Message=2807
2022/05/31 17:16:10.303 WARNI module=mode.redirect.logging.write Message=defaultdict(<class 'list'>, {TopicPartition(topic='example_topic', partition=0): [2786, 2787, 2788, 2789, 2791, 2792, 2793, 2794, 2796, 2797, 2798, 2799, 2801, 2802, 2803, 2804, 2806]})
2022/05/31 17:16:10.304 WARNI module=mode.redirect.logging.write Message=1
2022/05/31 17:16:10.305 WARNI module=mode.redirect.logging.write Message=2808
2022/05/31 17:16:10.305 WARNI module=mode.redirect.logging.write Message=defaultdict(<class 'list'>, {TopicPartition(topic='example_topic', partition=0): [2786, 2787, 2788, 2789, 2791, 2792, 2793, 2794, 2796, 2797, 2798, 2799, 2801, 2802, 2803, 2804, 2806, 2807]})
2022/05/31 17:16:10.306 WARNI module=mode.redirect.logging.write Message=1
2022/05/31 17:16:10.306 WARNI module=mode.redirect.logging.write Message=2809
2022/05/31 17:16:10.306 WARNI module=mode.redirect.logging.write Message=defaultdict(<class 'list'>, {TopicPartition(topic='example_topic', partition=0): [2786, 2787, 2788, 2789, 2791, 2792, 2793, 2794, 2796, 2797, 2798, 2799, 2801, 2802, 2803, 2804, 2806, 2807, 2808]})
2022/05/31 17:16:10.307 WARNI module=mode.redirect.logging.write Message=1
2022/05/31 17:16:10.308 WARNI module=mode.redirect.logging.write Message=2810
2022/05/31 17:16:10.308 WARNI module=mode.redirect.logging.write Message=defaultdict(<class 'list'>, {TopicPartition(topic='example_topic', partition=0): [2786, 2787, 2788, 2789, 2791, 2792, 2793, 2794, 2796, 2797, 2798, 2799, 2801, 2802, 2803, 2804, 2806, 2807, 2808, 2809]})
2022/05/31 17:16:10.308 WARNI module=mode.redirect.logging.write Message=> test.py(222)test_event_loop2()
-> if self.counter == 30:
2022/05/31 17:16:10.309 WARNI module=mode.redirect.logging.write Message=(Pdb)
c
2022/05/31 17:16:30.163 INFO  module=mode.timers.timers.tick Message=Timer Monitor.sampler woke up too late, with a drift of +18.872945420909673 runtime=1.928501296788454e-05 sleeptime=19.872945420909673
2022/05/31 17:16:30.163 INFO  module=faust.transport.drivers.aiokafka.consumer._commit_offsets Message=[^--Consumer]: COMMITTING OFFSETS:
┌Commit Offsets──────────────────────────────────────────────┬────────┐
│ TP                                                         │ Offset │
├────────────────────────────────────────────────────────────┼────────┤
│ TopicPartition(topic='example_topic', partition=0) │ 2790   │
└────────────────────────────────────────────────────────────┴────────┘
2022/05/31 17:16:30.164 INFO  module=mode.timers.timers.tick Message=Timer Recovery.stats woke up too late, with a drift of +14.873592204065062 runtime=6.855931133031845e-06 sleeptime=19.873592204065062
2022/05/31 17:16:30.167 INFO  module=mode.timers.timers.tick Message=Timer vap_kafka_event.faust.app.VapFaustApp.monitor_reoccuring_stats woke up too late, with a drift of +9.87149757402949 runtime=0.002534606959670782 sleeptime=19.87149757402949
2022/05/31 17:16:30.168 INFO  module=aiokafka.conn.conn._do_sasl_handshake Message=Authenticated as X3OXJPRL4A7DJDAV via PLAIN
2022/05/31 17:16:30.169 INFO  module=mode.timers.timers.tick Message=Timer livelock woke up too late, with a drift of +12.869328050990589 runtime=0.0044637060491368175 sleeptime=19.86932805099059
2022/05/31 17:16:30.281 INFO  module=mode.timers.timers.tick Message=Timer commit woke up too late, with a drift of +17.07697979202494 runtime=0.11823615396860987 sleeptime=19.87697979202494
2022/05/31 17:16:32.983 INFO  module=faust.transport.drivers.aiokafka.consumer._commit_offsets Message=[^--Consumer]: COMMITTING OFFSETS:
┌Commit Offsets──────────────────────────────────────────────┬────────┐
│ TP                                                         │ Offset │
├────────────────────────────────────────────────────────────┼────────┤
│ TopicPartition(topic='example_topic', partition=0) │ 2795   │
└────────────────────────────────────────────────────────────┴────────┘
2022/05/31 17:16:35.971 INFO  module=faust.transport.drivers.aiokafka.consumer._commit_offsets Message=[^--Consumer]: COMMITTING OFFSETS:
┌Commit Offsets──────────────────────────────────────────────┬────────┐
│ TP                                                         │ Offset │
├────────────────────────────────────────────────────────────┼────────┤
│ TopicPartition(topic='example_topic', partition=0) │ 2800   │
└────────────────────────────────────────────────────────────┴────────┘
2022/05/31 17:16:38.740 INFO  module=faust.transport.drivers.aiokafka.consumer._commit_offsets Message=[^--Consumer]: COMMITTING OFFSETS:
┌Commit Offsets──────────────────────────────────────────────┬────────┐
│ TP                                                         │ Offset │
├────────────────────────────────────────────────────────────┼────────┤
│ TopicPartition(topic='example_topic', partition=0) │ 2805   │
└────────────────────────────────────────────────────────────┴────────┘
2022/05/31 17:16:41.737 INFO  module=faust.transport.drivers.aiokafka.consumer._commit_offsets Message=[^--Consumer]: COMMITTING OFFSETS:
┌Commit Offsets──────────────────────────────────────────────┬────────┐
│ TP                                                         │ Offset │
├────────────────────────────────────────────────────────────┼────────┤
│ TopicPartition(topic='example_topic', partition=0) │ 2810   │
└────────────────────────────────────────────────────────────┴────────┘

zerafachris commented 2 years ago

@ekerstens Use stream.noack().noack_take(1, 1) instead of events(), as events() is bugged.

ekerstens commented 2 years ago

@zerafachris can you share any issues or details about the issue with events()?

ostetsenko commented 2 years ago

This looks related too: https://github.com/faust-streaming/faust/pull/208

zerafachris commented 2 years ago

@ekerstens The issue with events() is that, upon restart, the offset of the event that caused the failure is skipped and that event is never processed. For example, consider that you have the following:

Topic Source Payloads:

{customer_id: 1, details: "something_1"}
{customer_id: 2, details: "something_2"}
{customer_id: 3, details: "something_3"}
{customer_id: 4, details: "something_4"}
{customer_id: 5, details: "something_5"}
{customer_id: 6, details: "something_6"}
{customer_id: 7, details: "something_7"}
{customer_id: 8, details: "something_8"}
{customer_id: 9, details: "something_9"}

If these are sent to the following agent:

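@app.agent(topic_source, sink=[topic_sink])
async def process(stream):
    async for record in stream.noack().events():
        if record.value.get('customer_id') == 5:
            # Simulate a processing failure on one payload.
            raise Exception('failed to process customer_id 5')
        yield record
        # Ack manually, since automatic acking is disabled by noack().
        await stream.ack(record)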

The agent will stop processing at customer_id 5. If you then comment out the if-exception block and restart, the app should continue processing and there should be a total of 9 distinct payloads in the sink topic. However, what actually happens is that the payload for 'customer_id' == 5 is ignored upon restart.

A workaround for this is to use noack_take(1, 1). The agent becomes:

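@app.agent(topic_source, sink=[topic_sink])
async def process(stream):
    async for record in stream.noack().noack_take(1, 1):
        # noack_take() yields a list of events; with a batch size of 1 it holds one event.
        new_record = record[0]
        if new_record.value.get('customer_id') == 5:
            # Simulate a processing failure on one payload.
            raise Exception('failed to process customer_id 5')
        yield new_record
        await stream.ack(new_record)

ekerstens commented 2 years ago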

@zerafachris is there a separate issue open for this? #312 is fixed by #313 and can be closed. I'd like to understand more about events vs. noack_take but am not sure it's relevant to this ticket.

zerafachris commented 2 years ago

@ekerstens I did not open a different issue for this. If you do open one, let me know and I can contribute to it.

ekerstens commented 2 years ago

Fixed by https://github.com/faust-streaming/faust/pull/313. @zerafachris I've separately created https://github.com/faust-streaming/faust/issues/315 to discuss events vs noack_take

ekerstens commented 2 years ago

Any stream processors using stream.noack() should be aware of a change that may be required when upgrading to faust-streaming==0.8.5 because of this fix. Previously, unacked messages were eventually committed anyway once a message with a higher offset was acked, so unacked messages could go unnoticed. With the fix, if a processor fails to ack any event, the consumer will never commit an offset past that event. For any apps with a flow like

async def process(stream):
  async for event in stream.noack().events():
    do_something(event)
    event.ack()

the code has to change to something like

async def process(stream):
  async for event in stream.noack().events():
    try:
      do_something(event)
    finally:
      event.ack()

or

async for event in stream.events():
  async with event:
    do_something(event)

or some other similar pattern. Otherwise, in case of an exception during do_something the event would never be acked, in which case offsets will never progress past that point.
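The difference between the two patterns can be checked without Kafka or Faust. The following is only a rough simulation: FakeEvent, do_something, and run are hypothetical stand-ins (not Faust classes), and the failure on every 5th offset is an assumption chosen to match the scenario in this issue.

```python
from typing import Callable, Iterable, List


class FakeEvent:
    """Hypothetical stand-in for an event; it only records whether ack() ran."""

    def __init__(self, offset: int) -> None:
        self.offset = offset
        self.acked = False

    def ack(self) -> bool:
        self.acked = True
        return True


def do_something(event: FakeEvent) -> None:
    # Assumed failure mode: processing fails on every 5th offset.
    if event.offset % 5 == 0:
        raise ValueError(f"cannot process offset {event.offset}")


def handle_without_finally(event: FakeEvent) -> None:
    do_something(event)
    event.ack()  # skipped whenever do_something() raises


def handle_with_finally(event: FakeEvent) -> None:
    try:
        do_something(event)
    finally:
        event.ack()  # runs whether or not do_something() raised


def run(handler: Callable[[FakeEvent], None], offsets: Iterable[int]) -> List[int]:
    """Feed events to the handler and return the offsets left unacked."""
    events = [FakeEvent(offset) for offset in offsets]
    for event in events:
        try:
            handler(event)
        except ValueError:
            pass  # stand-in for whatever error handling the app performs
    return [event.offset for event in events if not event.acked]


unacked_before = run(handle_without_finally, range(1, 11))
unacked_after = run(handle_with_finally, range(1, 11))
print(unacked_before)  # [5, 10]
print(unacked_after)   # []
```

In the first run, offsets 5 and 10 stay unacked, which after the fix would keep the committed offset from moving past 5. Note that acking in finally also means a failed event is treated as done, so any retry or dead-letter handling has to happen before the ack.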