redpanda-data / redpanda

Redpanda is a streaming data platform for developers. Kafka API compatible. 10x faster. No ZooKeeper. No JVM!
https://redpanda.com
9.62k stars 586 forks source link

Could not find a coordinator with type TRANSACTION in testcontainers testcase #6771

Open rrva opened 2 years ago

rrva commented 2 years ago

Version & Environment

Redpanda version: v22.2.1 Testcontainers version: 1.17.5

What went wrong?

When running a testcontainers based junit test which involves transactional producers, swapping out the kafka container for a redpanda container makes the test fail:

Could not find a coordinator with type TRANSACTION with key foo due to unexpected error: null
org.apache.kafka.common.KafkaException: Could not find a coordinator with type TRANSACTION with key foo due to unexpected error: null
    at org.apache.kafka.clients.producer.internals.TransactionManager$FindCoordinatorHandler.handleResponse(TransactionManager.java:1592)
    at org.apache.kafka.clients.producer.internals.TransactionManager$TxnRequestHandler.onComplete(TransactionManager.java:1322)

What should have happened instead?

The testcase should pass as it does with confluentinc/cp-kafka:6.2.1

How to reproduce the issue?

  1. Create a testcontainers testcase that depends on redpanda
RedpandaContainer kafka = new RedpandaContainer("docker.redpanda.com/vectorized/redpanda:v22.2.1");

kafka.start();
  1. Create a transactional producer using the kafka java API (org.apache.kafka:kafka-clients:3.2.1)
class TransactionalProducer(
    servers: String,
    userName: String?,
    password: String?,
    trustStore: String?,
    trustStorePassword: String?,
    private val topic: String,
) {
    private val log = LoggerFactory.getLogger(this::class.java)

    private val configs = mutableMapOf(
        "bootstrap.servers" to servers,
        "compression.type" to "zstd",
        "linger.ms" to "0",
        "acks" to "all",
        "transactional.id" to "foo"
    )

    init {
        configurePasswords(servers, configs, userName, password, trustStore, trustStorePassword)
        log.info("Kafka config {}", configs)
    }

    private val kafkaProducer: KafkaProducer<String, ByteArray> = KafkaProducer(
        configs.toMap(),
        StringSerializer(),
        ByteArraySerializer()
    ).also {
        it.initTransactions()
    }

    fun beginTransaction() {
        kafkaProducer.beginTransaction()
    }

    fun abortTransaction() {
        kafkaProducer.abortTransaction()
    }

    fun send(id: String?, msg: ByteArray?): Future<RecordMetadata>? {
        return kafkaProducer.send(
            ProducerRecord(topic, id, msg)
        ) { _, e ->
            if (e != null) {
                log.error(e.message, e)
            }
        }
    }

    fun commitTransaction() {
        kafkaProducer.commitTransaction()
    }

}
  1. Initialise a transactional producer

Additional information

testcase logs

06:43:03.418 │      Test worker INFO  ?.r.c.2.1]                │ Container docker.redpanda.com/vectorized/redpanda:v22.2.1 is starting: 60f3424c56c5d204d49b5099d2dc3328ba16ad54f09592fa9515fe89de4ffb11
06:43:04.506 │      Test worker INFO  ?.r.c.2.1]                │ Container docker.redpanda.com/vectorized/redpanda:v22.2.1 started in PT1.297546S
06:43:14.621 │      Test worker INFO  s.k.kt                    │ Local kafka detected, skipping auth setup
06:43:14.622 │      Test worker INFO  v.s.TransactionalProducer │ Kafka config {bootstrap.servers=PLAINTEXT://localhost:61739, compression.type=zstd, linger.ms=0, acks=all, transactional.id=foo}
06:43:14.675 │      Test worker INFO  o.a.k.c.p.KafkaProducer   │ [Producer clientId=producer-foo, transactionalId=foo] Instantiated a transactional producer.
06:43:14.694 │      Test worker INFO  o.a.k.c.u.AppInfoParser   │ Kafka version: 3.2.1
06:43:14.694 │      Test worker INFO  o.a.k.c.u.AppInfoParser   │ Kafka commitId: b172a0a94f4ebb9f
06:43:14.694 │      Test worker INFO  o.a.k.c.u.AppInfoParser   │ Kafka startTimeMs: 1665722594693
06:43:14.697 │      Test worker INFO  .c.p.i.TransactionManager │ [Producer clientId=producer-foo, transactionalId=foo] Invoking InitProducerId for the first time in order to acquire a producer ID
06:43:14.823 │     producer-foo INFO  o.a.k.clients.Metadata    │ [Producer clientId=producer-foo, transactionalId=foo] Cluster ID: redpanda.015a92df-6c84-4e95-a88a-33d3d43bd0f9
06:43:14.824 │     producer-foo INFO  .c.p.i.TransactionManager │ [Producer clientId=producer-foo, transactionalId=foo] Transiting to fatal error state due to org.apache.kafka.common.KafkaException: Could not find a coordinator with type TRANSACTION with key foo due to unexpected error: null

JIRA Link: CORE-1043

bharathv commented 2 years ago

One possible reason for this error is transactions being disabled on the server side. The version of Redpanda you are on doesn't enable transactions by default, can you try setting enable_transactions=true?

rrva commented 2 years ago

The easiest way would be to create a custom docker image with docker.redpanda.com/vectorized/redpanda:v22.2.1 as a base image.

What is the preferred way to patch the existing docker image? Do I put the config in /etc/redpanda/redpanda.yaml?

In my docker image it contains:

node_uuid: 1448f0a6-1e0b-11ed-b972-0242ac110002
redpanda:
    data_directory: /var/lib/redpanda/data
    node_id: 1
    seed_servers: []
    rpc_server:
        address: 0.0.0.0
        port: 33145
    kafka_api:
        - address: 0.0.0.0
          port: 9092
    admin:
        - address: 0.0.0.0
          port: 9644
    developer_mode: true
rpk:
    enable_usage_stats: true
    coredump_dir: /var/lib/redpanda/coredump
pandaproxy: {}
schema_registry: {}

so I guess I will add

enable_transactions: true under redpanda: ?

bharathv commented 2 years ago

that will work, rpk can do it for you.. you can do something like

RUN rpk redpanda config set redpanda.enable_transactions true before you trigger the entry point script.

https://docs.redpanda.com/docs/reference/rpk/rpk-redpanda/rpk-redpanda-config-set/

(I haven't tried it locally but let me know if you run into issues, can help debug).

emaxerrno commented 2 years ago

@rrva - Nov 15th release will have transactions enabled by default.