robinhood / faust

Python Stream Processing

Different behavior when switching to rocksdb store #510

Open huli opened 4 years ago

huli commented 4 years ago

Steps to reproduce

As soon as I edit the provided window example (https://github.com/robinhood/faust/blob/master/examples/windowed_aggregation.py) to use rocksdb, the behavior changes: the value() method always returns an empty list.

(The code is more or less the same as the example; I just shortened it for brevity.)

from datetime import datetime, timedelta
from time import time
import random
import faust

class RawModel(faust.Record):
    date: datetime
    value: float

TOPIC = 'raw-event'
TABLE = 'tumbling_table'
KAFKA = 'kafka://localhost:9092'
CLEANUP_INTERVAL = 1.0
WINDOW = 10
WINDOW_EXPIRES = 10
PARTITIONS = 1

app = faust.App('windowed-agg', broker=KAFKA, version=1, topic_partitions=1,
                store='rocksdb://')

app.conf.table_cleanup_interval = CLEANUP_INTERVAL
source = app.topic(TOPIC, value_type=RawModel)

def window_processor(key, events):
    print(f'window_processor - events: {len(events)}')

tumbling_table = (
    app.Table(
        TABLE,
        default=list,
        partitions=PARTITIONS,
        on_window_close=window_processor,
    )
    .tumbling(WINDOW, expires=timedelta(seconds=WINDOW_EXPIRES))
    .relative_to_field(RawModel.date)
)

@app.agent(source)
async def print_windowed_events(stream):
    async for event in stream:
        value_list = tumbling_table['events'].value()
        print(f'print_windowed_events before: {value_list}')
        print(event)
        value_list.append(event)
        tumbling_table['events'] = value_list
        value_list = tumbling_table['events'].value()
        print(f'print_windowed_events after: {value_list}')

@app.timer(0.1)
async def produce():
    await source.send(value=RawModel(value=random.random(), date=int(time())))

if __name__ == '__main__':
    app.main()

Expected behavior

I would expect the same output as with the in-memory store:

[2020-01-10 14:52:49,442] [10873] [WARNING] print_windowed_events before: []
[2020-01-10 14:52:49,442] [10873] [WARNING]
[2020-01-10 14:52:49,443] [10873] [WARNING] print_windowed_events after: []
[2020-01-10 14:52:49,943] [10873] [WARNING] print_windowed_events before: []
[2020-01-10 14:52:49,943] [10873] [WARNING]
[2020-01-10 14:52:49,943] [10873] [WARNING] print_windowed_events after: [, ]

Actual behavior

The value() method always returns an empty list, even right after I have added an element. As a result, window_processor only ever receives one event instead of all the events in the window.

[2020-01-10 14:51:52,332] [10833] [WARNING] print_windowed_events before: [] 
[2020-01-10 14:51:52,333] [10833] [WARNING] <RawModel: date=1578664311, value=0.1627785852779441> 
[2020-01-10 14:51:52,333] [10833] [WARNING] print_windowed_events after: [] 
[2020-01-10 14:51:52,849] [10833] [WARNING] print_windowed_events before: [] 
[2020-01-10 14:51:52,849] [10833] [WARNING] <RawModel: date=1578664312, value=0.5614135995691765> 
[2020-01-10 14:51:52,850] [10833] [WARNING] print_windowed_events after: [] 

Let me know if I am missing something. I also checked python-rocksdb separately, and it seems to work.

Comment

Thanks a lot for any suggestions on this!

ingenuity2k commented 4 years ago

I can reproduce this. When using store='memory://', the example works as expected.

However, with rocksdb as the store, I see the same behaviour as reported by @huli.

[2020-02-05 17:13:34,489] [23824] [WARNING] print_windowed_events before: []
[2020-02-05 17:13:34,489] [23824] [WARNING] <RawModel: date=1580919214, value=0.23741364623156347>
[2020-02-05 17:13:34,490] [23824] [WARNING] print_windowed_events after: []
[2020-02-05 17:13:34,594] [23824] [WARNING] print_windowed_events before: []
[2020-02-05 17:13:34,594] [23824] [WARNING] <RawModel: date=1580919214, value=0.7504472921876363>
[2020-02-05 17:13:34,595] [23824] [WARNING] print_windowed_events after: []

SeanZicari commented 4 years ago

Confirmed, I have experienced the same thing. I spent a few days debugging other areas of my application because of this. The tests pass (using memory), but the app always returns the default value with a single aggregation applied. Does anyone have a workaround they use? It’s been several months and there has been no official word about this bug.

forsberg commented 4 years ago

Are you by any chance getting differing behaviours with Cython enabled/disabled? Cython can be disabled by setting the environment variable NO_CYTHON=True.
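For anyone who wants to try this from a script rather than the shell, here is a minimal sketch. It assumes Faust reads NO_CYTHON from the process environment when it is imported, so the variable has to be set before the worker process starts. The helper name no_cython_env and the script name windowed_aggregation.py are placeholders for illustration, not part of Faust.

```python
import os
import subprocess
import sys


def no_cython_env(base=None):
    """Return a copy of the environment with NO_CYTHON set.

    `base` defaults to the current process environment; the input
    mapping is copied, never modified in place.
    """
    env = dict(os.environ if base is None else base)
    env["NO_CYTHON"] = "True"
    return env


def run_worker_without_cython(script="windowed_aggregation.py"):
    """Start the worker in a child process with NO_CYTHON set.

    Assumption: `script` is a file that ends with app.main(), like the
    example in this issue, so it accepts the `worker` subcommand.
    """
    return subprocess.run(
        [sys.executable, script, "worker", "-l", "info"],
        env=no_cython_env(),
    )
```

Calling run_worker_without_cython() and comparing the "before"/"after" log lines with a normal run should show whether the Cython code path makes a difference in your setup.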

I'm experiencing similar behaviour which seems to be related to how the Cython implementation of HoppingWindow returns timestamp. Could be the same bug.

Are the values stored in RocksDB? You can inspect the RocksDB store by installing the ldb tool and then running ldb scan --db=<path to .db directory>; the path is shown when the faust worker starts.

taybin commented 4 years ago

@forsberg I'm seeing this as well. Setting NO_CYTHON=True did fix the behavior!

forsberg commented 3 years ago

@taybin - interesting. You may want to try running with the code from #675.