dashbitco / broadway

Concurrent and multi-stage data ingestion and data processing with Elixir
https://elixir-broadway.org
Apache License 2.0

Skip `c:handle_message/3` for messages failed in `c:prepare_messages/2` #316

Closed lmarlow closed 2 years ago

lmarlow commented 2 years ago

Before, all messages returned from c:prepare_messages/2 were passed along to c:handle_message/3 unless an exception was raised. Now, messages that are selectively failed in c:prepare_messages/2 skip c:handle_message/3.

This could be useful when preloading data and some messages turn out to have no matching record.

whatyouhide commented 2 years ago

Hey Lee! 👋

So your point is that I would use handle_message/3 only for good messages? The docs for c:prepare_messages/2 are pretty clear that this function must return all messages, since they're sent over to handle_message/3, so I'm not sure it's a good idea to not pass failed messages along.

Also, unless I'm mistaken, this is a breaking change?

lmarlow commented 2 years ago

I guess it is a breaking change according to the last phrase of the docs for the callback.

> as handle_message/3 is still called individually for each message afterwards

My thought was to keep handle_message/3 focused on messages that it can do more with. If we already know in prepare_messages/2 that a message will fail, let's fail it early. Otherwise we need to write a handle_message/3 clause just to skip failed messages.

def handle_message(_, %{status: {:failed, _}} = message, _), do: message

If you want to handle messages that did not follow the happy path when preparing them, you would not have to fail them. You could still handle the different cases as you already have to.

Hi Andrea!!! 👋

whatyouhide commented 2 years ago

Yeah, I get the utility of this. Unfortunately, I think it introduces a bit of "subtle" behavior by having a callback that has the ability to change whether the messages are processed or not.

Also, for me the breaking change is a big problem. It's gonna be hard to break this in a way that doesn't make users surprised I think 😕

josevalim commented 2 years ago

Furthermore, there is no callback before prepare_messages, which means the messages already arrive as failed and that's pretty uncommon.

lmarlow commented 2 years ago

I can easily work with the current implementation. This change would definitely break the current contract from the documentation as @whatyouhide pointed out. Thank you both for taking a look at it.

> which means the messages already arrive as failed

@josevalim, I'm not sure I follow what you mean.

My scenario: a list of ids comes into the pipeline. The pipeline tries to load the associated records in one query and replace the id in each message with the actual record. In some cases a record no longer exists for an id, which makes me think I should fail that message then and there in prepare_messages/2 and have it skip handle_message/3.
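A minimal sketch of that scenario under the current contract. To stay runnable without Broadway, it uses plain maps with `data` and `status` keys as stand-ins for `Broadway.Message` structs, and the `PreloadSketch` module, its `@records` table, and the simplified one-argument callbacks are all hypothetical. In a real pipeline the callbacks would have arities 2 and 3, and the updates would more likely go through `Broadway.Message.failed/2` and `Broadway.Message.update_data/2`.

```elixir
defmodule PreloadSketch do
  # Hypothetical lookup table standing in for a single database query.
  @records %{1 => %{id: 1, name: "a"}, 3 => %{id: 3, name: "c"}}

  # Loads every record in one pass, then either swaps the id for its record
  # or marks the message as failed when no record exists for that id.
  def prepare_messages(messages) do
    ids = Enum.map(messages, & &1.data)
    found = Map.take(@records, ids)

    Enum.map(messages, fn message ->
      case Map.fetch(found, message.data) do
        {:ok, record} -> %{message | data: record}
        :error -> %{message | status: {:failed, :not_found}}
      end
    end)
  end

  # Every message still reaches handle_message, so a pass-through clause
  # for messages that already failed has to come first.
  def handle_message(%{status: {:failed, _}} = message), do: message
  def handle_message(%{data: record} = message), do: %{message | data: record.name}
end

# Plain maps standing in for Broadway messages carrying ids 1, 2 and 3.
messages = Enum.map([1, 2, 3], &%{data: &1, status: :ok})

result =
  messages
  |> PreloadSketch.prepare_messages()
  |> Enum.map(&PreloadSketch.handle_message/1)

IO.inspect(result)
```

The message for id 2 comes out with a failed status and its data untouched, but only because the first `handle_message` clause explicitly passes it through.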

Currently, all messages are passed to handle_message/3 as long as prepare_messages/2 runs without raising an exception. If an exception is raised, all messages are failed and skip handle_message/3. These lines read as if a mixture of successful and failed messages could come out of prepare_messages/2:

        {prepared_messages, prepared_failed_messages} = maybe_prepare_messages(messages, state)
        {successful_messages, failed_messages} = handle_messages(prepared_messages, [], [], state)
        failed_messages = prepared_failed_messages ++ failed_messages

However, the tuple returned by maybe_prepare_messages is always either {nonempty_list(Message.t()), []} or {[], nonempty_list(Message.t())}.
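The difference between the two behaviors can be modeled roughly as follows. This is a simplified, hypothetical model and not Broadway's actual source: `PrepareSketch`, `all_or_nothing/2`, and `split_by_status/2` are names made up for illustration, and plain maps stand in for `Broadway.Message` structs.

```elixir
defmodule PrepareSketch do
  # Current behavior as described above: either every message is returned
  # as prepared, or an exception in the callback fails all of them.
  def all_or_nothing(messages, prepare_fun) do
    {prepare_fun.(messages), []}
  rescue
    error ->
      reason = Exception.message(error)
      {[], Enum.map(messages, &Map.put(&1, :status, {:failed, reason}))}
  end

  # Behavior proposed in this PR: split the prepared messages by status so
  # that only the ones still marked :ok would move on to handle_message/3.
  def split_by_status(messages, prepare_fun) do
    messages
    |> prepare_fun.()
    |> Enum.split_with(&(&1.status == :ok))
  end
end

messages = [%{data: 1, status: :ok}, %{data: 2, status: :ok}]

# Hypothetical prepare function that fails messages with even data.
fail_even = fn msgs ->
  Enum.map(msgs, fn m ->
    if rem(m.data, 2) == 0, do: %{m | status: {:failed, :even}}, else: m
  end)
end

current = PrepareSketch.all_or_nothing(messages, fail_even)
proposed = PrepareSketch.split_by_status(messages, fail_even)
raised = PrepareSketch.all_or_nothing(messages, fn _ -> raise "boom" end)

IO.inspect(current)
IO.inspect(proposed)
IO.inspect(raised)
```

In this model, `current` keeps the selectively failed message in the first list, so it would still be handed to handle_message/3, while `proposed` moves it to the second list.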