Open prabello opened 4 years ago
@prabello have you tried using Broadway.stop(NameOfYourTopology)
?
Is it possible to change the offset of broadway consumer group to skip some messages or even replay?
I don't think it's possible with the current implementation as the producer will always acknowledge the messages, even when they fail. See Handling failed messages. Maybe implementing a custom acknowledger could work somehow, but I think it can be tricky to make it play nicely with the current producer.
@josevalim no, I may have missed, is there any docs that explain it? What exactly would define my broadway topology?
@msaraiva thanks for pointing it out, in some cases, I just want to stop everything and fix the upstream and having to re-read things on the dead-letter can be cumbersome But I see the reason for this strategy, thanks for the fast reply
@prabello it is the name you give to Broadway.start_link
. Apparently the function is not documented though, we would have to fix that first.
I am also curious on this. According to the docs, Broadway Kafka always acks messages. I don't think that is always the desired behaviour.
In some cases, such as a downstream dependency being unavailable, we may chose to completely stop this consumer and fail over. In other cases, such as bad serialization of this particular message, the current solution is ideal.
What does Broadway.stop()
do in these scenarios?
Broadway.stop
will send a shutdown exit, this will make producers stop fetching and processors to clear their buffers. For the partitions assigned to the processor that has called Broadway.stop
, no more elements will be processed. For the other partitions running concurrently within other processes, they will all flush accordingly first.
For things such as downstream being unavailable, I would rather consider things like retries with back-offs (say 1s, 2s, 5s, 10s). This means that some messages may still go to the dead-letter queue but the processing speed will be quite reduced.
Thank you for the quick reply on that.
So anything that was pre-fetched will be acked too.
Does this mean that broadway_kafka will ack before sending messages to handle_message
?
No! It is always at the end. I have updated the previous comment for clarity. :)
Whew - scared me for a second there!
It's easy for Jared or me to say the right answer is to "stop consuming", but harder to translate that into something actionable. I have 3 thoughts in my head:
Stop the consumer in the supervision tree Pros: Easy Cons: The problematic partition will be reassigned to another pod/node until all are shutdown. That creates another manual step to restart apps or otherwise get consumers started again.
Implement backoff in Broadway itself (looks like it's on the todo list) Pros: Easiest for application devs, looks like it's on the roadmap Cons: An existing ecosystem to migrate
Recursively retry in the business logic, sleeping if backoff is needed. Pros: Requires no changes to broadway Cons: High likelihood of inconsistencies/mistakes
It looks like backoff is on the todo list for broadway - are there previous discussions I could review to see if I might be able to contribute?
The proposed back-off in Broadway is that it is really just a safety net. For example, imagine that in your handle_message
you need to perform 3 tasks. If the task in the middle fails, it would be best to provide retry and backoffs specific to that task. So once it succeeds it goes to the third task.
If the back-off is in Broadway, then the best we can do is to assume it has failed altogether and just slow things down. So it is a safety net.
So my suggestion is to do #3 and potentially add #2 as a safety net.
Perhaps we should add to the docs one very important piece of information: even if handle_failed/2
callback raises/throws the offset will be commited.
As I understand (and I might be saying something dumb here), this is complicated. A common practice is to publish failed messages to another topic/queue so that you can handle it separately. Though, if by any means, publishing fails but commiting the offset succeeds we loose the ability to handle that failed message. Considering Murphy's law: if anything CAN go wrong, it WILL go wrong.
I think the only way to go about this on consumer code is to always wrap whatever code we put in handle_failed/2
with a catch
clause and call Broadway.stop/1
in case it raises/throws otherwise we risk loosing messages on the stream.
@victorolinasc the docs are already explicit about this:
broadway_kafka never stops the flow of the stream, i.e. it will always ack the messages even when they fail.
However, if you think there are other places we can add this note to make it clearer, pull requests are welcome!
I've read that part but somehow considered that handle_failed/2
would not commit if it raised / returned an error. It is the error of a failure rescuing after all (Inception feelings). I'll think about including it in the sentence but I see now that was my misreading the sentence.
Thanks!
I am using the latest version of Broadway Kafka and i do not see any method called Broadway.stop/1
where would this method be defined ?
Broadway.stop
will send a shutdown exit, this will make producers stop fetching and processors to clear their buffers. For the partitions assigned to the processor that has calledBroadway.stop
, no more elements will be processed. For the other partitions running concurrently within other processes, they will all flush accordingly first.
@josevalim as @amacciola says there seems to be no Broadway.stop
function. Is there another way to shut down the producer?
You should be able to call GenServer.stop(BroadwayPIpeline)
but keep in mind that starting and stopping Kafka partitions is asking for trouble because Kafka doesn't deal with nodes quickly going down and coming up well. There is a discussion here: https://github.com/dashbitco/broadway_kafka/issues/47.
@lucacorti i also found that I was shutting down Broadway pipelines when what I really wanted to be doing was suspending the pipelines
You can see our conversation here
https://github.com/dashbitco/broadway_kafka/issues/56
If the goal is to just pause and start pipelines use the solution in that discussion
If the goal is to completely shutdown the pipeline then use Genserver.stop(pipeline_name)
I'm using Broadway.stop(MODULE, :shutdown) to stop the pipeline, but the underlying :brod_sup restarts it after a minute. In the logs, I could see that the restart_type in child's spec is set to permanent - I'm guessing that's the cause. Could you tell if it's the same behavior you see as well or am I missing something?
Logs here - https://github.com/dashbitco/broadway_kafka/issues/86
Btw, this discussion made it clear there are different desired approaches to handling failure. If anyone wants to submit pull requests documenting individual techniques, it will be very welcome!
Full credit to @slashmili based on our conversation on Slack. Just collecting more ideas here.
So there are a few cases where an error can occurs which I already mentioned in the issue.
If you manage to tag a message as failed, your handle_failure
will be called, then you can push the message to some other topic as DLQ.
When pushing the failed messages to the DLQ topic, add a retry_attempts
to the body(if it’s JSON content or you could add a retry_attempts
counter to the header).
Setup another broadway module that consumes the DLQ topic, and does the same action as the original broadway. If it fails bump the retry_attempts
and push it as the new msg to the DLQ.
Do this until retry_attempts
reaches max and then put it in another topic(3rd) topic which needs a human inspection to find out what is wrong with the message. It's not optimal and you’ll have the message duplicated in multiple topics and in DLQ (every time you put it back to DLQ again, it will be a new message)
Using Kafka for two use cases, one is for high throughput, we can't add Oban and database to the flow, deal with DLQ by putting the failed messages into another topic For another use case which is like even driven architecture, we always enqueue the jobs into Oban(on handle_mesasge), and never process any data in broadway!
@yordis i am just double checking but all of this you mentioned can already be done right now correct ? This would not need any new PR against Broadway Kafka lib to support ? Because I am already doing something very similar to what you mentioned, just instead of Kafka for the RetryQueue using RabbitMQ and the RabbitMQ Broadway lib because I don't need the failed messages to persist unless they reach max failed attempts and I push them into the DLQ
@yordis I am just double-checking but all of this you mentioned can already be done right now correct?
Yes
This would not need any new PR against Broadway Kafka lib to support ?
I am not sure what you mean by "supporting".
I am a big fan of https://diataxis.fr/ documentation framework, so if you leave it to me, I would love the Elixir ecosystem to start adding "How-To Guides" to showcase potential implementations like your situation using RabbitMQ, or my situation where we may use Oban for it ...
I am not sure what is the right call here.
For this particular case,
Showcase the way we solve the problems
Correct. This is a specific request for more documentation to be added via PRs. :)
How can I configure kafka_broadway to stop once an error occurs? Once an error occurs, I would like to stop consuming messages and keep the same offset, this may be due to the publisher sending wrong information or something that needs to be changed on my application pipeline.
Right now I'm using the handle_failed to publish into a dead-letter-topic, but its not the ideal behavior for my use case.
Is it possible to change the offset of broadway consumer group to skip some messages or even replay?