JuliaComputing / AMQPClient.jl

A Julia AMQP (Advanced Message Queuing Protocol) / RabbitMQ Client.
Other
39 stars 21 forks source link

Consumer not reading the existing messages from Queue #26

Closed marosko89 closed 3 years ago

marosko89 commented 3 years ago

When a message is in queue or msg is unacked(after restart consumer is msg returns back to queue) - consumer(webservice ) cannot process another message from the queue. Subscribing consumer to a non-empty queue with unacknowledged messages occasionally causes the consumer to hang (after consuming, but not acknowledging the first message in the queue)

@tanmaykm I have code which demostraiting this

module BugUnacked

using AMQPClient

# include("loggerWrapper.jl") #1 

const RESPONSES_QUEUE = "qResponses"
ENV["RMQ_USER"] = "user"
ENV["RMQ_PASS"] = "pass"
ENV["RMQ_HOST"] = "8.8.8.8"

function initAMQP()
  #1 with_logger(LoggerWrapper(current_logger())) do
    rabbitMQ()
  #1 end
  println("initAMQP successful")
end

function rabbitMQ()
  AMQPClient.CLIENT_IDENTIFICATION["connection_name"] = "Web Server"
  auth_params = Dict{String,Any}("MECHANISM"=>"AMQPLAIN", "LOGIN"=>ENV["RMQ_USER"], "PASSWORD"=>ENV["RMQ_PASS"])
  host = ENV["RMQ_HOST"]
  println("connecting to RabbitMQ host: $host")
  global AMQP_CONN = connection(;virtualhost="/", host=host, port=5672, auth_params=auth_params)
  atexit(() -> isopen(AMQP_CONN) && close(AMQP_CONN))
  global AMQP_CHANNEL = channel(AMQP_CONN, AMQPClient.UNUSED_CHANNEL, true)
  basic_qos(AMQP_CHANNEL, 0, 1, true)
  declareQueue(RESPONSES_QUEUE)
  sleep(1) #2. FIXED bug with yield(). If there is something in the queue, you must not call declareQueue and subscribeConsumer immediately after each other. See too src/loggerWrapper.jl
  subscribeConsumer(RESPONSES_QUEUE)
end

function consumer(msg)
  response = String(msg.data)
  id = String(copy(msg.properties[:correlation_id].data))
  while true
    try
      x = sum(sin.(rand(50_000_000)))
      break
    catch exc
      st = stacktrace(catch_backtrace())
      println(exc)
      println.(st)
      println("[AMQP consumer]\tinserting job $id has failed.")
    end
  end
  basic_ack(AMQP_CHANNEL, msg.delivery_tag)
end

function declareQueue(queue::String)
  success, queue_name, message_count, consumer_count = queue_declare(AMQP_CHANNEL, queue, durable=true)
  if success
    println("queue $queue declared successfully, message_count: $message_count, consumer_count: $consumer_count")
  else
    println("declaration of queue $queue failed")
    exit(1)
  end
end

function subscribeConsumer(queue::String)
  success, consumer_tag = basic_consume(AMQP_CHANNEL, queue, consumer)
  if success
    println("consumer on queue $queue registered with tag $consumer_tag")
  else
    println("subscribing consumer to queue $queue failed")
    exit(1)
  end
end
[](url)

function main()
  initAMQP()
end

end # end of module

I run the consumer with

using BugUnacked; BugUnacked.main()
while true
  sleep(5)
end

This probably also relates to issue #14. We at Tangent Works tried to fix it with the following wrapper (loggerWrapper.jl - apply with uncommented comments with 1.):

# This is a workaround for a bug in AMQPClient package:
# Subscribing consumer to a non-empty queue occasionally causes the consumer to hang
# (after consuming, but not acknowledging the first message in the queue).
# However, the bug doesn't happen if debug messages are turned on.
# Printing debug messages implicitly calls yield().
# So the workaround is to call yield() explicitly for each @debug in AMQPClient.
# Julia's built-in logging system makes this possible (by creating custom logger).

using Logging

struct LoggerWrapper <: AbstractLogger
  logger::T where T <: AbstractLogger
end

function Logging.handle_message(logger::LoggerWrapper, level, message, _module, group, id, args...; kwargs...)
  (_module == AMQPClient && level == Logging.Debug) && yield()
  if Logging.shouldlog(logger.logger, level, _module, group, id) && level ≥ Logging.min_enabled_level(logger.logger)
    Logging.handle_message(logger.logger, level, message, _module, group, id, args...; kwargs...)
  end
end

Logging.shouldlog(logger::LoggerWrapper, level, _module, args...) =
  Logging.shouldlog(logger.logger, level, _module, args...) || (_module == AMQPClient && level == Logging.Debug)
Logging.min_enabled_level(logger::LoggerWrapper) = min(Logging.Debug, Logging.min_enabled_level(logger.logger))
Logging.catch_exceptions(logger::LoggerWrapper) = Logging.catch_exceptions(logger.logger)

however, this workaround is helpful though in some cases consumer will remain hanging and we had to use the sleep function which in the background calls yield. sleep(1) #2. FIXED bug with yield().

Can you please look at this issue and inform us?Thx.

tanmaykm commented 3 years ago

I shall take a look at it. Thanks for reporting.

tanmaykm commented 3 years ago

Unfortunately I was unable to replicate the issue with the above consumer implementation.

I will try more and dig deeper. But would it be possible to post the code for a producer too that can be used to replicate this on a standalone local setup?

marosko89 commented 3 years ago

I used the standard RabbitMQ Management interface to capture the issue. My simple message looked exactly like this. image However, the problem surfaced in the web service as a consumer, and the compiled AOT worker was a producer. But it's the same.

tanmaykm commented 3 years ago

Thanks. I am able to replicate the issue now. Will post a fix.