arnaud-lb / php-rdkafka

Production-ready, stable Kafka client for PHP
MIT License

Any update on indefinite hangs in destructor? #196

Closed jrots closed 5 years ago

jrots commented 5 years ago

Hi,

Is there an update on the indefinite hangs in the destructor when no brokers are available?

I tried several options but can't seem to get past this issue. It is affecting my production environment at the moment whenever Kafka is restarting or all brokers are down.

You can reproduce it easily with the following code:

$conf = new  RdKafka\Conf();
$conf->set('socket.blocking.max.ms', '1'); // Will help librdkafka release your application to continue
$conf->set('queue.buffering.max.ms', '1'); // Dispatch ASAP
$conf->set('queue.buffering.max.messages', '10'); // Maximum number of messages allowed on the producer queue
$conf->set('socket.timeout.ms', '10');
$conf->set('debug', 'all');
$conf->set('message.timeout.ms', '1000'); 

pcntl_sigprocmask(SIG_BLOCK, array(SIGIO));
$conf->set('internal.termination.signal', (string) SIGIO);
$producer = new RdKafka\Producer($conf);
$producer->setLogLevel(LOG_DEBUG);
$producer->addBrokers("127.0.0.1:51004,127.0.0.1:51004,127.0.0.1:51004");
$topicConfig = new RdKafka\TopicConf();
$topic = $producer->newTopic('TEST', $topicConfig);
//var_dump($topic);
try {
    $topic->produce(RD_KAFKA_PARTITION_UA, 0, 'TESTMESSAGE');
    $producer->poll(0);
} catch (\Exception $e) {
    var_dump($e);
}
die('DESTRUCT CALL HERE');

Afterwards, rdkafka is stuck in a destructor loop:

%7|1545411213.839|INIT|rdkafka#producer-1| [thrd:app]: librdkafka v0.11.5 (0xb05ff) rdkafka#producer-1 initialized (debug 0xffff)
%7|1545411213.839|BRKMAIN|rdkafka#producer-1| [thrd::0/internal]: :0/internal: Enter main broker thread
%7|1545411213.839|STATE|rdkafka#producer-1| [thrd::0/internal]: :0/internal: Broker changed state INIT -> UP
%7|1545411213.839|BROADCAST|rdkafka#producer-1| [thrd::0/internal]: Broadcasting state change
%7|1545411213.839|WAKEUPFD|rdkafka#producer-1| [thrd:app]: 127.0.0.1:51004/bootstrap: Enabled low-latency ops queue wake-ups
%7|1545411213.839|BROKER|rdkafka#producer-1| [thrd:app]: 127.0.0.1:51004/bootstrap: Added new broker with NodeId -1
%7|1545411213.839|BRKMAIN|rdkafka#producer-1| [thrd:127.0.0.1:51004/bootstrap]: 127.0.0.1:51004/bootstrap: Enter main broker thread
%7|1545411213.839|CONNECT|rdkafka#producer-1| [thrd:127.0.0.1:51004/bootstrap]: 127.0.0.1:51004/bootstrap: broker in state INIT connecting
%7|1545411213.839|TOPIC|rdkafka#producer-1| [thrd:app]: New local topic: TEST
%7|1545411213.839|TOPPARNEW|rdkafka#producer-1| [thrd:app]: NEW TEST [-1] 0x7f93d5f90550 (at rd_kafka_topic_new0:362)
%7|1545411213.839|METADATA|rdkafka#producer-1| [thrd:app]: Skipping metadata refresh of 1 topic(s): no usable brokers
%7|1545411213.840|CONNECT|rdkafka#producer-1| [thrd:127.0.0.1:51004/bootstrap]: 127.0.0.1:51004/bootstrap: Connecting to ipv4#127.0.0.1:51004 (plaintext) with socket 9
%7|1545411213.840|STATE|rdkafka#producer-1| [thrd:127.0.0.1:51004/bootstrap]: 127.0.0.1:51004/bootstrap: Broker changed state INIT -> CONNECT
%7|1545411213.840|BROADCAST|rdkafka#producer-1| [thrd:127.0.0.1:51004/bootstrap]: Broadcasting state change
%7|1545411213.840|BROKERFAIL|rdkafka#producer-1| [thrd:127.0.0.1:51004/bootstrap]: 127.0.0.1:51004/bootstrap: failed: err: Local: Broker transport failure: (errno: Connection refused)
%3|1545411213.840|FAIL|rdkafka#producer-1| [thrd:127.0.0.1:51004/bootstrap]: 127.0.0.1:51004/bootstrap: Connect to ipv4#127.0.0.1:51004 failed: Connection refused
%3|1545411213.840|ERROR|rdkafka#producer-1| [thrd:127.0.0.1:51004/bootstrap]: 127.0.0.1:51004/bootstrap: Connect to ipv4#127.0.0.1:51004 failed: Connection refused
%7|1545411213.840|STATE|rdkafka#producer-1| [thrd:127.0.0.1:51004/bootstrap]: 127.0.0.1:51004/bootstrap: Broker changed state CONNECT -> DOWN
%3|1545411213.840|ERROR|rdkafka#producer-1| [thrd:127.0.0.1:51004/bootstrap]: 1/1 brokers are down
%7|1545411213.840|BROADCAST|rdkafka#producer-1| [thrd:127.0.0.1:51004/bootstrap]: Broadcasting state change
%7|1545411213.840|BUFQ|rdkafka#producer-1| [thrd:127.0.0.1:51004/bootstrap]: 127.0.0.1:51004/bootstrap: Purging bufq with 0 buffers
%7|1545411213.840|BUFQ|rdkafka#producer-1| [thrd:127.0.0.1:51004/bootstrap]: 127.0.0.1:51004/bootstrap: Updating 0 buffers on connection reset
DESTRUCT CALL HERE%7|1545411214.841|CONNECT|rdkafka#producer-1| [thrd:127.0.0.1:51004/bootstrap]: 127.0.0.1:51004/bootstrap: broker in state DOWN connecting
%7|1545411214.841|NOINFO|rdkafka#producer-1| [thrd:main]: Topic TEST partition count is zero: should refresh metadata
%7|1545411214.841|METADATA|rdkafka#producer-1| [thrd:main]: Skipping metadata refresh of 1 topic(s): no usable brokers
%7|1545411214.842|CONNECT|rdkafka#producer-1| [thrd:127.0.0.1:51004/bootstrap]: 127.0.0.1:51004/bootstrap: Connecting to ipv4#127.0.0.1:51004 (plaintext) with socket 9
%7|1545411214.842|STATE|rdkafka#producer-1| [thrd:127.0.0.1:51004/bootstrap]: 127.0.0.1:51004/bootstrap: Broker changed state DOWN -> CONNECT
%7|1545411214.842|BROADCAST|rdkafka#producer-1| [thrd:127.0.0.1:51004/bootstrap]: Broadcasting state change
%7|1545411214.842|BROKERFAIL|rdkafka#producer-1| [thrd:127.0.0.1:51004/bootstrap]: 127.0.0.1:51004/bootstrap: failed: err: Local: Broker transport failure: (errno: Connection refused)
%7|1545411214.842|FAIL|rdkafka#producer-1| [thrd:127.0.0.1:51004/bootstrap]: 127.0.0.1:51004/bootstrap: Connect to ipv4#127.0.0.1:51004 failed: Connection refused
%7|1545411214.842|STATE|rdkafka#producer-1| [thrd:127.0.0.1:51004/bootstrap]: 127.0.0.1:51004/bootstrap: Broker changed state CONNECT -> DOWN
%7|1545411214.842|BROADCAST|rdkafka#producer-1| [thrd:127.0.0.1:51004/bootstrap]: Broadcasting state change
%7|1545411214.842|BUFQ|rdkafka#producer-1| [thrd:127.0.0.1:51004/bootstrap]: 127.0.0.1:51004/bootstrap: Purging bufq with 0 buffers
%7|1545411214.842|BUFQ|rdkafka#producer-1| [thrd:127.0.0.1:51004/bootstrap]: 127.0.0.1:51004/bootstrap: Updating 0 buffers on connection reset
%7|1545411215.845|NOINFO|rdkafka#producer-1| [thrd:main]: Topic TEST partition count is zero: should refresh metadata
%7|1545411215.846|METADATA|rdkafka#producer-1| [thrd:main]: Skipping metadata refresh of 1 topic(s): no usable brokers
%7|1545411215.845|CONNECT|rdkafka#producer-1| [thrd:127.0.0.1:51004/bootstrap]: 127.0.0.1:51004/bootstrap: broker in state DOWN connecting
%7|1545411215.846|CONNECT|rdkafka#producer-1| [thrd:127.0.0.1:51004/bootstrap]: 127.0.0.1:51004/bootstrap: Connecting to ipv4#127.0.0.1:51004 (plaintext) with socket 9
%7|1545411215.846|STATE|rdkafka#producer-1| [thrd:127.0.0.1:51004/bootstrap]: 127.0.0.1:51004/bootstrap: Broker changed state DOWN -> CONNECT
%7|1545411215.846|BROADCAST|rdkafka#producer-1| [thrd:127.0.0.1:51004/bootstrap]: Broadcasting state change
%7|1545411215.846|BROKERFAIL|rdkafka#producer-1| [thrd:127.0.0.1:51004/bootstrap]: 127.0.0.1:51004/bootstrap: failed: err: Local: Broker transport failure: (errno: Connection refused)
%7|1545411215.846|FAIL|rdkafka#producer-1| [thrd:127.0.0.1:51004/bootstrap]: 127.0.0.1:51004/bootstrap: Connect to ipv4#127.0.0.1:51004 failed: Connection refused
%7|1545411215.846|STATE|rdkafka#producer-1| [thrd:127.0.0.1:51004/bootstrap]: 127.0.0.1:51004/bootstrap: Broker changed state CONNECT -> DOWN
%7|1545411215.846|BROADCAST|rdkafka#producer-1| [thrd:127.0.0.1:51004/bootstrap]: Broadcasting state change
%7|1545411215.846|BUFQ|rdkafka#producer-1| [thrd:127.0.0.1:51004/bootstrap]: 127.0.0.1:51004/bootstrap: Purging bufq with 0 buffers
%7|1545411215.846|BUFQ|rdkafka#producer-1| [thrd:127.0.0.1:51004/bootstrap]: 127.0.0.1:51004/bootstrap: Updating 0 buffers on connection reset
%7|1545411216.848|NOINFO|rdkafka#producer-1| [thrd:main]: Topic TEST partition count is zero: should refresh metadata
%7|1545411216.848|METADATA|rdkafka#producer-1| [thrd:main]: Skipping metadata refresh of 1 topic(s): no usable brokers
%7|1545411216.848|CONNECT|rdkafka#producer-1| [thrd:127.0.0.1:51004/bootstrap]: 127.0.0.1:51004/bootstrap: broker in state DOWN connecting
%7|1545411216.849|CONNECT|rdkafka#producer-1| [thrd:127.0.0.1:51004/bootstrap]: 127.0.0.1:51004/bootstrap: Connecting to ipv4#127.0.0.1:51004 (plaintext) with socket 9
%7|1545411216.849|STATE|rdkafka#producer-1| [thrd:127.0.0.1:51004/bootstrap]: 127.0.0.1:51004/bootstrap: Broker changed state DOWN -> CONNECT
%7|1545411216.849|BROADCAST|rdkafka#producer-1| [thrd:127.0.0.1:51004/bootstrap]: Broadcasting state change
%7|1545411216.849|BROKERFAIL|rdkafka#producer-1| [thrd:127.0.0.1:51004/bootstrap]: 127.0.0.1:51004/bootstrap: failed: err: Local: Broker transport failure: (errno: Connection refused)
%7|1545411216.849|FAIL|rdkafka#producer-1| [thrd:127.0.0.1:51004/bootstrap]: 127.0.0.1:51004/bootstrap: Connect to ipv4#127.0.0.1:51004 failed: Connection refused
%7|1545411216.849|STATE|rdkafka#producer-1| [thrd:127.0.0.1:51004/bootstrap]: 127.0.0.1:51004/bootstrap: Broker changed state CONNECT -> DOWN
%7|1545411216.849|BROADCAST|rdkafka#producer-1| [thrd:127.0.0.1:51004/bootstrap]: Broadcasting state change
%7|1545411216.849|BUFQ|rdkafka#producer-1| [thrd:127.0.0.1:51004/bootstrap]: 127.0.0.1:51004/bootstrap: Purging bufq with 0 buffers
%7|1545411216.849|BUFQ|rdkafka#producer-1| [thrd:127.0.0.1:51004/bootstrap]: 127.0.0.1:51004/bootstrap: Updating 0 buffers on connection reset
%7|1545411217.851|NOINFO|rdkafka#producer-1| [thrd:main]: Topic TEST partition count is zero: should refresh metadata
%7|1545411217.851|METADATA|rdkafka#producer-1| [thrd:main]: Skipping metadata refresh of 1 topic(s): no usable brokers
%7|1545411217.854|CONNECT|rdkafka#producer-1| [thrd:127.0.0.1:51004/bootstrap]: 127.0.0.1:51004/bootstrap: broker in state DOWN connecting
%7|1545411217.854|CONNECT|rdkafka#producer-1| [thrd:127.0.0.1:51004/bootstrap]: 127.0.0.1:51004/bootstrap: Connecting to ipv4#127.0.0.1:51004 (plaintext) with socket 9
%7|1545411217.854|STATE|rdkafka#producer-1| [thrd:127.0.0.1:51004/bootstrap]: 127.0.0.1:51004/bootstrap: Broker changed state DOWN -> CONNECT
%7|1545411217.854|BROADCAST|rdkafka#producer-1| [thrd:127.0.0.1:51004/bootstrap]: Broadcasting state change
%7|1545411217.854|BROKERFAIL|rdkafka#producer-1| [thrd:127.0.0.1:51004/bootstrap]: 127.0.0.1:51004/bootstrap: failed: err: Local: Broker transport failure: (errno: Connection refused)
%7|1545411217.854|FAIL|rdkafka#producer-1| [thrd:127.0.0.1:51004/bootstrap]: 127.0.0.1:51004/bootstrap: Connect to ipv4#127.0.0.1:51004 failed: Connection refused
%7|1545411217.854|STATE|rdkafka#producer-1| [thrd:127.0.0.1:51004/bootstrap]: 127.0.0.1:51004/bootstrap: Broker changed state CONNECT -> DOWN
%7|1545411217.854|BROADCAST|rdkafka#producer-1| [thrd:127.0.0.1:51004/bootstrap]: Broadcasting state change
%7|1545411217.854|BUFQ|rdkafka#producer-1| [thrd:127.0.0.1:51004/bootstrap]: 127.0.0.1:51004/bootstrap: Purging bufq with 0 buffers
%7|1545411217.854|BUFQ|rdkafka#producer-1| [thrd:127.0.0.1:51004/bootstrap]: 127.0.0.1:51004/bootstrap: Updating 0 buffers on connection reset
%7|1545411218.855|NOINFO|rdkafka#producer-1| [thrd:main]: Topic TEST partition count is zero: should refresh metadata
%7|1545411218.855|CONNECT|rdkafka#producer-1| [thrd:127.0.0.1:51004/bootstrap]: 127.0.0.1:51004/bootstrap: broker in state DOWN connecting
%7|1545411218.855|METADATA|rdkafka#producer-1| [thrd:main]: Skipping metadata refresh of 1 topic(s): no usable brokers
%7|1545411218.855|CONNECT|rdkafka#producer-1| [thrd:127.0.0.1:51004/bootstrap]: 127.0.0.1:51004/bootstrap: Connecting to ipv4#127.0.0.1:51004 (plaintext) with socket 9
%7|1545411218.855|STATE|rdkafka#producer-1| [thrd:127.0.0.1:51004/bootstrap]: 127.0.0.1:51004/bootstrap: Broker changed state DOWN -> CONNECT
%7|1545411218.855|BROADCAST|rdkafka#producer-1| [thrd:127.0.0.1:51004/bootstrap]: Broadcasting state change
%7|1545411218.855|BROKERFAIL|rdkafka#producer-1| [thrd:127.0.0.1:51004/bootstrap]: 127.0.0.1:51004/bootstrap: failed: err: Local: Broker transport failure: (errno: Connection refused)
%7|1545411218.855|FAIL|rdkafka#producer-1| [thrd:127.0.0.1:51004/bootstrap]: 127.0.0.1:51004/bootstrap: Connect to ipv4#127.0.0.1:51004 failed: Connection refused
%7|1545411218.855|STATE|rdkafka#producer-1| [thrd:127.0.0.1:51004/bootstrap]: 127.0.0.1:51004/bootstrap: Broker changed state CONNECT -> DOWN
%7|1545411218.855|BROADCAST|rdkafka#producer-1| [thrd:127.0.0.1:51004/bootstrap]: Broadcasting state change
%7|1545411218.855|BUFQ|rdkafka#producer-1| [thrd:127.0.0.1:51004/bootstrap]: 127.0.0.1:51004/bootstrap: Purging bufq with 0 buffers
%7|1545411218.855|BUFQ|rdkafka#producer-1| [thrd:127.0.0.1:51004/bootstrap]: 127.0.0.1:51004/bootstrap: Updating 0 buffers on connection reset
%7|1545411219.856|NOINFO|rdkafka#producer-1| [thrd:main]: Topic TEST partition count is zero: should refresh metadata
%7|1545411219.856|METADATA|rdkafka#producer-1| [thrd:main]: Skipping metadata refresh of 1 topic(s): no usable brokers
%7|1545411219.860|CONNECT|rdkafka#producer-1| [thrd:127.0.0.1:51004/bootstrap]: 127.0.0.1:51004/bootstrap: broker in state DOWN connecting
%7|1545411219.860|CONNECT|rdkafka#producer-1| [thrd:127.0.0.1:51004/bootstrap]: 127.0.0.1:51004/bootstrap: Connecting to ipv4#127.0.0.1:51004 (plaintext) with socket 9
%7|1545411219.861|STATE|rdkafka#producer-1| [thrd:127.0.0.1:51004/bootstrap]: 127.0.0.1:51004/bootstrap: Broker changed state DOWN -> CONNECT
%7|1545411219.861|BROADCAST|rdkafka#producer-1| [thrd:127.0.0.1:51004/bootstrap]: Broadcasting state change
%7|1545411219.861|BROKERFAIL|rdkafka#producer-1| [thrd:127.0.0.1:51004/bootstrap]: 127.0.0.1:51004/bootstrap: failed: err: Local: Broker transport failure: (errno: Connection refused)
%7|1545411219.861|FAIL|rdkafka#producer-1| [thrd:127.0.0.1:51004/bootstrap]: 127.0.0.1:51004/bootstrap: Connect to ipv4#127.0.0.1:51004 failed: Connection refused
%7|1545411219.861|STATE|rdkafka#producer-1| [thrd:127.0.0.1:51004/bootstrap]: 127.0.0.1:51004/bootstrap: Broker changed state CONNECT -> DOWN
%7|1545411219.861|BROADCAST|rdkafka#producer-1| [thrd:127.0.0.1:51004/bootstrap]: Broadcasting state change
%7|1545411219.861|BUFQ|rdkafka#producer-1| [thrd:127.0.0.1:51004/bootstrap]: 127.0.0.1:51004/bootstrap: Purging bufq with 0 buffers
%7|1545411219.861|BUFQ|rdkafka#producer-1| [thrd:127.0.0.1:51004/bootstrap]: 127.0.0.1:51004/bootstrap: Updating 0 buffers on connection reset
.... etc
I have to hard-kill the process to stop it.

Thanks! Regards J

Steveb-p commented 5 years ago

In my case a similar problem happens when my instance is unable to connect to a broker (while testing I simply stop the Kafka Docker container). This causes the Producer to retry sending the message over and over, locking up php-fpm in the process.

In normal cases, when Kafka is online, the process seems to be freed properly once the message is acknowledged.

sustmi commented 5 years ago

I think the problem lies in the "infinite" loop inside the kafka_free() function that is called when the PHP object is destructed: https://github.com/arnaud-lb/php-rdkafka/blob/master/rdkafka.c#L85

The loop looks similar to the one in librdkafka's rd_kafka_flush() function: https://github.com/edenhill/librdkafka/blob/9b3fce7b882b43302fb983d0e0e555225e672f92/src/rdkafka.c#L3790 but rd_kafka_flush() checks not only whether the output queue is empty but also whether a timeout has been reached.

I think that polling in the destructor is not the best solution; it would be better to expose the rd_kafka_flush() function so that users can call it before the PHP application/script ends. The reason is that in order to do any error handling for undelivered messages, one needs to set dr_msg_cb (the message delivery callback), which gets called while polling. When the polling happens in the PHP object destructor, it is usually too late to do any error handling. For example, in my Symfony app I am unable to log the error using Monolog because its logger object has already been destroyed at that point.
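
For reference, a minimal sketch of what registering such a delivery callback looks like (the handler body is illustrative only):

$conf = new RdKafka\Conf();
// Delivery report callback: invoked from poll()/flush() for every produced message,
// whether it was delivered or not.
$conf->setDrMsgCb(function ($kafka, $message) {
    if ($message->err !== RD_KAFKA_RESP_ERR_NO_ERROR) {
        // Handle the undelivered message here (log, retry, ...).
        // If this only ever runs from the object destructor, services such as
        // a logger may already have been destroyed.
        error_log('Delivery failed: ' . rd_kafka_err2str($message->err));
    }
});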

Currently, as a workaround, I added a custom flush() method to the \Enqueue\RdKafka\RdKafkaProducer class (because I am using the \Enqueue\RdKafka wrapper):

public function flush(int $timeoutInMilliseconds): void
{
    // Make at least one non-blocking poll
    $this->vendorProducer->poll(0);

    $endTime = microtime(true) + $timeoutInMilliseconds / 1000;

    $remainingTimeInMilliseconds = $timeoutInMilliseconds;

    while ($this->vendorProducer->getOutQLen() > 0 && $remainingTimeInMilliseconds > 0) {
        $this->vendorProducer->poll($remainingTimeInMilliseconds);

        $remainingTimeInMilliseconds = (int) (($endTime - microtime(true)) * 1000);
    }

    if ($this->vendorProducer->getOutQLen() > 0) {
        throw new \RuntimeException('Flush timed out');
    }
}

Then I can call the flush() method on the kernel.terminate event, where I can do some error handling.
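
A rough sketch of how that wiring might look (class and service names are illustrative; this assumes Symfony's standard event subscriber mechanism and the flush() wrapper above):

use Symfony\Component\EventDispatcher\EventSubscriberInterface;
use Symfony\Component\HttpKernel\KernelEvents;

// Illustrative subscriber; $producer holds the wrapper with the custom flush() method shown above.
class KafkaFlushSubscriber implements EventSubscriberInterface
{
    private $producer;

    public function __construct(\Enqueue\RdKafka\RdKafkaProducer $producer)
    {
        $this->producer = $producer;
    }

    public static function getSubscribedEvents()
    {
        return [KernelEvents::TERMINATE => 'onTerminate'];
    }

    public function onTerminate()
    {
        try {
            $this->producer->flush(5000); // wait up to 5 seconds for queued messages
        } catch (\RuntimeException $e) {
            // The logger is still alive here, so the failure can be reported properly.
        }
    }
}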

arnaud-lb commented 5 years ago

Thanks for the analysis @sustmi

The current behavior (poll until the queue is empty) is safe by default, as it keeps trying to send the messages until they are actually sent. I think this is a good behavior to retain.

I agree, though, that there should be a way to quit earlier. If we expose rd_kafka_flush() and rd_kafka_purge(), it would be possible to do something like this:

$rk->flush($timeout);
$rk->purge(); // After this, the destruction would be non-blocking

WDYT?
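
For illustration, the full shutdown sequence under that proposal might look roughly like this (the return value of flush() and the purge flags are assumptions mirroring librdkafka's rd_kafka_flush()/rd_kafka_purge(); none of this is exposed yet):

// Sketch only: assumes flush() returns a librdkafka error code and purge()
// accepts librdkafka's purge flags.
$err = $rk->flush(10000); // wait up to 10 seconds for in-flight messages
if ($err !== RD_KAFKA_RESP_ERR_NO_ERROR) {
    // Handle/log undelivered messages while the application is still running,
    // then drop whatever is still queued so the destructor cannot block.
    $rk->purge(RD_KAFKA_PURGE_F_QUEUE | RD_KAFKA_PURGE_F_INFLIGHT);
}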

nick-zh commented 5 years ago

@arnaud-lb Building on what was previously mentioned, I think it is important that at least all the callbacks that can be triggered from poll can still be executed (so the application can handle errors). If this is not possible, I would get rid of the poll in the destructor, adjust all the examples, and really stress that flush and purge are needed for a proper shutdown. If it is guaranteed that all the callbacks will still be triggered during destruction and the app can still handle them, I think it would be OK to leave it in. What do you think?

arnaud-lb commented 5 years ago

Unfortunately, there is no guarantee that callbacks will be called during destruction, because the callbacks themselves might already have been destroyed by the GC at that point.

Here are the options I can think of:

Option 1: Don't poll in the destructor, and let users poll/flush manually. If the user forgets to do so, messages are lost.
Option 2: Poll in the destructor, and let users poll/flush/purge manually before destruction if they need to discard unflushed messages. This is the current behavior.
Option 3: Add a setting on RdKafka\Producer to choose between the two behaviors.

nick-zh commented 5 years ago

@arnaud-lb In my opinion we should go with Option 1 (we could always adapt and add Option 2 back if we see that too many people run into problems). My reason for Option 1 is that if we don't poll/flush, we can anyway never be sure whether poll would trigger an unhandled callback, which could then result in lost messages as well. So I think it is not the extension's responsibility to take care of this, since it is very hard to accomplish in a good way. If you are OK with Option 1, I will adjust the README and documentation to stress and show how a proper shutdown should be done.

Steveb-p commented 5 years ago

Option 1: Don't poll in the destructor, and let users poll/flush manually. If the user forgets to do so, messages are lost.

Simple solutions are probably best.

This option gives the programmer-user the most control, in my opinion, and should be preferred. It is explicit and shows exactly what is happening. The current behavior is difficult to explain to newcomers, since it involves "hidden" operations that may or may not occur (when a message cannot be delivered).

Actually, I believe it will prevent more issues than it might introduce, since what usually happens is that the programmer-user does no error checking and is eventually surprised when inevitable connectivity issues occur and PHP processes start to choke, retrying until destroyed and/or losing messages.

Enforcing proper error handling in user-land is the way to go for me.

arnaud-lb commented 5 years ago

Agreed, the current behavior is confusing. Let's go for Option 1. We have to document this change and release it in a new major version, though.