commanded / eventstore

Event store using PostgreSQL for persistence
MIT License

can you give a concrete example of how to stream all events #274

Closed codefist closed 10 months ago

codefist commented 1 year ago

all_events = EventStore.stream_all_forward() |> Enum.to_list()

This will read all events into memory, it is for illustration only. Use the Stream functions to process the events in a memory efficient way.

I need to step through all events and run a function on each. Can you please give a concrete example of the syntax for this?

codefist commented 1 year ago

or, it's just simple...: EventStore.stream_all_forward() |> Enum.each(...)

maybe your "Use the Stream functions to process" confused me.

jvantuyl commented 1 year ago

So, a stream is just like any other enumerable. The example turns it into a list using Enum.to_list/1, which is why memory usage would be a problem. You would process this just like you would stream-process any other Enumerable value.

I'm not 100% sure about your specific use case, but I'll give five examples. Hopefully yours will be in there somewhere.

Streaming "each"

The function Stream.each/2 can consume a Stream (or any Enumerable) and execute a function on each value. Its output is another stream with the same values that went into it, which makes it useful for "tapping into" a stream without transforming it.

In this example, we use this function to print out a representation of each value as it comes through the stream. We don't have any "final" consumer to drive the stream and actually process the values, so we'll use Stream.run/1, which exists just for this purpose.

This is an important point about Stream values in Elixir: "Until something consumes it, nothing actually happens." That's why the initial examples had the Enum.to_list/1 function. Otherwise the stream wouldn't have been consumed by anything.

EventStore.stream_all_forward()
|> Stream.each(&IO.inspect/1)
|> Stream.run()
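
To underline the lazy-evaluation point: if you leave off the final consumer, building the pipeline does nothing by itself. A quick sketch:

# this only builds a lazy pipeline -- no events are read and nothing is printed
pipeline =
    EventStore.stream_all_forward()
    |> Stream.each(&IO.inspect/1)

# the work only happens once something consumes the stream
Stream.run(pipeline)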

Streaming "map"

This example builds on the previous one to transform the values before displaying them. We use Stream.map/2 for that. For purposes of this example, we'll just wrap them in an :ok result-tuple.

EventStore.stream_all_forward()
|> Stream.map(&{:ok, &1})
|> Stream.each(&IO.inspect/1)
|> Stream.run()

Streaming "filter"

This example builds on the previous one to filter values before transforming and displaying them. We use Stream.filter/2 for that. For purposes of this example, we'll limit ourselves to events of "OrderCreated" type.

Instead of using Stream.run/1 here, we'll use Enum.count/1, which efficiently counts the values in the stream and returns that count once it has been fully consumed. This shows that you can consume the stream with any function that expects an Enumerable.

Also note that this is memory efficient because Enum.count/1 doesn't save the streamed values as it counts them. If we used something that did, we would have a problem just like we did with Enum.to_list/1. Examples of this would be Enum.sort/1 and Enum.shuffle/1.

EventStore.stream_all_forward()
|> Stream.filter(&(&1.event_type == "OrderCreated"))
|> Stream.map(&{:ok, &1})
|> Stream.each(&IO.inspect/1)
|> Enum.count()

Streaming Tasks

Let's assume you have a function called refresh_order/1 that rebuilds your order data. It may do a lot of work and take a long time. So using Stream.each/2 would work, but it would be too slow.

The Task.async_stream/3 function will consume an Enumerable and run a function on it as an asynchronous Task. It will run a pool of them equal in size to the number of processors on your machine. In this case, let's say we want the return values and we want them in the same order as they went in (for some reason).

You can't define functions the normal way in IEx, so I'll build them as anonymous functions just so this is easy to follow along with. In real code, you'd have these functions in a module.

refresh_order = fn(order) ->
    # pretend to do something that takes time
    5
    |> :timer.seconds()
    |> :timer.sleep()
    order
end

EventStore.stream_all_forward()
|> Stream.filter(&(&1.event_type == "OrderCreated"))
|> Task.async_stream(refresh_order, ordered: true)
#                    ^^^^^^^^^^^^^
#  with statically defined functions, we'd use a capture here instead of passing in the dynamically-defined function
|> Stream.each(&IO.inspect/1)
|> Enum.count()
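
One thing to keep in mind: Task.async_stream/3 emits each successful result wrapped in an {:ok, value} tuple, so if you want the bare orders downstream you'd unwrap them, roughly like this:

EventStore.stream_all_forward()
|> Stream.filter(&(&1.event_type == "OrderCreated"))
|> Task.async_stream(refresh_order, ordered: true)
# unwrap the {:ok, order} tuples that async_stream produces
|> Stream.map(fn {:ok, order} -> order end)
|> Stream.each(&IO.inspect/1)
|> Enum.count()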

Streaming Supervised Tasks

Maybe this code is working fine, but on deploys it abruptly explodes instead of finishing its work. Or it generates a barrage of errors as the tasks all get killed.

Using a Task.Supervisor, you can run the Task processes under a dedicated supervisor that you started. By default, the async_stream* functions on a Task.Supervisor will wait 5 seconds for the tasks to finish before killing them all.

I'll use the nolink variant of the function so that we receive errors when things fail instead of getting an :EXIT (which kills our process). Let's say these processes spend a lot of time waiting on outside stuff. To get a deeper pool, we'll increase the concurrency and increase the default Task timeout, too.

I'll start a supervisor explicitly in this example so you can see it work in IEx. Normally, you'd want that as part of your supervision tree. Task.Supervisor defines a child_spec/1 function, so you can start one the same way you'd start a GenServer.
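
For reference, here's a minimal sketch of starting it from an application's supervision tree instead (module names are just placeholders):

defmodule MyApp.Application do
    use Application

    @impl true
    def start(_type, _args) do
        children = [
            # Task.Supervisor defines child_spec/1, so this tuple form works
            {Task.Supervisor, name: MyApp.OrderRefreshSupervisor}
        ]

        Supervisor.start_link(children, strategy: :one_for_one, name: MyApp.Supervisor)
    end
end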

Since the Enumerable argument no longer comes first with these functions, we can't pipe into them directly, so we'll wrap the call in a small helper function that we can pipe into:

refresh_order = fn(order) ->
    # pretend to do something that takes time
    5
    |> :timer.seconds()
    |> :timer.sleep()
    order
end

# we'll also create a little function to make this easier to pipe into
refresh_orders = fn(order_stream) ->
    Task.Supervisor.async_stream_nolink(
        MyApp.OrderRefreshSupervisor,
        order_stream,
        refresh_order,  # with statically-defined functions, this would be a capture
        max_concurrency: 4 * System.schedulers_online(),
        ordered: true,
        # raise the per-task timeout above the 5-second default (arbitrary value)
        timeout: :timer.minutes(1)
    )
end

# in a supervision tree, the child spec would be `{Task.Supervisor, name: MyApp.OrderRefreshSupervisor}`
{:ok, _} = Task.Supervisor.start_link(name: MyApp.OrderRefreshSupervisor)

EventStore.stream_all_forward()
|> Stream.filter(&(&1.event_type == "OrderCreated"))
|> refresh_orders.()  
|> Stream.each(&IO.inspect/1)
|> Enum.count()

# Note the `.()` syntax above. This is needed because we dynamically defined the function.
# A normal function call would work if the function was defined in a module.
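
For comparison, if refresh_orders/1 were defined in a module (the module name below is hypothetical), the pipeline would read:

EventStore.stream_all_forward()
|> Stream.filter(&(&1.event_type == "OrderCreated"))
|> MyApp.Orders.refresh_orders()
|> Stream.each(&IO.inspect/1)
|> Enum.count()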

Hopefully this helps. Happy coding!

ericlyoung commented 9 months ago

this definitely answered my question, thanks @jvantuyl @slashdotdash. This need of mine (not unusual for an event-sourced system) does reveal a missing piece of functionality, something like "stream_forward_by_event_types(["OrderCreated", "OrderUpdated"])", which of course filters at the query level. See the of_type method at: https://railseventstore.org/docs/v2/read/
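
In the meantime, a client-side approximation (filtering after the rows come back, rather than at the query level) would look something like:

event_types = ["OrderCreated", "OrderUpdated"]

EventStore.stream_all_forward()
|> Stream.filter(&(&1.event_type in event_types))
|> Enum.each(&IO.inspect/1)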