faust-streaming / faust

Python Stream Processing. A Faust fork
https://faust-streaming.github.io/faust/

Behaviour of noack() - events always being acknowledged #189

Closed by karlokr-p 2 years ago

karlokr-p commented 3 years ago

Checklist

Steps to reproduce

I am running an agent similar to the following:

@app.agent(notification_topic)
async def downtime_notification(stream):
    async for event in stream.noack().events():

        logger.info(f"Notification received with id={event.value.id}, retry={event.value.retry}")

        if int(event.value.id) % 2 == 0:
            logger.info(f"Acknowledge {event.value.id}")
            await stream.ack(event)
        else:
            raise Exception("not acknowledged") 

        yield event

This is slightly different from what I intend to run in production, but in essence it's the same. In the above example I am manually sending messages to the topic and only want to ack a message if its id is divisible by 2.

You can see the full code for this test scenario here: https://github.com/panevo/faust-retry-experiment

Expected behavior

I expect any message whose id is not divisible by 2 to raise an exception and not be acknowledged, and thus to be re-processed by the agent, since its committed offset should not advance.

Actual behavior

All messages are always acknowledged. An exception is raised for ids not divisible by 2, but those messages are never retried by the agent.
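A likely reason for this behaviour: in Kafka, a consumer's fetch position advances as records are fetched, independently of the committed offset. The committed offset is only used as the starting point when a partition is assigned, for example after a restart or a rebalance. Withholding the ack therefore keeps the offset from being committed, but does not by itself make the agent see the message again. The toy model below is a pure-Python sketch of that distinction; `ToyPartitionConsumer` and its methods are hypothetical and do not mirror Faust's internals (for instance, it ignores how commits of non-contiguous offsets are handled).

```python
from dataclasses import dataclass
from typing import List, Optional, Tuple


@dataclass
class ToyPartitionConsumer:
    """Toy model of a consumer reading one partition (NOT the Kafka or Faust API).

    position:  next offset this consumer will fetch.
    committed: offset a consumer resumes from after a restart or rebalance.
    """

    log: List[int]
    position: int = 0
    committed: int = 0

    def poll(self) -> Optional[Tuple[int, int]]:
        # Fetching advances the position whether or not anything was committed.
        if self.position >= len(self.log):
            return None
        offset = self.position
        self.position += 1
        return offset, self.log[offset]

    def commit(self, offset: int) -> None:
        # Kafka stores the "next offset to read", so committing offset N stores N + 1.
        self.committed = offset + 1

    def restart(self) -> None:
        # Only a (re)assignment rewinds the position to the committed offset.
        self.position = self.committed


consumer = ToyPartitionConsumer(log=[2, 3])  # two messages with ids 2 and 3
seen = []
while (record := consumer.poll()) is not None:
    offset, msg_id = record
    seen.append(msg_id)
    if msg_id % 2 == 0:
        consumer.commit(offset)  # "ack" only even ids, as in the issue

print(seen)                # [2, 3]: id 3 is not fetched again in this session
print(consumer.committed)  # 1: the committed offset stops before id 3
consumer.restart()
print(consumer.poll())     # (1, 3): id 3 is redelivered only after the restart
```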

Versions

davidfarrugia commented 3 years ago

I am also experiencing this behaviour. I am currently using this solution. It would be great to have it implemented in Faust itself.

karlokr-p commented 2 years ago

@davidfarrugia When you're using this solution, what is the behavior of the agent when a message is not acknowledged? I expect the agent to continuously re-process the latest event that has not been acknowledged, but that is not what I see: the event is processed once and never again.

I am trying to use this code to manually acknowledge events depending on the response code of an HTTP request, so that the HTTP request can be retried until it succeeds. However, I can't get the noack_take() solution to work; events still seem to be acknowledged. I have tried the following simple code in my agent:

async def test_noack_agent(stream):
    async for events in stream.noack().noack_take(1, within=1):
        for event in events:
            app.logger.info("this should repeat indefinitely")

but I don't see the logging message being repeated. Any advice? My understanding is that an event should be continuously reprocessed if it is not acknowledged. Am I misunderstanding something?

Thank you very much for the help

zerafachris commented 2 years ago

Hey @karlokr-p, I am a developer working with @davidfarrugia. The implementation presented does not handle the repetition; you need to handle it in your own code. You could do this before the yield via:
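A minimal sketch of handling the repetition before the yield, assuming a hypothetical helper `handle_until_success` (not part of Faust): it awaits a user-supplied coroutine until that coroutine reports success, counting exceptions as failed attempts and sleeping between attempts. The agent loop in the comment, including the `process` coroutine, is illustrative only and is not executed here.

```python
import asyncio
from typing import Any, Awaitable, Callable


async def handle_until_success(
    handle: Callable[[Any], Awaitable[bool]],
    value: Any,
    delay_s: float = 0.5,
) -> int:
    """Hypothetical helper: await handle(value) until it returns True.

    Returns the number of attempts. Exceptions count as failed attempts.
    """
    attempts = 0
    while True:
        attempts += 1
        try:
            if await handle(value):
                return attempts
        except Exception as exc:  # asyncio.CancelledError is not caught (Python 3.8+)
            print(f"attempt {attempts} failed: {exc!r}")
        await asyncio.sleep(delay_s)


# Inside a Faust agent this would sit before the ack and the yield (not run here;
# `process` is a hypothetical coroutine returning True on success):
#     async for event in stream.noack().events():
#         await handle_until_success(process, event.value)
#         await stream.ack(event)
#         yield event


def make_flaky(failures: int) -> Callable[[Any], Awaitable[bool]]:
    # Test double: raises for the first `failures` calls, then succeeds.
    calls = 0

    async def handle(value: Any) -> bool:
        nonlocal calls
        calls += 1
        if calls <= failures:
            raise RuntimeError(f"simulated failure for {value}")
        return True

    return handle


attempts = asyncio.run(handle_until_success(make_flaky(2), 3, delay_s=0.01))
print(attempts)  # 3
```

Because the retry happens inside the loop body, the same event is retried in place rather than being redelivered by Kafka.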

karlokr-p commented 2 years ago

Hi @zerafachris, thank you so much for the help, this solved my issue!

ostetsenko commented 2 years ago

@zerafachris @karlokr-p @mdrago98

Hello, sorry for the noise... does it work using async for event in stream.noack().events() now? What does @zerafachris' implementation from the description above look like?

mishranik commented 2 years ago

@ostetsenko were you able to get a working solution for noack streams? It still seems to acknowledge the event even when it has not been explicitly acknowledged.

karlokr-p commented 2 years ago

Hi @ostetsenko and @mishranik, sorry I didn't see your messages sooner!

Here's some boilerplate code showing our solution:

import asyncio

import aiohttp
import ujson

# `app`, `some_topic`, `settings`, `get_message` and `get_url` come from the
# surrounding application (not shown).


@app.agent(some_topic)
async def broadcast_message(stream):
    """Broadcast message"""
    async for events in stream.noack().noack_take(10, within=1):
        for event in events:

            message = get_message(event.value)
            url = get_url(event.value)

            # Keep retrying the HTTP request until the event has been acknowledged.
            while not event.acked:
                try:
                    timeout = aiohttp.ClientTimeout(total=settings.DEFAULT_HTTP_TIMEOUT_S)
                    async with aiohttp.ClientSession(json_serialize=ujson.dumps, timeout=timeout) as session:
                        async with session.post(
                            url=url,
                            json=message,
                        ) as response:
                            if response.status == 200:
                                app.logger.info("Message sent successfully")
                                await stream.ack(event)
                                event.acked = True
                                yield event.value
                            elif response.status == 400:
                                # Retrying will not fix a bad request: ack it and move on.
                                app.logger.error("Bad request, continue")
                                await stream.ack(event)
                                event.acked = True
                            else:
                                await asyncio.sleep(0.5)  # back off so we do not send too many requests at once
                                app.logger.warning(f"Received HTTP {response.status} from {url}. Retrying")
                            yield event.value
                except Exception as error:
                    app.logger.error(f"Unknown error ({error!r}), trying again")
                    yield event.value
                    await asyncio.sleep(1)

            yield event.value

This has worked for us in testing and production
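The retry decisions above are made inline in the agent. One way to make them easier to test is to pull them into small pure functions. The sketch below mirrors the 200 / 400 / other branches, and swaps the fixed 0.5 s and 1 s sleeps for capped exponential backoff, which is a different technique from the one used in the solution above. `Outcome`, `classify` and `backoff_delays` are hypothetical names, not Faust or aiohttp APIs.

```python
from enum import Enum
from typing import List


class Outcome(Enum):
    ACK = "ack"        # success: acknowledge, then yield downstream
    DROP = "drop"      # bad request: acknowledge without retrying
    RETRY = "retry"    # anything else: leave unacknowledged and try again


def classify(status: int) -> Outcome:
    # Mirrors the agent above: 200 -> ack, 400 -> ack and drop, otherwise retry.
    if status == 200:
        return Outcome.ACK
    if status == 400:
        return Outcome.DROP
    return Outcome.RETRY


def backoff_delays(attempts: int, base_s: float = 0.5, cap_s: float = 30.0) -> List[float]:
    # Capped exponential backoff: base_s, 2*base_s, 4*base_s, ... never above cap_s.
    return [min(cap_s, base_s * 2 ** i) for i in range(attempts)]


print(classify(200), classify(400), classify(503))
# Outcome.ACK Outcome.DROP Outcome.RETRY
print(backoff_delays(6, cap_s=3.0))
# [0.5, 1.0, 2.0, 3.0, 3.0, 3.0]
```

Note that retrying forever on a failing endpoint still holds up the rest of the batch behind that one event; the cap bounds the wait between attempts, not the total time spent.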

ostetsenko commented 2 years ago

@karlokr-p Looks good. What happens with "yield event.value"? How and when will the event be reprocessed? Will it be reprocessed in the next iteration of the "async for events in stream.noack().noack_take(10, within=1):" loop?

gpkc commented 1 year ago

For me, the noack solution is working. To get events reprocessed, however, I had to add the processing_guarantee="exactly_once" setting to the Faust app. I am not sure why this is needed, since the setting seems to just change the isolation level of the consumers, and I am not using Faust or transactions for producing events.
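For reference, a minimal sketch of where that setting goes: Faust settings can be passed as keyword arguments when the app is created. The app id and broker URL below are placeholders.

```python
import faust

# "retry-demo" and the broker URL are placeholders; processing_guarantee is the
# setting mentioned above (Faust's default is "at_least_once").
app = faust.App(
    "retry-demo",
    broker="kafka://localhost:9092",
    processing_guarantee="exactly_once",
)
```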