ghostdogpr closed this 1 year ago.
Looking forward to this feature! 😃
I'm also very interested in this feature. Is there anything I can do to help get the existing PR over the finish line?
The `CounterMessage.StreamingChange` example in the PR is pretty much spot on for my use case, and I would love to be able to make it that simple 😄
The one thing I haven't dealt with yet is how to handle interruptions (for example, when the remote actor is rebalanced or the pod is stopped).
I also wanted to do more testing but ended up getting caught up with other things. If you build my branch locally and test it with your own application, that would already be helpful!
Indeed, after I posted my comment above, I saw your comment here, which made me realize that when the entity is rebalanced to a different pod, the response stream cannot easily be restarted or restored...
The use case I have in mind is to be able to listen to the changes on an entity (like the `CounterMessage.StreamingChange` example) and send them over a websocket or similar. So the "calling side" is tied to a pod (since it is linked to an HTTP connection), while the "streaming side" (i.e. the entity) may move around because of rebalancing. I also need to be able to stop the stream of changes from the calling side when the HTTP connection closes.
So it seems to me that a "publish/subscribe" kind of solution would be appropriate: being able to send a "subscribe" message to the entity, which allows the entity to send a stream of responses, and an "unsubscribe" message to stop it. The entity takes care of maintaining a list of active subscriptions, and the subscriptions must be serializable so they can be persisted and restored during a rebalance.
So the API could look like this:

```scala
import zio.{ Task, URIO }
import zio.stream.ZStream

// Proposed API; `PodAddress` and `Sharding` are shardcake types.
case class Subscription[-R](subscriber: PodAddress, id: String) { self =>
  def publish(replies: ZStream[Any, Nothing, R]): URIO[Sharding, Unit] = ???
}

trait Messenger[-Msg] {
  // ... existing Messenger methods
  def subscribe[Res](entityId: String)(
    subscribe: Subscription[Res] => Msg,
    unsubscribe: Subscription[Res] => Msg
  ): Task[ZStream[Any, Throwable, Res]]
}
```
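The bookkeeping this API implies on the entity side can be modeled without ZIO or shardcake. This is a minimal sketch under stated assumptions: `PodAddress` and `Subscription` are simplified stand-ins (not shardcake's definitions), events are plain strings, and `EntityState`, `subscribe`, `unsubscribe` and `publish` are hypothetical names. Because the state is plain immutable data, it is the kind of value that could be persisted before a rebalance and restored afterwards.

```scala
// Simplified stand-ins (assumption): not shardcake's real PodAddress/Subscription.
final case class PodAddress(host: String, port: Int)
final case class Subscription(subscriber: PodAddress, id: String)

// Hypothetical entity state: the events so far plus the active subscriptions.
// Everything is plain immutable data, so it can be serialized on rebalance.
final case class EntityState(events: Vector[String], subscriptions: Set[Subscription]) {
  def subscribe(s: Subscription): EntityState =
    copy(subscriptions = subscriptions + s)

  def unsubscribe(s: Subscription): EntityState =
    copy(subscriptions = subscriptions - s)

  // Record an event and compute which subscribers must receive it.
  // Only subscribers are targeted, unlike a broadcast to every pod.
  def publish(event: String): (EntityState, List[(Subscription, String)]) =
    (copy(events = events :+ event), subscriptions.toList.map(s => (s, event)))
}

object EntityState {
  val empty: EntityState = EntityState(Vector.empty, Set.empty)
}
```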
To make this work, we need to reverse the gRPC call that sends streaming responses so that it is initiated by the pod hosting the entity. On the subscriber side, the `Sharding` component needs to maintain a list of "active local subscriptions" so it can dispatch the responses. The `Sharding` component also takes care of sending the `unsubscribe` message automatically when the stream returned by `Messenger.subscribe` ends.
I can try to implement that approach if you think it makes sense.
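The subscriber-side part (keeping the active local subscriptions and sending `unsubscribe` automatically once the consumer is done) can be sketched with the standard library alone. Assumptions: `LocalSubscriptions`, `withSubscription` and `dispatch` are hypothetical names, a callback stands in for the consumer of the stream, and `sendUnsubscribe` stands in for sending the `unsubscribe` message to the entity. In ZIO, the same guarantee would more naturally come from a finalizer such as `ZStream#ensuring` on the returned stream.

```scala
import scala.collection.mutable

// Hypothetical pod-local registry of active subscriptions (subscriber side).
// Responses coming back from the entity's pod are dispatched by subscription id.
final class LocalSubscriptions[A] {
  private val handlers = mutable.Map.empty[String, A => Unit]

  // Register `onReply`, run `consume`, then always clean up: the handler is
  // removed and `sendUnsubscribe` is called even if `consume` throws.
  def withSubscription[B](id: String, sendUnsubscribe: String => Unit)(onReply: A => Unit)(consume: => B): B = {
    handlers.synchronized(handlers.update(id, onReply))
    try consume
    finally {
      handlers.synchronized(handlers.remove(id))
      sendUnsubscribe(id)
    }
  }

  // Deliver one response; returns false when nobody is listening anymore,
  // which the sending side could treat as a signal to drop the subscription.
  def dispatch(id: String, reply: A): Boolean = {
    val handler = handlers.synchronized(handlers.get(id))
    handler.foreach(h => h(reply))
    handler.isDefined
  }

  def activeIds: Set[String] = handlers.synchronized(handlers.keySet.toSet)
}
```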
PubSub is already supported, see `registerTopic` and `Broadcaster`. Doesn't that allow what you want?
The implementation is different though: the broadcaster sends the messages to every node, not just those that are listening.
Also, PubSub is meant more for something that keeps listening forever, while `replyStream` can be stopped when it is finished.
For dealing with the rebalance issue, I was thinking about having the client pod reconnect, which would then reach the new pod. Now the question is how to "continue" where we left off without resending the whole data through the stream and receiving duplicates. Maybe we could have a concept of a Cursor that we pass when we reconnect so that the entity is able to continue... I need to think about this more.
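One way to picture the Cursor idea: the entity keeps an ordered log of numbered events in its state, and a reconnecting client passes the number of the last event it received, so only newer events are replayed. A minimal sketch, assuming hypothetical `Cursor` and `EventLog` types and an unbounded in-memory log; a real entity would likely need to bound the log and fall back to a full resync when a cursor is older than what it retained.

```scala
// Hypothetical cursor: the sequence number of the last event the client received.
final case class Cursor(lastSeen: Long)

// Hypothetical append-only event log kept in the entity's (persisted) state.
final case class EventLog[A](events: Vector[(Long, A)]) {
  def nextSeq: Long = events.lastOption.map(_._1 + 1).getOrElse(0L)

  def append(a: A): EventLog[A] = EventLog(events :+ ((nextSeq, a)))

  // On (re)connection, replay only what the client has not seen yet.
  // `None` means a fresh subscription, so everything is replayed.
  def replayFrom(cursor: Option[Cursor]): Vector[(Long, A)] =
    cursor match {
      case None    => events
      case Some(c) => events.filter(_._1 > c.lastSeen)
    }
}

object EventLog {
  def empty[A]: EventLog[A] = EventLog(Vector.empty)
}
```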
For my use case, I don't think `Broadcaster` is appropriate because I don't need to send the updates to all pods. In principle it could work, but it would lead to a lot of unnecessary exchanges: if only 2 of 20 pods are really listening to the entity changes, then 90% of the broadcast messages would be useless.
Another possibility I thought of is related to the notion of `ReplyChannel` in your PR. What if it were a first-class citizen? Basically, having a notion of actors located on a pod (not moving around) to which I can send a message from anywhere (in particular from an entity). This starts to really feel like Akka and I'm not sure that's the direction you want to go 😅, but I think my idea of subscribing to an entity could be built on top of that, and maybe other communication patterns could be built with it too.
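To make the idea concrete: a pod-local "actor" is reachable through an address made of the pod and a local id, so an entity can target it directly instead of broadcasting. A toy, single-threaded, single-process sketch in which `LocalAddress`, `PodLocalRegistry` and `ToyCluster` are hypothetical names and a map lookup stands in for the network hop to the target pod; none of this is existing shardcake API.

```scala
import scala.collection.mutable

// Hypothetical address of a pod-local "actor": unlike an entity, it never moves.
final case class LocalAddress(pod: String, id: String)

// Hypothetical per-pod registry of mailboxes (e.g. one per open websocket).
// Not thread-safe: this is a single-threaded illustration only.
final class PodLocalRegistry {
  private val mailboxes = mutable.Map.empty[String, mutable.Queue[String]]

  def register(id: String): Unit = {
    mailboxes.getOrElseUpdate(id, mutable.Queue.empty[String])
    ()
  }

  def unregister(id: String): Unit = {
    mailboxes.remove(id)
    ()
  }

  // Returns false if no mailbox with this id is registered on this pod.
  def deliver(id: String, msg: String): Boolean =
    mailboxes.get(id) match {
      case Some(queue) =>
        queue.enqueue(msg)
        true
      case None => false
    }

  // Take all pending messages for `id`, oldest first.
  def drain(id: String): List[String] =
    mailboxes.get(id) match {
      case Some(queue) =>
        val pending = queue.toList
        queue.clear()
        pending
      case None => Nil
    }
}

// Toy routing: look up the pod first (a network hop in a real system), then the id.
final class ToyCluster(pods: Map[String, PodLocalRegistry]) {
  def send(to: LocalAddress, msg: String): Boolean =
    pods.get(to.pod).exists(_.deliver(to.id, msg))
}
```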
I've spent some time playing with your branch and created a livechat demo app, which can be found here. I did it as a new module in my fork of shardcake so I could easily make changes and tweaks, but I will turn it into a standalone demo repo.
Overall, I think the streaming response feature works well. Restarting the stream after a rebalance is doable; I came up with this somewhat generic solution: https://github.com/mleclercq/shardcake/blob/6d7204ccd05d4d8212b314d9e0eb0d85f325bcf6/entities/src/main/scala/com/devsisters/shardcake/Messenger.scala#L28-L53
There is one thing I think could be changed: `Replier` should be split into two traits so that it is not possible to respond with a single response to a `Messenger.sendStream` (or vice versa).
Is there anything I can do to help finalize the PR?
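The `Replier` split suggested above can be sketched as two separate capabilities, so that replying with the wrong shape fails to compile instead of failing at runtime. In this dependency-free model, `List` stands in for `ZStream`, replies are rendered as strings so the result is easy to check, and `Replier`, `StreamReplier`, `CounterMsg` and `CounterEntity` are simplified, hypothetical definitions rather than the PR's actual code.

```scala
// Single-response capability: only `reply` is available.
final case class Replier[-R](id: String) {
  def reply(response: R): String = s"reply to $id: $response"
}

// Streaming capability: only `replyStream` is available.
// (List stands in for ZStream in this sketch.)
final case class StreamReplier[-R](id: String) {
  def replyStream(responses: List[R]): List[String] =
    responses.map(r => s"stream item to $id: $r")
}

// Each message declares which kind of replier it carries.
sealed trait CounterMsg
object CounterMsg {
  final case class GetCount(replier: Replier[Int]) extends CounterMsg
  final case class StreamChanges(replier: StreamReplier[Int]) extends CounterMsg
}

object CounterEntity {
  def handle(count: Int, changes: List[Int], msg: CounterMsg): List[String] =
    msg match {
      case CounterMsg.GetCount(r) => List(r.reply(count))
      // `r.reply(count)` would not compile here: StreamReplier has no `reply`.
      case CounterMsg.StreamChanges(r) => r.replyStream(changes)
    }
}
```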
Apologies for the lack of activity, we are still stuck with ZIO 1 at work so I haven't had a chance to use this code, and I've been spending my free time working on Caliban 😄
Good to hear that it works well. If you would like to submit a PR targeting my branch with the changes you are proposing, I will definitely take a look at it. The branch needs a rebase as well. I am currently load testing our project with ZIO 2 at work, so I could potentially try a run with this code once it's finalized to make sure there's no regression for the regular use case.
No worries, we know how things go. I also left that thing on the back burner for 3 months 😅.
I did some extra tests to make sure that if the client side stops listening for changes from the entity, the entity stops sending them. I don't really know how that works but it does 😄.
I've pushed a rebased branch on my fork with some additional changes: https://github.com/mleclercq/shardcake/tree/streaming
The additional changes are: https://github.com/mleclercq/shardcake/compare/9ce471b3c183124e2967a0df800f06090157d9b2...bcc40808a0388bceff52a6d995f45c12b21634e1
- `Messenger.sendStreamAutoRestart`, which automatically restarts the stream on rebalance
- Splitting `Replier` into two, the original `Replier` and a `StreamReplier`, so we can't reply with a single response when a stream is expected or vice versa

Since I rebased your branch, I don't think my branch can be merged into yours, but I can open another PR.
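The restart-on-rebalance behaviour can be sketched as a client-side loop that reopens the stream from the last element it received, combined with the cursor idea discussed earlier in the thread. This is not necessarily how the linked `sendStreamAutoRestart` works: `AutoRestart.consume` and its parameters are hypothetical, an `Iterator` stands in for `ZStream`, and an exception stands in for the stream being interrupted by a rebalance. In ZIO 2, a similar loop could keep the cursor in a `Ref` and reopen the stream with `ZStream#retry` and a `Schedule`.

```scala
import scala.util.control.NonFatal

object AutoRestart {
  // `open(cursor)` (re)connects to the entity, passing the sequence number of the
  // last element already received (None on the first connection).
  // Elements are (sequence number, value) pairs.
  def consume[A](open: Option[Long] => Iterator[(Long, A)], maxRestarts: Int): Vector[A] = {
    val out = Vector.newBuilder[A]
    var lastSeen: Option[Long] = None
    var restarts = 0
    var done = false
    while (!done) {
      try {
        open(lastSeen).foreach { case (seq, value) =>
          // Skip anything at or before the cursor, in case the entity replays too much.
          if (lastSeen.forall(seq > _)) {
            out += value
            lastSeen = Some(seq)
          }
        }
        done = true
      } catch {
        // Treat a failure as an interruption (e.g. a rebalance) and reconnect,
        // up to `maxRestarts` times; after that the error propagates.
        case NonFatal(_) if restarts < maxRestarts =>
          restarts += 1
      }
    }
    out.result()
  }
}
```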
I rebased my branch so you should be able to make a PR targeting it.
Also, I just remembered one point I left aside: currently all streams targeting one pod use the same `ZManagedChannel`. I was wondering if that was okay or if we needed to create more channels.
Here is my PR targeting your branch: https://github.com/devsisters/shardcake/pull/75
What are your concerns about the `ZManagedChannel`? That we would have contention on these channels, so that streaming responses from different entities would block each other, or something like that? I'm not very familiar with zio-grpc or gRPC in general...
I did the following test in my "livechat" demo app:
Here are some logs (the "Sending ..." lines come from this `tap` on the response stream):

```scala
replier.sendStream(theResponseStream.tap(e => ZIO.log(s"Sending $e to $replier")))
```

```
17:59:15.424 INFO zio-fiber-332 Sending UserWrote(1688680755419,3,alice,guntentag) to StreamReplier(77ebd7ef-733e-4b6f-a100-1b02c8f2668b)
17:59:15.424 INFO zio-fiber-206 Sending UserWrote(1688680755419,3,alice,guntentag) to StreamReplier(7a7ead0e-a467-43ec-be9f-03816fbbc46a)
17:59:15.424 INFO zio-fiber-425 Sending UserWrote(1688680755419,3,alice,guntentag) to StreamReplier(131d5283-8d05-4f16-bd56-6034724ca840)
17:59:15.424 INFO zio-fiber-147 Sending UserWrote(1688680755419,3,alice,guntentag) to StreamReplier(3d2911ca-58ec-446e-b94d-1474cca8ecc5)
17:59:15.425 INFO zio-fiber-252 Sending UserWrote(1688680755419,3,alice,guntentag) to StreamReplier(6203db8f-d61f-4542-b197-8e0a154c852e)
17:59:15.430 INFO zio-fiber-163 Send chat event UserWrote(1688680755419,3,alice,guntentag) in room room1 to alice
17:59:15.430 INFO zio-fiber-115 Send chat event UserWrote(1688680755419,3,alice,guntentag) in room room1 to john
17:59:15.430 INFO zio-fiber-198 Send chat event UserWrote(1688680755419,3,alice,guntentag) in room room1 to paula
17:59:15.433 INFO zio-fiber-265 Send chat event UserWrote(1688680755419,3,alice,guntentag) in room room1 to eugine
17:59:15.439 INFO zio-fiber-345 Send chat event UserWrote(1688680755419,3,alice,guntentag) in room room1 to tom
```
So a single `ZManagedChannel` can handle multiple response streams simultaneously. Not sure if that addresses your concerns. Of course, that's not a proper load test, so I'm not sure how it scales with hundreds or thousands of simultaneous streams...
@mleclercq that sounds great then!
The idea was suggested during a Zymposium and I think it might be doable. It would basically be a variant, `sendStream`, that returns a stream and passes a `Replier` of `ZStream`. We could back it with a gRPC stream for a full streaming experience.