arnaud-lb / php-rdkafka

Production-ready, stable Kafka client for PHP
MIT License

Gzipped headers are being corrupted with producev() method #334

Closed · maks-rafalko closed 4 years ago

maks-rafalko commented 4 years ago

Issue description

We are using MessagePack as a serializer (instead of standard JSON) and Gzip for compression to reduce the message size, so basically we do the following:

gzencode(msgpack_pack($message));

The message is sent (in the payload) and read by the consumer correctly.
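The consumer side simply reverses this pipeline; a one-line sketch, assuming a consumed RdKafka\Message in $msg:

$message = msgpack_unpack(gzdecode($msg->payload));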

But when we serialize and compress headers in the same way, they are corrupted.

$topic->producev($partition, 0, $payload, $key, $headers);

This boils down to the following minimal reproducible case:

$headers = [
    'raw_123' => '123',
    'msgpack_pack_123' => msgpack_pack('123'),
    'gzencode_123' => gzencode('123'),
    'gzencode_msgpack_pack_123' => gzencode(msgpack_pack('123')),
];

$topic->producev($partition, 0, $payload, $key, $headers);

The headers sent from PHP look like this:

[screenshot: headers_php]

But when the consumer reads the message, headers are corrupted:

[screenshot: headers_test_corrupted]

Note that only the gzipped headers are corrupted. Raw headers and headers serialized with MessagePack work as expected.
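For reference, a quick way to inspect what the consumer actually receives is to hex-dump the headers of the consumed message (a sketch, assuming an already-subscribed RdKafka\KafkaConsumer in $consumer):

$msg = $consumer->consume(15000);
// headers is an array of name => value pairs on messages that carry headers
var_dump(array_map('bin2hex', $msg->headers));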

When I open the message and its headers in Kafka Tool, they are already corrupted immediately after being persisted:

[screenshot: kafka_tool]

That means the issue is not in the consumer, but somewhere between the PHP producer and Kafka (or in Kafka itself).

I don't have any knowledge of C, so I can't debug it further. If you have any questions, please let me know.

Related to https://github.com/php-enqueue/enqueue-dev/issues/1005

Steveb-p commented 4 years ago

I've just run tests with the following code:

--TEST--
Bug 334 - Gzipped messages are being corrupted in headers with producev() method
--SKIPIF--
<?php
require __DIR__ . '/integration-tests-check.php';
--FILE--
<?php
require __DIR__ . '/integration-tests-check.php';

$conf = new RdKafka\Conf();

$conf->set('auto.offset.reset', 'smallest');
$conf->set('metadata.broker.list', getenv('TEST_KAFKA_BROKERS'));
$conf->set('group.id', sprintf("test_rdkafka_group_%s", uniqid()));

$producer = new RdKafka\Producer($conf);

$topicName = sprintf("test_rdkafka_%s", uniqid());
$topic = $producer->newTopic($topicName);

for ($i = 0; $i < 10; $i++) {
    $topic->producev(0, 0, gzencode('123'), null, []);
    $producer->poll(0);
}

while ($producer->getOutQLen()) {
    $producer->poll(50);
}

// Make sure there is enough time for the stats_cb to pick up the consumer lag
sleep(1);

$conf = new RdKafka\Conf();

$conf->set('auto.offset.reset', 'smallest');
$conf->set('metadata.broker.list', getenv('TEST_KAFKA_BROKERS'));
$conf->set('group.id', sprintf("test_rdkafka_group_%s", uniqid()));

$consumer = new RdKafka\KafkaConsumer($conf);
$consumer->subscribe([$topicName]);

while (true) {
    $msg = $consumer->consume(15000);

    if (!$msg || RD_KAFKA_RESP_ERR__PARTITION_EOF === $msg->err) {
        break;
    }

    if (RD_KAFKA_RESP_ERR_NO_ERROR !== $msg->err) {
        throw new Exception($msg->errstr(), $msg->err);
    }

    var_dump(gzdecode($msg->payload));
}

--EXPECT--
string(3) "123"
string(3) "123"
string(3) "123"
string(3) "123"
string(3) "123"
string(3) "123"
string(3) "123"
string(3) "123"
string(3) "123"
string(3) "123"

...and the data was properly decoded, apart from a librdkafka error at the end (Local: Timed out), but that is irrelevant. I ran the test against:

=====================================================================
PHP         : /usr/bin/php 
PHP_SAPI    : cli
PHP_VERSION : 7.4.1
ZEND_VERSION: 3.4.0
PHP_OS      : Linux - Linux steveb-Y700 5.4.6-2-MANJARO #1 SMP PREEMPT Tue Dec 24 15:55:20 UTC 2019 x86_64
INI actual  : /home/steveb/PhpstormProjects/php-rdkafka/tmp-php.ini
More .INIs  :   
CWD         : /home/steveb/PhpstormProjects/php-rdkafka
Extra dirs  : 
VALGRIND    : Not used
=====================================================================
TIME START 2020-01-04 12:14:09
=====================================================================

EDIT: I just noticed that what's actually happening is that the headers become corrupted, not the message itself. And yes, if headers are passed as an array of gzip-encoded strings (binary strings), they become garbage. I'm looking into whether this is expected behavior of Kafka itself (headers should be key-value strings AFAIK, similar to HTTP protocol headers).
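For context, gzip output is binary from almost the first byte: plain gzencode() output contains a null byte already at offset 3 (the FLG field of the gzip header), which matters for any code that treats the value as a NUL-terminated C string. A quick check:

var_dump(bin2hex(substr(gzencode('123'), 0, 8))); // string(16) "1f8b080000000000"
var_dump(strpos(gzencode('123'), "\0"));          // int(3)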

maks-rafalko commented 4 years ago

@Steveb-p

var_dump(gzdecode($msg->payload));

I'm sorry if it wasn't clear from the ticket description (I just changed the wording): the issue is with the headers, not with the message (payload) itself.

Please look at the screenshot and $headers array from the first message.

maks-rafalko commented 4 years ago

I'm looking into whether this is expected behavior of Kafka itself

Thank you!

headers should be key-value strings AFAIK, similar to HTTP protocol headers

If this is the case, then there is an issue with php-enqueue/enqueue-dev, where the serializer is used incorrectly (in that case I will close this ticket and we will discuss it in https://github.com/php-enqueue/enqueue-dev/issues/1005).

UPD: please note that when I serialize headers with MessagePack only, it works as expected.
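A plausible explanation for the difference, anticipating the null-byte theory that comes up later in the thread: MessagePack output for a short string happens to contain no null bytes, while gzip output does. A quick comparison, assuming the msgpack extension is loaded:

var_dump(bin2hex(msgpack_pack('123')));  // string(8) "a3313233": no null bytes
var_dump(strpos(gzencode('123'), "\0")); // int(3): null byte early in the gzip header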

Steveb-p commented 4 years ago

headers should be key-value strings AFAIK, similar to HTTP protocol headers

If this is the case, then there is an issue with php-enqueue/enqueue-dev, where the serializer is used incorrectly (in that case I will close this ticket and we will discuss it in php-enqueue/enqueue-dev#1005).

I'm looking through the Kafka docs and anything related. At first glance it seems that there is no limitation and Kafka should handle a header value as a byte string (LINK):

public interface Header {

   String key();

   byte[] value();
}

At this point I'm unsure in which library exactly the issue really is. Maybe there is some issue with the message being encoded a second time in the transport (librdkafka properly handles this, see configuration LINK).

I'd assume we have some type conversion near the producev() method implementation until proven otherwise.

EDIT: We do have some code that handles headers specifically. There is even a function call that looks like it tries to convert whatever value we have to a string, which might result in the value becoming corrupted: https://github.com/arnaud-lb/php-rdkafka/blob/master/topic.c#L591

nick-zh commented 4 years ago

@maks-rafalko thx for the report, I'll have a look. Out of curiosity, might I ask why you are not using the compression that Kafka already offers? @Steveb-p thx for helping out, I'll check if that line is the cause of this.
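For reference, the built-in compression nick-zh mentions is enabled via producer configuration and applied transparently by librdkafka, so no manual gzencode()/gzdecode() is needed on either side. A minimal sketch (the broker address is a placeholder):

$conf = new RdKafka\Conf();
$conf->set('metadata.broker.list', 'localhost:9092');
// librdkafka compresses message batches transparently and consumers
// decompress automatically; 'snappy', 'lz4' and 'zstd' are also available
$conf->set('compression.codec', 'gzip');
$producer = new RdKafka\Producer($conf);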

nick-zh commented 4 years ago

As @Steveb-p already said, we are casting everything to string during produce:

https://github.com/arnaud-lb/php-rdkafka/blob/8a7b9e74b19268b76e43cf1521ba40d3866cfa16/topic.c#L591

The same goes for consuming:

https://github.com/arnaud-lb/php-rdkafka/blob/8a7b9e74b19268b76e43cf1521ba40d3866cfa16/message.c#L80

I totally agree that we should comply with the Kafka definition and treat header values as bytes.
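Until such a fix lands, a possible userland workaround (an editor's assumption, not something proposed in the thread) is to keep header values free of null bytes, for example with base64:

// produce: base64 output contains no null bytes, so it survives the cast
$headers = ['data' => base64_encode(gzencode(msgpack_pack('123')))];
$topic->producev($partition, 0, $payload, $key, $headers);

// consume: reverse the encoding on the way out
$original = msgpack_unpack(gzdecode(base64_decode($msg->headers['data'])));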

dirx commented 4 years ago

Autodetecting header value length might also be a problem (null byte?):

https://github.com/arnaud-lb/php-rdkafka/blob/20fb00c1c7d04edf198809429e6208f3ee69570a/topic.c#L602
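To make the suspicion concrete: if the value's length is autodetected with strlen()-style NUL-terminated semantics instead of taken from the PHP string's stored length, everything after the first null byte is silently dropped. A pure-PHP illustration of the effect (not the extension code itself):

$value = gzencode('123');                   // binary, null byte at offset 3
$seenByStrlen = strstr($value, "\0", true); // the bytes before the first NUL
var_dump(strlen($value));                   // full length, e.g. int(23)
var_dump(bin2hex($seenByStrlen));           // string(6) "1f8b08": truncated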

arnaud-lb commented 4 years ago

@dirx: Good catch! This must be the issue.

maks-rafalko commented 4 years ago

Closing as the PR is merged. Thank you!