nats-io / nats-server

High-Performance server for NATS.io, the cloud and edge native messaging system.
https://nats.io
Apache License 2.0
15.95k stars 1.41k forks source link

Jetstream: transactional batch publish messages #2384

Open sloveridge opened 3 years ago

sloveridge commented 3 years ago

Feature Request

New pub style analogous to fetch for pull based consumers to allow multiple messages for a stream to be published with a single call and all succeed or fail (effectively a transaction). Messages can be for multiple subjects in the same stream.

Use Case:

There are cases where multiple messages must all successfully publish to ensure consistent state on the system. An example of this using ORDERS would be when an incoming order will be fulfilled by multiple locations and has its line items split on publish. A single order may then result in multiple messages with a format such as:

ORDERS.{locationId}.{itemId}

If there is a connection failure in the middle of publishing the orders messages the systems state is now inconsistent (a partial order is in the stream). The ideal way of handling this would be to publish all of these messages in a single batch which would succeed / fail as a whole. Note messages should not be available to consumers until the batch was successful.

Another scenario where this may be useful is to allow clients to buffer messages before sending, ie bulk insert.

Proposed Change:

Using the Go client as an example something along the lines of:

type batch struct {
    subj String
    payload [][]byte
}

PublishBatch(batches ...batch, opts ...PubOpt) (*PubAck, error)

Who Benefits From The Change(s)?

Alternative Approaches

derekcollison commented 3 years ago

For higher speed publishing you can do async publishing like the following in Go. Other clients are adding as well.

for i := 1; i <= 1_000_000; i++ {
    js.PublishAsync(fmt.Sprintf("orders.%d", i), []byte("OK"))
}
<-js.PublishAsyncComplete()
derekcollison commented 3 years ago

For full TX semantics which proper batch would require, would need quite a bit of work.

There may be ways to explore that could accomplish these goals with the system today.

tpihl commented 3 years ago

I think this is a bit dangerous. The semantics where a publish to stream can validate that no new messages was persisted in between, may already create issues in busy streams. If we’d try to make multi msg transactions we would add a lot of possibility for dead locking and similar.

I would rather receive an order and publish the whole order as one msg on a stream. If I then need it to be split up into multiple streams, I’d have a listener on the whole-order-stream that would (with dedup/idempotency) publish each part to their own topics as per the original request above and then ack the whole-order-msg. Maybe even publish a ‘order processed’ msg in its own topic if there may be consumers that cannot start until all parts is written.

this would, as I understand it, do the same thing while not requiring any transaction (and is also fairly easy to scale)

just my .02 euro

T

sloveridge commented 3 years ago

The comment on preventing new messages before the batch is processed and how that might impact a busy stream is something I hadn't fully considered.

Currently I am implementing the following:

  1. Each part is published with a header containing the order id
  2. Downstream systems receive without ack their parts from separate subjects grouping by the header order id (extending Ackwait via AckProgress)
  3. Once the entire order is successfully pushed to the stream an order.created event is published signalling the downstream systems to process the parts, and ack them.
  4. If order sending interrupted order.deleted sent when reconnected and client system receives error for the order submission
  5. Downstream systems clean up the waiting parts on order.deleted received
AqlaSolutions commented 2 years ago

We need this

derekcollison commented 2 years ago

We need this

Need what?

AqlaSolutions commented 2 years ago

We need this

Need what?

This is a feature request, right?

derekcollison commented 2 years ago

Yes but I am asking what specific parts? Batch, local TX, cross asset TX, etc..

AqlaSolutions commented 2 years ago

Batch sending multiple messages. They might be for the same subject or different subjects in a one stream. It's important to have all-or-nothing behavior so that either all messages in a batch are sent or none. It doesn't matter whether they can be interleaved with other messages sent at the same time.

AqlaSolutions commented 2 years ago

This is our usage case

Additionally in the case that the upstream application dies mid publish it may be difficult to determine that the order was only partially published. Effectively the rollback can be handled via distributed transactions however these can be quite complex to implement successfully and it is much simpler to catch it on the initial publish.

