Graylog2 / graylog2-server

Free and open log management
https://www.graylog.org
Other
7.24k stars 1.05k forks source link

GELF AMQP Input not support some RabbitMQ settings such as transient and/or quorum queues #7747

Open Slapper opened 4 years ago

Slapper commented 4 years ago

What?

We have some queues on our RabbitMQ cluster and we want Graylog to consume these messages with GELF AMQP input. These RabbitMQ queues are set with Durability = transient . But from what i understand Graylog GELF AMQP Input works only with durability = durable

if i try to start the input and the queue on RabbitMQ is transient i get the following error

2020-03-16T12:21:55.710Z ERROR [InputLauncher] The [org.graylog2.inputs.gelf.amqp.GELFAMQPInput] input with ID <5e6f6dfda8617091da0cbaf5> misfired. Reason: channel error; protocol method: #method<channel.close>(reply-code=406, reply-text=PRECONDITION_FAILED - **inequivalent arg 'durable' for queue 'name_of_the_queue' in vhost '/': received 'true' but current is 'false', class-id=50, method-id=10**). org.graylog2.plugin.inputs.MisfireException: org.graylog2.plugin.inputs.MisfireException: Could not launch AMQP consumer. at org.graylog2.plugin.inputs.MessageInput.launch(MessageInput.java:158) ~[graylog.jar:?] at org.graylog2.shared.inputs.InputLauncher$1.run(InputLauncher.java:84) [graylog.jar:?] at com.codahale.metrics.InstrumentedExecutorService$InstrumentedRunnable.run(InstrumentedExecutorService.java:181) [graylog.jar:?] at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [?:1.8.0_232] at java.util.concurrent.FutureTask.run(FutureTask.java:266) [?:1.8.0_232] at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [?:1.8.0_232] at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [?:1.8.0_232] at java.lang.Thread.run(Thread.java:748) [?:1.8.0_232] Caused by: org.graylog2.plugin.inputs.MisfireException: Could not launch AMQP consumer. at org.graylog2.inputs.transports.AmqpTransport.doLaunch(AmqpTransport.java:179) ~[graylog.jar:?] at org.graylog2.plugin.inputs.transports.ThrottleableTransport.launch(ThrottleableTransport.java:76) ~[graylog.jar:?] at org.graylog2.plugin.inputs.MessageInput.launch(MessageInput.java:155) ~[graylog.jar:?] ... 7 more Caused by: java.io.IOException at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:126) ~[graylog.jar:?] at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:122) ~[graylog.jar:?] at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:144) ~[graylog.jar:?] at com.rabbitmq.client.impl.ChannelN.queueDeclare(ChannelN.java:962) ~[graylog.jar:?] at com.rabbitmq.client.impl.recovery.AutorecoveringChannel.queueDeclare(AutorecoveringChannel.java:333) ~[graylog.jar:?] at org.graylog2.inputs.transports.AmqpConsumer.run(AmqpConsumer.java:105) ~[graylog.jar:?] at org.graylog2.inputs.transports.AmqpTransport.doLaunch(AmqpTransport.java:176) ~[graylog.jar:?] at org.graylog2.plugin.inputs.transports.ThrottleableTransport.launch(ThrottleableTransport.java:76) ~[graylog.jar:?] at org.graylog2.plugin.inputs.MessageInput.launch(MessageInput.java:155) ~[graylog.jar:?] ... 7 more Caused by: com.rabbitmq.client.ShutdownSignalException: channel error; protocol method: #method<channel.close>(reply-code=406, reply-text=PRECONDITION_FAILED - inequivalent arg 'durable' for queue 'name_of_the_queue' in vhost '/': received 'true' but current is 'false', class-id=50, method-id=10) at com.rabbitmq.utility.ValueOrException.getValue(ValueOrException.java:66) ~[graylog.jar:?] at com.rabbitmq.utility.BlockingValueOrException.uninterruptibleGetValue(BlockingValueOrException.java:36) ~[graylog.jar:?] at com.rabbitmq.client.impl.AMQChannel$BlockingRpcContinuation.getReply(AMQChannel.java:494) ~[graylog.jar:?] at com.rabbitmq.client.impl.AMQChannel.privateRpc(AMQChannel.java:288) ~[graylog.jar:?] at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:138) ~[graylog.jar:?] at com.rabbitmq.client.impl.ChannelN.queueDeclare(ChannelN.java:962) ~[graylog.jar:?] at com.rabbitmq.client.impl.recovery.AutorecoveringChannel.queueDeclare(AutorecoveringChannel.java:333) ~[graylog.jar:?] at org.graylog2.inputs.transports.AmqpConsumer.run(AmqpConsumer.java:105) ~[graylog.jar:?] at org.graylog2.inputs.transports.AmqpTransport.doLaunch(AmqpTransport.java:176) ~[graylog.jar:?] at org.graylog2.plugin.inputs.transports.ThrottleableTransport.launch(ThrottleableTransport.java:76) ~[graylog.jar:?] at org.graylog2.plugin.inputs.MessageInput.launch(MessageInput.java:155) ~[graylog.jar:?] ... 7 more

