apache / pulsar

Apache Pulsar - distributed pub-sub messaging system
https://pulsar.apache.org/
Apache License 2.0

Can not use Context#Counter in function #6589

Open zyllt opened 4 years ago

zyllt commented 4 years ago

Describe the bug

When I created a demo function (WordCountFunction) in the cluster, I got an exception. According to the source code, Function#Context#stateContext needs StreamStorage support. I started StreamStorage with extraServerComponents=org.apache.bookkeeper.stream.server.StreamStorageLifecycleComponent and restarted WordCountFunction, and found that the WordCountFunction thread was parked. In my Pulsar cluster, brokers and bookies are deployed on different machines.

To Reproduce

Steps to reproduce the behavior:

  1. Start StreamStorage with a bookie server on machine A.
  2. Start WordCountFunction with a broker server on machine B.
  3. The WordCountFunction thread parks at JavaInstanceRunnable.createStateTable(JavaInstanceRunnable.java:345).

Screenshots

19:27:11.455 [test/test-namespace/WordCountFunction-0] INFO  org.apache.pulsar.functions.instance.JavaInstanceRunnable - Starting Java Instance WordCountFunction :
 Details = tenant: "test"
namespace: "test-namespace"
name: "WordCountFunction"
className: "org.apache.pulsar.functions.api.examples.WordCountFunction"
userConfig: "{\"PublishTopic\":\"test_result\"}"
autoAck: true
parallelism: 1
source {
  typeClassName: "java.lang.String"
  inputSpecs {
    key: "test/test-namespace/test_src"
    value {
    }
  }
  cleanupSubscription: true
}
sink {
  topic: "test/test-namespace/test_result"
  typeClassName: "java.lang.Void"
}
resources {
  cpu: 1.0
  ram: 1073741824
  disk: 10737418240
}
componentType: FUNCTION

19:27:11.455 [test/test-namespace/WordCountFunction-0] INFO  org.apache.pulsar.functions.instance.JavaInstanceRunnable - Load JAR: /usr/local/pulsar-2.5.0/download/pulsar_functions/test/test-namespace/WordCountFunction/0/pulsar-functions-api-examples.jar
19:27:11.467 [test/test-namespace/WordCountFunction-0] INFO  org.apache.pulsar.functions.instance.JavaInstanceRunnable - Initialize function class loader for function WordCountFunction at function cache manager
19:27:11.920 [client-scheduler-OrderedScheduler-0-0] INFO  org.apache.bookkeeper.clients.impl.channel.StorageServerChannelManager - Added range server (hostname: "127.0.0.1"
port: 4181
) into the channel manager.

jstack -l thread info

"test/test-namespace/WordCountFunction-0" #29 prio=5 os_prio=0 tid=0x00007f9215a92800 nid=0x5c06 waiting on condition [0x00007f9144d2d000]
   java.lang.Thread.State: WAITING (parking)
    at sun.misc.Unsafe.park(Native Method)
    - parking to wait for  <0x00000000efe1e060> (a java.util.concurrent.CompletableFuture$Signaller)
    at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
    at java.util.concurrent.CompletableFuture$Signaller.block(CompletableFuture.java:1693)
    at java.util.concurrent.ForkJoinPool.managedBlock(ForkJoinPool.java:3323)
    at java.util.concurrent.CompletableFuture.waitingGet(CompletableFuture.java:1729)
    at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1895)
    at org.apache.bookkeeper.common.concurrent.FutureUtils.result(FutureUtils.java:72)
    at org.apache.bookkeeper.common.concurrent.FutureUtils.result(FutureUtils.java:61)
    at org.apache.pulsar.functions.instance.JavaInstanceRunnable.createStateTable(JavaInstanceRunnable.java:345)
    at org.apache.pulsar.functions.instance.JavaInstanceRunnable.setupStateTable(JavaInstanceRunnable.java:397)
    at org.apache.pulsar.functions.instance.JavaInstanceRunnable.setupJavaInstance(JavaInstanceRunnable.java:208)
    at org.apache.pulsar.functions.instance.JavaInstanceRunnable.run(JavaInstanceRunnable.java:244)
    at java.lang.Thread.run(Thread.java:748)

   Locked ownable synchronizers:
    - None
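
The stack trace above shows the instance thread blocked in CompletableFuture.get() with no timeout, so it stays parked for as long as the state table request is never answered. The following is a minimal standalone sketch of that behavior, not Pulsar or BookKeeper code: the class name BoundedWaitSketch and the helper awaitWithTimeout are hypothetical, and the never-completed future only stands in for a request sent to an unreachable storage server.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical illustration class; not part of Pulsar or BookKeeper.
public class BoundedWaitSketch {

    /**
     * Waits for the future for at most timeoutMillis and reports the outcome
     * as a string, instead of parking the calling thread indefinitely.
     */
    public static String awaitWithTimeout(CompletableFuture<String> future, long timeoutMillis) {
        try {
            return "completed: " + future.get(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (TimeoutException e) {
            return "timed out";
        } catch (ExecutionException e) {
            return "failed: " + e.getCause();
        } catch (InterruptedException e) {
            // Restore the interrupt flag so callers can still observe it.
            Thread.currentThread().interrupt();
            return "interrupted";
        }
    }

    public static void main(String[] args) {
        // Assumption: a future that is never completed models a request
        // whose target server cannot be reached.
        CompletableFuture<String> neverCompleted = new CompletableFuture<>();
        System.out.println(awaitWithTimeout(neverCompleted, 200)); // prints "timed out"

        CompletableFuture<String> done = CompletableFuture.completedFuture("table-created");
        System.out.println(awaitWithTimeout(done, 200)); // prints "completed: table-created"
    }
}
```

With an unbounded get(), the first call would never return, which matches the WAITING (parking) state in the dump. A bounded wait turns the silent hang into a visible timeout.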

Desktop (please complete the following information):

ls /stream
[assignment, controller, metadata, servers, storage]
ls /stream/servers
[available]
ls /stream/servers/available
[127.0.0.1:4181]

sijie commented 4 years ago

Can you describe how you deployed the cluster?

zyllt commented 4 years ago

@sijie Thanks for your reply. I deployed the Pulsar cluster on bare metal. The cluster has five machines: two run brokers and three run bookies. local-zk and configuration-store are deployed on all machines. I started the functions-worker within the brokers, using the Thread Runtime.
At first I started a demo function named WordCountFunction, then triggered it, and the broker log showed an exception. (image) Following the exception message, I looked at the source code and found that using Function#Context#stateContext requires starting a StreamStorage component. I first started this component on a single bookie with extraServerComponents=org.apache.bookkeeper.stream.server.StreamStorageLifecycleComponent. The bookie log shows a successful start, and netstat -ant | grep -i listen | grep 4181 shows that port 4181 is listening on 0.0.0.0.
In the configuration file functions_worker.yml I set stateStorageServiceUrl to bk://10.1.0.112:4181, which is the IP of the bookie. After restarting the broker, I deleted the WordCountFunction demo function I had created earlier and recreated it. But when I checked the logs, I found that the startup of the demo function was parked.
When I run pulsar-admin functions trigger, it reports that the function does not exist.

19:27:11.455 [test/test-namespace/WordCountFunction-0] INFO  org.apache.pulsar.functions.instance.JavaInstanceRunnable - Load JAR: /usr/local/pulsar-2.5.0/download/pulsar_functions/test/test-namespace/WordCountFunction/0/pulsar-functions-api-examples.jar
19:27:11.467 [test/test-namespace/WordCountFunction-0] INFO  org.apache.pulsar.functions.instance.JavaInstanceRunnable - Initialize function class loader for function WordCountFunction at function cache manager
19:27:11.920 [client-scheduler-OrderedScheduler-0-0] INFO  org.apache.bookkeeper.clients.impl.channel.StorageServerChannelManager - Added range server (hostname: "127.0.0.1"
port: 4181
) into the channel manager.

I suspect the problem may be related to this line: Added range server (hostname: "127.0.0.1" port: 4181). The StreamStorageServer is on another machine, and I have configured the correct stateStorageServiceUrl in functions_worker.yml.
Based on the source code, I think the hostname is obtained from ZooKeeper, so I connected to local-zk and ran ls /stream/servers/available; the output is [127.0.0.1:4181]. It may be that the real IP address is not obtained when the server registers in ZooKeeper, but I cannot confirm whether this is the cause.
Then I started an independent functions-worker on the bookie machine, started the demo function, and triggered it, and everything worked normally. I tested other cases as well; you can see them here: https://github.com/apache/bookkeeper/issues/2216#issuecomment-602995573
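
The mismatch described above, a configured stateStorageServiceUrl of bk://10.1.0.112:4181 against a registered endpoint of 127.0.0.1:4181, can be checked mechanically. The following is a rough sketch using only the Java standard library. The class AdvertisedAddressCheck and its methods are hypothetical helpers, not BookKeeper or Pulsar APIs, and the endpoint strings are copied by hand from the output above rather than read from ZooKeeper. The host parsing assumes IPv4 addresses or hostnames, not IPv6 literals.

```java
import java.net.InetAddress;
import java.net.URI;
import java.net.UnknownHostException;

// Hypothetical illustration class; not part of Pulsar or BookKeeper.
public class AdvertisedAddressCheck {

    /** Extracts the host part of a "host:port" entry (IPv4 or hostname only). */
    public static String hostOf(String endpoint) {
        int idx = endpoint.lastIndexOf(':');
        return idx < 0 ? endpoint : endpoint.substring(0, idx);
    }

    /** Returns true if the endpoint's host is a loopback address such as 127.0.0.1. */
    public static boolean isLoopback(String endpoint) throws UnknownHostException {
        // For an IP literal, getByName only validates the format; no DNS lookup happens.
        return InetAddress.getByName(hostOf(endpoint)).isLoopbackAddress();
    }

    /** Returns true if the registered endpoint equals the host:port of the service URL. */
    public static boolean matchesConfigured(String serviceUrl, String registered) {
        URI uri = URI.create(serviceUrl);
        return (uri.getHost() + ":" + uri.getPort()).equals(registered);
    }

    public static void main(String[] args) throws UnknownHostException {
        String configured = "bk://10.1.0.112:4181"; // stateStorageServiceUrl from functions_worker.yml
        String registered = "127.0.0.1:4181";       // entry seen under /stream/servers/available

        System.out.println("registered is loopback: " + isLoopback(registered));         // true
        System.out.println("matches configured URL: " + matchesConfigured(configured, registered)); // false
    }
}
```

A loopback entry is only reachable from the machine that registered it. That is consistent with the function starting normally once the functions-worker ran on the bookie machine itself.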

sijie commented 4 years ago

I see. I think the problem is with address advertisement: the server seems to advertise its address as 127.0.0.1. We need to fix the issue at apache/bookkeeper#2216.