TurtleAI / derive

An Event Sourcing and CQRS solution.

More robust implementation for a large number of events #3

Closed venkatd closed 2 years ago

venkatd commented 2 years ago

Hi @TurtleAI/eds

I'm satisfied with the public API of the library, so I want to start getting feedback again :)

A Dispatcher will start from the last processed event and process events until there are no more events to process. This is fine for a small number of events. However, it's possible (for example, if we are rebuilding the state from scratch) that there would be on the order of 50,000 events.

Right now I recursively call a catchup function and process 100 events at a time, but I wonder if there are better ways to handle it? Should I somehow have the process send a message to itself to process more events?

Here's where, on boot, the process picks up where it left off in the event log and starts processing events: https://github.com/TurtleAI/derive/blob/518e9ccf57c77224a5e224b6927989f309a776ff/lib/derive/dispatcher.ex#L111-L118

Implementation of catchup: https://github.com/TurtleAI/derive/blob/518e9ccf57c77224a5e224b6927989f309a776ff/lib/derive/dispatcher.ex#L129-L161

philss commented 2 years ago

Hi @venkatd :wave:

> Right now I recursively call a catchup function and process 100 events at a time, but I wonder if there are better ways to handle it? Should I somehow have the process send a message to itself to process more events?

I think that, overall, using recursion is OK for this, especially because you are handling the first message after handle_continue. However, it would be nice to have control of the batch size (events per cycle), so that if your events grow in number, they can be handled accordingly. I would suggest something like GenStage for that, since it gives you back-pressure control. But this would make things more complex.

What I imagine is that you could have Derive.EventLog.fetch(source, {version, batch_size}) in a producer, and that producer would control which version and batch size (based on demand) to use. The consumer would process the events. When there are no more events, the producer would return {:stop, reason, new_state} and maybe send back the last version used. I'm not sure, though, whether this "stop process" approach would work. I'm going to test it with a small example and will write back to you here.

Just to confirm: your events must be in order, right?

venkatd commented 2 years ago

@philss

GenStage sounded interesting, but I was worried about the added complexity just for solving this problem. And I'm not sure if we would need back-pressure in our case. The code works fine when there aren't a lot of events, but I'm wondering how these recursive calls would behave with a much larger dataset.

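Some issues I'm wondering about:

  • Could a very long catchup call cause the GenServer to time out?
  • Would the GenServer be unresponsive to messages? If I called something like GenServer.call(dispatcher, :get_status), would that lock up until we have fully processed all 50k events?

I should clarify terms in the code:

  • The version variable is a pointer/cursor on where in the event log we are, which allows resuming processing if we shut down the app. Would you recommend renaming this to something else?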

> Just to confirm: your events must be in order, right?

Yes, they must be processed in order.

One thought I had was that here: https://github.com/TurtleAI/derive/blob/e3fa1c610ce302e4d6e86c6e57bc7a3265ae5e10/lib/derive/dispatcher.ex#L159

I am calling catchup recursively. Perhaps I should send a message to self() like GenServer.cast(self(), :catchup_again) or something like this?
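
A minimal sketch of that self-cast idea, to make the question concrete. This is not Derive's code: the `CatchupSketch` module, its state shape, and the in-memory list standing in for the event log are all assumptions for illustration.

```elixir
defmodule CatchupSketch do
  use GenServer

  # `events` is a plain list standing in for the event log (an assumption).
  def start_link(events, batch_size \\ 100) do
    GenServer.start_link(__MODULE__, {events, batch_size})
  end

  def get_status(pid), do: GenServer.call(pid, :get_status)

  @impl true
  def init({events, batch_size}) do
    state = %{pending: events, processed: 0, batch_size: batch_size}
    {:ok, state, {:continue, :catchup}}
  end

  @impl true
  def handle_continue(:catchup, state) do
    # Kick off the loop by sending the first message to ourselves.
    GenServer.cast(self(), :catchup_again)
    {:noreply, state}
  end

  @impl true
  def handle_cast(:catchup_again, %{pending: []} = state) do
    # Nothing left to process, so the loop ends here.
    {:noreply, state}
  end

  def handle_cast(:catchup_again, state) do
    {batch, rest} = Enum.split(state.pending, state.batch_size)
    # Real event handling would go here; the sketch only counts events.
    GenServer.cast(self(), :catchup_again)
    {:noreply, %{state | pending: rest, processed: state.processed + length(batch)}}
  end

  @impl true
  def handle_call(:get_status, _from, state) do
    status = if state.pending == [], do: :caught_up, else: :catching_up
    {:reply, {status, state.processed}, state}
  end
end

{:ok, pid} = CatchupSketch.start_link(Enum.to_list(1..50_000), 100)
# Returns {:catching_up, n} or {:caught_up, 50_000} depending on timing.
CatchupSketch.get_status(pid)
```

Because each batch is its own message, a `get_status` call that arrives during catchup is answered between two batches instead of waiting for the whole log.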

philss commented 2 years ago

@venkatd I see. So let's try to address the issues:

  • Could a very long catchup call cause the GenServer to time out?

By default, the long-running callback will not time out on its own, but other messages won't be processed until it has finished. In the meantime they pile up in the process mailbox.
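
To illustrate the point about a busy process, here is a small sketch. The `BusySketch` module and the 300 ms sleep standing in for a long catchup are made up for the example. The server itself never times out, but a caller using `GenServer.call/3` with a short timeout gives up while its request is still sitting in the mailbox.

```elixir
defmodule BusySketch do
  use GenServer

  def start_link(work_ms), do: GenServer.start_link(__MODULE__, work_ms)

  # Returns the server state, or :timed_out if no reply arrives within `timeout` ms.
  def status(pid, timeout) do
    GenServer.call(pid, :get_status, timeout)
  catch
    :exit, {:timeout, _} -> :timed_out
  end

  @impl true
  def init(work_ms), do: {:ok, :busy, {:continue, {:long_work, work_ms}}}

  @impl true
  def handle_continue({:long_work, work_ms}, _state) do
    # Stands in for processing the whole event log inside one callback.
    Process.sleep(work_ms)
    {:noreply, :idle}
  end

  @impl true
  def handle_call(:get_status, _from, state), do: {:reply, state, state}
end

{:ok, pid} = BusySketch.start_link(300)
BusySketch.status(pid, 50)    # gives up while the server is still busy
BusySketch.status(pid, 5_000) # answered once the long callback has returned
```

The default timeout for `GenServer.call/3` is 5 seconds, so a catchup that runs longer than that in a single callback would make default calls exit on the caller's side.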

  • Would the GenServer be unresponsive to messages? If I called something like GenServer.call(dispatcher, :get_status), would that lock up until we have fully processed all 50k events?

Yes, this way the dispatcher won't reply until all events are processed (and a GenServer.call with the default 5-second timeout would exit before that). Sending messages to self() (instead of the recursion) could solve this issue, but at the same time you could receive other messages in the middle of the process. It would look like this:

sequenceDiagram
    Dispatcher->>Dispatcher: :catchup_on_boot
    Other->>Dispatcher: {:await, events}
    Dispatcher->>Dispatcher: :catchup_on_boot
    Dispatcher-->>Other: :ok

I have a suggestion using handle_continue at the bottom.

One way to improve this would be to fetch all available versions upfront, and then do a "parallel map" using Task. Do you think this is possible? Keeping things in order is what makes this harder. Do you consider that all events must be in order, or do you think it's fine to keep order only within partitions (like company id or user id)?

> The version variable is a pointer/cursor on where in the event log we are which allows resuming processing if we shut down the app. Would you recommend renaming this to something else?

I think version is fine, but naming it cursor looks better to me. WDYT?

> I am calling catchup recursively. Perhaps I should send a message to self() like GenServer.cast(self(), :catchup_again) or something like this?

Like I said previously, you would end up with the possibility of another message being processed between :catchup_again messages. One way to circumvent this is to use handle_continue instead. You could make your first handle_continue return another {:continue, ...} instruction, this time for catchup with your updated state. And then handle_continue(:catchup_on_boot, ...) would do the loop for you. Something like this:

defmodule Derive.Dispatcher do
  # ...

  # your existing `handle_continue`
  @impl true
  def handle_continue(:load_partition, %S{reducer: reducer} = state) do
    partition = reducer.get_partition(@global_partition)

    # here you say to `:continue` for the catchup
    {:noreply, %{state | partition: partition}, {:continue, :catchup_on_boot}}
  end

  # then, a new `continue` clause
  @impl true
  def handle_continue(:catchup_on_boot, %S{
           reducer: reducer,
           source: source,
           partition: %Derive.Partition{version: version} = partition,
           batch_size: batch_size,
           lookup_or_start: lookup_or_start
         } = state) do
    case Derive.EventLog.fetch(source, {version, batch_size}) do
      {[], _} ->
        # done processing so return the state as is, or stop
        case state.mode do
          :catchup -> {:noreply, state}
          :rebuild -> {:stop, :normal, state}
        end

      {events, new_version} ->
        events
        |> events_by_partition_dispatcher(reducer, lookup_or_start)
        |> Enum.map(fn {partition_dispatcher, events} ->
          PartitionDispatcher.dispatch_events(partition_dispatcher, events)
          {partition_dispatcher, events}
        end)
        |> Enum.each(fn {partition_dispatcher, events} ->
          PartitionDispatcher.await(partition_dispatcher, events)
        end)

        new_partition = %{partition | version: new_version}
        reducer.set_partition(new_partition)

        # we have more events left to process, so we continue with catchup
        {:noreply, %{state | partition: new_partition}, {:continue, :catchup_on_boot}}
    end
  end

  # ...
end

I think this way you can at least guarantee that no other message will be processed between the :catchup_on_boot steps.

But one important thing came to mind: do you think the system should start sending messages to this process even before the catchup finishes (like my previous diagram suggests)? If the answer is no, I think you should really do all the "catchup" inside the init callback, and only make this process available after all the catchup. This would make the boot of your application slower, but at least it would guarantee that no one is calling this process before it is ready.
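
If catchup did move into `init`, it could look roughly like the sketch below. The module name and the `fetch` function are hypothetical: `fetch` stands in for `Derive.EventLog.fetch/2` and is assumed to take a cursor and a batch size and return `{events, new_cursor}`.

```elixir
defmodule InitCatchupSketch do
  use GenServer

  # `fetch` is a hypothetical 2-arity function: (cursor, batch_size) -> {events, new_cursor}.
  def start_link(fetch, batch_size \\ 100) do
    GenServer.start_link(__MODULE__, {fetch, batch_size})
  end

  def processed(pid), do: GenServer.call(pid, :processed)

  @impl true
  def init({fetch, batch_size}) do
    # Blocks here: start_link/2 only returns once the log has been drained.
    {:ok, catchup(fetch, 0, batch_size, 0)}
  end

  @impl true
  def handle_call(:processed, _from, count), do: {:reply, count, count}

  # Tail-recursive loop; the sketch only counts events instead of handling them.
  defp catchup(fetch, cursor, batch_size, count) do
    case fetch.(cursor, batch_size) do
      {[], _cursor} -> count
      {events, new_cursor} -> catchup(fetch, new_cursor, batch_size, count + length(events))
    end
  end
end

# An in-memory list standing in for the event log.
log = Enum.to_list(1..250)

fetch = fn cursor, size ->
  events = log |> Enum.drop(cursor) |> Enum.take(size)
  {events, cursor + length(events)}
end

{:ok, pid} = InitCatchupSketch.start_link(fetch)
InitCatchupSketch.processed(pid)
# => 250
```

Nobody can obtain the pid before `init` returns, so no message can arrive mid-catchup. The cost is that `start_link` blocks for the whole catchup.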

Another important question: what happens if your process receives a message between the handle_continue(:load_partition) and the existing handle_cast(:catchup_on_boot)? Do you accept that, or should it not happen? Again, this is the same situation as in my diagram. Perhaps you shouldn't have the handle_cast at all.

venkatd commented 2 years ago

@philss

> I think version is fine, but naming it cursor looks better to me. WDYT?

Sgtm!

During catchup, I think it would be totally fine to accept events while things are not yet fully caught up. There aren't any APIs that would depend on things being fully caught up.

The use cases for dispatcher will remain simple:

> Do you consider that all events must be in order, or do you think it's fine to keep order only for partitions (like company id, or user id)

Events must be processed in order, but only within a given partition such as a user id. Are there any code examples that you could point to on how this might be done with a Task module?

philss commented 2 years ago

> During catchup, I think it would be totally fine to accept events while things are not yet fully caught up. There aren't any APIs that would depend on things being fully caught up.

Cool. So this means it can work fine with "recursive calls" to self() using handle_cast. You could process batches of 1000 events, in order to break the work into small parts and avoid one very long call.

> Events must be processed in order, but only within a given partition such as a user id. Are there any code examples that you could point to on how this might be done with a Task module?

I mentioned Task, but I think there is no easy way to do that using tasks. For sure this can be done using Flow. You could create a new stream with Stream.resource/3, and pass it to Flow providing a partition key or function. WDYT?

batch_size = 1_000

stream =
  Stream.resource(
    fn -> cursor end,
    fn cursor ->
      case Derive.EventLog.fetch(source, {cursor, batch_size}) do
        {[], cursor} -> {:halt, cursor}
        {events, cursor} -> {events, cursor}
      end
    end,
    fn _cursor -> :ok end
  )

stream
|> Flow.from_enumerable()
|> Flow.partition(key: {:key, :user_id})
|> Flow.map(fn _event -> :process_the_event end) # here you could also use `Flow.map_batch/2` to work with multiple events.
|> Enum.to_list()
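
For comparison, if a batch of events is already in memory, a Task-based variant is possible with `Enum.group_by/2` and `Task.async_stream/3`. This is only a rough sketch and not something from Derive: the `PartitionedBatchSketch` module, the `key_fun` and `handler` arguments, and the event shape are assumptions. Unlike the Flow pipeline above, it does not stream the event log, so it would have to be called once per fetched batch.

```elixir
defmodule PartitionedBatchSketch do
  # Processes one in-memory batch: partitions run concurrently, while events
  # inside a partition are handled sequentially in their original order.
  def process(events, key_fun, handler) do
    events
    # Enum.group_by/2 keeps the original order within each group.
    |> Enum.group_by(key_fun)
    |> Task.async_stream(
      fn {key, partition_events} -> {key, Enum.map(partition_events, handler)} end,
      ordered: false
    )
    |> Map.new(fn {:ok, {key, results}} -> {key, results} end)
  end
end

events = [
  %{user_id: 1, seq: 1},
  %{user_id: 2, seq: 2},
  %{user_id: 1, seq: 3},
  %{user_id: 2, seq: 4}
]

PartitionedBatchSketch.process(events, & &1.user_id, & &1.seq)
# => %{1 => [1, 3], 2 => [2, 4]}
```

Each partition's events stay in one task, so their relative order is preserved, while different partitions may finish in any order.
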

venkatd commented 2 years ago

@philss thanks!

I've gone the route of the recursive handle_cast and, whew, the tests are finally passing again. :)

I'll revisit the Flow implementation if we decide to run the logic in an init step. I have some questions on my Derive.await implementation but I can create a separate issue for this.

Thanks for your detailed answers, it has saved me from a lot of struggle.