apache / pulsar

Apache Pulsar - distributed pub-sub messaging system
https://pulsar.apache.org/
Apache License 2.0

[Bug] python function ignores `--subs-position` #17772

Open dionjansen opened 1 year ago

dionjansen commented 1 year ago


Version

Pulsar version: 2.10.1

Minimal reproduction steps

Follow the Pulsar Functions quickstart. I'm using Docker to run Pulsar in standalone mode:

1. Run Pulsar standalone

First, create a new Python file:

$ touch reverse.py

In that file, add the following:

def process(input):
    return input[::-1]

Then start Pulsar standalone, mounting reverse.py into the container:

$ docker run --name pulsar \
  --rm -it \
  -p 6650:6650 \
  -p 8080:8080 \
  --volume `pwd`/reverse.py:/pulsar/reverse.py \
  apachepulsar/pulsar:2.10.1 bin/pulsar standalone

2. Produce messages to the source topic

$ docker exec -it pulsar bash -c "/pulsar/bin/pulsar-client produce \
  persistent://public/default/backwards \
  --messages '1 tset,2 tset,3 tset'"
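To check what the function should publish without involving Pulsar at all: the `--messages` value above is split on commas, which is why three messages are expected in step (3), and `process` from reverse.py reverses each one. The sketch below reproduces that locally; `expected_outputs` is a hypothetical helper for illustration, not part of Pulsar, and the comma split is assumed to match what `pulsar-client produce` does here.

```python
# Local sanity check of reverse.py's logic; no Pulsar involved.
# expected_outputs is a hypothetical helper for illustration only.


def process(input):
    # Same body as reverse.py: reverse the string.
    return input[::-1]


def expected_outputs(messages_arg, separator=","):
    # Assumption: the --messages value is split on commas, as in step (2).
    return [process(msg) for msg in messages_arg.split(separator)]


if __name__ == "__main__":
    # prints: test 1 / test 2 / test 3 (one per line)
    for content in expected_outputs("1 tset,2 tset,3 tset"):
        print(content)
```

Each reversed string is what should appear as `content:` in the consumer output once the function processes the backlog.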

3. Consume results

$ docker exec -it pulsar bash -c "bin/pulsar-client consume \
  persistent://public/default/forwards \
  --subscription-name test \
  --num-messages 0"

4. Start the function with --subs-position Earliest

# using create
$ docker exec -it pulsar bash -c "/pulsar/bin/pulsar-admin functions create \
  --py '/pulsar/reverse.py' \
  --classname reverse \
  --inputs persistent://public/default/backwards \
  --output persistent://public/default/forwards \
  --tenant public \
  --namespace default \
  --subs-position Earliest \
  --name reverse"

# or localrun
$ docker exec -it pulsar bash -c "/pulsar/bin/pulsar-admin functions localrun \
  --py '/pulsar/reverse.py' \
  --classname reverse \
  --inputs persistent://public/default/backwards \
  --output persistent://public/default/forwards \
  --tenant public \
  --namespace default \
  --subs-position Earliest \
  --name reverse"

What did you expect to see?

The consumer started in step (3) should output:

----- got message -----
key:[null], properties:[__pfn_input_msg_id__=CDsQAyAA, __pfn_input_topic__=persistent://public/default/backwards], content:test 1
----- got message -----
key:[null], properties:[__pfn_input_msg_id__=CDsQBCAA, __pfn_input_topic__=persistent://public/default/backwards], content:test 2
----- got message -----
key:[null], properties:[__pfn_input_msg_id__=CDsQBSAA, __pfn_input_topic__=persistent://public/default/backwards], content:test 3

What did you see instead?

The consumer outputs nothing, because the function's subscription starts reading at the latest position, so the function never picks up the three messages produced in step (2).
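To make the Earliest/Latest difference concrete, here is a toy in-memory model of a topic (a hypothetical `ToyTopic` class for illustration, not Pulsar's implementation). A new subscription at `LATEST` starts after the last message already in the topic, while one at `EARLIEST` starts at the first retained message. Because the messages from step (2) exist before the function subscribes, a `LATEST` subscription has nothing to deliver.

```python
# Toy model of a subscription's initial position on a topic backlog.
# ToyTopic is hypothetical; it only illustrates Earliest vs Latest semantics.
from dataclasses import dataclass, field
from typing import List


@dataclass
class ToyTopic:
    messages: List[str] = field(default_factory=list)

    def produce(self, msg: str) -> None:
        self.messages.append(msg)

    def subscribe(self, initial_position: str) -> int:
        # Return the index where a newly created subscription's cursor starts.
        # (In Pulsar, too, the initial position only matters at creation time.)
        if initial_position == "EARLIEST":
            return 0
        if initial_position == "LATEST":
            return len(self.messages)
        raise ValueError(f"unknown position: {initial_position}")

    def read_from(self, cursor: int) -> List[str]:
        return self.messages[cursor:]


topic = ToyTopic()
# Step (2): messages are produced before the function subscribes.
for m in ["1 tset", "2 tset", "3 tset"]:
    topic.produce(m)

print(topic.read_from(topic.subscribe("LATEST")))    # []
print(topic.read_from(topic.subscribe("EARLIEST")))  # ['1 tset', '2 tset', '3 tset']
```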

Anything else?

In localrun mode you can see that `subscriptionPosition` is set to `EARLIEST` in the `--function_details` argument passed to the Python instance script (`python_instance_main.py`), but it seems to be ignored:

$ docker exec -it pulsar bash -c "/pulsar/bin/pulsar-admin functions localrun \
  --py '/pulsar/reverse.py' \
  --classname reverse \
  --inputs persistent://public/default/backwards \
  --output persistent://public/default/forwards \
  --tenant public \
  --namespace default \
  --subs-position Earliest \
  --name reverse"

2022-09-21T12:09:59,245+0000 [main] INFO  org.apache.pulsar.functions.runtime.process.ProcessRuntimeFactory - Java instance jar location is not defined, using the location defined in system environment : /pulsar/instances/java-instance.jar
2022-09-21T12:09:59,249+0000 [main] INFO  org.apache.pulsar.functions.runtime.process.ProcessRuntimeFactory - Python instance file location is not defined using the location defined in system environment : /pulsar/instances/python-instance/python_instance_main.py
2022-09-21T12:09:59,250+0000 [main] INFO  org.apache.pulsar.functions.runtime.process.ProcessRuntimeFactory - No extra dependencies location is defined in either function worker config or system environment
2022-09-21T12:09:59,328+0000 [main] INFO  org.apache.pulsar.functions.runtime.RuntimeSpawner - public/default/reverse-0 RuntimeSpawner starting function
2022-09-21T12:09:59,329+0000 [main] INFO  org.apache.pulsar.functions.runtime.process.ProcessRuntime - Creating function log directory /pulsar/logs/functions/public/default/reverse
2022-09-21T12:09:59,329+0000 [main] INFO  org.apache.pulsar.functions.runtime.process.ProcessRuntime - Created or found function log directory /pulsar/logs/functions/public/default/reverse
2022-09-21T12:09:59,330+0000 [main] INFO  org.apache.pulsar.functions.runtime.process.ProcessRuntime - ProcessBuilder starting the process with args python /pulsar/instances/python-instance/python_instance_main.py --py /pulsar/reverse.py --logging_directory /pulsar/logs/functions --logging_file reverse --logging_config_file /pulsar/conf/functions-logging/logging_config.ini --instance_id 0 --function_id d094e40d-3b40-4d97-9a48-bfbe3703b3f2 --function_version 29c06a13-31f8-462a-b175-e112a599ae24 --function_details '{"tenant":"public","namespace":"default","name":"reverse","className":"reverse","runtime":"PYTHON","autoAck":true,"parallelism":1,"source":{"inputSpecs":{"persistent://public/default/backwards":{}},"cleanupSubscription":true,"subscriptionPosition":"EARLIEST"},"sink":{"topic":"persistent://public/default/forwards","forwardSourceMessageProperty":true},"resources":{"cpu":1.0,"ram":"1073741824","disk":"10737418240"},"componentType":"FUNCTION"}' --pulsar_serviceurl pulsar://localhost:6650 --use_tls false --tls_allow_insecure false --hostname_verification_enabled false --max_buffered_tuples 1024 --port 45649 --metrics_port 42105 --expected_healthcheck_interval 30 --secrets_provider secretsprovider.ClearTextSecretsProvider --cluster_name local
2022-09-21T12:09:59,340+0000 [main] INFO  org.apache.pulsar.functions.runtime.process.ProcessRuntime - Started process successfully
2022-09-21 12:09:59.569 INFO  [139811108165440] Client:88 | Subscribing on Topic :persistent://public/default/backwards
2022-09-21 12:09:59.569 INFO  [139811108165440] ClientConnection:189 | [<none> -> pulsar://localhost:6650] Create ClientConnection, timeout=10000
2022-09-21 12:09:59.569 INFO  [139811108165440] ConnectionPool:96 | Created connection for pulsar://localhost:6650
2022-09-21 12:09:59.569 INFO  [139811045275392] ExecutorService:41 | Run io_service in a single thread
2022-09-21 12:09:59.576 INFO  [139811045275392] ClientConnection:375 | [127.0.0.1:33306 -> 127.0.0.1:6650] Connected to broker
2022-09-21 12:09:59.583 INFO  [139811045275392] HandlerBase:64 | [persistent://public/default/backwards, public/default/reverse, 0] Getting connection from pool
2022-09-21 12:09:59.585 INFO  [139811028408064] ExecutorService:41 | Run io_service in a single thread
2022-09-21 12:09:59.614 INFO  [139811045275392] ConsumerImpl:224 | [persistent://public/default/backwards, public/default/reverse, 0] Created consumer on broker [127.0.0.1:33306 -> 127.0.0.1:6650]
2022-09-21 12:10:18.443 INFO  [139811045275392] HandlerBase:64 | [persistent://public/default/forwards, ] Getting connection from pool
2022-09-21 12:10:18.452 INFO  [139811045275392] ProducerImpl:189 | [persistent://public/default/forwards, ] Created producer on broker [127.0.0.1:33306 -> 127.0.0.1:6650]
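The log supports the report: the `--function_details` JSON handed to `python_instance_main.py` carries `"subscriptionPosition":"EARLIEST"`, so the value does reach the Python instance and is presumably dropped when the instance subscribes to the input topic. The sketch below pulls the field out of a trimmed copy of that JSON and turns it into the `initial_position` keyword that the `pulsar-client` Python `Client.subscribe()` accepts. Assumptions, all hypothetical: `subscribe_kwargs` and `POSITION_MAP` are illustrative names; the strings stand in for `pulsar.InitialPosition.Earliest` / `pulsar.InitialPosition.Latest` so the example runs without `pulsar-client` installed; and a missing field is treated as `LATEST`, on the assumption that the JSON serialization omits the default enum value.

```python
# Sketch: read subscriptionPosition from --function_details and map it to
# the initial_position keyword of the pulsar-client Python Client.subscribe().
# subscribe_kwargs and POSITION_MAP are hypothetical; the strings stand in for
# pulsar.InitialPosition.Earliest / .Latest so this runs without pulsar-client.
import json

# Trimmed copy of the function_details JSON from the localrun log above.
FUNCTION_DETAILS = (
    '{"tenant":"public","namespace":"default","name":"reverse",'
    '"className":"reverse","runtime":"PYTHON","autoAck":true,'
    '"source":{"inputSpecs":{"persistent://public/default/backwards":{}},'
    '"cleanupSubscription":true,"subscriptionPosition":"EARLIEST"}}'
)

POSITION_MAP = {
    "LATEST": "InitialPosition.Latest",
    "EARLIEST": "InitialPosition.Earliest",
}


def subscribe_kwargs(details_json):
    details = json.loads(details_json)
    # Assumption: the field is omitted when it holds the default (LATEST).
    position = details.get("source", {}).get("subscriptionPosition", "LATEST")
    return {"initial_position": POSITION_MAP[position]}


print(subscribe_kwargs(FUNCTION_DETAILS))
# {'initial_position': 'InitialPosition.Earliest'}
```

If this reading is right, a fix in the Python instance would pass the real `pulsar.InitialPosition` value through to its subscribe call instead of relying on the client default.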


github-actions[bot] commented 1 year ago

The issue had no activity for 30 days; marking it with the Stale label.