apache / pulsar-client-python

Apache Pulsar Python client library
https://pulsar.apache.org/
Apache License 2.0

[BUG] Pulsar client initialization fails when sending data to Pulsar from a Celery task #150

Open dengshaochun opened 1 year ago

dengshaochun commented 1 year ago

python: 3.6
pulsar-client: pulsar-client[avro]==2.10.2
celery: 5.1.2

code example:

#!/usr/bin/env python

import time
import random
import string
from pulsar import Client, CompressionType
from pulsar.schema import AvroSchema, Record, String, Integer

def generate_random_string(length=6):
    charset = string.ascii_letters + string.digits
    random_chars = random.choices(charset, k=length)
    random_string = "".join(random_chars)
    return random_string.capitalize()

class User(Record):
    name = String()
    age = Integer()  # fields must be instances, not the bare class

UserAvroSchema = AvroSchema(User)  # type: ignore

def gen_random_data():
    # The keyword must match the schema field name ("name", not "user").
    return User(name=generate_random_string(), age=random.randint(0, 100))

class PulsarDemo(object):
    def __init__(self) -> None:
        self.SERVICE_URL = "pulsar://***"
        self.TOPIC = "persistent://****"
        client = Client(service_url=self.SERVICE_URL)
        self.producer = client.create_producer(
            topic=self.TOPIC,
            schema=UserAvroSchema,
            batching_enabled=True,
            batching_max_messages=1000,
            batching_max_publish_delay_ms=1000,
            compression_type=CompressionType.SNAPPY,  # type: ignore
        )

    def send_callback(self, send_result, msg_id):
        print("Message published: result:{}  msg_id:{}".format(send_result, msg_id))

    def async_producer(self, cnt=1000):
        # Send exactly cnt messages, then flush anything still batched.
        for _ in range(cnt):
            data = gen_random_data()
            self.producer.send_async(
                data,
                callback=self.send_callback,
            )
            time.sleep(0.01)
        self.producer.flush()

# celery task
from celery import shared_task


@shared_task
def mock_data2pulsar(cnt=1000):
    mock = PulsarDemo()
    mock.async_producer(cnt)

The following exception occurs:

[2023-08-29 10:27:46,933: ERROR/ForkPoolWorker-31] Pulsar error: TopicNotFound

BewareMyPower commented 1 year ago

I'm new to celery. Could you explain how to run the script you provided?

dengshaochun commented 1 year ago

I cannot fully reproduce it, but it fails consistently in our production environment. A standalone celery + pulsar test runs normally. I have no leads at the moment. The production environment reports the following errors:

 p._producer = self._client.create_producer(topic, conf)
_pulsar.TopicNotFound: Pulsar error: TopicNotFound
0000-00-00 00:00:00.000 WARN  [0000-00-00 00:00:00.000 ERROR [0000-00-00 00:00:00.000 WARN  [0000-00-00 00:00:00.000 ERROR [0000-00-00 00:00:00.000 WARN  [0000-00-00 00:00:00.000 ERROR [0000-00-00 00:00:00.000 INFO  [0000-00-00 00:00:00.000 ERROR [0000-00-00 00:00:00.000 INFO  [0000-00-00 00:00:00.000 INFO  [0000-00-00 00:00:00.000 INFO  [0000-00-00 00:00:00.000 INFO  [0000-00-00 00:00:00.000 INFO  [0000-00-00 00:00:00.000 INFO  [0000-00-00 00:00:00.000 INFO  [0000-00-00 00:00:00.000 INFO  [0000-00-00 00:00:00.000 INFO  [0000-00-00 00:00:00.000 INFO  [0000-00-00 00:00:00.000 INFO  [0000-00-00 00:00:00.000 INFO  [0000-00-00 00:00:00.000 INFO  [

In production, the problem occurs only when the task is invoked through Celery's .delay(); calling the function directly raises no exception, as follows:

# ok
mock_data2pulsar(cnt=1000)

# exception
mock_data2pulsar.delay(cnt=1000)

For debugging, I printed topic and conf in pulsar/__init__.py just before p._producer = self._client.create_producer(topic, conf); both look fine.
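
The same check can be done without editing the installed library. Below is a rough sketch of a wrapper that logs the process ID and the repr of the topic around `create_producer`. `create_producer_logged` and the logger name are hypothetical; the only client call assumed is `client.create_producer(topic, **conf)`. Logging with `%r` makes stray whitespace or unexpected characters in the topic visible.

```python
import logging
import os

logger = logging.getLogger("pulsar_debug")  # hypothetical logger name


def create_producer_logged(client, topic, **conf):
    """Call client.create_producer, logging pid, topic and conf around it."""
    logger.info("pid=%s topic=%r conf=%r", os.getpid(), topic, conf)
    try:
        return client.create_producer(topic, **conf)
    except Exception:
        # Log the full traceback with the same context, then re-raise unchanged.
        logger.exception(
            "create_producer failed: pid=%s topic=%r", os.getpid(), topic
        )
        raise
```

Comparing the logged lines from the direct call and from the .delay() call would show whether both paths really pass the same topic and run with the same settings.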

gromsterus commented 1 year ago

Hi @dengshaochun. I tried to reproduce the bug, but the code works for me. I suspect the problem is in the topic name.

Here is my example: https://github.com/gromsterus/issues-sandbox/tree/main/pulsar-client-python-150
It uses the topic persistent://public/default/test-celery

If you provide the complete code with celery initialization it will be easier to help 🤝
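
Since the topic name is one suspect, a quick format check may help rule it out. Below is a minimal sketch, assuming the fully qualified layout {persistent|non-persistent}://tenant/namespace/topic. `parse_topic` is a hypothetical helper, not part of the pulsar package, and it deliberately rejects short names and any other layout for simplicity.

```python
import re

# Assumed layout: {persistent|non-persistent}://tenant/namespace/topic
_TOPIC_RE = re.compile(
    r"(persistent|non-persistent)://([^/\s]+)/([^/\s]+)/([^/\s]+)"
)


def parse_topic(name):
    """Split a fully qualified topic name into parts, or raise ValueError."""
    match = _TOPIC_RE.fullmatch(name)
    if match is None:
        # repr() exposes trailing spaces, newlines and similar surprises.
        raise ValueError("unexpected topic name: {!r}".format(name))
    domain, tenant, namespace, topic = match.groups()
    return {
        "domain": domain,
        "tenant": tenant,
        "namespace": namespace,
        "topic": topic,
    }
```

Running the configured topic string through such a check inside the Celery task, rather than in a shell, tests the value the worker actually sees.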