exponentially / extreme

Elixir Adapter for EventStore
MIT License

Correlation ID subscription for Process Managers #19

Closed: henry-hz closed this issue 7 years ago

henry-hz commented 7 years ago

Dear @burmajam, Greg Young gave us advice on how to design the process manager in https://github.com/slashdotdash/commanded/issues/44. How could I subscribe to correlation IDs? Does extreme support them? I found this concept map confirming the idea: http://www.conceptmaps.io/maps/0acfabc1-5e39-4dd7-9590-3b32c2918ec8/detail

mjaric commented 7 years ago

Hi @henry-hz,

A correlation ID is an application-level pattern. It is not part of the event store itself, but you can store it in the event store as event metadata. How you use it really depends on the case; mostly it is used to track an initiative through the system.

Take the software development lifecycle as an example. The client starts an initiative with one sentence; you store it in a ticketing service under a specific ID, and then the other participants create further tasks (commands) for the team. Every task (epics, stories, technical tasks...) carries metadata holding this initiative ID, so you can easily show the client progress and workload. When all tasks with this ID are done, the initiative is considered complete.

You could apply the same idea to microservices: a specific user command provides the correlation ID, and all derived commands, events and messages should carry the same correlation ID value.
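To make this concrete, here is a minimal sketch in JavaScript (the language of the projection later in this thread) of carrying the correlation ID in message metadata. The message shape `{ id, type, data, metadata }` and the helpers `createInitialMessage` and `deriveMessage` are hypothetical, and using the first message's own ID as the correlation ID is just one possible convention.

```javascript
// Hypothetical message shape: { id, type, data, metadata: { correlationId } }.
let nextId = 0;

function newId() {
  nextId += 1;
  return `msg-${nextId}`;
}

// The first message in a flow (e.g. the user's command) starts a new correlation.
// Assumed convention: its correlation ID is simply its own ID.
function createInitialMessage(type, data) {
  const id = newId();
  return { id, type, data, metadata: { correlationId: id } };
}

// Every command, event or message derived from a parent gets a new ID but copies
// the parent's correlation ID, so the whole flow can later be grouped by that value.
function deriveMessage(parent, type, data) {
  return {
    id: newId(),
    type,
    data,
    metadata: { correlationId: parent.metadata.correlationId },
  };
}
```

However deep the chain of derived messages goes, the last one still carries the correlation ID of the original command.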

To achieve this in Event Store, use event metadata to store each event's correlation ID. Then, if you really need to process by correlation ID in your microservices, you will need either a router that can route messages by correlation ID, or an application queue with subscriptions to specific correlation IDs; which one really depends on the business requirements. If you want to use this to manage back pressure and avoid a long return path when a command travels through a number of processes, you can subscribe to an internal pubsub service that returns the completed response back to the command's source.
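A router of the first kind could be sketched like this. `CorrelationRouter` is a hypothetical in-process class, not part of extreme or Event Store; in a real system the delivery would go over a queue or pubsub service rather than direct function calls.

```javascript
// Minimal in-process router: handlers subscribe to a specific correlation ID
// and receive only messages whose metadata carries that ID.
class CorrelationRouter {
  constructor() {
    this.handlers = new Map(); // correlationId -> Set of handler functions
  }

  subscribe(correlationId, handler) {
    if (!this.handlers.has(correlationId)) {
      this.handlers.set(correlationId, new Set());
    }
    this.handlers.get(correlationId).add(handler);
    // Return an unsubscribe function so callers can clean up when done.
    return () => {
      const set = this.handlers.get(correlationId);
      if (!set) return;
      set.delete(handler);
      if (set.size === 0) this.handlers.delete(correlationId);
    };
  }

  // Deliver a message to every handler subscribed to its correlation ID.
  // Returns how many handlers received it (0 means nobody was listening).
  route(message) {
    const set = this.handlers.get(message.metadata.correlationId);
    if (!set) return 0;
    const targets = [...set]; // copy, in case a handler unsubscribes itself
    for (const handler of targets) handler(message);
    return targets.length;
  }
}
```

Returning the delivery count lets the caller decide what to do with messages nobody is waiting for.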

The Erlang way would be to pass the initiator's pid as the correlation ID all the way down to the process layer, to keep track of whom the response should be returned to. This is fine in many cases, except when you need a gateway to a third party or some other service that is not in the same domain as your aggregate but whose task is part of the same business transaction. Many frameworks have covered this scenario. I'm coming from the .NET world, where I used MassTransit and NServiceBus. They use correlation IDs heavily in saga handlers. A saga is similar to an actor in many ways and represents a business process. If you look at them, you will see that a saga subscribes to the response from the gateway by correlation ID, and when the response message returns, the subscription is deleted. The subscription constraint was defined as the tuple (correlation ID, request/response message type).
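The saga-style subscription described above, keyed by the tuple (correlation ID, response message type) and removed once the response arrives, might look roughly like this. It is an illustrative sketch with hypothetical names, not the MassTransit or NServiceBus API.

```javascript
// One-shot subscriptions keyed by (correlationId, responseType).
class OneShotSubscriptions {
  constructor() {
    // correlationId -> (responseType -> callback); nested maps avoid
    // having to build a string key from the tuple.
    this.waiting = new Map();
  }

  // Register interest in exactly one response of `responseType` for `correlationId`.
  expect(correlationId, responseType, callback) {
    if (!this.waiting.has(correlationId)) this.waiting.set(correlationId, new Map());
    this.waiting.get(correlationId).set(responseType, callback);
  }

  // Deliver a response. The matching subscription is removed before the callback
  // runs, so a duplicate response is not delivered twice.
  // Returns true if someone was waiting for it.
  deliver(response) {
    const id = response.metadata.correlationId;
    const byType = this.waiting.get(id);
    if (!byType || !byType.has(response.type)) return false;
    const callback = byType.get(response.type);
    byType.delete(response.type);
    if (byType.size === 0) this.waiting.delete(id);
    callback(response);
    return true;
  }

  pendingCount() {
    let n = 0;
    for (const byType of this.waiting.values()) n += byType.size;
    return n;
  }
}
```

Because the subscription disappears on the first matching response, the table only ever holds sagas that are still waiting on a gateway.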

Finally, extreme doesn't have built-in support for correlation IDs, and neither does Event Store, apart from the transaction ID or sequence ID, which lets you see which events came from which transaction; that's it. But I will consult with burmajam tomorrow; maybe he has some ideas on how this could be done in Erlang, and we will share our thoughts.

By the way, is there a specific issue you want to solve in your framework?

Cheers

henry-hz commented 7 years ago

Thanks for the feedback, @mjaric; it helped me get a better view of how process managers will benefit from a correlation ID when interacting with external services. Yes, I am working on https://github.com/slashdotdash/commanded/, a collection of building blocks for CQRS/DDD/ES applications. It started with PostgreSQL, and I am writing the abstraction that will support multiple databases. The next target is Event Store. However, while refactoring the process manager, I found that even though Commanded has a very clean design, its process manager implementation used a "brute force" approach: every process manager instance subscribed to all events. Since we expect a high number of concurrent processes, I looked for a better design before moving ahead with the database abstraction. Greg Young is helping us in issue 44; today he suggested that process managers subscribe to the projection below, which uses event metadata in Event Store, so that even without an MQ in the middle, each process manager receives only the events meant for it (instead of every process manager pid filtering all the data).

In short, the goal is to redesign the process manager in an FSM style, as we coded here, and to implement a correlation-ID subscription that follows the process from start to end.

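fromAll()
    .when({
        // Runs for every event of any type in the $all stream.
        $any: function (s, e) {
            // Link each event whose metadata carries a correlationId into a
            // stream named after that ID; events without metadata are skipped.
            if (e.metadata && e.metadata.correlationId) {
                linkTo(e.metadata.correlationId, e);
            }
        }
    });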
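To illustrate what the projection produces, here is a plain JavaScript simulation that runs outside Event Store. `partitionByCorrelationId` is a hypothetical helper; the real projection writes link events (pointers to the original events) with `linkTo` rather than copying them into arrays.

```javascript
// Simulates the projection: every event whose metadata has a correlationId is
// appended, in order, to a "stream" named after that ID. Events without one are
// skipped, as in the projection.
function partitionByCorrelationId(allEvents) {
  const streams = new Map(); // correlationId -> events in original order
  for (const e of allEvents) {
    const id = e.metadata && e.metadata.correlationId;
    if (!id) continue;
    if (!streams.has(id)) streams.set(id, []);
    streams.get(id).push(e);
  }
  return streams;
}
```

A process manager for one business process then only has to read or subscribe to its own stream instead of filtering `$all`.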
mjaric commented 7 years ago

I see, you want to partition events in the event store. That is OK if you want to avoid an MQ, but if the persistence is not Event Store, it will again be an issue: with database backends such as an RDBMS, it will kill that single table. Re-indexing such a table with millions of records can give you a lot of trouble in 24/7 businesses, and the alternative of creating new tables will also create a mess. In my opinion, messaging is the better approach in the long term, since queues can be kept short when you delete a message on ack. The question is: do you need a long-term persistent list of correlated messages?
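The delete-on-ack point can be sketched as a small in-memory queue. `AckQueue` is hypothetical and only shows the idea that acknowledged messages disappear, so the queue holds only unfinished work, unlike an append-only store that keeps every correlated event.

```javascript
// In-memory queue: a message is deleted once the consumer acknowledges it.
class AckQueue {
  constructor() {
    this.ready = [];           // messages not yet handed out
    this.inFlight = new Map(); // deliveryId -> message handed out but not yet acked
    this.nextDeliveryId = 1;
  }

  enqueue(message) {
    this.ready.push(message);
  }

  // Hand out the next message; it stays in inFlight until ack() or nack().
  receive() {
    if (this.ready.length === 0) return null;
    const message = this.ready.shift();
    const deliveryId = this.nextDeliveryId++;
    this.inFlight.set(deliveryId, message);
    return { deliveryId, message };
  }

  // Acknowledge: the message is removed for good.
  ack(deliveryId) {
    return this.inFlight.delete(deliveryId);
  }

  // Negative acknowledge: put the message back so it can be retried.
  nack(deliveryId) {
    if (!this.inFlight.has(deliveryId)) return false;
    const message = this.inFlight.get(deliveryId);
    this.inFlight.delete(deliveryId);
    this.ready.push(message);
    return true;
  }

  // Number of messages still unfinished (waiting or in flight).
  size() {
    return this.ready.length + this.inFlight.size;
  }
}
```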

henry-hz commented 7 years ago

If we persist the correlation ID in the metadata, it should be okay. For the process manager, our long business processes can take 3 months. In any case, we need an immutable database, since we are keeping financial transactions, so we do need to persist them.

mjaric commented 7 years ago

Definitely, but the trick is whether you need that info for the process state machine, or to correlate messages that come from the outside world and should be related to some aggregate command that happened 3 months ago, so you can close (commit) the business transaction. I'm a bit confused about why you are trying to expose the correlation ID to the process manager. I would rather create a handler, in each domain interested in a given message type, that accepts messages by type/kind and then finds who is subscribed to that correlation ID. It is still a pubsub technique, and the message can easily be routed back to the aggregate that expects it. You can easily apply a TTL to guarantee the SLA and avoid waiting too long for a response from other endpoints: if the message comes through some REST hook and none of the domain aggregates is subscribed to that correlation ID, simply respond with HTTP 498 and discard the message. This way you can handle both types of asynchronous messages: those you need to complete ASAP, and those that can wait indefinitely. So I wouldn't bother the process manager with such a feature.
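The handler described above could be sketched as follows, assuming one subscriber per correlation ID and an injectable clock for the TTL. `ExpiringCorrelationHandler` is a hypothetical name. Note that 498 is not a standard HTTP status code; it is used here only to mirror the comment.

```javascript
// One handler per message type/kind. It looks up who is subscribed to the
// message's correlation ID; subscriptions expire after a TTL so nobody waits forever.
// Return values mirror the comment: 200 when delivered, 498 when no live subscriber.
class ExpiringCorrelationHandler {
  constructor(messageType, now = () => Date.now()) {
    this.messageType = messageType;
    this.now = now;               // injectable clock (milliseconds), handy for tests
    this.subscribers = new Map(); // correlationId -> { deliver, expiresAt }
  }

  // Assumption: one subscriber (e.g. one aggregate) per correlation ID.
  subscribe(correlationId, deliver, ttlMs) {
    this.subscribers.set(correlationId, { deliver, expiresAt: this.now() + ttlMs });
  }

  handle(message) {
    if (message.type !== this.messageType) {
      throw new Error(`handler for ${this.messageType} got ${message.type}`);
    }
    const id = message.metadata.correlationId;
    const sub = this.subscribers.get(id);
    if (!sub || sub.expiresAt <= this.now()) {
      this.subscribers.delete(id); // drop an expired subscription, if any
      return 498;                  // nobody is waiting: discard the message
    }
    this.subscribers.delete(id);   // the awaited reply arrived; stop waiting
    sub.deliver(message);
    return 200;
  }
}
```

Short TTLs suit replies that must complete quickly; a very long TTL covers messages that may arrive months later.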

mjaric commented 7 years ago

One more thing: you can still use the correlation-ID projection Greg mentioned and subscribe a pubsub service/process to it, to build the routing logic plus a queue of uncompleted business transactions. From the events you will know the stream ID and the correlation ID, which is all you need to pass back the awaited message :)
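A possible shape for that pubsub process is sketched below. It assumes each event read from the projection exposes a `streamId` and `metadata.correlationId`, and that the first stream seen for a correlation ID is the one awaiting the reply; `PendingTransactions` is a hypothetical name.

```javascript
// Remembers which stream started each business transaction so an awaited
// external message can be routed back. Entries left in the table are the
// uncompleted business transactions.
class PendingTransactions {
  constructor() {
    this.streamByCorrelation = new Map(); // correlationId -> streamId
  }

  // Called for each event read from the correlation-ID projection.
  recordEvent(event) {
    const id = event.metadata.correlationId;
    if (!this.streamByCorrelation.has(id)) {
      this.streamByCorrelation.set(id, event.streamId); // assumption: first stream wins
    }
  }

  // Route an external message back; returns the target stream ID or null.
  routeReply(message) {
    const streamId = this.streamByCorrelation.get(message.metadata.correlationId);
    return streamId === undefined ? null : streamId;
  }

  // Once the business transaction is committed, it is no longer pending.
  complete(correlationId) {
    return this.streamByCorrelation.delete(correlationId);
  }

  pending() {
    return [...this.streamByCorrelation.keys()];
  }
}
```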

henry-hz commented 7 years ago

thanks @mjaric :)

burmajam commented 7 years ago

A correlation ID is a logical thing. It can be a user_id, company_id, whatever_id, and it is part of various events. You have to listen to all relevant events (from different applications / indexed streams), take that "correlation_id" and forward the event to the relevant process manager.
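A minimal sketch of this dispatching, assuming the correlation key is a company identifier stored under a different field in each event type. The event types, field names and the `ProcessManagerDispatcher` class are hypothetical.

```javascript
// How to read the logical correlation key from each relevant event type.
// Hypothetical table; a real system would define its own.
const correlationKeyByType = {
  CompanyCreated: (e) => e.data.id,
  InvoiceIssued: (e) => e.data.company_id,
  PaymentReceived: (e) => e.data.payer_company_id,
};

class ProcessManagerDispatcher {
  constructor(startProcessManager) {
    this.startProcessManager = startProcessManager; // key -> object with handle(event)
    this.instances = new Map();                      // key -> process manager instance
  }

  // Called for every event read from the relevant streams.
  // Returns the key the event was forwarded to, or null if no PM cares about it.
  dispatch(event) {
    const extract = correlationKeyByType[event.type];
    if (!extract) return null;
    const key = extract(event);
    if (key === undefined || key === null) return null;
    if (!this.instances.has(key)) {
      this.instances.set(key, this.startProcessManager(key)); // first event starts the PM
    }
    this.instances.get(key).handle(event);
    return key;
  }
}
```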

henry-hz commented 7 years ago

@burmajam got it. I wrote up two approaches to implementing this, abstracted in a way that suits both extreme and eventstore (PostgreSQL), here