robinhood / faust

Python Stream Processing
Other
6.72k stars 535 forks source link

When relative_to_field is used with a datetime field delta windows are always empty #223

Open mstump opened 5 years ago

mstump commented 5 years ago

Checklist

Steps to reproduce

import asyncio
import faust
import random
import string
import uuid
import json
import dateutil.parser

from datetime import datetime, timedelta
from faust import windows
from faust.serializers import codecs
from typing import Any

class PageView(faust.Record, serializer='page_view_json'):
    """Page-view event whose ``occurred_at`` timestamp is a ``datetime``.

    Values are (de)serialized by the custom ``page_view_json`` codec, which
    is defined and registered from inside this class body by
    ``_jsonSupport``.
    """

    id: str = None
    user: str = None
    occurred_at: datetime = None

    def _jsonSupport(*args):
        """Define the JSON codec and register it as ``page_view_json``."""

        class JSONSupport(codecs.Codec):
            """JSON codec that writes ``occurred_at`` as an ISO-8601 string."""

            def _dumps(self, obj: Any) -> bytes:
                # Build a new mapping instead of assigning into ``obj`` so
                # the caller's dict keeps its original datetime value. Key
                # order (and therefore the emitted JSON) is unchanged.
                payload = {
                    **obj,
                    "occurred_at": obj["occurred_at"].isoformat(),
                }
                return json.dumps(payload).encode()

            def _loads(self, s: bytes) -> Any:
                # ``obj`` is freshly created by json.loads, so updating it
                # in place cannot affect any caller-owned data.
                obj = json.loads(s)
                obj["occurred_at"] = dateutil.parser.parse(obj["occurred_at"])
                # ``PageView`` is looked up at call time, after the class
                # statement has finished executing.
                return PageView(**obj)

        codecs.register('page_view_json', JSONSupport())

    # Called while the class body executes, so the codec is registered as
    # soon as this class statement runs.
    _jsonSupport()

# Reference instant used by the agent's "seconds since start" print-out.
start_time = datetime.utcnow()
# Passed to ``tumbling()`` as the window size and later to ``.delta()``.
ttl = 4

app = faust.App('page_views', broker='kafka://localhost:9092', topic_partitions=4)

page_view_topic = app.topic('page_views', value_type=PageView)

# Tumbling-window table keyed by user, windowed relative to the event's own
# ``occurred_at`` field.
active_users_table = (
    app.Table('active_users', default=None)
    .tumbling(ttl, expires=timedelta(seconds=30), key_index=True)
    .relative_to_field(PageView.occurred_at)
)

@app.timer(interval=2, on_leader=True)
async def generator():
    """Send one ``PageView`` with a random 7-character user to the topic.

    Registered through ``app.timer`` with ``interval=2`` and
    ``on_leader=True``; the event is stamped with ``datetime.utcnow()``.
    """
    alphabet = string.ascii_uppercase + string.digits
    user = ''.join(random.choices(alphabet, k=7))
    event = PageView(str(uuid.uuid4()), user, datetime.utcnow())
    await page_view_topic.send(value=event)

@app.agent(page_view_topic)
async def print_windowed_events(stream):
    """Record each event in the windowed table and print the table's keys.

    For every event, the user's entry is set to the event timestamp. The
    keys from ``items()`` and from ``items().delta(ttl)`` are then printed
    on separate lines.
    """
    async for page_view in stream:
        active_users_table[page_view.user] = page_view.occurred_at

        print('-- New Event (every 2 secs) --')
        print(f"seconds since start: {page_view.occurred_at - start_time}")

        # Keys yielded by the table's plain items() view.
        current_keys = [key for key, _ in active_users_table.items()]
        print(f"{current_keys!r}")

        # Keys yielded by the same view shifted by ``ttl`` via ``.delta()``.
        delta_keys = [key for key, _ in active_users_table.items().delta(ttl)]
        print(f"{delta_keys!r}")

Expected behavior

The delta window should include the items from the time window at the specified delta.

Actual behavior

When window.relative_to_field is used, the items for delta windows are always empty.

