BEagle1984 / silverback

Silverback is a simple but feature-rich message bus for .NET Core (it currently supports Kafka, RabbitMQ and MQTT).
https://silverback-messaging.net
MIT License

Feature: Support for transactions #109

Open · msallin opened this issue 4 years ago

msallin commented 4 years ago

Let's discuss how we could build support for Kafka transactions in Silverback. An example of how to do this with the .NET Kafka library is shown here: https://github.com/confluentinc/confluent-kafka-dotnet/blob/master/examples/Transactions/Program.cs

BEagle1984 commented 4 years ago

This is for sure a cool topic. It's something I had a look at in the past already, but I never really tried to integrate it.

Please note that the whole point of the transactions is to implement exactly-once processing. This can already be achieved with Silverback, by storing the offsets in the database and using the outbox table (DbDeferredOutboundConnector). This mechanism, by the way, has a broader scope, since the database transaction automatically includes the other side effects applied to the data and is therefore useful in other use cases as well (not only when the data is streamed from topic to topic, but also when the data in the database of your (micro)service is modified because of an incoming message).
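
To illustrate the mechanism (this is not Silverback's actual API, just a minimal hand-rolled sketch of the same idea, assuming an EF Core DbContext with hypothetical outbox and offsets tables; all names are made up):

// The side effects, the outgoing messages and the consumed offset are all
// persisted in a single database transaction. A background worker later
// forwards the outbox rows to Kafka.
public async Task HandleAsync(MyDbContext dbContext, IncomingMessage message)
{
    // 1. Apply the side effects to the local data (hypothetical entity).
    dbContext.Orders.Add(MapToOrder(message));

    // 2. Queue the outgoing message in the outbox table.
    dbContext.Outbox.Add(new OutboxMessage
    {
        Topic = "output-topic",
        Payload = Serialize(message)
    });

    // 3. Store the consumed offset in the same unit of work.
    dbContext.Offsets.Update(new StoredOffset { Key = message.Topic, Offset = message.Offset });

    // A single SaveChanges commits all three writes atomically.
    await dbContext.SaveChangesAsync();
}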

The producer transaction at the end of the day does two things: it ensures that all produced messages are either committed or discarded, and that the consumed offsets are committed as part of the same transaction. As said, both goals are already achieved by Silverback, but using the database.

That being said, using the native transactions is far more optimized and will lead to a much greater throughput.
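
For reference, the raw Confluent.Kafka API from the linked example boils down to something like this (a simplified sketch; the offsets variable and the consumer instance are assumed to come from the surrounding consume loop):

using var producer = new ProducerBuilder<string, string>(
    new ProducerConfig
    {
        BootstrapServers = "localhost:9092", // illustrative
        TransactionalId = "my-transactional-id" // illustrative
    })
    .Build();

producer.InitTransactions(TimeSpan.FromSeconds(10));

producer.BeginTransaction();
try
{
    producer.Produce("topic-a", new Message<string, string> { Value = "..." });
    producer.Produce("topic-b", new Message<string, string> { Value = "..." });

    // Link the consumed offsets to the same transaction.
    producer.SendOffsetsToTransaction(offsets, consumer.ConsumerGroupMetadata, TimeSpan.FromSeconds(10));

    // Atomically commit the produced messages and the consumed offsets.
    producer.CommitTransaction();
}
catch
{
    // Discard everything produced within the transaction.
    producer.AbortTransaction();
    throw;
}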

I think that a possible approach would be to rely on an ambient transaction.

using (var scope = new TransactionScope(TransactionScopeAsyncFlowOption.Enabled))
{
    await _publisher.PublishAsync(...);
    await _publisher.PublishAsync(...);
    await _publisher.PublishAsync(...);

    // Mark the transaction as complete; otherwise it is rolled back on dispose.
    scope.Complete();
}

The tricky part here is that in Silverback we cannot assume that this piece of code will never be executed concurrently, and we therefore need to create a sort of pool of producers to handle each transaction. (I guess...) Enlisting into the ambient transaction shouldn't be tremendously difficult: all we have to do is commit or roll back the participating producers' transactions.
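
For the enlisting part, a volatile enlistment wrapping the producer's transaction could look roughly like this (a sketch, assuming a dedicated transactional producer per ambient transaction; the class name is made up):

using System.Transactions;
using Confluent.Kafka;

// Ties a Kafka producer transaction to the ambient System.Transactions
// transaction: commit/abort is triggered by the TransactionScope outcome.
public class KafkaTransactionEnlistment : IEnlistmentNotification
{
    private readonly IProducer<byte[], byte[]> _producer;

    public KafkaTransactionEnlistment(IProducer<byte[], byte[]> producer)
    {
        _producer = producer;

        // Assumes an ambient transaction exists at this point.
        Transaction.Current.EnlistVolatile(this, EnlistmentOptions.None);
    }

    public void Prepare(PreparingEnlistment preparingEnlistment) =>
        preparingEnlistment.Prepared();

    public void Commit(Enlistment enlistment)
    {
        _producer.CommitTransaction();
        enlistment.Done();
    }

    public void Rollback(Enlistment enlistment)
    {
        _producer.AbortTransaction();
        enlistment.Done();
    }

    public void InDoubt(Enlistment enlistment) => enlistment.Done();
}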

To include the consumer in the transaction, it should be possible to add an option to the inbound endpoint, like RequireTransaction = true, that would just lead to a TransactionScope being instantiated in the consumer pipeline.
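
In pseudo-code, the hook in the consumer pipeline could be as simple as this (RequireTransaction, next and context are hypothetical names, not an existing Silverback API):

// Hypothetical consumer pipeline step: wrap the processing of the consumed
// message in an ambient transaction when the endpoint requires it.
if (endpoint.RequireTransaction)
{
    using var scope = new TransactionScope(TransactionScopeAsyncFlowOption.Enabled);

    await next(context); // invoke the rest of the consumer pipeline

    scope.Complete(); // commit; skipped (and thus rolled back) on exception
}
else
{
    await next(context);
}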

Here we start with some assumptions. I think that in order to join multiple producers into the same transaction, all that's needed is to set the same transactional.id. I also think that at this point any of the producers can be used to link the consumer offsets to the transaction (since at the end of the day it is just one big transaction).

It all looks very straightforward, but it for sure isn't that easy and I'm missing a ton of details.

BEagle1984 commented 4 years ago

I forgot to say that if we go down the ambient transaction route, we get the big bonus of having the database transactions automatically enlisted in the same transaction.

The drawback is maybe the API, which is a bit weird if you are not using a database (isolation levels etc. don't have any meaning in this context).

msallin commented 4 years ago

> Please note that the whole point of the transactions is to implement exactly-once processing. This can already be achieved with Silverback, by storing the offsets in the database and using the outbox table (DbDeferredOutboundConnector).

Our problem isn't the exactly-once part but the atomicity part. In our use case we have to guarantee that messages only become visible to the consumer as a batch (n messages in topic 1 and one message in topic 2).

The outbox pattern addresses the problem of combining database transactions and message delivery guarantees [1]. However, it does not enable you to be transactional across topics and partitions, i.e. the producer sends a batch of messages to multiple partitions such that either all messages in the batch are eventually visible to any consumer or none are ever visible to consumers. [2]

> I think that a possible approach would be to rely on an ambient transaction.

This looks very good IMHO. It also means that we don't have to break the abstraction (because other message brokers may not support transactions) but still have granular control.

> we therefore need to create a sort of pool of producers to handle each transaction.

Currently there is one producer per endpoint, right? We might raise the question whether this is the right thing to do anyway. E.g. the following is stated by the KafkaProducer Javadoc [3]: "The producer is thread safe and should generally be shared among all threads for best performance."

How is this in .NET?

> To include the consumer in the transaction, it should be possible to add an option to the inbound endpoint, like RequireTransaction = true, that would just lead to a TransactionScope being instantiated in the consumer pipeline.

Very interesting. Do we have to think about nested TransactionScopes?

[1] https://microservices.io/patterns/data/transactional-outbox.html (09.09.2020)
[2] https://www.confluent.io/blog/exactly-once-semantics-are-possible-heres-how-apache-kafka-does-it/ (09.09.2020)
[3] https://kafka.apache.org/082/javadoc/org/apache/kafka/clients/producer/KafkaProducer.html (09.09.2020)

BEagle1984 commented 4 years ago

> Our problem isn't the exactly-once part but the atomicity part. In our use case we have to guarantee that messages only become visible to the consumer as a batch (n messages in topic 1 and one message in topic 2).
>
> The outbox pattern addresses the problem of combining database transactions and message delivery guarantees [1]. However, it does not enable you to be transactional across topics and partitions, i.e. the producer sends a batch of messages to multiple partitions such that either all messages in the batch are eventually visible to any consumer or none are ever visible to consumers. [2]

This is definitely addressed by the outbox pattern. The messages are atomically written into the outbox table and that gives you the guarantee that they will eventually be written to the topic. The background service taking care of the outbox queue will just retry over and over until it succeeds.

> Currently there is one producer per endpoint, right? We might raise the question whether this is the right thing to do anyway. E.g. the following is stated by the KafkaProducer Javadoc [3]: "The producer is thread safe and should generally be shared among all threads for best performance."
>
> How is this in .NET?

It's the same in .NET. It is even suggested to use a single KafkaProducer to produce to all topics, to allow librdkafka to properly batch and optimize the transmission. Note that each instance of the producer will require a connection to the broker, so this cannot scale indefinitely. [4][5] In Silverback a producer is created for each endpoint. This is done to keep things simple while allowing a different configuration for each endpoint.
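
For context, a single Confluent.Kafka producer can indeed serve any number of topics, since the target topic is passed per produce call:

using var producer = new ProducerBuilder<Null, string>(
    new ProducerConfig { BootstrapServers = "localhost:9092" }).Build();

// The same instance can produce to any topic; librdkafka batches and
// manages the broker connections internally.
await producer.ProduceAsync("topic-a", new Message<Null, string> { Value = "a" });
await producer.ProduceAsync("topic-b", new Message<Null, string> { Value = "b" });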

This could actually create some issues with the transactions, since the transactional.id is a property of the KafkaProducer configuration, meaning that you need a new instance for each transaction. It's not generally an issue, but you must be careful with it, for example if you think about starting a transaction from an ApiController.

> To include the consumer in the transaction, it should be possible to add an option to the inbound endpoint, like RequireTransaction = true, that would just lead to a TransactionScope being instantiated in the consumer pipeline.
>
> Very interesting. Do we have to think about nested TransactionScopes?

They can definitely be nested; I don't know what that means for us. I think the TransactionScope itself controls the creation of the ambient transaction and we just enlist. The TransactionScopeOption specifies whether you merge into the existing transaction, create a new one, or ignore it, but all that is handled by the TransactionScope (I think!).
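
For reference, the nesting behavior is indeed controlled via TransactionScopeOption:

using (var outer = new TransactionScope())
{
    // Required (the default): join the ambient transaction if one exists,
    // otherwise create a new one. RequiresNew would start an independent
    // transaction; Suppress would run the block without any transaction.
    using (var inner = new TransactionScope(TransactionScopeOption.Required))
    {
        // ...enlisted in the same transaction as 'outer'
        inner.Complete();
    }

    outer.Complete();
}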

[4] https://github.com/confluentinc/confluent-kafka-dotnet/issues/868
[5] https://github.com/confluentinc/confluent-kafka-dotnet/issues/159

msallin commented 4 years ago

> This is definitely addressed by the outbox pattern. The messages are atomically written into the outbox table and that gives you the guarantee that they will eventually be written to the topic. The background service taking care of the outbox queue will just retry over and over until it succeeds.

It does guarantee that eventually this will happen, but not as a transaction (eventual consistency isn't the same as atomicity, even if the final result might be the same). We must not make messages in one topic visible to the consumer if the write to the other was not successful. In most cases it will be the same, because the messages in the outbox queue can be processed, but not in every situation.

> allowing a different configuration for each endpoint.

Hmm, yes, that's true.

> This could actually create some issues with the transactions, since the transactional.id is a property of the KafkaProducer configuration, meaning that you need a new instance for each transaction.

Really? This sounds weird. I have to dig a bit into how this transaction stuff is implemented. Btw. we can also have a look at existing Java implementations: https://github.com/spring-projects/spring-kafka/blob/master/spring-kafka/src/main/java/org/springframework/kafka/core/DefaultKafkaProducerFactory.java

BEagle1984 commented 4 years ago

> It does guarantee that eventually this will happen, but not as a transaction (eventual consistency isn't the same as atomicity, even if the final result might be the same). We must not make messages in one topic visible to the consumer if the write to the other was not successful. In most cases it will be the same, because the messages in the outbox queue can be processed, but not in every situation.

Well, there are some edge cases where the messages from the outbox table cannot be published anymore, but you pretty much have to get rid of the topic (or at least the outbound configuration in Silverback) for that to happen. I don't see many other cases. What could happen is that one message is delayed, and this could be an issue in some very specific cases. Note that the outbox table processor is preserving the message order across all topics in its default configuration (it simply processes the messages in the outbound queue one after the other, starting from the oldest message). So the delay isn't normally a huge issue, since it isn't like one topic is flooded while being unable to write to the other one. (I may be oversimplifying it and you for sure know your case better than me.)

> Really? This sounds weird. I have to dig a bit into how this transaction stuff is implemented. Btw. we can also have a look at existing Java implementations: https://github.com/spring-projects/spring-kafka/blob/master/spring-kafka/src/main/java/org/springframework/kafka/core/DefaultKafkaProducerFactory.java

Yep, nice. They are doing pretty much what I described, but it's worth having a closer look.

    private Producer<K, V> doCreateProducer(@Nullable String txIdPrefix) {
        if (txIdPrefix != null) {
            if (this.producerPerConsumerPartition) {
                return createTransactionalProducerForPartition(txIdPrefix);
            }
            else {
                return createTransactionalProducer(txIdPrefix);
            }
        }
        if (this.producerPerThread) {
            CloseSafeProducer<K, V> tlProducer = this.threadBoundProducers.get();
            if (this.threadBoundProducerEpochs.get() == null) {
                this.threadBoundProducerEpochs.set(this.epoch.get());
            }
            if (tlProducer != null && this.epoch.get() != this.threadBoundProducerEpochs.get()) {
                closeThreadBoundProducer();
                tlProducer = null;
            }
            if (tlProducer == null) {
                tlProducer = new CloseSafeProducer<>(createKafkaProducer(), this::removeProducer,
                        this.physicalCloseTimeout, this.beanName);
                for (Listener<K, V> listener : this.listeners) {
                    listener.producerAdded(tlProducer.clientId, tlProducer);
                }
                this.threadBoundProducers.set(tlProducer);
                this.threadBoundProducerEpochs.set(this.epoch.get());
            }
            return tlProducer;
        }
        synchronized (this) {
            if (this.producer == null) {
                this.producer = new CloseSafeProducer<>(createKafkaProducer(), this::removeProducer,
                        this.physicalCloseTimeout, this.beanName);
                this.listeners.forEach(listener -> listener.producerAdded(this.producer.clientId, this.producer));
            }
            return this.producer;
        }
    }

    protected Producer<K, V> createTransactionalProducer(String txIdPrefix) {
        BlockingQueue<CloseSafeProducer<K, V>> queue = getCache(txIdPrefix);
        Assert.notNull(queue, () -> "No cache found for " + txIdPrefix);
        Producer<K, V> cachedProducer = queue.poll();
        if (cachedProducer == null) {
            return doCreateTxProducer(txIdPrefix, "" + this.transactionIdSuffix.getAndIncrement(), this::cacheReturner);
        }
        else {
            return cachedProducer;
        }
    }

    protected Producer<K, V> createTransactionalProducerForPartition(String txIdPrefix) {
        String suffix = TransactionSupport.getTransactionIdSuffix();
        if (suffix == null) {
            return createTransactionalProducer(txIdPrefix);
        }
        else {
            synchronized (this.consumerProducers) {
                if (!this.consumerProducers.containsKey(suffix)) {
                    CloseSafeProducer<K, V> newProducer = doCreateTxProducer(txIdPrefix, suffix,
                            this::removeConsumerProducer);
                    this.consumerProducers.put(suffix, newProducer);
                    return newProducer;
                }
                else {
                    return this.consumerProducers.get(suffix);
                }
            }
        }
    }

If no transaction is involved, they use a single producer. Otherwise a producer is created per transaction. Then there are some options to have a producer per thread (an option that we don't have in Silverback) and a producer per partition when transactions are involved (I guess because each partition is consumed separately in different threads; this is also not supported in Silverback... yet).
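
Translated to our context, a minimal transactional producer pool along those lines might look like this (a sketch; all names are made up and the config values are illustrative):

using System.Collections.Concurrent;
using Confluent.Kafka;

// Mirrors the spring-kafka approach: transactional producers are pooled and
// reused; a new one (with a unique transactional.id suffix) is created only
// when the pool is empty.
public class TransactionalProducerPool
{
    private readonly ConcurrentQueue<IProducer<byte[], byte[]>> _pool = new();
    private readonly string _txIdPrefix;
    private int _suffix;

    public TransactionalProducerPool(string txIdPrefix) => _txIdPrefix = txIdPrefix;

    public IProducer<byte[], byte[]> Rent()
    {
        if (_pool.TryDequeue(out var producer))
            return producer;

        var config = new ProducerConfig
        {
            BootstrapServers = "localhost:9092", // illustrative
            TransactionalId = $"{_txIdPrefix}-{Interlocked.Increment(ref _suffix)}"
        };

        var newProducer = new ProducerBuilder<byte[], byte[]>(config).Build();
        newProducer.InitTransactions(TimeSpan.FromSeconds(10));
        return newProducer;
    }

    // Return the producer to the pool once its transaction has been
    // committed or aborted, so it can serve the next transaction.
    public void Return(IProducer<byte[], byte[]> producer) => _pool.Enqueue(producer);
}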