ricbra / rabbitmq-cli-consumer

Consume RabbitMQ messages into any cli program
MIT License

Process stops processing messages after certain amount of time #28

Open alsar opened 8 years ago

alsar commented 8 years ago

I'm using your library in combination with Symfony console commands. I set it up, configured it, added it to Supervisor and started 2 processes. Everything works fine, but after a few hours (around 3-5) it just stops processing messages from the queue. I then have to restart the processes, and it starts processing messages again. But again, only for a few hours.

I looked at the log files, but there's nothing there. Do you have any idea why this could be happening?

My config file:

[rabbitmq]
host = localhost
username = user
password = pass
vhost = /media
queue = media-converter
compression = Off

[exchange]
name = media-converter
type = direct
durable = On

[logs]
error = /home/user/log/rabbitmq-cli-error.log
info = /home/user/log/rabbitmq-cli-info.log

I'm using v1.1 (apt package) on Ubuntu 14.04.

ricbra commented 8 years ago

Could you also post your supervisord config? And both logs are completely empty? Not a single line?

alsar commented 8 years ago

[program:project_dev_rabbitmq-cli-convert]
command=/usr/bin/rabbitmq-cli-consumer -e "/home/user/development/project/current/app/console convert-upload --rabbitmq" -c /home/user/config/rabbitmq_cli_project_dev.conf
process_name=%(program_name)s_%(process_num)02d
numprocs=2
autostart=true
autorestart=unexpected
user=user
stdout_logfile=/home/user/log/project_dev/convert-out.log
stderr_logfile=/home/user/log/project_dev/convert-error.log

I cleared all logs today before I started testing. After the process stopped processing messages, I checked the logs from rabbitmq-cli-consumer and supervisord, and there was nothing in them.

alsar commented 8 years ago

The problem seems to be in the Supervisor config. The minimal config seems to work now:

[inet_http_server]
port = 127.0.0.1:9001
username=user
password=pass

[unix_http_server]
file=/var/run/supervisor.sock
chmod=0700

[supervisord]
logfile=/tmp/supervisord.log
pidfile=/var/run/supervisord.pid

[rpcinterface:supervisor]
supervisor.rpcinterface_factory = supervisor.rpcinterface:make_main_rpcinterface

[supervisorctl]
serverurl=unix:///var/run/supervisor.sock

[include]
files = /etc/supervisor/conf.d/*.conf

Before that I had a much bigger config, but I didn't investigate further which line was causing the problem. For now it works, after 15 hours.

alsar commented 8 years ago

It seems that the problem still exists. It worked for about 20 hours, but then stopped processing messages. I restarted the Supervisor process, and it stopped processing again after 5 hours.

I set up a test script that I run through Supervisor, and it has been running without a problem for 2 days now. So Supervisor should not be the problem. I think the problem lies somewhere in rabbitmq-cli-consumer.

ricbra commented 8 years ago

What do you mean by "stopped processing messages"? Is the consumer still connected to RabbitMQ? Or does it die, and Supervisor stops restarting it after it fails too many times?

Also, I really need some logs from rabbitmq-cli-consumer before I can say anything useful about the problem you describe.

alsar commented 8 years ago

The error log is empty, and the stdout log contains just:

Waiting for messages...
2016/02/09 11:22:04 Processing message...
2016/02/09 11:22:09 Processed!
2016/02/09 11:22:09 Processing message...
2016/02/09 11:22:13 Processed!
...

By "stopped processing messages" I mean that the process is still running, but no messages are processed anymore.

I just discovered that the consumer disconnects from RabbitMQ. So the process is running, but after a certain amount of time (a few hours) it disconnects from RabbitMQ.

I'll keep an eye on the RabbitMQ log to see what happens when the rabbitmq-cli-consumer disconnects.

ricbra commented 8 years ago

There should indeed be some info in the RabbitMQ log when rabbitmq-cli-consumer disconnects.

alsar commented 8 years ago

After 21 hours it stopped working and I got this in the RabbitMQ log:

=WARNING REPORT==== 11-Feb-2016::09:08:34 ===
closing AMQP connection <0.342.0> (127.0.0.1:50993 -> 127.0.0.1:5672):
client unexpectedly closed TCP connection

Supervisor still shows that the rabbitmq-cli-consumer is running.

ricbra commented 8 years ago

What is your server setup? OS, version, multiple servers?

Is it possible to create a Vagrant environment in which I can reproduce the errors? We have rabbitmq-cli-consumer processes that have been running for months, so I think something in your environment is triggering this behaviour.

alsar commented 8 years ago

It's an Ubuntu 14.04 machine. rabbitmq-cli-consumer v1.1.0 is installed via the APT repository. Supervisor 3.2.1 is installed via pip. RabbitMQ is v3.6.0.

Locally, in my Vagrant environment, I didn't encounter this problem.

alsar commented 8 years ago

I set up another server (Ubuntu 15.10), and it has been working there for two days now. I hope that it will keep working from now on. Thanks for your help.

raul-dan commented 8 years ago

@alsar I'm having the same problems. I have ~5 consumers and they all seem to die after 2-3 days. Did you find anything else that could lead to solving this problem?

ricbra commented 8 years ago

@lrauldan can you tell me about your situation? What OS etc?

We're on Debian Wheezy and RabbitMQ 3.5.1. Our consumers have been running for months with no restart at all. Our queues are quiet only at night.

We need to find out how to reproduce this problem.

raul-dan commented 8 years ago

@ricbra we're using Ubuntu 14.04 on AWS EC2, a t2.micro for staging purposes, RabbitMQ 3.5.6, Erlang R16B03, consumer version 1.1.0.

I have no experience with Go, but I was wondering if there is a way to check here whether the TCP connection has gone away and to reconnect the consumer?
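The reconnect idea asked about here can be sketched in Go using only the standard library. This is a minimal illustration, not the consumer's actual code: `dialFunc` and `superviseConnection` are hypothetical names, and the dialer in `main` only simulates a connection that drops. With streadway/amqp, the "lost" channel could be fed from `Connection.NotifyClose`, which delivers a value when the connection closes.

```go
package main

import (
	"fmt"
	"time"
)

// dialFunc is a hypothetical hook. It opens one broker connection and returns
// a channel that receives a value, or is closed, when that connection is lost.
// With streadway/amqp the channel could be fed from Connection.NotifyClose.
type dialFunc func() (<-chan error, error)

// superviseConnection dials, waits until the connection is reported lost,
// then dials again, up to maxReconnects times. It returns the number of
// reconnect attempts it made.
func superviseConnection(dial dialFunc, retryDelay time.Duration, maxReconnects int) int {
	reconnects := 0
	for {
		lost, err := dial()
		if err == nil {
			<-lost // block here while the connection is healthy
		}
		if reconnects >= maxReconnects {
			return reconnects
		}
		reconnects++
		time.Sleep(retryDelay) // fixed delay; a real consumer might back off
	}
}

func main() {
	// Simulated dialer: every "connection" drops after 10 ms.
	dial := func() (<-chan error, error) {
		lost := make(chan error, 1)
		go func() {
			time.Sleep(10 * time.Millisecond)
			lost <- fmt.Errorf("connection closed")
		}()
		return lost, nil
	}
	n := superviseConnection(dial, 5*time.Millisecond, 3)
	fmt.Println("reconnect attempts:", n)
}
```

An alternative with the same effect is to let the process exit when the connection is lost and rely on Supervisor's autorestart to start a fresh one.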

alsar commented 8 years ago

I'll reopen the issue, because it seems that I'm not the only one who has or had this problem. On my new 15.10 server it has now been working for 4 days.

Maybe this problem is only present on Ubuntu 14.04, as both I and @lrauldan have this problem on the same Ubuntu version.

alsar commented 8 years ago

But on the other hand, I didn't have any problems on Vagrant with Ubuntu 14.04.

raul-dan commented 8 years ago

Neither did I, but after a couple of days on staging we had over 100 messages stuck in our queues. Once I restarted Supervisor, all of them went through and got processed without problems.

I will fork the consumer over the weekend and see if I can debug it and see what happens when the connection with rabbitmq gets lost.

ricbra commented 8 years ago

The connection itself is handled by the streadway/amqp library (https://github.com/streadway/amqp). Maybe there is something useful in the bug tracker on their GitHub repository?

Nevertheless it could also be caused by something in rabbitmq-cli-consumer, so looking forward to your findings.

ricbra commented 8 years ago

Any news on this @lrauldan ?

sroze commented 8 years ago

I have the same problem with the release 1.1.0. Somehow, I don't have any issues on another project with only one consumer. My configuration is the following:

[rabbitmq]
host = rabbitmq
username = *******
password = *******
vhost=/
port=5672
queue=river_commands
compression=Off

[logs]
error =/dev/stderr
info = /dev/stdout

[prefetch]
count=1
global=Off

[exchange]
name=river_commands
autodelete=Off
type=direct
durable=On

raul-dan commented 8 years ago

@ricbra I can confirm that we were not able to reproduce the issue again. The consumers have been running for a couple of months now and none has hung again.

hellracer commented 7 years ago

Hi Ric,

I've encountered the same problem as the others. I also run the script under supervisord with a minimal config. There was no error from supervisord either, and supervisord reports the script's status as running.

As you suggested in the other post, I visited https://github.com/streadway/amqp and found this information there. I don't know whether it's related or not :)

mattwilliamson commented 17 days ago: "Just to make sure you don't chase your tail, the memory leak goes away when using QoS of 1, but when not using QoS, closing the channel and connection do not release the implicit buffer."

When I check your consumer.go, is it right that the QoS parameter is 0?

if err := ch.Qos(cfg.Prefetch.Count, 0, cfg.Prefetch.Global); err != nil {
    return nil, errors.New(fmt.Sprintf("Failed to set QoS: %s", err.Error()))
}

Still chasing the ghost here: the problem resurfaces every two days or so. I'm using Ubuntu 16.04. I will try to set the QoS parameter to 1 and recompile your consumer app. Let's see how this flies.

ricbra commented 7 years ago

Refer to the official docs for the exact params of this method: https://godoc.org/github.com/streadway/amqp#Channel.Qos

The second param is prefetchSize. I don't understand exactly what mattwilliamson means by "QoS of 1".

Let me know if you solve the problem with this fix.

hellracer commented 7 years ago

This is a sample message that was stuck in the queue. As soon as I deleted this message from the queue, the consumer script continued to run. Ric, another suggestion I have in mind: how can you enclose the argument passed to the invoked script in double quotes?

e.g. php test.php "base64_encoded message"?

The actual payload is a JSON-encoded message:

eNoUjjFPwzAQRv/K1bNx7uzETr2lEkJIgCq1A0MWIx+pReJEaQMD4r/jbk+np/ver5jmjzSy8AL35BpjTd02QoqJr9cw3O/PeQw5wvn98cGiheOBoBt5ve120MW4Fs/D27zeLvAaUobum/PGEp5CDnGWcEj9hvhJIUt4CcOWA5zTxF4jub5C3Ve6BSJvjCcr4bQwR49fU19dypPjyVOttEO0xSZNChvnalMKl7JZ8qjg8JOWO+61ItsqjagMib//AAAA//+muzzA

andrefigueira commented 7 years ago

That doesn't seem to decode correctly. Are you sure what's being encoded is valid?

hellracer commented 7 years ago

//$str = "eNoUjjFPwzAQRv/K1bNx7uzETr2lEkJIgCq1A0MWIx+pReJEaQMD4r/jbk+np/ver5jmjzSy8AL35BpjTd02QoqJr9cw3O/PeQw5wvn98cGiheOBoBt5ve120MW4Fs/D27zeLvAaUobum/PGEp5CDnGWcEj9hvhJIUt4CcOWA5zTxF4jub5C3Ve6BSJvjCcr4bQwR49fU19dypPjyVOttEO0xSZNChvnalMKl7JZ8qjg8JOWO+61ItsqjagMib//AAAA//+muzzA";

//$str = base64_decode($str);
//$str = gzuncompress($str);

//$a = json_decode($str);
//print_r($a);

Yes, that was what I was thinking too, but try running the script: it will successfully decode the encoded message.
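For anyone who wants to check a payload outside PHP, the same three steps (base64 decode, zlib inflate, JSON check) can be sketched in Go with the standard library. This is only an illustration under stated assumptions: `decodePayload` and `encodePayload` are hypothetical helper names, the sample input is generated in `main` rather than taken from the queue, and the sketch assumes the zlib format that PHP's `gzuncompress` reads. Each error names the step that failed, which helps narrow down where a payload stops being valid.

```go
package main

import (
	"bytes"
	"compress/zlib"
	"encoding/base64"
	"encoding/json"
	"fmt"
	"io"
)

// decodePayload reverses base64 and zlib in the same order as the PHP calls
// base64_decode and gzuncompress, then checks that the result is valid JSON.
func decodePayload(encoded string) ([]byte, error) {
	compressed, err := base64.StdEncoding.DecodeString(encoded)
	if err != nil {
		return nil, fmt.Errorf("base64: %w", err)
	}
	zr, err := zlib.NewReader(bytes.NewReader(compressed))
	if err != nil {
		return nil, fmt.Errorf("zlib header: %w", err)
	}
	defer zr.Close()
	plain, err := io.ReadAll(zr)
	if err != nil {
		return nil, fmt.Errorf("zlib body: %w", err)
	}
	if !json.Valid(plain) {
		return plain, fmt.Errorf("payload is not valid JSON")
	}
	return plain, nil
}

// encodePayload is a hypothetical helper used only to build sample input.
func encodePayload(plain []byte) string {
	var buf bytes.Buffer
	zw := zlib.NewWriter(&buf)
	zw.Write(plain)
	zw.Close()
	return base64.StdEncoding.EncodeToString(buf.Bytes())
}

func main() {
	sample := encodePayload([]byte(`{"id":1}`))
	plain, err := decodePayload(sample)
	fmt.Println(string(plain), err)
}
```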

hellracer commented 7 years ago

Hi guys,

Out of desperation, I slightly modified command_factory.go to send the base64-encoded body via STDIN instead of passing it as a parameter. I believe my issue is that the encoded string is still too large, even though it is compressed. To be honest, though, that's just a hunch, because I don't see an "argument too long" error from PHP that would cause the PHP script to bail out without processing the message.

Even if this doesn't fix my issue, the argument length limitation is now removed, which is a major leap forward in triaging it. Thanks, guys.
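A minimal Go sketch of the idea, assuming nothing about the actual change to command_factory.go: `runWithStdin` is a hypothetical helper, and `cat` stands in for the PHP command so the example runs on a Unix-like system. The payload goes to the child's standard input, so it is not subject to the operating system's limit on argument length.

```go
package main

import (
	"bytes"
	"fmt"
	"os/exec"
)

// runWithStdin starts name with args and writes payload to the child's
// standard input instead of appending it to the argument list.
func runWithStdin(name string, args []string, payload []byte) ([]byte, error) {
	cmd := exec.Command(name, args...)
	cmd.Stdin = bytes.NewReader(payload)
	return cmd.Output() // waits for the child and returns its stdout
}

func main() {
	// "cat" echoes stdin back, standing in for the real consumer script.
	out, err := runWithStdin("cat", nil, []byte("eyJpZCI6MX0="))
	if err != nil {
		fmt.Println("command failed:", err)
		return
	}
	fmt.Printf("child received %d bytes\n", len(out))
}
```

On the PHP side, the script would then read the body from `php://stdin` rather than from `$argv`.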

hellracer commented 7 years ago

For others who might be interested in doing this, I attached the code snippet in the other thread. Look for STDIN in the subject.

hellracer commented 7 years ago

Hi Ric,

I was able to triage my issue, and it's not relevant to this post. What is relevant is that we can never tell what your developers are going to put on the queue. All I can say is that the STDIN enhancement should be added as another user option in the queue config section.

Please disregard the QoS issue; I didn't touch the code, obviously... As far as I'm concerned, you can now close this issue 👍

soodkritika commented 7 years ago

Hi, I am using RabbitMQ on Heroku. It works fine, that is, the consumer keeps processing messages. But whenever I change any file and push the code, the connection gets lost. In that case I need to restart the connection manually by calling the consumer process. I have also checked the logs, but I didn't find anything there either. Please help me out.

Consumer file:

class ConsumerController extends AppController {
public function consume() {

    $this->writeLog("shopcontroller:: index::starting to make connection with rabbitmq ..");

    try {
        $connection = new AMQPStreamConnection("hostname", port, "username", "password", "vhost");

        $resultData = print_r($connection, 1);
        $this->writeLog("shopcontroller:: index::connection object value with rabbitmq :: $resultData");

    } catch (Exception $e) {
        $resultData = print_r($e, 1);
        $this->writeLog("shopcontroller:: index::exception while making connection with rabbitmq : $resultData");
    }

    if ($connection)
    {
        try {
            $channel = $connection->channel();

            $channel->queue_declare('testqueue', false, false, false, false);
            echo ' [*] Waiting for messages. To exit press CTRL+C', "\n";

            $channel->basic_consume('testqueue', '', false, true, false, false,  array($this, 'processOrder') );
            while(count($channel->callbacks)) {
                $channel->wait();
            }
            $channel->close();
            $connection->close();
        } catch (Exception $e) {
            $resultData = print_r($e, 1);
            $this->writeLog("shopcontroller:: index::exception while making channel with rabbitmq connection : $resultData");
        }

    }

}   

public function processOrder() {
    $testCron = new testCron();
    $testCron->test();
}
}

kolte commented 6 years ago

Hi,

I am also facing the same issue: the consumer stops processing messages from the queue even though the consumers are running. The queue continues to fill up. I tried restarting the service and the consumers, but it does not process; sometimes it processes a couple of messages and then stops again.

I have attached the log files (RMQ-log.txt, crash.log). I am using the latest RMQ on Windows Server 2012 R2. Please help.

Thanks G

uchm4n commented 5 years ago

Same issue here. My workers die when my script hits a curl timeout in some cases. Any solution? For now I think I'll just execute the supervisor reload command inside a script before the script even runs, but this is a hacky solution.

jerry73204 commented 4 years ago

Same issue here as well. My consumer received the error "queue 'detectron_detector' in vhost '/' process is stopped by supervisor" after the detectron_detector queue had been running for a long time (~1 hr or 6 hrs, it depends). Here is my setup and RabbitMQ log.

The queue setup is intended for real-time messaging. Normally it accepts 50~90 msgs/s, and three consumers subscribe to this queue. Since it has the x-message-ttl: 10000 property, I expect the queue size to be limited to some degree.

I grepped the log and found only this relevant line. I don't know why it stopped unexpectedly.

2019-11-24 04:27:16.083 [warning] <0.3911.5> Queue detectron_detector in vhost / has its master node is down and no mirrors available or eligible for promotion. Forcing queue deletion.

Besides, the consumer cannot queue_declare to replace the stopped queue. It cannot recover without deleting the queue manually.

johnrhunt commented 1 year ago

This also affects us. Please note that you don't need to use this tool (or its fork) any more since PHP 7 came out, as that can run for a long time without going funny. There are plenty of articles about using supervisord to do this.

As a really crummy stopgap solution, we have set up a weekly cron job to restart the listener (we found it only hung after somewhere between 2 and 18 months). It's not pretty, but it'll work.

krys-codes commented 11 months ago

Same here, got the same problem. We set up restart cron jobs as a workaround too :X