robinhood / faust

Python Stream Processing

How to manually commit offsets #191

Closed hys20151008 closed 5 years ago

hys20151008 commented 5 years ago

I would like to process messages from a Kafka broker without missing or re-reading any of them, so I want to commit offsets manually once processing has succeeded. I tried setting acks=False in faust.App.topic, but with that setting Faust does not commit offsets at all, even if I call app.commit(topics=None). What should I do in this situation? Thanks!

pabloariasmora commented 5 years ago

@hys20151008 were you able to fix the issue? I'm having the same problem.

seifertm commented 5 years ago

I ended up with the following code:

@app.agent(topic)
async def process(stream):
    async for event in stream.noack().events():
        async with event:
            …
        # Event is acked after leaving the context

Stream.noack() creates a new stream that does not automatically commit[1] events. To manually commit events, you can use the stream.ack(Event) coroutine or use the event's context manager.

The event context manager is mentioned in the streams docs

[1] In fact, Faust has an abstraction for message commits, hence the name "acks". The difference is explained somewhere in the docs.
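
For the explicit stream.ack(Event) variant, a minimal sketch could look like the following. It is untested against a live broker. The helper name process_with_manual_ack, the handle callback, the app id, the topic name, and the broker URL are all placeholders of mine, not anything from Faust or this thread. As far as I understand it, acking only marks the event as processed, and Faust performs the actual offset commit separately in the background.

```python
async def process_with_manual_ack(stream, handle):
    """Ack each event only after `handle` has finished without raising."""
    # noack() returns a stream that does not ack events automatically.
    async for event in stream.noack().events():
        await handle(event.value)
        # Reached only if handle() succeeded; a failure leaves the event unacked.
        await stream.ack(event)


def build_app():
    """Wire the helper into a Faust agent. All names here are placeholders."""
    import faust  # imported lazily so the helper can be exercised without Faust

    app = faust.App("manual-ack-demo", broker="kafka://localhost:9092")
    topic = app.topic("demo-topic")

    async def handle(value):
        print("processed", value)

    @app.agent(topic)
    async def process(stream):
        await process_with_manual_ack(stream, handle)

    return app
```

Keeping the loop in a plain coroutine means it can be driven by a stub stream in tests, without a running worker.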

zailaib commented 3 years ago

stream.noack().takes(n)

tejasa97 commented 2 years ago

@seifertm This still doesn't work for me, i.e., the message is not committed automatically after leaving the context. Calling stream.ack(Event) doesn't work either.

seifertm commented 2 years ago

@tejasa97 I wish I could help out, but I'm no longer a Faust user, sorry.

tejasa97 commented 2 years ago

No worries, thanks for the response. What did you switch to, if I may ask?

seifertm commented 2 years ago

I ended up using Kafka client libraries rather than a framework.

There is also a maintained Faust fork, but I have not looked into it: https://github.com/faust-streaming/faust

tejasa97 commented 2 years ago

@seifertm Do you mean the kafka-python library, i.e. the KafkaConsumer class? We're also contemplating using it instead of Faust. Any pointers you can give us?

seifertm commented 2 years ago

@tejasa97 The project that used Faust is no longer actively used, so I never had to migrate away from Faust. It was the other way round: we used a hand-crafted solution and later switched to Faust.

kafka-python is a pure Python package and therefore easy to handle as a dependency. Faust itself used a fork of aiokafka. You might also look at confluent-kafka, which is based on the C/C++ library librdkafka. confluent-kafka is probably the fastest and most fully featured Kafka client implementation. I'd check the development trajectories of the individual libraries, test them, and see what works best for your use case.
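
To give an idea of what manual offset commits look like with kafka-python, here is a rough sketch, not something I have run in production. The function names, topic, group id, and broker address are placeholders. It assumes that disabling enable_auto_commit and calling commit() with no arguments commits the consumer's currently consumed offsets, which is how I read the kafka-python documentation.

```python
def consume_with_manual_commit(consumer, handle):
    """Commit after each message, and only if `handle` returned without raising."""
    for message in consumer:
        handle(message.value)
        # With no arguments, commit() is documented to commit the currently
        # consumed offsets, i.e. everything up to the message just handled.
        consumer.commit()


def build_consumer():
    """Create a kafka-python consumer with auto-commit turned off (placeholder names)."""
    from kafka import KafkaConsumer  # lazy import: requires the kafka-python package

    return KafkaConsumer(
        "demo-topic",
        bootstrap_servers="localhost:9092",
        group_id="demo-group",
        enable_auto_commit=False,      # we call commit() ourselves
        auto_offset_reset="earliest",  # start from the beginning if no offset is stored
    )
```

Calling consume_with_manual_commit(build_consumer(), print) would print each message value and commit after it. If handle raises, the exception propagates before commit() is reached, so that message is delivered again after a restart. This gives at-least-once processing, so the handler should tolerate duplicates.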

I hope this helps. Feel free to contact me via e-mail.