eclipse-hono / hono

Eclipse Hono™ Project
https://eclipse.dev/hono
Eclipse Public License 2.0

Protocol adapter shutdown: Cannot perform [send] operation after producer has been closed #3178

Open calohmn opened 2 years ago

calohmn commented 2 years ago

With Kafka-based messaging, errors like the following may be logged when a protocol adapter shuts down:

WARN org.eclipse.hono.tracing.TracingHelper
An unexpected error occurred! [logged on span [8df2f9a8fbb2799c:33ca4d94d9776346:8ceb03ecb09127ef:1 - forward Event]]
java.lang.IllegalStateException: Cannot perform operation after producer has been closed
    at org.apache.kafka.clients.producer.KafkaProducer.throwIfProducerClosed(KafkaProducer.java:892)
    at org.apache.kafka.clients.producer.KafkaProducer.doSend(KafkaProducer.java:901)
    at org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:885)
    at io.vertx.kafka.client.producer.impl.KafkaWriteStreamImpl.lambda$send$4(KafkaWriteStreamImpl.java:76)
    at io.vertx.core.impl.ContextImpl.lambda$null$0(ContextImpl.java:159)
    at io.vertx.core.impl.AbstractContext.dispatch(AbstractContext.java:100)
    at io.vertx.core.impl.ContextImpl.lambda$executeBlocking$1(ContextImpl.java:157)
    at io.vertx.core.impl.TaskQueue.run(TaskQueue.java:76)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
    at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
    at java.base/java.lang.Thread.run(Unknown Source)

avgustinmm commented 2 years ago

As far as I can see, Hono checks that the lifecycle status is 'started' before sending a record via KafkaProducer. Hence the component hasn't received the stop event yet and Hono hasn't called KafkaProducer.close. Also, it seems to me that the threading model of the Vert.x KafkaProducer implies that if a Hono component calls send and then close, the send won't fail (at least not with that stack trace). I see that the Vert.x Kafka producer registers close hooks for Vert.x/context close/removal. So maybe these hooks close the producers on shutdown even before the Hono components have received the stop event.

avgustinmm commented 2 years ago

Just FYI, running:

final Vertx vertx = Vertx.vertx();
// close hook registered before the verticle is deployed
((VertxInternal) vertx).addCloseHook(p -> {
    System.out.println("[vertx hook before] vert closed");
    p.complete();
});
vertx.deployVerticle(new AbstractVerticle() {
    @Override
    public void stop(final Promise<Void> stopPromise) throws Exception {
        System.out.println("[stop] stop verticle");
        super.stop(stopPromise);
    }
}).onComplete(v -> {
    // close hook registered after the verticle has been deployed
    ((VertxInternal) vertx).addCloseHook(p -> {
        System.out.println("[vertx hook after] vert closed");
        p.complete();
    });
    vertx.close();
});

produces for me:

[vertx hook before] vert closed
[vertx hook after] vert closed
[stop] stop verticle

So it seems that Vert.x close hooks are called before the verticles are stopped. For context close hooks, it seems that the verticle's stop method is called before them.
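
To illustrate the suspected sequence, here is a minimal, self-contained sketch in plain Java. It does not use Vert.x, Kafka or Hono code: ToyProducer, ToyComponent and simulateShutdown are hypothetical names made up for this illustration, and the shutdown order (close hook first, stop event afterwards) is simply hard-coded from the output above. Under that assumption, a component whose status is still 'started' attempts a send on a producer that has already been closed:

```java
import java.util.ArrayList;
import java.util.List;

public class ShutdownOrderSketch {

    /** Hypothetical stand-in for a Kafka producer: send fails once closed. */
    static final class ToyProducer {
        private boolean closed;

        void send(final String record) {
            if (closed) {
                throw new IllegalStateException(
                        "Cannot perform operation after producer has been closed");
            }
            // a real producer would hand the record to the broker here
        }

        void close() {
            closed = true;
        }
    }

    /** Hypothetical stand-in for a component that guards sends with its lifecycle status. */
    static final class ToyComponent {
        private final ToyProducer producer;
        private boolean started = true;

        ToyComponent(final ToyProducer producer) {
            this.producer = producer;
        }

        String forward(final String record) {
            if (!started) {
                return "rejected: component stopped";
            }
            try {
                producer.send(record);
                return "sent";
            } catch (final IllegalStateException e) {
                return "failed: " + e.getMessage();
            }
        }

        void stop() {
            started = false;
            producer.close();
        }
    }

    /** Replays a shutdown in the assumed order: close hook first, stop event last. */
    static List<String> simulateShutdown() {
        final List<String> log = new ArrayList<>();
        final ToyProducer producer = new ToyProducer();
        final ToyComponent component = new ToyComponent(producer);

        log.add(component.forward("event-1")); // normal operation
        producer.close();                      // close hook fires before the component is stopped
        log.add(component.forward("event-2")); // status is still 'started', so the send is attempted
        component.stop();                      // the stop event arrives only now
        log.add(component.forward("event-3")); // rejected by the lifecycle check
        return log;
    }

    public static void main(final String[] args) {
        simulateShutdown().forEach(System.out::println);
    }
}
```

In this toy model only the second record fails with the IllegalStateException. The lifecycle check cannot catch it, because the component has not been told to stop at that point.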