MeltwaterArchive / dropwizard-extra

A set of miscellaneous and common Dropwizard utilities
109 stars 45 forks source link

build method on KafkaProducerFactory does not accept DefaultPartitioner as the argument #33

Open ramv opened 9 years ago

ramv commented 9 years ago

DefaultPartitioner or SimplePartitioner which implement Partitioner interface cannot be passed to KafkaProducerFactory's build methods. The argument definition is final Class<Partitioner> partitioner but should be final Class<? extends Partitioner> partitioner

Currently the definition of the build methods are the following

public <K, V> KafkaProducer<K, V> build(final Class<? extends Encoder<K>> keyEncoder,
                                       final Class<? extends Encoder<V>> messageEncoder,
                                       final Class<Partitioner> partitioner,
                                       final Environment environment,
                                       final String name) {
        final Producer<K, V> producer = build(keyEncoder, messageEncoder, partitioner, name);
        environment.lifecycle().manage(new ManagedProducer(producer));
        return new InstrumentedProducer<>(
                producer,
                environment.metrics(),
                name);
    }

    public <K, V> Producer<K, V> build(final Class<? extends Encoder<K>> keyEncoder,
                                       final Class<? extends Encoder<V>> messageEncoder,
                                       final Class<Partitioner> partitioner,
                                       final String name) {
        return new Producer<>(
                toProducerConfig(this, messageEncoder, keyEncoder, partitioner, name));
    }

    static <K, V> ProducerConfig toProducerConfig(final KafkaProducerFactory factory,
                                                  final Class<? extends Encoder<V>> messageEncoder,
                                                  final Class<? extends Encoder<K>> keyEncoder,
                                                  final Class<Partitioner> partitioner,
                                                  final String name) {
        final Properties properties = new Properties();

       ....
    }

They should be

public <K, V> KafkaProducer<K, V> build(final Class<? extends Encoder<K>> keyEncoder,
                                       final Class<? extends Encoder<V>> messageEncoder,
                                       final Class<? extends Partitioner> partitioner,
                                       final Environment environment,
                                       final String name) {
        final Producer<K, V> producer = build(keyEncoder, messageEncoder, partitioner, name);
        environment.lifecycle().manage(new ManagedProducer(producer));
        return new InstrumentedProducer<>(
                producer,
                environment.metrics(),
                name);
    }

    public <K, V> Producer<K, V> build(final Class<? extends Encoder<K>> keyEncoder,
                                       final Class<? extends Encoder<V>> messageEncoder,
                                       final Class<? extends Partitioner> partitioner,
                                       final String name) {
        return new Producer<>(
                toProducerConfig(this, messageEncoder, keyEncoder, partitioner, name));
    }

    static <K, V> ProducerConfig toProducerConfig(final KafkaProducerFactory factory,
                                                  final Class<? extends Encoder<V>> messageEncoder,
                                                  final Class<? extends Encoder<K>> keyEncoder,
                                                  final Class<? extends Partitioner> partitioner,
                                                  final String name) {
        final Properties properties = new Properties();

       ....
    }