robinhood / faust

Python Stream Processing
Other
6.7k stars 538 forks source link

GlobalTable not working when using multiple instances #751

Open Hamdiovish opened 2 years ago

Hamdiovish commented 2 years ago

Checklist

Steps to reproduce

Running the following app in 2 instances, then by pushing a message to one partition, the instance listening to the related partition detect the message and update the GlobalTable, whilst the other instance will not update its version of the GlobalTable and keep showing nothing.

import faust
from faust.cli import option

app = faust.App(
    'hello-world',
    broker='kafka://localhost:29092',
    key_serializer='raw',
    value_serializer='raw',
    store="rocksdb://", 
    topic_disable_leader=True,
    )

greetings_topic = app.topic('greetings-topic',partitions=2,key_type=str, value_type=str,internal=True)
greetings_table = app.GlobalTable('greetings-table',partitions=2,key_type=str, value_type=str,default=str)

@app.agent(greetings_topic)
async def greet(greetings):
    """run: faust -A app worker"""
    async for event in greetings.events():
        k = event.key
        v = event.value
        print(f"processing: {k}: {v} on partition: {event.message}")
        greetings_table[k]=v

@app.command(
    option('--k', type=str, default='a',help='The key.'),
    option('--v', type=str, default='0',help='The value.'))
async def sim_greeting(self, k, v, p):
    """simulate: faust -A app01 sim-greeting --k a --v 0"""
    await greetings_topic.send(key=k,value=v)

@app.timer(interval=1.0)
async def plot(app):
    for k in greetings_table:
        print(f'[{k}:{greetings_table[k]}]')

Expected behavior

All instances should detect GlobalTable updates.

Actual behavior

The GlobalTable is not synced across instances.

Versions