apache / camel-kafka-connector

Camel Kafka Connector allows you to use all Camel components as Kafka Connect connectors
https://camel.apache.org
Apache License 2.0

Camel sjms2 source connector doesn't stop properly #1522

Open filippzorin opened 1 year ago

filippzorin commented 1 year ago

Hi.

I'm using the Apache Camel Kafka Connector sjms2-source-connector with TIBCO EMS as the JMS provider. When the broker restarts, the connector tries to restart as well. During that restart, however, it looks like the connector doesn't stop the TIBCO client completely, so when it starts again it opens a second connection to JMS and fails with the error Client ID already exists. I use the package camel-sjms2-kafka-connector-3.18.2 with the following config:

{
    "connector.class": "org.apache.camel.kafkaconnector.sjms2.CamelSjms2SourceConnector",
    "camel.source.path.destinationName": "queue_name",
    "camel.component.sjms2.connection-factory": "#class:com.tibco.tibjms.TibjmsConnectionFactory",
    "camel.source.path.destinationType": "queue",
    "topics": "topic_name",
    "camel.source.endpoint.clientId": "client_id",
    "name": "tibco_ems_connector",
    "camel.component.sjms2.connection-factory.serverUrl": "tcp://tibco-ems-host1:7222,tcp://tibco-ems-host2:7222",
    "camel.component.sjms2.connection-factory.userName": "username",
    "camel.component.sjms2.connection-factory.userPassword": "password",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter"
}
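The suspected failure mode (a new connection using the configured clientId is opened before the old one has been closed) can be illustrated with a toy model. This is only a sketch: ToyBroker and ToyConnection are hypothetical stand-ins written for this illustration, not TIBCO EMS or Camel classes. It assumes only that the broker rejects a client ID that is still held by an open connection, which is how the JMS specification describes client ID uniqueness.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

public class ClientIdClashDemo {

    /** Hypothetical stand-in for a broker that tracks client IDs in use. */
    static class ToyBroker {
        private final Set<String> activeClientIds = ConcurrentHashMap.newKeySet();

        /** Rejects the connection if the client ID is still held by an open connection. */
        ToyConnection connect(String clientId) {
            if (!activeClientIds.add(clientId)) {
                throw new IllegalStateException("Client ID already exists: " + clientId);
            }
            return new ToyConnection(this, clientId);
        }

        private void release(String clientId) {
            activeClientIds.remove(clientId);
        }
    }

    /** Hypothetical connection; closing it frees the client ID on the broker. */
    static class ToyConnection implements AutoCloseable {
        private final ToyBroker broker;
        private final String clientId;

        ToyConnection(ToyBroker broker, String clientId) {
            this.broker = broker;
            this.clientId = clientId;
        }

        @Override
        public void close() {
            broker.release(clientId);
        }
    }

    public static void main(String[] args) {
        ToyBroker broker = new ToyBroker();
        ToyConnection first = broker.connect("client_id");

        // Restart without closing the first connection: the ID is still taken.
        try {
            broker.connect("client_id");
        } catch (IllegalStateException e) {
            System.out.println("Restart without close: " + e.getMessage());
        }

        // Restart after a clean close: the ID is free again.
        first.close();
        ToyConnection second = broker.connect("client_id");
        System.out.println("Restart after close: connected");
        second.close();
    }
}
```

If the real connector behaves like the first branch, the old connection is still registered on the broker when the restarted task connects, which would match the error reported here.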

I also increased the graceful shutdown timeout (CONNECT_TASK_SHUTDOWN_GRACEFUL_TIMEOUT_MS: 120000). Without this option, the connector fails while stopping with an error like Cannot stop connector gracefully.

The full Kafka Connect log and the jstack output taken after the error appeared are attached: jstack_output.txt, kafka_connect_log.txt

Is this a bug, or am I doing something wrong? Thanks for any advice.

filippzorin commented 1 year ago

One more update on the jstack state after the connector failed. On the connect-1 service there is a connector thread:

[appuser@connect-1 ~]$ jstack -e 1 | grep tibco -A 13 -B 1

"connector-thread-tibco_connector" #267 prio=5 os_prio=0 cpu=23.32ms elapsed=108410.10s allocated=921K defined_classes=28 tid=0x00007f3014059800 nid=0x7da in Object.wait()  [0x00007f30ac533000]
   java.lang.Thread.State: WAITING (on object monitor)
        at java.lang.Object.wait(java.base@11.0.16.1/Native Method)
        - waiting on <no object reference available>
        at java.lang.Object.wait(java.base@11.0.16.1/Object.java:328)
        at org.apache.kafka.connect.runtime.WorkerConnector.doRun(WorkerConnector.java:154)
        - waiting to re-lock in wait() <0x0000000704e98058> (a org.apache.kafka.connect.runtime.WorkerConnector)
        at org.apache.kafka.connect.runtime.WorkerConnector.run(WorkerConnector.java:123)
        at java.util.concurrent.Executors$RunnableAdapter.call(java.base@11.0.16.1/Executors.java:515)
        at java.util.concurrent.FutureTask.run(java.base@11.0.16.1/FutureTask.java:264)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(java.base@11.0.16.1/ThreadPoolExecutor.java:1128)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(java.base@11.0.16.1/ThreadPoolExecutor.java:628)
        at java.lang.Thread.run(java.base@11.0.16.1/Thread.java:829)

And on another host (connect-3) there is a TIBCO EMS TCPLink Reader thread:

[appuser@connect-3 ~]$ jstack -e 1 | grep jms -A 1 -B 11

"TIBCO EMS TCPLink Reader (Server-1049459457)" #325 daemon prio=5 os_prio=0 cpu=932.19ms elapsed=109478.06s allocated=1009K defined_classes=4 tid=0x00007f62ac619800 nid=0x23d runnable  [0x00007f61abffe000]
   java.lang.Thread.State: RUNNABLE
        at java.net.SocketInputStream.socketRead0(java.base@11.0.16.1/Native Method)
        at java.net.SocketInputStream.socketRead(java.base@11.0.16.1/SocketInputStream.java:115)
        at java.net.SocketInputStream.read(java.base@11.0.16.1/SocketInputStream.java:168)
        at java.net.SocketInputStream.read(java.base@11.0.16.1/SocketInputStream.java:140)
        at java.io.BufferedInputStream.fill(java.base@11.0.16.1/BufferedInputStream.java:252)
        at java.io.BufferedInputStream.read(java.base@11.0.16.1/BufferedInputStream.java:271)
        - locked <0x000000070513b8a8> (a java.io.BufferedInputStream)
        at java.io.DataInputStream.readInt(java.base@11.0.16.1/DataInputStream.java:392)
        at com.tibco.tibjms.TibjmsxLinkTcp._readWireMsg(TibjmsxLinkTcp.java:630)
        at com.tibco.tibjms.TibjmsxLinkTcp$LinkReader.work(TibjmsxLinkTcp.java:285)
        at com.tibco.tibjms.TibjmsxLinkTcp$LinkReader.run(TibjmsxLinkTcp.java:264)

After deleting the connector from Kafka Connect, the connector thread is gone from the jstack output, but the TIBCO EMS TCPLink Reader thread is still there:

[appuser@connect-3 ~]$ jstack -e 1 | grep jms -A 1 -B 11

"TIBCO EMS TCPLink Reader (Server-1049459457)" #325 daemon prio=5 os_prio=0 cpu=938.54ms elapsed=110211.84s allocated=1009K defined_classes=4 tid=0x00007f62ac619800 nid=0x23d runnable  [0x00007f61abffe000]
   java.lang.Thread.State: RUNNABLE
        at java.net.SocketInputStream.socketRead0(java.base@11.0.16.1/Native Method)
        at java.net.SocketInputStream.socketRead(java.base@11.0.16.1/SocketInputStream.java:115)
        at java.net.SocketInputStream.read(java.base@11.0.16.1/SocketInputStream.java:168)
        at java.net.SocketInputStream.read(java.base@11.0.16.1/SocketInputStream.java:140)
        at java.io.BufferedInputStream.fill(java.base@11.0.16.1/BufferedInputStream.java:252)
        at java.io.BufferedInputStream.read(java.base@11.0.16.1/BufferedInputStream.java:271)
        - locked <0x000000070513b8a8> (a java.io.BufferedInputStream)
        at java.io.DataInputStream.readInt(java.base@11.0.16.1/DataInputStream.java:392)
        at com.tibco.tibjms.TibjmsxLinkTcp._readWireMsg(TibjmsxLinkTcp.java:630)
        at com.tibco.tibjms.TibjmsxLinkTcp$LinkReader.work(TibjmsxLinkTcp.java:285)
        at com.tibco.tibjms.TibjmsxLinkTcp$LinkReader.run(TibjmsxLinkTcp.java:264)
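The jstack and grep check above can also be expressed in Java, which may help if the check needs to be automated. The following is a minimal sketch using only the standard Thread API. The class name LingeringThreadCheck and the method findThreadsByNamePrefix are made up for this illustration, and the code only sees threads of the JVM it runs in, so it would have to be executed inside the Kafka Connect worker process (how to load it there is not covered here).

```java
import java.util.ArrayList;
import java.util.List;

public class LingeringThreadCheck {

    /** Returns a description of every live thread whose name starts with the given prefix. */
    static List<String> findThreadsByNamePrefix(String prefix) {
        List<String> matches = new ArrayList<>();
        // getAllStackTraces() returns a snapshot of all live threads in this JVM.
        for (Thread t : Thread.getAllStackTraces().keySet()) {
            if (t.getName().startsWith(prefix)) {
                matches.add(t.getName() + " daemon=" + t.isDaemon() + " state=" + t.getState());
            }
        }
        return matches;
    }

    public static void main(String[] args) {
        // Thread name prefix taken from the jstack output in this issue.
        String prefix = args.length > 0 ? args[0] : "TIBCO EMS TCPLink Reader";
        List<String> found = findThreadsByNamePrefix(prefix);
        if (found.isEmpty()) {
            System.out.println("No live threads with prefix: " + prefix);
        } else {
            found.forEach(System.out::println);
        }
    }
}
```

A match that remains after the connector has been deleted would indicate the same leftover reader thread that the second jstack dump shows.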

I guess this is why I cannot restart or recreate the connector without restarting the Kafka Connect service. Is it possible to fix this, or to work around it manually or with a script (for example, by removing this thread manually)? Or did I miss something in the connector config?