AqlaSolutions commented 2 years ago

Also we already have to use this approach for better performance

An application could publish all the line items as a single "batch" message however this removes the ability to direct traffic to consumers based on subject, removing a core benefit of Jetstream in this scenario.

Having a built-in batch functionality would allow us to use multiple subjects without loosing performance.

derekcollison commented 2 years ago

For performance we have async publish which async waits for the pubAcks, might want to try that for better perf if that is a concern since normal js Publish is sync RPC by default.

AqlaSolutions commented 2 years ago

We can't use async publish because ordering of messages from a same producer is important. Messages order in the same batch should be preserved (of course only if they belong to a same stream). Also ordering of multiple batches sent from a same producer should be preserved too. However, other producer might insert its messages anywhere in between which is OK.

derekcollison commented 2 years ago

Async publishes preserve order.

AqlaSolutions commented 2 years ago

I'm not sure about that. If you call send on a TCP socket with async io in .NET is there a guarantee that another send call immediately after this will be delivered after the first buffer, not before?

derekcollison commented 2 years ago

Async in this case means we do not wait in place for the PubAck but track them and process them asynchronously. We can limit how many can be allowed to be outstanding.

tpihl commented 2 years ago

If the order is only important per subject, sender (and batch), why not send all that data in same message? Sort and group by subject and then send a msg that will either be sent or not.

Combined with a batch message containing all expected msgs to be sent, where this msg could be acked/removed when all parts was acked by the relevant stream(s) should give same behavior lock free. The sender will, when starting, re-receive full batch msgs, an can read streams to validate if each parts is there or still need to be published.

This could be parallel for the happy-flow if it’s time critical while offering eventually consistent guarantee with minimal scaling limitations

AqlaSolutions commented 2 years ago
  1. The order is important per stream but there could be different subjects in the same stream.
  2. I can't send data to the same subject because I have different consumers which should not receive things they don't need. Filtering on their side is not an option because the traffic is very noticeable and such cpu usage is not acceptable.
tpihl commented 2 years ago

Thats not what I meant.

don’t filter on their side. But if order only is relevant per subject, publish one msg per subject with all msgs for that subject in the right order inside

tpihl commented 2 years ago

Maybe if you give concrete example?

AqlaSolutions commented 2 years ago

publish one msg per subject with all msgs for that subject in the right order inside

@tpihl If I do it this way I can't guarantee that multiple related messages are sent to their subjects. It's possible that app dies after sending to one subject and before sending to another. But I need the all-or-nothing semantics.

Imagine you transfer 20 USD to your friend Bob. Your balance and Bob balance are processed on different nodes. The system sends to one node (subject 1): "subtract 20 USD from tpihl", and to another node (subject 2) - "add 20 USD to Bob". If it dies in between then Bob doesn't receive your money though it's subtracted from your balance. If each node listens to both subjects it causes too big resource usage (imagine 100 nodes filtering their related messages from a common stream).

Instead all related messages could be sent to NATS as a single batch, even messages with different subjects.

tpihl commented 2 years ago

What I tried to explain before is that your case should be one event (the transfer) with a from, to and amount

One optimization I’m assuming you do is to keep some kind of persistent balance. And that’s reasonable, even banks and bookkeeping does it, but it can always we recreated by rereading all events (the balance is a snapshot in time, and when all events is read, that time is now)

to make my example something more similar to yours, imagine we have different banks and the banks cannot, should not read each other’s stream so we must make multiple writes;

well, start just as above with one event. My bank records that event and when aggregating my transactions will find the -$20. The “to” is an external entity in another bank so currently those $20 is in my banks deposit for Bobs bank.

my bank would then run a process subscribing to transactions from/to Bobs bank, and receiving my transfer it would write a new message in Bobs banks stream with from: mybank.tpihl and to Bob with $20. So whatever holds a snapshot of Bobs account will now receive this msg and Bob can spend his money.