Why?

GELF AMQP Input cannot start & fails with the errors above if rabbitMQ setting on queue is set : x-queue-type=quorum and/or Durability=transient

It would be great during the creation of GELF AMQP Input to have the following extra options available :

Your Environment

radykal-com commented 4 years ago

Hi,

I've been checking this one. And the solution seems pretty simple. Let's me explain it:

With the current code, graylog declares AMQP queues like they don't exist, if they do and the creation options don't match leads to the errors that you described.

We could add a new option (checkbox) to the AMQP input like "Queue already exists" or "Don't create queue", so it won't try to create it and only connect to the existing one with whatever options it was created.

If the opion is enabled, we can declare the queue with the option described as "Passive Declaration" in the AMQP client documentation: https://www.rabbitmq.com/api-guide.html#passive-declaration

Let's wait for a maintainer to say if they agree with this solution and I can help with it.

Slapper commented 4 years ago

Hi @radykal-com ,

Yeap i agree with you, it would be nice to have this functionality in AMQL GELG input. Graylog input should by agnostic by other options that rabbitmq queue has . So let's wait for a maintainer to check this approach.

Regards,

bernd commented 4 years ago

@radykal-com I think it would be good if we can expose the missing options. In addition we could experiment with the "passive declaration" and check if this has any drawbacks. :+1:

radykal-com commented 4 years ago

Good @bernd I'll start by adding some AMQP integration tests with testcontainers.

Well... actual code related with AmqpTransport & AmqpConsumer is not very testable..

Slapper commented 4 years ago

hello!

Any news for adding these options on AMQP GELF Input ?

radykal-com commented 4 years ago

Sorry, I did some tests and it could work but creating tests for the actual code is not an easy task and I dont have much time lately

Slapper commented 4 years ago

hi @radykal-com

Thanks for you response! i can't wait for this feature in AMQP GELF input :) it will really give us great flexibility to handle large amount of msgs from RabbitMQ

Cheers!

gastoncantera commented 3 years ago

Hi @radykal-com

I just tested quorum queues using -as you suggested- RabbitMQ passive declaration and works really well! I used 3.3.2 as a base

Regards!

fflow commented 2 years ago

Hi @radykal-com, we have a similar problem - but it's related to the max length attribute of a queue. We would like to set that and other attributes of a queue - but the options on the GELF AMQP dialog are rather limited.

To avoid overflowing scenarios in case graylog is done the max-length options would be very helpful. Any news on when that topic is getting picked up again?

Best Regards Florian

davama commented 3 weeks ago

Hello, Sorry for the bump. Wondering if this FR is still being considered. Best, Dave