arnaud-lb / php-rdkafka

Production-ready, stable Kafka client for PHP
MIT License

[question] How producer exit, when broker is down? #184

Closed tesion99 closed 5 years ago

tesion99 commented 6 years ago

bglover commented 5 years ago

I was wondering that as well. After digging a little deeper into the configuration options, I found that when the broker is down, message delivery is contingent on the value of 'message.timeout.ms'. The default is 300000 ms (5 minutes), which means the producer will wait that long before giving up.

Try playing with that value and see if it can help with your use-case.
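As a minimal sketch of that suggestion, the timeout can be set on a topic configuration that is then passed to `newTopic()`. The broker address, the topic name and the 5000 ms value are placeholders, not values from this thread.

```php
<?php
// Assumes the rdkafka extension is loaded; broker and topic names are placeholders.
$conf = new \RdKafka\Conf();
$conf->set('metadata.broker.list', 'localhost:9092');

$topicConf = new \RdKafka\TopicConf();
// Give up on a message after 5 seconds instead of the default 300000 ms
$topicConf->set('message.timeout.ms', '5000');

$producer = new \RdKafka\Producer($conf);
// The topic configuration only takes effect if it is passed to newTopic()
$topic = $producer->newTopic('example-topic', $topicConf);
```

A lower value makes the producer give up sooner, at the cost of dropping messages during short broker outages.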

nick-zh commented 5 years ago

I think the delivery report callback would handle this case, if you have registered one: https://arnaud-lb.github.io/php-rdkafka/phpdoc/rdkafka-conf.setdrmsgcb.html

unfrgivn commented 5 years ago

How are others handling this? I have tried both setDrMsgCb and setErrorCb and I can only get them to fire once the message.timeout.ms is reached even if I die() inside of them. In my case, I just want to ignore the delivery and continue with my process immediately if I get back an error response that the broker is unreachable.

I am using the same example as the OP on my producer.

As of right now, the only solution that works for me is to set the producer timeouts super low on the topic so that my PHP script doesn't hang when the brokers are down.

$conf->set('delivery.timeout.ms', 5000);
$conf->set('request.timeout.ms', 5000);

Steveb-p commented 5 years ago

@unfrgivn In my case I'm simply not using Kafka in any web-facing scripts; I delegate those tasks to background processes / queues. I'm unsure whether the behavior you described can be achieved from phprdkafka, since it's the librdkafka threads that are keeping the main thread alive.

"even if I die() inside of them"

I doubt they will fire more than once if you die inside of them.

"I just want to ignore the delivery and continue with my process immediately if I get back an error response that the broker is unreachable."

Then don't poll until you're finished with whatever else you were doing. Librdkafka will continue trying to push the message through to the broker using a subthread. You're not required to wait for it (though it's a good idea to poll eventually / at regular intervals).

unfrgivn commented 5 years ago

Thanks @Steveb-p, this is for logging API responses which are parsed out by a consumer later and written to a DB for permanent storage, so it’s more critical that the process continues than that the log is written successfully. However, other calls may need to trigger a system email or something that should definitely be written to the topic. I think I just don’t understand the concept of polling and need to read up on that more.

What you’re describing with a subthread may be what I want. I was hoping I could use this library to essentially async tasks that would previously hold up execution but maybe not.

unfrgivn commented 5 years ago

and just to clarify, removing the while loop on the queue and polling does not speed anything up without dropping the delivery timeout. As I said above, the two callbacks do not fire until these timeouts are fully met, so by default they are never called until after 300000ms.

According to the docs poll(0) should be non-blocking but it doesn't matter what I set that to or if I don't poll at all or if I turn acks off which should also not wait for a response. The produce() method seems to lock up the entire thread until the delivery timeout is met. Here is some sample code I'm using but I've tried just about every combination of values here and commenting out various lines:

public static function kafka(string $topic, string $value) {

        try {
            // configuration optimized for low latency. This allows a PHP process / request to send messages ASAP and to terminate quickly.
            $conf = new \RdKafka\Conf();
            $conf->set('socket.timeout.ms', 50); // or socket.blocking.max.ms, depending on librdkafka version
            if (function_exists('pcntl_sigprocmask')) {
                pcntl_sigprocmask(SIG_BLOCK, [SIGIO]);
                $conf->set('internal.termination.signal', SIGIO);
            } else {
                $conf->set('queue.buffering.max.ms', 1);
            }

            // This prevents long timeouts if the server is unavailable
            $conf->set('delivery.timeout.ms', 1000);
            $conf->set('request.timeout.ms', 1000);
            $conf->set('acks', 0);

            // Default 1MB
            $conf->set('message.max.bytes', MAX_MESSAGE_BYTES);

            // Not working
            $conf->setErrorCb(function ($kafka, $err, $reason) {
                // TODO: Do something here
//              var_dump(rd_kafka_err2str($err)); // (-195) "Local: Broker transport failure"
                printf("Kafka error: %s (reason: %s)\n", rd_kafka_err2str($err), $reason);
//              die();
            });

            $conf->setDrMsgCb(function ($kafka, $message) {
                if ($message->err) {
                    var_dump($message->err);
                    // message permanently failed to be delivered
                } else {
                    // message successfully delivered
                }
            });

            $rk = new \RdKafka\Producer($conf);

            if (isDebug()) {
                $rk->setLogLevel(LOG_DEBUG);
            }

            $rk->addBrokers(KAFKA_BROKERS);

            $topicConf = new \RdKafka\TopicConf();
            $topicConf->set("auto.commit.interval.ms", 1000);
            $topicConf->set("message.timeout.ms", 1000);

            // Pass the topic configuration, otherwise the settings above are never applied
            $topic = $rk->newTopic($topic, $topicConf);

            $topic->produce(RD_KAFKA_PARTITION_UA, 0, $value);
            $rk->poll(0);

            // Polling after producing can also be important to reduce termination times
//          while ($rk->getOutQLen() > 0) {
//              print $rk->getOutQLen() . "<br>";
//              $rk->poll(50);
//          }

        } catch (\RdKafka\Exception $e) {
            // Fail silently
        }
    }

Edit: I forgot to add: to reproduce what I'm talking about, I comment out the delivery and request timeout settings and then set my brokers to a bogus IP that simulates an unreachable host.

nick-zh commented 5 years ago

@unfrgivn I hope to find time soon to help you with this. In the meantime, I can say it is crucial to have something like this in your producer mechanism:

while ($this->producer->getOutQLen() > 0) {
    $this->producer->poll($this->pollTimeout);
}

Otherwise your callbacks don't get called and sooner or later you will crash when the poll queue gets too big. Am I understanding correctly that if you fail to send a message (and you want to move on), you can't, because the producer waits for the timeout and does not finish sooner? If so, then I agree with @Steveb-p: you probably need to do this in a delegated fashion with another process, a Swoole coroutine, etc., or lower the timeout if this is feasible.
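A bounded variant of that polling loop, as a minimal sketch: it polls until the out queue is empty or a time budget is spent, so an unreachable broker cannot hold the script longer than the budget. `drainProducerQueue` and its parameters are hypothetical names, not part of php-rdkafka; the function only relies on `getOutQLen()` and `poll()`, both of which are used earlier in this thread.

```php
<?php
/**
 * Poll the producer until its out queue is empty or $maxWaitMs has elapsed.
 * Returns the number of messages still queued when the function gives up.
 */
function drainProducerQueue(object $producer, int $pollTimeoutMs = 50, int $maxWaitMs = 2000): int
{
    $deadline = microtime(true) + $maxWaitMs / 1000;

    // Each poll() call serves pending delivery report and error callbacks
    while ($producer->getOutQLen() > 0 && microtime(true) < $deadline) {
        $producer->poll($pollTimeoutMs);
    }

    return $producer->getOutQLen();
}
```

Called as `drainProducerQueue($rk, 50, 2000)` after producing, a non-zero return value means some messages had not been confirmed as delivered when the budget ran out; what to do with them is up to the application.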

unfrgivn commented 5 years ago

Thanks @nick-zh and that all makes sense, I’m just wondering how people are writing to Kafka from PHP with data from any external source like an inbound webhook, or adding a method result to a topic to be queued for work later on? What about using it for sending web analytics to Kafka in realtime (vs reading the logs)? If you can’t queue the message locally and then terminate the thread immediately, that creates a huge roadblock if something is unavailable. Imagine thousands of incoming requests per second and the broker is unreachable, the server would crumble.

I’m sure it’s largely my ignorance (I’ve worked mostly with consumers and connect), but why are things like acks 0 and timeouts seemingly not respected the same way in PHP as they are with connectors and other language clients that also wrap librdkafka? The docs state ways of making this non-blocking but perhaps I’m misunderstanding that term? If there is a local queue that batches and sends messages, why does the initial thread that generates the message need to wait for a successful response?

Do I really need to use a coroutine and one of the async PHP libraries, or write the messages to a custom local tmp storage and then have a detached PHP process send the messages? I think the answer might be that I’m better off doing some of these examples in Node or another language, but we have so much infrastructure in PHP that we’d like to integrate that directly into our Kafka stack.

I appreciate everyone’s work on the project, thanks again for any insights.

nick-zh commented 5 years ago

@unfrgivn ah yeah, I totally get what you are aiming at now. To be perfectly honest, we haven't had any connection issues for a long time, but I totally agree this is very important. For a new project I am working on, I will be digging into stuff like this, but sadly I cannot offer any insights at the moment. The only problem we had was the initial connect (that happened a lot), but we were able to reduce that and since then, stuff is humming :smile: I will try to dig into this and see if I can offer any insights. For now I can say that even librdkafka is sometimes behind on features, and if you really want to leverage the full power of Kafka, going with Java (or maybe even something else) is probably the best option. Stuff like Streams isn't even available for PHP (at least I was unable to find anything; if you know of something, please let me know :heart: ). I am in the same situation as you: we have a PHP stack and I want to make this work in the best fashion. I want to brush up my C knowledge, because there is still stuff from the lib that is not in the extension, and I think every contribution to projects like this counts. I will let you know if I have any findings, now that I understand what you want to achieve :+1:

Steveb-p commented 5 years ago

There were a lot of changes in librdkafka since phprdkafka 3.0 was released (almost two years' worth of commits). You might want to try running it with version 0.11.6 of librdkafka instead of 1.0+ and checking if it changes anything. There might have been a change that phprdkafka has yet to accommodate and that went under the radar.

@nick-zh afaik Streams are not supported in librdkafka yet either. I might be wrong here, but it's something you still cannot do.

EDIT: Edenhill, maintainer of librdkafka, said in an issue regarding this (December 2018):

"librdkafka does not implement Kafka Streams and there are no plans for this either."

Unfortunately a lot of people move away from phprdkafka to more mature / better supported solutions (understandably so). Practically there is only arnaud-lb that is the original author and maintainer of phprdkafka. I cannot speak about nick-zh, but my knowledge of C / PHP extensions is wayyy too low to contribute meaningfully in a timely manner (btw, if anyone knows some good sources on PHP extensions, I'd be grateful - whatever I've found seems to be all over the place).

Regarding possible solutions to your problem: as I said, I delegate producing to Kafka to another process. Specifically in my case, I'm using a local Redis instance to store messages that should be sent (and the web PHP process produces only to this Redis instance). Then a background process that is constantly running picks them up as they arrive and produces them to Kafka. Since I do not care how long this process takes or even if it can fail (the web client was already served), I can safely "ignore" connectivity errors. You can use supervisord or similar tools/services to keep the background process always running. I'm using PM2 from NodeJS (which I found to be relatively easy to control without having to sudo all the time, so my sysadmin is happy). Something along these lines:

pm2 start --name="parser" -f current/bin/console --interpreter php -- app:events:parse
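
The same split between web request and background worker can be sketched in a few lines. Note the swap: this sketch uses a local spool file with `flock` as a stand-in for the Redis instance described above, purely to stay self-contained. `spoolMessage`, `forwardSpool` and the `$send` callback are hypothetical names, and the JSON line format is an assumption.

```php
<?php
/** Web request side: append one message to a local spool file (no broker involved). */
function spoolMessage(string $spoolFile, string $topic, string $payload): void
{
    $line = json_encode(['topic' => $topic, 'payload' => $payload]);
    if ($line === false) {
        throw new InvalidArgumentException('message could not be JSON-encoded');
    }
    // LOCK_EX keeps concurrent requests from interleaving their lines
    if (file_put_contents($spoolFile, $line . "\n", FILE_APPEND | LOCK_EX) === false) {
        throw new RuntimeException("cannot write to $spoolFile");
    }
}

/**
 * Worker side: take everything out of the spool, then pass each message to $send.
 * Returns the number of messages handed to $send.
 */
function forwardSpool(string $spoolFile, callable $send): int
{
    $handle = fopen($spoolFile, 'c+');
    if ($handle === false) {
        throw new RuntimeException("cannot open $spoolFile");
    }

    flock($handle, LOCK_EX); // writers are blocked only while the spool is read
    $messages = [];
    while (($line = fgets($handle)) !== false) {
        $decoded = json_decode($line, true);
        if (is_array($decoded)) {
            $messages[] = $decoded;
        }
    }
    ftruncate($handle, 0);
    flock($handle, LOCK_UN);
    fclose($handle);

    // Sending happens outside the lock, so a slow or dead broker never blocks
    // web requests. Trade-off: a worker crash here loses the batch just taken.
    foreach ($messages as $message) {
        $send($message['topic'], $message['payload']);
    }

    return count($messages);
}
```

In a real worker, `$send` would wrap something like `$producer->newTopic($topic)->produce(RD_KAFKA_PARTITION_UA, 0, $payload)` followed by polling, and `forwardSpool` would run in a loop inside the process that supervisord or PM2 keeps alive.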

unfrgivn commented 5 years ago

Thanks for the responses. We do use nodefluent with node-rdkafka for streams and connectors which is an almost complete clone of the Java frameworks and works very well (plus we already know JS).

For our PHP I’m thinking of attempting something really similar @Steveb-p but using blackhole tables in MySQL and the Debezium connector we already leverage. So all our streams can run ETL off inserts to that table and create the specific job topics we want. Redis would be a great alternative so I might play around with that idea, but the beautiful thing about writing to a SQL table is we cut down on external dependencies on other libraries and servers. Idea from Yelp

Anyways good discussion. This is one of the top ranked threads when searching for this connectivity issue so maybe it will help others.

unfrgivn commented 5 years ago

Also just more info @nick-zh we did look at using Promises with ReactPHP to achieve this as you hinted. The problem is we would have to rearchitect our entire application to use this bc otherwise the async call will still terminate when the main thread dies. There may be a way to achieve this using pthreads, but it started to feel a little duct tape as I went down that path.

nick-zh commented 5 years ago

@unfrgivn sry it has been a while. I will try to sum things up, let me know if afterwards we can close this or if you see a way where we can dig in deeper.

So first of all, when I produce a message, it actually goes into a local queue, so even when the broker is down, I am able to produce messages to that local queue. The problem arises later, when the broker doesn't come back up in time (meaning all my timeouts have been reached). My delivery report callback will get triggered and I will get the error that my messages haven't been delivered to the broker. From then on, it is in the hands of the application to act accordingly with requeue, disregard, etc.

The problem is, even when you have very large timeouts and re-queue mechanisms to avoid the scenario mentioned above, the ultimate problem still exists if your process gets killed. Your messages will be lost and there is no way of knowing which messages got lost.

So if you absolutely need to know whether your message was produced with PHP, I see no other way than to have a persistent log / record of messages that were sent / are in flight, so you can recover from it even if the application crashes. Unlike connectors, we as simple producers don't keep track of what has been sent, so imho, as stated above, you would need to build something similar to what Connect does to keep track.
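
To make the idea of a persistent in-flight record concrete, here is a minimal sketch under stated assumptions: `InFlightLog` is a hypothetical class (nothing like it ships with php-rdkafka), it stores entries in a single JSON file, it assumes one writer process, and it assumes each message carries a unique id that is also used as the Kafka message key.

```php
<?php
/**
 * Hypothetical persistent record of messages handed to the producer
 * but not yet confirmed by a delivery report.
 */
final class InFlightLog
{
    private $file;

    public function __construct(string $file)
    {
        $this->file = $file;
    }

    /** Record a message before calling produce(). */
    public function record(string $id, string $payload): void
    {
        $entries = $this->pending();
        $entries[$id] = $payload;
        $this->save($entries);
    }

    /** Call from the delivery report callback; an error code of 0 means delivered. */
    public function settle(string $id, int $err): void
    {
        if ($err !== 0) {
            return; // keep failed messages so they can be retried later
        }
        $entries = $this->pending();
        unset($entries[$id]);
        $this->save($entries);
    }

    /** Everything still unconfirmed, e.g. after a crash or restart. */
    public function pending(): array
    {
        if (!is_file($this->file)) {
            return [];
        }
        $decoded = json_decode((string) file_get_contents($this->file), true);
        return is_array($decoded) ? $decoded : [];
    }

    private function save(array $entries): void
    {
        // Write to a temp file and rename, so a crash never leaves a half-written log
        $tmp = $this->file . '.tmp';
        file_put_contents($tmp, json_encode($entries, JSON_FORCE_OBJECT), LOCK_EX);
        rename($tmp, $this->file);
    }
}
```

The wiring would be roughly: call `$log->record($id, $payload)` before `produce(RD_KAFKA_PARTITION_UA, 0, $payload, $id)`, and call `$log->settle((string) $message->key, $message->err)` inside the callback registered with `setDrMsgCb`. On startup, whatever `pending()` returns is the set of messages whose fate is unknown and which the application may want to re-send.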

Regarding the process itself, from what i know, it ends when your scripts end. There have been efforts on something, that could solve this and other problems we face not having an open connection in the background (Issue #42), but sadly this issue is stale and my skills are too limited to help out there (at least for now).