dotnet / orleans

Cloud Native application framework for .NET
https://docs.microsoft.com/dotnet/orleans
MIT License
10.06k stars 2.03k forks source link

Direct streaming between grains #7830

Closed bill-poole closed 8 months ago

bill-poole commented 2 years ago

We have a requirement to send streams of messages directly between grains - i.e., one grain sending a stream of messages directly to another grain. Furthermore, we need these streams to be particularly high performance, where the sender does not wait for acknowledgement of one message before sending another message - like with gRPC streaming requests.

We would be happy to use gRPC to stream messages between grains, but that would require being able to establish a gRPC connection from one grain to another, and that doesn't seem possible.

Sending individual messages is too slow given the latency between grains. I recognise that the messages can be batched, but that still requires each batch to be sent and processed one at a time, which means the next batch is not being sent while the previous batch is being processed.

This performance limitation also applies to Orleans streams because each stream event is delivered from the agent to consumers via the standard Orleans grain messaging - i.e., one at a time.

It would be great if a grain interface method could accept and/or return an IAsyncEnumerable<TMessage>, or something along those lines, which Orleans then understood/interpreted as inbound/outbound message streams.

benjaminpetit commented 2 years ago

I have a prototype of a working IAsyncEnumerable<TMessage>, but it is backed by persistent streams.

Sending individual messages is too slow given the latency between grains

Is it the latency between grains the issue, or the time needed to process a request by the target grain? You could use [OneWay] messages to not wait for a response from the target, but it can make error handling difficult.

This performance limitation also applies to Orleans streams because each stream event is delivered from the agent to consumers via the standard Orleans grain messaging - i.e., one at a time.

You can use batching with some streaming providers.

bill-poole commented 2 years ago

Thanks @benjaminpetit. I'm actually looking for the same behaviour as gRPC server streaming, where one grain (grain A) requests a stream of messages from another grain (grain B). Grain B then responds with a stream of messages to grain A as fast as grain A can process them. Furthermore, grain B generates the messages it is sending to grain A as it is sending them. i.e., it is generating messages one one thread and pushing them to grain A on another thread.

Note that batching doesn't achieve this because the whole batch must be generated and sent, then the next batch is generated and sent.

With "server streaming", messages received by grain A are not acknowledged by grain A because they are responses from grain B. If grain A fails to process a message, then it is up to grain A to re-request the message stream.

If grain A were to request the messages from grain B one at a time, then the issue would be the time taken by each roundtrip, which includes the network latency and the time grain B takes to process each request.

I'm therefore considering interleaving requests from grain A to grain B - i.e., sending the next request while still waiting for the response from the previous request. This could meet our performance requirements if the requests are sent from grain A to grain B interleaved over a single connection. Do you see any problems with this approach?

Note that this approach carries some risk that grain A overloads grain B - i.e., grain A will need to limit the number of concurrent requests it sends to grain B. If something akin to gRPC "server streaming" were to be employed, then grain B would send messages as fast as it can, but flow control on the message stream would ensure messages were sent only fast fast as grain A can process them.

This approach of interleaving requests is also less efficient than "server streaming" because a request message must be sent for each response messages; whereas with "server streaming", a single request is sent for an entire stream of messages sent in response.

Note that the approach of interleaving requests only works where grain A knows the list of information it is requesting from grain B - which happens to be the case for me. However, if that were not the case, then grain A would only be able to request the entire message stream, not each message in the stream individually. Therefore, I believe there is still a case for "server streaming" between grains.

Note also that client streaming and bi-directional streaming are also valid scenarios and my proposed approach wouldn't work for those scenarios either.

arnoldp-workstreampeople-com commented 2 years ago

I have a prototype of a working IAsyncEnumerable<TMessage>, but it is backed by persistent streams.

Is there a chance that this prototype will become production ready code and released?

ReubenBond commented 2 years ago

xref #940 & #1916

benjaminpetit commented 2 years ago

I have a prototype of a working IAsyncEnumerable<TMessage>, but it is backed by persistent streams.

Is there a chance that this prototype will become production ready code and released?

Not for 4.0, but hopefully for the next release.


@bill-poole I think "traditional" streaming is still applicable for your use case (especially MemoryStreams).

Note that batching doesn't achieve this because the whole batch must be generated and sent, then the next batch is generated and sent.

If you are using MemoryStreams, the stream publisher can push event one by one, but the consumer can get batches. If B was able to push 3 events while A was processing something, the next OnNextAsync from A can return these 3 events.

Here is how I would do:

You might want to tweak caching if those streams are 1:1 and short lived, but it can also bring some benefits (easy for the consumer to reprocess some events by rewinding the stream for example).

bill-poole commented 2 years ago

If you are using MemoryStreams, the stream publisher can push event one by one, but the consumer can get batches. If B was able to push 3 events while A was processing something, the next OnNextAsync from A can return these 3 events.

This seems similar to gRPC client streaming, where the sender "pushes" messages to the receiver - except, with client streaming:

If a failure occurs, the initiating part re-sends the entire stream.

