line / decaton

High throughput asynchronous task processing on Apache Kafka
Apache License 2.0
336 stars 51 forks source link

Waiting indefinitely if closing a unstarted ProcessorSubscription #115

Closed mokejp closed 3 years ago

mokejp commented 3 years ago

close() will wait indefinitely if closing a ProcessorSubscription without calling start(). This symptom occurs from version 1.1.0.

If I don't need to start it, I can implement it so that it doesn't create a ProcessorSubscription, but I thought this symptom was strange.

Here is the simple reproducer

var properties = new Properties();
properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "<bootstrap-servers>");
properties.setProperty(ConsumerConfig.CLIENT_ID_CONFIG, "test");
properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "test");
var subscription = SubscriptionBuilder.newBuilder("my-decaton-processor").processorsBuilder(
                ProcessorsBuilder.consuming("my-decaton-topic", new ProtocolBuffersDeserializer<>(SomeTask.parser())))
                                                                .consumerConfig(properties).build();
subscription.close(); // hang-up...
kawamuray commented 3 years ago

Thanks for reporting! Yeah this is definitely a case that we failed to cover in past updates.

I've filed #116 to fix this, so should be addressed soon.

kawamuray commented 3 years ago

Fixed, and released as part of v3.0.1.