cploutarchou / tiny_kafka

This repository contains Rust-based implementations of a Kafka producer and a Kafka consumer. It leverages the `rdkafka` library to communicate with Kafka and manage the production and consumption of messages.
MIT License

`tiny_kafka` ruins the value before sending to Kafka server #2

Open amab8901 opened 1 month ago

amab8901 commented 1 month ago

Problem

The version published on crates.io differs from the one on GitHub. The crates.io version appears to be newer, so the GitHub repository probably needs to be updated.

The crates.io version has a problem in this code:

    pub async fn send_message(&self, topic_name: &str, message: Message) {
        let json_payload = to_string(&message.value).expect("Failed to serialize message");

        let delivery_status = self
            .producer
            .send(
                FutureRecord::to(topic_name)
                    .payload(&json_payload)
                    .key(&message.key),
                Duration::from_secs(0),
            )
            .await;

        // Log the status of the message delivery.
        match delivery_status {
            Ok((partition, offset)) => {
                info!(
                    "Message with key {} delivered to partition {} at offset {}",
                    message.key, partition, offset
                );
            }
            Err((error, _message)) => {
                error!(
                    "Failed to deliver message with key {}: {}",
                    message.key, error
                );
            }
        }
    }

I'm trying to send a Line Protocol string to Kafka so that Kafka can forward it to InfluxDB. I put the value, exactly as it should appear in the request body, into the `value` field of `Message`, and pass that as the `message` argument to `send_message`. But the code changes this string into something else (for example, by adding backslashes and quotes) on this line:

    let json_payload = to_string(&message.value).expect("Failed to serialize message");

When this altered string reaches Kafka, Kafka cannot forward it to InfluxDB, because the string is no longer valid Line Protocol syntax.
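
To illustrate the effect, here is a minimal, standard-library-only sketch. `json_encode_str` is a hypothetical stand-in that mimics what JSON-serializing a string value does (wrapping it in double quotes and escaping quotes, backslashes, and control characters); it is not the crate's code, and the sample Line Protocol line is made up for the example.

```rust
/// Hypothetical stand-in for JSON string serialization (not tiny_kafka code):
/// wraps the input in double quotes and escapes special characters.
fn json_encode_str(s: &str) -> String {
    let mut out = String::with_capacity(s.len() + 2);
    out.push('"');
    for c in s.chars() {
        match c {
            '"' => out.push_str("\\\""),
            '\\' => out.push_str("\\\\"),
            '\n' => out.push_str("\\n"),
            '\r' => out.push_str("\\r"),
            '\t' => out.push_str("\\t"),
            // Remaining control characters become \uXXXX escapes.
            c if (c as u32) < 0x20 => out.push_str(&format!("\\u{:04x}", c as u32)),
            c => out.push(c),
        }
    }
    out.push('"');
    out
}

fn main() {
    // Made-up Line Protocol line containing a quoted string field.
    let line = r#"weather,location=us-midwest description="hot day" 1465839830100400200"#;
    let encoded = json_encode_str(line);

    println!("raw:     {line}");
    println!("encoded: {encoded}");

    // The encoded payload gains surrounding quotes and backslashes,
    // so it no longer matches the original Line Protocol text.
    assert_ne!(line, encoded);
}
```

The raw string is what InfluxDB expects; the encoded one is roughly what ends up in the Kafka payload when the value is passed through a JSON serializer first.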

I tried to open a PR, but since the GitHub version is older than the crates.io version, I'll describe the requested change below instead:

Solution

Replace this:

        let json_payload = to_string(&message.value).expect("Failed to serialize message");

...with this:

        let json_payload = message.value;

cploutarchou commented 1 month ago

Hi @amab8901 ,

Thank you for bringing this to my attention. I appreciate your detailed explanation and suggested solution.

I wanted to let you know that I am currently working on updating the GitHub repository to match the latest version on crates.io. This will include addressing the issue you've described with the send_message function. Your suggested change makes sense, and I will ensure it is included in the update.

You can expect the updated version to be pushed to GitHub within the next few days. There will be no need to use any other Kafka dependencies.

Thank you for your patience and contribution.