johanhaleby / occurrent

Unintrusive Event Sourcing Library for the JVM
https://occurrent.org

Some qs related to Occurent #130

Closed: cloudcompute closed this issue 1 year ago

cloudcompute commented 1 year ago

Hi @johanhaleby

This library looks promising. The README says: "It is not production ready yet." What work remains before it can be used in production? Do you have any plans to continue this work?

Can we use it with Quarkus / GraalVM as-is?

Can we implement CQRS on top of Occurrent that supports both eventual and strong consistency, as this Elixir-based library does?

Thanks

johanhaleby commented 1 year ago

Hi, and thanks for your interest in the project.

This library looks promising. The README says: "It is not production ready yet." What work remains before it can be used in production? Do you have any plans to continue this work?

We've actually used it in production for more than a year. During this time I've discovered and fixed some hard-to-detect bugs. But we're using Spring (Spring Boot), so the hardening has mostly happened in the (blocking) Spring implementations. I think there are some things that should be "backported" to the vanilla/native MongoDB subscription model implementation, mainly recovering and restarting subscriptions automatically on certain errors (which the Spring implementation already does). I'm actually not sure whether it works with GraalVM; I've never tried it.

Can we implement CQRS on top of Occurrent that supports both eventual and strong consistency, as this Elixir-based library does?

Yes, at least in the Spring implementation, you can leverage its transaction support to update views/projections in the same transaction that writes the events. See here for an example. I'm not sure whether Quarkus has something similar to Spring's @Transactional support, but if it does, I think it should work there as well.
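A framework-neutral sketch of the idea, assuming the events and the view live in the same database so that one transaction can cover both. `InMemoryDb`, its copy-and-swap `inTransaction` method, and `PlaceOrderService` are hypothetical stand-ins for a transactional store and Spring's @Transactional; this is not Occurrent's API.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

// Hypothetical in-memory "database" holding both the event log and a view.
// A transaction works on copies and swaps them in only if every step succeeds,
// mimicking an atomic commit of events + projection.
class InMemoryDb {
    List<String> events = new ArrayList<>();
    Map<String, Integer> orderCountPerCustomer = new HashMap<>();

    void inTransaction(Consumer<InMemoryDb> work) {
        InMemoryDb tx = new InMemoryDb();
        tx.events = new ArrayList<>(events);
        tx.orderCountPerCustomer = new HashMap<>(orderCountPerCustomer);
        work.accept(tx);                               // may throw: nothing is committed
        events = tx.events;                            // commit: both changes become
        orderCountPerCustomer = tx.orderCountPerCustomer; // visible together
    }
}

// Hypothetical application service: writes the event and updates the view in one transaction.
class PlaceOrderService {
    private final InMemoryDb db;

    PlaceOrderService(InMemoryDb db) { this.db = db; }

    void placeOrder(String customerId, boolean failProjection) {
        db.inTransaction(tx -> {
            tx.events.add("OrderPlaced:" + customerId);                   // write the event
            if (failProjection) throw new IllegalStateException("view update failed");
            tx.orderCountPerCustomer.merge(customerId, 1, Integer::sum);  // update the view
        });
    }
}
```

If the view update throws, neither the event nor the view change becomes visible, which is what keeps the projection strongly consistent with the event stream.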

Another way would be to create indexes in MongoDB and query the event store directly. I know this is frowned upon by some, but in my experience it works well, at least for small to mid-sized projects. This way the "projections" are also consistent with the written events.
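A minimal sketch of querying the events directly, assuming each event carries an indexed attribute (here `subject`, as in CloudEvents). The `Event` record and `IndexedEventStore` class are hypothetical; in MongoDB the index would be a database index, not a Java map.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical event shape; "subject" plays the role of an indexed attribute.
record Event(String type, String subject, int amount) {}

class IndexedEventStore {
    private final List<Event> log = new ArrayList<>();                // full, ordered log
    private final Map<String, List<Event>> bySubject = new HashMap<>(); // secondary index

    void append(Event e) {
        log.add(e);
        // Index maintained on write, analogous to a MongoDB index on "subject".
        bySubject.computeIfAbsent(e.subject(), k -> new ArrayList<>()).add(e);
    }

    // A "projection" computed at query time: always consistent with what was written.
    int totalFor(String subject) {
        return bySubject.getOrDefault(subject, List.of()).stream()
                .mapToInt(Event::amount)
                .sum();
    }
}
```

Because the view is computed from the events at read time, it can never lag behind the writes; the cost is doing that work on every query.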

cloudcompute commented 1 year ago

It is good to know that Occurrent is being used in production. It sounds exciting now. I have some more questions; your answers would help me decide whether I can use Occurrent.

It looks like Spring (Spring Boot 3) offers support for GraalVM, which is very fast and consumes far fewer resources than the JVM. From its documentation: "Spring Native provides support for compiling Spring applications to native executables using the GraalVM native-image compiler."

a. Do you use Spring Native images in your production applications? If Spring has become as efficient as Quarkus, then I can use either. And yes, Quarkus provides support for @Transactional.

b. The flow of my application is something like this: app writes to event store ---> Debezium CDC writes to Nats (or Kafka) pub/sub ---> subscribers read and update their projections. Does Occurrent fit this scenario?

c. I'd prefer Postgres to Mongo on the CQRS write side. On the read side there could be several stores, such as Elasticsearch, an analytics database, or MongoDB.

Would it be difficult to write a PostgreSQL implementation for Occurrent?

d. As you mentioned, you are using @Transactional to implement strong consistency. If Postgres is the write DB and Mongo/Elastic are the read stores, I guess @Transactional won't help achieve strong consistency. For this, we would need a microservices workflow engine (orchestration) that supports transactions across multiple DBs. Am I right?

e. Event sourcing has some other features too, like snapshots, replay, and deleting the events for a given aggregate key. Does Occurrent support them, or do we need to write the logic ourselves? The Axon and Eventuate frameworks implement them.

f. These two frameworks also implement saga-based orchestration. Could you please suggest any such library (providing sagas and API composition) whose API is purely functional?

I'm thinking mainly of recovering and restarting subscriptions automatically on certain errors (which is happening in the Spring implementation).

Could you explain what this means? Do you plan to add this feature to Occurrent?

I am thankful for your kind support.

johanhaleby commented 1 year ago

a. Do you use Spring Native images in your production applications? If Spring has become as efficient as Quarkus, then I can use either. And yes, Quarkus provides support for @Transactional.

No, we're not using Spring Native, and to be honest I've never tried it. It would be a nice experiment to see if it works out of the box. As far as I can remember, I don't use any reflection in Occurrent (unless you use Jackson, but I guess that must be a solved problem since so many people use it).

b. The flow of my application is something like this: app writes to event store ---> Debezium CDC writes to Nats (or Kafka) pub/sub ---> subscribers read and update their projections. Does Occurrent fit this scenario?

Yes, instead of Debezium, Occurrent uses MongoDB change streams to achieve the same thing.

c. I'd prefer Postgres to Mongo on the CQRS write side. On the read side there could be several stores, such as Elasticsearch, an analytics database, or MongoDB.

Would it be difficult to write a PostgreSQL implementation for Occurrent?

For projections, you can store the data wherever you want. If you're talking about the write side, it will require some work, but I think it should be doable. I've also been interested in writing a Postgres/JDBC implementation, but since we're not using Postgres in production ourselves, I haven't given it much thought yet.

d. As you mentioned, you are using @Transactional to implement strong consistency. If Postgres is the write DB and Mongo/Elastic are the read stores, I guess @Transactional won't help achieve strong consistency. For this, we would need a microservices workflow engine (orchestration) that supports transactions across multiple DBs. Am I right?

I don't think it's possible to get strong consistency in these cases (and honestly you probably don't need it?) unless you're using something like an XA transaction manager (and both DBs support it).

e. Event sourcing has some other features too, like snapshots, replay, and deleting the events for a given aggregate key. Does Occurrent support them, or do we need to write the logic ourselves? The Axon and Eventuate frameworks implement them.

Occurrent supports replay and deleting events and streams. There's no built-in support for snapshots yet, but I've been thinking about adding it. I haven't found the need for snapshots myself yet, though, and they're something of an anti-pattern (you should be "closing the books" instead).
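A minimal sketch of "closing the books" as an alternative to snapshots, assuming an account-like aggregate whose history is split into one stream per period. The event records, stream ids, and the `Ledger` class are hypothetical. Instead of storing a snapshot next to an ever-growing stream, the old stream ends with a summary event and the new one starts from it, so rebuilding state only ever folds a short stream.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical events for an account whose books are closed per period.
sealed interface AccountEvent permits OpeningBalance, Deposited, BooksClosed {}
record OpeningBalance(int amount) implements AccountEvent {}
record Deposited(int amount) implements AccountEvent {}
record BooksClosed(int closingBalance) implements AccountEvent {}

class Ledger {
    final Map<String, List<AccountEvent>> streams = new HashMap<>();

    // State is rebuilt by folding only the current period's (short) stream.
    static int balance(List<AccountEvent> events) {
        int balance = 0;
        for (AccountEvent e : events) {
            if (e instanceof OpeningBalance o) balance = o.amount();
            else if (e instanceof Deposited d) balance += d.amount();
            // BooksClosed only marks the end of a period; it does not change the balance.
        }
        return balance;
    }

    void append(String streamId, AccountEvent e) {
        streams.computeIfAbsent(streamId, k -> new ArrayList<>()).add(e);
    }

    // Close the old period and open the next one, carrying over only the balance.
    void closeBooks(String oldStream, String newStream) {
        int closing = balance(streams.get(oldStream));
        append(oldStream, new BooksClosed(closing));
        append(newStream, new OpeningBalance(closing));
    }
}
```

Once the books are closed, the old stream is no longer needed to compute current state, so it can be archived or deleted.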

f. These two frameworks also implement saga-based orchestration. Could you please suggest any such library (providing sagas and API composition) whose API is purely functional?

There are links to some on the Occurrent website. But I want to create something Occurrent specific :)

I'm thinking mainly of recovering and restarting subscriptions automatically on certain errors (which is happening in the Spring implementation).

This is already supported in Occurrent if you're using a durable subscription model.
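A rough sketch of what "durable" buys you: the subscription persists how far it has got and resumes from there after a restart. This assumes a single ordered log indexed by position; `PositionStore` and `DurableSubscription` are hypothetical names. Occurrent's durable subscriptions persist a position derived from MongoDB change streams, which the map below only imitates.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

// Hypothetical storage for "how far has subscription X processed the log".
class PositionStore {
    private final Map<String, Integer> positions = new HashMap<>();

    int load(String subscriptionId) { return positions.getOrDefault(subscriptionId, 0); }

    void save(String subscriptionId, int position) { positions.put(subscriptionId, position); }
}

class DurableSubscription {
    private final String id;
    private final PositionStore store;
    private final Consumer<String> handler;

    DurableSubscription(String id, PositionStore store, Consumer<String> handler) {
        this.id = id;
        this.store = store;
        this.handler = handler;
    }

    // Process everything after the stored position. The position is saved only
    // after the handler succeeds, so a crash in between causes redelivery
    // (at-least-once), never a skipped event.
    void catchUp(List<String> log) {
        for (int pos = store.load(id); pos < log.size(); pos++) {
            handler.accept(log.get(pos));
            store.save(id, pos + 1);
        }
    }
}
```

A new `DurableSubscription` instance with the same id and store, for example after a restart, continues where the previous one stopped.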

cloudcompute commented 1 year ago

Awesome reply!

Yes, instead of Debezium, Occurrent uses MongoDB change streams to achieve the same thing.

Wonderful, so this completely eliminates the need for another server (Debezium). So do Mongo change streams write to a pub/sub messaging system in the same transaction (the one used to write data to the Mongo store)?

strong consistency in these cases (and honestly you probably don't need it?) unless you're using something like an XA transaction manager (and both DBs support it).

Probably in a few scenarios, when a user wants to read their own writes immediately, e.g. when they enter a new record in a table (inline editing). I am not sure, though. But again, we can use an optimistic UI.

If strong consistency is required, I'd prefer a saga to an XA transaction manager. I believe both would take about the same time.
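For comparison with XA, the core of an orchestrated saga can be sketched as a list of local steps, each paired with a compensating action that runs in reverse order if a later step fails. Note that a saga gives eventual consistency through compensation, not isolation: intermediate states are visible while it runs. `SagaStep` and `Saga` are hypothetical, and the sketch ignores the persistence, retries, and timeouts that a real saga engine needs.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.List;

// Hypothetical saga step: a local action plus the action that undoes it.
record SagaStep(String name, Runnable action, Runnable compensation) {}

class Saga {
    // Runs steps in order; on failure, compensates the completed steps in
    // reverse order. Returns true only if every step succeeded.
    static boolean run(List<SagaStep> steps) {
        Deque<SagaStep> completed = new ArrayDeque<>();
        for (SagaStep step : steps) {
            try {
                step.action().run();
                completed.push(step);                 // most recent step on top
            } catch (RuntimeException e) {
                while (!completed.isEmpty()) {
                    completed.pop().compensation().run(); // undo newest first
                }
                return false;
            }
        }
        return true;
    }
}
```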

There are links to some on the Occurrent website. But I want to create something Occurrent specific :)

Thank you for sharing the links. Baker and nFlow look good at first sight; I will have a look at them. I know about Temporal and Cadence, but they are heavyweight solutions. Even Kogito (a project under the Quarkus umbrella) provides support for both DMN and BPMN.

There is a messaging framework used in Quarkus which states that you can build event-sourcing apps with it. From its description: "SmallRye Reactive Messaging is a framework for building event-driven, data streaming, and event-sourcing applications."

To be honest, implementing an Occurrent-specific saga will take a lot of time.

I've also been interested in writing a Postgres/JDBC implementation

Great, if you find the time. If I decide to use Occurrent and Postgres, I'll share that work with you. One of the main reasons for using Postgres is that there is a wonderful open-source alternative to AWS Aurora Postgres named Neon (serverless Postgres). The downside is that we'd need to use Debezium.

Mongo's licensing is quite restrictive, so I want to avoid it: MongoDB uses the Server Side Public License, while Postgres's license is very liberal. In any case, I'll try to find self-hosted distributed data stores that offer something similar to MongoDB change streams so that we don't need Debezium.

If we use Kubernetes, we can manage Postgres scaling ourselves, running Neon in pods. No need to pay AWS.

Snapshots are probably useful only if we have millions of events, where they'd speed things up. I guess they're an anti-pattern because we have to save the snapshots in addition to the event stream; is that so?

Two more questions, simple enough though.

How does Occurrent handle optimistic concurrency control when two or more users race to write/update an aggregate simultaneously? Do we need to handle it ourselves, probably using ETags?

I hope I'll be able to use Occurrent with Nats JetStream, which is CloudEvents compliant. Please confirm.

Thanks once again.

johanhaleby commented 1 year ago

Wonderful, so this completely eliminates the need for another server (Debezium). So do Mongo change streams write to a pub/sub messaging system in the same transaction (the one used to write data to the Mongo store)?

No, it's not the same transaction. The events are propagated via MongoDB change streams to subscriptions after they've been written to the event store. This is how it usually works, and it's by design. Note that as long as you use a durable subscription model from Occurrent, you're guaranteed to receive all events, so you don't have to worry about losing data, even on a crash (Occurrent implements at-least-once delivery semantics for durable subscriptions).
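Since delivery is at-least-once, a subscriber can occasionally see the same event twice, e.g. when it crashes after updating a view but before its position is saved. A common remedy is an idempotent handler. A minimal sketch, assuming every event has a unique `source` + `id` pair (as the CloudEvents specification requires); `Envelope` and `IdempotentProjection` are hypothetical.

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical CloudEvent-like envelope: source + id uniquely identify an event.
record Envelope(String source, String id, int amount) {}

class IdempotentProjection {
    private final Set<String> applied = new HashSet<>();
    private int total;

    // Applies each event at most once, even if it is delivered several times.
    void on(Envelope e) {
        String key = e.source() + "/" + e.id();
        if (applied.add(key)) {   // add() returns false if the key was already present
            total += e.amount();
        }
    }

    int total() { return total; }
}
```

In a real projection, the set of applied ids (or the last applied stream version) should be stored atomically with the view itself.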

How does Occurrent handle optimistic concurrency control when two or more users race to write/update an aggregate simultaneously? Do we need to handle it ourselves, probably using ETags?

Occurrent handles this for you; see write condition. If you use an application service, it happens transparently. If a conflict occurs, you'll get a WriteConditionNotFulfilledException.
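The write condition is essentially optimistic concurrency control on the stream version: the writer states the version it read, and the write is rejected if someone appended in the meantime. A minimal in-memory sketch of that idea; `VersionedEventStore` and `ConcurrentWriteException` are hypothetical and only loosely mirror Occurrent's write condition and `WriteConditionNotFulfilledException`.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical exception, playing the role of a "write condition not fulfilled" error.
class ConcurrentWriteException extends RuntimeException {
    ConcurrentWriteException(String msg) { super(msg); }
}

class VersionedEventStore {
    private final Map<String, List<String>> streams = new HashMap<>();

    // The stream version is simply the number of events in the stream.
    synchronized long version(String streamId) {
        return streams.getOrDefault(streamId, List.of()).size();
    }

    synchronized List<String> read(String streamId) {
        return List.copyOf(streams.getOrDefault(streamId, List.of()));
    }

    // Appends only if the stream is still at the version the caller read.
    synchronized void write(String streamId, long expectedVersion, List<String> events) {
        long actual = version(streamId);
        if (actual != expectedVersion) {
            throw new ConcurrentWriteException(
                    "expected version " + expectedVersion + " but was " + actual);
        }
        streams.computeIfAbsent(streamId, k -> new ArrayList<>()).addAll(events);
    }
}
```

On rejection, the caller typically re-reads the stream, re-runs the command against the new state, and retries.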

I hope I'll be able to use Occurrent with Nats JetStream, which is CloudEvents compliant. Please confirm.

Cool, I've never heard of it. But you should be able to pipe events to Nats JetStream using an Occurrent subscription. If you try it out, let me know how it goes :)
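A minimal sketch of piping events through a subscription to a broker: a handler that forwards each event to a subject. `SimpleCloudEvent`, `Publisher`, and `ForwardingSubscriber` are hypothetical; `Publisher` is the seam where a real Nats JetStream client call would go, and no Nats client API is shown. Deriving the subject from the CloudEvent `type` is also an assumption.

```java
import java.nio.charset.StandardCharsets;
import java.util.function.Consumer;

// Hypothetical minimal CloudEvent-like shape (real CloudEvents carry more attributes).
record SimpleCloudEvent(String id, String source, String type, String data) {}

// Hypothetical seam for a broker client; a JetStream publish call would go here.
interface Publisher {
    void publish(String subject, byte[] payload);
}

// Handler you would register with a (durable) subscription.
class ForwardingSubscriber implements Consumer<SimpleCloudEvent> {
    private final Publisher publisher;

    ForwardingSubscriber(Publisher publisher) { this.publisher = publisher; }

    @Override
    public void accept(SimpleCloudEvent e) {
        // e.g. type "com.example.OrderPlaced" -> subject "events.com.example.OrderPlaced"
        String subject = "events." + e.type();
        publisher.publish(subject, e.data().getBytes(StandardCharsets.UTF_8));
    }
}
```

Because the subscription delivers at least once, consumers on the Nats side should tolerate duplicates.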

cloudcompute commented 1 year ago

Hi @johanhaleby

Hope you are doing well. Instead of Java, we have almost decided to use Rust for the forthcoming event-driven/event-sourcing project, for a couple of reasons:

a. The JVM takes a lot of memory and its garbage collector has issues.

b. Rust is very fast, uses much less memory, and has several other advantages.

I have found an ES-CQRS library written in Rust that will be a very good starting point. While modifying it, I'll certainly refer to your code, especially for writing events to the event store in the CloudEvents format.

Thank you for your kind guidance.

johanhaleby commented 1 year ago

Okay, thanks for your interest anyway :)

a. The JVM takes a lot of memory and its garbage collector has issues. b. Rust is very fast, uses much less memory, and has several other advantages.

It does take up a lot of memory, but when it comes to GC, I don't really know what you mean. GC in Java is top-notch nowadays, and I seriously doubt that you'll run into trouble with it; quite the opposite.

cloudcompute commented 1 year ago

A decade ago, I used to work in Java. As far as I recall, the JVM decides when to run the garbage collector, and when it does, CPU usage spikes, affecting the application's performance.

Also, one has to be careful to avoid memory leaks. From this link: "When Garbage Collection runs repeatedly in the JVM, CPU consumption will start to spike. It is a computation-intensive operation, ... When an application is suffering from a memory leak, it will continuously create objects without releasing them."

This link gives insightful answers on memory usage in Rust vs. Java.

johanhaleby commented 1 year ago

Yeah, I know about the Rust memory model, but I still doubt that you'll run into GC issues in modern Java. A lot has happened in this space in the last 10 years. How many transactions per second are you expecting? I think you'll run into issues with the DB sooner than Java, especially if you choose the reactive implementations (or Project Loom). See e.g. https://projectreactor.io/, which Occurrent uses under the hood for the reactive implementation:

Reactor operators and schedulers can sustain high throughput rates, on the order of 10's of millions of messages per second. Its low memory footprint goes under most radars.

johanhaleby commented 1 year ago

I am hearing about Project Reactor for the first time. So is Occurrent's program flow like this: Java code (invoked from a REST API) calls functions of this reactive implementation asynchronously, which in turn writes the event data to MongoDB?

No, it's not asynchronous; it's non-blocking, which enables much better throughput (if that's what you need). The programming model is more functional, though, so there's potentially a learning curve. I wouldn't go down this route unless you know that you need the performance.

A lot has happened in this space in the last 10 years.

I fully agree. So I can safely assume that GC won't be an issue.

Well, I don't know your use case, so I can't say anything for certain; you would have to test it yourself. However, I've been working with highly scalable JVM systems for almost 20 years, and I can probably count on one hand the times I've had to tweak the GC.

How many transactions per second are you expecting?

Do you mean approximately how many writes per second there would be in the underlying event store?

Yes

I think you'll run into issues with the DB sooner than Java.

If I choose Rust, then DB performance would be slow; is this what you mean?

No, I just mean that you'll probably run into performance issues with the DB before you run into performance issues with the Java GC. But I don't know your use case, so it's just a hunch.

cloudcompute commented 1 year ago

I am hearing about Project Reactor for the first time. So is Occurrent's program flow like this: Java code (invoked from a REST API) calls functions of this reactive implementation asynchronously, which in turn writes the event data to MongoDB?

A lot has happened in this space in the last 10 years.

I fully agree. So I can safely assume that GC won't be an issue.

How many transactions per second are you expecting?

Do you mean how many (approximate) writes per second would be there in the underlying event store?

I think you'll run into issues with the DB sooner than Java.

If I choose Rust, then DB performance would be slow. And there would need to be some library in the Rust ecosystem as efficient as Project Reactor before I should consider using Rust. Is this what you mean?

By the way, I am planning to write the events to Nats Streaming itself, bypassing a Mongo-like database altogether. In this case, Nats would do two things at once:

a. event sourcing, acting as the single source of truth;

b. pub/sub messaging.

What is your view on this?

Thanks

johanhaleby commented 1 year ago

I don't know much about Nats, but among other things it needs to support stream-level consistency/versioning. This is one of the reasons why Kafka won't work as an event store, afaik.

cloudcompute commented 1 year ago

So this means our ES library should work like this: its Java (or Rust) code calls a third-party reactive streams library, which in turn writes the event data to an event store?

Kafka won't work as an event store

You are correct. Confluent has been very successful at building hype around Kafka, but Kafka is very difficult to administer and consumes a lot of resources. Nats, on the other hand, looks better than Kafka overall, but Synadia's/Nats' marketing does not match that of Confluent/Kafka.

I am going to do ES for the first time. I am relying on this article, written by the Nats release manager, which claims that ES is possible with Nats. Also, the company behind wasmCloud (which is trying to build a platform using WebAssembly as an alternative to Kubernetes) is using Nats for ES.

Nats supports versioning. This article says: "each entity stream (per subject) has its own consistency boundary and thus can be appended or read concurrently with other entity streams." This looks like stream-level consistency, but I am not sure.

How many transactions..

Somewhere around 30 K writes per second.

you'll probably run into performance issues with the DB before you run into performance issues with the Java GC.

Got it.

I am not a fan of OOP anymore. In this forthcoming project, I'll use functional programming. I know it is very different from OO, but I'm sure I'll be able to code well using functional programming concepts.

I've been working with highly scalable JVM systems for almost 20 years, and I can probably count on one hand the times I've had to tweak the GC.

Wonderful !

johanhaleby commented 1 year ago

Somewhere around 30 K writes per second.

This is a lot; you will accumulate A LOT of events very fast. How long will you store these events? Usually, in event sourcing, you want to keep streams short and migrate them to cold storage when they are no longer needed, or "simply" get rid of old events when they are no longer needed; this is called "closing the books".

I would make absolutely sure that ES is the right solution for your problem before picking this technology. Maybe auditing would suffice?

cloudcompute commented 1 year ago

How long will you store these events?

As soon as the workflow related to a given set of events is complete, e.g. a request for quotation is received (initial event) --> payment received (final event). If a workflow depends on historical event data, I think we can take snapshots, or simply do the necessary calculations on that data before moving it to cold storage.
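The retention strategy above could be sketched as: keep a workflow's events in hot storage until its final event arrives, then move the whole stream to cold storage. The event names and the `RetentionPolicy` class are hypothetical, and a real migration would copy and verify the stream before deleting it from the hot store.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical hot/cold split: open workflows stay hot, completed ones are archived.
class RetentionPolicy {
    final Map<String, List<String>> hot = new HashMap<>();
    final Map<String, List<String>> cold = new HashMap<>();
    private final String finalEventType;

    RetentionPolicy(String finalEventType) { this.finalEventType = finalEventType; }

    void append(String workflowId, String eventType) {
        hot.computeIfAbsent(workflowId, k -> new ArrayList<>()).add(eventType);
        if (eventType.equals(finalEventType)) {
            // Workflow complete: move the whole stream out of the hot store.
            cold.put(workflowId, hot.remove(workflowId));
        }
    }
}
```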

Migrate the old events to cold/tiered storage or delete them.

Probably most of the events would be moved to cold storage, and some could be deleted. A thorough analysis has not been done yet; we'll do it iteratively (the agile way) as development progresses.

I would make absolutely sure that ES is the right solution for your problem before picking this technology. Maybe auditing would suffice?

Replay is mandatory for auditing, analytics, and bug fixing.

Nats supports versioning. This article says: "each entity stream (per subject) has its own consistency boundary and thus can be appended or read concurrently with other entity streams."

It would be great if you could confirm whether this is stream-level consistency in Nats.

Regards

johanhaleby commented 1 year ago

I don't know enough about Nats to confirm this, but from what you've written above it sounds like it.

cloudcompute commented 1 year ago

Regarding the event-sourcing strategy I described above (how long to store events, etc.), do you find it okay?

Initially, the traffic will be quite low. Over time it will grow, and at its peak it may reach 30K writes per second.

cloudcompute commented 1 year ago

Hi @johanhaleby

Yesterday, I was going through the contents of occurrent.org.

Initially, I was under the impression that Occurrent writes the MongoDB data to a pub/sub messaging system. But after reading the documentation, I learned that Occurrent itself acts as a pub/sub, and that a combination of Reactive Streams (in Java) and MongoDB change streams makes this possible.

You must be applying the concept of Reactive Streams (backpressure) on both the write and read sides of ES/CQRS.

Is my understanding correct? Is its performance good?

johanhaleby commented 1 year ago

Well, I would rather see it as an "event store". All event stores must support some sort of pub/sub, in that you can subscribe to event changes. In Occurrent this is implemented using MongoDB change streams.

When it comes to the reactive implementation, I'm just using the default backpressure semantics. You can change how backpressure etc. works yourself when interacting with the Project Reactor (Flux/Mono) API.
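Backpressure means the subscriber, not the publisher, decides how many items it is ready to receive. Reactor's Flux/Mono implement the Reactive Streams contract, which the JDK mirrors as `java.util.concurrent.Flow`. A minimal JDK-only sketch (not Reactor's API): a subscriber that requests one item at a time, so the publisher cannot flood it. `OneAtATimeSubscriber` and `BackpressureDemo` are hypothetical names.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;

// Subscriber that signals demand for exactly one item at a time (backpressure).
class OneAtATimeSubscriber implements Flow.Subscriber<String> {
    final List<String> received = new CopyOnWriteArrayList<>();
    final CountDownLatch done = new CountDownLatch(1);
    private Flow.Subscription subscription;

    @Override public void onSubscribe(Flow.Subscription s) {
        subscription = s;
        s.request(1);                 // initial demand: one item
    }

    @Override public void onNext(String item) {
        received.add(item);           // "process" the item...
        subscription.request(1);      // ...then ask for the next one
    }

    @Override public void onError(Throwable t) { done.countDown(); }

    @Override public void onComplete() { done.countDown(); }
}

class BackpressureDemo {
    static List<String> run(List<String> events) {
        OneAtATimeSubscriber subscriber = new OneAtATimeSubscriber();
        try (SubmissionPublisher<String> publisher = new SubmissionPublisher<>()) {
            publisher.subscribe(subscriber);
            // submit() blocks while the subscriber's buffer is full, so a slow
            // subscriber throttles the producer instead of being flooded.
            events.forEach(publisher::submit);
        } // close() signals onComplete once the buffered items have been delivered
        try {
            subscriber.done.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return subscriber.received;
    }
}
```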

cloudcompute commented 1 year ago

In case you need pub/sub on Postgres, this library could help: https://github.com/supabase/realtime

johanhaleby commented 1 year ago

Thank you, but it won't work for Occurrent due to this: https://github.com/supabase/realtime#does-this-server-guarantee-message-delivery

cloudcompute commented 1 year ago

Hi @johanhaleby

I want to use MySQL as an event store, and the flow of events is: producer app --> MySQL --> CDC connector --> Kafka --> consumer app.

A producer app will write CloudEvents to MySQL in a non-blocking way. Could you please point me to the files of the Occurrent library I need to work on?

I think the subscription code (of Occurrent) won't be required, because a consumer app would read from Kafka (eventual consistency). Correct?

Thanks

johanhaleby commented 1 year ago

If you're looking for a non-blocking implementation in Occurrent you can have a look at ReactorMongoEventStore.

cloudcompute commented 8 months ago

Hi @johanhaleby

It is good to see that this project has progressed a lot. I was going through the example folder. Almost all 'domain' examples are in Kotlin; only one is in Java. Do you have any other domain examples written in Java that cover all the Occurrent scenarios?

In one example, you are writing an integration event to RabbitMQ. Do we need to declare such an event?

When does this code get executed: immediately after a domain event is written to MongoDB? Please explain in detail.

Thank you