robinhood / faust

Python Stream Processing

Faust Tumbling Window not working when using relative_to_field #708

Open alokmenon opened 3 years ago

alokmenon commented 3 years ago

Steps to reproduce

When a tumbling window is used together with relative_to_field, it does not work as expected. A value written for a key is not visible when the same key is read back in the same window: the read always returns the default value instead of the updated value. Without relative_to_field, it works as expected.

import time
from datetime import timedelta

import faust

class Count(faust.Record, serializer='json'):
    user: str = ""
    count: int = 0
    timestamp: float = 0

count_schema = faust.Schema(
    key_type=str,
    value_type=Count,
    # key_serializer='str',
    value_serializer='json'
)

app = faust.App("Test_windowed_new", broker="localhost:9092", store='rocksdb://')
count_topic = app.topic("count_topic", schema=count_schema)

def window_processor(key, event):
    print("Windowing event 1 : {}".format(key[0]) + "Event " + str(event))

tumbling_table = (
    app.Table(
        "test_table_diff_new",
        default=Count,
        key_type=str,
        value_type=Count,
        partitions=1,
        on_window_close=window_processor,
    ).tumbling(5, key_index=True, expires=timedelta(seconds=5)).relative_to_field(Count.timestamp)
)

@app.agent(count_topic)
async def count_message(msgs):
    async for key, msg in msgs.items():
        print("incoming message  " + str(msg) + " for key " + key)
        # Read the value currently stored for this key in the window.
        window_set = tumbling_table[key]
        prev_count = window_set.value()
        # Build the updated record: running count plus the event's own timestamp.
        count = Count()
        count.count = msg.count + prev_count.count
        count.user = msg.user
        count.timestamp = msg.timestamp
        tumbling_table[key] = count
        print("Going to update :: " + str(count) + " for key " + key + " for log_time_in_sec  " + str(msg.timestamp))
        # Read back immediately; this is the read that returns the default value.
        print("Value after update :: " + str(tumbling_table[key].value()) + " for key " + key)

@app.timer(interval=1.0, on_leader=True)
async def send_count_kafka():
    # Send one test event per second, stamped with the current wall-clock time.
    count = Count()
    count.user = "test"
    count.count = 100
    count.timestamp = time.time()
    await count_message.send(
        key=str(count.user),
        value=count
    )
    print('Count Message :: send message')

if __name__ == '__main__':
    print("Test Application getting started")
    app.main()

Expected behavior

The print statement that reads the latest value from the tumbling window after the update, "Value after update :: " + str(tumbling_table[key].value()), should return the value written in the previous step:

[2021-04-02 17:52:43,709] [29841] [WARNING] Value after update :: <Count: user='test', count=100, timestamp=1617366163.204891>  for key test 

Actual behavior

The print statement that reads the latest value from the tumbling window after the update, "Value after update :: " + str(tumbling_table[key].value()), returns the default value instead of the updated value:

[2021-04-02 17:52:43,698] [29841] [WARNING] incoming message  <Count: user='test', count=100, timestamp=1617366163.204891> for key test 
[2021-04-02 17:52:43,709] [29841] [WARNING] Going to update :: <Count: user='test', count=100, timestamp=1617366163.204891> for key test for log_time_in_sec  1617366163.204891 
[2021-04-02 17:52:43,709] [29841] [WARNING] Value after update :: <Count: user='', count=0, timestamp=0> for key test 
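
One way to reason about this symptom is that a windowed table stores each value under a key plus a window range. A read will only see a write if both resolve to the same range. The sketch below is a plain-Python toy model of that idea, not Faust's implementation and not a confirmed diagnosis. The names tumbling_range and ToyWindowedTable are hypothetical, and the half-open [start, start + size) range is an assumption made for illustration.

```python
from typing import Dict, Tuple

WindowRange = Tuple[float, float]


def tumbling_range(timestamp: float, size: float) -> WindowRange:
    """Return the assumed [start, end) range of the tumbling window holding timestamp."""
    start = (timestamp // size) * size
    return (start, start + size)


class ToyWindowedTable:
    """Hypothetical stand-in for a windowed table (not Faust code)."""

    def __init__(self, size: float, default: int = 0) -> None:
        self.size = size
        self.default = default
        self._data: Dict[Tuple[str, WindowRange], int] = {}

    def set(self, key: str, value: int, timestamp: float) -> None:
        # The write is stored under (key, window range of the given timestamp).
        self._data[(key, tumbling_range(timestamp, self.size))] = value

    def get(self, key: str, timestamp: float) -> int:
        # The read looks up (key, window range of the given timestamp).
        return self._data.get((key, tumbling_range(timestamp, self.size)), self.default)


table = ToyWindowedTable(size=5.0)
event_ts = 1617366163.204891  # timestamp taken from the log above

table.set("test", 100, timestamp=event_ts)

# A read one second later falls in the same 5-second window and sees the write.
same_window = table.get("test", timestamp=event_ts + 1.0)

# A read resolved against a different window falls back to the default.
other_window = table.get("test", timestamp=event_ts + 5.0)

print(tumbling_range(event_ts, 5.0), same_window, other_window)
```

In this toy model, the default comes back whenever the read and the write are resolved against timestamps that fall in different windows. Logging the timestamp used on each side of the real table would show whether that is happening here.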

benbenbang commented 3 years ago

Hello, how did you install RocksDB? I hit the same issue at first, but after I installed RocksDB with brew, everything worked fine.

bobh66 commented 3 years ago

You might want to try the fork at https://github.com/faust-streaming/faust

It has a lot of fixes that have not been applied to this project, which has been unmaintained for a while now.

alokmenon commented 3 years ago

Thank you bobh66