apache / pulsar

Apache Pulsar - distributed pub-sub messaging system
https://pulsar.apache.org/
Apache License 2.0

[Bug] NullPointerException with dlogNamespace when running Pulsar Functions with etcd metadata store #19660

Open abeliangroupie opened 1 year ago

abeliangroupie commented 1 year ago


Version

OS version: Ubuntu 22.04
Pulsar version: 2.11.0 (using apachepulsar/pulsar-all:2.11.0 Docker images)

Minimal reproduce step

It's not the cleanest, but here's the Docker Compose file I've been working with. Note that it spins up 3 etcd containers, 3 bookie containers, and 1 broker container.

version: "3.9"

services:
  # etcd cluster.
  etcd1:
    image: quay.io/coreos/etcd:v3.5.7
    container_name: etcd1
    restart: always
    networks:
      - pulsar
    command: >
      /usr/local/bin/etcd
      --name node1
      --initial-advertise-peer-urls http://etcd1:2380
      --listen-peer-urls http://0.0.0.0:2380
      --advertise-client-urls http://etcd1:2379
      --listen-client-urls http://0.0.0.0:2379
      --initial-cluster "node1=http://etcd1:2380,node2=http://etcd2:2380,node3=http://etcd3:2380"
      --initial-cluster-state new
      --initial-cluster-token initial-token

  etcd2:
    image: quay.io/coreos/etcd:v3.5.7
    container_name: etcd2
    restart: always
    networks:
      - pulsar
    command: >
      /usr/local/bin/etcd
      --name node2
      --initial-advertise-peer-urls http://etcd2:2380
      --listen-peer-urls http://0.0.0.0:2380
      --advertise-client-urls http://etcd2:2379
      --listen-client-urls http://0.0.0.0:2379
      --initial-cluster "node1=http://etcd1:2380,node2=http://etcd2:2380,node3=http://etcd3:2380"
      --initial-cluster-state new
      --initial-cluster-token initial-token

  etcd3:
    image: quay.io/coreos/etcd:v3.5.7
    container_name: etcd3
    restart: always
    networks:
      - pulsar
    command: >
      /usr/local/bin/etcd
      --name node3
      --initial-advertise-peer-urls http://etcd3:2380
      --listen-peer-urls http://0.0.0.0:2380
      --advertise-client-urls http://etcd3:2379
      --listen-client-urls http://0.0.0.0:2379
      --initial-cluster "node1=http://etcd1:2380,node2=http://etcd2:2380,node3=http://etcd3:2380"
      --initial-cluster-state new
      --initial-cluster-token initial-token

  # Container that only runs once to initialise metadata.
  bootstrap:
    image: apachepulsar/pulsar-all:2.11.0
    container_name: bootstrap
    depends_on:
      - etcd1
      - etcd2
      - etcd3
    networks:
      - pulsar
    command: >
      bin/pulsar initialize-cluster-metadata
      --cluster my-pulsar
      --metadata-store etcd:http://etcd1:2379,http://etcd2:2379,http://etcd3:2379
      --configuration-metadata-store etcd:http://etcd1:2379,http://etcd2:2379,http://etcd3:2379
      --web-service-url http://broker1:8080
      --broker-service-url pulsar://broker1:6650

  # BookKeeper cluster.
  bookie1:
    image: apachepulsar/pulsar-all:2.11.0
    container_name: bookie1
    restart: always
    depends_on:
      bootstrap:
        condition: service_completed_successfully
    networks:
      - pulsar
    volumes:
      - ${PWD}/bookkeeper.conf:/pulsar/conf/bookkeeper.conf
    command: bin/pulsar bookie

  bookie2:
    image: apachepulsar/pulsar-all:2.11.0
    container_name: bookie2
    restart: always
    depends_on:
      bootstrap:
        condition: service_completed_successfully
    networks:
      - pulsar
    volumes:
      - ${PWD}/bookkeeper.conf:/pulsar/conf/bookkeeper.conf
    command: bin/pulsar bookie

  bookie3:
    image: apachepulsar/pulsar-all:2.11.0
    container_name: bookie3
    restart: always
    depends_on:
      bootstrap:
        condition: service_completed_successfully
    networks:
      - pulsar
    volumes:
      - ${PWD}/bookkeeper.conf:/pulsar/conf/bookkeeper.conf
    command: bin/pulsar bookie

  # Broker.
  broker:
    image: apachepulsar/pulsar-all:2.11.0
    container_name: broker
    restart: always
    depends_on:
      - bookie1
      - bookie2
      - bookie3
    networks:
      - pulsar
    volumes:
      - ${PWD}/broker.conf:/pulsar/conf/broker.conf
      - ${PWD}/functions_worker.yml:/pulsar/conf/functions_worker.yml
    command: bin/pulsar broker

networks:
  pulsar:
    driver: bridge

Config changes are as follows:

bookkeeper.conf:

useHostNameAsBookieID=true
metadataServiceUri=metadata-store:etcd:http://etcd1:2379,http://etcd2:2379,http://etcd3:2379

broker.conf:

clusterName=my-pulsar
metadataStoreUrl=etcd:http://etcd1:2379,http://etcd2:2379,http://etcd3:2379
functionsWorkerEnabled=true

functions_worker.yml:

pulsarFunctionsCluster: my-pulsar
configurationMetadataStoreUrl: metadata-store:etcd:http://etcd1:2379,http://etcd2:2379,http://etcd3:2379

To trigger the error, from inside the broker container:

$ bin/pulsar-admin functions create \
    --py test_function.py \
    --classname test_function.ExamplePulsarFunction \
    --tenant public \
    --namespace default \
    --name test-function \
    --inputs persistent://public/default/test-input-topic \
    --output persistent://public/default/test-output-topic
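
The report does not include the contents of test_function.py. For anyone reproducing this with a real function rather than an empty file, a minimal sketch of what such a file could contain is below. Only the module and class names come from the command above; the class body, the "!" suffix, and the import fallback are assumptions for illustration. It assumes the Function base class from the pulsar Python package (pulsar-client), with its process(input, context) method.

```python
# test_function.py (hypothetical contents; not taken from the original report)
try:
    from pulsar import Function  # provided by the pulsar-client package
except ImportError:
    # Fallback so the file can be imported for a quick local check
    # on a machine that does not have pulsar-client installed.
    Function = object


class ExamplePulsarFunction(Function):
    """Appends "!" to every message read from the input topic."""

    def process(self, input, context):
        # The returned value is what the runtime publishes to the output topic.
        return input + "!"
```

Called directly, ExamplePulsarFunction().process("hello", None) returns "hello!", which is a quick way to sanity-check the file before uploading it.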

What did you expect to see?

I expected the Pulsar function to be added successfully (or, since the test file was only created with touch, Pulsar complaining that test_function.ExamplePulsarFunction doesn't exist).

What did you see instead?

The error seen is:

Cannot invoke "org.apache.distributedlog.api.namespace.Namespace.logExists(String)" because "dlogNamespace" is null

Reason: Cannot invoke "org.apache.distributedlog.api.namespace.Namespace.logExists(String)" because "dlogNamespace" is null

Stack trace from the broker container:

2023-02-28T05:49:23,816+0000 [pulsar-web-38-11] ERROR org.apache.pulsar.functions.worker.rest.api.FunctionsImpl - Failed process Function public/default/test-function package: 
java.lang.NullPointerException: Cannot invoke "org.apache.distributedlog.api.namespace.Namespace.logExists(String)" because "dlogNamespace" is null
    at org.apache.pulsar.functions.worker.WorkerUtils.uploadToBookKeeper(WorkerUtils.java:90) ~[org.apache.pulsar-pulsar-functions-worker-2.11.0.jar:2.11.0]
    at org.apache.pulsar.functions.worker.WorkerUtils.uploadFileToBookkeeper(WorkerUtils.java:80) ~[org.apache.pulsar-pulsar-functions-worker-2.11.0.jar:2.11.0]
    at org.apache.pulsar.functions.worker.rest.api.ComponentImpl.getFunctionPackageLocation(ComponentImpl.java:416) ~[org.apache.pulsar-pulsar-functions-worker-2.11.0.jar:2.11.0]
    at org.apache.pulsar.functions.worker.rest.api.FunctionsImpl.registerFunction(FunctionsImpl.java:240) ~[org.apache.pulsar-pulsar-functions-worker-2.11.0.jar:2.11.0]
    at org.apache.pulsar.broker.admin.impl.FunctionsBase.registerFunction(FunctionsBase.java:200) ~[org.apache.pulsar-pulsar-broker-2.11.0.jar:2.11.0]
    at jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[?:?]
    at jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77) ~[?:?]
    at jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[?:?]
    at java.lang.reflect.Method.invoke(Method.java:568) ~[?:?]
    at org.glassfish.jersey.server.model.internal.ResourceMethodInvocationHandlerFactory.lambda$static$0(ResourceMethodInvocationHandlerFactory.java:52) ~[org.glassfish.jersey.core-jersey-server-2.34.jar:?]
    at org.glassfish.jersey.server.model.internal.AbstractJavaResourceMethodDispatcher$1.run(AbstractJavaResourceMethodDispatcher.java:124) ~[org.glassfish.jersey.core-jersey-server-2.34.jar:?]
    at org.glassfish.jersey.server.model.internal.AbstractJavaResourceMethodDispatcher.invoke(AbstractJavaResourceMethodDispatcher.java:167) ~[org.glassfish.jersey.core-jersey-server-2.34.jar:?]
    at org.glassfish.jersey.server.model.internal.JavaResourceMethodDispatcherProvider$VoidOutInvoker.doDispatch(JavaResourceMethodDispatcherProvider.java:159) ~[org.glassfish.jersey.core-jersey-server-2.34.jar:?]
    at org.glassfish.jersey.server.model.internal.AbstractJavaResourceMethodDispatcher.dispatch(AbstractJavaResourceMethodDispatcher.java:79) ~[org.glassfish.jersey.core-jersey-server-2.34.jar:?]
    at org.glassfish.jersey.server.model.ResourceMethodInvoker.invoke(ResourceMethodInvoker.java:475) ~[org.glassfish.jersey.core-jersey-server-2.34.jar:?]
    at org.glassfish.jersey.server.model.ResourceMethodInvoker.apply(ResourceMethodInvoker.java:397) ~[org.glassfish.jersey.core-jersey-server-2.34.jar:?]
    at org.glassfish.jersey.server.model.ResourceMethodInvoker.apply(ResourceMethodInvoker.java:81) ~[org.glassfish.jersey.core-jersey-server-2.34.jar:?]
    at org.glassfish.jersey.server.ServerRuntime$1.run(ServerRuntime.java:255) ~[org.glassfish.jersey.core-jersey-server-2.34.jar:?]
    at org.glassfish.jersey.internal.Errors$1.call(Errors.java:248) ~[org.glassfish.jersey.core-jersey-common-2.34.jar:?]
    at org.glassfish.jersey.internal.Errors$1.call(Errors.java:244) ~[org.glassfish.jersey.core-jersey-common-2.34.jar:?]
    at org.glassfish.jersey.internal.Errors.process(Errors.java:292) ~[org.glassfish.jersey.core-jersey-common-2.34.jar:?]
    at org.glassfish.jersey.internal.Errors.process(Errors.java:274) ~[org.glassfish.jersey.core-jersey-common-2.34.jar:?]
    at org.glassfish.jersey.internal.Errors.process(Errors.java:244) ~[org.glassfish.jersey.core-jersey-common-2.34.jar:?]
    at org.glassfish.jersey.process.internal.RequestScope.runInScope(RequestScope.java:265) ~[org.glassfish.jersey.core-jersey-common-2.34.jar:?]
    at org.glassfish.jersey.server.ServerRuntime.process(ServerRuntime.java:234) ~[org.glassfish.jersey.core-jersey-server-2.34.jar:?]
    at org.glassfish.jersey.server.ApplicationHandler.handle(ApplicationHandler.java:680) ~[org.glassfish.jersey.core-jersey-server-2.34.jar:?]
    at org.glassfish.jersey.servlet.WebComponent.serviceImpl(WebComponent.java:394) ~[org.glassfish.jersey.containers-jersey-container-servlet-core-2.34.jar:?]
    at org.glassfish.jersey.servlet.WebComponent.service(WebComponent.java:346) ~[org.glassfish.jersey.containers-jersey-container-servlet-core-2.34.jar:?]
    at org.glassfish.jersey.servlet.ServletContainer.service(ServletContainer.java:366) ~[org.glassfish.jersey.containers-jersey-container-servlet-core-2.34.jar:?]
    at org.glassfish.jersey.servlet.ServletContainer.service(ServletContainer.java:319) ~[org.glassfish.jersey.containers-jersey-container-servlet-core-2.34.jar:?]
    at org.glassfish.jersey.servlet.ServletContainer.service(ServletContainer.java:205) ~[org.glassfish.jersey.containers-jersey-container-servlet-core-2.34.jar:?]
    at org.eclipse.jetty.servlet.ServletHolder.handle(ServletHolder.java:799) ~[org.eclipse.jetty-jetty-servlet-9.4.48.v20220622.jar:9.4.48.v20220622]
    at org.eclipse.jetty.servlet.ServletHandler$ChainEnd.doFilter(ServletHandler.java:1656) ~[org.eclipse.jetty-jetty-servlet-9.4.48.v20220622.jar:9.4.48.v20220622]
    at org.apache.pulsar.broker.web.ResponseHandlerFilter.doFilter(ResponseHandlerFilter.java:67) ~[org.apache.pulsar-pulsar-broker-2.11.0.jar:2.11.0]
    at org.eclipse.jetty.servlet.FilterHolder.doFilter(FilterHolder.java:193) ~[org.eclipse.jetty-jetty-servlet-9.4.48.v20220622.jar:9.4.48.v20220622]
    at org.eclipse.jetty.servlet.ServletHandler$Chain.doFilter(ServletHandler.java:1626) ~[org.eclipse.jetty-jetty-servlet-9.4.48.v20220622.jar:9.4.48.v20220622]
    at org.eclipse.jetty.servlets.QoSFilter.doFilter(QoSFilter.java:202) ~[org.eclipse.jetty-jetty-servlets-9.4.48.v20220622.jar:9.4.48.v20220622]
    at org.eclipse.jetty.servlet.FilterHolder.doFilter(FilterHolder.java:193) ~[org.eclipse.jetty-jetty-servlet-9.4.48.v20220622.jar:9.4.48.v20220622]
    at org.eclipse.jetty.servlet.ServletHandler$Chain.doFilter(ServletHandler.java:1626) ~[org.eclipse.jetty-jetty-servlet-9.4.48.v20220622.jar:9.4.48.v20220622]
    at org.eclipse.jetty.servlet.ServletHandler.doHandle(ServletHandler.java:552) ~[org.eclipse.jetty-jetty-servlet-9.4.48.v20220622.jar:9.4.48.v20220622]
    at org.eclipse.jetty.server.handler.ScopedHandler.nextHandle(ScopedHandler.java:233) ~[org.eclipse.jetty-jetty-server-9.4.48.v20220622.jar:9.4.48.v20220622]
    at org.eclipse.jetty.server.session.SessionHandler.doHandle(SessionHandler.java:1624) ~[org.eclipse.jetty-jetty-server-9.4.48.v20220622.jar:9.4.48.v20220622]
    at org.eclipse.jetty.server.handler.ScopedHandler.nextHandle(ScopedHandler.java:233) ~[org.eclipse.jetty-jetty-server-9.4.48.v20220622.jar:9.4.48.v20220622]
    at org.eclipse.jetty.server.handler.ContextHandler.doHandle(ContextHandler.java:1440) ~[org.eclipse.jetty-jetty-server-9.4.48.v20220622.jar:9.4.48.v20220622]
    at org.eclipse.jetty.server.handler.ScopedHandler.nextScope(ScopedHandler.java:188) ~[org.eclipse.jetty-jetty-server-9.4.48.v20220622.jar:9.4.48.v20220622]
    at org.eclipse.jetty.servlet.ServletHandler.doScope(ServletHandler.java:505) ~[org.eclipse.jetty-jetty-servlet-9.4.48.v20220622.jar:9.4.48.v20220622]
    at org.eclipse.jetty.server.session.SessionHandler.doScope(SessionHandler.java:1594) ~[org.eclipse.jetty-jetty-server-9.4.48.v20220622.jar:9.4.48.v20220622]
    at org.eclipse.jetty.server.handler.ScopedHandler.nextScope(ScopedHandler.java:186) ~[org.eclipse.jetty-jetty-server-9.4.48.v20220622.jar:9.4.48.v20220622]
    at org.eclipse.jetty.server.handler.ContextHandler.doScope(ContextHandler.java:1355) ~[org.eclipse.jetty-jetty-server-9.4.48.v20220622.jar:9.4.48.v20220622]
    at org.eclipse.jetty.server.handler.ScopedHandler.handle(ScopedHandler.java:141) ~[org.eclipse.jetty-jetty-server-9.4.48.v20220622.jar:9.4.48.v20220622]
    at org.eclipse.jetty.server.handler.ContextHandlerCollection.handle(ContextHandlerCollection.java:234) ~[org.eclipse.jetty-jetty-server-9.4.48.v20220622.jar:9.4.48.v20220622]
    at org.eclipse.jetty.server.handler.HandlerCollection.handle(HandlerCollection.java:146) ~[org.eclipse.jetty-jetty-server-9.4.48.v20220622.jar:9.4.48.v20220622]
    at org.eclipse.jetty.server.handler.StatisticsHandler.handle(StatisticsHandler.java:181) ~[org.eclipse.jetty-jetty-server-9.4.48.v20220622.jar:9.4.48.v20220622]
    at org.eclipse.jetty.server.handler.HandlerWrapper.handle(HandlerWrapper.java:127) ~[org.eclipse.jetty-jetty-server-9.4.48.v20220622.jar:9.4.48.v20220622]
    at org.eclipse.jetty.server.Server.handle(Server.java:516) ~[org.eclipse.jetty-jetty-server-9.4.48.v20220622.jar:9.4.48.v20220622]
    at org.eclipse.jetty.server.HttpChannel.lambda$handle$1(HttpChannel.java:487) ~[org.eclipse.jetty-jetty-server-9.4.48.v20220622.jar:9.4.48.v20220622]
    at org.eclipse.jetty.server.HttpChannel.dispatch(HttpChannel.java:732) ~[org.eclipse.jetty-jetty-server-9.4.48.v20220622.jar:9.4.48.v20220622]
    at org.eclipse.jetty.server.HttpChannel.handle(HttpChannel.java:479) ~[org.eclipse.jetty-jetty-server-9.4.48.v20220622.jar:9.4.48.v20220622]
    at org.eclipse.jetty.server.HttpConnection.onFillable(HttpConnection.java:277) ~[org.eclipse.jetty-jetty-server-9.4.48.v20220622.jar:9.4.48.v20220622]
    at org.eclipse.jetty.io.AbstractConnection$ReadCallback.succeeded(AbstractConnection.java:311) ~[org.eclipse.jetty-jetty-io-9.4.48.v20220622.jar:9.4.48.v20220622]
    at org.eclipse.jetty.io.FillInterest.fillable(FillInterest.java:105) ~[org.eclipse.jetty-jetty-io-9.4.48.v20220622.jar:9.4.48.v20220622]
    at org.eclipse.jetty.io.ChannelEndPoint$1.run(ChannelEndPoint.java:104) ~[org.eclipse.jetty-jetty-io-9.4.48.v20220622.jar:9.4.48.v20220622]
    at org.eclipse.jetty.util.thread.strategy.EatWhatYouKill.runTask(EatWhatYouKill.java:338) ~[org.eclipse.jetty-jetty-util-9.4.48.v20220622.jar:9.4.48.v20220622]
    at org.eclipse.jetty.util.thread.strategy.EatWhatYouKill.doProduce(EatWhatYouKill.java:315) ~[org.eclipse.jetty-jetty-util-9.4.48.v20220622.jar:9.4.48.v20220622]
    at org.eclipse.jetty.util.thread.strategy.EatWhatYouKill.tryProduce(EatWhatYouKill.java:173) ~[org.eclipse.jetty-jetty-util-9.4.48.v20220622.jar:9.4.48.v20220622]
    at org.eclipse.jetty.util.thread.strategy.EatWhatYouKill.run(EatWhatYouKill.java:131) ~[org.eclipse.jetty-jetty-util-9.4.48.v20220622.jar:9.4.48.v20220622]
    at org.eclipse.jetty.util.thread.ReservedThreadExecutor$ReservedThread.run(ReservedThreadExecutor.java:409) ~[org.eclipse.jetty-jetty-util-9.4.48.v20220622.jar:9.4.48.v20220622]
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136) ~[?:?]
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635) ~[?:?]
    at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) ~[io.netty-netty-common-4.1.86.Final.jar:4.1.86.Final]
    at java.lang.Thread.run(Thread.java:833) ~[?:?]

Anything else?

I suspect the main issue is that various parts of the dlog implementation are still ZooKeeper-specific, for example:

This all means that by the time uploadToBookKeeper() is called in WorkerUtils, dlogNamespace is null and dlogNamespace.logExists() fails.
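
To make the failure mode concrete, here is a small model written in Python rather than Pulsar's actual Java code. All names in it (upload_package, FakeNamespace) are hypothetical and do not exist in Pulsar. It only illustrates the pattern described above: a namespace handle that was never initialised is dereferenced deep in the upload path, and an explicit check at that point would produce a clearer error than a bare null dereference.

```python
class FakeNamespace:
    """Stand-in for an initialised DistributedLog namespace (hypothetical)."""

    def __init__(self):
        self._logs = set()

    def log_exists(self, path):
        return path in self._logs

    def create_log(self, path):
        self._logs.add(path)


def upload_package(dlog_namespace, dest_path):
    """Model of an upload step that needs a live namespace handle."""
    if dlog_namespace is None:
        # Fail fast with a message that points at the likely cause,
        # instead of an AttributeError (Python's analogue of the NPE).
        raise RuntimeError(
            "package storage namespace was never initialised; "
            "check whether the configured metadata store is supported"
        )
    if dlog_namespace.log_exists(dest_path):
        return "exists"
    dlog_namespace.create_log(dest_path)
    return "created"
```

With a real handle, upload_package(FakeNamespace(), "pkg") returns "created"; with None it raises a RuntimeError naming the uninitialised namespace, which is the kind of message that would have made this report easier to diagnose.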


github-actions[bot] commented 1 year ago

The issue had no activity for 30 days, mark with Stale label.

packageman commented 1 year ago

@abeliangroupie I encountered the same problem, is there any update from your side?

abeliangroupie commented 11 months ago

> @abeliangroupie I encountered the same problem, is there any update from your side?

I ended up giving up on etcd and went back to ZooKeeper for now, unfortunately.

liangyuanpeng commented 9 months ago

@packageman Which Pulsar version were you running when you hit this problem? Was it 2.11, the same as in this issue? Have you tested any later Pulsar version?

packageman commented 8 months ago

> Which Pulsar version were you running when you hit this problem?

@liangyuanpeng My Pulsar version is 3.1.0. Yes, it is the same problem as reported in this issue for Pulsar 2.11.

The latest version is currently 3.1.2; I haven't tested on it.