robinhood / faust

Python Stream Processing

Windows Ranges Miscalculated #171

Open jddsc opened 6 years ago

jddsc commented 6 years ago

Checklist

Steps to reproduce

  1. docker-compose.yml
version: "2"
services:
  zookeeper:
    image: confluentinc/cp-zookeeper
    ports:
      - "32181:32181"
    environment:
      ZOOKEEPER_CLIENT_PORT: 32181
      ZOOKEEPER_TICK_TIME: 2000

  kafka:
    image: confluentinc/cp-kafka
    depends_on:
      - zookeeper
    ports:
      - '39092:39092'
      - '29092:29092'
      - '9092:9092'
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:32181
      KAFKA_AUTO_CREATE_TOPICS_ENABLE: "true"
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:39092,PLAINTEXT_OUT_OF_DOCKER://localhost:9092,PLAINTEXT_INTRA_DOCKER://kafka:29092
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_OUT_OF_DOCKER:PLAINTEXT,PLAINTEXT_INTRA_DOCKER:PLAINTEXT
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1

  kafka-manager:
    image: sheepkiller/kafka-manager:latest
    ports:
      - "9000:9000"
    depends_on:
      - zookeeper
      - kafka
    environment:
      ZK_HOSTS: zookeeper:32181
      KM_ARGS: -Djava.net.preferIPv4Stack=true
  2. Run the following script:

    import os
    import random
    import time
    import uuid

    import faust
    import requests
    from datetime import timedelta, datetime
    
    os.environ["TZ"] = "UTC"
    time.tzset()
    
    class User(faust.Record, isodates=True):
       name: str = None
       username: str = None
       random_interaction: str = None
       random_duration: int = 0
       timestamp: datetime = None
    
    class RawUser(faust.Record, isodates=True):
       name: str
       username: str
       timestamp: datetime = None
       random_id: str = None
       timestamp_epoch: float = None
    
    app = faust.App('exploration-app',
                   broker='kafka://localhost:9092',
                   key_serializer='raw')
    
    topic = app.topic('exploration-topic', value_type=RawUser)
    
    views = app.Table('views', default=int).hopping(
       size=timedelta(minutes=2),
       expires=timedelta(minutes=2),
       step=timedelta(seconds=30)
    )
    
    @app.agent(topic)
    async def process(stream):
       async for event in stream.events():
           message = event.message
           topic = event.message.topic
           partition = event.message.partition
           offset = event.message.offset
    
           key_deserialized = event.key
           value_deserialized = event.value
    
           views[value_deserialized.username] += 1
    
           timer_aux = time.time()
    
           with open('somefile.txt', 'a') as the_file:
                the_file.write('Message_TS: '+str(message.timestamp)+'\t ACTUAL_TS: '+str(timer_aux)+"\t --->"+str(list(views[value_deserialized.username].items()))+'\n')
    
    @app.timer(interval=1.0)
    async def example_sender(app):
       # r = requests.get('https://jsonplaceholder.typicode.com/users/{}'.format(str(random.randint(1, 10))))
       r = requests.get('https://jsonplaceholder.typicode.com/users/{}'.format(str(random.randint(1, 1))))
       json_request = r.json()
    
       curr_date = datetime.now() - timedelta(minutes=3)
    
       await process.send(
           value=RawUser(name=json_request['name'],
                         username=json_request['username'],
                         random_id=str(uuid.uuid4()),
                         timestamp_epoch=time.time(),
                         timestamp=curr_date)
       )
    
    if __name__ == '__main__':
       app.main()
    
  3. Evaluate the first line of somefile.txt:

      Message_TS: 1537975684.608 ACTUAL_TS: 1537975750.541059

      [(('Bret', WindowRange(start=1537975590.0, end=1537975710.0)), 1), (('Bret', WindowRange(start=1537975620.0, end=1537975740.0)), 1), (('Bret', WindowRange(start=1537975650.0, end=1537975770.0)), 1), (('Bret', WindowRange(start=1537975680.0, end=1537975800.0)), 1), (('Bret', WindowRange(start=1537975710.0, end=1537975830.0)), 1), (('Bret', WindowRange(start=1537975740.0, end=1537975860.0)), 1), (('Bret', WindowRange(start=1537975770.0, end=1537975890.0)), 1)]

Expected behavior

I expect only the WindowRanges that contain message.timestamp to be incremented:

[(('Bret', WindowRange(start=1537975590.0, end=1537975710.0)), 1), (('Bret', WindowRange(start=1537975620.0, end=1537975740.0)), 1), (('Bret', WindowRange(start=1537975650.0, end=1537975770.0)), 1), (('Bret', WindowRange(start=1537975680.0, end=1537975800.0)), 1), ....

Actual behavior

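All seven WindowRanges are incremented, including the three that start after message.timestamp (start=1537975710.0, 1537975740.0, and 1537975770.0).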
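To make the mismatch concrete, here is a minimal sketch that checks each logged window against the message timestamp. It uses plain tuples rather than faust's WindowRange, and it assumes a half-open containment rule (start <= ts < end); the helper name `contains` is hypothetical.

```python
from typing import List, Tuple

# Window starts copied from the first line of somefile.txt above; size is 2 minutes.
SIZE = 120.0
logged_starts = [1537975590.0, 1537975620.0, 1537975650.0, 1537975680.0,
                 1537975710.0, 1537975740.0, 1537975770.0]
logged_windows: List[Tuple[float, float]] = [(s, s + SIZE) for s in logged_starts]

message_ts = 1537975684.608


def contains(window: Tuple[float, float], ts: float) -> bool:
    """Half-open containment check (assumed convention): start <= ts < end."""
    start, end = window
    return start <= ts < end


matching = [w for w in logged_windows if contains(w, message_ts)]
extra = [w for w in logged_windows if not contains(w, message_ts)]

print(len(matching), "windows contain the timestamp")
print(len(extra), "windows do not:", extra)
```

Under that assumption, four of the seven logged windows contain the timestamp and three do not.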

Further Analysis

    from faust.windows import HoppingWindow

    window = HoppingWindow(step=30, size=120, expires=180)

    window.ranges(1000)
    # Produces: [WindowRange(start=900.0, end=1020.0), WindowRange(start=930.0, end=1050.0), WindowRange(start=960.0, end=1080.0), WindowRange(start=990.0, end=1110.0), WindowRange(start=1020.0, end=1140.0), WindowRange(start=1050.0, end=1170.0), WindowRange(start=1080.0, end=1200.0)]

    window._timestamp_window(1001)
    # Produces: WindowRange(start=990, end=1110)

.ranges produces windows that do not contain the timestamp. Its code seems to assume that _timestamp_window returns the oldest window containing the timestamp (in this case WindowRange(start=900.0, end=1020.0)), whereas it actually returns the newest one, so .ranges generates more ranges than expected.
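For comparison, the windows I would expect can be enumerated from first principles. This is a standalone sketch, not faust's implementation: `Range` and `windows_containing` are hypothetical names, and it assumes window starts are aligned to multiples of the step and that windows are half-open.

```python
import math
from typing import List, NamedTuple


class Range(NamedTuple):
    """Hypothetical stand-in for faust's WindowRange."""
    start: float
    end: float


def windows_containing(timestamp: float, size: float, step: float) -> List[Range]:
    """Enumerate hopping windows [start, start + size) that contain timestamp.

    Assumes window starts are aligned to multiples of `step`.
    """
    # Newest window whose start is at or before the timestamp.
    start = math.floor(timestamp / step) * step
    found = []
    # Walk back one step at a time while the window still covers the timestamp.
    while start + size > timestamp:
        found.append(Range(float(start), float(start + size)))
        start -= step
    # Return oldest first, matching the order of the output shown above.
    return list(reversed(found))


expected = windows_containing(1000, size=120, step=30)
for r in expected:
    print(r)
```

For timestamp 1000 this yields the four windows starting at 900, 930, 960 and 990, which are the first four entries of the .ranges output above.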

Versions

vineetgoel commented 6 years ago

Thanks for reporting this. You are right, this is indeed a bug.

The last range should indeed be WindowRange(start=990, end=1110), but the loop for start in range(int(earliest), int(curr.end), int(self.step)) runs until 1110. We need to change the upper bound to min(int(curr.end), int(timestamp)).
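The effect of changing the bound can be sketched outside faust. The code below is a reconstruction under assumptions, not the library's source: `Range`, `timestamp_window`, `ranges` and the `fixed` flag are hypothetical, and `earliest` is assumed to be curr.start - size + step because that reproduces the seven ranges reported above.

```python
from typing import List, NamedTuple


class Range(NamedTuple):
    """Hypothetical stand-in for faust's WindowRange."""
    start: float
    end: float


def timestamp_window(timestamp: float, size: float, step: float) -> Range:
    # Newest window containing the timestamp; gives 990..1110 for 1000 and 1001.
    start = (timestamp // step) * step
    return Range(start, start + size)


def ranges(timestamp: float, size: float, step: float, fixed: bool) -> List[Range]:
    curr = timestamp_window(timestamp, size, step)
    # Assumption: `earliest` is the oldest window that can still hold the timestamp.
    earliest = curr.start - size + step
    # Current bound is curr.end; the proposed bound is min(curr.end, timestamp).
    stop = min(int(curr.end), int(timestamp)) if fixed else int(curr.end)
    return [Range(float(s), float(s + size))
            for s in range(int(earliest), stop, int(step))]


before = ranges(1000, size=120, step=30, fixed=False)
after = ranges(1000, size=120, step=30, fixed=True)
print(len(before), [r.start for r in before])
print(len(after), [r.start for r in after])
```

With these assumptions the current bound yields seven ranges for timestamp 1000 and the proposed bound yields four. One case may be worth checking before settling on that bound: range() excludes its stop value, so when int(timestamp) equals a window start (for example timestamp 990) the sketch drops the window that starts there.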