robinhood / faust

Python Stream Processing

Messages are not submitted to a topic with cleanup policy set to "compact" #696

Open Vasiliy-Bondarenko opened 3 years ago

Vasiliy-Bondarenko commented 3 years ago

Checklist

Steps to reproduce

Hello. When I create a topic with the cleanup policy set to "compact" (in Confluent Cloud) and start producing messages to it, I get an error:

 FutureMessage exception was never retrieved
future: <FutureMessage finished exception=UnknownError()> 
Traceback (most recent call last):
  File "/usr/local/lib/python3.7/site-packages/faust/topics.py", line 442, in _on_published
    res: RecordMetadata = fut.result()
kafka.errors.UnknownError: [Error -1] UnknownError

Setting the topic's cleanup policy to "delete" resolves the issue. This is not how I expect Faust to work. :)
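
As a side note, when Faust itself declares and creates the topic, compaction can also be expressed in the topic declaration. A minimal sketch, assuming Faust creates the topic (in this report the topic was created in Confluent Cloud, so this is illustrative only; the app name and broker URL are placeholders):

import faust

# App name and broker URL are assumptions for illustration.
app = faust.App("test-app", broker="kafka://localhost:9092")

# Faust's topic declaration exposes compaction-related options,
# so the declared settings can match the broker-side cleanup.policy.
test_topic = app.topic(
    "test",
    value_serializer='json',
    compacting=True,
)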

Expected behavior

I believe Faust should not care about the compaction policy.

Actual behavior

None of the messages actually appear in the topic; it's empty. Screenshot of the exception output: http://joxi.ru/4AkQejZTkePK5m

Replicate

Test code to replicate the issue:

import faust

# App name and broker URL are assumed for completeness; not in the original report.
app = faust.App("test-app", broker="kafka://localhost:9092")

test_topic = app.topic("test", value_serializer='json')

@app.timer(interval=1.0)
async def test_producer():
    # The message is deliberately sent without a key (key=None), as in the report.
    await test_topic.send(
        value="{}",
        key=None,
    )
    print("item created")

Versions

fonty422 commented 1 year ago

Did you ever find a resolution for this? For some reason, I've just started getting the same error since I attempted to introduce log compaction, but it's only an issue for one topic. For me it appears only on the topic that I produce to from an app that also happens to be the only consumer; other topics are not affected, even the topics that I use agents to produce enriched messages to.

fonty422 commented 1 year ago

It took a little while to realise this, but in my case the message was being sent without a key, which I think log compaction requires. You sending key=None might be the same issue.
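
If that is the cause, here is a minimal sketch of the producer from the original report sending an explicit key instead of key=None (the app name, broker URL, and key value are placeholders, not from the report):

import faust

# Placeholder app/broker settings for illustration.
app = faust.App("test-app", broker="kafka://localhost:9092")

test_topic = app.topic("test", value_serializer='json')

@app.timer(interval=1.0)
async def test_producer():
    # Compacted topics retain the latest record per key, so produce with a
    # non-null key rather than key=None.
    await test_topic.send(
        key="item-1",
        value="{}",
    )
    print("item created")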