AxonFramework / AxonFramework

Framework for Evolutionary Message-Driven Microservices on the JVM
https://axoniq.io/
Apache License 2.0
3.32k stars 790 forks source link

`BlockingStream#hasNextAvailable` results to `false` for fresh `BlockingStream` instances #3122

Closed abccbaandy closed 1 month ago

abccbaandy commented 1 month ago

Basic information

I use the demo project as base code.

Steps to reproduce

        try (BlockingStream<TrackedEventMessage<?>> stream = eventStore.openStream(null)) {
            System.out.println("before while");
            while (stream.hasNextAvailable()) {
                System.out.println("in while");
                TrackedEventMessage<?> event = stream.nextAvailable();
                System.out.println("Event: " + event);
            }
            System.out.println("after while");

        }

Expected behaviour

Should give me some event, just like readEvents

        DomainEventStream stream = eventStore.readEvents("A5E1F7D0-4C");
        System.out.println("before while");
        while (stream.hasNext()) {
            System.out.println("in while");
            DomainEventMessage<?> event = stream.next();
            System.out.println("Event: " + event);
        }
        System.out.println("after while");
before while
in while
Event: GenericDomainEventMessage{payload={CardIssuedEvent[id=A5E1F7D0-4C, amount=20]}, metadata={'traceId'->'a0eee3f1-e968-4b56-b43d-d9aef247f863', 'correlationId'->'a0eee3f1-e968-4b56-b43d-d9aef247f863'}, messageIdentifier='a76199ab-42a7-47b0-a87a-fcfb3541c6a4', timestamp='2024-09-03T07:55:09.186Z', aggregateType='GiftCard', aggregateIdentifier='A5E1F7D0-4C', sequenceNumber=0}
in while
Event: GenericDomainEventMessage{payload={CardRedeemedEvent[id=A5E1F7D0-4C, amount=12]}, metadata={'traceId'->'e40e8c34-5c3d-4577-868a-c7ace1b59de1', 'correlationId'->'e40e8c34-5c3d-4577-868a-c7ace1b59de1'}, messageIdentifier='1c05f88f-68b4-49aa-bd5d-bde7349f890f', timestamp='2024-09-03T07:55:28.538Z', aggregateType='GiftCard', aggregateIdentifier='A5E1F7D0-4C', sequenceNumber=1}
in while
Event: GenericDomainEventMessage{payload={CardRedeemedEvent[id=A5E1F7D0-4C, amount=1]}, metadata={'traceId'->'362d3184-7e20-439a-bacb-0d4e50827d06', 'correlationId'->'362d3184-7e20-439a-bacb-0d4e50827d06'}, messageIdentifier='aa4cf37c-3fdb-48e4-b907-bc16e394aa6e', timestamp='2024-09-03T09:04:56.106Z', aggregateType='GiftCard', aggregateIdentifier='A5E1F7D0-4C', sequenceNumber=2}
after while

Actual behaviour

The code only print

before while
after while
smcvb commented 1 month ago

What is unclear right now, @abccbaandy, is whether the Event Store you're using contains any events. Sure, the GiftCard Demo is a working project, but it still provides freedom on the chosen storage solution.

To conclusively show that what you state is correct a reproducible, like a test, in your own fork of the GiftCard Demo would help us out further.

The reason I am "pushy" on you providing the sample is that I am hard-pressed to assume that the StreamableMessageSource#openStream(TrackingToken) operation, when given null, is broken. This exact flow (read: with TrackingToken == null) is the default flow for now StreamingEventProcessor instances starting in Axon Framework since release 3.0.

Hence, I am looking forward to a reproducible of the scenario you're seeing to figure out what's amiss!

abccbaandy commented 1 month ago

What is unclear right now, @abccbaandy, is whether the Event Store you're using contains any events. Sure, the GiftCard Demo is a working project, but it still provides freedom on the chosen storage solution.

I use Axon server as event store. Also I think the eventStore.readEvents("A5E1F7D0-4C") part is proof the events exist.

The reason I am "pushy" on you providing the sample is that I am hard-pressed to assume that the StreamableMessageSource#openStream(TrackingToken) operation, when given null, is broken. This exact flow (read: with TrackingToken == null) is the default flow for now StreamingEventProcessor instances starting in Axon Framework since release 3.0.

Can you describe more detail about openStream?

I use null because I see a post here https://discuss.axoniq.io/t/read-all-events-in-eventstore/2188

To retrieve all the events in the store, you can use the EventStore#openStream(TrackingToken) method. If you use null as the given TrackingToken, it will start reading from the beginning of the stream.

I also try create a head/tail token, but no luck too.

smcvb commented 1 month ago

Also I think the eventStore.readEvents("A5E1F7D0-4C") part is proof the events exist.

Don't get me wrong, I am not stating the events don't exist in the code you have in front of you! Translating that over to me without a concrete sample is, however, unmanageable through the GitHub comment section.

Can you describe more detail about openStream?

Let me take the example of the PooledStreamingEventProcessor and the Coordinator of this Event Processor that opens the stream for different TrackingTokens (which can be null) and delegates the events to be processed.

