danieldg / ordered-stream

Apache License 2.0

Join remains Pending until both streams yield an item #12

Open zeenix opened 1 year ago

zeenix commented 1 year ago

This code hangs:

```rust
// Context assumed: `join`, `FromStream`, and `OrderedStreamExt` come from
// ordered-stream; `pending` and `once` from futures_util::stream.
let stream1 = FromStream::with_ordering(pending::<Message>(), |m| m.serial);
let stream2 = FromStream::with_ordering(
    once(Box::pin(async { Message { serial: 1 } })),
    |m| m.serial,
);
let mut joined = join(stream1, stream2);
let msg = joined.next().await.unwrap();
assert_eq!(msg.serial, 1);
```
zeenix commented 1 year ago

@danieldg Is this intentional? :thinking:

danieldg commented 1 year ago

> @danieldg Is this intentional?

Yes, this was intentional. The fix (and therefore the 0.1.3 and 0.1.4 releases) will produce a stream with items out of order if one of the producing streams is being processed by another task or thread.

zeenix commented 1 year ago

> Yes, this was intentional. The fix (and therefore the 0.1.3 and 0.1.4 releases) will produce a stream with items out of order if one of the producing streams is being processed by another task or thread.

But this breaks zbus. I want items in order but that doesn't mean I want things to just hang forever to make it happen. I could add a timeout in the client but that's a bad workaround.

zeenix commented 1 year ago

Also now that you're not the only one maintaining/developing this, please don't push directly to master and only via PRs, giving me a chance to have a look.

zeenix commented 1 year ago

> Yes, this was intentional. The fix (and therefore the 0.1.3 and 0.1.4 releases) will produce a stream with items out of order if one of the producing streams is being processed by another task or thread.

I don't understand something: how will one of the streams be polled by another task if Join has taken over all its streams and does the polling?

danieldg commented 1 year ago

Example:

- Task 1: the zbus socket reader, which produces Message events on a broadcast channel B
- Task 2: reads from B, filters, and sends messages to a new mpsc channel C
- Task 3: filters B to produce D, then runs join(C, D)

Assume task 1 produces two messages M2 and M3: M3 is accepted by the filter in task 3, and M2 is accepted by the filter in task 2. If task 3 runs prior to task 2, then the join will observe D is ready (M3 is present) but C is Pending as task 2 has not yet run.

In this instance, the Join must wait until C either produces an item or returns NoneBefore: once task 2 runs, M2 will be available in C, but until that happens C is Pending. This is the main difference between Pending and NoneBefore: only once C has returned NoneBefore is it safe to return M3.
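The decision rule can be sketched as a stdlib-only miniature (the `MiniPoll` enum and `can_emit` function are hypothetical, not the crate's actual `PollResult` API): a join may only release the item it is holding once the other input has either produced an item or promised, via a NoneBefore-style bound, that nothing earlier can appear.

```rust
// Hypothetical miniature of the join's release decision. The real crate's
// PollResult carries more information (ordering key, termination, data).
#[derive(Debug, PartialEq)]
enum MiniPoll {
    Item(u64),       // an item with this serial is ready
    NoneBefore(u64), // promise: no item with serial < this will ever appear
    Pending,         // nothing known yet; another task may still push
}

// Can we safely emit `candidate` (the serial of the item held from stream D)
// given the current state of the other stream C?
fn can_emit(candidate: u64, other: &MiniPoll) -> bool {
    match other {
        // C also has an item: emit whichever serial is lower (or equal).
        MiniPoll::Item(s) => candidate <= *s,
        // C promised nothing below `s`: safe if our item is below that bound.
        MiniPoll::NoneBefore(s) => candidate < *s,
        // C is Pending: task 2 may still deliver M2 < M3, so we must wait.
        MiniPoll::Pending => false,
    }
}

fn main() {
    // M3 held from D while C is still Pending (task 2 not yet run): wait.
    assert!(!can_emit(3, &MiniPoll::Pending));
    // C produced M2: M2 must go out first, so M3 is still held back.
    assert!(!can_emit(3, &MiniPoll::Item(2)));
    // C returned NoneBefore(4): nothing below serial 4 can appear, M3 is safe.
    assert!(can_emit(3, &MiniPoll::NoneBefore(4)));
}
```

In the scenario above, D holds M3 while C is Pending, which is the first case: the join has no choice but to keep waiting.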

zeenix commented 1 year ago

Thanks for the example. I thought about it on the tram and realized that the issue is multiple producers. In the case of zbus this doesn't happen; we have a single producer. I guess this means that within zbus I don't even need to use UnorderedStream.

danieldg commented 1 year ago

I think zbus shouldn't run into this as a visible problem, since any stream it provides should be correctly returning NoneBefore on any message serial numbers that have been sent.

zeenix commented 1 year ago

Well, you can run cargo test in zbus against git master of ordered-stream and see at least 3 tests failing. Did you read my comments on the related PR?

zeenix commented 1 year ago

Here is the problematic code.

danieldg commented 1 year ago

I've found the problem: because of how Subscriptions now works in zbus, the MessageStream is not able to update its last_seq field to match the last message considered. As a result, it never returns NoneBefore and so ordered-stream is correct in continuing to wait.

Fixing this is a bit trickier because you need to communicate new sequence numbers to all interested listeners, not just those whose rule matches. I'll take a shot at doing that.
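One way to picture that fix, as a stdlib-only sketch (the `Listener` type and `none_before_bound` method are illustrative, not zbus API): the socket-reader task maintains a shared high-water mark of the last serial it has processed, bumped for every message whether or not a given listener's rule matched it, and an idle listener derives a NoneBefore bound from that mark instead of reporting Pending.

```rust
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;

// A listener whose own match-rule queue is empty can still make progress
// by consulting the reader's shared high-water mark.
struct Listener {
    shared: Arc<AtomicU64>, // last serial the reader task has seen
}

impl Listener {
    // What an ordered-stream-style poll could report when this listener's
    // queue is empty: "no item with serial < bound will appear".
    fn none_before_bound(&self) -> u64 {
        self.shared.load(Ordering::Acquire) + 1
    }
}

fn main() {
    let high_water = Arc::new(AtomicU64::new(0));
    let listener = Listener { shared: Arc::clone(&high_water) };

    // The reader task saw serials 1..=5; this listener's rule matched none.
    high_water.store(5, Ordering::Release);

    // Even with an empty queue, the listener can promise NoneBefore(6)
    // instead of Pending, letting a join release items with serial < 6.
    assert_eq!(listener.none_before_bound(), 6);
}
```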

zeenix commented 1 year ago

> Fixing this is a bit trickier because you need to communicate new sequence numbers to all interested listeners, not just those whose rule matches.

That would be very unfortunate and would defeat one of the main points of the new match-rule-based API. As I wrote before, I'm not even sure we need this in zbus since we only have one producer running in a single task.

guicostaarantes commented 1 year ago

Thanks for the discussion @danieldg and @zeenix. My use case for this crate is an event-driven system where I need to consume messages (the payloads of the events) from multiple Kafka topics in order to build a database whose state is the accumulation of all events. The messages need to be consumed in order of their Kafka timestamps rather than in a race, otherwise I risk ending up with an incorrect state.

With the current behavior, I can only consume an event once all streams have a message ready. This is good because if one stream has a connection issue, the others will wait and I can be sure events are always consumed in the right order. The not-so-good news is that I may never reach a state where all my events are consumed: if one topic has 1,000 messages ready and the other doesn't receive a message for a week, none of those 1,000 events will reach the database during that week.

Both behaviors have advantages, but given that this crate is oriented towards guaranteeing the correct order of items, my understanding is that waiting until all streams are ready is what most users will expect. I'm trying to fix my scenario by making the event streams return Option<KafkaMessage> instead of KafkaMessage, where None indicates the connection is healthy but there are no new messages available.

danieldg commented 1 year ago

@guicostaarantes that is actually one of the use cases I considered when making this crate. My best thought for a solution to the "long-idle stream" problem is to have each stream produce periodic (hourly, or whatever suits) heartbeat messages that show the stream is alive but has nothing ready to send. If the original stream is adapted using FromStream and the heartbeat messages are filtered out by OrderedStreamExt::filter, this produces the behavior you describe without making the heartbeats visible in the combined stream.
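A stdlib-only model of that heartbeat idea (the `Event` type and `drain` function are illustrative, not the crate's API): heartbeats carry an ordering but no payload, so filtering them out of the output still advances the "nothing before t" bound that lets a join release items queued on the busy stream.

```rust
#[derive(Debug, PartialEq)]
enum Event {
    Heartbeat(u64),          // timestamp only: proves the stream is alive
    Data(u64, &'static str), // timestamp plus payload
}

// Model of FromStream + OrderedStreamExt::filter: heartbeats are dropped
// from the output, but every event still advances the ordering bound.
fn drain(events: &[Event]) -> (Vec<(u64, &'static str)>, u64) {
    let mut out = Vec::new();
    let mut none_before = 0;
    for e in events {
        match e {
            Event::Heartbeat(t) => none_before = none_before.max(t + 1),
            Event::Data(t, p) => {
                out.push((*t, *p));
                none_before = none_before.max(t + 1);
            }
        }
    }
    (out, none_before)
}

fn main() {
    // A quiet topic: one data message, then hourly heartbeats.
    let quiet = [
        Event::Data(10, "m1"),
        Event::Heartbeat(3600),
        Event::Heartbeat(7200),
    ];
    let (items, bound) = drain(&quiet);
    // Only the data message is visible downstream...
    assert_eq!(items, vec![(10, "m1")]);
    // ...but a join now knows nothing older than t = 7200 can still arrive,
    // so the busy topic's queued messages can be released.
    assert_eq!(bound, 7201);
}
```

The same shape covers the Option<KafkaMessage> workaround mentioned above: a None poll plays the role of a heartbeat, advancing the bound without emitting an item.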