No transaction, no messy locks, just three processes listening to each other. And if one is down, things are delayed a bit, that’s all. Each of the processes must be idempotent and handle ack and other things, so not simpler or easier than a database transaction, just very different.

But that’s the cost of asynchronous and loosely coupled eventbased solutions.

AqlaSolutions commented 2 years ago

@tpihl thanks, that might work. Anyway, if I'm forced to batch messages myself it's harder to use some JetStream features like deduplication (because there are multiple logical messages in a single NATS messages) and manual acks (because I have to ack the whole batch instead of a single logical message). And it's impossible to publish to multiple subjects atomically - yes, I can workaround it with "chaining" like in your solution but in some cases it would be much easier (and have better latency) to just publish into multiple subjects in a way that consumers don't need to know anything about each other.

I think this feature is still necessary.

tpihl commented 2 years ago

Just a quesrion; if it’s the same work needed to be done to support your use case, why do you think latency will be smaller if it’s solved inside nats jetstream?

I’m more afraid that make Ing this complex behavior info the standard jet stream will also make all simple operations slower since they will have to take transactions info account. And since I have 99% simple operations I myself prefer those to be simple and fast while accepting that I have to do the few complex things myself.

I fear that we’re diving into feature complexity hell a small step at the time. My view of jetstreams edge over more complex engines is the simplicity. Less simplicity, maybe better to use Kafka instead

AqlaSolutions commented 2 years ago

@tpihl Latency is smaller because processing is started by two banks simultaneously. But in your approach the message is sent to the second bank only after the first bank receives it.

tpihl commented 2 years ago

You mean the latency induced by 2 steps vs latency induced by locking, committing, rollbacks and similar?

Ofc you’re right. If we only consider one msg with two destination subjects, this is slower, since there is a publish, a subscribe reception, two publish and one ack involved before the real consumers subscriptions will get their own msgs

with transaction it would just be two publish and one commit with either error or ack

