robinhood / faust

Python Stream Processing

GlobalTable doesn't receive messages #541

Open jsurloppe opened 4 years ago

jsurloppe commented 4 years ago


Steps to reproduce

import faust

app = faust.App("myapp")

# Source topic consumed by the agent below.
topic = app.topic("mytopic")

# Global table: every worker is expected to hold a full copy of its changelog.
gtable = app.GlobalTable("mygtable")

@app.agent(topic)
async def ag(stream):
    async for value in stream:
        ...  # processing elided in the original report

@app.timer(1)
async def dump():
    # Periodically print the table contents as seen by this worker.
    for k, v in gtable.items():
        print(k, v)
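
The repro above never writes to the table; per the report, data is pushed straight into the global table's topic from outside. A minimal sketch of what that presumably looks like, assuming faust's default changelog naming scheme ({app_id}-{table_name}-changelog, here "myapp-mygtable-changelog"); the timer and payload are hypothetical:

# Hypothetical feeder, not part of the original report: produces records
# directly into the global table's changelog topic, bypassing the table API.
changelog = app.topic("myapp-mygtable-changelog")

@app.timer(5.0)
async def feed():
    await changelog.send(key="some-key", value="some-value")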

Expected behavior

All workers receive all messages from all partitions of the GlobalTable's topic.

Actual behavior

Pushing data to the GlobalTable topic doesn't propagate to every worker. The GlobalTable doesn't receive all updates: some workers get them and some don't. It seems fairly random, but at least half of the workers miss updates.


DhruvaPatil98 commented 4 years ago

How many workers did you bring up? From what I understand, when messages are fed directly into the changelog topic, the workers holding that partition as an active partition don't get the message, while the workers holding it as a standby should. Also, from what I understand, tables weren't built to have messages sent into their changelog topic from outside: you use a table like a dictionary within the workers, as in the sketch below.
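
For illustration, a minimal sketch of that dictionary-style usage, where updates flow through the table API inside an agent instead of being produced straight to the changelog (the agent below is hypothetical, reusing topic and gtable from the report):

@app.agent(topic)
async def update(stream):
    async for event in stream.events():
        # Writing through the table API lets faust produce to the
        # changelog topic itself and replicate to standbys.
        gtable[event.key] = event.value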

vinaygopalkrishnan commented 3 years ago

Hi,

Any updates on this issue? I am facing the same problem: a GlobalTable shared across 3 workers, used to store metadata for joins during stream processing. The table is supposed to hold 300,000 keys, but only 1 of the 3 workers has all 300,000. The other 2 workers stop reading keys after a certain point: the 2nd worker has 104,688 keys and the 3rd worker only 23,583.

Here is the table definition:

test_changelog_topic = app.topic(
    "test-changelog",
    partitions=1,
    compacting=True,
    deleting=False,
)

lookup_upc = app.GlobalTable(
    "test",
    default=None,
    value_type=ApexUpc,
    partitions=1,
    changelog_topic=test_changelog_topic,
    extra_topic_configs={"cleanup.policy": "compact"},
)
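
For context, a rough sketch of the join-style lookup described above; the source topic, agent, and record fields are hypothetical, not from the report:

orders_topic = app.topic("orders")  # hypothetical stream to enrich

@app.agent(orders_topic)
async def enrich(stream):
    async for order in stream:
        # Look up metadata in the global table; an affected worker would
        # miss keys here because its local copy is incomplete.
        meta = lookup_upc.get(order["upc"])
        print(order, meta)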