apache / pulsar-adapters

Apache Pulsar Adapters
https://pulsar.apache.org/

PulsarKafkaProducer/Consumer do not set required configuration properties. #27

Open afedulov opened 3 years ago

afedulov commented 3 years ago

Describe the bug

 new PulsarKafkaProducer<>(
    props, new StringSerializer(), new StringSerializer());

fails with

org.apache.kafka.common.config.ConfigException: Missing required configuration "key.serializer" which has no default value.

Similarly, PulsarKafkaConsumer fails with a missing key.deserializer/value.deserializer when deserializer instances are passed to its constructor.

To Reproduce

import java.util.Properties;
import org.apache.kafka.clients.producer.PulsarKafkaProducer;
import org.apache.kafka.common.serialization.StringSerializer;

public class ReproducePulsarBug {

  public static void main(String[] args) {
    Properties props = new Properties();
    props.put("bootstrap.servers", "pulsar://localhost:49377");
//  Only works if the two lines below are uncommented:
//    props.put("key.serializer", StringSerializer.class.getName());
//    props.put("value.serializer", StringSerializer.class.getName());
    new PulsarKafkaProducer<>(
        props, new StringSerializer(), new StringSerializer());
  }
}
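
Until the constructors fill these properties in themselves, one possible workaround is to add the serializer class names to the Properties before constructing the producer. The following is a minimal sketch using only java.util.Properties; the class SerializerPropsWorkaround and the helper withSerializerProps are hypothetical names for illustration and are not part of pulsar-adapters or Kafka.

```java
import java.util.Properties;

// Hypothetical helper class, not part of pulsar-adapters or Kafka.
class SerializerPropsWorkaround {

  // Fully qualified name of Kafka's StringSerializer; assumed to match the
  // serializer instances that are passed to the producer constructor.
  static final String STRING_SERIALIZER =
      "org.apache.kafka.common.serialization.StringSerializer";

  // Returns a copy of props with key.serializer and value.serializer set,
  // leaving any values the caller already configured untouched.
  static Properties withSerializerProps(
      Properties props, String keySerializer, String valueSerializer) {
    Properties copy = new Properties();
    copy.putAll(props);
    copy.putIfAbsent("key.serializer", keySerializer);
    copy.putIfAbsent("value.serializer", valueSerializer);
    return copy;
  }

  public static void main(String[] args) {
    Properties props = new Properties();
    props.put("bootstrap.servers", "pulsar://localhost:49377");

    Properties patched =
        withSerializerProps(props, STRING_SERIALIZER, STRING_SERIALIZER);

    // With the adapter on the classpath, pass `patched` instead of `props`:
    // new PulsarKafkaProducer<>(patched, new StringSerializer(), new StringSerializer());
    System.out.println(patched);
  }
}
```

The same approach should apply to PulsarKafkaConsumer with key.deserializer and value.deserializer, assuming the consumer validates its configuration the same way.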

Additional context

<dependency>
    <groupId>org.apache.pulsar</groupId>
    <artifactId>pulsar-client-kafka-original</artifactId>
    <version>2.8.0</version>
</dependency>