Open tyranron opened 5 years ago
This is very much in our plans, and we have designed our traits so that it will not be too painful to make that transition. It will be a breaking change, but we are prepared for that. I am generally aiming to begin that work once/as `std::future` is stabilized (rust-lang/rust#59725), as I'd like to make the change only once and use the new APIs internally (this may require using the compatibility layer to work with the `futures` crate for a bit).
This stabilization is targeted for 1.35, planned for May 23. We should be able to do some preliminary work against nightly, and then move to beta to validate. I'd love to have a new release with async-first support when that drops on stable.
As for collaboration, I am very happy to have additional input/code contributions. Additional backends, or potential improvements are happily accepted. We are still trying to figure out how best to model and support "reactors", so that part is still a bit experimental, but the core of the event/command work has worked well for us so far.
@neoeinstein thank you for the feedback.
So, if you don't mind collaborating, I'll dig deeper into your design and primitives over the next few days to see how they apply to my cases/needs. I'll share my thoughts and comparisons along with my design/code pieces to discuss possible design improvements (if there is a need). Once all the design questions are resolved and I have a clear vision of how to fit the framework into my projects, I can start working on a nightly async implementation to deliver it on stable when 1.35 lands in May.
Is that OK with you?
Works for me.
That sounds great people, thanks for the effort!
@neoeinstein I'm sorry for the huge delay. Right after my post above I was unexpectedly pulled into some serious issues with my company's legacy projects, so for the last 5-6 weeks I had no opportunity to even touch our Rust-based projects. Now I'm back in the game, so I'm able to continue.
First, I'll try to give a sense of my solution's design goals and what it's like. In our project this sub-crate is called `cqrusty` (crusty), so I'll refer to it by this name below.
The design of `crusty` is inspired in many aspects by Axon Framework. The goal was to provide a CQRS/ES abstraction layer such that the domain layer can be built on top of it and remain mostly untouched later, while the underlying infra can be drastically changed/evolved (starting with in-process in-memory command/event buses and strong consistency for state changes, and scaling to distributed buses (Kafka-based, for example) and eventual consistency). So the abstractions should be quite generic over a bunch of stuff (IDs, consistency models, etc.), while remaining obvious and ergonomic to use. The actual framework code, meanwhile, is something like LEGO blocks, where everyone can choose the desired infra and guarantees (or produce their own block).
Now, let's go through the `crusty` abstractions and compare them with the `cqrs` abstractions one by one.
Let's start with the basis: `Aggregate`
Such separation is introduced for the ability to choose the desired guarantees (not every CQRS-based application is ES, so a framework user should have the option to not use `Event`s and go with raw `Aggregate`s directly) and to control certain aspects of framework code on the type level (some framework building blocks may require `VersionedAggregate` while others may work with any `Aggregate`), thus making the guarantees clear even to the compiler.
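The trait definitions this paragraph refers to are not present in the thread text, so the following is only a minimal sketch of the kind of separation being described. Every name and signature here (`Aggregate::id`, `VersionedAggregate::version`, `EventSourced`, `Settings`, `Counter`, `is_stale`) is a hypothetical stand-in, not the actual `cqrusty` or `cqrs` API.

```rust
/// Hypothetical base abstraction: every aggregate has an identity,
/// whether or not it is event-sourced.
pub trait Aggregate {
    type Id;
    fn id(&self) -> &Self::Id;
}

/// Hypothetical refinement for building blocks that need a version.
pub trait VersionedAggregate: Aggregate {
    type Version: PartialOrd;
    fn version(&self) -> &Self::Version;
}

/// Hypothetical opt-in trait: only event-sourced aggregates implement it.
pub trait EventSourced<E>: Aggregate {
    fn apply(&mut self, event: &E);
}

/// A plain CQRS aggregate that is stored as-is and never sees an event.
pub struct Settings {
    pub id: u32,
    pub dark_mode: bool,
}

impl Aggregate for Settings {
    type Id = u32;
    fn id(&self) -> &u32 {
        &self.id
    }
}

pub enum CounterEvent {
    Incremented(i64),
}

/// An aggregate that is both versioned and event-sourced.
pub struct Counter {
    pub id: u32,
    pub ver: u64,
    pub value: i64,
}

impl Aggregate for Counter {
    type Id = u32;
    fn id(&self) -> &u32 {
        &self.id
    }
}

impl VersionedAggregate for Counter {
    type Version = u64;
    fn version(&self) -> &u64 {
        &self.ver
    }
}

impl EventSourced<CounterEvent> for Counter {
    fn apply(&mut self, event: &CounterEvent) {
        match event {
            CounterEvent::Incremented(by) => self.value += *by,
        }
        // Each applied event bumps the version by one.
        self.ver += 1;
    }
}

/// A building block that only compiles for versioned aggregates.
pub fn is_stale<A: VersionedAggregate>(aggregate: &A, latest: &A::Version) -> bool {
    aggregate.version() < latest
}
```

With this split, `is_stale(&settings, ..)` is rejected at compile time because `Settings` does not implement `VersionedAggregate`, which is the "guarantees clear even to the compiler" point.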
`cqrs` has a somewhat similar design. There are `Aggregate`, `AggregateId` and `VersionedAggregate`. Differences:
- `Aggregate` is required to be event-sourced due to the `apply()` method in the trait definition. This eliminates the ability to use CQRS without ES.
- I really like the idea to require a `Default` implementation rather than requiring an `initial_state()` method implementation (as `crusty` does).
- I like the idea of `VersionedAggregate` being a wrapper type rather than a trait. This makes for a clearer separation between payload and version and frees us from declaring the version in the aggregate type (for example, almost all our aggregates have a `ver` field in the actual code). However, I'd like to have the ability to abstract over the `Version` type. In the current `cqrs` implementation it's `EventNumber` (`NonZeroU64` under the hood). In our code we're very concerned about type safety, so every aggregate has its own version type. I believe this can be accomplished simply by `pub enum Version<N = EventNumber>` and `pub struct VersionedAggregate<A, N = EventNumber>`.
- The reasons why `AggregateId` is a separate trait rather than an associated type I am not really able to figure out.

Again, the design of events is similar in many parts. However, I don't really like the current `cqrusty` design as it doesn't feel elegant enough.
- `cqrusty::Event` has a version. The docs are quite self-explanatory as to why. `Event` in `cqrs` has none. So it seems that using event versions is not a widespread case, though it's good for a framework to have them. I guess this may be decoupled and introduced as an optional thing (like `VersionedEvent` or something; however, `VersionedEvent` already means something else in `cqrs`).
- `cqrusty::AggregateEvent` is a bad hack required to deal with serialization/deserialization issues in generic code. As far as I can see, in `cqrs` this problem is solved by operating with `SerializableEvent` and `DeserializableEvent` on the framework level (where we're dealing with an `enum` wrapper) rather than with `Serialize`/`Deserialize`. I like that, though I'm not sure whether relying only on `&[u8]`/`Vec<u8>` is enough here for a variety of cases.
- `VersionedEventWithMetadata` is a rough equivalent of `cqrusty::EventMessage` (although "`Event` version" has a different meaning in `cqrusty`). `cqrusty::EventMessage`, however, abstracts over `EventId` for the same reason as with aggregate versions: the ability to choose. For example, in our system the IDs of events are `UUID`s, and so unordered, while the order of events is tracked by event creation time (for our data that's OK and it simplifies sharding). This is quite a non-standard approach compared to one where both aspects rely on monotonically increasing sequential numbers.

`cqrs` does the same just with an `AggregateCommand`.
- It's somewhat mirrored: while in `cqrusty` an `Aggregate` that implements `CommandHandler` consumes a `Command`, in `cqrs` an `AggregateCommand` is self-consumed onto a given `Aggregate`.
- `cqrusty::CommandHandler` is decoupled on the assumption that a `Command` may not always be handled directly by an `Aggregate`, but rather another type may be responsible for doing this. However, in practice this was never used, so I'm totally OK with such a simplification.
- `AggregateCommand`, however, is quite verbose with its associated types. We always have to declare a `Result<Self::Events, Self::Error>`. I believe we can improve ergonomics here by fully abstracting over the result, i.e. providing an associated type `type Result: Into<CommandExecuteResult>`, where `CommandExecuteResult` is something like `Future<Result<I: Events, E: CqrsError>>`, and providing out-of-the-box implementations for types like `()`, `E: Event`, `E: Events`, and so on... This should allow writing clean code in command handlers with an explicit result (some commands may have no result at all, some cannot fail, some are sync, some are async), while the converting-to-a-single-type stuff happens on the framework's side.
- Also, `cqrusty::CommandHandler::Context` is quite a useful thing. We use it quite a lot, passing database contexts and similar stuff there if required. I wonder how you handle such situations.

Here is the part where I'm quite impressed with your simple and elegant design choices. All the `EventSource`, `EventSink`, `SnapshotSource` and `SnapshotSink` traits are understandable and decoupled, which will allow fine-grained control over the return type to abstract over both the sync and async worlds simultaneously. I really like the explicit `SnapshotStrategy` too, while keeping in mind the CQRS ~/ES~ case where no events exist (currently `cqrs` doesn't allow that).
An additional aspect worth mentioning:
In `cqrs` any DB transactions happen on the store implementation level, which may be a bit inflexible and implicit. In `cqrusty` we've introduced the notion of a `UnitOfWork` for that, which allows choosing the desired guarantees on the infra level and keeps the store implementation transaction-agnostic.
Its design is far from ideal, but it does the job for us at the moment. It's something of a cornerstone of our whole system.
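The thread does not show how `UnitOfWork` is defined, so here is only a minimal sketch of the idea, assuming a hypothetical trait with `commit`/`rollback` and an in-memory implementation that stages writes. `InMemoryUow`, `stage` and `run` are illustrative names, not the `cqrusty` API.

```rust
/// Hypothetical transaction boundary, owned by the infra layer rather
/// than by any particular store implementation.
pub trait UnitOfWork {
    fn commit(&mut self);
    fn rollback(&mut self);
}

/// Illustrative in-memory implementation: writes are staged first and
/// become visible only on commit.
#[derive(Default)]
pub struct InMemoryUow {
    committed: Vec<String>,
    staged: Vec<String>,
}

impl InMemoryUow {
    pub fn stage(&mut self, record: &str) {
        self.staged.push(record.to_owned());
    }

    pub fn committed(&self) -> &[String] {
        &self.committed
    }
}

impl UnitOfWork for InMemoryUow {
    fn commit(&mut self) {
        // Move all staged records into the committed set.
        self.committed.append(&mut self.staged);
    }

    fn rollback(&mut self) {
        // Drop everything staged since the last commit.
        self.staged.clear();
    }
}

/// Runs `work` inside the unit of work: commit on `Ok`, roll back on `Err`.
pub fn run<U, T, E>(uow: &mut U, work: impl FnOnce(&mut U) -> Result<T, E>) -> Result<T, E>
where
    U: UnitOfWork,
{
    match work(uow) {
        Ok(value) => {
            uow.commit();
            Ok(value)
        }
        Err(error) => {
            uow.rollback();
            Err(error)
        }
    }
}
```

The point of the design is that the code doing the writes never decides when they become durable; whoever calls `run` picks the guarantee.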
Here is the actual and interesting part: how all the things above interact with each other.
What I've understood from the `cqrs` examples:
- A `Command` is passed directly to `EntitySink` (which is essentially an `EventSink` + `SnapshotSink`).
- `EntitySink` loads the `Aggregate` (optionally), executes the `Command` and receives `Events`, applies the `Events` to the `Aggregate` and saves it.
- `EntitySink` returns the updated `Aggregate`.

So, the actual "framework" logic happens in the `EntitySink` implementation.
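The load / execute / apply / save steps listed above can be sketched with a toy in-memory store. This is a simplified illustration under assumed names (`InMemoryEntityStore`, `exec_and_persist`, `Account`, `execute_on`), not the real `EntitySink` signature from `cqrs`.

```rust
use std::collections::HashMap;

#[derive(Debug, Clone, PartialEq)]
pub enum Event {
    Deposited(u64),
    Withdrawn(u64),
}

#[derive(Debug, Default, PartialEq)]
pub struct Account {
    pub balance: u64,
}

impl Account {
    pub fn apply(&mut self, event: &Event) {
        match event {
            Event::Deposited(amount) => self.balance += *amount,
            Event::Withdrawn(amount) => self.balance -= *amount,
        }
    }
}

pub enum Command {
    Deposit(u64),
    Withdraw(u64),
}

impl Command {
    /// The command is executed against the current state and yields events.
    pub fn execute_on(self, account: &Account) -> Result<Vec<Event>, String> {
        match self {
            Command::Deposit(amount) => Ok(vec![Event::Deposited(amount)]),
            Command::Withdraw(amount) if amount > account.balance => {
                Err("insufficient funds".to_owned())
            }
            Command::Withdraw(amount) => Ok(vec![Event::Withdrawn(amount)]),
        }
    }
}

/// Toy stand-in for the sink: one event stream per aggregate ID.
#[derive(Default)]
pub struct InMemoryEntityStore {
    pub streams: HashMap<String, Vec<Event>>,
}

impl InMemoryEntityStore {
    pub fn exec_and_persist(&mut self, id: &str, command: Command) -> Result<Account, String> {
        // 1. Load: rehydrate the aggregate from its stored events, if any.
        let mut account = Account::default();
        if let Some(events) = self.streams.get(id) {
            for event in events {
                account.apply(event);
            }
        }
        // 2. Execute the command against the loaded state.
        let new_events = command.execute_on(&account)?;
        // 3. Apply the resulting events to the aggregate.
        for event in &new_events {
            account.apply(event);
        }
        // 4. Save the new events and return the updated aggregate.
        self.streams.entry(id.to_owned()).or_default().extend(new_events);
        Ok(account)
    }
}
```

A rejected command leaves the stream untouched, because the early return in step 2 happens before anything is saved.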
There are also `Reactor`s, which are supposed to react in some way to an `Event` that has happened; however, I didn't understand how exactly (maybe due to the lack of examples).
In `cqrusty` we've designed the following flow:
- A `Command` is passed to `CommandGateway`, which is responsible for routing the `Command` to a concrete `Aggregate`. `CommandGateway` can be a single piece of code which does the whole job in place, or it can be backed by a distributed system which sends the `Command` to some `CommandBus` (Kafka-based, for example) which delivers the `Command` to a concrete `Aggregate`.
- The `CommandGateway` machinery/implementation is the actual "framework" logic, which starts a `UnitOfWork` and inside it performs pretty much the same as `EntitySink` does: loads/instantiates the `Aggregate`, executes the `Command`, applies the `Events`, stores the updated `Aggregate`.
- Depending on the `CommandGateway` implementation, it may return the updated `Aggregate` or any other result desired for the case.
- Handling of `Events` is intended to happen via `EventHandlers`, which may be invoked directly or indirectly (via distributed event listening), or not invoked at all, depending on the chosen `CommandGateway` implementation.

Future plans for `cqrusty` were:
- `CommandInterceptor`s, `CommandValidator`s.

Phew... this took quite some time 🙃
@neoeinstein sorry for the bad English (it's not my native language). I'm looking forward to some constructive feedback from you. What do you think about all of the above?
In the next post I'll be more specific and describe the concrete necessities our projects require for switching to `cqrs`.
@neoeinstein ping
@neoeinstein ping
First off, sorry for such a long time in getting back to this.
On to the merits. Note, your English is perfectly fine, nothing to be self-conscious of there.
> Such separation is introduced for ability to choose the desired guarantees (not every CQRS-based application is ES, so framework user should have an option to not use `Event`s and go with raw `Aggregate`s directly) and to control certain aspects in framework code on type level (some framework building block may require `VersionedAggregate` while other may work with any `Aggregate`), so making guarantees clear even for the compiler.
>
> `cqrs` has somewhat similar design. There are `Aggregate`, `AggregateId` and `VersionedAggregate`. Differences:
>
> - `Aggregate` is required to be event-sourced due to `apply()` method in trait definition. This eliminates ability to use CQRS without ES.

I'm open to separating these two. Having an `Aggregate` that stands alone, and then a `Projection`, which would receive events.
> - I really like the idea to require `Default` implementation rather than requiring `initial_state()` method implementation (how `crusty` does).

That was also my thought on seeing your `Aggregate` definition.
> - I like the idea for `VersionedAggregate` being a wrapper-type rather than a trait. This makes more clear separation between payload/version and frees us from declaring version in the aggregate type (for example, almost all our aggregates have `ver` field in the actual code). However, I'd like to have an ability to abstract over the `Version` type. In current `cqrs` implementation it's `EventNumber` (`NonZeroU64` under-the-hood). In our code we're too concerned about type safety, so every aggregate has its own version type. I believe this can be accomplished simply by `pub enum Version<N = EventNumber>` and `pub struct VersionedAggregate<A, N = EventNumber>`.

I would like to change `Version` to be generic, allowing GUIDs or other forms of identifiers. I like this.
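The two signatures quoted above, `pub enum Version<N = EventNumber>` and `pub struct VersionedAggregate<A, N = EventNumber>`, could look roughly like this once fleshed out. The variant names, field names and the `EventNumber` stand-in are assumptions made for illustration; only the two generic signatures come from the discussion.

```rust
use std::num::NonZeroU64;

/// Stand-in for the `EventNumber` mentioned above (a `NonZeroU64` wrapper).
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
pub struct EventNumber(pub NonZeroU64);

impl EventNumber {
    pub fn new(raw: u64) -> Option<Self> {
        NonZeroU64::new(raw).map(EventNumber)
    }
}

/// Version that defaults to `EventNumber` but accepts any number type.
/// The variant names are assumed for this sketch.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Version<N = EventNumber> {
    /// No events have been applied yet.
    Initial,
    /// The number of the last applied event.
    Number(N),
}

/// Wrapper that keeps the version next to the payload instead of inside it.
pub struct VersionedAggregate<A, N = EventNumber> {
    pub version: Version<N>,
    pub payload: A,
}

/// A per-aggregate version type, as wanted for extra type safety.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct OrderVersion(pub u32);

/// The aggregate itself no longer needs a `ver` field.
pub struct Order {
    pub total_cents: u64,
}
```

Existing code that writes `Version` or `VersionedAggregate<Order>` keeps compiling thanks to the default parameter, while `VersionedAggregate<Order, OrderVersion>` opts into the dedicated version type.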
> - The reasons why `AggregateId` is a separate trait rather than an associated type I am not really able to figure out.

`AggregateId` really should be turned back into an associated type. I'm not certain why we did that either. I think it was to deal with having struct IDs, and then conforming them to `postgres`. The better path is to have an additional constraint to use an aggregate ID with `postgres`, not to muck up the `AggregateId`.
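One way to read "an additional constraint to use an aggregate ID with `postgres`" is to keep the ID as a plain associated type and let only the SQL-backed code add a bound on it. The sketch below assumes hypothetical names throughout (`KIND`, `SqlKey`, `stream_key`, `kind_of`); it does not use or mirror the real `postgres` crate API.

```rust
pub trait Aggregate {
    /// The identifier is an associated type with no backend-specific bounds.
    type Id;
    /// Hypothetical name used to namespace stored streams.
    const KIND: &'static str;
}

/// Hypothetical constraint that only the SQL-backed store asks for.
pub trait SqlKey {
    fn to_sql_key(&self) -> String;
}

pub struct UserId(pub u64);

pub struct User;

impl Aggregate for User {
    type Id = UserId;
    const KIND: &'static str = "user";
}

impl SqlKey for UserId {
    fn to_sql_key(&self) -> String {
        self.0.to_string()
    }
}

/// Backend-agnostic code needs nothing from the ID type.
pub fn kind_of<A: Aggregate>() -> &'static str {
    A::KIND
}

/// The SQL-backed code adds its own bound on `A::Id` where it needs one.
pub fn stream_key<A>(id: &A::Id) -> String
where
    A: Aggregate,
    A::Id: SqlKey,
{
    format!("{}:{}", A::KIND, id.to_sql_key())
}
```

An aggregate whose ID does not implement `SqlKey` still works with every other backend; it simply cannot be passed to `stream_key`.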
> `cqrs` does the same just with an `AggregateCommand`.
>
> - It's somewhat mirrored: while in `cqrusty` `Aggregate` that implements `CommandHandler` consumes `Command`, in `cqrs` `AggregateCommand` is self-consumed onto a given `Aggregate`.
> - `cqrusty::CommandHandler` is decoupled with an assumption that `Command` may not be always handled directly by `Aggregate`, but rather another type may be responsible for doing this. However, in practice this was never used, so I'm totally OK with such simplification.
> - `AggregateCommand`, however, a quite verbose with its associative types. We always should declare a `Result<Self::Events, Self::Error>`. I believe we can improve ergonomics here by fully abstracting over result, so provide an associative type `type Result: Into<CommandExecuteResult>`, where `CommandExecuteResult` is something like `Future<Result<I: Events, E: CqrsError>>` and provide out-of-the box implementations for types like `()`, `E: Event`, `E: Events`, and so on... This should allow to write clean code in command handlers with an explicit result (some commands may have no result at all, some cannot fail, some a sync, some are async), while the converting-to-a-single-type stuff happens on framework's side.

I like this, but see below.
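A minimal sketch of the `type Result: Into<CommandExecuteResult>` proposal quoted above. To stay dependency-free it leaves out the `Future` layer and uses concrete `Event` and `CommandError` types instead of the generic `I: Events, E: CqrsError`; all type and function names other than `CommandExecuteResult` are illustrative assumptions.

```rust
#[derive(Debug, Clone, PartialEq)]
pub enum Event {
    Deposited(u64),
    Withdrawn(u64),
}

#[derive(Debug, Clone, PartialEq)]
pub enum CommandError {
    InsufficientFunds,
}

/// The single type the framework side works with.
/// (The proposal additionally wraps this in a `Future`.)
#[derive(Debug, PartialEq)]
pub struct CommandExecuteResult(pub Result<Vec<Event>, CommandError>);

// Out-of-the-box conversions, so handlers return whatever is most natural.
impl From<()> for CommandExecuteResult {
    fn from(_: ()) -> Self {
        CommandExecuteResult(Ok(Vec::new()))
    }
}

impl From<Event> for CommandExecuteResult {
    fn from(event: Event) -> Self {
        CommandExecuteResult(Ok(vec![event]))
    }
}

impl From<Result<Vec<Event>, CommandError>> for CommandExecuteResult {
    fn from(result: Result<Vec<Event>, CommandError>) -> Self {
        CommandExecuteResult(result)
    }
}

pub struct Account {
    pub balance: u64,
}

pub trait Command {
    type Result: Into<CommandExecuteResult>;
    fn execute(self, account: &Account) -> Self::Result;
}

/// Produces no events and cannot fail.
pub struct Ping;

impl Command for Ping {
    type Result = ();
    fn execute(self, _account: &Account) -> Self::Result {}
}

/// Produces exactly one event and cannot fail.
pub struct Deposit(pub u64);

impl Command for Deposit {
    type Result = Event;
    fn execute(self, _account: &Account) -> Self::Result {
        Event::Deposited(self.0)
    }
}

/// May fail, so it spells out the full `Result`.
pub struct Withdraw(pub u64);

impl Command for Withdraw {
    type Result = Result<Vec<Event>, CommandError>;
    fn execute(self, account: &Account) -> Self::Result {
        if self.0 > account.balance {
            Err(CommandError::InsufficientFunds)
        } else {
            Ok(vec![Event::Withdrawn(self.0)])
        }
    }
}

/// Framework side: every handler result is converted to one type here.
pub fn dispatch<C: Command>(command: C, account: &Account) -> CommandExecuteResult {
    command.execute(account).into()
}
```

Only `Withdraw` has to mention errors at all; `Ping` and `Deposit` stay one-liners while `dispatch` still sees a uniform result.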
> - Also, `cqrusty::CommandHandler::Context` is quite a useful thing. We're using it quite a lot with passing there database contexts and similar stuff if required. I wonder how you're doing in such situations.

Our general philosophy with Commands is that to be a true command, it needs to be self-contained without side effects. If there is a need to access a database or external resource (or even collect the current time), that should be resolved at the layer above, and then the result of those operations pulled into the `Command` before being handled by the `Aggregate`. Thus, I see that a command should always just be a `Result`, but the `CommandResolver` (or what you later termed the `CommandGateway`) would be the one returning some associated `Into<CommandExecuteResult>`, being a `Future`.
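As a rough illustration of that philosophy, the sketch below resolves the current time in a layer above and carries it inside the command, so the handler itself is a pure function of state and command. `RenameUser`, `resolve_rename` and the other names are hypothetical, and the clock is passed in as a closure purely to keep the example deterministic.

```rust
#[derive(Debug, PartialEq)]
pub enum UserEvent {
    Renamed { name: String, at_secs: u64 },
}

#[derive(Debug, PartialEq)]
pub enum RenameError {
    EmptyName,
    Unchanged,
}

/// Self-contained command: the timestamp was collected before handling.
pub struct RenameUser {
    pub new_name: String,
    pub requested_at_secs: u64,
}

pub struct User {
    pub name: String,
}

impl User {
    /// Pure handler: no clock, no database, only state + command.
    pub fn handle(&self, command: RenameUser) -> Result<Vec<UserEvent>, RenameError> {
        if command.new_name.is_empty() {
            return Err(RenameError::EmptyName);
        }
        if command.new_name == self.name {
            return Err(RenameError::Unchanged);
        }
        Ok(vec![UserEvent::Renamed {
            name: command.new_name,
            at_secs: command.requested_at_secs,
        }])
    }
}

/// The layer above: performs the side effect (reading the clock) and
/// pulls its result into the command.
pub fn resolve_rename(new_name: &str, clock: impl Fn() -> u64) -> RenameUser {
    RenameUser {
        new_name: new_name.to_owned(),
        requested_at_secs: clock(),
    }
}
```

Because `handle` never touches the outside world, it can be tested with plain values, and any async work lives only in the resolving layer.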
> Here is the part where I'm quite impressed with your simple and elegant design choices. All the `EventSource`, `EventSink`, `SnapshotSource` and `SnapshotSink` traits are understandable and decoupled which will allow fine-grained control over returning type to abstract both for sync and async world simultaneously. I really like an explicit `SnapshotStrategy` too, while keeping in mind the CQRS ~/ES~ case where no event exist (currently `cqrs` doesn't allow that).
>
> Additional aspect worth mentioning: In `cqrs` any DB transactions happen on store implementation level, which may be a bit inflexible and implicit. In `cqrusty` we've introduced a notion of `UnitOfWork` for that, which allows to choose desired guarantees on infra level and keep store implementation transaction-agnostic.

As designed, this set of libraries has the concept of optimistic concurrency in mind. Thinking about how to frame a `UnitOfWork` as you have it is interesting, but not something I have fully considered.
> There also `Reactor`s which are supposed to react on to happened `Event` in some way, however, I didn't understand how exactly (maybe due to lack of examples).

Yeah, we were trying something experimental here, but that will probably get ripped out and replaced. The main thing here is providing a way for some process to follow an event stream, reacting to the events, keeping quick-to-read snapshots up to date offline, or building new projections. This area needs some iteration.
Again, I apologize that I haven't been present to handle this as I'd have liked. I'll start taking some of the steps mentioned above as PRs, and I'll include you if you'd like to be kept up to date.
@neoeinstein thanks for the feedback! No worries about the delay.
I've started to experiment with all this in the `async` branch here. It's quite incomplete at the moment. For simplicity and due to borrowing/lifetime nightmares, I've jumped over `std::future` directly to `async`/`.await` usage and even `async-trait`. The latter, however, implies performance costs even in trivial cases (due to `Box`-ing), but the resulting ergonomics outweigh that for me. So, it's currently nightly-only until the 1.38 Rust release.
> I would like to change `Version` to be a generic, allowing GUIDs or other forms of identifiers. I like this.

Actually, with recent use we've found it OK for it not to be generic. The only change I've made is extending it to `NonZeroU128` under the hood and delegating the responsibility of generating them to `EventSink`. So, having `u128` size solves the problem with GUID/UUID/ULID/etc. IDs, as it easily converts to/from the desired format. At the `EventSink` level the library user is free to choose whether those IDs are sequential or not, and the desired storage format (we actually use a 64-bit nanosecond timestamp + 64 random bits in the manner of ulid and store them in a CockroachDB `UUID` field).
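The 64-bit timestamp + 64-bit random layout described above can be sketched with plain bit operations. `compose_id` and `split_id` are hypothetical helper names; the clock and the randomness source are deliberately left to the caller, so this shows only the packing and not how `EventSink` actually generates the values.

```rust
use std::num::NonZeroU128;

/// Packs a nanosecond timestamp into the high 64 bits and random bits into
/// the low 64 bits. Returns `None` only if both halves are zero.
pub fn compose_id(timestamp_nanos: u64, random: u64) -> Option<NonZeroU128> {
    let raw = (u128::from(timestamp_nanos) << 64) | u128::from(random);
    NonZeroU128::new(raw)
}

/// Splits an ID back into its (timestamp, random) halves.
pub fn split_id(id: NonZeroU128) -> (u64, u64) {
    let raw = id.get();
    // The `as u64` cast on the second element keeps only the low 64 bits.
    ((raw >> 64) as u64, raw as u64)
}
```

Because the timestamp occupies the most significant bits, IDs compare in creation-time order first and fall back to the random half only for equal timestamps.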
> Our general philosophy with Commands is that to be a true command, it needs to be self contained without side effects. If there is a need to access a database or external resource (or even collect the current time), that should be resolved at the layer above, and then the result of those operations pulled into the `Command` before being handled by the `Aggregate`. Thus, I see that a command should always just be a Result, but the `CommandResolver` (or what you later termed the `CommandGateway`) would be the one returning some associated `Into<CommandExecuteResult>`, being a `Future`.

Hmmm... from what I've seen out there in examples and CQRS frameworks, the `Command` handling part is the actual part where the framework user writes their "business logic". The part between `CommandResolver` and the actual `Command` handling is the framework part which "just works". It's something like: you throw a `Command` into the framework, and almost the only thing you have to declare is how it's handled by the `Aggregate`.
But having `CommandHandler` be pure is quite an interesting idea, though it confuses my understanding... Would you be so kind as to explain it a bit further for me?
For example, we have two situations:
- `CreateUser` command handling generates some random unique number on the fly, and we should ensure that it is unique. At the moment we perform this check inside `CommandHandler`, whose DB interactions are wrapped into a transaction on the framework level. If `CommandHandler` were pure, where should I check this invariant?
- `MuteAudio` command disables the audio of a WebRTC conference participant for other participants. It not only updates state in the DB, but also talks to some media server via its API. Currently we talk to that API inside `CommandHandler`, but where should it be if `CommandHandler` is pure?

> As designed, this set of libraries has the concept of optimistic concurrency in mind. Thinking about how to frame a `UnitOfWork` as you have it is interesting, but not something I have fully considered.

Yup, generally CQRS/ES is used in the context of optimistic concurrency and eventual consistency. Our case is a bit simplified as we're using CQRS/ES with strong consistency in the manner described here, but with the aim to "jump onto" eventual consistency if/when a performance bottleneck appears there.
So, my vision of a CQRS/ES framework is to provide a set of well-designed core primitives and some ready-made "lego" parts of a framework implementation. A library user can then either take some high-level abstractions and just go with them, or reuse the core primitives and some parts to build their own flow model, without being restricted to only strong consistency or only eventual consistency, to using event sourcing at all, or to dispatching commands inside a single process only.
Hello there! Thank you for your efforts and sharing this ❤️
I'm doing very similar work at the moment, but planning to mature it in a closed-source code base before releasing/sharing anything. Your work is quite interesting to me as it decomposes stuff in a different way. The main difference is that my solution supports both sync/async by abstracting over result types. However, in your solution there is a strict `Result` everywhere, and the example apps use sync IO (r2d2, iron, postgres).
Actually, I'd like to collaborate with you to focus all the power on a single The Ecosystem Framework, rather than spreading it across half-solutions. However, my projects are heavily async-based, so I cannot "just switch" to your framework at the moment.
Do you have any plans to support async? Are there any designs figured out for that? Can I help somehow with this?