confluentinc / confluent-kafka-dotnet

Confluent's Apache Kafka .NET client
https://github.com/confluentinc/confluent-kafka-dotnet/wiki
Apache License 2.0

Messages dropping on the floor with transactional producer #1568

Open will118 opened 3 years ago

will118 commented 3 years ago

Description

If connectivity to the cluster is lost after a call to CommitTransaction, subsequent transactions "succeed" even though the broker is unreachable and their messages are never delivered.

I don't think this is the intended behaviour: my understanding was that we could ignore the delivery reports with a transactional producer (see https://github.com/edenhill/librdkafka/blob/5fa114ccab90b0a7640b2621bf3e88314d731b84/examples/transactions.c#L102-L109).
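Until a failed message reliably makes CommitTransaction fail, one defensive option is to stop ignoring delivery reports and abort instead of committing when any message in the transaction failed. The sketch below shows only that bookkeeping, with no Kafka dependency. It assumes the per-message outcome would be fed in from the deliveryHandler overload of Produce (using report.Error.IsError) and that the decision is made only after all of the transaction's delivery reports have been handled (for example after Flush). RecordDelivery and DecideEndOfTransaction are hypothetical names, not Confluent.Kafka API.

```csharp
using System;
using System.Threading;

// Hypothetical per-transaction guard (not part of Confluent.Kafka).
// In a real app, RecordDelivery would be called from the deliveryHandler passed to
// Produce(topic, message, deliveryHandler) with isError = report.Error.IsError.
long failedDeliveries = 0;

// Delivery handlers run on the client's background poll thread, hence Interlocked.
void RecordDelivery(bool isError)
{
    if (isError)
        Interlocked.Increment(ref failedDeliveries);
}

// Assumes every delivery report for the transaction has already been handled.
// Reads and resets the counter so the next transaction starts clean.
string DecideEndOfTransaction()
{
    long failures = Interlocked.Exchange(ref failedDeliveries, 0);
    return failures == 0 ? "commit" : "abort";
}

// Transaction 1: every message delivered.
RecordDelivery(isError: false);
Console.WriteLine(DecideEndOfTransaction()); // commit

// Transaction 2: one message failed, e.g. message.timeout.ms exceeded while the broker was unreachable.
RecordDelivery(isError: false);
RecordDelivery(isError: true);
Console.WriteLine(DecideEndOfTransaction()); // abort
```

In the repro's loop, "abort" would mean calling AbortTransaction instead of CommitTransaction; that call can itself fail while the broker is down, so it still needs its own error handling.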

Based on the debug logs (see below), the "No partitions registered: not sending EndTxn" message made me think of this change: https://github.com/edenhill/librdkafka/pull/3271, but I haven't investigated.

Killing connectivity at other points (e.g. before committing) behaves as expected.

Apologies in advance if there is something I haven't understood.

How to reproduce

This reproduces consistently for me (100% of the time):

using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using Confluent.Kafka;

namespace TestProducer
{
    public class Producer
    {
        private const string MessageTopic = "important-numbers-5";
        private readonly IEnumerable<int> _numbersToSend = Enumerable.Range(0, Int32.MaxValue);
        private readonly IProducer<byte[], string> _kafkaProducer;

        public Producer()
        {
            var producerConfig = new ProducerConfig
            {
                BootstrapServers = "kafka:9092",
                SecurityProtocol = SecurityProtocol.Plaintext,
                SaslMechanism = SaslMechanism.Plain,
                TransactionalId = "txd-id-54321",
                EnableIdempotence = true,
                TransactionTimeoutMs = 10 * 1000,
                MaxInFlight = 1,
                LingerMs = 2,
                MessageSendMaxRetries = Int32.MaxValue,
                QueueBufferingMaxKbytes = 100000,
                CompressionType = CompressionType.None,
                Debug = "broker,protocol,msg,eos",
                Acks = Acks.All,
                BrokerAddressFamily = BrokerAddressFamily.V4
            };

            void LogHandler(IProducer<byte[], string> producer, LogMessage logMessage) =>
                Console.WriteLine(logMessage.Message);

            _kafkaProducer = new ProducerBuilder<byte[], string>(producerConfig)
                .SetLogHandler(LogHandler)
                .Build();
        }

        public void Run()
        {
            _kafkaProducer.InitTransactions(TimeSpan.FromSeconds(10));
            _ = Task.Run(ProduceLoop);
        }

        private void ProduceLoop()
        {
            foreach (var msg in _numbersToSend)
            {
                Console.WriteLine($"Going to send: {msg}");
                _kafkaProducer.BeginTransaction();
                _kafkaProducer.Produce(MessageTopic, new Message<byte[], string>
                {
                    Key = null,
                    Value = msg.ToString(),
                });
                _kafkaProducer.CommitTransaction();
                Console.WriteLine("Successfully committed"); // <--- Here
                // 1. Breakpoint on the above line
                // 2. Kill connection to kafka
                // 3. Resume + remove breakpoint
                // 4. Watch subsequent messages being dropped on the floor every transaction.timeout.ms
            }
        }
    }
}

The way I have been simulating connectivity issues is with socat:

---
version: '3.4'

services:
  zookeeper:
    image: wurstmeister/zookeeper
    ports:
      - "2181:2181"

  kafka:
    image: wurstmeister/kafka:2.13-2.7.0
    environment:
      KAFKA_BROKER_ID: "991"
      KAFKA_LISTENERS: PLAINTEXT://kafka:9092
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      LOG4J_LOGGER_KAFKA: DEBUG
      LOG4J_LOGGER_ORG_APACHE_KAFKA: DEBUG

  socat:
    image: alpine/socat:latest
    ports:
      - "9092:9092"
    command: tcp-listen:9092,fork,reuseaddr tcp:kafka:9092

I then cut connectivity with docker-compose -f docker-compose.kafka.yml kill socat (my hosts file maps kafka to 127.0.0.1, so the client reaches the broker through socat).

This is the output from kafkacat -f '%o %s'; the gap visible in it is where Kafka was down.
[Screenshot: kafkacat output]

The librdkafka logs look like this when "successfully" committing:

[thrd:main]: Cluster connection already in progress: refresh unavailable topics
[thrd:main]: Not selecting any broker for cluster connection: still suppressed for 49ms: no cluster connection
[thrd:main]: Cluster connection already in progress: acquire ProducerID
[thrd:kafka:9092/bootstrap]: kafka:9092/991: Connect to ipv4#127.0.0.1:9092 failed: Unknown error (after 2053ms in state CONNECT) (_TRANSPORT): identical to last error: error log suppressed
[thrd:kafka:9092/bootstrap]: kafka:9092/991: Broker changed state CONNECT -> DOWN
[thrd:kafka:9092/bootstrap]: kafka:9092/991: Broker changed state DOWN -> INIT
[thrd:main]: No brokers available for Transactions (2 broker(s) known)
[thrd:main]: Unable to query for transaction coordinator: Coordinator query timer: No brokers available for Transactions (2 broker(s) known)
[thrd:main]: kafka:9092/991: Selected for cluster connection: acquire ProducerID (broker has 5 connection attempt(s))
[thrd:main]: No brokers available for Transactions (2 broker(s) known)
[thrd:main]: Unable to query for transaction coordinator: Coordinator query timer: No brokers available for Transactions (2 broker(s) known)
[thrd:kafka:9092/bootstrap]: kafka:9092/991: Received CONNECT op
[thrd:kafka:9092/bootstrap]: kafka:9092/991: Broker changed state INIT -> TRY_CONNECT
[thrd:kafka:9092/bootstrap]: kafka:9092/991: broker in state TRY_CONNECT connecting
[thrd:kafka:9092/bootstrap]: kafka:9092/991: Broker changed state TRY_CONNECT -> CONNECT
[thrd:kafka:9092/bootstrap]: kafka:9092/991: Connecting to ipv4#127.0.0.1:9092 (plaintext) with socket 2056
[thrd:main]: Cluster connection already in progress: refresh unavailable topics
[thrd:main]: Not selecting any broker for cluster connection: still suppressed for 49ms: no cluster connection
[thrd:main]: Cluster connection already in progress: acquire ProducerID
[thrd:main]: No brokers available for Transactions (2 broker(s) known)
[thrd:main]: Unable to query for transaction coordinator: Coordinator query timer: No brokers available for Transactions (2 broker(s) known)
[thrd:kafka:9092/bootstrap]: kafka:9092/991: important-numbers-5 [0]: timed out 0+1 message(s) (MsgId 312..312): message.timeout.ms exceeded
[thrd:kafka:9092/bootstrap]: Beginning partition drain for PID{Id:1,Epoch:12} reset for 0 partition(s) with in-flight requests: 1 message(s) timed out on important-numbers-5 [0]
[thrd:kafka:9092/bootstrap]: Idempotent producer state change Assigned -> DrainReset
[thrd:kafka:9092/bootstrap]: All partitions drained
[thrd:kafka:9092/bootstrap]: Idempotent producer state change DrainReset -> RequestPID
[thrd:kafka:9092/bootstrap]: Starting PID FSM timer (fire immediately): Drain done
[thrd:main]: Idempotent producer state change RequestPID -> WaitTransport
[thrd:main]: Starting PID FSM timer: No broker available
[thrd:main]: Cluster connection already in progress: acquire ProducerID
[thrd:main]: No brokers available for Transactions (2 broker(s) known)
[thrd:main]: Unable to query for transaction coordinator: Coordinator query timer: No brokers available for Transactions (2 broker(s) known)
[thrd:app]: Transaction commit message flush complete
[thrd:app]: Transactional API called: commit_transaction
[thrd:main]: No partitions registered: not sending EndTxn
[thrd:main]: Transaction state change BeginCommit -> CommittingTransaction
[thrd:main]: Transaction successfully committed
[thrd:main]: Transaction state change CommittingTransaction -> Ready


mhowlett commented 3 years ago

@edenhill

edenhill commented 3 years ago

Thanks for a great report! Will investigate next week

edenhill commented 3 years ago

I believe this check: https://github.com/edenhill/librdkafka/blob/master/src/rdkafka_txnmgr.c#L2367

needs to go before this check: https://github.com/edenhill/librdkafka/blob/master/src/rdkafka_txnmgr.c#L2346

Will add a test-case to verify this is indeed the problem, but it seems very likely.
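Why the order of the two checks matters can be shown with a toy model. This is not librdkafka's code: it assumes, from the log line "No partitions registered: not sending EndTxn" and the reported symptom, that the earlier check is that no-partitions shortcut and the later one is what would have caught the timed-out message. In the log, the message hit message.timeout.ms while no coordinator was reachable, so no partition was ever registered and the shortcut reported success. CommitBuggyOrder, CommitFixedOrder and their parameters are hypothetical.

```csharp
using System;

// Toy model only; names and parameters are hypothetical stand-ins for
// "partitions registered with the coordinator" and "messages that failed
// delivery in this transaction".

// Order seen in the report: the no-partitions shortcut runs first.
string CommitBuggyOrder(int registeredPartitions, int failedMessages)
{
    if (registeredPartitions == 0)
        return "committed (EndTxn not sent)"; // "No partitions registered: not sending EndTxn"
    if (failedMessages > 0)
        return "error: transaction must be aborted";
    return "committed";
}

// Proposed order: look for failed messages before taking the shortcut.
string CommitFixedOrder(int registeredPartitions, int failedMessages)
{
    if (failedMessages > 0)
        return "error: transaction must be aborted";
    if (registeredPartitions == 0)
        return "committed (EndTxn not sent)";
    return "committed";
}

// Reported scenario: broker unreachable, so presumably the partition was never
// registered, and the single message in the transaction timed out.
Console.WriteLine(CommitBuggyOrder(registeredPartitions: 0, failedMessages: 1));
Console.WriteLine(CommitFixedOrder(registeredPartitions: 0, failedMessages: 1));
```

The first call reports a successful commit for a transaction whose only message was dropped; the second surfaces the failure.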

edenhill commented 3 years ago

Side note on your config:

                EnableIdempotence = true,  <-- enabled by default when transactional.id is set.
                MaxInFlight = 1,  <-- no need for this, it just slows down throughput. The idempotent producer has a window of 5 in-flight requests.
                MessageSendMaxRetries = Int32.MaxValue,  <-- this is already the default for the idempotent producer
                Acks = Acks.All,  <-- this is already the default
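Applying those notes, the repro's producer configuration could be trimmed roughly as in the sketch below. It assumes the Confluent.Kafka NuGet package and keeps the repro's other values unchanged. SaslMechanism is also left out here, on the basis that sasl.mechanism only takes effect with SaslPlaintext or SaslSsl, not Plaintext. Nothing connects to a broker; the program just prints the resulting librdkafka properties.

```csharp
using System;
using Confluent.Kafka;

// Repro config minus the settings flagged above: EnableIdempotence, MessageSendMaxRetries
// and Acks already default to these values once TransactionalId is set, and
// MaxInFlight = 1 only limits throughput (the idempotent producer allows 5 in flight).
var producerConfig = new ProducerConfig
{
    BootstrapServers = "kafka:9092",
    SecurityProtocol = SecurityProtocol.Plaintext,
    TransactionalId = "txd-id-54321", // implies enable.idempotence=true
    TransactionTimeoutMs = 10 * 1000,
    LingerMs = 2,
    QueueBufferingMaxKbytes = 100000,
    CompressionType = CompressionType.None,
    Debug = "broker,protocol,msg,eos",
    BrokerAddressFamily = BrokerAddressFamily.V4
};

// ProducerConfig enumerates the underlying librdkafka key/value pairs.
foreach (var entry in producerConfig)
    Console.WriteLine($"{entry.Key}={entry.Value}");
```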
will118 commented 3 years ago

Yeah, some of them are indeed unnecessary - we are using MaxInFlight = 5 in our app.

Thanks for looking into the issue, what you've said makes sense based on my limited familiarity.

Looking forward to the next release!