streamnative / kop

Kafka-on-Pulsar - A protocol handler that brings native Kafka protocol to Apache Pulsar
https://streamnative.io/docs/kop
Apache License 2.0
449 stars 133 forks source link

[BUG] Broker with transactionCoordinatorEnabled property as True has error. #1355

Open rillo-carrillo opened 2 years ago

rillo-carrillo commented 2 years ago

Describe the bug There is an error constantly coming up on the brokers logs Based on the description Transaction Coordinator property must be enabled. Broker config contains the value of property transactionCoordinatorEnabled=true. But error keeps coming.

2022-06-22T21:23:07,078+0000 [pulsar-io-5-4] ERROR io.streamnative.pulsar.handlers.kop.KafkaCommandDecoder - error while handle command:
java.lang.IllegalArgumentException: Broker has disabled transaction coordinator, please enable it before using transaction.
    at io.streamnative.pulsar.handlers.kop.KafkaRequestHandler.throwIfTransactionCoordinatorDisabled(KafkaRequestHandler.java:2478) ~[ZTCCJ6BUXIIL7zuZcqCOWA/:?]
    at io.streamnative.pulsar.handlers.kop.KafkaRequestHandler.getTransactionCoordinator(KafkaRequestHandler.java:275) ~[ZTCCJ6BUXIIL7zuZcqCOWA/:?]
    at io.streamnative.pulsar.handlers.kop.KafkaRequestHandler.handleInitProducerId(KafkaRequestHandler.java:1907) ~[ZTCCJ6BUXIIL7zuZcqCOWA/:?]
    at io.streamnative.pulsar.handlers.kop.KafkaCommandDecoder.channelRead(KafkaCommandDecoder.java:319) [ZTCCJ6BUXIIL7zuZcqCOWA/:?]
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) [io.netty-netty-transport-4.1.72.Final.jar:4.1.72.Final]
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) [io.netty-netty-transport-4.1.72.Final.jar:4.1.72.Final]
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357) [io.netty-netty-transport-4.1.72.Final.jar:4.1.72.Final]
    at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:324) [io.netty-netty-codec-4.1.72.Final.jar:4.1.72.Final]
    at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:296) [io.netty-netty-codec-4.1.72.Final.jar:4.1.72.Final]
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) [io.netty-netty-transport-4.1.72.Final.jar:4.1.72.Final]
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) [io.netty-netty-transport-4.1.72.Final.jar:4.1.72.Final]
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357) [io.netty-netty-transport-4.1.72.Final.jar:4.1.72.Final]
    at io.netty.handler.timeout.IdleStateHandler.channelRead(IdleStateHandler.java:286) [io.netty-netty-handler-4.1.72.Final.jar:4.1.72.Final]
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) [io.netty-netty-transport-4.1.72.Final.jar:4.1.72.Final]
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) [io.netty-netty-transport-4.1.72.Final.jar:4.1.72.Final]
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357) [io.netty-netty-transport-4.1.72.Final.jar:4.1.72.Final]
    at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410) [io.netty-netty-transport-4.1.72.Final.jar:4.1.72.Final]
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) [io.netty-netty-transport-4.1.72.Final.jar:4.1.72.Final]
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) [io.netty-netty-transport-4.1.72.Final.jar:4.1.72.Final]
    at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919) [io.netty-netty-transport-4.1.72.Final.jar:4.1.72.Final]
    at io.netty.channel.epoll.AbstractEpollStreamChannel$EpollStreamUnsafe.epollInReady(AbstractEpollStreamChannel.java:795) [io.netty-netty-transport-classes-epoll-4.1.72.Final.jar:4.1.72.Final]
    at io.netty.channel.epoll.EpollEventLoop.processReady(EpollEventLoop.java:480) [io.netty-netty-transport-classes-epoll-4.1.72.Final.jar:4.1.72.Final]
    at io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:378) [io.netty-netty-transport-classes-epoll-4.1.72.Final.jar:4.1.72.Final]
    at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:986) [io.netty-netty-common-4.1.72.Final.jar:4.1.72.Final]
    at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74) [io.netty-netty-common-4.1.72.Final.jar:4.1.72.Final]
    at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) [io.netty-netty-common-4.1.72.Final.jar:4.1.72.Final]
    at java.lang.Thread.run(Thread.java:750) [?:1.8.0_332]

To Reproduce Steps to reproduce the behavior:

  1. Start broker with KOP

Expected behavior Do not see this error if we have property as expected.

Screenshots If applicable, add screenshots to help explain your problem.

Additional context Pulsar version 2.9.1 KOP version 2.9.2.17

Demogorgon314 commented 2 years ago

You need set the following configuration to enable the KoP transaction.

brokerDeduplicationEnabled=true
kafkaTransactionCoordinatorEnabled=true