the100rabh opened this issue 8 years ago
Is there any plan to implement this? I think it's the most important feature for PHP running in an FPM environment. Otherwise, each request has to trigger a bunch of requests to the Kafka brokers, which really makes it slow.
@arnaud-lb Can we have this? How can I help?
I would gladly accept help on this.
Basically, we would create an API like this:
```php
$instanceName = "somename";
if (!$rk = \RdKafka\Producer::getInstance($instanceName)) {
    $rk = new \RdKafka\Producer($conf, $instanceName);
}
```
`\RdKafka\Producer::getInstance()` would search for an existing persistent Producer instance with the given name and return it.
`\RdKafka\Producer($conf, $name)` would register itself under that name if a name is passed as the second argument.
Internally, only the rdkafka handle would be persistent, not the actual PHP object. `getInstance()` would look up the persistent rdkafka handle and create a new Producer PHP object wrapping that handle.
Destroying a Producer object would not destroy the rdkafka handle if it's a persistent handle.
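A minimal C sketch of the internal split described above, assuming a fixed-size process-wide registry and a hypothetical `kafka_handle` type standing in for librdkafka's `rd_kafka_t`. All function names are illustrative, not php-rdkafka's API. Only the registry and the handles outlive a request; the `producer_object` wrapper plays the role of the PHP object and is recreated every time.

```c
#include <stdlib.h>
#include <string.h>

/* Hypothetical stand-in for librdkafka's rd_kafka_t. */
typedef struct {
    int id;
} kafka_handle;

/* Process-wide registry: survives across requests in the same worker. */
#define MAX_INSTANCES 16
#define MAX_NAME 64

static struct {
    char name[MAX_NAME];
    kafka_handle *handle;
} registry[MAX_INSTANCES];
static int registry_len = 0;

/* Per-request wrapper, playing the role of the PHP Producer object. */
typedef struct {
    kafka_handle *handle;
    int persistent;
} producer_object;

static kafka_handle *registry_find(const char *name) {
    for (int i = 0; i < registry_len; i++)
        if (strcmp(registry[i].name, name) == 0)
            return registry[i].handle;
    return NULL;
}

/* Like Producer::getInstance($name): wrap an existing handle, or return 0. */
int producer_get_instance(const char *name, producer_object *out) {
    kafka_handle *h = registry_find(name);
    if (h == NULL)
        return 0;
    out->handle = h;
    out->persistent = 1;
    return 1;
}

/* Like new Producer($conf, $name): `id` stands in for $conf. A non-NULL
 * name registers the new handle so later requests can find it. */
int producer_construct(const char *name, int id, producer_object *out) {
    kafka_handle *h = malloc(sizeof *h);
    if (h == NULL)
        return 0;
    h->id = id;
    out->handle = h;
    out->persistent = 0;
    if (name != NULL && registry_len < MAX_INSTANCES &&
        strlen(name) < MAX_NAME) {
        strcpy(registry[registry_len].name, name);
        registry[registry_len].handle = h;
        registry_len++;
        out->persistent = 1;
    }
    return 1;
}

/* Object free handler: a persistent handle is never destroyed here. */
void producer_free(producer_object *obj) {
    if (!obj->persistent)
        free(obj->handle);
    obj->handle = NULL;
}
```

In the first "request" `producer_get_instance()` fails and `producer_construct()` registers the handle; freeing the wrapper at request end leaves the handle alive, so the next `producer_get_instance()` call with the same name finds it.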
Hi @arnaud-lb! Since we need the persistent producer, we decided to implement something ourselves. I already have something started (still WIP, only tested on PHP 7.1) here: https://github.com/awons/php-rdkafka/tree/issue-42/persistent-producer
I am not a C developer and this is my first time writing a PHP extension, so hopefully this is not too bad ;) I managed to implement a persistent kafka instance. It works quite well (tested with our own integration tests that use the extension), but it still has one main issue: callbacks. When a request ends, all callbacks are cleared, and kafka obviously segfaults when it tries to call into an arbitrary memory location.
I wanted to ask what you think about the following solution: for persistent producers, we create a hash table with no-op versions of all possible callbacks. Those callbacks are then registered in kafka. Then, on `getInstance()`, we also need to provide new callbacks, like `getInstance('name', function(){})`.
This way kafka always sees a pointer to the same global callback, and we only replace the pointers used inside this callback.
After the request, those temporary pointers are nullified, so if we forget to re-register them, kafka will still call the global function that does nothing.
I am still not sure how to handle the situation where different requests need to register different callbacks for the same instance, but I will try to figure something out.
What do you think about this idea?
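A minimal C sketch of the trampoline idea, assuming one persistent instance per worker and a hypothetical `user_cb` function-pointer type in place of a PHP callable (a real extension would hold a `zval`/`zend_fcall_info` instead). The function names are illustrative. Only `global_delivery_trampoline()` would ever be registered with librdkafka, so its address stays valid after every request.

```c
#include <stddef.h>

/* Hypothetical user callback type standing in for a PHP closure. */
typedef void (*user_cb)(const char *payload, void *ctx);

/* Per-request slot: set on getInstance(), cleared at request shutdown. */
static user_cb current_cb = NULL;
static void *current_ctx = NULL;

/* The only function handed to librdkafka. Its address never changes,
 * so the persistent handle never points at freed memory. */
void global_delivery_trampoline(const char *payload) {
    if (current_cb == NULL)
        return; /* nothing registered for this request: behave as a no-op */
    current_cb(payload, current_ctx);
}

/* Called from getInstance('name', function () { ... }). */
void set_request_callback(user_cb cb, void *ctx) {
    current_cb = cb;
    current_ctx = ctx;
}

/* Called at request shutdown (e.g. from the extension's RSHUTDOWN). */
void clear_request_callback(void) {
    current_cb = NULL;
    current_ctx = NULL;
}

/* Example user callback for the usage note: counts deliveries. */
void count_deliveries(const char *payload, void *ctx) {
    (void)payload;
    ++*(int *)ctx;
}
```

Because a single static slot is shared, this only works if one request at a time uses the instance, which matches the open question in the comment above about different requests needing different callbacks.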
@awons: I'll take a look this week!
Awesome!
I like the general idea.
About callbacks: you are right, we could do it that way. Maybe we could use the opaque setting to store this data (see rd_kafka_conf_set_opaque() / rd_kafka_opaque()). The C callbacks would just forward calls to the user callbacks referenced in the opaque struct. We could add methods to change these callbacks after getInstance().
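A minimal C sketch of the opaque-based forwarding, assuming a hypothetical `callback_opaque` struct that would be passed to `rd_kafka_conf_set_opaque()` and live as long as the persistent handle. The C callback signature is simplified: librdkafka's real callbacks take more arguments, but they do receive the opaque pointer, which is all this sketch relies on. `opaque_set_dr_cb()` is a hypothetical stand-in for a PHP method that swaps callbacks after `getInstance()`.

```c
#include <stddef.h>

/* Hypothetical callback type standing in for a PHP callable. */
typedef void (*user_cb)(int err, void *user_data);

/* Stored as the handle's opaque; outlives individual requests. */
typedef struct {
    user_cb dr_cb;   /* delivery report callback, may be NULL */
    void *user_data;
} callback_opaque;

/* Simplified C-level callback registered once with librdkafka.
 * It only forwards to whatever user callback the opaque holds now. */
void c_delivery_cb(int err, void *opaque) {
    callback_opaque *o = opaque;
    if (o != NULL && o->dr_cb != NULL)
        o->dr_cb(err, o->user_data);
}

/* Hypothetical setter a method like setDrMsgCb() could call after
 * getInstance(); passing NULL detaches the callback at request end. */
void opaque_set_dr_cb(callback_opaque *o, user_cb cb, void *user_data) {
    o->dr_cb = cb;
    o->user_data = user_data;
}

/* Example user callback for the usage note: records the last error. */
void record_error(int err, void *user_data) {
    *(int *)user_data = err;
}
```

Compared with a single global slot, keeping the pointers in the opaque gives each persistent instance its own set of callbacks.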
About the instance table: Zend hash tables can be created as persistent (see zend_hash_init()).
Totally agree with the approach to callbacks. There is just one more thing I cannot grasp. We need a way to figure out exactly which callback needs to be called. One instance can only register one callback. Hence one event can, in theory, be propagated to n different requests simultaneously. We would then need a way to register per-request callbacks somehow (maybe with a request ID/thread ID?). Not sure yet how to approach this.
As for `zend_hash_init()`: I know it can be used for persistent allocation, but the issue I found is that every element in that table needs to be a zval. And PHP 5 cannot store arbitrary pointers in zvals. PHP 7 can, but then we either have to keep two different implementations or drop support for PHP < 7.
At least this is what I figured.
Maybe we could treat pointers as integers and store them like that... It is kind of a hack, but maybe it could work.
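A minimal C sketch of the pointer-as-integer hack, assuming a hypothetical `int_slot` type standing in for a zval that holds an integer. Converting `void *` to `intptr_t` and back is guaranteed to round-trip in C, but only for an integer type at least as wide as a pointer. PHP 5 stores zval integers as C `long`, which is only 32 bits on 64-bit Windows, so this hack would not be portable there.

```c
#include <stdint.h>

/* Hypothetical integer-only slot, standing in for a zval holding a long. */
typedef struct {
    intptr_t lval;
} int_slot;

/* Store a raw pointer as an integer value. */
void slot_store_ptr(int_slot *s, void *p) {
    s->lval = (intptr_t)p;
}

/* Recover the original pointer; valid only while the pointee is alive. */
void *slot_load_ptr(const int_slot *s) {
    return (void *)s->lval;
}
```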
> Hence one event can, in theory, be propagated to n different requests simultaneously
Are you sharing the rdkafka instance between multiple threads? I believe that could become very difficult to manage. From the user's point of view, they would receive events unrelated to the current request. From the extension's point of view, a lot of work would be required to make sure callbacks are called in the right thread.
It would be simpler not to share rdkafka instances between multiple threads. Sharing the instance between subsequent requests on the same thread would be enough, I think (this is how it would behave in non-threaded environments like php-fpm, too).
PHP internals have a notion of per-thread globals that you can use to achieve that. This is not used in php-rdkafka currently, but you can add support for per-thread globals by applying something like this: https://gist.github.com/arnaud-lb/ed0a6501f17eee05e3f814b6ae2dccd6. Basically, `ZEND_BEGIN_MODULE_GLOBALS()` declares a struct that will be thread-specific (each thread has its own instance). You can access its fields with the `RDKAFKA_G()` macro, e.g. `RDKAFKA_G(some_value) = 1`.
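A rough C analogue of module globals, for readers unfamiliar with the Zend macros. This swaps in C11 `_Thread_local` where ZTS builds use TSRM, so it illustrates the semantics (one copy per thread, persisting across the sequential requests that thread serves) rather than the real `ZEND_BEGIN_MODULE_GLOBALS()` machinery. The struct fields and `request_startup()` are hypothetical.

```c
#include <stddef.h>

/* Stand-in for ZEND_BEGIN_MODULE_GLOBALS(rdkafka) ... ZEND_END_MODULE_GLOBALS(rdkafka). */
typedef struct {
    void *persistent_producers; /* e.g. the per-thread instance table */
    int requests_served;
} rdkafka_globals;

/* One copy per thread; in an NTS build this would be a plain static. */
static _Thread_local rdkafka_globals rdkafka_g;

/* Mimics the RDKAFKA_G(v) accessor macro. */
#define RDKAFKA_G(v) (rdkafka_g.v)

/* Hypothetical per-request hook: state written here is still visible
 * to the next request served by the same thread. */
void request_startup(void) {
    RDKAFKA_G(requests_served)++;
}
```

Two requests handled by the same thread both see `requests_served` advance, while another thread starts from its own zero-initialized copy.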
> As for zend_hash_init. I know it can be used for persistent allocation but the issue I found is that every element in that table needs to be a zval
Actually you can store pointers, even though it's not super obvious in PHP 5. See add_consuming_toppar() in rdkafka.c.
Sorry for the late response. I was too busy with another project.
> Are you sharing the rdkafka instance between multiple threads?
That was my initial idea: to have this feature available in ZTS as well. But now I see it is probably far too complicated and could lead to too many issues. It is probably just not worth it.
We could simply throw a `RuntimeException` if someone tries to use this method in ZTS mode, or not even compile it there; not sure which one would be better.
> Actually you can store pointers, even though it's not super obvious in PHP 5. See add_consuming_toppar() in rdkafka.c
That is a really nifty workaround :) I will refactor the instance handling to use this trick.
I was just thinking about another thing. You mentioned module globals. Correct me if I am wrong, but if we are not going to implement this feature for ZTS then it makes no sense to use module globals because in NTS mode they always resolve to a simple variable.
> We could simply throw a RuntimeException if someone tries to use this method in ZTS mode, or not even compile it there; not sure which one would be better.
Actually there is no problem with using this feature in ZTS mode, as long as we use module globals. Module globals are the way to go when we need variables whose values persist across requests but should not be shared across threads.
I did my homework. I got confused by an explanation of the threaded model I found somewhere on the internet, but you are 100% right. We have either one PHP interpreter per process or one per thread. One interpreter handles multiple requests sequentially (within one process or one thread), so it makes perfect sense to have this feature working in ZTS mode. Getting back to work then :)
@arnaud-lb I think I am missing something again. You mentioned the `add_consuming_toppar()` function as an example of how to use raw pointers with hash tables in PHP 5. The problem I see is that functions like `zend_hash_str_add_ptr` and `zend_hash_str_find_ptr` are only available in PHP 7. PHP 5 only has `zend_hash_add` and `zend_hash_find`.
Now, I see three options here:
Which is it then?
These functions are defined in php_rdkafka_priv.h if the PHP version doesn't have them already: https://github.com/arnaud-lb/php-rdkafka/blob/6f6f4b046b5bd0bdd8e2ed1aab82b37566ac57d7/php_rdkafka_priv.h#L129-L134
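The general trick behind such compatibility helpers can be sketched in C. This uses a hypothetical toy table, not the Zend API and not the actual code in php_rdkafka_priv.h: like PHP 5's `zend_hash_add()`, `toy_add()` copies a given number of bytes into the bucket, and like `zend_hash_find()`, `toy_find()` returns a pointer to those stored bytes. A PHP 7 style `*_ptr` helper then just stores the pointer value itself as the data.

```c
#include <stddef.h>
#include <string.h>

/* Toy stand-in for a PHP 5 HashTable: add copies `size` bytes of caller
 * data into a bucket; find returns a pointer to the stored bytes. */
#define TOY_SLOTS 8
#define TOY_DATA_MAX 16
#define TOY_KEY_MAX 32

typedef struct {
    char key[TOY_KEY_MAX];
    unsigned char data[TOY_DATA_MAX];
    int used;
} toy_bucket;

typedef struct {
    toy_bucket b[TOY_SLOTS];
} toy_table;

int toy_find(toy_table *t, const char *key, void **dest) {
    for (int i = 0; i < TOY_SLOTS; i++) {
        if (t->b[i].used && strcmp(t->b[i].key, key) == 0) {
            *dest = t->b[i].data;
            return 0;
        }
    }
    return -1;
}

/* Fails on duplicate keys, as an "add" operation should. */
int toy_add(toy_table *t, const char *key, const void *data, size_t size) {
    void *existing;
    if (size > TOY_DATA_MAX || strlen(key) >= TOY_KEY_MAX ||
        toy_find(t, key, &existing) == 0)
        return -1;
    for (int i = 0; i < TOY_SLOTS; i++) {
        if (!t->b[i].used) {
            strcpy(t->b[i].key, key);
            memcpy(t->b[i].data, data, size);
            t->b[i].used = 1;
            return 0;
        }
    }
    return -1;
}

/* Pointer helpers on top of the copy-bytes API: the data copied into
 * the bucket is the pointer value itself. */
int toy_add_ptr(toy_table *t, const char *key, void *ptr) {
    return toy_add(t, key, &ptr, sizeof ptr);
}

void *toy_find_ptr(toy_table *t, const char *key) {
    void *stored;
    void *ptr;
    if (toy_find(t, key, &stored) != 0)
        return NULL;
    memcpy(&ptr, stored, sizeof ptr); /* read the pointer bytes back */
    return ptr;
}
```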
Hey guys
Is there like an update, status, eta? I am very interested in this as well.
+1
Another country, another job, and too little time for after-hours projects. And I don't use Kafka in my current job anymore. But the problem here is that the kafka object we have is not the same as the one used later by kafka: kafka makes a copy of it, and I couldn't figure out how to get the correct reference. Additionally, this would cause lots of changes in the extension itself. My C knowledge is unfortunately very limited :(
Thanks for the update @awons :bowing_man:
@awons I have tested it under php-fpm, and it does not work.
Here is the code:
```php
$conn = $this->config->getConn();
$this->produce = \RdKafka\Producer::getInstance($conn);
$pid = getmypid();
if (!$this->produce) {
    Log::info("<kafka($conn)>php-process($pid):create single instance of producer");
    $this->produce = new \RdKafka\Producer($this->config->getProduceConf());
} else {
    Log::debug("<kafka($conn)>php-process($pid):no need to create producer for exists instance");
}
```
Here is the log:
```
[2019-09-20 16:55:59.110786] test.INFO: #[RL][b3167be39f61e38a72a0][3][0.001/0.08] <kafka(device_status)>php-process(380):create single instance of producer
[2019-09-20 16:56:01.560371] test.INFO: #[RL][cfe51416e3e3e73953e9][3][0.031/0.10] <kafka(device_status)>php-process(381):create single instance of producer
[2019-09-20 16:56:02.310157] test.INFO: #[RL][d11170019b80e9a13ee6][3][0.001/0.11] <kafka(device_status)>php-process(382):create single instance of producer
[2019-09-20 16:56:02.911125] test.INFO: #[RL][098a32f35726af04f142][3][0.001/0.09] <kafka(device_status)>php-process(383):create single instance of producer
[2019-09-20 16:56:03.602770] test.INFO: #[RL][8521dbb62aed83ea2d46][3][0.001/0.14] <kafka(device_status)>php-process(384):create single instance of producer
[2019-09-20 16:56:04.204491] test.INFO: #[RL][eaf5cdbdd848addf1185][3][0.001/0.22] <kafka(device_status)>php-process(397):create single instance of producer
[2019-09-20 16:56:04.611187] test.INFO: #[RL][ba0b84f5d65a283a9181][3][0.001/0.13] <kafka(device_status)>php-process(380):create single instance of producer
[2019-09-20 16:56:04.920222] test.INFO: #[RL][cfc30d377a7d1e404711][3][0.001/0.07] <kafka(device_status)>php-process(381):create single instance of producer
[2019-09-20 16:56:05.409298] test.INFO: #[RL][5f58979bf1646cf45273][3][0.001/0.11] <kafka(device_status)>php-process(382):create single instance of producer
[2019-09-20 16:56:05.917587] test.INFO: #[RL][0d5603654ec3dde23c0d][3][0.001/0.09] <kafka(device_status)>php-process(383):create single instance of producer
[2019-09-20 16:56:06.511175] test.INFO: #[RL][c8141141725e66436a66][3][0.001/0.19] <kafka(device_status)>php-process(384):create single instance of producer
[2019-09-20 16:56:06.920342] test.INFO: #[RL][2a8f4ef4a096490bf70b][3][0.001/0.15] <kafka(device_status)>php-process(397):create single instance of producer
[2019-09-20 16:56:07.306934] test.INFO: #[RL][875b552fe5b2ec7e4491][3][0.001/0.13] <kafka(device_status)>php-process(380):create single instance of producer
[2019-09-20 16:56:07.605764] test.INFO: #[RL][6149c1deb2d07e4dbc2a][3][0.001/0.08] <kafka(device_status)>php-process(381):create single instance of producer
[2019-09-20 16:56:10.114968] test.INFO: #[RL][62347f12aa59ee944183][3][0.001/0.11] <kafka(device_status)>php-process(382):create single instance of producer
[2019-09-20 16:56:10.804955] test.INFO: #[RL][9cda48bb918466d49bf8][3][0.001/0.22] <kafka(device_status)>php-process(383):create single instance of producer
[2019-09-20 16:56:11.321044] test.INFO: #[RL][ac6473b208cc20518860][3][0.001/0.11] <kafka(device_status)>php-process(384):create single instance of producer
[2019-09-20 16:56:11.811838] test.INFO: #[RL][3b6acd3425719aeddad4][3][0.001/0.10] <kafka(device_status)>php-process(397):create single instance of producer
[2019-09-20 16:56:12.305731] test.INFO: #[RL][073a3af1b0596a5158ce][3][0.001/0.09] <kafka(device_status)>php-process(380):create single instance of producer
[2019-09-20 16:56:13.587820] test.INFO: #[RL][e1883b9d64def0a0bc95][3][0.001/0.20] <kafka(device_status)>php-process(381):create single instance of producer
[2019-09-20 16:56:14.628406] test.INFO: #[RL][a84e795007746f9137d8][3][0.001/0.07] <kafka(device_status)>php-process(382):create single instance of producer
```
@dawei101 as @awons stated before, he is not working on this anymore. There is no current progress on this. I would say this is on hold for now, and I am not sure it will move forward in the next few months unless somebody picks up where @awons left off.
Ok, thanks
I can have a look at this next, but I will first take care of introducing the Admin API. Just as an FYI, my rough estimate is that I may find time in November.
Excuse me if I'm wrong, but what if there is a PHP background script running as a daemon that accepts new messages (maybe via a socket, an API, or by checking a MySQL table for queued messages) and forwards them to Kafka?
This script would then open the connection to Kafka just once and produce messages when needed.
It's probably not a good idea to shoot messages to Kafka during a page load, right?
@MrMoronIV you're partially right, currently it's not exactly 100% safe to send messages to Kafka from PHP process that handles web requests, like PHP-FPM, Apache PHP mod or PHP SAPI - but the reason is not that it's slow.
Usually, what phprdkafka does when you're producing a message is:
3. polling (to check the result of producing a message; this basically means the thread reports which messages were delivered successfully and calls any registered callbacks). Note that during this time the message is actually being sent.
Point 3 is what will cause issues if you're not careful: a dead Kafka broker or connectivity issues will prevent the thread from sending a message and will cause PHP processes to live longer than expected. Since your webserver is usually configured to allow only a specific number of PHP processes to handle incoming requests, this eventually starves the webserver of resources.
phprdkafka 4.0 will no longer contain the code handling point 3: the thread will terminate as soon as the PHP process reaches shutdown. The user (programmer) will be responsible for handling delivery checks on their own.
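Handling delivery checks yourself usually means flushing before shutdown with a hard time budget, so an unreachable broker cannot pin a worker indefinitely. A minimal C sketch, assuming a hypothetical `flush_fn` modeled on librdkafka's `rd_kafka_flush(rk, timeout_ms)`, which returns `RD_KAFKA_RESP_ERR__TIMED_OUT` while messages are still outstanding (here simplified to "0 when empty, nonzero otherwise"). In php-rdkafka 4.x the corresponding PHP call is `$producer->flush($timeoutMs)`.

```c
/* Hypothetical flush hook: returns 0 once the outbound queue is empty,
 * nonzero if messages are still pending after timeout_ms. */
typedef int (*flush_fn)(int timeout_ms, void *ctx);

/* Flush before shutdown, giving up after `attempts` tries so a dead
 * broker delays the process by roughly attempts * timeout_ms at most.
 * Returns 0 if everything was delivered, -1 if messages may be lost. */
int flush_before_shutdown(flush_fn flush, void *ctx, int attempts,
                          int timeout_ms) {
    for (int i = 0; i < attempts; i++) {
        if (flush(timeout_ms, ctx) == 0)
            return 0;
    }
    return -1;
}

/* Example fake flush for the usage note: each call "delivers" one
 * pending message and reports whether any are still outstanding. */
int fake_flush(int timeout_ms, void *ctx) {
    (void)timeout_ms;
    int *pending = ctx;
    if (*pending > 0)
        (*pending)--;
    return *pending > 0;
}
```

A `-1` result is the signal to log or persist the undelivered messages elsewhere instead of silently losing them.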
In case of my apps I'm doing exactly what you described - I'm keeping a background process that all messages are delegated into.
Are you willing to share this code? I'm so confused as to why there is no popular standalone daemon that takes messages via a socket and delivers them in the order they were received (and takes care of retries in case of outage)
@MrMoronIV It's nothing too complicated really.
What I meant by delegating is that I'm saving them to a "temporary" storage (in my case Redis, but it can be anything). A background process simply reads that storage and pushes the messages into Kafka.
If you think about it this helps alleviate a number of issues. Using a permanently running process that listens on a socket would be fine, but in case of any failure it would mean potentially losing messages (or storing them somewhere, but then... what's the point? :P). Introducing a temporary storage prevents that and does not introduce any complicated concepts.
I'm using php-enqueue with two transports: redis and kafka. However, I'm not willing to recommend it, because you can run into issues with it. You can give it a go and see if it works for you. I also have to mention that I'm helping maintain the Kafka package there, so I may be biased. I'm only mentioning this because you asked what I'm using :)
> you can run into issues with it. You can give it a go and see if it works for you.
Why do you say this? Did you have issues yourself, or do you just want to be sure I'm not fixated on the first thing mentioned and consider all available options first? You kinda make it sound like I should avoid this package altogether!
On a side note: I do like the redis approach but yeah, it means another package installed on my servers yet again.
@Steveb-p @MrMoronIV I love the input, don't get me wrong, but I think we are getting sidetracked here. If you want to continue discussing this, I would appreciate it if you moved it to another issue, thx guys :+1:
Sure, we will move to phprdkafka gitter. Do you mind @MrMoronIV ?
> Why do you say this? Did you have issues yourself, or do you just want to be sure I'm not fixated on the first thing mentioned and consider all available options first? You kinda make it sound like I should avoid this package altogether!
I don't want to advertise it, and I know some people run into issues with it, so I don't want you to waste too much of your time if you run into some. Give it a shot and see if it's alright for you. For all I know, Timesplinter (who is a contributor here) does use it, I think?
> On a side note: I do like the redis approach but yeah, it means another package installed on my servers yet again.
You can use the filesystem to store messages. Php-enqueue actually has a transport that does exactly that. However, it should be rather simple to just `file_put_contents($filename, json_encode($message))` and then use `DirectoryIterator` to iterate over the directory contents (to prevent memory issues with `scandir`) and send each message to Kafka.
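The same filesystem spool can be sketched in C with POSIX `opendir()`/`readdir()`, which, like `DirectoryIterator`, stream directory entries instead of loading the whole listing as `scandir` does. This is a minimal sketch, not php-enqueue's transport: `send_fn` is a hypothetical hook where the worker would call the Kafka producer, messages are assumed to be under 4 KiB, and `readdir()` order is unspecified, so delivery order is not preserved.

```c
#define _POSIX_C_SOURCE 200809L
#include <dirent.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>

/* Hypothetical sender: returns 0 once the message is handed to Kafka. */
typedef int (*send_fn)(const char *payload, void *ctx);

/* Request side: one message per file. Writing to a hidden temp name and
 * rename()-ing it makes the file appear atomically to the worker. */
int spool_write(const char *dir, const char *id, const char *payload) {
    char tmp[512], final[512];
    snprintf(tmp, sizeof tmp, "%s/.%s.tmp", dir, id);
    snprintf(final, sizeof final, "%s/%s.msg", dir, id);
    FILE *f = fopen(tmp, "w");
    if (f == NULL)
        return -1;
    int ok = fputs(payload, f) >= 0;
    ok = (fclose(f) == 0) && ok;
    if (!ok) {
        remove(tmp);
        return -1;
    }
    return rename(tmp, final);
}

/* Worker side: send each *.msg file and delete it only after a
 * successful send, so failed messages stay on disk for the next run. */
int spool_drain(const char *dir, send_fn send, void *ctx) {
    DIR *d = opendir(dir);
    if (d == NULL)
        return -1;
    int sent = 0;
    struct dirent *e;
    while ((e = readdir(d)) != NULL) {
        size_t n = strlen(e->d_name);
        if (n < 5 || strcmp(e->d_name + n - 4, ".msg") != 0)
            continue; /* skips ".", "..", and in-progress temp files */
        char path[512], buf[4096];
        snprintf(path, sizeof path, "%s/%s", dir, e->d_name);
        FILE *f = fopen(path, "r");
        if (f == NULL)
            continue;
        size_t len = fread(buf, 1, sizeof buf - 1, f);
        fclose(f);
        buf[len] = '\0';
        if (send(buf, ctx) == 0 && remove(path) == 0)
            sent++;
    }
    closedir(d);
    return sent;
}

/* Example sender for the usage note: counts messages instead of sending. */
int count_send(const char *payload, void *ctx) {
    (void)payload;
    ++*(int *)ctx;
    return 0;
}
```

Deleting a file only after a successful send gives at-least-once delivery: a crash between the send and the delete means that message is sent again on the next drain.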
// Off-topic off. Regarding persistent Kafka connections: don't the mysqli and PostgreSQL drivers allow something similar with persistent database connections? So it should at least be possible?
@Steveb-p it is indeed possible, but from what I gathered, it is still not that easy. I don't have a lot of time right now, and since we use swoole in our project, I don't have an urgent need for this feature anymore, but I will still try to give it a go after I resolve issue 215.
Sorry for awakening this issue, but I have encountered similar problems with Confluent Cloud (cloud-native service for Kafka) and hitting its 'Open Connection Attempts' limit.
I have two questions:
- … `php-rdkafka` or `librdkafka` level, so that this request doesn't need to be issued on each connect?
- What do people use nowadays as an alternative to overcome this problem, since persistent connections will obviously never be implemented? I read that some suggest an intermediate step of storing messages in Redis or the filesystem. I also saw the Swoole suggestion, but I don't know the details.

Thanks!
It's a simple concept really.
Instead of pushing messages to Kafka in the original process, you store them anywhere else (be it database, redis, filesystem) for a different, background process to pick up.
This has the added benefit of relieving the original process of the need to wait for any confirmation from Kafka: a confirmation from the storage is usually a good enough guarantee that the message will eventually be sent (note that it's not actually Kafka responding).
After all, it makes sense, since you're not supposed to consume messages in short-lived processes anyway. So kinda the same applies to producers.
Swoole is different in that it's an event-loop-based approach. Any event-loop-based solution can reuse the Kafka connection, since you're operating within a process that handles multiple connections, not just one as usually happens with PHP-FPM (simplified, but generally true).
Thanks for the answer @Steveb-p, very useful. 👍 Yeah, I'm familiar with the first concept, I also use Enqueue on top of this extension so it should be easy to switch the transport. What I'm concerned about with that idea is increased latency of messages due to the intermediate step.
@dtmp just keep in mind that REST will be inherently slower than talking to a Kafka instance directly, without an added layer.
Each time a PHP script gets executed, a new Kafka instance is created. It would be nice if there were some way of keeping the connection persistent and not having to fetch metadata for each request. This would be especially helpful for web services running on php-fpm and nginx, to avoid the latency of the metadata fetch.
Instead of creating a new topic object each time, it would reuse the same topic object across script invocations.