gklijs / schema_registry_converter

A crate to convert bytes to something more usable, and the other way around, in a way compatible with the Confluent Schema Registry. Supports Avro, Protobuf and JSON Schema, with both async and blocking implementations.

Schema resolution slowing down encoding #117

Open baltendo opened 1 week ago

baltendo commented 1 week ago

Describe the bug
We heavily use Kafka, Avro and the Schema Registry with Java, and I now wanted to implement a service in Rust. The service runs fine, but producing a message is very slow, and I found schema resolution to be the slow part. I read up on schema resolution and I wonder why it is called during encoding; as far as I understood, it is only needed during decoding, when the reader schema is different from the one used during encoding.
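To illustrate the distinction, here is a minimal sketch using apache_avro directly (the record schema below is made up just for illustration): plain encoding only takes the writer schema, while a reader schema, and thus resolution, is something the decode path optionally accepts.

use apache_avro::{from_avro_datum, to_avro_datum, types::Value, Schema};

fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Made-up schema, just for illustration.
    let writer_schema = Schema::parse_str(
        r#"{"type": "record", "name": "Example", "fields": [{"name": "name", "type": "string"}]}"#,
    )?;

    // Encoding: only the writer schema is involved.
    let record = Value::Record(vec![("name".to_string(), Value::String("x".to_string()))]);
    let bytes = to_avro_datum(&writer_schema, record)?;

    // Decoding: a reader schema can be passed here, which is where schema resolution
    // is meant to happen (this sketch just reuses the writer schema).
    let decoded = from_avro_datum(&writer_schema, &mut bytes.as_slice(), Some(&writer_schema))?;
    println!("{decoded:?}");
    Ok(())
}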

We are using a quite big schema with many records that are used multiple times, so they become named references after the first definition. Unfortunately, I cannot just attach the schema. It's already mentioned in avro-rs that this path is slow:

[screenshot: avro-rs source comment noting that this resolution path is slow]

To Reproduce
Steps to reproduce the behavior: call EasyAvroEncoder.encode_struct() with a schema containing many named references.

Here the .resolve() method is called and I don't understand why (see the comment in the snippet):

pub(crate) fn item_to_bytes(
    avro_schema: &AvroSchema,
    item: impl Serialize,
) -> Result<Vec<u8>, SRCError> {
    match to_value(item)
        .map_err(|e| {
            SRCError::non_retryable_with_cause(e, "Could not transform to apache_avro value")
        })
        // not sure why schema resolution should happen on serialization/writing
        .map(|r| r.resolve(&avro_schema.parsed))
    {
        Ok(Ok(v)) => to_bytes(avro_schema, v),
        Ok(Err(e)) => Err(SRCError::non_retryable_with_cause::<SRCError>(e, "Failed to resolve")),
        Err(e) => Err(e),
    }
}

I tried to write a test. The child struct could be duplicated to get more named references:

    #[test]
    fn named() {
        // Imports to make the test self-contained; the exact paths depend on where
        // in the crate this test lives, so these are assumptions.
        use std::time::Instant;
        use apache_avro::Schema;
        use crate::avro_common::AvroSchema;
        #[derive(Debug, PartialEq, Eq, Clone, serde::Deserialize, serde::Serialize)]
        pub struct Parent {
            #[serde(rename = "child1")]
            pub child1: Option<Child>,
            #[serde(rename = "child2")]
            pub child2: Option<Child>,
            #[serde(rename = "child3")]
            pub child3: Option<Child>,
        }

        #[derive(Debug, PartialEq, Eq, Clone, serde::Deserialize, serde::Serialize)]
        pub struct Child {
            #[serde(rename = "name")]
            pub name: Option<String>,
        }

        let writer_schema = r#"{
  "type": "record",
  "name": "Parent",
  "fields": [
    {
      "name": "child1",
      "type": [
        "null",
        {
          "type": "record",
          "name": "Child",
          "fields": [
            {
              "name": "name",
              "type": [
                "null",
                "string"
              ],
              "default": null
            }
          ]
        }
      ]
    },
    {
      "name": "child2",
      "type": [
        "null",
        "Child"
      ],
      "default": null
    },
    {
      "name": "child3",
      "type": [
        "null",
        "Child"
      ],
      "default": null
    }
  ]
}"#;

        let schema = AvroSchema {
            id: 6,
            raw: String::from(writer_schema),
            parsed: Schema::parse_str(writer_schema).unwrap(),
        };

        let item = Parent {
            child1: Some(Child { name: Some("child1".to_string()) }),
            child2: Some(Child { name: Some("child2".to_string()) }),
            child3: Some(Child { name: None }),
        };

        let now = Instant::now();
        let result = crate::avro_common::item_to_bytes(&schema, item);
        let elapsed = now.elapsed();
        println!("writing took: {:.2?}", elapsed);
        let bytes = result.unwrap();

        assert_eq!(bytes.len(), 25);
    }
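If this test is added next to the crate's existing tests, the timing output of the println can be shown with cargo test named -- --nocapture.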

Here is a screenshot of the running service in the IDE, with additional "Sending" and "Sent" logs around the EasyAvroEncoder.encode_struct() call plus the .await:

[screenshot: IDE console showing the "Sending"/"Sent" log lines around encode_struct]

Expected behavior
I expect it to be faster. When I remove all the data related to named references (which is possible because I have many nullable fields), it is much faster. The following screenshot first shows the sending of a big event with many named references, and then of a small event with no named references:

[screenshot: log timings comparing a big event with many named references to a small event without them]

Options

gklijs commented 1 week ago

Can you ensure it's not just the first one that is slow? The next ones should be a lot faster because of the cache. I think all three options are out of scope for this library. If I understand correctly, you want to bypass the schema registry for the first two, in which case this library is just overhead and it's much better to use avro-rs directly. Or speeding it up, which is the third point. Maybe you can make a flame graph to see where it's slow; there might be a quick win.
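For reference, a rough sketch of such a repeated-encode check (module paths, the encode_struct signature, the registry URL and the topic name are assumptions; Parent is the struct from the test above and derives Clone):

use std::time::Instant;

use schema_registry_converter::async_impl::easy_avro::EasyAvroEncoder;
use schema_registry_converter::async_impl::schema_registry::SrSettings;
use schema_registry_converter::schema_registry_common::SubjectNameStrategy;

async fn time_repeated_encodes(item: Parent) {
    // Placeholder registry URL and topic name.
    let sr_settings = SrSettings::new(String::from("http://localhost:8081"));
    let encoder = EasyAvroEncoder::new(sr_settings);
    let strategy = SubjectNameStrategy::TopicNameStrategy(String::from("parent-topic"), false);

    // If only the first iteration is slow, the cost is the one-time registry lookup;
    // if every iteration is slow, the per-message work (e.g. resolve) is the bottleneck.
    for i in 0..5 {
        let start = Instant::now();
        match encoder.encode_struct(item.clone(), &strategy).await {
            Ok(bytes) => println!("encode #{i}: {} bytes in {:.2?}", bytes.len(), start.elapsed()),
            Err(e) => println!("encode #{i} failed: {e:?}"),
        }
    }
}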

martin-g commented 1 week ago

Hi,

Yesterday I read https://medium.com/@saiharshavellanki/building-a-blazing-fast-kafka-ingestion-pipeline-in-rust-with-protobuf-1cdc2f768c5f by @saiharshavellanki. It says:

Sync vs. Async Decoding: By adding metrics, we identified the main bottleneck in the decoding process. The sync module of the schema_registry_converter crate doesn't cache schemas, leading to performance issues. Switching to the async module, which includes built-in caching, resolved this and significantly improved throughput.

Could that be the issue?

gklijs commented 1 week ago

Interesting, the sync module should also do some caching, but it might be before actually parsing the schema.

baltendo commented 1 week ago

Yes, I can confirm that the problem stays, and I did not include the first event in the screenshots.

baltendo commented 1 week ago

I use the EasyAvroDecoder, so I am already using the async implementation, which the article claims is faster.

saiharshavellanki commented 1 week ago

@baltendo the article is about the Protobuf decoder, not the AvroDecoder.

gklijs commented 1 week ago

It's been a while since I wrote the code. I think there could be an additional option to bypass resolving, which could be used if you are sure the schema you are using for producing the data is exactly the same as the schema in the schema registry. Does that sound like something that makes sense and would give additional performance?
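A rough sketch of what such a bypass might look like, mirroring the item_to_bytes snippet above (hypothetical: this function does not exist in the crate today, and skipping resolve is only safe when the serde-produced value already matches the registered schema exactly):

// Hypothetical variant of item_to_bytes that skips Value::resolve. Only safe when
// the Rust struct serializes to exactly the shape of the registered schema.
pub(crate) fn item_to_bytes_unresolved(
    avro_schema: &AvroSchema,
    item: impl Serialize,
) -> Result<Vec<u8>, SRCError> {
    to_value(item)
        .map_err(|e| {
            SRCError::non_retryable_with_cause(e, "Could not transform to apache_avro value")
        })
        .and_then(|v| to_bytes(avro_schema, v))
}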