streamnative / kop

Kafka-on-Pulsar - A protocol handler that brings native Kafka protocol to Apache Pulsar
https://streamnative.io/docs/kop
Apache License 2.0

[BUG] Pulsar Consumer can't read message from Kafka Client. #1336

Open rillo-carrillo opened 2 years ago

rillo-carrillo commented 2 years ago

Describe the bug

We have a Pulsar cluster with KoP enabled.

When a Pulsar producer sends messages to a topic that a Kafka consumer reads from, the Kafka consumer receives and reads the messages.

When a Kafka producer sends messages to a topic that a Pulsar consumer reads from, the Pulsar consumer cannot read the messages.

To Reproduce

Steps to reproduce the behavior:

  1. Start a Kafka producer that sends messages to $topic.
  2. Start a Pulsar consumer that receives messages from $topic.
  3. The Pulsar consumer logs the following error:

Some required fields are missing
    at org.apache.pulsar.common.api.proto.SingleMessageMetadata.checkRequiredFields(SingleMessageMetadata.java:478) ~[org.apache.pulsar-pulsar-common-2.9.1.jar:2.9.1]
    at org.apache.pulsar.common.api.proto.SingleMessageMetadata.parseFrom(SingleMessageMetadata.java:473) ~[org.apache.pulsar-pulsar-common-2.9.1.jar:2.9.1]
    at org.apache.pulsar.common.protocol.Commands.deSerializeSingleMessageInBatch(Commands.java:1666) ~[org.apache.pulsar-pulsar-common-2.9.1.jar:2.9.1]
    at org.apache.pulsar.client.impl.ConsumerImpl.newSingleMessage(ConsumerImpl.java:1027) ~[org.apache.pulsar-pulsar-client-original-2.9.1.jar:2.9.1]
    ... 32 more
2022-06-07T19:47:49,411+0000 [pulsar-client-io-1-1] ERROR org.apache.pulsar.client.impl.ConsumerImpl - [persistent://public/default/angeltestpartition-partition-0][cli-sub] Discarding corrupted message at 1466:2

Expected behavior

Messages produced by Kafka clients should be consumable by Pulsar consumers.

BewareMyPower commented 2 years ago

What's your entryFormat config? If you configured entryFormat=kafka, you need to configure a KafkaPayloadProcessor to handle Kafka format messages at the Pulsar consumer side. See https://github.com/streamnative/kop/blob/master/docs/configuration.md#choose-the-proper-entryformat.

rillo-carrillo commented 2 years ago

The property is set to: entryFormat=mixed_kafka.

Is there any additional configuration needed on the Pulsar consumers to successfully read messages published by a Kafka producer?

BewareMyPower commented 2 years ago

No, you only need the KafkaPayloadProcessor.
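
For readers following along, the broker-side setting discussed in this thread can be sketched as the `broker.conf` fragment below. This is only an illustration: the listener address and handler directory are placeholder values, not taken from the reporter's cluster. As I understand the configuration doc linked above, `kafka` and `mixed_kafka` store entries in Kafka's format, which is why a Pulsar consumer needs a KafkaPayloadProcessor to decode them, whereas `pulsar` has the broker convert entries to Pulsar's format so that an unmodified Pulsar consumer can read them, at the cost of extra conversion work on the Kafka path.

```properties
# broker.conf (sketch; the values in this first group are placeholders)
messagingProtocols=kafka
protocolHandlerDirectory=./protocols
kafkaListeners=PLAINTEXT://127.0.0.1:9092

# Format in which KoP stores entries written by Kafka producers.
#   kafka / mixed_kafka : entries stay in Kafka format; Pulsar consumers
#                         need a KafkaPayloadProcessor to read them.
#   pulsar              : entries are converted to Pulsar format; plain
#                         Pulsar consumers can read them directly.
entryFormat=mixed_kafka
```

On the client side, the processor is attached when the consumer is built, through the Pulsar client's `messagePayloadProcessor` option on the consumer builder. The `pulsar-client` command-line consumer that appears to be used here (note the `cli-sub` subscription in the log) may not expose that option, which would be worth checking.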