faust-streaming / faust

Python Stream Processing. A Faust fork
https://faust-streaming.github.io/faust/

Faust Tumbling Window not working when there are multiple partitions #172

Closed alokmenon closed 3 years ago

alokmenon commented 3 years ago


Steps to reproduce

  1. Create a topic with 4 partitions.
  2. Use Faust to create a tumbling window with the desired expiry, with partitions set to 4 (for both the app and the Table):

     WINDOW = 5
     WINDOW_EXPIRES = 1
     PARTITIONS = 4

     app = faust.App('tumbling_window_app_2', broker=KAFKA, version=1, store='rocksdb://', topic_partitions=4, producer_linger_ms=20, producer_acks=1)
     app.conf.table_cleanup_interval = 1
     source = app.topic(TOPIC, value_type=RawModel, partitions=4)

     def window_processor(key, events):
         print(f'processing window: key {key}, dump {events.dumps()}')

     tumbling_table = (
         app.Table(
             TABLE,
             default=RawModel,
             partitions=PARTITIONS,
             on_window_close=window_processor,
             key_type=str,
             value_type=RawModel,
         )
         .tumbling(WINDOW, expires=timedelta(seconds=WINDOW_EXPIRES))
         .relative_to_field(RawModel.date)
     )
  3. Send messages to each partition with a different key.
  4. After the tumbling window expires, expect the on_window_close processor function to be called for messages sent to each partition.

Tell us what you did to cause something to happen.

Created multiple partitions for the incoming message topic. The issue does not happen if the topic has only a single partition; it occurs only with multiple partitions.

Expected behavior

After the tumbling window expires, messages sent to each partition should reach the on_window_close processor function.

Actual behavior

We consistently see only messages from a single partition being emitted by the window processor; messages in all other partitions are discarded entirely. For example, in the program below a message is sent to partition 0 and partition 1 each second, for two locations. The tumbling window duration is 5 seconds and the expiry time is 1 second, so you would expect the on_window_close function to fire for both partition 0 and partition 1. From the program output you can easily see that it happens for only one partition.
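For reference, the (start, end) tuples that appear in the window keys in the logs further down can be reproduced with a small sketch of tumbling-window arithmetic. This is my own illustration, assuming Faust's convention of ending a range one tenth of a second before the next window starts:

```python
WINDOW = 5.0  # tumbling window size in seconds, as configured above

def window_range(timestamp, size=WINDOW):
    """Map an event timestamp to the (start, end) tuple of its tumbling window."""
    start = (timestamp // size) * size
    # the range ends a tenth of a second before the next window begins
    return (start, start + size - 0.1)

print(window_range(1626714539.460535))  # -> (1626714535.0, 1626714539.9)
```

The timestamp in the example is the `date` of the last event in the first closed window from the logs, which indeed lands in the (1626714535.0, 1626714539.9) range shown there.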

Source Code


          from datetime import datetime, timedelta
          from time import time
          import faust

          class RawModel(faust.Record):
              item: int = 0
              date: float = float(1)

          TOPIC = 'rawevent2'
          TABLE = 'tumbling_table'
          KAFKA = 'kafka://localhost:9092'
          WINDOW = 5
          WINDOW_EXPIRES = 1
          PARTITIONS = 4

          app = faust.App('tumbling_window_app_2', broker=KAFKA, version=1, store='rocksdb://', topic_partitions=4, producer_linger_ms=20, producer_acks=1)
          app.conf.table_cleanup_interval = 1
          source = app.topic(TOPIC, value_type=RawModel, partitions=4)

          def window_processor(key, events):
              print(f'processing window: key {key}, dump {events.dumps()}')

          tumbling_table = (
              app.Table(
                  TABLE,
                  default=RawModel,
                  partitions=PARTITIONS,
                  on_window_close=window_processor,
                  key_type=str,
                  value_type=RawModel,
              )
              .tumbling(WINDOW, expires=timedelta(seconds=WINDOW_EXPIRES))
              .relative_to_field(RawModel.date)
          )

          @app.agent(source)
          async def print_windowed_events(stream):
              #async for event in stream:
              async for key, event in stream.items():
                  windowSet = tumbling_table.get(key, None)
                  raw_model = windowSet.value()
                  event.item += raw_model.item
                  tumbling_table[key] = event
                  print(f"consumer::: {key},value: {event.item} date ::: {event.date}")

          @app.timer(1)
          async def produce0():
              location_Id = "location_10"
              raw_model = RawModel(item=1, date=float(time()))
              #print(f"produce::: {location_Id} value: {raw_model.item} date{raw_model.date}")
              await source.send(key=location_Id, partition=0, value=raw_model)

          @app.timer(1)
          async def produce1():
              location_Id = "location_11"
              raw_model = RawModel(item=1, date=float(time()))
              #print(f"produce::: {location_Id} value: {raw_model.item} date{raw_model.date}")
              await source.send(key=location_Id, partition=1, value=raw_model)

          if __name__ == '__main__':
              app.main()

Application logs:

As per the application, the "processing window: key" log should appear consistently every 20 seconds for both partition 0 and partition 1. You can see from the logs that it happens only sporadically, and only ever for a single partition:

    [2021-07-19 22:38:56,467] [98904] [WARNING] consumer::: b'location_11',value: 1 date ::: 1626714536.461031
    [2021-07-19 22:38:56,470] [98904] [WARNING] consumer::: b'location_10',value: 1 date ::: 1626714536.460512
    [2021-07-19 22:38:57,466] [98904] [WARNING] consumer::: b'location_11',value: 2 date ::: 1626714537.459011
    [2021-07-19 22:38:57,469] [98904] [WARNING] consumer::: b'location_10',value: 2 date ::: 1626714537.45929
    [2021-07-19 22:38:58,471] [98904] [WARNING] consumer::: b'location_11',value: 3 date ::: 1626714538.4657621
    [2021-07-19 22:38:58,473] [98904] [WARNING] consumer::: b'location_10',value: 3 date ::: 1626714538.465481
    [2021-07-19 22:38:59,465] [98904] [WARNING] consumer::: b'location_11',value: 4 date ::: 1626714539.460535
    [2021-07-19 22:38:59,468] [98904] [WARNING] consumer::: b'location_10',value: 4 date ::: 1626714539.4607072
    [2021-07-19 22:39:00,477] [98904] [WARNING] consumer::: b'location_11',value: 1 date ::: 1626714540.471303
    [2021-07-19 22:39:00,481] [98904] [WARNING] consumer::: b'location_10',value: 1 date ::: 1626714540.471112
    [2021-07-19 22:39:01,264] [98904] [WARNING] processing window: key (b'location_11', (1626714535.0, 1626714539.9)), dump {'item': 4, 'date': 1626714539.460535, '__faust': {'ns': 'HelloWorld.RawModel'}}
    [2021-07-19 22:39:01,467] [98904] [WARNING] consumer::: b'location_11',value: 2 date ::: 1626714541.462759
    [2021-07-19 22:39:01,470] [98904] [WARNING] consumer::: b'location_10',value: 2 date ::: 1626714541.462971
    [2021-07-19 22:39:02,481] [98904] [WARNING] consumer::: b'location_11',value: 3 date ::: 1626714542.475777
    [2021-07-19 22:39:02,483] [98904] [WARNING] consumer::: b'location_10',value: 3 date ::: 1626714542.475573
    [2021-07-19 22:39:03,473] [98904] [WARNING] consumer::: b'location_11',value: 4 date ::: 1626714543.467694
    [2021-07-19 22:39:03,475] [98904] [WARNING] consumer::: b'location_10',value: 4 date ::: 1626714543.467873
    [2021-07-19 22:39:04,483] [98904] [WARNING] consumer::: b'location_11',value: 5 date ::: 1626714544.4778671
    [2021-07-19 22:39:04,486] [98904] [WARNING] consumer::: b'location_10',value: 5 date ::: 1626714544.4780412
    [2021-07-19 22:39:05,478] [98904] [WARNING] consumer::: b'location_11',value: 1 date ::: 1626714545.472276
    [2021-07-19 22:39:05,481] [98904] [WARNING] consumer::: b'location_10',value: 1 date ::: 1626714545.472069
    [2021-07-19 22:39:06,269] [98904] [WARNING] processing window: key (b'location_11', (1626714540.0, 1626714544.9)), dump {'item': 5, 'date': 1626714544.4778671, '__faust': {'ns': 'HelloWorld.RawModel'}}
    [2021-07-19 22:39:06,485] [98904] [WARNING] consumer::: b'location_11',value: 2 date ::: 1626714546.4793768
    [2021-07-19 22:39:06,487] [98904] [WARNING] consumer::: b'location_10',value: 2 date ::: 1626714546.4795399
    [2021-07-19 22:39:07,482] [98904] [WARNING] consumer::: b'location_11',value: 3 date ::: 1626714547.477281
    [2021-07-19 22:39:07,486] [98904] [WARNING] consumer::: b'location_10',value: 3 date ::: 1626714547.477118
    [2021-07-19 22:39:08,485] [98904] [WARNING] consumer::: b'location_11',value: 4 date ::: 1626714548.4804978
    [2021-07-19 22:39:08,488] [98904] [WARNING] consumer::: b'location_10',value: 4 date ::: 1626714548.4807072
    [2021-07-19 22:39:09,486] [98904] [WARNING] consumer::: b'location_11',value: 5 date ::: 1626714549.481013
    [2021-07-19 22:39:09,488] [98904] [WARNING] consumer::: b'location_10',value: 5 date ::: 1626714549.4808052
    [2021-07-19 22:39:10,487] [98904] [WARNING] consumer::: b'location_11',value: 1 date ::: 1626714550.4832761
    [2021-07-19 22:39:10,490] [98904] [WARNING] consumer::: b'location_10',value: 1 date ::: 1626714550.483419
    [2021-07-19 22:39:11,276] [98904] [WARNING] processing window: key (b'location_11', (1626714545.0, 1626714549.9)), dump {'item': 5, 'date': 1626714549.481013, '__faust': {'ns': 'HelloWorld.RawModel'}}

Versions

Python version: 3.8.8
Faust version: 0.6.9
Operating system: OSX Big Sur 11.2.3
Kafka version: kafka_2.12-2.3.0
RocksDB version: 0.7.0

alokmenon commented 3 years ago

This issue only happens if Robinhood Faust is installed as a dependency. We hadn't removed it; we had simply updated our dependency to faust-streaming. After removing Robinhood Faust, the application started working as expected. I tried running the same code again with Robinhood Faust in a fresh virtual environment, and there I can consistently reproduce the issue. It looks like this bug exists in Robinhood Faust 1.10.4, so if you ever hit this issue, please make sure you remove Robinhood Faust.
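Since the `faust` and `faust-streaming` packages both install the same `faust` module, one can shadow the other without any import error. A quick way to spot the conflict is to inspect the installed distributions; this is a minimal sketch of my own (the helper name `conflicting_fausts` is not from the issue):

```python
from importlib.metadata import distributions

def conflicting_fausts(names):
    """Return True when both the Robinhood 'faust' package and the
    'faust-streaming' fork appear among the installed distribution names."""
    lowered = {n.lower() for n in names}
    return "faust" in lowered and "faust-streaming" in lowered

if __name__ == '__main__':
    installed = {d.metadata["Name"] for d in distributions() if d.metadata["Name"]}
    if conflicting_fausts(installed):
        print("Both 'faust' and 'faust-streaming' are installed; "
              "run `pip uninstall faust` to avoid the two shadowing each other.")
```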