SpinGo / op-rabbit

The Opinionated RabbitMQ Library for Scala and Akka

RabbitSource only consumes when a log is placed #152

Closed afrancoc2000 closed 5 years ago

afrancoc2000 commented 6 years ago

There is something really weird happening to us. We have a queue we consume using RabbitSource like this:

op-rabbit {
  connection {
    virtual-host = "myvirtualhost"
    hosts = ["myserver.com"]
    username = "myuser"
    password = "mypassword"
    port = 5672
    automaticRecoveryEnabled = true
    requestedHeartbeat = 300
  }
}

val rabbitControlConfiguration: Config = ConfigFactory.load().getConfig("op-rabbit.connection")
val rabbitControl: ActorRef = system.actorOf(
  Props(new RabbitControl(ConnectionParams.fromConfig(rabbitControlConfiguration))),
  "rabbit-control"
)
val source = RabbitSource(
  rabbitControl = rabbitControl,
  channelDirective = channel(qos = 1),
  bindingDirective = consume(Queue.passive(queue)),
  handler = body(as[String]) & extract(x => {
    val headers = Option(x.properties.getHeaders).getOrElse(new java.util.HashMap[String, AnyRef]()).asScala
    headers.getOrElse("x-retry", 0)
  })
)
*** LogHelper.logger.info(s"Starting queue $queue") ***
source.mapAsync(1)(body => {
  processingFunction(body._1)
}).runForeach(_ => ())(materializer)

With the LogHelper line the queue works normally; without it, it doesn't.

Why could this be happening?

We use version 2.1.0

Thanks

timcharper commented 6 years ago

That's a puzzling issue indeed. I have no idea :(

afrancoc2000 commented 6 years ago

We changed the code from the logger to a Thread.sleep(1000) and that worked too. I think it's because the log call takes some time and something needs that time to get initialized, but I can't figure out what it could be. Maybe I should increase the heartbeat timeout? Or could it be some latency in the network? What could it be?

timcharper commented 6 years ago

Hmm, there are some integration tests that I would expect to fail if that were the case. But that's not to say there can't be a race somewhere that you're uncovering :(

timcharper commented 6 years ago

It'd be interesting to see logs from the rabbitmq server side; what does it see when the "race" is hit?

afrancoc2000 commented 6 years ago

Hmm, there's no one I can ask for those logs right now, but give me until Monday to post them. Thanks!

afrancoc2000 commented 5 years ago

Hi Tim,

We are running right now. There is nothing weird or different in Rabbit, only some security logs that seem to happen all the time. What I do see is that each of the queues has one unacked message.

The guys from infrastructure are sending me the logs now; I will post them ASAP.

afrancoc2000 commented 5 years ago

I found these errors initially:

=INFO REPORT==== 9-Sep-2018::05:50:51 === accepting AMQP connection <0.14052.1614> (xx.xxx.xx.x:xxxx -> x.xxx.xx.xx:5672)

=ERROR REPORT==== 9-Sep-2018::06:25:27 === closing AMQP connection <0.16326.1612> (xx.xxx.xx.x:xxxx -> x.xxx.xx.xx:5672): missed heartbeats from client, timeout: 60s

So as a first measure we changed the heartbeat from 300 to 30, but we are still experiencing problems. The second error we found is this:

=ERROR REPORT==== 10-Sep-2018::10:23:03 === Channel error on connection <0.14615.1641> (xx.xxx.xx.x:xxxxx -> x.xxx.xx.xx:5672, vhost: 'myhost.vh', user: 'myuser'), channel 17: operation queue.declare caused a channel exception not_found: "no queue 'op-rabbit.retry.myqueue' in vhost 'myhost.vh'"

Maybe I need to create the retry queues? I thought they were created automatically.
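
On the heartbeat error above: the server log reports a 60s timeout even though the client config requested 300. A minimal sketch of one likely explanation, assuming the client applies the same negotiation rule as the RabbitMQ Java client (the lower of the two values wins, unless one side sends 0) and assuming the server proposes 60. The object and method names here are made up for illustration and are not part of op-rabbit:

```scala
object HeartbeatNegotiation {
  // In AMQP 0-9-1 a heartbeat value of 0 means "no heartbeat requested".
  // Assumed rule: if either side sends 0, the other value is used;
  // otherwise the smaller of the two values is used.
  def negotiate(clientRequested: Int, serverProposed: Int): Int =
    if (clientRequested == 0 || serverProposed == 0)
      math.max(clientRequested, serverProposed)
    else
      math.min(clientRequested, serverProposed)

  def main(args: Array[String]): Unit = {
    // Original config: requestedHeartbeat = 300, server assumed to propose 60
    println(negotiate(300, 60)) // prints 60
    // After the change: requestedHeartbeat = 30
    println(negotiate(30, 60))  // prints 30
  }
}
```

Under these assumptions, requesting 300 would never have raised the effective timeout above 60, which matches the "timeout: 60s" in the log.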

afrancoc2000 commented 5 years ago

I added an extra log inside the mapAsync function and now the connection is not created; I don't even get the unacked messages :(

afrancoc2000 commented 5 years ago

OK, after a closer look, the connection is created but the consumers aren't, and I don't get any logs. Any ideas? EDIT: the connection is not created, but we don't get any logs saying so. The only thing I get is:

[akka://myapp/user/rabbit-control/connection] null

afrancoc2000 commented 5 years ago

I changed to the non-streams version and it started working. Also, I'm working inside an akka stream, and the creation of the connection actor needs to be delayed until the cluster is formed.

Thanks ;)
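
For anyone hitting the same thing, here is a rough sketch of one way to delay the connection actor until the cluster is formed. It is not necessarily what was done in this case. It assumes the actor system is configured for Akka Cluster and uses `Cluster.registerOnMemberUp` from akka-cluster; `DelayedRabbitControl` and `startWhenClusterIsUp` are hypothetical names, while `RabbitControl`, `ConnectionParams.fromConfig` and the "op-rabbit.connection" config path are taken from the snippet at the top of this issue:

```scala
import akka.actor.{ActorRef, ActorSystem, Props}
import akka.cluster.Cluster
import com.spingo.op_rabbit.{ConnectionParams, RabbitControl}
import com.typesafe.config.ConfigFactory

import scala.concurrent.{Future, Promise}
import scala.util.Try

// Hypothetical helper, not part of op-rabbit.
object DelayedRabbitControl {

  // Returns a Future that completes with the RabbitControl actor once this
  // node has reached the "Up" state in the cluster. If reading the config or
  // creating the actor fails, the Future fails with that exception.
  def startWhenClusterIsUp(system: ActorSystem): Future[ActorRef] = {
    val started = Promise[ActorRef]()

    // The callback runs when the current cluster member becomes Up.
    Cluster(system).registerOnMemberUp {
      started.complete(Try {
        val params = ConnectionParams.fromConfig(
          ConfigFactory.load().getConfig("op-rabbit.connection"))
        system.actorOf(Props(new RabbitControl(params)), "rabbit-control")
      })
    }

    started.future
  }
}
```

Consumers would then be subscribed only after the returned Future completes, instead of relying on a log call or a Thread.sleep to provide the delay.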