Closed tonyvanriet closed 6 years ago
Hi @tonyvanriet,
We are using this for more than 2 years and we didn't have such issues even with streams ~1M events. What is size of your stream? Can you send some part of code where you implemented :on_event? Anything you could give us to reproduce problem would be great.
Also, problem with manual reading existing events and then subscribing to that stream is that there's small time period between those 2 where new event can appear in that stream and you'll loose it. read_and_stay_subscribed first subscribes to stream and than starts reading from start until event number subscription returned as last known. All events that are coming in the meantime are buffered and published after reading is done (that is what buffered_messages are for), so pay attention on that if you are doing something similar manually.
Hey @burmajam. Thanks for your feedback.
Here's a snippet of our :on_event
handler.
def handle_info({:on_event, resolved_event}, state) do
resolved_event
|> state.process_event.()
Logger.debug "#{__MODULE__} processed event number #{resolved_event.event.event_number}"
{:noreply, state}
end
state.process_event.()
is a callback that's provided when the application starts the subscription. Of course, this is where the real work gets done for each event. In our case, it processes the event synchronously so this subscriber process will not grab the next :on_event
message until this processing is done.
I was able to confirm one of my theories above. While our receiver is processing the events, the Extreme.Subscription
continues to push :on_event
messages into the mailbox. When it timed out, there were about 300_000 :on_event
messages in the process mailbox.
Perhaps our event processing function is much slower than yours, but it still seems unlikely that any typical event processing could possibly keep up with the rate that events can be read out of the EventStore.
Are you doing anything in your applications to prevent the subscribers mailbox from getting flooded with :on_event
messages in this scenario? It seems like either your event processing is really really fast, or you would have to buffer up the messages in your application subscriber to prevent the mailbox saturation.
Regarding your concern about reading and then subscribing, I was misunderstanding how the buffered_messages
state was used, thanks for clarifying.
I don't think we're at risk of missing events here. Our application persists the event_number
only after it completes the processing of each event. Then, when we start the subscription, we start from the last persisted event_number
.
handle_info is async as handle_cast, so that's the reason why you keep getting messages. We are using Extreme.Listener for this situation. Did you try it?
We haven't tried the Extreme.Listener.
It looks like perhaps it could eliminate some of the boilerplate code around our subscriber, but I don't see any differences that would prevent the :on_event
messages from piling up in the Listener mailbox in the same way. Am I missing something?
I've seen a couple cases now in our apps where a large process mailbox significantly degrades performance. Nonetheless, my best guess here is that your apps see the same :on_event
message accumulation, but it's somehow not degrading the performance of your apps enough to create a problem.
If you see any other possibilities here, let me know. Otherwise, we can go ahead and close the issue.
Hi Tony,
You are right. Extreme.Listener would suffer the same issue. I was in a hurry last time and didn't think about it. There's no way to apply back pressure right now since we are just sending message {:on_event, event} from Subscription process to receiver. Still, that didn't produce any problems to projects we were using Extreme on.
However, we should address this using GenStage or something else but we can't do it yet, since few of our projects are on older Elixir and we have to migrate it first.
If you have some clever suggestion I'd appreciate it :)
GenStage does seem like a great fit for this.
Sorry to say that I don't have any suggestions that I would consider to be clever. :) Any ideas I've thought through just end up looking like a re-implementation of some aspect of GenStage.
The simplest I can think of is sending these big initial reads of 4K events to a separate :catch-up
callback and wait for the application to request the next batch.
I'm doubtful I'd be able to put any time into a PR for this any time soon, especially since we're quite content to just do our own catch-up reads and then start the subscription.
@tonyvanriet Have you tried using a persistent subscription to the Event Store?
When you connect to a persistent subscription you specify the buffer size, in number of messages, so your receiver won't be overwhelmed with messages during catch-up.
{:ok, subscription} = Extreme.connect_to_persistent_subscription(server, self(), group, stream, buffer_size)
Your receiver process still receives {:on_event, push_message}
, but must acknowledge receipt before it receives the next message. This takes advantage of the Event Store's built-in support for back pressure. Your receiver receives events only as fast as it can successfully process them.
You also get to observe the state of each persistent subscription in the Event Store admin UI.
We haven't gotten around to trying a persistent subscription yet, but it's on our list. Thanks for the pointer.
(actually this probably isnt something suited for genstage - but it is a throttling problem)
So this sounds like a problem that might suit genstage? I am no expert. But you shouldnt be continuing to read batches of events unless you are able to process them. I do have a load of eventstore and akka.net experience though.
So here is the disconnect between two actors ability to consume. So you need back pressure to stop the eventstore.
A persistent subscription is no use in a scenario in an aggregate where you want to reload it.
Perhaps the answer is snapshotting but even then you are just masking the issue which is at some point (because the consumer of events has to do more work than the producer) you will always blow up.
Make sense?
Persistent subscription has ack mechanism so it ensures that process is not under pressure. I wouldn't worry about back pressure.
Anyhow, since issue was reported that "other processes are timing out" this looks to me same as issue which will be resolved as soon as pull request #47 is merged into master. The issue was that protobuff messages get stuck into extreme server state, and process heap starts to grow and eventually extreme gets confused and stop sending events to subscribers/readers
@burmajam could you please review #47 and close it?
Thanks, Milan Jaric
We've attempted to use
read_and_stay_subscribed
to catch up and subscribe to a large stream. While processing the stream to catch up, we start seeing timeouts in other processes that are accessing the EventStore, or the Extreme TCP session will get closed from a heartbeat timeout.Our easy alternative is to manage the "catch up" ourselves by doing our own individual reads until we're caught up, and then turn on the subscription. Nonetheless, I thought I'd raise the issue with you to see if we might be missing something when it comes to how we're using
Extreme.Subscription
.I haven't dug down to root cause, but I suspect it's related to how the
Extreme.Subscription
starts out by reading from the stream as fast as it can and buffers up the events. It may be that these back-to-back reads are putting enough stress on the EventStore to cause these other timeouts.Alternatively, since the
:on_event
call is asynchronous and our receiver can't keep up with the initial reads, I'm concerned that the mailbox of our receiver is filling up with:on_event
messages which can kill process performance, but I haven't confirmed that theory yet.I was hoping to find some way to control the maximum size of the
Extreme.Subscription
state.buffered_messages
, or somehow apply backpressure toExtreme.Subscription
, but I don't see anything.Any thoughts?