[2018-11-23 23:19:51,171: WARNING]: -- New Event (every 2 secs) --
[2018-11-23 23:19:51,171: WARNING]: seconds since start: -1 day, 23:57:36.936266
[2018-11-23 23:19:51,172: WARNING]: ['D599WCN']
[2018-11-23 23:19:51,172: WARNING]: []
[2018-11-23 23:19:51,783: WARNING]: -- New Event (every 2 secs) --
[2018-11-23 23:19:51,784: WARNING]: seconds since start: 0:00:18.157651
[2018-11-23 23:19:51,784: WARNING]: ['7PU9N73']
[2018-11-23 23:19:51,784: WARNING]: []
[2018-11-23 23:19:53,813: WARNING]: -- New Event (every 2 secs) --
[2018-11-23 23:19:53,814: WARNING]: seconds since start: 0:00:20.163283
[2018-11-23 23:19:53,814: WARNING]: ['MCPRP7E']
[2018-11-23 23:19:53,815: WARNING]: []
[2018-11-23 23:19:55,782: WARNING]: -- New Event (every 2 secs) --
[2018-11-23 23:19:55,783: WARNING]: seconds since start: 0:00:22.164461
[2018-11-23 23:19:55,783: WARNING]: ['MCPRP7E', '1834GWX']
[2018-11-23 23:19:55,783: WARNING]: []
[2018-11-23 23:19:57,789: WARNING]: -- New Event (every 2 secs) --
[2018-11-23 23:19:57,789: WARNING]: seconds since start: 0:00:24.170478
[2018-11-23 23:19:57,789: WARNING]: ['BKZ9XTO']
[2018-11-23 23:19:57,790: WARNING]: []
[2018-11-23 23:19:59,789: WARNING]: -- New Event (every 2 secs) --
[2018-11-23 23:19:59,790: WARNING]: seconds since start: 0:00:26.171417
[2018-11-23 23:19:59,790: WARNING]: ['BKZ9XTO', 'SB8SCH4']
[2018-11-23 23:19:59,790: WARNING]: []
[2018-11-23 23:20:01,793: WARNING]: -- New Event (every 2 secs) --
[2018-11-23 23:20:01,793: WARNING]: seconds since start: 0:00:28.175309
[2018-11-23 23:20:01,793: WARNING]: ['OH389W1']
[2018-11-23 23:20:01,794: WARNING]: []
[2018-11-23 23:20:03,798: WARNING]: -- New Event (every 2 secs) --
[2018-11-23 23:20:03,798: WARNING]: seconds since start: 0:00:30.180400
[2018-11-23 23:20:03,799: WARNING]: ['OH389W1', 'B0AQNYM']
[2018-11-23 23:20:03,799: WARNING]: []
[2018-11-23 23:20:05,804: WARNING]: -- New Event (every 2 secs) --
[2018-11-23 23:20:05,804: WARNING]: seconds since start: 0:00:32.185951
[2018-11-23 23:20:05,804: WARNING]: ['7EKFGZR']
[2018-11-23 23:20:05,805: WARNING]: []

Versions

mstump commented 5 years ago

It turns out that this is only reproducible if the field is a datetime and not if the field is a float.

The code snippet below works as anticipated.

import asyncio
import random
import string
import time
import uuid
from datetime import timedelta

import faust
from faust import windows

# Variant of the page-view model in which ``occurred_at`` is a float rather
# than a datetime; no custom serializer is configured on this class.
class PageView(faust.Record):
    # Event identifier (the generator in this snippet passes str(uuid.uuid4())).
    id: str = None
    # User name; also used as the key into the windowed table by the agent.
    user: str = None
    # Event timestamp as a float (the generator passes time.time()).
    occurred_at: float = None

# Reference instant (from time.time()) used by the agent's
# "seconds since start" print-out.
start_time = time.time()
# Passed to ``tumbling()`` as the window size and later to ``.delta()``.
ttl = 4

app = faust.App(
    'page_view_windows',
    broker='kafka://localhost:9092',
    topic_partitions=4,
)

page_view_topic = app.topic('page_views', value_type=PageView)

# Tumbling-window table keyed by user, windowed relative to the event's own
# float ``occurred_at`` field. ``timedelta`` comes from the ``datetime``
# module and must be imported at the top of this snippet.
active_users_table = (
    app.Table('active_users', default=None)
    .tumbling(ttl, expires=timedelta(seconds=30), key_index=True)
    .relative_to_field(PageView.occurred_at)
)

@app.timer(interval=2, on_leader=True)
async def generator():
    """Send one ``PageView`` with a random 7-character user to the topic.

    Registered through ``app.timer`` with ``interval=2`` and
    ``on_leader=True``; the event is stamped with ``time.time()``.
    """
    alphabet = string.ascii_uppercase + string.digits
    user = ''.join(random.choices(alphabet, k=7))
    event = PageView(str(uuid.uuid4()), user, time.time())
    await page_view_topic.send(value=event)

@app.agent(page_view_topic)
async def print_windowed_events(stream):
    """Record each event in the windowed table and print the table's keys.

    For every event, the user's entry is set to the event timestamp. The
    key count and the keys from ``items()`` are printed on one line,
    followed by the keys from ``items().delta(ttl)`` on the next.
    """
    async for page_view in stream:
        active_users_table[page_view.user] = page_view.occurred_at

        print('-- New Event (every 2 secs) --')
        print(f"seconds since start: {page_view.occurred_at - start_time}")

        # Count first, then collect keys, matching the original evaluation order.
        key_count = len(active_users_table.keys())
        current_keys = [key for key, _ in active_users_table.items()]
        print(f"{key_count}, {current_keys!r}")

        # Keys yielded by the same view shifted by ``ttl`` via ``.delta()``.
        delta_keys = [key for key, _ in active_users_table.items().delta(ttl)]
        print(f"{delta_keys!r}")
ask commented 5 years ago

Note: You can have datetimes in json models using the isodates option like this:

# Same three fields as the issue's model, but declared with the 'json'
# serializer and isodates=True instead of a hand-written codec.
class PageView(faust.Record, isodates=True, serializer='json'):
    # Event identifier.
    id: str = None
    # User name.
    user: str = None
    # Event timestamp, annotated as a datetime.
    occurred_at: datetime = None
mstump commented 5 years ago

Thanks for the note about isodates; I missed that in the docs until I started reading the model code.

smboy commented 5 years ago

@mstump could you please post your page_view_json data? I'm trying to use your example as a reference to build an example process, and I'm wondering what the structure of your incoming data looks like.

forsberg commented 3 years ago

@mstump - does the problem go away if you disable Cython? You can do so by setting the environment variable NO_CYTHON=True