robinhood / faust

Python Stream Processing

Tumbling window does not work with the RocksDB store unless NO_CYTHON=True is set. #705

Closed alokmenon closed 3 years ago

alokmenon commented 3 years ago

Steps to reproduce

The same issue was reported by the original author at https://gitmemory.com/issue/robinhood/faust/510/582490538. I am seeing it as well in multiple environments. It is discussed further at https://ask.csdn.net/questions/7339836

As soon as I edit the provided window example (https://github.com/robinhood/faust/blob/master/examples/windowed_aggregation.py) to use rocksdb, the behavior changes: the value() method always returns an empty list. (The code is more or less the same as the example; I just shortened it for brevity.)

from datetime import datetime, timedelta
from time import time
import random
import faust

class RawModel(faust.Record):
    date: datetime
    value: float

TOPIC = 'raw-event'
TABLE = 'tumbling_table'
KAFKA = 'kafka://localhost:9092'
CLEANUP_INTERVAL = 1.0
WINDOW = 10
WINDOW_EXPIRES = 10
PARTITIONS = 1

app = faust.App('windowed-agg', broker=KAFKA, version=1, topic_partitions=1,
                store='rocksdb://')

app.conf.table_cleanup_interval = CLEANUP_INTERVAL
source = app.topic(TOPIC, value_type=RawModel)

def window_processor(key, events):
    print(f'window_processor - events: {len(events)}')

tumbling_table = (
    app.Table(
        TABLE,
        default=list,
        partitions=PARTITIONS,
        on_window_close=window_processor,
    )
    .tumbling(WINDOW, expires=timedelta(seconds=WINDOW_EXPIRES))
    .relative_to_field(RawModel.date)
)

@app.agent(source)
async def print_windowed_events(stream):
    async for event in stream:
        value_list = tumbling_table['events'].value()
        print(f'print_windowed_events before: {value_list}')
        print(event)
        value_list.append(event)
        tumbling_table['events'] = value_list
        value_list = tumbling_table['events'].value()
        print(f'print_windowed_events after: {value_list}')

# Produce one test event every 0.1 s; `date` is sent as an int Unix timestamp.
@app.timer(0.1)
async def produce():
    await source.send(value=RawModel(value=random.random(), date=int(time())))

if __name__ == '__main__':
    app.main()

Expected behavior

I would expect the same output as with the in-memory store:

[2020-01-10 14:52:49,442] [10873] [WARNING] print_windowed_events before: []
[2020-01-10 14:52:49,442] [10873] [WARNING] <RawModel: date=1578664368, value=0.3634263945834183>
[2020-01-10 14:52:49,443] [10873] [WARNING] print_windowed_events after: [<RawModel: date=1578664368, value=0.3634263945834183>]
[2020-01-10 14:52:49,943] [10873] [WARNING] print_windowed_events before: [<RawModel: date=1578664368, value=0.3634263945834183>]
[2020-01-10 14:52:49,943] [10873] [WARNING] <RawModel: date=1578664369, value=0.4364065026849575>
[2020-01-10 14:52:49,943] [10873] [WARNING] print_windowed_events after: [<RawModel: date=1578664368, value=0.3634263945834183>, <RawModel: date=1578664369, value=0.4364065026849575>]
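To make the expected accumulation concrete, here is a minimal pure-Python sketch of tumbling-window bucketing. It is not Faust's implementation; the helper names `window_start` and `accumulate` are hypothetical, and it assumes windows are aligned to multiples of the window size, as the 10-second `WINDOW` in the example suggests.

```python
from collections import defaultdict

WINDOW = 10  # window size in seconds, mirroring the example above


def window_start(timestamp, size=WINDOW):
    """Return the start of the tumbling window that contains `timestamp`."""
    return timestamp - (timestamp % size)


def accumulate(events, size=WINDOW):
    """Group (timestamp, value) pairs by the window they fall into."""
    table = defaultdict(list)
    for timestamp, value in events:
        # Every event in the same window is appended to the same list,
        # which is what the in-memory store output above reflects.
        table[window_start(timestamp, size)].append(value)
    return dict(table)


# Illustrative events (timestamps taken from the logs, values shortened).
events = [(1578664368, 0.36), (1578664369, 0.43), (1578664371, 0.91)]
windows = accumulate(events)
```

With these inputs the first two events share one window and the third starts a new one, so a window-close callback would be expected to see two events for the first window rather than one.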

Actual behavior

The value() method always returns an empty list, even immediately after an element has been added. As a result, window_processor only ever receives one event instead of all events in the window.

[2020-01-10 14:51:52,332] [10833] [WARNING] print_windowed_events before: [] 
[2020-01-10 14:51:52,333] [10833] [WARNING] <RawModel: date=1578664311, value=0.1627785852779441> 
[2020-01-10 14:51:52,333] [10833] [WARNING] print_windowed_events after: [] 
[2020-01-10 14:51:52,849] [10833] [WARNING] print_windowed_events before: [] 
[2020-01-10 14:51:52,849] [10833] [WARNING] <RawModel: date=1578664312, value=0.5614135995691765> 
[2020-01-10 14:51:52,850] [10833] [WARNING] print_windowed_events after: [] 

Versions

Python version: 3.8.7
Faust version: 1.9.0
Operating system: CentOS 7 / macOS Catalina (10.15.7) / AWS Linux image
RocksDB version (if applicable): python-rocksdb (0.7.0)

alokmenon commented 3 years ago

This looks like a duplicate; it has been addressed in https://github.com/robinhood/faust/issues/510.
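For anyone landing here, the workaround named in the issue title can be sketched as follows. This assumes Faust reads the NO_CYTHON environment variable when its modules are first imported, so the flag has to be set before `import faust`; the helper name `disable_faust_cython` is hypothetical and not part of Faust.

```python
import os
import sys


def disable_faust_cython(environ=os.environ, modules=sys.modules):
    """Set NO_CYTHON so Faust is asked to use its pure-Python code paths.

    Returns True if the flag was set before Faust was imported, and False
    if Faust is already loaded (in which case the flag is assumed to come
    too late to have any effect in this process).
    """
    environ["NO_CYTHON"] = "True"
    return "faust" not in modules


if __name__ == "__main__":
    if not disable_faust_cython():
        print("faust is already imported; set NO_CYTHON before starting Python")
    # Import faust only after the flag is set, e.g.:
    # import faust
```

Setting the variable in the shell that launches the worker (for example `NO_CYTHON=True` in front of the start command) should be equivalent and avoids any import-order concerns.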