apache / pulsar

Apache Pulsar - distributed pub-sub messaging system
https://pulsar.apache.org/
Apache License 2.0
14.13k stars 3.57k forks

Geo-replication across multiple clusters fails when brokers enable the function worker #14227

Closed JarvisZhu closed 2 years ago

JarvisZhu commented 2 years ago

Pulsar cluster 1: 10.66.107.31/32/33, with one broker instance and one bookie instance per server; local ZooKeeper ensemble: 10.66.107.34:2181,10.66.107.34:2182,10.66.107.34:2183

Pulsar cluster 2: 10.66.107.37/38/39, with one broker instance and one bookie instance per server; local ZooKeeper ensemble: 10.66.107.36:2181,10.66.107.36:2182,10.66.107.36:2183

Shared configuration store ZooKeeper ensemble: 10.66.107.35:2181,10.66.107.35:2182,10.66.107.35:2183

Cluster 1 configuration (using .31 as an example):

broker.conf:
zookeeperServers=10.66.107.34:2181,10.66.107.34:2182,10.66.107.34:2183
configurationStoreServers=10.66.107.35:2181,10.66.107.35:2182,10.66.107.35:2183
brokerServicePortTls=6651
webServicePortTls=8443
advertisedAddress=10.66.107.31
clusterName=pulsar-cluster-1
functionsWorkerEnabled=true

bookkeeper.conf:
advertisedAddress=10.66.107.31
bookieId=31
zkServers=10.66.107.34:2181,10.66.107.34:2182,10.66.107.34:2183
httpServerEnabled=true

functions_worker.yml:
workerId: 31
workerHostname: 10.66.107.31
configurationStoreServers: 10.66.107.34:2181
pulsarFunctionsCluster: pulsar-cluster-1
stateStorageServiceUrl: bk://localhost:4181
pulsarFunctionsNamespace: public/functions1

Cluster 2 configuration (using .37 as an example):

broker.conf:
zookeeperServers=10.66.107.36:2181,10.66.107.36:2182,10.66.107.36:2183
configurationStoreServers=10.66.107.35:2181,10.66.107.35:2182,10.66.107.35:2183
brokerServicePortTls=6651
webServicePortTls=8443
advertisedAddress=10.66.107.37
clusterName=pulsar-cluster-2
functionsWorkerEnabled=true

bookkeeper.conf:
advertisedAddress=10.66.107.37
bookieId=37
zkServers=10.66.107.36:2181,10.66.107.36:2182,10.66.107.36:2183
httpServerEnabled=true

functions_worker.yml:
workerId: 37
workerHostname: 10.66.107.37
configurationStoreServers: 10.66.107.36:2181
pulsarFunctionsCluster: pulsar-cluster-2
stateStorageServiceUrl: bk://localhost:4181
pulsarFunctionsNamespace: public/functions2

Issue 1: functionsWorkerEnabled=true is set in all six broker.conf files across the two clusters. The bookies and brokers of cluster 1 start normally, and the three bookies of cluster 2 start normally, but the three brokers of cluster 2 fail to start with the error below (after changing functionsWorkerEnabled to false everywhere, they start normally):

15:17:55.899 [ForkJoinPool.commonPool-worker-1] WARN org.apache.pulsar.broker.web.PulsarWebResource - Namespace missing local cluster name in clusters list: local_cluster=pulsar-cluster-2 ns=public/functions clusters=[pulsar-cluster-1]
15:17:55.924 [pulsar-web-40-15] INFO org.eclipse.jetty.server.RequestLog - 10.66.107.37 - - [26/Jan/2022:15:17:55 +0800] "PUT /admin/v2/persistent/public/functions/assignments HTTP/1.1" 412 60 "-" "Pulsar-Java-v2.8.0" 139
15:17:55.933 [AsyncHttpClient-57-1] WARN org.apache.pulsar.client.admin.internal.BaseResource - [http://10.66.107.37:8080/admin/v2/persistent/public/functions/assignments] Failed to perform http put request: javax.ws.rs.ClientErrorException: HTTP 412 Precondition Failed
15:17:55.944 [main] ERROR org.apache.pulsar.functions.worker.PulsarWorkerService - Error Starting up in worker
org.apache.pulsar.client.admin.PulsarAdminException$PreconditionFailedException: Namespace does not have any clusters configured
at org.apache.pulsar.client.admin.internal.BaseResource.getApiException(BaseResource.java:236) ~[org.apache.pulsar-pulsar-client-admin-original-2.8.0.jar:2.8.0]
at org.apache.pulsar.client.admin.internal.BaseResource$1.failed(BaseResource.java:130) ~[org.apache.pulsar-pulsar-client-admin-original-2.8.0.jar:2.8.0]
at org.glassfish.jersey.client.JerseyInvocation$1.failed(JerseyInvocation.java:882) ~[org.glassfish.jersey.core-jersey-client-2.34.jar:?]
at org.glassfish.jersey.client.JerseyInvocation$1.completed(JerseyInvocation.java:863) ~[org.glassfish.jersey.core-jersey-client-2.34.jar:?]
at org.glassfish.jersey.client.ClientRuntime.processResponse(ClientRuntime.java:229) ~[org.glassfish.jersey.core-jersey-client-2.34.jar:?]
at org.glassfish.jersey.client.ClientRuntime.access$200(ClientRuntime.java:62) ~[org.glassfish.jersey.core-jersey-client-2.34.jar:?]
at org.glassfish.jersey.client.ClientRuntime$2.lambda$response$0(ClientRuntime.java:173) ~[org.glassfish.jersey.core-jersey-client-2.34.jar:?]
at org.glassfish.jersey.internal.Errors$1.call(Errors.java:248) ~[org.glassfish.jersey.core-jersey-common-2.34.jar:?]
at org.glassfish.jersey.internal.Errors$1.call(Errors.java:244) ~[org.glassfish.jersey.core-jersey-common-2.34.jar:?]
at org.glassfish.jersey.internal.Errors.process(Errors.java:292) ~[org.glassfish.jersey.core-jersey-common-2.34.jar:?]
at org.glassfish.jersey.internal.Errors.process(Errors.java:274) ~[org.glassfish.jersey.core-jersey-common-2.34.jar:?]
at org.glassfish.jersey.internal.Errors.process(Errors.java:244) ~[org.glassfish.jersey.core-jersey-common-2.34.jar:?]
at org.glassfish.jersey.process.internal.RequestScope.runInScope(RequestScope.java:288) ~[org.glassfish.jersey.core-jersey-common-2.34.jar:?]
at org.glassfish.jersey.client.ClientRuntime$2.response(ClientRuntime.java:173) ~[org.glassfish.jersey.core-jersey-client-2.34.jar:?]
at org.apache.pulsar.client.admin.internal.http.AsyncHttpConnector.lambda$apply$1(AsyncHttpConnector.java:212) ~[org.apache.pulsar-pulsar-client-admin-original-2.8.0.jar:2.8.0]
at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760) ~[?:1.8.0_131]
at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736) ~[?:1.8.0_131]
at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474) ~[?:1.8.0_131]
at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1962) ~[?:1.8.0_131]
at org.apache.pulsar.client.admin.internal.http.AsyncHttpConnector.lambda$retryOperation$4(AsyncHttpConnector.java:254) ~[org.apache.pulsar-pulsar-client-admin-original-2.8.0.jar:2.8.0]
at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760) ~[?:1.8.0_131]
at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736) ~[?:1.8.0_131]
at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474) ~[?:1.8.0_131]
at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1962) ~[?:1.8.0_131]
at org.asynchttpclient.netty.NettyResponseFuture.loadContent(NettyResponseFuture.java:222) ~[org.asynchttpclient-async-http-client-2.12.1.jar:?]
at org.asynchttpclient.netty.NettyResponseFuture.done(NettyResponseFuture.java:257) ~[org.asynchttpclient-async-http-client-2.12.1.jar:?]
at org.asynchttpclient.netty.handler.AsyncHttpClientHandler.finishUpdate(AsyncHttpClientHandler.java:241) ~[org.asynchttpclient-async-http-client-2.12.1.jar:?]
at org.asynchttpclient.netty.handler.HttpHandler.handleChunk(HttpHandler.java:114) ~[org.asynchttpclient-async-http-client-2.12.1.jar:?]
at org.asynchttpclient.netty.handler.HttpHandler.handleRead(HttpHandler.java:143) ~[org.asynchttpclient-async-http-client-2.12.1.jar:?]
at org.asynchttpclient.netty.handler.AsyncHttpClientHandler.channelRead(AsyncHttpClientHandler.java:78) ~[org.asynchttpclient-async-http-client-2.12.1.jar:?]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) ~[io.netty-netty-transport-4.1.63.Final.jar:4.1.63.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) ~[io.netty-netty-transport-4.1.63.Final.jar:4.1.63.Final]
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357) ~[io.netty-netty-transport-4.1.63.Final.jar:4.1.63.Final]
at io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:103) ~[io.netty-netty-codec-4.1.63.Final.jar:4.1.63.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) ~[io.netty-netty-transport-4.1.63.Final.jar:4.1.63.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) ~[io.netty-netty-transport-4.1.63.Final.jar:4.1.63.Final]
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357) ~[io.netty-netty-transport-4.1.63.Final.jar:4.1.63.Final]
at io.netty.channel.CombinedChannelDuplexHandler$DelegatingChannelHandlerContext.fireChannelRead(CombinedChannelDuplexHandler.java:436) ~[io.netty-netty-transport-4.1.63.Final.jar:4.1.63.Final]
at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:324) ~[io.netty-netty-codec-4.1.63.Final.jar:4.1.63.Final]
at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:296) ~[io.netty-netty-codec-4.1.63.Final.jar:4.1.63.Final]
at io.netty.channel.CombinedChannelDuplexHandler.channelRead(CombinedChannelDuplexHandler.java:251) ~[io.netty-netty-transport-4.1.63.Final.jar:4.1.63.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) ~[io.netty-netty-transport-4.1.63.Final.jar:4.1.63.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) ~[io.netty-netty-transport-4.1.63.Final.jar:4.1.63.Final]
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357) ~[io.netty-netty-transport-4.1.63.Final.jar:4.1.63.Final]
at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410) ~[io.netty-netty-transport-4.1.63.Final.jar:4.1.63.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) ~[io.netty-netty-transport-4.1.63.Final.jar:4.1.63.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) ~[io.netty-netty-transport-4.1.63.Final.jar:4.1.63.Final]
at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919) ~[io.netty-netty-transport-4.1.63.Final.jar:4.1.63.Final]
at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:166) ~[io.netty-netty-transport-4.1.63.Final.jar:4.1.63.Final]
at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:719) ~[io.netty-netty-transport-4.1.63.Final.jar:4.1.63.Final]
at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:655) ~[io.netty-netty-transport-4.1.63.Final.jar:4.1.63.Final]
at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:581) ~[io.netty-netty-transport-4.1.63.Final.jar:4.1.63.Final]
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:493) ~[io.netty-netty-transport-4.1.63.Final.jar:4.1.63.Final]
at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989) ~[io.netty-netty-common-4.1.63.Final.jar:4.1.63.Final]
at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74) ~[io.netty-netty-common-4.1.63.Final.jar:4.1.63.Final]
at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) ~[io.netty-netty-common-4.1.63.Final.jar:4.1.63.Final]
at java.lang.Thread.run(Thread.java:748) ~[?:1.8.0_131]
Caused by: javax.ws.rs.ClientErrorException: HTTP 412 Precondition Failed
at org.glassfish.jersey.client.JerseyInvocation.createExceptionForFamily(JerseyInvocation.java:985) ~[org.glassfish.jersey.core-jersey-client-2.34.jar:?]
at org.glassfish.jersey.client.JerseyInvocation.convertToException(JerseyInvocation.java:967) ~[org.glassfish.jersey.core-jersey-client-2.34.jar:?]
at org.glassfish.jersey.client.JerseyInvocation.access$700(JerseyInvocation.java:82) ~[org.glassfish.jersey.core-jersey-client-2.34.jar:?]
... 54 more
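The key line is the first WARN: the function worker's namespace public/functions only lists pulsar-cluster-1 in its cluster list, so brokers in pulsar-cluster-2 fail the precondition check when they try to use it. One possible remedy (a sketch inferred from the error message, not confirmed in this thread; the namespace and cluster names below are taken from the log) is to add the local cluster to that namespace's cluster list:

```shell
# Add pulsar-cluster-2 to the cluster list of the function worker namespace
# reported in the WARN log (ns=public/functions, clusters=[pulsar-cluster-1]).
bin/pulsar-admin namespaces set-clusters public/functions \
  --clusters pulsar-cluster-1,pulsar-cluster-2
```

Note that functions_worker.yml above sets per-cluster namespaces (public/functions1 and public/functions2), yet the log still references public/functions; that may indicate the pulsarFunctionsNamespace setting was not picked up and the worker fell back to the default namespace.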

Issue 2: After changing functionsWorkerEnabled to false everywhere, everything starts normally. However, when running the following command on a node of cluster 1 to configure geo-replication from pulsar-cluster-1 to pulsar-cluster-2:

bin/pulsar-admin clusters create \
  --broker-url pulsar://10.66.107.37:6650,10.66.107.38:6650,10.66.107.39:6650 \
  --url http://10.66.107.37:8080,10.66.107.38:8080,10.66.107.39:8080 \
  pulsar-cluster-2

it fails with:

22:26:57.207 [AsyncHttpClient-7-1] WARN org.apache.pulsar.client.admin.internal.BaseResource - [http://10.66.107.32:8080/admin/v2/clusters/pulsar-cluster-2] Failed to perform http put request: javax.ws.rs.ClientErrorException: HTTP 409 Conflict
Cluster already exists
Reason: Cluster already exists

Does this mean that after the setup above, fully-connected bidirectional replication is already in place automatically? If so, how should unidirectional replication and failover mode be set up?
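The 409 is to be expected here: both clusters share the same configuration store ZooKeeper, and each cluster registers itself there when its metadata is initialized, so pulsar-cluster-2 is already visible from cluster 1. That said, registration alone does not replicate any data; geo-replication is enabled per namespace. A sketch of the usual steps (the tenant and namespace names my-tenant/my-ns are hypothetical):

```shell
# Make a tenant usable in both clusters
bin/pulsar-admin tenants create my-tenant \
  --allowed-clusters pulsar-cluster-1,pulsar-cluster-2

# Enable geo-replication for a namespace across both clusters
bin/pulsar-admin namespaces create my-tenant/my-ns \
  --clusters pulsar-cluster-1,pulsar-cluster-2
```

With both clusters in a namespace's cluster list, messages produced on either side are replicated to the other, so the topology is effectively bidirectional for that namespace; a unidirectional or failover pattern is typically achieved by producing only in one cluster at a time rather than by a separate replication mode.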

Issue 3:

Test results:

1. Send messages to both clusters via the command line: consume on cluster 1 first; after consumption completes, connect to cluster 2 and restart the Java consumer. No duplicate consumption was observed.
2. Send messages to both clusters via the command line: consume on cluster 2 first; after consumption completes, connect to cluster 1 and restart the Java consumer. Duplicate consumption occurred.
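A plausible cause of the duplicates (an assumption, not something confirmed in this thread) is that subscription state, i.e. the acknowledged position, is not geo-replicated by default, so a consumer that moves to the other cluster resumes from that cluster's own subscription state. Pulsar's replicated-subscriptions feature lets a consumer opt in to best-effort replication of the ack position; a minimal Java sketch (topic and subscription names are hypothetical):

```java
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.PulsarClient;

public class ReplicatedSubExample {
    public static void main(String[] args) throws Exception {
        PulsarClient client = PulsarClient.builder()
                .serviceUrl("pulsar://10.66.107.31:6650") // cluster 1 from this issue
                .build();

        Consumer<byte[]> consumer = client.newConsumer()
                .topic("persistent://my-tenant/my-ns/my-topic") // hypothetical topic
                .subscriptionName("my-sub")                     // hypothetical subscription
                .replicateSubscriptionState(true) // opt in to ack-position replication
                .subscribe();

        // ... consume as usual; after switching to cluster 2, the same
        // subscription resumes near the replicated position. Because the
        // replication is best-effort, some re-delivery is still possible,
        // so consumers should remain idempotent.
        consumer.close();
        client.close();
    }
}
```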

github-actions[bot] commented 2 years ago

The issue had no activity for 30 days, mark with Stale label.
