apache / pulsar-adapters

Apache Pulsar Adapters
https://pulsar.apache.org/

DeadLetterPolicy in Apache Pulsar Storm Adapter Not implemented #53

Open karunamgoyal opened 1 year ago

karunamgoyal commented 1 year ago

Describe the bug

Version: v2.11.0. There is no way to negatively acknowledge a message through the spout's consumer, and registering a DeadLetterPolicy on the consumer configuration is broken.

    ConsumerConfigurationData<byte[]> subscriptionConfig = new ConsumerConfigurationData<>();
    subscriptionConfig.setSubscriptionInitialPosition(SubscriptionInitialPosition.Earliest);
    subscriptionConfig.setSubscriptionType(SubscriptionType.Shared);
    subscriptionConfig.setDeadLetterPolicy(DeadLetterPolicy.builder()
            .deadLetterTopic(viestiSourceConfig.getDeadLetterTopic())
            .build());

    PulsarSpoutV2 pulsarSpout = new PulsarSpoutV2(
            spoutConfiguration,
            ((ClientBuilderImpl) createBuilder(viestiSourceConfig))
                    .getClientConfigurationData()
                    .clone(),
            subscriptionConfig);

The DeadLetterPolicy set in the code above does not stick when the PulsarSpout is created.

    static class SpoutConsumer implements PulsarSpoutConsumer {
        private Consumer<byte[]> consumer;

        public SpoutConsumer(Consumer<byte[]> consumer) {
            this.consumer = consumer;
        }

        public Message<byte[]> receive(int timeout, TimeUnit unit) throws PulsarClientException {
            return this.consumer.receive(timeout, unit);
        }

        public void acknowledgeAsync(Message<?> msg) {
            this.consumer.acknowledgeAsync(msg);
        }

        public void close() throws PulsarClientException {
            this.consumer.close();
        }

        public void unsubscribe() throws PulsarClientException {
            this.consumer.unsubscribe();
        }
    }

Also, there is no mechanism to negatively acknowledge a message: the SpoutConsumer wrapper above exposes only receive, acknowledgeAsync, close and unsubscribe.
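As a rough illustration of the missing piece, the sketch below shows a wrapper that forwards a negative acknowledgement alongside the regular one. It is deliberately self-contained: `ConsumerSketch`, `SpoutConsumerSketch` and `RecordingConsumer` are hypothetical stand-ins that use string message IDs, not the Pulsar or adapter classes. In the real Pulsar Java client the corresponding call would be `Consumer.negativeAcknowledge(Message<?>)`; how the adapter would wire it to the spout's fail callback is an assumption here.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch: the types below are stand-ins, not Pulsar or adapter classes.
class NackSketch {

    // Stand-in for the part of a client consumer that the wrapper needs.
    interface ConsumerSketch {
        void acknowledgeAsync(String messageId);
        void negativeAcknowledge(String messageId);
    }

    // Wrapper in the spirit of SpoutConsumer, with negative acknowledgement added.
    static class SpoutConsumerSketch {
        private final ConsumerSketch consumer;

        SpoutConsumerSketch(ConsumerSketch consumer) {
            this.consumer = consumer;
        }

        void acknowledgeAsync(String messageId) {
            consumer.acknowledgeAsync(messageId);
        }

        // The method the report asks for: pass the failure on to the consumer.
        void negativeAcknowledge(String messageId) {
            consumer.negativeAcknowledge(messageId);
        }
    }

    // Records calls so the demo can show what the wrapper forwarded.
    static class RecordingConsumer implements ConsumerSketch {
        final List<String> calls = new ArrayList<>();

        public void acknowledgeAsync(String messageId) {
            calls.add("ack:" + messageId);
        }

        public void negativeAcknowledge(String messageId) {
            calls.add("nack:" + messageId);
        }
    }

    // Mimics a spout acknowledging one tuple and failing another.
    static List<String> demo() {
        RecordingConsumer recorder = new RecordingConsumer();
        SpoutConsumerSketch spoutConsumer = new SpoutConsumerSketch(recorder);
        spoutConsumer.acknowledgeAsync("m1");     // tuple processed successfully
        spoutConsumer.negativeAcknowledge("m2");  // tuple failed, request redelivery
        return recorder.calls;
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints [ack:m1, nack:m2]
    }
}
```

With something along these lines, a failed tuple could be reported back to the broker for redelivery instead of being left unacknowledged.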

Expected behavior

When a DeadLetterPolicy is set, it should not be dropped during serialization. Negative acknowledgements should also be supported.
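For context, Storm ships spouts to workers using Java serialization, so any field that Java serialization skips is lost on the way. The sketch below shows one way this can happen, using a `transient` field. `ConfigSketch` and its fields are hypothetical stand-ins; whether this is the actual reason the DeadLetterPolicy disappears from `ConsumerConfigurationData` is an assumption that has not been verified here.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

// Hypothetical demo: none of these classes come from Pulsar or Storm.
class TransientFieldDemo {

    // Stand-in for a consumer configuration object.
    static class ConfigSketch implements Serializable {
        private static final long serialVersionUID = 1L;
        String subscriptionType;           // serialized normally
        transient String deadLetterTopic;  // skipped by Java serialization
    }

    // Serialize to bytes and read back, as a framework shipping the object would.
    static ConfigSketch roundTrip(ConfigSketch original) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(original);
            }
            try (ObjectInputStream in = new ObjectInputStream(
                    new ByteArrayInputStream(bytes.toByteArray()))) {
                return (ConfigSketch) in.readObject();
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException("round trip failed", e);
        }
    }

    public static void main(String[] args) {
        ConfigSketch conf = new ConfigSketch();
        conf.subscriptionType = "Shared";
        conf.deadLetterTopic = "my-dlq";

        ConfigSketch copy = roundTrip(conf);
        System.out.println(copy.subscriptionType); // prints Shared
        System.out.println(copy.deadLetterTopic);  // prints null
    }
}
```

If the cause is similar, a fix would need to carry the policy across serialization explicitly, for example by storing its plain values in serializable fields and rebuilding the policy when the spout opens.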

Desktop (please complete the following information):

OS: macOS Ventura 13.4.1

Java: java version "1.8.0_333", Java(TM) SE Runtime Environment (build 1.8.0_333-b02), Java HotSpot(TM) 64-Bit Server VM (build 25.333-b02, mixed mode)

Apache Storm: 2.2.1

Use case: consuming from a Pulsar topic in Apache Storm.