noxdafox / rabbitmq-message-deduplication

RabbitMQ Plugin for filtering message duplicates
Mozilla Public License 2.0

Queue crashed with reason timeout to call RabbitMQMessageDeduplication.CacheManager #94

Closed OliverLiangx closed 1 year ago

OliverLiangx commented 1 year ago

Hey, I got some problems when using this plugin. Here are the error logs from RabbitMQ:

2022-10-31 20:16:22.637 [info] <0.12710.131> supervisor: {<0.12710.131>,rabbit_amqqueue_sup}, errorContext: child_terminated, reason: {timeout,{'Elixir.GenServer',call,['Elixir.RabbitMQMessageDeduplication.CacheManager',{destroy,cache_queue__amq_gen_ioc_40bkebjbxaqlfhx6zq},5000]}}, offender: [{pid,<0.16620.131>},{id,rabbit_amqqueue},{mfargs,{rabbit_prequeue,start_link,[{amqqueue,{resource,<<"/">>,queue,<<"amq.gen-ioC-40bKEbjBxAQlfhX6ZQ">>},false,true,<0.6150.0>,[{<<"x-message-deduplication">>,bool,true}],none,[],[],[],undefined,undefined,[],[],live,0,[],<<"/">>,#{user => <<"combee">>},rabbit_classic_queue,#{}},declare,<0.17214.131>]}},{restart_type,intrinsic},{shutdown,30000},{child_type,worker}]
2022-10-31 20:16:22.638 [error] <0.12710.131> Supervisor {<0.12710.131>,rabbit_amqqueue_sup} had child rabbit_amqqueue started with rabbit_prequeue:start_link({amqqueue,{resource,<<"/">>,queue,<<"amq.gen-ioC-40bKEbjBxAQlfhX6ZQ">>},false,true,<0.6150.0>,[{...}],...}, declare, <0.17214.131>) at <0.16620.131> exit with reason {timeout,{'Elixir.GenServer',call,['Elixir.RabbitMQMessageDeduplication.CacheManager',{destroy,cache_queue__amq_gen_ioc_40bkebjbxaqlfhx6zq},5000]}} in context child_terminated
2022-10-31 20:16:22.639 [error] <0.31616.131> Restarting crashed queue 'amq.gen-kbak8aBJM__lFa5z2WILsQ' in vhost '/'.
2022-10-31 20:16:22.639 [error] <0.16072.131> ** Generic server <0.16072.131> terminating

It seems the CacheManager has some problems destroying the cache for a queue. Any idea how to avoid this situation? It happens 26 times a day in our system and causes queues to restart frequently.

noxdafox commented 1 year ago

Hello,

How do you trigger the above error? Are you explicitly deleting the queue? Is this a transient queue that is being removed automatically because there are no more consumers?

OliverLiangx commented 1 year ago

Hello, in our system we create many temporary queues to listen for a job-completed message, and they are deleted once that message has been handled. I guess the error above is triggered when RabbitMQ deletes these queues automatically. The queues are then restarted without a consumer, which causes many messages to get stuck in them. It's really a big trouble for us.

OliverLiangx commented 1 year ago

We will change our solution to avoid frequently creating and deleting temporary queues and see if this problem still happens. Hope you can find out why a timeout calling the CacheManager causes queues to crash and restart.

noxdafox commented 1 year ago

I need to know which parameters you use to create the queue. Do you have a minimal reproducible example? A code snippet that, once run against RMQ, shows the problem?

I just tested the queue deletion mechanisms and they work, so I cannot reproduce the issue with this little information.

Same with auto-delete queue creation. Once the consumer disconnects, the queue is correctly removed and no crash appears.
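A rough sketch of that kind of check, assuming the RabbitMQ Java client against a local broker (the class and queue names are only illustrative):

    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.ConnectionFactory;
    import java.util.Map;

    public class DeletionPathsSketch {
        public static void main(String[] args) throws Exception {
            ConnectionFactory factory = new ConnectionFactory(); // assumes a local broker
            Map<String, Object> dedup = Map.of("x-message-deduplication", (Object) true);

            // Path 1: explicit deletion of a deduplication queue.
            try (Connection connection = factory.newConnection()) {
                Channel channel = connection.createChannel();
                channel.queueDeclare("dedup-explicit", false, false, false, dedup);
                channel.queueDelete("dedup-explicit");
            }

            // Path 2: auto-delete, exclusive queue removed when its connection goes away.
            try (Connection connection = factory.newConnection()) {
                Channel channel = connection.createChannel();
                channel.queueDeclare("", false, true, true, dedup);
            } // closing the connection here should remove the queue without a crash
        }
    }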

OliverLiangx commented 1 year ago

We declare the temporary queue with these args (a sketch of the equivalent declaration follows the list):

"name" = "", (This will resuse queue name from channel.)
"durable" = false,
"exclusive" = true,
"autodelete" = true,
"x-message-deduplication" = true

And here is our code that possibly causes this problem:

    private final ConcurrentMap<String, BasicJobFuture> jobs = new ConcurrentHashMap<>();
    private final ConcurrentMap<String, Map.Entry<EventListener, AtomicInteger>> completionCounter =
            new ConcurrentHashMap<>();

    private JobStatus scheduleJob(Job job) {
        completionCounter.compute(
                job.getName(),
                (name, pair) -> {
                    if (pair != null) {
                        pair.getValue().incrementAndGet();
                        return pair;
                    }
                    var eventName = "job_completed." + job.getName();
                    var listener = new CompleteJobFutureOnJobCompleted(eventName);
                    eventDispatcher.enlist(listener);
                    return Map.entry(listener, new AtomicInteger(1));
                });
        publish(job);
        return new JobStatus(job);
    }

    class CompleteJobFutureOnJobCompleted extends AbstractEventListener<JobCompleted> {

        private final String eventName;

        CompleteJobFutureOnJobCompleted(@NonNull String eventName) {
            this.eventName = eventName;
        }

        @Override
        public String getEventName() {
            return eventName;
        }

        @Override
        public boolean handle(JobCompleted jobCompleted) {
            var key = Jobs.getKey(jobCompleted.getName(), jobCompleted.getId());
            var future = jobs.remove(key);

            future.setStatus(status);
            var isContinue = new AtomicBoolean(true);
            completionCounter.computeIfPresent(
                    jobCompleted.getName(),
                    (name, pair) -> {
                        var counter = pair.getValue().decrementAndGet();
                        if (counter == 0) {
                            isContinue.setPlain(false);
                            return null;
                        }
                        pair.getValue().decrementAndGet();
                        return pair;
                    });
            return isContinue.getPlain(); // when false is returned, the consumer will be cancelled
        }
    }

When we schedule a job (just a RabbitMQ message), we create a listener for the job-completed message and close the connection when the last job-completed message has been handled. This listener is declared as a temporary queue with the RMQ args above. When the number of jobs reaches 100,000 - 200,000, maybe more, RMQ connections are closed and queues are auto-deleted frequently. In this situation, calls to the CacheManager time out, which makes these queues crash and restart. The temporary queues then remain in RMQ without a consumer and messages accumulate.
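To illustrate the interaction, here is a hypothetical sketch of what the dispatcher side might do with the return value of handle(); the class and method names are assumptions and not our actual code:

    import com.rabbitmq.client.Channel;
    import java.io.IOException;

    class DispatcherSketch {
        // Hypothetical: when handle() returns false, the consumer is cancelled.
        // Cancelling the last consumer of an auto-delete queue makes the broker delete
        // the queue, which triggers CacheManager.destroy for its deduplication cache.
        static void afterHandle(Channel channel, String consumerTag, boolean keepListening)
                throws IOException {
            if (!keepListening) {
                channel.basicCancel(consumerTag);
            }
        }
    }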

We're sure that our use of temporary queues is not right. But when we disable the message-deduplication plugin, RMQ works normally and no temporary queue crashes.

noxdafox commented 1 year ago

I tried to reproduce the problem with the provided queue parameters and I could not.

I also tried to brute-force a node with lots of queues created, filled up and destroyed. I did not observe any issue.

I think CacheManager.destroy is timing out because it hangs when trying to interface with Mnesia. This usually indicates a problem with the Mnesia DB itself, mostly due to an ongoing or unresolved network partition.

Did you suffer from network partitions recently? Some node went down with a crash? Some node was replaced? Have you tried restarting the whole cluster?

https://www.rabbitmq.com/partitions.html

OliverLiangx commented 1 year ago

We only deploy one node for RMQ, so I don't think it's related to network partitions. We got lots of error logs: "2022-11-04 17:50:19.457 [error] <0.24860.381> closing AMQP connection <0.24860.381> (ip:12321 -> ip:5672):" which show no cause at all. We are trying to figure out why this problem happens first. Thanks for your patience in helping me with this. We will try to fix this problem and, once we have, we will try the message-deduplication plugin again to see if the problem is fixed.