alanxz / rabbitmq-c

RabbitMQ C client
MIT License
1.76k stars 669 forks source link

amqp_channel_flow usage to indicate for publisher that queue is not ready #811

Open arun-sathya opened 8 months ago

arun-sathya commented 8 months ago

Want to check if the usage of amqp_channel_flow is correct or not ?

Version Details

Scenario Consumer starts consuming messages from a queue(with exclusive, auto-delete bits set). An upper limit of x-max-length-bytes is set, overflow is set to reject-publish. Internally, Consumer keeps track of how much data is in processing state in bytes and will use amqp_channel_flow method to toggle the consumer consuming/pausing the messsage contents from the queue.

Publisher checks if queue is ready to accept the message before sending, In Order to achieve, publisher redeclares the queue in passive mode and checks consumer_count. If consumer count is zero, tries to resend after some time.

When amqp_channel_flow(conn,1,0) is used by consumer to pause the reading contents from queue, an connection exception is thrown saying _operation channel.flow caused a connection exception notimplemented: "active=false"

Pseudo Code Of Consumer { // declares a exclusive auto-delete queue Q1 amqp_queue_declare_ok_t *) declare_ok = amqp_queue_declare(conn,channel, Q1 , 0, 0, 1, 1, amqp_empty_table);

//basic consume amqp_basic_consume(conn, 1, queuename, amqp_empty_bytes, 0, 1, 0, amqp_empty_table);

while(1) { amqp_rpc_reply_t res; amqp_envelope_t envelope; amqp_maybe_release_buffers(conn); res = amqp_consume_message(conn, &envelope, NULL, 0); if (AMQP_RESPONSE_NORMAL != res.reply_type) { break; } //under some condition consumer wants no more messages to be consumed for some time(too much content is already in //pipeline) . Want to channel.flow event. **amqp_channel_flow(conn,1,false) //this is throwing connectoin exception saying operation channel.flow caused a connection exception not_implemented: "active=false" **

// after some time want to resume consumer to accept more messages from the queue amqp_channel_flow(conn,1,true); }

} Pseudo Cod Publisher { //want to check if consumer is ready to accept before sending. declare the exclusive queue in passive mode (amqp_queue_declare_ok_t *) declare_ok = amqp_queue_declare(conn,channel, Q1 , 1, 0, 0, 1, amqp_empty_table); if(decalre_ok-> consumer-count) //publish message else //retry after some time }