faust-streaming / faust

Python Stream Processing. A Faust fork
https://faust-streaming.github.io/faust/
Other
1.59k stars 177 forks source link

Storing commits on a database #461

Open jamesellis1999 opened 1 year ago

jamesellis1999 commented 1 year ago

Hi,

This is more of a general question than an issue to reproduce, so please pardon me not following the template.

Is it possible to store the offset of a consumer in an external database? Ideally, I would like to commit my offsets with the other data I have in a single database transaction. How would I go about doing this in Faust?

Thanks!

fonty422 commented 1 year ago

Not sure if this helps, but you can get the offset as each event comes into your stream processor, then use that while doing something with the other data too:

@app.agent(some_topic)
async def process_event_with_offset(stream):
  async for event in stream.events():
    # use event.value for your data
    # event.message.offset for the offset of the event
    ....