AxonFramework / extension-mongo

Axon Framework extension for Mongo integration as a Dead Letter, Event, Saga and Tracking Token storage solution.
https://axoniq.io/
Apache License 2.0

Mongo tracking token becomes too large during replay #9

Open martijnvanderwoud opened 4 years ago

martijnvanderwoud commented 4 years ago

During replay of an event processor, a MongoTrackingToken is wrapped inside a ReplayToken as the tokenAtReset attribute. I presume that this is necessary to detect when the processor is back at the initial tracking position and the replay is thus finished. However, I see that all processed events during replay are appended to the trackedEvents collection of the wrapped token. Of course, this will cause the wrapped token to grow very quickly. For me, this eventually results in a failure when attempting to store the token: "Packet for query is too large (4.242.184 > 4.194.304). You can change this value on the server by setting the 'max_allowed_packet' variable." (JPA token store)

smcvb commented 4 years ago

The following is stated in the Javadoc of the MongoTrackingToken around the necessity of the trackedEvents Map:

This tracking token implementation keeps track of all events retrieved in a period of time before the furthest position in the stream. This makes it possible to detect event entries that have a lower timestamp than that with the highest timestamp but are published at a later time (due to time differences between nodes).

Thus, although I understand it increases the size of the ReplayToken even further if it consists of two MongoTrackingTokens, at this stage I am uncertain how to resolve this from within the extension. If we were to drop the collection, we'd lose the certainty of ordering.

If we could move the MongoTrackingToken towards a model similar to the GlobalSequenceTrackingToken, based not on timestamps but on a globally incrementing index, then we could omit the trackedEvents collection entirely. This would first require us to set up a sequence generator within Mongo, though. Doable, but a quick Google search suggests this is not the way to go...
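To make the trade-off concrete, here is a standalone illustrative sketch (plain Java, not Axon code) of why a timestamp-only token can miss events under clock skew, which is exactly what the trackedEvents window guards against:

```java
import java.util.Set;

// Illustrative sketch (not Axon code): why a timestamp-only tracking token can
// miss events when nodes' clocks differ, and why remembering recently seen
// event identifiers (as MongoTrackingToken's trackedEvents does) avoids that.
public class ClockSkewSketch {

    // A naive token that only stores the highest timestamp seen so far:
    // any event at or below that timestamp is considered already processed.
    static boolean naiveTokenSkips(long tokenTimestamp, long eventTimestamp) {
        return eventTimestamp <= tokenTimestamp;
    }

    // A token that also remembers the identifiers of events seen in a window
    // before its furthest position can recognise a late, lower-timestamp event.
    static boolean windowedTokenSkips(Set<String> seenEventIds, String eventId) {
        return seenEventIds.contains(eventId);
    }

    public static void main(String[] args) {
        // Node A stores "a1" at t=100; node B's clock lags, so "b1" (t=95)
        // only arrives at the store afterwards.
        long tokenTimestamp = 100;

        System.out.println(naiveTokenSkips(tokenTimestamp, 95));     // true: "b1" would be lost
        System.out.println(windowedTokenSkips(Set.of("a1"), "b1"));  // false: "b1" is seen as new
    }
}
```

This is why simply dropping the collection is not an option without moving to a global sequence.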

So, at this stage @martijnvanderwoud, I am uncertain how to resolve this for you, other than stating: why not change the max_allowed_packet as is suggested?

If you have a solution for omitting the trackedEvents, please do share it! Maybe we can incorporate a smart solution here to support smaller MongoTrackingTokens from within this extension.

martijnvanderwoud commented 4 years ago

Hi Steven, thanks for your reply!

Perhaps I did not describe the problem clearly enough: ALL events encountered during a replay are added to the tokenAtReset attribute. This means that changing max_allowed_packet will not fix the problem, since the token would contain over 300K tracked events before my replay is done.

I agree that we cannot reasonably avoid having the trackedEvents collection in the MongoTrackingToken, but perhaps we can avoid modifying the tokenAtReset during a replay. I looked at the code and I suspect that the implementation of org.axonframework.extensions.mongo.eventsourcing.eventstore.MongoTrackingToken#covers might not be entirely correct. Consider the org.axonframework.eventhandling.ReplayToken#advancedTo method:

@Override
public TrackingToken advancedTo(TrackingToken newToken) {
    if (this.tokenAtReset == null
            || (newToken.covers(WrappedToken.unwrapUpperBound(this.tokenAtReset))
            && !tokenAtReset.covers(WrappedToken.unwrapLowerBound(newToken)))) {
        // we're done replaying
        // if the token at reset was a wrapped token itself, we'll need to use that one to maintain progress.
        if (tokenAtReset instanceof WrappedToken) {
            return ((WrappedToken) tokenAtReset).advancedTo(newToken);
        }
        return newToken;
    } else if (tokenAtReset.covers(WrappedToken.unwrapLowerBound(newToken))) {
        // we're still well behind
        return new ReplayToken(tokenAtReset, newToken, true);
    } else {
        // we're getting an event that we didn't have before, but we haven't finished replaying either
        if (tokenAtReset instanceof WrappedToken) {
            return new ReplayToken(tokenAtReset.upperBound(newToken), ((WrappedToken) tokenAtReset).advancedTo(newToken), false);
        }
        return new ReplayToken(tokenAtReset.upperBound(newToken), newToken, false);
    }
}

If my debugging observations are correct, it seems that tokenAtReset.covers(WrappedToken.unwrapLowerBound(newToken)) resolves to false even when the tokenAtReset was created as a head token. As a result, the token is replaced with tokenAtReset.upperBound(newToken) every time the replay token is advanced. Consequently, the tokenAtReset grows until it contains many thousands of tracked events and eventually fails to be stored.

The practical outcome is that replaying any non-trivial number of events is now impossible for tracking event processors consuming from the Mongo event store.

martijnvanderwoud commented 4 years ago

Looking further at org.axonframework.extensions.mongo.eventsourcing.eventstore.MongoTrackingToken#covers:

@Override
public boolean covers(TrackingToken other) {
    Assert.isTrue(other instanceof MongoTrackingToken, () -> "Incompatible token type provided.");
    //noinspection ConstantConditions
    MongoTrackingToken otherToken = (MongoTrackingToken) other;

    long oldest = this.trackedEvents.values().stream().min(Comparator.naturalOrder()).orElse(0L);
    return otherToken.timestamp <= this.timestamp
            && otherToken.trackedEvents.keySet().stream()
                                       .allMatch(k -> this.trackedEvents.containsKey(k) ||
                                               otherToken.trackedEvents.get(k) < oldest);
}

I suspect that this statement is problematic: long oldest = this.trackedEvents.values().stream().min(Comparator.naturalOrder()).orElse(0L);. If this token contains no events, the oldest tracked event in this token is assumed to be at the Unix epoch. As a result, covers returns false for any other token containing at least one tracked event.

Maybe this statement could be changed as follows? long oldest = this.trackedEvents.values().stream().min(Comparator.naturalOrder()).orElse(this.timestamp);
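The effect can be reproduced outside the extension. The following standalone sketch (plain maps rather than the real MongoTrackingToken, with the same comparison logic as the quoted covers method) shows how orElse(0L) makes an empty-trackedEvents head token fail to cover an advanced token, and how the proposed orElse(this.timestamp) would behave:

```java
import java.util.Comparator;
import java.util.Map;

// Standalone sketch reproducing the covers() comparison quoted above, to show
// the effect of orElse(0L) when a token has an empty trackedEvents map
// (e.g. a freshly created head token used as tokenAtReset).
public class CoversSketch {

    static boolean covers(long thisTimestamp, Map<String, Long> thisEvents,
                          long otherTimestamp, Map<String, Long> otherEvents) {
        long oldest = thisEvents.values().stream().min(Comparator.naturalOrder()).orElse(0L);
        return otherTimestamp <= thisTimestamp
                && otherEvents.keySet().stream()
                              .allMatch(k -> thisEvents.containsKey(k)
                                      || otherEvents.get(k) < oldest);
    }

    public static void main(String[] args) {
        // Head token created at reset: recent timestamp, empty trackedEvents.
        long headTimestamp = 1_000_000L;
        Map<String, Long> headEvents = Map.of();

        // A token that has since processed one event from before the reset point.
        Map<String, Long> advanced = Map.of("event-1", 999_999L);

        // oldest falls back to 0, so "event-1" fails both checks and covers()
        // is false, even though the head token conceptually covers that event.
        System.out.println(covers(headTimestamp, headEvents, 999_999L, advanced)); // false

        // With the proposed orElse(thisTimestamp), oldest would be 1_000_000,
        // and the 999_999 < oldest check would pass instead.
        long oldestFixed = headEvents.values().stream()
                                     .min(Comparator.naturalOrder()).orElse(headTimestamp);
        System.out.println(999_999L < oldestFixed); // true
    }
}
```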

smcvb commented 4 years ago

Thanks for the further clarification on the matter @martijnvanderwoud; it is always very helpful when a contributor dives into the matter as well :-)

Sounds like you are on to something here... Have you by any chance tried this out with a local snapshot build of the Mongo Extension? Or, if you are up for it, you could provide a PR with a test case validating the desired behaviour.

If all seems well, I feel such a fix justifies a bug release ASAP; just so you know.

martijnvanderwoud commented 4 years ago

@smcvb sorry for the late reply. For the particular project where I was facing this issue I decided to switch to a JPA-based event store.

I think an ASAP bug release might be a bit much, because the problem only occurs when the tokenAtReset does not contain any processed events. I presume that this is not a common scenario.

I stumbled upon this after migrating events from a hand-rolled CQRS solution into the Axon event store. The event processors were initialised with HEAD tokens because the read models were already populated. When I tested replaying one of these processors I ran into this issue. I could have worked around this by initialising that particular read model with a tail token instead of triggering a replay.

smcvb commented 4 years ago

ASAP might be putting it strongly, true, as occurrences of the issue are unlikely. Still, it seems pretty breaking to me, hence my request whether you've resolved this with a local snapshot.

However, as you've pointed out, you've decided against Mongo as the event storage solution. Hence I am guessing you haven't tested this solution you've shared locally, correct?

martijnvanderwoud commented 4 years ago

Hence I am guessing you haven't tested this solution you've shared locally, correct?

Yes, you are correct, sorry. I was hesitant to go down the path of making this kind of change without having more insight into the workings of the various tracking token methods.

re-easy commented 4 months ago

Is there any chance that this issue gets fixed?

smcvb commented 4 months ago

Is there any chance that this issue gets fixed?

Fair question, @re-easy. As it stands, we don't foresee a non-breaking way forward to resolve this.

As we don't intend to make any breaking changes between minor versions (e.g., between 4.8.0 and 4.9.0), I am afraid we can't do a lot right now.

However, one thing that made me think is @martijnvanderwoud's response from May 6th, 2020:

I think an ASAP bug release might be a bit much, because the problem only occurs when the tokenAtReset does not contain any processed events. I presume that this is not a common scenario.

His argument is valid here. Typically, the tokenAtReset has already handled events in the past. However, within the Axon Framework project, we've recently set the default token to a ReplayToken as well, which might impact you here.

To be able to deduce whether that's the case for you, @re-easy, could you share with us the version of Axon Framework and the Mongo Extension you're using?

Finally, just to be clear, there might be something we can do right away to counter this recent change in Axon Framework. But we are not able to make the changes I've suggested here within the lifecycle of Axon Framework 4, I am afraid.

Lastly, I do want to remind you that we don't feel Mongo is the best storage solution for events. If you're able to, I would recommend switching to an RDBMS solution (JPA or JDBC) or Axon Server.

Regardless, keep us posted on the version you're using, @re-easy!

re-easy commented 4 months ago

@smcvb We are using Axon Framework and the Mongo extension, both at version 4.9.0.

At the moment we rebuild our projections (replay) by crudely deleting the tracking token entries for the event processor and then restarting the service. I guess this method does not benefit from what you described about the tokenAtReset, so we should use the replay API instead.
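For reference, triggering a replay through Axon's reset API (rather than deleting token entries) looks roughly like the sketch below. This is a hedged sketch against Axon Framework 4's public API; the processor name "projection" is a placeholder, and the processor must be shut down on all nodes before the tokens can be reset:

```java
// Hedged sketch, Axon Framework 4: replay via the reset API instead of
// deleting tracking token entries by hand. "projection" is a placeholder
// processor name; verify the exact API against your Axon version.
configuration.eventProcessingConfiguration()
             .eventProcessor("projection", TrackingEventProcessor.class)
             .ifPresent(processor -> {
                 processor.shutDown();     // tokens can only be reset while stopped
                 processor.resetTokens();  // installs a ReplayToken wrapping the token at reset
                 processor.start();
             });
```

Note that this route is exactly the one that wraps the current token in a ReplayToken, so it is also the route affected by the token-growth issue discussed above.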

Thanks for sharing!

gklijs commented 4 months ago

Another way is to set the default token to a normal tail token instead of a replay token. You do need to specify this for each event processor individually.
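Setting the initial token per processor can be sketched as follows. This is a hedged sketch against Axon Framework 4's configuration API; the processor name "projection" and the variable `configurer` are placeholders:

```java
// Hedged sketch, Axon Framework 4: give the "projection" processor (placeholder
// name) a plain tail token as its initial token, instead of the default
// replay-capable token. Repeat per event processor as needed.
configurer.eventProcessing().registerTrackingEventProcessorConfiguration(
        "projection",
        conf -> TrackingEventProcessorConfiguration
                .forSingleThreadedProcessing()
                .andInitialTrackingToken(StreamableMessageSource::createTailToken)
);
```

The initial token is only created the first time a processor claims its segments, which is why this has to be in place before the token store entries exist.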

re-easy commented 3 months ago

@gklijs Thanks for this neat trick. I set the token of each segment of the event processor to the first domain event and restarted the service. The replay worked without any performance issues. Does it work the same way when I have multiple instances of the service running?

gklijs commented 3 months ago

@re-easy Sorry I missed your message. The initial token is only created once. So, that should also work with multiple instances.