Open anindyaju99 opened 8 years ago
Hi, sorry for the delay, I was off in the mountains! And sorry for asking a basic question, but are you sure that your producer program does not exit before all messages have actually been pushed to the broker? (See https://github.com/sstone/amqp-client/issues/72 for something similar.) Could you package a small, complete sample so that I can try to reproduce the problem? Thanks
I am absolutely sure that the producer program has not exited, because when I am testing locally my producer and consumer actors are in the same executable.
I will try to reproduce with a self contained example.
This might be a bug in the way the producer actor is killed. For this specific use case I iterate over the messages and send them to a producer. Once the loop is done I send a PoisonPill message to the producer. It seems the producer actor and the RMQ channel are being closed before the messages I have already sent are delivered to RMQ.
For now I have added a delay (thread sleep) after sending the last message to the producer actor, so that the PoisonPill is issued later.
Let me know if this information helps in finding the issue.
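A less timing-dependent alternative to the sleep might be to wait for one acknowledgement per message and only then stop the producer. The block below is a minimal sketch of that ordering using only the Scala standard library; `DrainBeforeStop`, `publishAllThenStop`, `send` and `stop` are hypothetical names introduced for illustration, not part of amqp-client or Akka.

```scala
import java.util.concurrent.atomic.AtomicInteger
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

object DrainBeforeStop {
  // Hypothetical helper: hand every message to `send`, block until every
  // returned future has completed (or `atMost` elapses), then run `stop`.
  // `stop` runs in a finally block, so it is also called on failure or timeout.
  def publishAllThenStop[M, A](messages: Seq[M], atMost: FiniteDuration)
                              (send: M => Future[A])
                              (stop: () => Unit)
                              (implicit ec: ExecutionContext): Seq[A] = {
    val acks: Future[Seq[A]] = Future.sequence(messages.map(send))
    try Await.result(acks, atMost)
    finally stop()
  }
}

object Demo extends App {
  import scala.concurrent.ExecutionContext.Implicits.global

  val delivered = new AtomicInteger(0)

  // Stand-in for a real publish: counts the message and acknowledges it.
  val acks = DrainBeforeStop.publishAllThenStop(1 to 1000, 10.seconds)(
    n => Future { delivered.incrementAndGet(); n }
  )(() => println(s"stopping producer after ${delivered.get} deliveries"))

  println(s"received ${acks.size} acknowledgements")
}
```

In an Akka setup, `send` could be `publish => producer ? publish` (the ask pattern) and `stop` could be `() => producer ! PoisonPill`. That assumes the channel actor replies to every Publish request, which should be checked against the amqp-client version in use. Creating one ask per message is also fairly heavy for 100k messages, so a single counting actor that collects the replies may scale better.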
In one of my applications I am generating an array of messages and iterating over them and pushing them (Publish) to the producer.
I am creating the producer as follows:

    val deadLetterOption = config.deadletter match {
      case true => clientProps ++ Map(
        "x-dead-letter-routing-key" -> config.deadletterRoutingKey,
        "x-dead-letter-exchange" -> config.deadletterExchange)
      case _ => clientProps
    }
    val ttlOption = config.ttl > 0 match {
      case true => deadLetterOption ++ Map("x-message-ttl" -> new Integer(config.ttl))
      case false => deadLetterOption
    }
    val channelParameters = Option(ChannelParameters(1))
    val exchangeParams = ExchangeParameters(name = exchange, passive = false, exchangeType = "direct", durable = true, autodelete = false, clientProps)
    val queueParams = QueueParameters(queueName, passive = false, durable = true, exclusive = false, autodelete = false, clientProps)
    val producer = ConnectionOwner.createChildActor(connection, ChannelOwner.props(channelParams = channelParameters), timeout = timeout.second)
    Amqp.waitForConnection(system, producer).await()
    producer ! DeclareExchange(exchangeParams)
    producer ! DeclareQueue(queueParams)
    producer ! QueueBind(queue = queueName, exchange = exchange, routing_key = queueName)
I am getting the following in my deadletter watcher -
Publish(my_ex,my_q,[B@f94d482,None,true,false)
When I generate 100k messages, more than 10k of them show up as dropped messages like the one above. I am using an unbounded mailbox (unless this library configures some other mailbox in code).
What could be the reason for these drops?