The Coordinator, on line 928 performs the StreamableMessageSource#openStream(TrackingToken) method. This happens inside the private Coordinator#ensureOpenStream(TrackingToken) operation. If we trace where this method is being used, we land on line 783 in the Coordinator, which provides the local variable streamStartPosition to Coordinator#ensureOpenStream(TrackingToken) in the if-block the call is contained in. As shown on line 773, the streamStartPosition can be null.

Furthermore, following from all the tests performed on the PooledStreamingEventProcessor, Coordinator, and WorkPackage (these together construct the behavior when you select the PooledStreamingEventProcessor to stream events), I would anticipate nothing to be amiss with the StreamableMessageSource#openStream operation when used with a TrackingToken of type `null.

The only why I would be able to better grasp your scenario, @abccbaandy, is if you would provide me with a sample test case that consistently fails as you've described. Hence, a test method that roughly takes the following steps:

  1. Construct an event storage solution of any type.
  2. Add events to this event storage solution.
  3. Construct a StreamableMessageSource from this event storage solution.
  4. Invoke openStream with null on this newly constructed StreamableMessageSource.

If you can provide me with this, I would be able to reproduce the issue locally Exactly as you see it. Without that, my most reasonable basis for this is all the tests that Axon Framework contain that use the StreamableMessageSource#openStream operation with null.

I use null because I see a post here https://discuss.axoniq.io/t/read-all-events-in-eventstore/2188

Happy to see you're using our forum! The reply I gave there to saranga rao is still very much true. However, as pointed out, a clearer reproducible would help us to better understand what you're exactly doing.

I also try create a head/tail token, but no luck too.

What exactly doesn't work here. The creation of the head and tail TrackingToken fails, or providing a head/tail token to StreamableMessageSource#openStream(TrackingToken) doesn't work?

abccbaandy commented 1 month ago

Don't get me wrong, I am not stating the events don't exist in the code you have in front of you! Translating that over to me without a concrete sample is, however, unmanageable through the GitHub comment section.

Ok, this is my fork project with the read event api link https://github.com/abccbaandy/giftcard-demo/blob/master/src/main/java/io/axoniq/demo/giftcard/rest/GiftCardController.java#L117

You have to create some giftcards manually then call the api. GET http://localhost:8080/giftcard/getAllEvent In my local, it show

before while
after while
before while
in while
Event: GenericDomainEventMessage{payload={CardIssuedEvent[id=A5E1F7D0-4C, amount=20]}, metadata={'traceId'->'a0eee3f1-e968-4b56-b43d-d9aef247f863', 'correlationId'->'a0eee3f1-e968-4b56-b43d-d9aef247f863'}, messageIdentifier='a76199ab-42a7-47b0-a87a-fcfb3541c6a4', timestamp='2024-09-03T07:55:09.186Z', aggregateType='GiftCard', aggregateIdentifier='A5E1F7D0-4C', sequenceNumber=0}
in while
Event: GenericDomainEventMessage{payload={CardRedeemedEvent[id=A5E1F7D0-4C, amount=12]}, metadata={'traceId'->'e40e8c34-5c3d-4577-868a-c7ace1b59de1', 'correlationId'->'e40e8c34-5c3d-4577-868a-c7ace1b59de1'}, messageIdentifier='1c05f88f-68b4-49aa-bd5d-bde7349f890f', timestamp='2024-09-03T07:55:28.538Z', aggregateType='GiftCard', aggregateIdentifier='A5E1F7D0-4C', sequenceNumber=1}
in while
Event: GenericDomainEventMessage{payload={CardRedeemedEvent[id=A5E1F7D0-4C, amount=1]}, metadata={'traceId'->'362d3184-7e20-439a-bacb-0d4e50827d06', 'correlationId'->'362d3184-7e20-439a-bacb-0d4e50827d06'}, messageIdentifier='aa4cf37c-3fdb-48e4-b907-bc16e394aa6e', timestamp='2024-09-03T09:04:56.106Z', aggregateType='GiftCard', aggregateIdentifier='A5E1F7D0-4C', sequenceNumber=2}
after while

As you can see, the first while not print anything.

What exactly doesn't work here. The creation of the head and tail TrackingToken fails, or providing a head/tail token to StreamableMessageSource#openStream(TrackingToken) doesn't work?

Create head/tail token without error, but still not give me anything(no events).

abuijze commented 1 month ago

Hi @abccbaandy .

The issue is in the stream.hasNextAvailable() call. This one indicates whether there is an Event available in the stream for immediate consumption. Given you've just opened the stream, this is likely to return false, as the events just started flowing.

Instead, use a stream.hasNextAvailable(int, TimeUnit) call to allow it to wait for a certain time. Alternatively, use setOnAvailableCallback(Runnable) to register a callback that is invoked when messages have become available.

abccbaandy commented 1 month ago

Hi @abuijze Thanks for quick reply. But this behavior looks weird for me, I can't get the same feature like readEvents?(without delay)