My use case is more akin to gRPC server streaming (as opposed to client streaming), where one party sends a message to request a stream from another party, and then the requested stream of messages is sent to the requesting party. Like with client streaming, flow control limits the number of messages that have been sent but not yet received/processed.

If a failure occurs, the initiating party re-requests the stream (or part thereof).

If senders/receivers are decoupled with a MemoryStream, then it seems that if a failure occurs, then neither the sender nor receiver can compensate for the failure - i.e., messages are lost. Is that correct? If each message sent by a sender via a stream is not acknowledged as received/processed by the receiver, then the sender cannot know to re-send messages that are not successfully received/processed by the receiver.

To my understanding, any time messages are sent via an intermediary such that the sender only knows about successful receipt by the intermediary (not the end receiving party), then the messages must be held in durable storage by the intermediary if at-least-once delivery is required to the end receiving party.

benjaminpetit commented 2 years ago

When a consumer receives an event with OnNextAsync, it also gets a StreamSequenceToken. This token allows to resume a subscription to a given stream position. Events stay in the cache for a given time, so the subscriber is free to "rewind" if needed, no need to notify the producer to send everything again.

If the subscriber gets a CacheMissException, then it means that some messages may have been lost. In this case, the consumer could send a request to the publisher to send again all the events to a new stream.

ghost commented 2 years ago

Hi @bill-poole We have added the "Needs: Author Feedback" label to this issue, which indicates that we have an open question for you before we can take further action. This issue will be closed automatically in 7 days if we do not hear back from you by then - please feel free to re-open it if you come back to this issue after that time.

ghost commented 2 years ago

Hi @bill-poole We have added the "Needs: Author Feedback" label to this issue, which indicates that we have an open question for you before we can take further action. This issue will be closed automatically in 7 days if we do not hear back from you by then - please feel free to re-open it if you come back to this issue after that time.

ghost commented 2 years ago

We're moving this issue to the 4.0-Planning milestone for future evaluation / consideration. Because it's not immediately obvious that this is a bug in our framework, we would like to keep this around to collect more feedback, which can later help us determine the impact of it. We will re-evaluate this issue, during our next planning meeting(s). If we later determine, that the issue has no community involvement, or it's very rare and low-impact issue, we will close it - so that the team can focus on more important and high impact issues.

bill-poole commented 2 years ago

When a consumer receives an event with OnNextAsync, it also gets a StreamSequenceToken. This token allows to resume a subscription to a given stream position. Events stay in the cache for a given time, so the subscriber is free to "rewind" if needed, no need to notify the producer to send everything again.

Thanks @benjaminpetit. If the stream is not durable (which I understand a MemoryStream is not), then upon a failure of the node hosting the stream, messages would be lost. I'm new at the Orleans/streams architecture so apologies if I've misunderstood.

i.e., the sender grain could send messages into the stream and then complete the operation, persisting a state that says that those messages were sent such that they will never be sent again. The consumer/receiver could keep track of the StreamSequenceToken as it consumes the stream such that if it fails, it can restart from the stream position where it failed. However, if the stream itself fails, then am I correct in assuming the messages would be lost?

benjaminpetit commented 2 years ago

That's correct, you may lose messages in case of failure.

I assumed that in your case, the publisher would enqueue a special message to denote the last event of a stream (OnComplete is there for historical reason, but was never implemented), and the consumer would send a ack when all messages were processed.

If the consumer doesn't get the last message after a given time (or get a cache miss exception), or if the subscriber doesn't receive this ack in a timely manner, you could force a full delivery to another stream maybe?

bill-poole commented 2 years ago

Okay, so yes it is possible that the receiver/consumer grain could send a message back to the sender/producer grain when the stream has been fully consumed. Good point! I hadn't considered that. So you're correct that it could be made to work. But, it would be more complex than direct streaming between grains.

I guess it's somewhat similar to the added complexity of implementing a request-reply interaction through two correlated one-way calls - a request call and a reply callback - rather than as a synchronous request-reply interaction.

If the sender and receiver are decoupled over an unreliable streaming channel with a manual callback/ack method call, then the sender and receiver must both implement a workflow that manually compensates for any failures. However, if the sender and receiver are coupled over a direct connection, then the relevant party is immediately informed when a failure occurs as it occurs, which means the failed operation is naturally immediately retried.

It's also worth noting that streams deliver messages to consuming grains via standard grain message calls (as far as I understand them) - which means that the "last mile" of stream delivery is standard message-by-message method calls, meaning each message is received, processed and acknowledged one at a time. If the stream is hosted on a different silo than the consuming grain, then this "last mile" would be a bottleneck in message delivery performance.

So, I think there would be benefit in grains supporting a streaming interface (i.e., methods accepting/returning IAysncEnumerable<TMessage>), which could be used for more efficient Orleans stream production/consumption - but also for direct grain-to-grain streaming.

ReubenBond commented 8 months ago

Orleans supports returning IAsyncEnumerable<T> from a grain method as of v7.2.0: https://github.com/dotnet/orleans/pull/8416

bill-poole commented 8 months ago

Thanks @ReubenBond!