tychedelia / kafka-protocol-rs

Rust Kafka protocol
Apache License 2.0

incorrect implementation of CreateTopics specification #84

Open pdeva opened 1 week ago

pdeva commented 1 week ago

the current implementation uses a map with topic names as keys:

pub struct CreateTopicsRequest {
    /// The topics to create.
    ///
    /// Supported API versions: 0-7
    pub topics: indexmap::IndexMap<super::TopicName, CreatableTopic>,
}

however, this makes it impossible to represent requests that contain duplicate topic names.

so this test for example will fail here: https://github.com/segmentio/kafka-go/blob/a8e5eabf4a90025a4ad2c28e929324d18db21103/createtopics_test.go#L77

it creates 3 topics, 2 of which are duplicates. it then expects an InvalidRequest error code for the duplicated topic, and expects that neither of the duplicated topics is created.

however, with the current implementation the server never sees the duplicate: decoding into the map simply overwrites the earlier entry with the later one, so server implementations will create the topic instead of returning an error.
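
a minimal sketch of the lossy behavior (the string/int pairs below are simplified stand-ins for the crate's TopicName/CreatableTopic types, not its actual decode path):

use indexmap::IndexMap;

fn main() {
    // Two "topic-A" entries arrive on the wire, as in the kafka-go test above.
    let incoming_topics = vec![
        ("topic-A", 3),
        ("topic-A", 3), // duplicate entry
        ("topic-B", 1),
    ];

    // Decoding into a map keyed by topic name silently collapses duplicates:
    // the second "topic-A" overwrites the first, so a server built on this
    // type can never detect the duplicate and return InvalidRequest.
    let mut topics: IndexMap<&str, i32> = IndexMap::new();
    for (name, partitions) in incoming_topics {
        topics.insert(name, partitions);
    }

    assert_eq!(topics.len(), 2); // the duplicate is gone
}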

tychedelia commented 1 week ago

Wanted to confirm the Java behavior, and this is the underlying data structure used: ImplicitLinkedHashCollection.

pdeva commented 1 week ago

maybe i have an older version checked out, but i see it implementing ImplicitLinkedHashMultiCollection (screenshot attached)

pdeva commented 1 week ago

confirming that even in the latest kafka commit, topics are defined as

public static class CreatableTopicCollection extends ImplicitLinkedHashMultiCollection<CreatableTopic>

the MultiCollection part allows it to hold multiple topics with the same name.

pdeva commented 1 week ago

i don't think anything in the wire protocol uses maps: https://kafka.apache.org/protocol#protocol_api_keys

the only collection that seems to be used is a list.

the java implementation replaces that with a MultiCollection to make it easier to look up by key and detect duplicates, but still keeps the semantics of a list. https://github.com/apache/kafka/blob/3783385dc1cc27246cf09ec791e4b43f577a26ea/generator/src/main/java/org/apache/kafka/message/FieldSpec.java#L527:L529

this is different from the behavior in this lib.

rukai commented 3 days ago

This sounds like a problem! I need to verify if this does actually affect all map types in the protocol.

But assuming for a second that it does, I wonder how best to solve this. We could just replace all of our map types from IndexMap<K, T> to IndexMap<K, Vec<T>>, but that adds an extra allocation for every element. We could avoid that by using https://docs.rs/smallvec/latest/smallvec/ so that a single value is stored inline and only 2 or more values spill to the heap.

However, such an API would be quite error-prone: we would need to manually check the length of the list before using it.

Maybe what we want is:

enum MapEntry<T> {
    Single(T),
    Duplicates(Box<[T]>),
}

impl<T> MapEntry<T> {
    /// Returns the single value, or all duplicated values as an error.
    fn get(&self) -> Result<&T, &[T]> {
        match self {
            MapEntry::Single(x) => Ok(x),
            MapEntry::Duplicates(x) => Err(x),
        }
    }
}

And then use IndexMap<K, MapEntry<T>>.

Actually, maybe we want to simplify that to IndexMap<K, Result<T, Vec<T>>> and just document prominently what on earth that is about.
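
Roughly, inserting into such a map could look like this (just a sketch; insert_or_mark_duplicate is a made-up helper, not an existing API in this crate):

use indexmap::{map::Entry, IndexMap};
use std::hash::Hash;

/// Hypothetical helper: the first value for a key is stored as Ok(value); any
/// repeated key converts the entry into Err(all_values), which a server could
/// treat as "duplicate key, reject the request".
fn insert_or_mark_duplicate<K: Hash + Eq, T>(
    map: &mut IndexMap<K, Result<T, Vec<T>>>,
    key: K,
    value: T,
) {
    match map.entry(key) {
        Entry::Vacant(e) => {
            e.insert(Ok(value));
        }
        Entry::Occupied(mut e) => {
            // Swap the existing value out, then store all values as Err(...).
            let existing = std::mem::replace(e.get_mut(), Err(Vec::new()));
            let all = match existing {
                Ok(first) => vec![first, value],
                Err(mut dups) => {
                    dups.push(value);
                    dups
                }
            };
            *e.get_mut() = Err(all);
        }
    }
}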

And this all assumes that it is always an error to have a duplicate map key which might not be true in some cases...

Diggsey commented 3 days ago

I think short term the best option would be to make a thin wrapper around a Vec<T> which adds methods to look up by key (doing a linear search of the vec in that case).
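
Roughly along these lines (a sketch only; the Keyed trait is made up for illustration, the real wrapper would presumably be generated alongside the message types):

/// Hypothetical trait for message entries that carry their own key
/// (e.g. a topic name).
trait Keyed {
    type Key: PartialEq;
    fn key(&self) -> &Self::Key;
}

/// Thin wrapper around Vec<T> that preserves duplicates and wire order,
/// but still offers key-based lookup via a linear scan.
struct MultiMap<T>(Vec<T>);

impl<T: Keyed> MultiMap<T> {
    fn new() -> Self {
        MultiMap(Vec::new())
    }

    fn push(&mut self, value: T) {
        self.0.push(value);
    }

    /// First entry matching the key, if any.
    fn get(&self, key: &T::Key) -> Option<&T> {
        self.0.iter().find(|v| v.key() == key)
    }

    /// All entries matching the key; more than one means the request
    /// contained duplicates.
    fn get_all<'a>(&'a self, key: &'a T::Key) -> impl Iterator<Item = &'a T> + 'a {
        self.0.iter().filter(move |v| v.key() == key)
    }
}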

Long term you could think about forking IndexMap to create an IndexMultiMap which would natively support multiple values per key.

tychedelia commented 2 days ago

I think we should go for the easy/ugly Vec fix now for correctness, but block any 1.0 release on having a proper multi-map API similar to the Java reference.

rukai commented 8 hours ago

Thinking about the ways in which the protocol is actually used, I think that replacing IndexMap with Vec might actually be best for performance.

Since the kafka protocol very rarely has us request data that we don't care about, when processing a response most of the time we just call .into_iter() and iterate over all the elements, so spending time constructing a hashmap instead of a vec is a waste.

And when constructing a hashmap for encoding we certainly don't benefit from being able to perform lookups. Edit: I realized constructing values does actually benefit from the lookups: when creating a single batch request for multiple application-side requests, we want to be able to quickly look up existing topics so that we can insert into that existing entry without creating a duplicate.
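
For that batching case a plain Vec with a linear scan already seems enough, roughly (TopicEntry and its fields are simplified stand-ins, not the generated types):

// Simplified stand-in for a topic entry in a CreateTopics request.
struct TopicEntry {
    name: String,
    num_partitions: i32,
}

/// Merge one application-side topic into the batch being built, reusing an
/// existing entry for the same name instead of appending a duplicate.
fn add_to_batch(batch: &mut Vec<TopicEntry>, incoming: TopicEntry) {
    if let Some(existing) = batch.iter_mut().find(|t| t.name == incoming.name) {
        // Already requested: merge however is appropriate for the field.
        existing.num_partitions = existing.num_partitions.max(incoming.num_partitions);
    } else {
        batch.push(incoming);
    }
}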

Diggsey commented 5 hours ago

@rukai I think it would still be good to use a wrapper around Vec (with zero-cost conversions) so that it can be changed in a backwards-compatible way, and so that additional methods, like get by key, can be provided.