akka / akka-edge-rs

Akka Edge support in Rust
https://doc.akka.io/docs/akka-edge/current/

Entity manager and other conveniences #121

Closed huntc closed 12 months ago

huntc commented 12 months ago

Improves the API ergonomics by providing a convenience function for running an entity manager and the offset store. It also streamlines the projections and a few other bits.

For example, the entity manager function now returns both a future to run and a channel for sending commands.

So, this:

let (temperature_commands, command_receiver) = mpsc::channel(MAX_COMMANDS);

...
entity_manager::run(
    Behavior { events },
    file_log_topic_adapter,
    command_receiver,
    MAX_ENTITIES,
)
.await

...becomes this:

let (entity_manager_task, temperature_commands) = entity_manager::task(
    Behavior { events },
    file_log_topic_adapter,
    MAX_COMMANDS,
    MAX_ENTITIES,
);

...
entity_manager_task.await
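
To illustrate the shape of this change, here is a minimal, standard-library-only sketch of a function that creates the command channel internally and returns both the work to run and the sender. It is not the akka-edge-rs implementation: the `Command` type, the `task` signature, and `run_demo` are hypothetical, and a thread with a blocking channel stands in for the future and the async channel.

```rust
use std::collections::HashMap;
use std::sync::mpsc::{sync_channel, SyncSender};
use std::thread;

/// Hypothetical command type for this sketch.
enum Command {
    Record { entity_id: String, temperature: u32 },
}

/// Creates the bounded command channel internally and returns the work to run
/// together with the sender, so the caller no longer wires up the channel.
fn task(
    max_commands: usize,
) -> (
    impl FnOnce() -> HashMap<String, Vec<u32>>,
    SyncSender<Command>,
) {
    let (sender, receiver) = sync_channel(max_commands);
    let run = move || {
        let mut state: HashMap<String, Vec<u32>> = HashMap::new();
        // Processes commands until every sender has been dropped.
        for command in receiver {
            match command {
                Command::Record { entity_id, temperature } => {
                    state.entry(entity_id).or_default().push(temperature);
                }
            }
        }
        state
    };
    (run, sender)
}

/// Runs the task on a thread, sends two commands, and returns the final state.
fn run_demo() -> HashMap<String, Vec<u32>> {
    let (manager_task, commands) = task(10);
    let handle = thread::spawn(manager_task);
    commands
        .send(Command::Record { entity_id: "sensor-1".to_string(), temperature: 21 })
        .unwrap();
    commands
        .send(Command::Record { entity_id: "sensor-1".to_string(), temperature: 22 })
        .unwrap();
    // Dropping the last sender lets the task finish.
    drop(commands);
    handle.join().unwrap()
}
```

The caller only decides where to run the returned task and who gets the sender. The channel capacity becomes a plain parameter rather than something constructed at the call site.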

This PR also streamlines consumer filter creation. For example:

let initial_consumer_filters = vec![FilterCriteria::IncludeEntityIds {
    entity_id_offsets: vec![EntityIdOffset {
        entity_id: persistence_id.entity_id.clone(),
        seq_nr: 0,
    }],
}];

let channel = Channel::from_static("http://127.0.0.1:50051");
let mut source_provider = GrpcSourceProvider::<u32, _>::new(
    || channel.connect(),
    StreamId::from("some-string-id"),
)
.with_initial_consumer_filters(initial_consumer_filters);

let consumer_filters = source_provider.consumer_filters().unwrap();
assert!(consumer_filters
    .send(vec![consumer_filter::exclude_all()])
    .is_ok());

In the above, if no initial filters are required, an empty vec can be passed to with_initial_consumer_filters. If the consumer filters later need to be updated dynamically, the consumer_filters method can be used to take the sender.
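
The "take the sender" behaviour described above can be sketched with a simplified, hypothetical provider. The `SourceProvider` struct, its `current_filters` method, and the reduced `FilterCriteria` enum below are assumptions made for illustration and use the standard library's channel. They are not the real `GrpcSourceProvider` API.

```rust
use std::sync::mpsc::{channel, Receiver, Sender};

/// Hypothetical, simplified filter type for this sketch.
#[derive(Debug, Clone, PartialEq)]
enum FilterCriteria {
    ExcludeAll,
    IncludeEntityIds(Vec<String>),
}

/// Hypothetical stand-in for a source provider.
struct SourceProvider {
    filters: Vec<FilterCriteria>,
    filter_sender: Option<Sender<Vec<FilterCriteria>>>,
    filter_receiver: Receiver<Vec<FilterCriteria>>,
}

impl SourceProvider {
    fn new() -> Self {
        let (sender, receiver) = channel();
        Self {
            filters: Vec::new(),
            filter_sender: Some(sender),
            filter_receiver: receiver,
        }
    }

    /// Builder-style setter; an empty vec means "no initial filters".
    fn with_initial_consumer_filters(mut self, filters: Vec<FilterCriteria>) -> Self {
        self.filters = filters;
        self
    }

    /// Hands out the sender once; later calls return `None`.
    fn consumer_filters(&mut self) -> Option<Sender<Vec<FilterCriteria>>> {
        self.filter_sender.take()
    }

    /// Applies any pending updates and returns the filters now in effect.
    fn current_filters(&mut self) -> Vec<FilterCriteria> {
        for update in self.filter_receiver.try_iter() {
            self.filters = update;
        }
        self.filters.clone()
    }
}
```

Because `consumer_filters` takes the sender out of an `Option`, a caller that never needs dynamic updates can ignore it, while a caller that does owns the only handle for sending new filters.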

Finally, the producer side of things has been slightly improved by returning some channels. Probably some more wins to be had down the line there. I'll look further at the production side separately.

Along the way, I also removed NonZeroUsize as a parameter type, as it is ugly and not intended for use in APIs.
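
One way to keep a plain `usize` in the public signature while still using `NonZeroUsize` internally is sketched below. The `to_capacity` helper and its clamp-to-one behaviour are assumptions for illustration and may differ from what this PR actually does.

```rust
use std::num::NonZeroUsize;

/// Hypothetical helper: accepts a plain `usize` at the API boundary and
/// converts it for internal use.
fn to_capacity(max_entities: usize) -> NonZeroUsize {
    // Assumption for this sketch: a requested size of 0 is clamped to 1.
    NonZeroUsize::new(max_entities).unwrap_or(NonZeroUsize::MIN)
}
```

Callers can then pass a constant such as `MAX_ENTITIES` directly, without constructing a `NonZeroUsize` themselves.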

Fixes: #116
Fixes: #117
Fixes: #118

Related: https://github.com/lightbend/akka-projection-temp/pull/20