gklijs / schema_registry_converter

A crate to convert bytes to something more usable, and the other way around, in a way compatible with the Confluent Schema Registry. Supports Avro, Protobuf, and JSON Schema, with both async and blocking implementations.
Apache License 2.0

`EasyProtoRawEncoder`'s `encode` currently does not skip writing data that is not part of the protobuf schema #120

Closed · hongbo-miao closed this 1 month ago

hongbo-miao commented 1 month ago

Describe the bug

`EasyProtoRawEncoder`'s `encode` currently does not skip writing data that is not part of the protobuf schema.

To Reproduce

Create a schema (note that it contains temperature1 through temperature5):

curl --location 'https://confluent-schema-registry.internal.hongbomiao.com/subjects/production.iot.motor.proto-value/versions' \
--header 'Content-Type: application/vnd.schemaregistry.v1+json' \
--data '{
    "schemaType": "PROTOBUF",
    "schema": "syntax = \"proto3\"; package production.iot; message Motor { optional string motor_id = 1; optional string timestamp = 2; optional double temperature1 = 3; optional double temperature2 = 4; optional double temperature3 = 5; optional double temperature4 = 6; optional double temperature5 = 7; }"
}'

Then run this producer (note that besides temperature1 through temperature5, the generated data has one extra field, temperature6):

use chrono::Utc;
use rand::Rng;
use rdkafka::producer::{FutureProducer, FutureRecord};
use rdkafka::util::Timeout;
use rdkafka::ClientConfig;
use schema_registry_converter::async_impl::easy_proto_raw::EasyProtoRawEncoder;
use schema_registry_converter::async_impl::schema_registry::SrSettings;
use schema_registry_converter::schema_registry_common::SubjectNameStrategy;
use serde_json::json;
use std::env::args;
use std::time::Duration;
use tokio::time;

fn create_producer(bootstrap_server: &str) -> FutureProducer {
    ClientConfig::new()
        .set("bootstrap.servers", bootstrap_server)
        .create()
        .expect("Failed to create producer")
}

fn generate_motor_data(motor_id: &str) -> serde_json::Value {
    let mut rng = rand::thread_rng();
    let temperature = rng.gen_range(10.0..100.0);
    json!({
        "motor_id": motor_id,
        "timestamp": Utc::now().to_rfc3339(),
        "temperature1": temperature,
        "temperature2": temperature,
        "temperature3": temperature,
        "temperature4": temperature,
        "temperature5": temperature,
        "temperature6": temperature
    })
}

#[tokio::main]
async fn main() {
    println!("Starting motor data Generator...");

    let bootstrap_server = args()
        .nth(1)
        .unwrap_or_else(|| "localhost:9092".to_string());
    let producer = create_producer(&bootstrap_server);

    let schema_registry_url = "https://confluent-schema-registry.internal.hongbomiao.com";
    let sr_settings = SrSettings::new(schema_registry_url.to_string());
    let encoder = EasyProtoRawEncoder::new(sr_settings);

    let motor_ids = vec!["motor_001", "motor_002", "motor_003"];
    let topic = "production.iot.motor.proto";

    println!("Sending data to Kafka topic: {}", topic);

    let mut interval = time::interval(Duration::from_secs(1));

    loop {
        interval.tick().await;

        for motor_id in &motor_ids {
            let sensor_data = generate_motor_data(motor_id);
            let json_bytes =
                serde_json::to_vec(&sensor_data).expect("Failed to serialize sensor data");
            // EasyProtoRawEncoder wraps the provided bytes in the schema
            // registry wire format before they are written to Kafka
            let proto_payload = encoder
                .encode(
                    &json_bytes,
                    "production.iot.Motor",
                    SubjectNameStrategy::TopicNameStrategy(topic.to_string(), false),
                )
                .await
                .expect("Failed to encode with schema registry");

            match producer
                .send(
                    FutureRecord::to(topic)
                        .payload(&proto_payload)
                        .key(motor_id.as_bytes()),
                    Timeout::After(Duration::from_secs(1)),
                )
                .await
            {
                Ok(_) => {}
                Err((err, _)) => {
                    eprintln!("Failed to send data for motor {}: {}", motor_id, err);
                }
            }
        }
    }
}

Expected behavior

I expected `EasyProtoRawEncoder`'s `encode` to write only temperature1 through temperature5 and to skip temperature6, since it is not part of the schema. However, temperature6 got written to Kafka as well.

Please correct me if I did something wrong, thanks! 😃

gklijs commented 1 month ago

You're not really doing anything wrong; the documentation could be better. The idea of the raw encoders is to 'just' do the bits directly related to schema registry: prepending the bytes with the magic byte and the registered schema reference, then appending the encoded message within the schema. It does not do any validation of the provided bytes. I don't think encoding the JSON bytes is the proper approach, but it's been a while since I worked on the protobuf integration.
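
To make that pass-through behavior concrete, here is a minimal sketch (not the crate's actual code) of the Confluent Protobuf wire format the raw encoder produces. wire_format is a hypothetical helper, and the single 0 message index assumes the target message is the first one declared in the schema file:

// Hypothetical helper illustrating the Confluent wire format for Protobuf.
// The raw encoder builds a frame like this and copies the caller's bytes
// into it unchanged; nothing checks that they are valid Protobuf.
fn wire_format(schema_id: u32, payload: &[u8]) -> Vec<u8> {
    let mut out = Vec::with_capacity(6 + payload.len());
    out.push(0); // magic byte
    out.extend_from_slice(&schema_id.to_be_bytes()); // 4-byte schema ID, big-endian
    out.push(0); // message indexes; a lone 0 means "first message in the schema"
    out.extend_from_slice(payload); // caller-provided bytes, written as-is
    out
}

This is why the temperature6 value reached Kafka: the extra field travels inside the caller-provided bytes, which the encoder never inspects.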

hongbo-miao commented 1 month ago

Thank you @gklijs for the guidance! 😃

I fixed my code at https://github.com/hongbo-miao/hongbomiao.com/pull/20005/files

Now it uses the prost library (Protobuf) instead of serde_json (JSON) to serialize the data, and prost will throw an error if the data does not match the local proto file.
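
For reference, a minimal sketch of the prost side of such a fix, assuming a hand-written Motor struct that mirrors the registered schema (prost-build would normally generate this from the .proto file):

use prost::Message;

// Mirrors production.iot.Motor; there is no temperature6 field, so the
// extra value simply cannot be encoded.
#[derive(Clone, PartialEq, Message)]
pub struct Motor {
    #[prost(string, optional, tag = "1")]
    pub motor_id: Option<String>,
    #[prost(string, optional, tag = "2")]
    pub timestamp: Option<String>,
    #[prost(double, optional, tag = "3")]
    pub temperature1: Option<f64>,
    #[prost(double, optional, tag = "4")]
    pub temperature2: Option<f64>,
    #[prost(double, optional, tag = "5")]
    pub temperature3: Option<f64>,
    #[prost(double, optional, tag = "6")]
    pub temperature4: Option<f64>,
    #[prost(double, optional, tag = "7")]
    pub temperature5: Option<f64>,
}

fn to_proto_bytes(motor: &Motor) -> Vec<u8> {
    // encode_to_vec yields raw Protobuf bytes, which are then passed to
    // EasyProtoRawEncoder::encode in place of the JSON bytes.
    motor.encode_to_vec()
}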

My AKHQ instance can now display the Protobuf data correctly with the corresponding schema from the Confluent Schema Registry:

[screenshot: AKHQ showing the decoded Protobuf messages]

Reference:

The fix is mainly based on these two Python Protobuf examples. Hopefully this time the code is correct ☺️