apache / pulsar

Apache Pulsar - distributed pub-sub messaging system
https://pulsar.apache.org/
Apache License 2.0

function worker doesn't use configurationStoreServers parameter while creating function #11261

Open coute opened 3 years ago

coute commented 3 years ago

Describe the bug

When I try to create a function in my cluster, I get this error:

Jul 08 08:52:41 ip-10-0-2-32 pulsar[10797]: 08:52:41.167 [pulsar-web-41-5] INFO  org.apache.pulsar.functions.worker.rest.api.ComponentImpl - Uploading Function package to fm1/functions/f_java_brain_message_adapter/9fea4fe9-6f33-431a-be10-15f08fe773ac-metrics-1.0-SNAPSHOT-jar-with-dependencies.jar
Jul 08 08:52:41 ip-10-0-2-32 pulsar[10797]: 08:52:41.174 [pulsar-web-41-5] INFO  org.apache.pulsar.functions.worker.WorkerUtils - Uploading function package to 'fm1/functions/f_java_brain_message_adapter/9fea4fe9-6f33-431a-be10-15f08fe773ac-metrics-1.0-SNAPSHOT-jar-with-dependencies.jar'
Jul 08 08:52:41 ip-10-0-2-32 pulsar[10797]: 08:52:41.210 [main-EventThread] INFO  org.apache.distributedlog.bk.SimpleLedgerAllocator - Ledger allocator for /pulsar/functions/fm1/functions/f_java_brain_message_adapter/9fea4fe9-6f33-431a-be10-15f08fe773ac-metrics-1.0-SNAPSHOT-jar-with-dependencies.jar/<default>/allocation moved version from -1 to 0.
Jul 08 08:52:41 ip-10-0-2-32 pulsar[10797]: 08:52:41.219 [main-EventThread] INFO  org.apache.distributedlog.BKLogWriteHandler - Initiating Recovery For fm1/functions/f_java_brain_message_adapter/9fea4fe9-6f33-431a-be10-15f08fe773ac-metrics-1.0-SNAPSHOT-jar-with-dependencies.jar:<default> : []
Jul 08 08:52:41 ip-10-0-2-32 pulsar[10797]: 08:52:41.221 [pulsar-web-41-5] INFO  org.apache.distributedlog.BKLogWriteHandler - Initiating Recovery For fm1/functions/f_java_brain_message_adapter/9fea4fe9-6f33-431a-be10-15f08fe773ac-metrics-1.0-SNAPSHOT-jar-with-dependencies.jar:<default> : []
Jul 08 08:52:41 ip-10-0-2-32 pulsar[10797]: 08:52:41.223 [pulsar-web-41-5] INFO  org.apache.distributedlog.bk.SimpleLedgerAllocator - Ledger allocator /pulsar/functions/fm1/functions/f_java_brain_message_adapter/9fea4fe9-6f33-431a-be10-15f08fe773ac-metrics-1.0-SNAPSHOT-jar-with-dependencies.jar/<default>/allocation moved to phase ALLOCATING : version = 0.
Jul 08 08:52:41 ip-10-0-2-32 pulsar[10797]: 08:52:41.225 [pulsar-web-41-5] INFO  org.apache.zookeeper.ZooKeeper - Initiating client connection, connectString=localhost:2181 sessionTimeout=30000 watcher=org.apache.bookkeeper.zookeeper.ZooKeeperWatcherBase@4d00cb8e
Jul 08 08:52:41 ip-10-0-2-32 pulsar[10797]: 08:52:41.225 [pulsar-web-41-5] INFO  org.apache.zookeeper.ClientCnxnSocket - jute.maxbuffer value is 10485760 Bytes
Jul 08 08:52:41 ip-10-0-2-32 pulsar[10797]: 08:52:41.226 [pulsar-web-41-5] INFO  org.apache.zookeeper.ClientCnxn - zookeeper.request.timeout value is 0. feature enabled=false
Jul 08 08:52:41 ip-10-0-2-32 pulsar[10797]: 08:52:41.227 [pulsar-web-41-5-SendThread(localhost:2181)] INFO  org.apache.zookeeper.ClientCnxn - Opening socket connection to server localhost/127.0.0.1:2181.
Jul 08 08:52:41 ip-10-0-2-32 pulsar[10797]: 08:52:41.227 [pulsar-web-41-5-SendThread(localhost:2181)] INFO  org.apache.zookeeper.ClientCnxn - SASL config status: Will not attempt to authenticate using SASL (unknown error)
Jul 08 08:52:41 ip-10-0-2-32 pulsar[10797]: 08:52:41.228 [pulsar-web-41-5-SendThread(localhost:2181)] WARN  org.apache.zookeeper.ClientCnxn - Session 0x0 for sever localhost/127.0.0.1:2181, Closing socket connection. Attempting reconnect except it is a SessionExpiredException.
Jul 08 08:52:41 ip-10-0-2-32 pulsar[10797]: java.net.ConnectException: Connection refused
Jul 08 08:52:41 ip-10-0-2-32 pulsar[10797]:         at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method) ~[?:?]
Jul 08 08:52:41 ip-10-0-2-32 pulsar[10797]:         at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:777) ~[?:?]
Jul 08 08:52:41 ip-10-0-2-32 pulsar[10797]:         at org.apache.zookeeper.ClientCnxnSocketNIO.doTransport(ClientCnxnSocketNIO.java:344) ~[org.apache.zookeeper-zookeeper-3.6.3.jar:3.6.3]
Jul 08 08:52:41 ip-10-0-2-32 pulsar[10797]:         at org.apache.zookeeper.ClientCnxn$SendThread.run(ClientCnxn.java:1290) [org.apache.zookeeper-zookeeper-3.6.3.jar:3.6.3]

I deployed Pulsar in cluster mode, and the ZooKeeper service is not installed on the broker servers. configurationStoreServers is set to the ZooKeeper servers' IP addresses, but the functions worker tries to connect to ZooKeeper at localhost instead.
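The mismatch between the configured servers and the address actually dialed can be checked mechanically. The following is a minimal sketch, assuming the log format shown in this report; the helper names `connect_strings` and `unexpected_hosts` are hypothetical and not part of Pulsar. It pulls every `connectString=` value out of the log text and lists the entries that are missing from the configured server list.

```python
import re

# Matches the ZooKeeper client line seen in the log above, e.g.
#   "... Initiating client connection, connectString=localhost:2181 sessionTimeout=30000 ..."
CONNECT_RE = re.compile(r"connectString=(\S+)")


def connect_strings(log_text):
    """Return every ZooKeeper connect string found in the log, in order."""
    return CONNECT_RE.findall(log_text)


def unexpected_hosts(log_text, configured):
    """Return host:port entries the client dialed that are not in the
    configured comma-separated server list (duplicates removed)."""
    allowed = {s.strip() for s in configured.split(",") if s.strip()}
    seen = []
    for connect_string in connect_strings(log_text):
        for hostport in connect_string.split(","):
            if hostport not in allowed and hostport not in seen:
                seen.append(hostport)
    return seen


if __name__ == "__main__":
    # One line copied from the log in this report.
    sample = (
        "08:52:41.225 [pulsar-web-41-5] INFO  org.apache.zookeeper.ZooKeeper - "
        "Initiating client connection, connectString=localhost:2181 "
        "sessionTimeout=30000 "
        "watcher=org.apache.bookkeeper.zookeeper.ZooKeeperWatcherBase@4d00cb8e"
    )
    # Value of configurationStoreServers from the worker config dump.
    configured = "10.0.2.115:2181,10.0.2.15:2181,10.0.2.229:2181"
    print(unexpected_hosts(sample, configured))
```

The sketch assumes plain `host:port` entries; a connect string with a chroot suffix would need extra handling.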

Jul 08 08:52:36 ip-10-0-2-32 pulsar[10797]: 08:52:36.808 [main] INFO  org.apache.pulsar.functions.worker.PulsarWorkerService - Worker Configs: {
Jul 08 08:52:36 ip-10-0-2-32 pulsar[10797]:   "workerId" : "c-local-fw-10.0.2.32-8080",
Jul 08 08:52:36 ip-10-0-2-32 pulsar[10797]:   "workerHostname" : "10.0.2.32",
Jul 08 08:52:36 ip-10-0-2-32 pulsar[10797]:   "workerPort" : 8080,
Jul 08 08:52:36 ip-10-0-2-32 pulsar[10797]:   "workerPortTls" : 8443,
Jul 08 08:52:36 ip-10-0-2-32 pulsar[10797]:   "authenticateMetricsEndpoint" : true,
Jul 08 08:52:36 ip-10-0-2-32 pulsar[10797]:   "includeStandardPrometheusMetrics" : false,
Jul 08 08:52:36 ip-10-0-2-32 pulsar[10797]:   "jvmGCMetricsLoggerClassName" : null,
Jul 08 08:52:36 ip-10-0-2-32 pulsar[10797]:   "numHttpServerThreads" : 8,
Jul 08 08:52:36 ip-10-0-2-32 pulsar[10797]:   "httpRequestsLimitEnabled" : false,
Jul 08 08:52:36 ip-10-0-2-32 pulsar[10797]:   "httpRequestsMaxPerSecond" : 100.0,
Jul 08 08:52:36 ip-10-0-2-32 pulsar[10797]:   "configurationStoreServers" : "10.0.2.115:2181,10.0.2.15:2181,10.0.2.229:2181",
Jul 08 08:52:36 ip-10-0-2-32 pulsar[10797]:   "zooKeeperSessionTimeoutMillis" : 30000,
Jul 08 08:52:36 ip-10-0-2-32 pulsar[10797]:   "zooKeeperOperationTimeoutSeconds" : 30,
Jul 08 08:52:36 ip-10-0-2-32 pulsar[10797]:   "zooKeeperCacheExpirySeconds" : 300,
Jul 08 08:52:36 ip-10-0-2-32 pulsar[10797]:   "connectorsDirectory" : "./connectors",
Jul 08 08:52:36 ip-10-0-2-32 pulsar[10797]:   "narExtractionDirectory" : "/tmp",
Jul 08 08:52:36 ip-10-0-2-32 pulsar[10797]:   "validateConnectorConfig" : false,
Jul 08 08:52:36 ip-10-0-2-32 pulsar[10797]:   "functionsDirectory" : "./functions",
Jul 08 08:52:36 ip-10-0-2-32 pulsar[10797]:   "functionMetadataTopicName" : "metadata",
Jul 08 08:52:36 ip-10-0-2-32 pulsar[10797]:   "useCompactedMetadataTopic" : false,
Jul 08 08:52:36 ip-10-0-2-32 pulsar[10797]:   "functionWebServiceUrl" : "http://10.0.2.32:8080",
Jul 08 08:52:36 ip-10-0-2-32 pulsar[10797]:   "pulsarServiceUrl" : "pulsar://10.0.2.32:6650",
Jul 08 08:52:36 ip-10-0-2-32 pulsar[10797]:   "pulsarWebServiceUrl" : "http://10.0.2.32:8080",
Jul 08 08:52:36 ip-10-0-2-32 pulsar[10797]:   "clusterCoordinationTopicName" : "coordinate",
Jul 08 08:52:36 ip-10-0-2-32 pulsar[10797]:   "pulsarFunctionsNamespace" : "public/functions",
Jul 08 08:52:36 ip-10-0-2-32 pulsar[10797]:   "pulsarFunctionsCluster" : "local",
Jul 08 08:52:36 ip-10-0-2-32 pulsar[10797]:   "numFunctionPackageReplicas" : 3,
Jul 08 08:52:36 ip-10-0-2-32 pulsar[10797]:   "downloadDirectory" : "./download/pulsar_functions",
Jul 08 08:52:36 ip-10-0-2-32 pulsar[10797]:   "stateStorageServiceUrl" : null,
Jul 08 08:52:36 ip-10-0-2-32 pulsar[10797]:   "functionAssignmentTopicName" : "assignments",
Jul 08 08:52:36 ip-10-0-2-32 pulsar[10797]:   "schedulerClassName" : "org.apache.pulsar.functions.worker.scheduler.RoundRobinScheduler",
Jul 08 08:52:36 ip-10-0-2-32 pulsar[10797]:   "failureCheckFreqMs" : 30000,
Jul 08 08:52:36 ip-10-0-2-32 pulsar[10797]:   "rescheduleTimeoutMs" : 60000,
Jul 08 08:52:36 ip-10-0-2-32 pulsar[10797]:   "rebalanceCheckFreqSec" : -1,
Jul 08 08:52:36 ip-10-0-2-32 pulsar[10797]:   "initialBrokerReconnectMaxRetries" : 60,
Jul 08 08:52:36 ip-10-0-2-32 pulsar[10797]:   "assignmentWriteMaxRetries" : 60,
Jul 08 08:52:36 ip-10-0-2-32 pulsar[10797]:   "instanceLivenessCheckFreqMs" : 30000,
Jul 08 08:52:36 ip-10-0-2-32 pulsar[10797]:   "brokerClientAuthenticationEnabled" : null,
Jul 08 08:52:36 ip-10-0-2-32 pulsar[10797]:   "brokerClientAuthenticationPlugin" : "org.apache.pulsar.client.impl.auth.AuthenticationDisabled",
Jul 08 08:52:36 ip-10-0-2-32 pulsar[10797]:   "brokerClientAuthenticationParameters" : "",
Jul 08 08:52:36 ip-10-0-2-32 pulsar[10797]:   "bookkeeperClientAuthenticationPlugin" : null,
Jul 08 08:52:36 ip-10-0-2-32 pulsar[10797]:   "bookkeeperClientAuthenticationParametersName" : null,
Jul 08 08:52:36 ip-10-0-2-32 pulsar[10797]:   "bookkeeperClientAuthenticationParameters" : null,
Jul 08 08:52:36 ip-10-0-2-32 pulsar[10797]:   "topicCompactionFrequencySec" : 1800,
Jul 08 08:52:36 ip-10-0-2-32 pulsar[10797]:   "tlsEnabled" : false,
Jul 08 08:52:36 ip-10-0-2-32 pulsar[10797]:   "tlsCertificateFilePath" : null,
Jul 08 08:52:36 ip-10-0-2-32 pulsar[10797]:   "tlsKeyFilePath" : null,
Jul 08 08:52:36 ip-10-0-2-32 pulsar[10797]:   "tlsTrustCertsFilePath" : null,
Jul 08 08:52:36 ip-10-0-2-32 pulsar[10797]:   "tlsAllowInsecureConnection" : false,
Jul 08 08:52:36 ip-10-0-2-32 pulsar[10797]:   "tlsRequireTrustedClientCertOnConnect" : false,
Jul 08 08:52:36 ip-10-0-2-32 pulsar[10797]:   "useTls" : false,
Jul 08 08:52:36 ip-10-0-2-32 pulsar[10797]:   "tlsEnableHostnameVerification" : false,
Jul 08 08:52:36 ip-10-0-2-32 pulsar[10797]:   "tlsCertRefreshCheckDurationSec" : 300,
Jul 08 08:52:36 ip-10-0-2-32 pulsar[10797]:   "authenticationEnabled" : false,
Jul 08 08:52:36 ip-10-0-2-32 pulsar[10797]:   "authenticationProviders" : [ ],
Jul 08 08:52:36 ip-10-0-2-32 pulsar[10797]:   "authorizationEnabled" : false,
Jul 08 08:52:36 ip-10-0-2-32 pulsar[10797]:   "authorizationProvider" : "org.apache.pulsar.broker.authorization.PulsarAuthorizationProvider",
Jul 08 08:52:36 ip-10-0-2-32 pulsar[10797]:   "superUserRoles" : [ ],
Jul 08 08:52:36 ip-10-0-2-32 pulsar[10797]:   "properties" : { },
Jul 08 08:52:36 ip-10-0-2-32 pulsar[10797]:   "initializedDlogMetadata" : false,
Jul 08 08:52:36 ip-10-0-2-32 pulsar[10797]:   "brokerClientTrustCertsFilePath" : null,
Jul 08 08:52:36 ip-10-0-2-32 pulsar[10797]:   "functionRuntimeFactoryClassName" : "org.apache.pulsar.functions.runtime.process.ProcessRuntimeFactory",
Jul 08 08:52:36 ip-10-0-2-32 pulsar[10797]:   "functionRuntimeFactoryConfigs" : {
Jul 08 08:52:36 ip-10-0-2-32 pulsar[10797]:     "logDirectory" : "logs/",
Jul 08 08:52:36 ip-10-0-2-32 pulsar[10797]:     "javaInstanceJarLocation" : null,
Jul 08 08:52:36 ip-10-0-2-32 pulsar[10797]:     "pythonInstanceLocation" : null,
Jul 08 08:52:36 ip-10-0-2-32 pulsar[10797]:     "extraFunctionDependenciesDir" : null
Jul 08 08:52:36 ip-10-0-2-32 pulsar[10797]:   },
Jul 08 08:52:36 ip-10-0-2-32 pulsar[10797]:   "secretsProviderConfiguratorClassName" : null,
Jul 08 08:52:36 ip-10-0-2-32 pulsar[10797]:   "secretsProviderConfiguratorConfig" : null,
Jul 08 08:52:36 ip-10-0-2-32 pulsar[10797]:   "functionInstanceMinResources" : null,
Jul 08 08:52:36 ip-10-0-2-32 pulsar[10797]:   "functionInstanceMaxResources" : null,
Jul 08 08:52:36 ip-10-0-2-32 pulsar[10797]:   "functionInstanceResourceGranularities" : null,
Jul 08 08:52:36 ip-10-0-2-32 pulsar[10797]:   "functionInstanceResourceChangeInLockStep" : false,
Jul 08 08:52:36 ip-10-0-2-32 pulsar[10797]:   "functionAuthProviderClassName" : null,
Jul 08 08:52:36 ip-10-0-2-32 pulsar[10797]:   "runtimeCustomizerClassName" : null,
Jul 08 08:52:36 ip-10-0-2-32 pulsar[10797]:   "runtimeCustomizerConfig" : { },
Jul 08 08:52:36 ip-10-0-2-32 pulsar[10797]:   "maxPendingAsyncRequests" : 1000,
Jul 08 08:52:36 ip-10-0-2-32 pulsar[10797]:   "forwardSourceMessageProperty" : true,
Jul 08 08:52:36 ip-10-0-2-32 pulsar[10797]:   "functionsWorkerServiceNarPackage" : "",
Jul 08 08:52:36 ip-10-0-2-32 pulsar[10797]:   "functionsWorkerServiceCustomConfigs" : { },
Jul 08 08:52:36 ip-10-0-2-32 pulsar[10797]:   "exposeAdminClientEnabled" : false,
Jul 08 08:52:36 ip-10-0-2-32 pulsar[10797]:   "threadContainerFactory" : null,
Jul 08 08:52:36 ip-10-0-2-32 pulsar[10797]:   "processContainerFactory" : null,
Jul 08 08:52:36 ip-10-0-2-32 pulsar[10797]:   "kubernetesContainerFactory" : null,
Jul 08 08:52:36 ip-10-0-2-32 pulsar[10797]:   "clientAuthenticationParameters" : null,
Jul 08 08:52:36 ip-10-0-2-32 pulsar[10797]:   "clientAuthenticationPlugin" : null,
Jul 08 08:52:36 ip-10-0-2-32 pulsar[10797]:   "functionMetadataTopic" : "persistent://public/functions/metadata",
Jul 08 08:52:36 ip-10-0-2-32 pulsar[10797]:   "clusterCoordinationTopic" : "persistent://public/functions/coordinate",
Jul 08 08:52:36 ip-10-0-2-32 pulsar[10797]:   "functionAssignmentTopic" : "persistent://public/functions/assignments",
Jul 08 08:52:36 ip-10-0-2-32 pulsar[10797]:   "tlsTrustChainBytes" : null,
Jul 08 08:52:36 ip-10-0-2-32 pulsar[10797]:   "workerWebAddress" : "http://10.0.2.32:8080",
Jul 08 08:52:36 ip-10-0-2-32 pulsar[10797]:   "workerWebAddressTls" : "https://10.0.2.32:8443"
Jul 08 08:52:36 ip-10-0-2-32 pulsar[10797]: }
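The dump above is easier to inspect once the journald prefix is stripped and the JSON is parsed. This is a rough sketch under the assumption that the dump always starts with `Worker Configs: {` and ends with a closing brace on its own line, as it does here; `parse_worker_configs` is a hypothetical helper name.

```python
import json
import re

# journald prefix as it appears in this report, e.g.
#   "Jul 08 08:52:36 ip-10-0-2-32 pulsar[10797]: "
PREFIX_RE = re.compile(r"^\w{3}\s+\d{1,2} \d{2}:\d{2}:\d{2} \S+ \S+\[\d+\]: ")


def parse_worker_configs(log_text):
    """Extract the JSON object logged after 'Worker Configs:' as a dict.

    Returns None if no complete dump is found.
    """
    body = []
    collecting = False
    for raw in log_text.splitlines():
        line = PREFIX_RE.sub("", raw)
        if not collecting:
            if "Worker Configs: {" in line:
                collecting = True
                body.append("{")
            continue
        body.append(line)
        # Nested objects are indented, so a bare "}" closes the top level.
        if line.rstrip() == "}":
            return json.loads("\n".join(body))
    return None


if __name__ == "__main__":
    sample = "\n".join([
        "Jul 08 08:52:36 ip-10-0-2-32 pulsar[10797]: 08:52:36.808 [main] INFO  "
        "org.apache.pulsar.functions.worker.PulsarWorkerService - Worker Configs: {",
        'Jul 08 08:52:36 ip-10-0-2-32 pulsar[10797]:   "configurationStoreServers" : '
        '"10.0.2.115:2181,10.0.2.15:2181,10.0.2.229:2181",',
        'Jul 08 08:52:36 ip-10-0-2-32 pulsar[10797]:   "initializedDlogMetadata" : false',
        "Jul 08 08:52:36 ip-10-0-2-32 pulsar[10797]: }",
    ])
    configs = parse_worker_configs(sample)
    print(configs["configurationStoreServers"])
    print(configs["initializedDlogMetadata"])
```

Parsing the dump this way confirms that the worker did load the intended `configurationStoreServers` value, which narrows the problem to where the `localhost:2181` connect string comes from.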

I ran this command to add my function:

pulsar-admin --admin-url http://10.0.2.32:8080 functions create --jar java_functions/metrics-1.0-SNAPSHOT-jar-with-dependencies.jar --name f_java_brain_message_adapter --tenant fm1 --namespace functions --classname com.dopi.metrics.functions.adapters.BrainMessageAdapterFunction --inputs persistent://fm1/topics/dopi-brain-new --parallelism 1 --user-config '{"context-deployment-prefix":"fm1::"}' --retain-ordering

To Reproduce

Steps to reproduce the behavior:

  1. Install Pulsar in cluster mode with 2 brokers and 3 ZooKeeper nodes on separate servers
  2. Run the function worker with the broker
  3. Try to add a function

I have the same problem when I configure the functions worker to run separately.


coute commented 3 years ago

I have the same issue with 3 brokers and numFunctionPackageReplicas set to 3.

codelipenghui commented 2 years ago

The issue has had no activity for 30 days; marking it with the Stale label.

emanueledomingo commented 2 years ago

Hi everyone, I have the same problem. I set configurationStoreServers:

# Configuration Store connection string
configurationStoreServers:  <my-zk-ip>:2181

but it tries connecting to 'localhost:2181':

[main] INFO org.apache.zookeeper.ZooKeeper - Initiating client connection, connectString=localhost:2181 sessionTimeout=30000 watcher=org.apache.bookkeeper.zookeeper.ZooKeeperWatcherBase
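To tell a wrong address apart from an unreachable ZooKeeper, each entry of a connect string can be probed with a plain TCP connection. This is a minimal sketch that assumes `host:port` entries without a chroot suffix; `parse_connect_string` and `is_reachable` are hypothetical helper names, and a successful TCP connect only shows that something is listening, not that it is a healthy ZooKeeper server.

```python
import socket


def parse_connect_string(connect_string):
    """Split 'host1:2181,host2:2181' into [(host, port), ...]."""
    pairs = []
    for entry in connect_string.split(","):
        entry = entry.strip()
        if not entry:
            continue
        host, _, port = entry.rpartition(":")
        pairs.append((host, int(port)))
    return pairs


def is_reachable(host, port, timeout=2.0):
    """Return True if a TCP connection to host:port succeeds."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Covers connection refused, timeouts, and name resolution failures.
        return False


if __name__ == "__main__":
    # The address the worker actually dialed, taken from the log line above.
    for host, port in parse_connect_string("localhost:2181"):
        print(host, port, is_reachable(host, port, timeout=1.0))
```

Running the same probe from the worker host against the addresses set in `configurationStoreServers` would show whether the configured ensemble itself is reachable.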

Using Pulsar 2.9.2; base file found here.

EDIT: It seems that the worker gets the connectString from the cluster metadata (written by initialize-cluster-metadata). Is this correct? If so, is this the expected behaviour?
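One way to investigate this question is to look at what the broker itself reports as its internal configuration and see whether any value still points at localhost. The sketch below is not a confirmed diagnosis. It assumes the broker's admin REST API serves that data at `/admin/v2/brokers/internal-configuration` (the endpoint behind `pulsar-admin brokers internal-configuration`) and that authentication is disabled, as in the worker config earlier in this thread. The helper names are hypothetical, and no particular field names are assumed in the response.

```python
import json
import urllib.request


def fetch_internal_configuration(admin_url, timeout=5.0):
    """GET the broker's internal configuration and return it as a dict.

    Assumption: the admin API is reachable without authentication.
    """
    url = admin_url.rstrip("/") + "/admin/v2/brokers/internal-configuration"
    with urllib.request.urlopen(url, timeout=timeout) as resp:
        return json.load(resp)


def localhost_entries(config):
    """Return {key: value} for every string value that mentions localhost."""
    return {
        key: value
        for key, value in config.items()
        if isinstance(value, str)
        and ("localhost" in value or "127.0.0.1" in value)
    }
```

For example, `localhost_entries(fetch_internal_configuration("http://10.0.2.32:8080"))` would list any entries that still reference localhost, using the admin URL from the original report. An empty result would suggest the localhost address is stored somewhere else, such as metadata written when the cluster was initialized.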