apache / pulsar

Apache Pulsar - distributed pub-sub messaging system
https://pulsar.apache.org/
Apache License 2.0
14.25k stars 3.58k forks source link

RabbitMQ source connector can't connect #14156

Open spartakos87 opened 2 years ago

spartakos87 commented 2 years ago

Describe the bug Hello, I am trying to connect Pulsar with RabbitMQ. For that, I am trying to set up a source connector To Reproduce Steps to reproduce the behavior:

  1. First of all I set up Pulsar with docker, docker run -it --name my-pulsar -p 6650:6650 -p 8080:8080 --mount source=pulsardata,target=/pulsar/data --mount source=pulsarconf,target=/pulsar/conf apachepulsar/pulsar-all:latest bin/pulsar standalone
  2. Then I have created the YAML file for the connector, configs: host: "" port: 5672 virtualHost: "/" username: "myname" password: "mypass" queueName: "test1" connectionName: "test1-connection" requestedChannelMax: 0 requestedFrameMax: 0 connectionTimeout: 60000 handshakeTimeout: 10000 requestedHeartbeat: 60 prefetchCount: 0 prefetchGlobal: "false" passive: "false" With name rabbitmq.yaml, which I placed in folder examples.
  3. Then I created the connector with pulsar-admin ./bin/pulsar-admin sources create --tenant public --namespace default --name rabbit-connector --source-type rabbitmq --source-config-file examples/rabbitmq.yaml --destination-topic-name dst-topic
  4. Then the connector keep restarted forever.
  5. In logs a get this, ERROR org.apache.pulsar.functions.instance.JavaInstanceRunnable - Source open produced uncaught exception: java.net.ConnectException: Connection refused (Connection refused) at java.net.PlainSocketImpl.socketConnect(Native Method) ~[?:?] at java.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:399) ~[?:?] at java.net.AbstractPlainSocketImpl.connectToAddress(AbstractPlainSocketImpl.java:242) ~[?:?] at java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:224) ~[?:?] at java.net.SocksSocketImpl.connect(SocksSocketImpl.java:392) ~[?:?] at java.net.Socket.connect(Socket.java:609) ~[?:?] at com.rabbitmq.client.impl.SocketFrameHandlerFactory.create(SocketFrameHandlerFactory.java:60) ~[amqp-client-5.1.1.jar:5.1.1] at com.rabbitmq.client.impl.recovery.RecoveryAwareAMQConnectionFactory.newConnection(RecoveryAwareAMQConnectionFactory.java:62) ~[amqp-client-5.1.1.jar:5.1.1] at com.rabbitmq.client.impl.recovery.AutorecoveringConnection.init(AutorecoveringConnection.java:99) ~[amqp-client-5.1.1.jar:5.1.1] at com.rabbitmq.client.ConnectionFactory.newConnection(ConnectionFactory.java:948) ~[amqp-client-5.1.1.jar:5.1.1] at com.rabbitmq.client.ConnectionFactory.newConnection(ConnectionFactory.java:907) ~[amqp-client-5.1.1.jar:5.1.1] at com.rabbitmq.client.ConnectionFactory.newConnection(ConnectionFactory.java:1034) ~[amqp-client-5.1.1.jar:5.1.1] at org.apache.pulsar.io.rabbitmq.RabbitMQSource.open(RabbitMQSource.java:65) ~[qCOnir1sptZl9djvqq9k1g/:?] at org.apache.pulsar.functions.instance.JavaInstanceRunnable.setupInput(JavaInstanceRunnable.java:735) [org.apache.pulsar-pulsar-functions-instance-2.8.1.jar:2.8.1] at org.apache.pulsar.functions.instance.JavaInstanceRunnable.setup(JavaInstanceRunnable.java:219) [org.apache.pulsar-pulsar-functions-instance-2.8.1.jar:2.8.1] at org.apache.pulsar.functions.instance.JavaInstanceRunnable.run(JavaInstanceRunnable.java:243) [org.apache.pulsar-pulsar-functions-instance-2.8.1.jar:2.8.1] at java.lang.Thread.run(Thread.java:829) [?:?]

The RabbitMQ is tested and runs smoothly with the given credentials. Any idea what happened here?

nicoloboschi commented 2 years ago

It looks like the pulsar process can't reach the rabbitmq server. Could you try to test if, from inside the container, are you able to reach the rabbitmq server ?

spartakos87 commented 2 years ago

