apache / rocketmq-client-python

Apache RocketMQ python client
https://rocketmq.apache.org/
Apache License 2.0
274 stars 95 forks source link

[ISSUE #99]Add a function which shows how to use rocketmq in multi-threaded scen… #100

Closed tom0392 closed 4 years ago

tom0392 commented 4 years ago

Resolves #99

codecov-io commented 4 years ago

Codecov Report

Merging #100 into master will not change coverage. The diff coverage is n/a.

Impacted file tree graph

@@           Coverage Diff           @@
##           master     #100   +/-   ##
=======================================
  Coverage   82.14%   82.14%           
=======================================
  Files           4        4           
  Lines         532      532           
=======================================
  Hits          437      437           
  Misses         95       95           

Continue to review full report at Codecov.

Legend - Click here to learn more Δ = absolute <relative> (impact), ø = not affected, ? = missing data Powered by Codecov. Last update 58397dc...89ad6bd. Read the comment docs.

messense commented 4 years ago

No real world applications should do this kind of hack.

tom0392 commented 4 years ago

@ifplusor Hi, Could you gvie some specific reasons for further discussion?

tom0392 commented 4 years ago

@messense Hi, this is indeed a special scene. However, in the production debugging and test environment, we encountered this problem repeatedly and we think the submitted code is one way to avoid the errors after the actual test verification. Of course, we look forward to other methods.

First suggestion When the producer sends a message successfully, the program will return SendStatus.OK=0 to the ret variable. So we think when the producer fails to send a message, the program should return an error code to the ret accordingly and not to throw an exception to prevent the thread from running the rest of code, especially the following step: producer.shutdown()

ifplusor commented 4 years ago

First, don't call start() and shutdown() under multi-threads; secondly, C++ SDK has retry logic.

ifplusor commented 4 years ago

You should report the error of sending, but call shutdown() is unrecommended.

ifplusor commented 4 years ago

send_sync is thread-safe, protect it with mutex is unnecessary.

messense commented 4 years ago

Producer should be reused, not start() and shutdown() repeatedly.

For example

from concurrent.futures import ThreadPoolExecutor, wait
from rocketmq.client import Producer, Message

producer = Producer('PID-XXX')
producer.set_name_server_address('127.0.0.1:9876')

def send_msg():
    msg = Message('YOUR-TOPIC')
    msg.set_keys('XXX')
    msg.set_tags('XXX')
    msg.set_body('XXXX')
    ret = producer.send_sync(msg)
    print(ret.status, ret.msg_id, ret.offset)

def main():
    executor = ThreadPoolExecutor()
    # start producer
    producer.start()
    # submit tasks
    futures = []
    for _ in range(10):
        futures.append(executor.submit(send_msg))
    wait(futures)
    producer.shutdown()

if __name__ == '__main__':
    main()
tom0392 commented 4 years ago

@messense Thanks for your good suggestions! Our application scenario is to send data once every 5 minutes. Maybe this frequency is acceptable to start() and shutdown() every time. As long as the customer's business is running, it needs to keep sending data. So we use apscheduler module: from apscheduler.schedulers.background import BackgroundScheduler and make the program a systemctl service. So far, the service is running stable.

tom0392 commented 4 years ago

send_sync is thread-safe, protect it with mutex is unnecessary.

@ifplusor Thank you for the same. We will test it without mutex.