But on the server side we now must hold a lot more stuff in memory, and since we’re in “generic world” we must now do ACID. (Ref https://en.wikipedia.org/wiki/ACID).

Another way for you could be to write your stuff to a acid(sql) database and then read it to send messages. Because that’s where we might end up if we continue to add complexity here.

I think the checks already in place to fail publish to stream based on expected last message is great. And async publish is also great to optimize when there is no need for a rollback of the writes that did succeed. My maybe humbler opinion is that more complex cases either should be decomposed (as I tried to make examples for) or solved outside the messaging/streaming solution

/T

AqlaSolutions commented 2 years ago

In our system there is no need to do rollbacks and locking. The latency of send -> receive on bank1-> send to subject 2 -> receive on bank2 is higher then send (2 messages, atomic) -> simultaneously receive on bank1 and bank2. In the second example bank2 doesn't need to wait for bank1 to receive the message.

tpihl commented 2 years ago

So then you don’t need to batch send, just send them async. The rollback/lock was for the ‘all or nothing”, and if that’s not needed, no issue. Or do a saga pattern

bruth commented 2 years ago

No transaction, no messy locks, just three processes listening to each other. And if one is down, things are delayed a bit, that’s all. Each of the processes must be idempotent and handle ack and other things, so not simpler or easier than a database transaction, just very different.

@tpihl This is something that I have been going back and forth on whether to design for the expectation that multiple events could be returned from, say, a command handler, or model it such as there is always the primary event that is produced and written to a stream and then there is simply (not necessarily simple) listeners to react to that event and publish follow-up events that may be relevant.

It sort of always bothered me that a single command could result in multiple events produced. It feels like a short circuit of event X being produced that would lead to the condition of event Y. For example, a command to move a chess piece that would result in checkmate. One could model this as the "piece-moved" and then an assert of the move to determine "checkmate" or "game-finished". Instead the command results in the moved pieced, and the evaluation post-move determines the state of the game and any implicit result from that.

tpihl commented 2 years ago

I absolutely agree that transactions, as well as exactly once, could simplify some scenarios, but at what cost? Because the more complexity you add, the more corner-cases will appear.

Moving a check-piece should not be considered an event, because the move will be undone if it was illegal (moving into check) and since we talk cqrs;

why does everyone assume that there is any correlation between a command and a “main” event derived from that command? I’ve seen frameworks on GitHub that assume an event is the command.

a command is a wish for something, right. It may result in an event stating it succeeded or that some entity properties changed. Or many events over time (the usual example is travelagency and travel with taxi, hotel, flight and future cancellations and so on). Or an event stating it failed. And possibly several commands to other services. There is no sync to real events, they happened, they are facts and not conditional on anything else. If multiple events happened, they should be documented as they happen (I.e. not based on return value).

I think I need another example to understand your scenario @bruth

bruth commented 2 years ago

@tpihl I agree with what you are saying! Re-reading what I wrote it may have come across that I was challenging what you were saying.

Moving a check-piece should not be considered an event, because the move will be undone if it was illegal (moving into check)

Yes that is correct, a command would come "move-piece" which would be validated against the state, and only if legal, the event produced to represent the state transition.

tpihl commented 2 years ago

You’re right, my bad/T

AqlaSolutions commented 2 years ago

Hi, we migrated to JetStream from STAN and still need batch publish.

Use case

Types of messages:

We want to publish them to stream in a correct order (Placed -> Balance -> Executed). Some of our consumers need to subscribe only to Executed events due to performance reasons. But others want to receive everything in the same order as it happened. To do this we have 1 stream with 3 subjects: trading.placed, trading.balance, trading.executed. So one can subscribe just to trading.executed only. On publisher side we don't use PublishAsync because it causes too high CPU usage and latency on NATS server. Instead we aggregate these events in a batch and publish 1000 per Publish call. Now we are forced to send series of orders to trading.placed, then to trading.balance and then to trading.executed like this:

Instead we want to be able to send a batch with multiple subjects inside mixed:

derekcollison commented 2 years ago

If using the Go client you can use async publish and wait on a batch..

https://github.com/nats-io/nats.go/blob/main/js_test.go#L437

AqlaSolutions commented 2 years ago

But they are still sent in separate TCP packets on each call to Publish, aren't they? So it doesn't change the amount of messages that NATS needs to receive and process and it won't lower CPU load on NATS side.

We use C# btw.

derekcollison commented 2 years ago

It will lower CPU, number of messages / packets is same but the client will batch them up.

AqlaSolutions commented 2 years ago

I looked into the GO client source code and I don't see anything like flushing in the final method. It looks like it just returns a thing which can be used to wait for queued messages to be sent.

derekcollison commented 2 years ago

In the link I referenced above you will notice it waits on js.PublishAsyncComplete()

AqlaSolutions commented 2 years ago

Yeah, and I looked inside PublishAsyncComplete

derekcollison commented 2 years ago

That only returns ok if all pub acks have been received with no errors, which means all messages have been processed and stored by the NATS system.

kakserpom commented 1 year ago

I need this. Transactional batch publishing and acknowledging, namely. I like NATS JetStream a lot better than Kafka, but I am forced to use Kafka because I need this all-or-nothing behavior.

My use case: I have a stateful consumer that consumes a single input channel in batches, does some in-memory stuff and publishes a number of messages into a number of output channels (and acknowledges messages in the input channel). Should the consumer fail at any given moment, the input messages must not be acknowledged if the output messages were not published and vice versa.

delaneyj commented 1 year ago

This is a well tread area in the CQRS space for those that want to dig deeper. The general answer is an aggregate is the transaction boundary. While I agree that having a per stream batch with all or nothing would be really beneficial, you can have proper behavior without it. But it sure is a lot simpler to model most problems if it's there 🤔. Again I'm not talking cross stream but to different subjects in the same stream.

chrissheridan-zh commented 1 year ago

I need this as well.

danyo1399 commented 10 months ago

This is an issue for using nats jetstream for event sourcing. A command can result in one or more events.

The batch transaction could be tied to stream + subject and stream + subject would probable make up an event sourced stream. Don’t know if that makes the task any easier

suikast42 commented 8 months ago

For higher speed publishing you can do async publishing like the following in Go. Other clients are adding as well.

for i := 1; i <= 1_000_000; i++ {
  js.PublishAsync(fmt.Sprintf("orders.%d", i), []byte("OK"))
}
<-js.PublishAsyncComplete()

Does nats gives a guraty of the order of the call js.PublishAsyncComplete() ?

Can nats for example ack the message 2 before it sends an ack for message 1 ?

Or can I happened that messge 1 - 10 for example acked 11-15 not and 16 .. acekd ?

ripienaar commented 8 months ago

Yeah I dont believe ordering is guaranteed - if you need ordering on publish you have to wait for acks and retry so your retry is in the correct order.

suikast42 commented 8 months ago

Yeah I dont believe ordering is guaranteed - if you need ordering on publish you have to wait for acks and retry so your retry is in the correct order.

I am currently working with the java API ays well. I am pushiing messages into nats over Grpc..

              CompletableFuture<PublishAck> publishAckCompletableFuture = js.publishAsync(msg);
                publishAckCompletableFuture.whenComplete((publishAck, throwable) -> {
                    try {
                        if (publishAck.hasError()) {
                            logger.error("Can't push message. publishAck.hasError {} ", publishAck.getError());
                            responseObserver.onNext(response(request, ReturnCode.INTERNAL_ERROR, publishAck.getError()));
                            return;
                        }
                        if (throwable != null) {
                            logger.error("Can't push message ", throwable);
                            responseObserver.onNext(response(request, ReturnCode.INTERNAL_ERROR, throwable.getMessage()));
                            return;
                        }
                        responseObserver.onNext(response(request, ReturnCode.OK, "Ok"));
                    } finally {
                        if (type == NatsManagement.PushType.SingleCall) {
                            responseObserver.onCompleted();
                        }
                    }

                });

So I am not waiting like in go with

<-js.PublishAsyncComplete()

My expectation is that nats should not send ack for 2 until 1 is delivered. If it's so I can set a marker on client site that set the last acked message and with the first failure ( nack or connection error ) I can cancel the delivery logic with the first error and restart from that position. ( I think the same should work with golang as well ). Sure the stream logic must handle possible duplicate handling.

Do you see any gaps there ?

ripienaar commented 8 months ago

The messages will trverse from client to jetstream in order - unless while publishing some event occurs like a leader election things can change slightly AND if one message do not get persisted due to transient error while others do you cannot retry that message in proper order.

You can use the Nats-Message-Id header for dedupe, but if you need ordering dont think about the happy case think about how to recover from failure to place messages in order

suikast42 commented 8 months ago

The messages will trverse from client to jetstream in order - unless while publishing some event occurs like a leader election things can change slightly AND if one message do not get persisted due to transient error while others do you cannot retry that message in proper order.

In that get I get an ack timeout error right ?

You can use the Nats-Message-Id header for dedupe I know about Nats-Message-Id. But that works only for a given time window right?

but if you need ordering dont think about the happy case think about how to recover from failure to place messages in order

My approch is as flowing.

  1. Some clients who publishes rarely events can use a sync publish. That's slow but safe ordering right?

  2. High frequently producers collects their events in a local SQLlite db and a worker on clinet site push this events in order. Every batch send will abbort with the first error ( Nack or an ack timeout). Then it will restart the back withg the last not acked message. The worst thing what I can see here is a duplicate delivery. Did I miss something?

ripienaar commented 8 months ago

A batch sends, lets say, 100 messages. It can be that message 50-55 fail but all 100 was sent and so most were persisted.

You cant retry 50-55 and get them in order again, you cant stop and start again at 50 cos 56+ got saved.