I am trying to connect with RabbitMQ from Pulsar container and I get the messages. I use the IP of the container which contained the RabbitMQ. So the in YAML file I define the host as that IP, in my case 172.17.0.2 . The I try to make a connector as I said and I when try to get status of connector I get this, { "numInstances" : 1, "numRunning" : 0, "instances" : [ { "instanceId" : 0, "status" : { "running" : false, "error" : "UNAVAILABLE: io exception", "numRestarts" : 0, "numReceivedFromSource" : 0, "numSystemExceptions" : 0, "latestSystemExceptions" : [ ], "numSourceExceptions" : 0, "latestSourceExceptions" : [ ], "numWritten" : 0, "lastReceivedTime" : 0, "workerId" : "c-standalone-fw-localhost-8080" } } ] }

In log I get this error, 09:50:14.287 [pulsar-timer-82-1] INFO org.apache.pulsar.client.impl.ProducerStatsRecorderImpl - [persistent://public/functions/assignments] [c-standalone-fw-localhost-8080-scheduler-manager] Pending messages: 0 --- Publish throughput: 0.02 msg/s --- 0.00 Mbit/s --- Latency: med: 2.000 ms - 95pct: 2.000 ms - 99pct: 2.000 ms - 99.9pct: 2.000 ms - max: 2.000 ms --- Ack received rate: 0.02 ack/s --- Failed messages: 0 09:50:14.333 [pulsar-timer-82-1] INFO org.apache.pulsar.client.impl.ProducerStatsRecorderImpl - [persistent://public/functions/metadata] [c-standalone-fw-localhost-8080-leader] Pending messages: 0 --- Publish throughput: 0.02 msg/s --- 0.00 Mbit/s --- Latency: med: 2.000 ms - 95pct: 2.000 ms - 99pct: 2.000 ms - 99.9pct: 2.000 ms - max: 2.000 ms --- Ack received rate: 0.02 ack/s --- Failed messages: 0 09:50:26.729 [function-timer-thread-93-1] ERROR org.apache.pulsar.functions.runtime.process.ProcessRuntime - Health check failed for rabbit-connector-0 java.util.concurrent.ExecutionException: io.grpc.StatusRuntimeException: UNAVAILABLE: io exception at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:395) ~[?:?] at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1999) ~[?:?] at org.apache.pulsar.functions.runtime.process.ProcessRuntime.lambda$start$1(ProcessRuntime.java:186) ~[org.apache.pulsar-pulsar-functions-runtime-2.8.1.jar:2.8.1] at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515) [?:?] at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:305) [?:?] at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:305) [?:?] at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) [?:?] at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) [?:?] at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) [io.netty-netty-common-4.1.66.Final.jar:4.1.66.Final] at java.lang.Thread.run(Thread.java:829) [?:?] Caused by: io.grpc.StatusRuntimeException: UNAVAILABLE: io exception at io.grpc.Status.asRuntimeException(Status.java:533) ~[io.grpc-grpc-api-1.33.0.jar:1.33.0] at io.grpc.stub.ClientCalls$UnaryStreamToFuture.onClose(ClientCalls.java:533) ~[io.grpc-grpc-stub-1.33.0.jar:1.33.0] at io.grpc.internal.DelayedClientCall$DelayedListener$3.run(DelayedClientCall.java:463) ~[io.grpc-grpc-core-1.33.0.jar:1.33.0] at io.grpc.internal.DelayedClientCall$DelayedListener.delayOrExecute(DelayedClientCall.java:427) ~[io.grpc-grpc-core-1.33.0.jar:1.33.0] at io.grpc.internal.DelayedClientCall$DelayedListener.onClose(DelayedClientCall.java:460) ~[io.grpc-grpc-core-1.33.0.jar:1.33.0] at io.grpc.internal.ClientCallImpl.closeObserver(ClientCallImpl.java:616) ~[io.grpc-grpc-core-1.33.0.jar:1.33.0] at io.grpc.internal.ClientCallImpl.access$300(ClientCallImpl.java:69) ~[io.grpc-grpc-core-1.33.0.jar:1.33.0] at io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl$1StreamClosed.runInternal(ClientCallImpl.java:802) ~[io.grpc-grpc-core-1.33.0.jar:1.33.0] at io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl$1StreamClosed.runInContext(ClientCallImpl.java:781) ~[io.grpc-grpc-core-1.33.0.jar:1.33.0] at io.grpc.internal.ContextRunnable.run(ContextRunnable.java:37) ~[io.grpc-grpc-core-1.33.0.jar:1.33.0] at io.grpc.internal.SerializingExecutor.run(SerializingExecutor.java:123) ~[io.grpc-grpc-core-1.33.0.jar:1.33.0] at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) ~[?:?] at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) ~[?:?] ... 1 more Caused by: io.grpc.netty.shaded.io.netty.channel.AbstractChannel$AnnotatedConnectException: finishConnect(..) failed: Connection refused: /127.0.0.1:37605 Caused by: java.net.ConnectException: finishConnect(..) failed: Connection refused at io.grpc.netty.shaded.io.netty.channel.unix.Errors.throwConnectException(Errors.java:124) ~[io.grpc-grpc-netty-shaded-1.33.0.jar:1.33.0] at io.grpc.netty.shaded.io.netty.channel.unix.Socket.finishConnect(Socket.java:243) ~[io.grpc-grpc-netty-shaded-1.33.0.jar:1.33.0] at io.grpc.netty.shaded.io.netty.channel.epoll.AbstractEpollChannel$AbstractEpollUnsafe.doFinishConnect(AbstractEpollChannel.java:672) ~[io.grpc-grpc-netty-shaded-1.33.0.jar:1.33.0] at io.grpc.netty.shaded.io.netty.channel.epoll.AbstractEpollChannel$AbstractEpollUnsafe.finishConnect(AbstractEpollChannel.java:649) ~[io.grpc-grpc-netty-shaded-1.33.0.jar:1.33.0] at io.grpc.netty.shaded.io.netty.channel.epoll.AbstractEpollChannel$AbstractEpollUnsafe.epollOutReady(AbstractEpollChannel.java:529) ~[io.grpc-grpc-netty-shaded-1.33.0.jar:1.33.0] at io.grpc.netty.shaded.io.netty.channel.epoll.EpollEventLoop.processReady(EpollEventLoop.java:465) ~[io.grpc-grpc-netty-shaded-1.33.0.jar:1.33.0] at io.grpc.netty.shaded.io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:378) ~[io.grpc-grpc-netty-shaded-1.33.0.jar:1.33.0] at io.grpc.netty.shaded.io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989) ~[io.grpc-grpc-netty-shaded-1.33.0.jar:1.33.0] at io.grpc.netty.shaded.io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74) ~[io.grpc-grpc-netty-shaded-1.33.0.jar:1.33.0] at io.grpc.netty.shaded.io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) ~[io.grpc-grpc-netty-shaded-1.33.0.jar:1.33.0] ... 1 more

spartakos87 commented 2 years ago

I am trying the same in local, without docker, but I get the same error "UNAVAILABLE: io exception"

github-actions[bot] commented 2 years ago

The issue had no activity for 30 days, mark with Stale label.

github-actions[bot] commented 2 years ago

The issue had no activity for 30 days, mark with Stale label.

spartakos87 commented 2 years ago

Hello again This time I run Pulsar locally from binary with the yaml, configs: host: "localhost" port: 5672 virtualHost: "/" username: "username" password: "password" queueName: "topic" connectionName: "my-connection" requestedChannelMax: 0 requestedFrameMax: 0 connectionTimeout: 60000 handshakeTimeout: 10000 requestedHeartbeat: 60 prefetchCount: 0 prefetchGlobal: "false" passive: "false"

For few moments appear to rabbitmq dashboard the "my-connection" and seems that pulsar read some messages but I get this error this time...

`2022-06-15T19:19:14,949+0300 [function-timer-thread-78-1] ERROR org.apache.pulsar.functions.runtime.process.ProcessRuntime - Health check failed for rabbit-connector-local-0 java.util.concurrent.ExecutionException: io.grpc.StatusRuntimeException: UNAVAILABLE: io exception at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:395) ~[?:?] at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1999) ~[?:?] at org.apache.pulsar.functions.runtime.process.ProcessRuntime.lambda$start$1(ProcessRuntime.java:184) ~[org.apache.pulsar-pulsar-functions-runtime-2.10.0.jar:2.10.0] at org.apache.pulsar.common.util.Runnables$CatchingAndLoggingRunnable.run(Runnables.java:54) [org.apache.pulsar-pulsar-common-2.10.0.jar:2.10.0] at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515) [?:?] at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:305) [?:?] at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:305) [?:?] at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) [?:?] at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) [?:?] at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) [io.netty-netty-common-4.1.74.Final.jar:4.1.74.Final] at java.lang.Thread.run(Thread.java:829) [?:?] Caused by: io.grpc.StatusRuntimeException: UNAVAILABLE: io exception at io.grpc.Status.asRuntimeException(Status.java:535) ~[io.grpc-grpc-api-1.42.1.jar:1.42.1] at io.grpc.stub.ClientCalls$UnaryStreamToFuture.onClose(ClientCalls.java:534) ~[io.grpc-grpc-stub-1.42.1.jar:1.42.1] at io.grpc.internal.DelayedClientCall$DelayedListener$3.run(DelayedClientCall.java:463) ~[io.grpc-grpc-core-1.42.1.jar:1.42.1] at io.grpc.internal.DelayedClientCall$DelayedListener.delayOrExecute(DelayedClientCall.java:427) ~[io.grpc-grpc-core-1.42.1.jar:1.42.1] at io.grpc.internal.DelayedClientCall$DelayedListener.onClose(DelayedClientCall.java:460) ~[io.grpc-grpc-core-1.42.1.jar:1.42.1] at io.grpc.internal.ClientCallImpl.closeObserver(ClientCallImpl.java:562) ~[io.grpc-grpc-core-1.42.1.jar:1.42.1] at io.grpc.internal.ClientCallImpl.access$300(ClientCallImpl.java:70) ~[io.grpc-grpc-core-1.42.1.jar:1.42.1] at io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl$1StreamClosed.runInternal(ClientCallImpl.java:743) ~[io.grpc-grpc-core-1.42.1.jar:1.42.1] at io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl$1StreamClosed.runInContext(ClientCallImpl.java:722) ~[io.grpc-grpc-core-1.42.1.jar:1.42.1] at io.grpc.internal.ContextRunnable.run(ContextRunnable.java:37) ~[io.grpc-grpc-core-1.42.1.jar:1.42.1] at io.grpc.internal.SerializingExecutor.run(SerializingExecutor.java:133) ~[io.grpc-grpc-core-1.42.1.jar:1.42.1] at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) ~[?:?] at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) ~[?:?] ... 1 more Caused by: io.grpc.netty.shaded.io.netty.channel.AbstractChannel$AnnotatedConnectException: finishConnect(..) failed: Connection refused: /127.0.0.1:46247 Caused by: java.net.ConnectException: finishConnect(..) failed: Connection refused at io.grpc.netty.shaded.io.netty.channel.unix.Errors.newConnectException0(Errors.java:155) ~[io.grpc-grpc-netty-shaded-1.42.1.jar:1.42.1] at io.grpc.netty.shaded.io.netty.channel.unix.Errors.handleConnectErrno(Errors.java:128) ~[io.grpc-grpc-netty-shaded-1.42.1.jar:1.42.1] at io.grpc.netty.shaded.io.netty.channel.unix.Socket.finishConnect(Socket.java:278) ~[io.grpc-grpc-netty-shaded-1.42.1.jar:1.42.1] at io.grpc.netty.shaded.io.netty.channel.epoll.AbstractEpollChannel$AbstractEpollUnsafe.doFinishConnect(AbstractEpollChannel.java:710) ~[io.grpc-grpc-netty-shaded-1.42.1.jar:1.42.1] at io.grpc.netty.shaded.io.netty.channel.epoll.AbstractEpollChannel$AbstractEpollUnsafe.finishConnect(AbstractEpollChannel.java:687) ~[io.grpc-grpc-netty-shaded-1.42.1.jar:1.42.1] at io.grpc.netty.shaded.io.netty.channel.epoll.AbstractEpollChannel$AbstractEpollUnsafe.epollOutReady(AbstractEpollChannel.java:567) ~[io.grpc-grpc-netty-shaded-1.42.1.jar:1.42.1] at io.grpc.netty.shaded.io.netty.channel.epoll.EpollEventLoop.processReady(EpollEventLoop.java:470) ~[io.grpc-grpc-netty-shaded-1.42.1.jar:1.42.1] at io.grpc.netty.shaded.io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:378) ~[io.grpc-grpc-netty-shaded-1.42.1.jar:1.42.1] at io.grpc.netty.shaded.io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989) ~[io.grpc-grpc-netty-shaded-1.42.1.jar:1.42.1] at io.grpc.netty.shaded.io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74) ~[io.grpc-grpc-netty-shaded-1.42.1.jar:1.42.1] at io.grpc.netty.shaded.io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) ~[io.grpc-grpc-netty-shaded-1.42.1.jar:1.42.1] ... 1 more

` Any ideas?