Closed: venkatd closed this issue 2 years ago.
A few years back, I wrote a hacky event-sourcing implementation for our Turtle backend. It has worked for us so far, but I'd like to re-implement some core components as a standalone library.
Having worked on this particular problem already, I think you're in a really good position to create a library. At the same time, I think you're putting yourself on a journey to build a library (not an easy one at that!), which is a big commitment and will take away from your journey of building the application and business.
We have found a nice list of CQRS libraries in Elixir, https://github.com/slashdotdash/awesome-elixir-cqrs#libraries. Have you considered any of them? Are there fundamental problems with them that are best addressed by starting from scratch?
Keeping it as a separate library will make it easier to test, improve, and get help on from folks like yourself :)
Agreed! Just a word of caution: when you start integrating the library into the main project, we'd strongly advise keeping it in the same repo. It will be much easier to work on both at the same time, with no need for cutting and coordinating releases. It can be really simple: just a top-level ./derive directory that is used as a path dependency from the main app: {:derive, path: "derive"}.
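For illustration, the path dependency would sit in the main app's mix.exs roughly like this (a fragment; the rest of the project file is omitted):

```elixir
# mix.exs of the main application (fragment)
defp deps do
  [
    # Derive lives in ./derive inside the same repository, so it is compiled
    # straight from source and no separate releases need to be cut
    {:derive, path: "derive"}
  ]
end
```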
Before implementing too much, I'd love to get help iterating on the public API. Please take a look at the tests in derive_ecto_test.exs and the documentation in Derive.Reducer.
Before jumping into these files, I briefly looked at some of the other internals, the Dispatcher and the EventLog. You might be interested in looking at Registry (the PubSub section) and PartitionSupervisor (will ship in Elixir v1.14), as these should provide great building blocks for what you want to achieve.
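As a rough idea of what the Registry PubSub pattern looks like, here is a minimal sketch. The EventPubSub module, the :event_pubsub name, and the "users" topic are hypothetical; only the Registry calls come from the standard library:

```elixir
# Hypothetical PubSub on top of Registry. A :duplicate registry lets many
# processes register under the same key, which here plays the role of a topic.
defmodule EventPubSub do
  @registry :event_pubsub

  def child_spec(_opts) do
    Registry.child_spec(
      keys: :duplicate,
      name: @registry,
      # Registry can be partitioned to reduce contention under load
      partitions: System.schedulers_online()
    )
  end

  # Subscribes the calling process to `topic`
  def subscribe(topic), do: Registry.register(@registry, topic, [])

  # Sends `message` to every process currently subscribed to `topic`
  def broadcast(topic, message) do
    Registry.dispatch(@registry, topic, fn entries ->
      for {pid, _value} <- entries, do: send(pid, {:event, topic, message})
    end)
  end
end

# In an application the registry would live in the supervision tree.
{:ok, _} = Supervisor.start_link([EventPubSub], strategy: :one_for_one)
{:ok, _} = EventPubSub.subscribe("users")
:ok = EventPubSub.broadcast("users", %{type: :user_created, user_id: "99"})
```

With something like this, a dispatcher could subscribe to the topics it cares about instead of having a dedicated process per log type sit in the middle.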
test/derive_ecto_test.exs
defmodule User do
  use Derive.State.Ecto.Model
  schema "users" do
    field(:name, :string)
    # ...
  end
  def up do
    create table(:users, primary_key: false) do
      add(:id, :string, size: 32, primary_key: true)
      # ...
    end
  end
  # ...
end
What's your thought process on coupling the schema with the migration?
def handle_event(%UserCreated{user_id: user_id, name: name, email: email}) do
  insert(%User{id: user_id, name: name, email: email})
end
def handle_event(%UserNameUpdated{user_id: user_id, name: name}) do
  update([User, user_id], %{name: name})
end
I think this is a very nice API.
Derive.Source.EventLog.append(:events, [%UserCreated{id: "1", user_id: "99", name: "John"}])
Derive.Dispatcher.await_processed(dispatcher, [
  %UserCreated{id: 12, user_id: 99, name: "John"}
])
It is too early for this but eventually perhaps a single function that does both (append and wait for it to be committed) would be helpful.
setup_all do
  Derive.State.Ecto.reset_state(Derive.Repo, [User])
  #                             ^^^^^^^^^^^
  # ...
end
nitpick: you probably did it just to get off the ground quickly but eventually I don't think Derive should ship with a Repo module. Instead I think the consumer of the library should pass their own Repo as this gives them more control.
lib/derive/reducer.ex
@moduledoc """
Defines how a given state is kept up to date based on an event source by a `Derive.Dispatcher`.
It happens as follows:
- Events come from a source process defined by `&Derive.Reducer.source/0`
- These are partitioned by `&Derive.Reducer.partition/1` for maximum concurrency
- These events are processed by `&Derive.Reducer.handle_event/1`
  which produces 0+ operations that are meant to update some state
- These operations are committed by `&Derive.Reducer.commit_operations/1`
"""
This makes sense to me.
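To make the flow in the moduledoc concrete, here is a minimal in-memory sketch of the four steps. It assumes events are plain maps, operations are plain tuples, and the "state" is a map; PipelineSketch and its functions are hypothetical stand-ins for the callbacks named above, not Derive's implementation:

```elixir
defmodule PipelineSketch do
  # partition/1: events with the same key must be handled in order
  def partition(%{user_id: user_id}), do: user_id

  # handle_event/1: turns one event into zero or more operations (plain data)
  def handle_event(%{type: :user_created, user_id: id, name: name}),
    do: [{:insert, id, %{name: name}}]

  def handle_event(%{type: :user_name_updated, user_id: id, name: name}),
    do: [{:update, id, %{name: name}}]

  def handle_event(_other), do: []

  # commit_operations/2: applies operations to the current state (a map here)
  def commit_operations(state, ops) do
    Enum.reduce(ops, state, fn
      {:insert, id, attrs}, acc -> Map.put(acc, id, attrs)
      {:update, id, attrs}, acc -> Map.update(acc, id, attrs, &Map.merge(&1, attrs))
    end)
  end

  # The "source" is just a list of events. Partitions are handled concurrently,
  # events within a partition in their original order, then all operations
  # are committed into one map.
  def run(events) do
    events
    |> Enum.group_by(&partition/1)
    |> Task.async_stream(fn {_key, partition_events} ->
      Enum.flat_map(partition_events, &handle_event/1)
    end)
    |> Enum.reduce(%{}, fn {:ok, ops}, state -> commit_operations(state, ops) end)
  end
end
```

Enum.group_by/2 keeps the original order inside each group, which is what gives the per-key serial guarantee here. A real reducer would commit to Postgres per event or per batch instead of folding into a map.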
Hi, thanks for the word of caution :)
I had tried commanded and later looked at Fable. They didn't meet our needs in a few areas. The main one was developer experience (which we want to continue to invest in). Here are a few examples:
- mix es.rebuild Turtle.Storage.Reducer and we're up to date in a single command
Many of these libraries are monolithic. They are opinionated about where the events come from, how they're structured, etc. Adopting them may require large refactors in our backend.
To avoid a long journey, I think the following will help to keep scope down:
Thanks for sharing that list. There are some new ones which I hadn't seen before. It will be worth looking through each library to see if any satisfy our needs. Worst case, we might get some inspiration on how to implement things.
Good idea! I'll move it into the main repo this month.
The schema and the migration are inherently coupled: every time you change one, you have to change the other.
For example, take a look at https://github.com/TurtleAI/turtle-api/blob/master/lib/turtle/project/models/card.ex in our app.
The process for adding a field to the cards table goes like this:
- Update the handle_event logic
- Run mix es.rebuild {name_of_the_reducer}
This command drops the table (calls down), recreates it (calls up), then processes all the relevant events to build the state from scratch. It's not a traditional migration, because this is derived state. The rebuild can be run many times while we iterate on the derived state.
So every time I add, remove, or update a field, keeping the schema and the migration together reduces the effort (and the chance of mistakes) needed to keep them in sync.
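The rebuild cycle described above (down, up, replay) can be sketched in a few lines. This assumes an ETS table stands in for the Postgres table and events are plain maps; RebuildSketch and its functions are hypothetical, and mix es.rebuild itself is Turtle's own task, not shown here:

```elixir
defmodule RebuildSketch do
  @table :users_sketch

  # down: drop the derived state if it exists
  def down do
    if :ets.whereis(@table) != :undefined, do: :ets.delete(@table)
    :ok
  end

  # up: recreate an empty "table"
  def up do
    :ets.new(@table, [:named_table, :public, :set])
    :ok
  end

  def handle_event(%{type: :user_created, user_id: id, name: name}),
    do: :ets.insert(@table, {id, %{name: name}})

  def handle_event(%{type: :user_name_updated, user_id: id, name: name}) do
    [{^id, user}] = :ets.lookup(@table, id)
    :ets.insert(@table, {id, %{user | name: name}})
  end

  def handle_event(_other), do: :ok

  # Safe to run repeatedly: the event log, not the table, is the source of truth
  def rebuild(events) do
    down()
    up()
    Enum.each(events, &handle_event/1)
    :ets.tab2list(@table)
  end
end
```

Because the table is derived, running the rebuild twice over the same events yields the same state, which is what makes iterating on a reducer cheap.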
Thanks! insert and update are really just helpers that translate to an Ecto.Multi :)
Insert, for example: https://github.com/TurtleAI/derive/blob/master/lib/derive/state/ecto/operation/insert.ex. Or, if you want to drop down to a regular query: https://github.com/TurtleAI/derive/blob/master/lib/derive/state/ecto/operation/query.ex
I agree. In some cases we want to "fire and forget", but in many cases we want to wait for the events to get processed.
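A hypothetical sketch of the two modes: GenServer.cast returns immediately (fire and forget), while GenServer.call only returns once the events have been handled. It assumes a single AppendSketch process both stores and processes events. In Derive the log and the dispatcher are separate processes, so a real append-and-wait would also need to hear back from the dispatcher; this collapses them for brevity:

```elixir
defmodule AppendSketch do
  use GenServer

  def start_link(opts \\ []), do: GenServer.start_link(__MODULE__, %{}, opts)

  # Fire and forget: returns :ok immediately, before the events are processed
  def append(server, events), do: GenServer.cast(server, {:append, events})

  # Append and wait: returns :ok only after the events have been processed
  def append_and_await(server, events), do: GenServer.call(server, {:append, events})

  def state(server), do: GenServer.call(server, :state)

  @impl true
  def init(state), do: {:ok, state}

  @impl true
  def handle_cast({:append, events}, state), do: {:noreply, process(events, state)}

  @impl true
  def handle_call({:append, events}, _from, state), do: {:reply, :ok, process(events, state)}
  def handle_call(:state, _from, state), do: {:reply, state, state}

  # Stand-in for handle_event + commit: counts events per user_id
  defp process(events, state) do
    Enum.reduce(events, state, fn %{user_id: id}, acc ->
      Map.update(acc, id, 1, &(&1 + 1))
    end)
  end
end
```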
I agree. Derive.Repo is only used in the tests and can be configured. I wasn't sure how to define a repo just for the purposes of the tests.
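One common Mix/Ecto convention for a repo that only exists in tests is to compile a test/support directory in the test environment only. A sketch, assuming a hypothetical Derive.TestRepo module and the Postgres adapter; connection settings would still go in config/test.exs:

```elixir
# mix.exs (fragment): also compile test/support, but only in the test env
def project do
  [
    app: :derive,
    # ... other project settings ...
    elixirc_paths: elixirc_paths(Mix.env())
  ]
end

defp elixirc_paths(:test), do: ["lib", "test/support"]
defp elixirc_paths(_env), do: ["lib"]

# test/support/test_repo.ex (hypothetical): a Repo used only by Derive's tests
defmodule Derive.TestRepo do
  use Ecto.Repo, otp_app: :derive, adapter: Ecto.Adapters.Postgres
end
```

The tests could then call Derive.State.Ecto.reset_state(Derive.TestRepo, [User]), while applications pass their own Repo.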
PartitionSupervisor looks interesting. Where could it be used in the implementation?
In derive_ecto_test.exs, I saw we're spawning the EventLog and the Dispatcher processes per log type, e.g. for :users. If we only have one of these processes per type, that won't scale. A partition supervisor would help spread the load. Perhaps Registry-based PubSub would obviate the need for either or both of these processes (btw, Registry can be partitioned too!), but I thought it was worth mentioning the new supervisor anyway. :)
@wojtekmach got it!
I want to confirm my understanding here: https://github.com/TurtleAI/derive/blob/master/test/derive_ecto_test.exs#L38
When I define the partition function like so:
def partition(%{user_id: user_id}), do: user_id
we want events to be processed serially through handle_event for any given value of user_id. We also want to use this to "help spread the load", as you say.
(And unlike GenStage/Flow/Broadway, we do not need to deal with back-pressure.)
What would be the main advantage of using the partition supervisor over a Registry? Would it make sense to start with a DynamicSupervisor and Registry, then later swap to PartitionSupervisor?
We have an implementation for running debounced tasks according to a given partition.
Here it is: https://github.com/TurtleAI/turtle-api/blob/50ea4ee0737d847857018ac81c8614b901c46595/lib/debounced_task_queue/debounced_task_supervisor.ex https://github.com/TurtleAI/turtle-api/blob/50ea4ee0737d847857018ac81c8614b901c46595/lib/debounced_task_queue/debounced_task.ex
I was thinking of copying the module and process structure here.
I think Registry + DynamicSupervisor, like you have it, is the way to go. I personally haven't run into this yet, but there are scenarios where a DynamicSupervisor itself can become a bottleneck, in which case changing to PartitionSupervisor is a really easy way to scale out.
Btw, we have a blog post about our homemade analytics solution https://dashbit.co/blog/homemade-analytics-with-ecto-and-elixir where we use Registry+DynamicSupervisor too. Might be interesting if you haven't checked it out yet. I think you nailed it in the debouncer already but I thought I'd mention this resource anyway. :)
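A minimal sketch of the Registry + DynamicSupervisor shape discussed above, assuming one GenServer per partition key that handles that key's events one at a time. PartitionWorker and its functions are hypothetical. The Registry lookup comes first, so only the first event for a key goes through the DynamicSupervisor:

```elixir
defmodule PartitionWorker do
  use GenServer

  @registry PartitionWorker.Registry
  @supervisor PartitionWorker.Supervisor

  def start_link(key), do: GenServer.start_link(__MODULE__, key, name: via(key))

  defp via(key), do: {:via, Registry, {@registry, key}}

  # Look the worker up first; only start it (via the supervisor) if missing
  def ensure_started(key) do
    case Registry.lookup(@registry, key) do
      [{pid, _value}] ->
        pid

      [] ->
        case DynamicSupervisor.start_child(@supervisor, {__MODULE__, key}) do
          {:ok, pid} -> pid
          # Another caller started it between our lookup and start_child
          {:error, {:already_started, pid}} -> pid
        end
    end
  end

  # All events for one key go through one process, so they run serially
  def process(key, event), do: GenServer.call(ensure_started(key), {:process, event})

  def processed(key), do: GenServer.call(ensure_started(key), :processed)

  @impl true
  def init(key), do: {:ok, %{key: key, events: []}}

  @impl true
  def handle_call({:process, event}, _from, state),
    do: {:reply, :ok, %{state | events: [event | state.events]}}

  def handle_call(:processed, _from, state),
    do: {:reply, Enum.reverse(state.events), state}
end

# In an application these two children would live in the supervision tree.
{:ok, _} =
  Supervisor.start_link(
    [
      {Registry, keys: :unique, name: PartitionWorker.Registry},
      {DynamicSupervisor, strategy: :one_for_one, name: PartitionWorker.Supervisor}
    ],
    strategy: :one_for_one
  )
```

Moving to PartitionSupervisor later would mainly change the first argument of DynamicSupervisor.start_child to {:via, PartitionSupervisor, {name, key}}.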
Ok great, I'll start with Registry + DynamicSupervisor. I think we'll have many more bottlenecks in our own app logic before DynamicSupervisor will become a bottleneck. If it does become a bottleneck, it will mean our business will be large enough to afford more help from Dashbit with scaling ;)
Will give it a read, thanks!
For next steps, I'll get a more full prototype up and running for feedback. If I get stuck I'll hack around it and comment each hack so I don't lose momentum.
Will create a new issue when I'm done. Thanks!
Sounds good, looking forward to the follow-up!
Hi @TurtleAI/eds
A few years back, I wrote a hacky event-sourcing implementation for our Turtle backend. It has worked for us so far, but I'd like to re-implement some core components as a standalone library.
Keeping it as a separate library will make it easier to test, improve, and get help on from folks like yourself :)
Goals of the library
Derive.Reducer
Desired characteristics
Scalability
As our number of users grows, we'd like to be able to handle the load. In the old implementation, I'm doing some things inefficiently, such as redundantly querying the event source for events.
We'd also like to eventually run this in a multi-node environment. Our code in Turtle only works in a single-node environment.
Flexibility
We'd like to have flexibility on:
Good tooling
Once we've built this library, we will want to add some tooling to improve our experience. For example:
Next steps
Before implementing too much, I'd love to get help iterating on the public API. Please take a look at the tests in derive_ecto_test.exs and the documentation in Derive.Reducer.
When we integrate this library into our Turtle backend, we would essentially define a bunch of reducers that implement the Derive.Reducer behavior. Then we'd like some derived state (Postgres tables) to be kept up to date with an event log (a Postgres table called events). Do you have any questions or feedback on the API? Any undefined behavior we should define better?
Thanks!