weiboad / kafka-php

kafka php client
Apache License 2.0
1.44k stars 451 forks source link

run producer in cunsumer #205

Open SunnnyGohil opened 6 years ago

SunnnyGohil commented 6 years ago

Hii

Can i run producer into Consumer in kafka-php?

lcobucci commented 6 years ago

@SunnnyGohil for now you can only run the sync producer while consuming messages, since the control of the event loop is done inside of the classes

SunnnyGohil commented 6 years ago

thanks for reply @lcobucci

when i run producer into consumer that time i got "Exception 'LogicException' with message 'Cannot run() recursively; event reactor already active" this error.

if you have solution than please help me

lcobucci commented 6 years ago

@SunnnyGohil the producer should be run in the synchronous mode: https://github.com/weiboad/kafka-php#synchronous-mode

SunnnyGohil commented 6 years ago

@lcobucci i aleardy run producer in sync mode but i got error

lcobucci commented 6 years ago

Can you put your code here?

SunnnyGohil commented 6 years ago
require '../vendor/autoload.php';
    date_default_timezone_set('PRC');

     $config = \Kafka\ConsumerConfig::getInstance();
    $config->setMetadataRefreshIntervalMs(10000);
    $config->setMetadataBrokerList('127.0.0.1:9092');
    $config->setGroupId('test');
    $config->setBrokerVersion('0.10.1.0');
    $config->setTopics(['test']);
    $config->setOffsetReset('earliest');
    $config->setClientId(1);
    $consumer = new \Kafka\Consumer();
    $consumer->start(function($topic,$part,$message) use (&$partition){

        $part=(int)$partition;
        $st=@(time()+microtime());
        $data=(array)json_decode($message['message']['value']);
        $data['data']=(array)$data['data'];
                $GLOBALS['log']=$data;
                $config_producer = \Kafka\ProducerConfig::getInstance();
                $config_producer->setMetadataRefreshIntervalMs(10000);
                $config_producer->setMetadataBrokerList('127.0.0.1:9092');
                $config_producer->setBrokerVersion('0.10.1.0');
                $config_producer->setRequiredAck(1);
                $config_producer->setIsAsyn(FALSE);
                $config_producer->setProduceInterval(500);
                $producer = new \Kafka\Producer(
                    function() {
                        return [
                            [
                                'topic' => 'make',
                                'partId'=>0,
                                'value' => json_encode($GLOBALS['log']),
                                'key' => 'click',
                            ],
                        ];
                    }
                );
                $producer->send(true);
    });

this is my code

lcobucci commented 6 years ago

@SunnnyGohil setIsAsyn() does not put the consumer in synchronous mode, if you compare your code with the one in the link I've sent you you'll notice that there are differences in:

$producer = new \Kafka\Producer(
    function() {
        return [
            [
                'topic' => 'make',
                'partId'=>0,
                'value' => json_encode($GLOBALS['log']),
                'key' => 'click',
            ],
        ];
    }
);

$producer->send(true);
SunnnyGohil commented 6 years ago

thanks @lcobucci i changes in my code but i got there new exception "Exception 'Kafka\Exception\Protocol' with message 'unpack failed. string(raw) length is 0 , TO N"

lcobucci commented 6 years ago

@SunnnyGohil are you using the stable release? If so, could you try with dev-master?

SunnnyGohil commented 6 years ago

yes @lcobucci i am using stable release. my project is on it, so i can't change it. can you give me another solution?

SunnnyGohil commented 6 years ago

i solv this by calling self API

Sevavietl commented 6 years ago

@SunnnyGohil I run into the same problem. Can you, please, explain what does it mean:

i solv this by calling self API

My error message is

Argument 2 passed to Kafka\\Protocol\\Protocol::unpack() must be of the type string, null given, called in /var/www/ads-export/vendor/nmred/kafka-php/src/Producer/SyncProcess.php on line 135

From what I was able to figure out the problem is that if you configure ConsumerConfig and ProducerConfig using the same metadata broker, one ends up overwritten. In my particular case I am faced with situation when I have Kafka\Socket inside Kafka\Producer\SyncProcess. So when calling syncMeta() method, $socket->read(4) returns null instead of string (Kafka\SocketSync returns string in this case):

$dataLen       = Protocol::unpack(Protocol::BIT_B32, $socket->read(4));

And as unpack() method expects string, but no null the script fail.

@lcobucci, can you, please, advise how I can overcome this problem.

Thank you in advance.

kevinmiao commented 6 years ago

dose partId is work?

dawood commented 5 years ago

@SunnnyGohil can you please tell me how did you solve this issue?

djklim87 commented 5 years ago

I has this problem too. And have no idea how i can solve it

Sevavietl commented 5 years ago

@dawood @djklim87 it was solved in aforementioned PR, but it never merged.

dawood commented 5 years ago

@Sevavietl yes i saw that, bad that PR did not get merged.

@djklim87 I was able to use producer from consumer using low level api provided. look at https://github.com/weiboad/kafka-php/blob/master/example/protocol/Produce.php

so, you need to use low level api which is used internally anyways to communicate. Let me know if you need more help.

lijunchen22 commented 4 years ago

run producer into consumer,can any body solve this issue.

dawood commented 4 years ago

@lijunchen22 refer to https://github.com/weiboad/kafka-php/blob/master/example/protocol/Produce.php

you would need to use low level api as mentioned in above file.

lijunchen22 commented 4 years ago

@dawood I try. but it doesn't work. Can you show me your code here?

dawood commented 4 years ago

@lijunchen22

<?
$data = [
    'required_ack' => 1,    
    'timeout' => '1000',
    'data' => [
        [
            'topic_name' => 'SOME_TOPIC',
            'partitions' => [
                [
                    'partition_id' => 0,
                    'messages'     => $eventsToSend,
                ],
            ],
        ],
    ],
];
Protocol::init('1.0.0');
$requestData = Protocol::encode(Protocol::PRODUCE_REQUEST, $data);
$socket = new Socket($kafkaBrokerHost, $kafkaBrokerPort);
$socket->setOnReadable(function ($data): void {
$coodid = \Kafka\Protocol\Protocol::unpack(\Kafka\Protocol\Protocol::BIT_B32, substr($data, 0, 4));
$result = Protocol::decode(Protocol::PRODUCE_REQUEST, substr($data, 4));
echo json_encode($result);
});
$socket->connect();
$socket->write($requestData);
lijunchen22 commented 4 years ago

@dawood it work,thanks a million