akka / akka-edge-rs

Akka Edge support in Rust
https://doc.akka.io/docs/akka-edge/current/

gRPC Produce #14

Closed · huntc closed this 1 year ago

huntc commented 1 year ago

Produce events using the new consumer API. A new type of handler was introduced, known as a PendingHandler. Pending handlers return a Result containing a future that completes once the request has been fully handled; meanwhile, the projection can accept the next event envelope to process.
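As a rough illustration (not the PR's actual trait; names and signatures are guesses), a PendingHandler in Rust might look like:

```rust
use std::future::Future;
use std::pin::Pin;

// Illustrative error type for the sketch.
struct HandlerError;

// The future that resolves once the request has been fully handled.
type Pending = Pin<Box<dyn Future<Output = Result<(), HandlerError>> + Send>>;

// Rather than awaiting completion itself, the handler hands back a future,
// freeing the projection to accept the next event envelope in the meantime.
trait PendingHandler {
    type Envelope;

    fn process_pending(&mut self, envelope: Self::Envelope) -> Result<Pending, HandlerError>;
}
```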

Along the way, the entity manager had to be upgraded to generate timestamps and sequence numbers given that the gRPC producer protocol requires them. We now store the sequence number as a header in the commit log.
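For illustration, the record written to the commit log might carry the sequence number along these lines; the types here are made up for the sketch and are not the actual commit log API:

```rust
// Sketch only: types and field names are hypothetical.
struct Header {
    key: String,
    value: Vec<u8>,
}

struct ProducerRecord {
    headers: Vec<Header>,
    timestamp: Option<u64>, // millis since epoch, required by the gRPC producer protocol
    value: Vec<u8>,
}

// Attach the entity's sequence number as a header so it can be recovered
// when the record is consumed and turned into a gRPC event envelope.
fn with_seq_nr(seq_nr: u64, timestamp: u64, value: Vec<u8>) -> ProducerRecord {
    ProducerRecord {
        headers: vec![Header {
            key: "seq_nr".to_string(),
            value: seq_nr.to_be_bytes().to_vec(),
        }],
        timestamp: Some(timestamp),
        value,
    }
}
```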

I've also qualified the Protobuf types in the consumer instead of importing them, as it then becomes much clearer what is in the realm of Protobuf.
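In other words, something along these lines (the `proto` module is a stand-in for the generated code):

```rust
// Illustrative stand-in for the generated Protobuf module.
mod proto {
    pub struct EventEnvelope;
}

// Rather than `use proto::EventEnvelope;`, the consumer now qualifies the
// type at each use site, so it's obvious what lives in the realm of Protobuf.
fn handle(envelope: proto::EventEnvelope) {
    let _ = envelope; /* ... */
}
```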

Lastly, for additional quality, I've updated CI to run clippy and tests on the IoT example when in gRPC mode.

TODO (as separate PRs):

johanandren commented 1 year ago

Not sure I understand your question right, but on the Akka side we only talk about a Handler on the consuming side; transformations of the pushed events (for example, internal representation -> Protobuf messages) are done through a "transformation" (EventProducer.Transformation).

On the consuming side of event producer push the user does not provide a handler, but the events are written directly into the journal. One or more projections/handlers can then be run against the local journal to consume those events.

huntc commented 1 year ago

> Not sure I understand your question right, but on the Akka side we only talk about a Handler on the consuming side; transformations of the pushed events (for example, internal representation -> Protobuf messages) are done through a "transformation" (EventProducer.Transformation).

The proto file states:

  1. Events are stored in the event journal on the producer side.
  2. Producer side starts an Akka Projection which locally tracks its offset and as projection handler pushes events to a consuming EventConsumerService
  3. ...

... which states that there's a handler on the producer side. That is therefore the approach I was thinking of. Is the proto comment incorrect?

I'll dig further into the Transformation you mention though.

> On the consuming side of event producer push the user does not provide a handler, but the events are written directly into the journal. One or more projections/handlers can then be run against the local journal to consume those events.

Yep. That makes sense and lines up with items 3 and 4 of that proto comment.

patriknw commented 1 year ago

> which states that there's a handler on the producer side

yes, that is correct. The handler is implemented by the library, not by the end user.

huntc commented 1 year ago

> which states that there's a handler on the producer side
>
> yes, that is correct. The handler is implemented by the library, not by the end user.

OK, so I've got the handler being implemented by the end user right now... I think that's necessary, though, as the envelope comes from the commit log and is then sent to the gRPC sink as a gRPC envelope.

huntc commented 1 year ago

Further to last night's chat on this PR, one thing I took away (perhaps incorrectly) was that events can be streamed out to the consumer without waiting on an ack for each. Having looked further at the Scala source today, I'm not sure this is correct. The InternalProjectionState will be driven by a flow-based handler, but I can't see anything parallel in how each handler flow is run. And the contract of the Handler for Akka Projections is that an offset will be stored, so this would make sense. Thoughts?

Relevant code: https://github.com/akka/akka-projection/blob/main/akka-projection-core/src/main/scala/akka/projection/internal/InternalProjectionState.scala#L191-L211

huntc commented 1 year ago

> which states that there's a handler on the producer side
>
> yes, that is correct. The handler is implemented by the library, not by the end user.

Commit 587a776bd0e0ec28635035211d036e016a06c2a0 now has a handler implemented by the library.

patriknw commented 1 year ago

> events can be streamed out to the consumer without waiting on an ack for each

That's right. The relevant code on the producer side is https://github.com/akka/akka-projection/blob/main/akka-projection-grpc/src/main/scala/akka/projection/grpc/internal/EventPusher.scala, where you'll also find the EventPusherStage that receives the Ack and emits the Done; the Done is then the signal for downstream storage of the offset (which can happen later because of at-least-once delivery).

On the consumer side, which you will not implement, the relevant code is https://github.com/akka/akka-projection/blob/main/akka-projection-grpc/src/main/scala/akka/projection/grpc/internal/EventPusherConsumerServiceImpl.scala#L66, where you'll see mapAsyncPartitioned, which allows events for different persistenceIds to be written in parallel but emits the Acks in the original order.

huntc commented 1 year ago

> events can be streamed out to the consumer without waiting on an ack for each

I think I get it now. The flow is split, with envelopes going out to the consumer endpoint and acks being sourced from the consumer endpoint separately. So the handler stage isn't emitting an element until a corresponding ack is found, but that doesn't prevent the stage from receiving another element and sending it on. Is that right?
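Assuming that reading is right, a minimal tokio sketch of the split might look like this (channel-based and illustrative, not the actual implementation):

```rust
use tokio::sync::mpsc;

// Illustrative envelope carrying the sequence number used to correlate acks.
struct Envelope {
    seq_nr: u64,
    payload: Vec<u8>,
}

// Envelopes stream out to the consumer endpoint while acks arrive on a
// separate channel; sending the next envelope never waits on the previous
// ack, and offsets are only released for storage once the ack is seen.
async fn produce(
    mut envelopes: mpsc::Receiver<Envelope>,
    to_consumer: mpsc::Sender<Envelope>,
    mut acks: mpsc::Receiver<u64>,
    offset_store: mpsc::Sender<u64>,
) {
    loop {
        tokio::select! {
            Some(envelope) = envelopes.recv() => {
                if to_consumer.send(envelope).await.is_err() {
                    break;
                }
            }
            Some(acked_seq_nr) = acks.recv() => {
                if offset_store.send(acked_seq_nr).await.is_err() {
                    break;
                }
            }
            else => break,
        }
    }
}
```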

huntc commented 1 year ago

This should be relatively easy for me to achieve by spawning a task for each handler invocation. A limit on concurrency could be provided to this run function, perhaps defaulting to 1.

Sound reasonable?
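For instance, a sketch with tokio, using a semaphore to bound in-flight invocations (names are illustrative, not the eventual API):

```rust
use std::future::Future;
use std::sync::Arc;
use tokio::sync::Semaphore;

// Spawn a task per handler invocation, bounded by `max_in_flight`
// (a default of 1 would preserve strictly sequential processing).
async fn run<E, F, Fut>(max_in_flight: usize, envelopes: Vec<E>, handler: F)
where
    E: Send + 'static,
    F: Fn(E) -> Fut + Clone + Send + 'static,
    Fut: Future<Output = ()> + Send + 'static,
{
    let permits = Arc::new(Semaphore::new(max_in_flight));
    for envelope in envelopes {
        // Wait until a concurrency slot is free.
        let permit = permits.clone().acquire_owned().await.expect("semaphore closed");
        let handler = handler.clone();
        tokio::spawn(async move {
            handler(envelope).await;
            drop(permit); // release the slot for the next invocation
        });
    }
}
```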

patriknw commented 1 year ago

> spawning a task for each handler invocation

ok, we can try that approach, even though it still doesn't address my second concern about user DX (open for mistakes). It also changes the concurrency semantics of a projection handler, e.g. around mutable state in the handler. Still, it's worth trying, and if we later see it doesn't fit, I think the same approach could be moved to the internal implementation.

huntc commented 1 year ago

> ok, we can try that approach, even though it still doesn't address my second concern about user DX (open for mistakes).

I've got an idea for how to address this: a .handler method on the GrpcEventProducer, which accepts a closure for performing the transformation. Pretty similar to the JVM approach. I'll experiment with this.
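Roughly the shape in mind, with hypothetical types and names:

```rust
// Stand-in for the generated Protobuf event type.
mod proto {
    pub struct Event;
}

struct GrpcEventProducer;

impl GrpcEventProducer {
    // The user supplies only a transformation closure (domain event ->
    // Protobuf event, or None to filter it out); the returned handler
    // wraps it with the library's gRPC send-and-ack plumbing.
    fn handler<E, T>(&self, mut transform: T) -> impl FnMut(E)
    where
        T: FnMut(E) -> Option<proto::Event>,
    {
        move |envelope| {
            if let Some(_event) = transform(envelope) {
                // Library-internal: push `_event` to the gRPC sink here.
            }
        }
    }
}
```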

huntc commented 1 year ago

> ok, we can try that approach, even though it still doesn't address my second concern about user DX (open for mistakes).

> I've got an idea for how to address this: a .handler method on the GrpcEventProducer, which accepts a closure for performing the transformation. Pretty similar to the JVM approach. I'll experiment with this.

Commit 7c2e00df7a0790f350d48317fba0f7a1a1b71474 implements the approach I was thinking of and should hopefully address the issue you raise re. DX.

huntc commented 1 year ago

Commit fb6d4c802681891f7bd152cbc19d3d22c0dda64b puts in place a foundation for processing handlers sequentially or as a flow. More implementation to follow, as I was focused mainly on the DX.
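As a rough sketch of what "sequentially or as a flow" could look like (the actual types in that commit may differ):

```rust
// Illustrative only: one enum the projection machinery can match on to
// run a handler either strictly sequentially or as a pending flow.
enum Handlers<S, F> {
    // Awaited to completion before the next envelope is processed.
    Sequential(S),
    // Returns a pending future; the next envelope can be accepted meanwhile.
    Flow(F),
}
```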

huntc commented 1 year ago

> This should be relatively easy for me to achieve by spawning a task for each handler invocation. A limit on concurrency could be provided to this run function, perhaps defaulting to 1.
>
> Sound reasonable?

Turns out that I didn't need to spawn a new task each time. The handling runs efficiently inline within the existing task.