robinhood / faust

Python Stream Processing

Global Table doesn't sync completely in the leader worker during bootup if the workers start processing new messages before fetching all the old ones #580

Open DhruvaPatil98 opened 4 years ago

DhruvaPatil98 commented 4 years ago

Checklist

Steps to reproduce

1. Use the script below for the app and bring up 2 or more workers:

```py
from faust import App

app = App('app_main', broker='kafka://kafka:9094', store='rocksdb://')

PARTITIONS = 4

event_topic = app.topic(
    'event_topic_write',
    internal=True,
    partitions=PARTITIONS,
    key_serializer='json',
    value_serializer='json',
)

event_table = app.GlobalTable('event_table', partitions=PARTITIONS)

@app.agent(event_topic)
async def event_topic_write(streams):
    async for payload in streams.events():
        print(f'Got data: {payload}')
        event_table[payload.key] = payload.value

@app.timer(interval=2)
async def table_length():
    print(f'Number of keys: {len(event_table)}')

if __name__ == '__main__':
    app.main()
```
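To bring up two workers of this app on the same machine, each one needs its own web port and its own working directory (so their rocksdb stores don't collide); something like `python app.py worker -l info --web-port 6066` in one terminal and `python app.py worker -l info --web-port 6067` in another should do.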


2. Load 20,000 events into the topic using kafka-python. I used a simple Python script like the one below:
```py
import json
import random
import string

from kafka import KafkaProducer

producer_instance = KafkaProducer(
    bootstrap_servers=['kafka:9094'],
)

def random_string(string_length=10):
    """Generate a random string of fixed length """
    letters = string.ascii_lowercase
    return ''.join(random.choice(letters) for i in range(string_length))

total_keys = 20000
for _ in range(total_keys):
    key_bytes = bytes(json.dumps(random_string()), encoding='utf-8')
    value_bytes = bytes(json.dumps('test'), encoding='utf-8')

    topic_name = 'event_topic_write'

    producer_instance.send(topic_name, key=key_bytes, value=value_bytes)
    producer_instance.flush()
    print(key_bytes)
print(f'Added {total_keys} keys')
```
3. After the workers have processed the messages, bring down both workers and delete their local directories containing the rocksdb data (a cleanup sketch follows this list)

4. With the faust workers down, use the same script from step 2 to generate 10,000 more keys

5. Bring the faust workers back up and wait until they process the new messages
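
For step 3, a minimal cleanup sketch (the worker paths are hypothetical, and it assumes faust's default datadir name `<app id>-data`, here `app_main-data`):

```py
# Hypothetical helper for step 3: with both workers stopped, delete each
# worker's local rocksdb state so the GlobalTable has to be rebuilt from
# the changelog topic on the next startup.
# Assumes faust's default datadir name '<app id>-data' (here 'app_main-data')
# and example worker directories; adjust the paths to your setup.
import shutil
from pathlib import Path

WORKER_DIRS = [Path('./worker1'), Path('./worker2')]  # hypothetical paths

for worker_dir in WORKER_DIRS:
    datadir = worker_dir / 'app_main-data'
    if datadir.exists():
        shutil.rmtree(datadir)
        print(f'Removed {datadir}')
```

Step 4 is then just a re-run of the producer script from step 2 with `total_keys = 10000`.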

Expected behavior

All workers show the same total number of keys in the global table

Actual behavior

In my testing, the table in the leader worker has consistently (~4 times out of 5) been out of sync with the other workers. The other workers synced up with no issues in all my runs. The number of keys out of sync (i.e., the keys that the leader's table doesn't have) varies, but it has gone up to about 1,000 keys. The leader's table doesn't catch up over time either.
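
One way to compare the workers directly, instead of tailing the timer logs, is a small web view added to the app script; the route name here is made up, and it assumes each worker is reachable on its own web port (6066, 6067, ...):

```py
# Hypothetical diagnostic view: each worker serves its own table size on
# its web port, so the leader's count can be compared against the other
# workers without digging through logs.
@app.page('/table-size/')
async def table_size(web, request):
    return web.json({'keys': len(event_table)})
```

With that in place, `curl http://localhost:6066/table-size/` and `curl http://localhost:6067/table-size/` make the mismatch visible directly.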

Full traceback

Worker 1 (Leader):

```
[2020-04-23 08:12:42,395] [50] [WARNING] Number of keys: 29965
```

Worker 2:

```
[2020-04-23 08:12:27,031] [27] [WARNING] Number of keys: 30000
```

Versions

mevinbabuc commented 4 years ago

@ask any idea what is going on here?