In fact, this is not my first version. My first version is call asStream() and return as a Flux object

        return Flux.fromStream(eventStore.openStream(null).asStream());

But in the postman, I found it's empty forever.

smcvb commented 1 month ago

There is indeed quite a difference between streaming events through the openStream method and sourcing events through the readEvents operation.

We are aligning both a bit more with our development on Axon Framework 5, actually, but one fact remains:

Regardless, we acknowledge it isn't apparent that a newly constructed BlockingStream will 9 out of 10 return false for the hasNextAvailable operation. To that end, I've adjusted the JavaDoc to clarify this (which you can find in this commit).

Concerning your initial trial with the asStream operation, this would fail for a similar reason, actually. The BlockingStream#asStream method uses our StreamUtils#asStream(BlockingStream) utility method. This will, internally, construct a Spliterator that will retrieve all events present at that moment in time. As the BlockingStream is new, there are no events yet.

By the way, I cloned your fork (thanks again for providing it, @abccbaandy! :pray:) and played around with it. Increasing the time window on the hasNextAvailable invocation did resolve the problem at hand. As such, I feel comfortable that the problem has been resolved by (1) providing more information here and (2) appending it to the JavaDoc for future reference.

Thus, I will proceed by closing this issue as resolved. Final words: it might be easier to use the EventProcessors that Axon Framework provides instead of using the BlockingStream directly, @abccbaandy. There is obviously nothing wrong with playing around with this! But we have provided all that scaffolding around event processing with the intent to simplify the users life.

abccbaandy commented 1 month ago

Hmm...looks like there are so many ways to get those event. How about I list my current requirement, maybe you can provide more suitable function/way for my need?

I am thinking about use event store as the operation log, to track what users do to the resource(domain). I use sql to explain more easy. Get all event by some property, ex: user_id, domain_type.

select * from events where user_id = some_user order by create_time dest limit N
select * from events where domain_type = some_domain order by create_time dest limit N

If support some page feature like spring's Page would be better. Is this easy in current axon server event store?

By the way, we are not really use webflux, is axon suitable for Spring web? Does axon provide full old school blocking api? I guess it might be more stable without those "data not available yet" issue.

smcvb commented 1 month ago

How about I list my current requirement, maybe you can provide more suitable function/way for my need?

Sure thing, let me see how I can help you out here.


I am thinking about use event store as the operation log, to track what users do to the resource(domain). I use sql to explain more easy. Get all event by some property, ex: user_id, domain_type.

Although using the EventStore directly for this purpose, given the nature of an Event Store, this will become a very slow query eventually. Note that the intent of an Event Store is to maintain events until the end of time. Hence, the Event Store will always be growing.

Given the sample queries you've shared, I would recommend constructing dedicated projections for the trace information you are looking to share.

More concretely, I would thus recommend writing a class with @EventHandler annotated methods that map the events from the Event Store into a projection / query model of your choice.

By doing so, you will:

  1. Resolve the need to play with low-level APIs of Axon Framework.
  2. Benefit from the Event Processor support provided by Axon Framework,
  3. Benefit from being able to chose how to store your projection / query model and how to reply to it. You could, as requested, build a solution that allows Paging, for example.

Is this easy in current axon server event store?

Axon Server's UI has a "Seatch" page, where you can use a query language to filter events when needed. You could use this API in a Java application too, although it requires a bit more work.

By the way, we are not really use webflux, is axon suitable for Spring web?

For commands and queries the answer is an easy yes. We have a dedicated Axon Framework Extension to support Project Reactor, which you can find here.

Does axon provide full old school blocking api?

I would argue that the hasNextAvailable is the blocking API, is that style requires you to use a (busy) wait loop. The only difference compared to how you used it, was the need to use a time window.


I hope the above helps you further, @abccbaandy!

abccbaandy commented 1 month ago

In fact, you make me more confused 😢.

One of attractive feature is event store have persist event, and traditional service only persist state. So in theory, I can use some query language to get the events from event store as the proof to show someone did something to the some resource.

https://docs.axoniq.io/reference-guide/architecture-overview/event-sourcing#benefits

It's also in the official reference.

Given the sample queries you've shared, I would recommend constructing dedicated projections for the trace information you are looking to share.

Then what's the value of event store in biz logic? I think my requirement is common: I just want have a operation log and some filter.

With traditional service, I need write my own log for every user action(command). WIth axon, I still need to do the same thing?

abuijze commented 1 month ago

Then what's the value of event store in biz logic?

The value of the event store is that you have the events to build your projections from. But in the end, you build the projection in the way that suits your requirements best. You can project the event stream ad-hoc by just querying them and sending them to the UI. You can also project them into a separate database table that has the transformation already done.

Either way, the event store is not optimized for random queries. It is optimized for reading events for a single aggregate, streaming events in time-order, and appending events. If you want to query based on another property, you're best off using projections.

With traditional service, I need write my own log for every user action(command). WIth axon, I still need to do the same thing?

With Axon, you'd apply your Events, and Axon will ensure they are stored and taken into account for the next command. By default, Axon doesn't log/store commands. Only their outcome (the Events).