CopernicaMarketingSoftware / AMQP-CPP

C++ library for asynchronous non-blocking communication with RabbitMQ
Apache License 2.0

Q: What is the correct way to run in multithreading env #520

Closed matwey closed 6 months ago

matwey commented 6 months ago

Hello,

I apologize for opening an issue here; the README has no links to community support.

What I am trying to do: I use the libboostasio handler to run a multi-threaded consumer. I created N channels consuming from the same queue and called io_context.run() in N+1 threads. So far so good, but I have found that onReceived() callbacks are executed in only one thread at a time. This is a problem for me because my callbacks do heavy work. I would like this work to be done in parallel, but at the moment I effectively have a single-threaded consumer.

I would like somebody to confirm that this is the expected behavior, because the README does not say that the onReceived() callback should not block.

EmielBruijntjes commented 6 months ago

If you need to do some heavy lifting by the consumer, and you do not mind that this blocks other amqp operations, you can just do that without any worries (but do make sure that you disable the heartbeats so that the rabbitmq server does not break the connection because of inactivity).

If you want the application to stay responsive during the CPU-intensive operations, start a new thread and do the work there. That is just standard C++ stuff for which you can find examples everywhere.
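As a rough illustration of "start a new thread and do the work there", here is a minimal sketch using only the standard library. `WorkerPool` and `process_all` are hypothetical names and not part of AMQP-CPP; the loop in `process_all` merely stands in for an onReceived() callback that hands the heavy part to a worker and returns right away. Since the AMQP-CPP objects are not thread safe, the workers in a real program should not touch the channel or connection; anything like an acknowledgement would have to be handed back to the thread that owns the connection.

```cpp
#include <atomic>
#include <cassert>
#include <condition_variable>
#include <cstddef>
#include <functional>
#include <mutex>
#include <queue>
#include <thread>
#include <utility>
#include <vector>

// Hypothetical helper (not part of AMQP-CPP): a fixed-size pool of worker threads.
class WorkerPool {
public:
    explicit WorkerPool(std::size_t threads) {
        assert(threads > 0);
        for (std::size_t i = 0; i < threads; ++i)
            workers_.emplace_back([this] { run(); });
    }

    // Waits until every queued job has been executed, then joins the workers.
    ~WorkerPool() {
        {
            std::lock_guard<std::mutex> lock(mutex_);
            stopping_ = true;
        }
        ready_.notify_all();
        for (auto &w : workers_) w.join();
    }

    // Meant to be called from the event-loop thread; returns immediately.
    void submit(std::function<void()> job) {
        {
            std::lock_guard<std::mutex> lock(mutex_);
            jobs_.push(std::move(job));
        }
        ready_.notify_one();
    }

private:
    void run() {
        for (;;) {
            std::function<void()> job;
            {
                std::unique_lock<std::mutex> lock(mutex_);
                ready_.wait(lock, [this] { return stopping_ || !jobs_.empty(); });
                if (jobs_.empty()) return;  // stopping and nothing left to do
                job = std::move(jobs_.front());
                jobs_.pop();
            }
            job();  // heavy work runs outside the lock
        }
    }

    std::mutex mutex_;
    std::condition_variable ready_;
    std::queue<std::function<void()>> jobs_;
    bool stopping_ = false;
    std::vector<std::thread> workers_;  // declared last: threads start after the rest exists
};

// Stand-in for a consumer: each "message" is squared on a worker thread.
long process_all(const std::vector<int> &payloads, std::size_t threads) {
    std::atomic<long> total{0};
    {
        WorkerPool pool(threads);
        for (int p : payloads)
            pool.submit([p, &total] { total += static_cast<long>(p) * p; });
    }  // pool destructor drains the queue and joins
    return total.load();
}
```

The callback side stays cheap (one `submit` call), so the event loop keeps serving other AMQP operations while the workers run.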

Anyway, amqp-cpp objects (like almost all classes in c++) are not thread safe. You cannot consume in one thread and do something else with the same connection in a different thread.

matwey commented 6 months ago

> If you need to do some heavy lifting by the consumer, and you do not mind that this blocks other amqp operations, you can just do that without any worries (but do make sure that you disable the heartbeats so that the rabbitmq server does not break the connection because of inactivity).
>
> If you want the application to stay responsive during the cpu intensive operations, start a new thread and do the work there. That is just standard c++ stuff for which you can find examples everywhere.

This is exactly what I am doing on top of Boost ASIO.

> Anyway, amqp-cpp objects (like almost all classes in c++) are not thread safe. You cannot consume in one thread and do something else with the same connection in a different thread.

Do you mean that writing to different channel objects isn't thread-safe either?

EmielBruijntjes commented 6 months ago

If the channels all use the same connection, it is indeed not thread safe. If you use a different connection in each thread it is ok.
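The "different connection in each thread" arrangement is plain thread confinement. The sketch below uses a stand-in type, `FakeConnection`, which is an assumption for illustration and not the AMQP-CPP API. With the real library, this would presumably mean that each thread owns its own event loop, handler, connection and channels, and never shares them.

```cpp
#include <cassert>
#include <cstddef>
#include <thread>
#include <vector>

// Stand-in for a non-thread-safe connection object; NOT the AMQP-CPP API.
struct FakeConnection {
    std::thread::id owner = std::this_thread::get_id();
    std::size_t published = 0;

    void publish() {
        // Confinement check: only the creating thread may use this object.
        assert(std::this_thread::get_id() == owner);
        ++published;
    }
};

// Each thread creates, uses and destroys its own connection.
std::vector<std::size_t> run_confined(std::size_t threads, std::size_t messages) {
    std::vector<std::size_t> counts(threads, 0);
    std::vector<std::thread> pool;
    for (std::size_t i = 0; i < threads; ++i) {
        pool.emplace_back([i, messages, &counts] {
            FakeConnection conn;  // lives on this thread only
            for (std::size_t m = 0; m < messages; ++m) conn.publish();
            counts[i] = conn.published;  // each thread writes its own slot
        });
    }
    for (auto &t : pool) t.join();
    return counts;
}
```

No locking is needed because no object is ever reachable from two threads.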

matwey commented 6 months ago

Thanks, I see now. The bad news is that the current LibBoostAsioHandler implementation is unusable in a multi-threaded environment.

EmielBruijntjes commented 6 months ago

I don't know about that (or whether it is true), but that is outside the scope of AMQP-CPP.

matwey commented 6 months ago

For the record: any user code invoking channel.publish() should run in the same strand as the internal read/write completion handlers inside LibBoostAsioHandler. A possible solution is to pass an any_io_executor to LibBoostAsioHandler instead of an io_context, which would allow the user to provide a strand<> executor to LibBoostAsioHandler.
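To show what running in the same strand buys, here is a standard-library sketch of a strand-like serializer. `SerialExecutor`, `FakeChannel` and `publish_from_threads` are hypothetical names; this is not Boost.Asio's strand<> and not the AMQP-CPP API, only the same guarantee in miniature: tasks submitted from any thread never run concurrently with each other.

```cpp
#include <cassert>
#include <cstddef>
#include <functional>
#include <mutex>
#include <queue>
#include <thread>
#include <utility>
#include <vector>

// Hypothetical strand-like serializer: tasks never overlap, whichever thread runs them.
class SerialExecutor {
public:
    void dispatch(std::function<void()> task) {
        {
            std::lock_guard<std::mutex> lock(mutex_);
            queue_.push(std::move(task));
            if (running_) return;  // the thread currently draining will run it
            running_ = true;
        }
        drain();
    }

private:
    void drain() {
        for (;;) {
            std::function<void()> task;
            {
                std::lock_guard<std::mutex> lock(mutex_);
                if (queue_.empty()) { running_ = false; return; }
                task = std::move(queue_.front());
                queue_.pop();
            }
            task();  // executed outside the lock, one at a time
        }
    }

    std::mutex mutex_;
    std::queue<std::function<void()>> queue_;
    bool running_ = false;
};

// Stand-in for a non-thread-safe channel; NOT the AMQP-CPP API.
struct FakeChannel {
    bool busy = false;
    std::size_t published = 0;

    void publish() {
        assert(!busy);  // would fire if two calls ever overlapped
        busy = true;
        ++published;
        busy = false;
    }
};

// Several threads publish, but every call goes through the serializer.
std::size_t publish_from_threads(std::size_t threads, std::size_t per_thread) {
    FakeChannel channel;
    SerialExecutor strand;
    std::vector<std::thread> pool;
    for (std::size_t i = 0; i < threads; ++i) {
        pool.emplace_back([&channel, &strand, per_thread] {
            for (std::size_t m = 0; m < per_thread; ++m)
                strand.dispatch([&channel] { channel.publish(); });
        });
    }
    for (auto &t : pool) t.join();
    return channel.published;
}
```

With Boost.Asio itself, the equivalent would be to post the publishing code to a strand executor, which only helps if the handler's own read/write completion handlers run on that same strand; that is the change proposed above.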