davidfowl / BedrockFramework

High performance, low level networking APIs for building custom servers and clients.
MIT License

Question: Working on Kafka protocol, what're good ways to work with a protocol with almost 400 payload types? #30

Closed ChadJessup closed 3 years ago

ChadJessup commented 4 years ago

First off, this is a super rad API. I was able to parse raw messages to and from Kafka in an hour or two from scratch, without any of the normal socket/buffer manipulation that'd normally have to be done.

I've not attempted such performance-oriented protocol parsing code like this before, and I'm wondering if there are any good open-source examples, code, blogs, whitepapers, etc of low/amortized allocation protocol parsing code that deals with a large amount of payload types?

I've done quite a bit of googling on the subject, but I haven't come across something that deals with handling hundreds of distinct payloads...

The full Kafka API surface area is ~370 versioned payloads (some redundancy, but a lot of them are versioned) and ~20 primitives - and I know that my current approach of new'ing up typed request/response pairs isn't an ideal approach for various reasons. Especially, when some Kafka libraries can hit ~1 million msg/sec, and the requests often need to be kept around until the responses are returned.

I can think of multiple ways to try and solve this, but I can also think of multiple downsides to most of these approaches, and any help addressing maintainability alongside performance would be awesome.

As a followup, would Bedrock.Framework.Experimental be a good place to try novel approaches to common issues in this space?

davidfowl commented 4 years ago

Wonderful! I'm looking forward to that contribution!

> As a followup, would Bedrock.Framework.Experimental be a good place to try novel approaches to common issues in this space?

Yes, make a Kafka folder and let's start iterating on it. It doesn't need to be fast from the get go. We'll learn more as we write more parsers. Then you can write those whitepapers yourself 😉

jkotalik commented 4 years ago

@ChadJessup I'd be interested in giving technical guidance as well. I encourage you to do the same!

jzabroski commented 4 years ago

How does PluralSight do it in Scala? They open sourced Hydra which is built on top of Kafka. I can't really read Scala code but that might be a place to think of ideas.

ChadJessup commented 4 years ago

> How does PluralSight do it in Scala? They open sourced Hydra which is built on top of Kafka. I can't really read Scala code but that might be a place to think of ideas.

I'll look into that project. I've been referring to librdkafka when docs and Wireshark fail me. But raw C code has some benefits on this front that I don't believe can be leveraged in C#.

AndyPook commented 4 years ago

Hi, just started having a go at this. Got some basic req/res pipelining. A handful of message types. Can produce a message. Yay!

There are only 40ish requests. It's the versioning that is a pain. It appears that librdkafka only really deals with certain API versions. So I've just been tying into those. Although the versions are generally additive, so was thinking some if(v>X) would be sufficient to start. Then the readers can be static. The biggest pita are all the prefixed blocks. I've been using a memory.shared based IBufferWriter to buffer the block before writing the length/CRC prefix and copying the buffer. It'd be nice if there was a built-in pattern for prefixes.
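A minimal sketch of the buffer-then-prefix approach described above, assuming a 4-byte big-endian length prefix (Kafka sizes are big-endian INT32). The `LengthPrefix` class and `WriteLengthPrefixed` method are hypothetical names, and `ArrayBufferWriter<byte>` stands in for the pooled writer mentioned in the comment:

```csharp
using System;
using System.Buffers;
using System.Buffers.Binary;

public static class LengthPrefix
{
    // Buffers the body produced by writeBody in a scratch writer, then emits
    // a 4-byte big-endian length followed by a copy of the body.
    public static void WriteLengthPrefixed(
        IBufferWriter<byte> output,
        Action<IBufferWriter<byte>> writeBody)
    {
        // Assumption: a fresh scratch buffer per call; a pooled writer
        // would avoid this allocation.
        var scratch = new ArrayBufferWriter<byte>();
        writeBody(scratch);

        ReadOnlySpan<byte> body = scratch.WrittenSpan;

        Span<byte> prefix = output.GetSpan(sizeof(int));
        BinaryPrimitives.WriteInt32BigEndian(prefix, body.Length);
        output.Advance(sizeof(int));

        // The extra copy is the cost of not knowing the size up front.
        output.Write(body);
    }
}
```

The copy is the price of this pattern; reserving the prefix bytes and patching them afterwards avoids it, at the cost of needing a writer that allows going back.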

The other "problem" is that the Kafka protocol doc is spread around various places and the protocol BNF is only somewhat consistent (Wireshark to the rescue!)

A long way to go to get consumer group orchestration and message consuming to work. Then higher level client facades. Then.... ...

I'll stick what I have on github soon

AndyPook commented 4 years ago

https://github.com/AndyPook/A6k.Kafka

Just a start. Only works with single broker instances. No Record Batching yet, it would need per topic/partition queues. Works ok with single req/res. Needs some "manager/orchestrator" types for JoinGroup/SyncGroup chat.

TestConsole is a dumb demo console app for "testing" against a kafka instance (I use single-node\docker-compose.yml to run the test instance).

ChadJessup commented 4 years ago

Nice! You and I have gone down some very similar paths (I'll be putting up a preliminary PR here soon) - even down to our docker-compose setup. :)

As for the prefixes, that was, and always has been, one of the biggest pains I've dealt with when parsing protocols like this. The first bytes usually being a size is great when reading, but with writing I've always had a difficult time trying to efficiently calculate the size and then write the payload without double-scanning, or having risky code like:

        private const int constantPayloadsize =
            sizeof(short) // acks
            + sizeof(int) // timeout
            + sizeof(int) // topics array count
            + sizeof(short) // topic array name length
            + sizeof(int) // single partition index
            + sizeof(long) // offset
            + sizeof(int) // message set size
            + sizeof(int) // message size
            + sizeof(int) // key length size
            + sizeof(int) // value length size
            ;

        public override int GetPayloadSize()
        {
            return constantPayloadsize
                + this.Topics.Sum(t => t.TopicName.Length)
                + (this.keyLength ?? 0)
                + (this.valueLength ?? 0);
        }

To try and deal with this pain, I just spent a day or two prototyping out a generic PayloadWriter class that lets me deal with this somewhat common scenario:

        public void WritePayload(ref PayloadWriterContext context)
        {
            var pw = context.CreatePayloadWriter()
                .StartCalculatingSize("message")
                    .StartCrc32Calculation()
                        .Write(this.Magic)
                        .Write(this.Attributes)
                        .Write(this.Key, this.KeyLength)
                        .Write(this.Value, this.ValueLength)
                    .EndCrc32Calculation()
                .EndSizeCalculation("message");
        }

It's letting me clean up a significant amount of code/extension methods, gives me the 'shape' of the payload better, chaining these PayloadWriters allows reusable payload chunks, and should make versioning easier to deal with. Custom protocol needs are extension methods.

An entire schema would look like this combined (this is actually spread across multiple methods, but doesn't need to be):

Produce Request Version 0:

var pw = new PayloadWriter(isBigEndian: true)
    .StartCalculatingSize("totalSize")
      .Write(message.ApiKey)
      .Write(message.ApiVersion)
      .Write(correlationId)
      .WriteNullableString(ref clientId) // extension method for unique protocol
      .Write(message.Acks)
      .Write(message.Timeout)
      .WriteArray(message.Topics, this.WriteTopic) // this.WriteTopic is actually another method, but it writes unique protocol array delimiter
        .WriteString(topic.TopicName)
        .WriteArray(topic.Partitions, this.WritePartition)
          .Write(partition.Index)
          .StartCalculatingSize("messageSetSize")
            .Write(offset)
            .StartCalculatingSize("message")
              .StartCrc32Calculation() // extension method
                  .Write(message.Magic)
                  .Write(message.Attributes)
                  .Write(message.Key, message.KeyLength)
                  .Write(message.Value, message.ValueLength)
              .EndCrc32Calculation()
            .EndSizeCalculation("message")
          .EndSizeCalculation("messageSetSize")
    .EndSizeCalculation("totalSize");

pw.TryWritePayload(out ReadOnlySequence<byte> payload);
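For readers wondering how the size scopes could work underneath, here is a rough sketch of one possible mechanism: reserve four bytes when a scope starts, then patch them when it ends. This is not the PayloadWriter from the prototype; `SizeScopedWriter` and its methods are hypothetical, and a `MemoryStream` is used only to keep the example short:

```csharp
using System;
using System.Buffers.Binary;
using System.Collections.Generic;
using System.IO;

public sealed class SizeScopedWriter
{
    private readonly MemoryStream stream = new MemoryStream();
    private readonly Stack<int> sizeOffsets = new Stack<int>();

    // Reserves a 4-byte placeholder and remembers where it lives.
    public SizeScopedWriter StartSize()
    {
        sizeOffsets.Push((int)stream.Position);
        stream.Write(new byte[sizeof(int)], 0, sizeof(int));
        return this;
    }

    public SizeScopedWriter Write(short value)
    {
        Span<byte> tmp = stackalloc byte[sizeof(short)];
        BinaryPrimitives.WriteInt16BigEndian(tmp, value);
        stream.Write(tmp);
        return this;
    }

    public SizeScopedWriter Write(int value)
    {
        Span<byte> tmp = stackalloc byte[sizeof(int)];
        BinaryPrimitives.WriteInt32BigEndian(tmp, value);
        stream.Write(tmp);
        return this;
    }

    // Patches the most recent placeholder with the number of bytes
    // written since it was reserved (excluding the placeholder itself).
    public SizeScopedWriter EndSize()
    {
        int offset = sizeOffsets.Pop();
        int size = (int)stream.Position - offset - sizeof(int);
        BinaryPrimitives.WriteInt32BigEndian(
            stream.GetBuffer().AsSpan(offset, sizeof(int)), size);
        return this;
    }

    public byte[] ToArray() => stream.ToArray();
}
```

Because the scopes live on a stack, nested sizes resolve innermost first, which matches the shape of the nested Start/End calls in the schema above. A CRC scope could presumably be handled the same way, by hashing the bytes between the placeholder and the current position.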

@AndyPook - Thoughts? Would something like this have helped with your efforts?

AndyPook commented 4 years ago

ha! fluent all the things!

Yeah, I probably would have used that. Still, looks like you'd need to take care with the Start/End. And Format-Document would probably mess with the indentation? And you prob need more options for all the int/long/varint/varintlong/signed-varint shenanigans. Though that's just more extensions (like for WriteNullableString), right?

I've ended up with a bit of a pattern that "works" (kinda). I've gone down the extension-method all the things route. I was hoping that once the API read/write stuff was "done" then we could just bury it under the covers. As long as it's "readable/debuggable enough", right? it's just a pita to begin with. I do like fluenty approach though :)

Do you think there's something to be done on the read side too? All the if(!TryRead) return false noise, is a bit tiresome (though I understand the need)
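One way to cut down the read-side noise, sketched here under assumptions, is to chain the `TryRead` calls with `&&` and rewind on failure. The example reads the fixed part of a Kafka request header (INT16 api key, INT16 api version, INT32 correlation id) with `SequenceReader<byte>`; the `RequestHeaderReader` class is a made-up name:

```csharp
using System;
using System.Buffers;

public static class RequestHeaderReader
{
    // Reads api key, api version and correlation id in one expression.
    // On failure the reader is rewound to where it started, so the caller
    // can retry once more bytes have arrived.
    public static bool TryReadHeader(
        ref SequenceReader<byte> reader,
        out short apiKey,
        out short apiVersion,
        out int correlationId)
    {
        long start = reader.Consumed;
        apiVersion = 0;
        correlationId = 0;

        if (reader.TryReadBigEndian(out apiKey)
            && reader.TryReadBigEndian(out apiVersion)
            && reader.TryReadBigEndian(out correlationId))
        {
            return true;
        }

        reader.Rewind(reader.Consumed - start);
        return false;
    }
}
```

The short-circuit keeps the "stop at the first missing field" behaviour of the if/return pattern, just with less repetition.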

AndyPook commented 4 years ago

I managed to get Fetch to work!!! The doc lies!! (at least there's plenty of room for interpretation).

RecordSet is tricky. It contains arrays but some of them have no count prefix. You just keep reading until you've consumed the specified number of bytes. Some things are varint, some are signed-varint. This was made super hard due to Wireshark also not understanding the Fetch response. Just says "Malformed Packet" 😬
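To make the varint/signed-varint distinction concrete, here is a small sketch of both decoders, assuming the usual base-128 encoding with zigzag for signed values (the scheme Protocol Buffers uses). The `VarInt` class and its method names are illustrative only:

```csharp
using System;

public static class VarInt
{
    // Decodes an unsigned base-128 varint. Returns false if the input ends
    // before the final byte or the encoding runs past five bytes.
    public static bool TryReadUnsigned(
        ReadOnlySpan<byte> input, out uint value, out int bytesRead)
    {
        value = 0;
        bytesRead = 0;
        for (int shift = 0; shift < 35 && bytesRead < input.Length; shift += 7)
        {
            byte b = input[bytesRead++];
            value |= (uint)(b & 0x7F) << shift;
            if ((b & 0x80) == 0)
            {
                return true; // high bit clear marks the last byte
            }
        }
        value = 0;
        bytesRead = 0;
        return false;
    }

    // Decodes a zigzag-encoded signed varint: 0, -1, 1, -2, ... are stored
    // as 0, 1, 2, 3, ...
    public static bool TryReadSigned(
        ReadOnlySpan<byte> input, out int value, out int bytesRead)
    {
        bool ok = TryReadUnsigned(input, out uint raw, out bytesRead);
        value = (int)(raw >> 1) ^ -(int)(raw & 1);
        return ok;
    }
}
```

For arrays without a count prefix, the `bytesRead` output is what lets a caller keep decoding until the declared byte length has been consumed.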

Plenty more to do here: compression; crc checking (this one uses crc32, not crc32c used elsewhere). This only works with a single broker, against a topic with a single partition (or where you can guarantee all the lead replicas are on that broker).
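On the crc32 versus crc32c point: the two checksums share the same algorithm and differ only in the polynomial, so a single routine can cover both. The sketch below is a deliberately slow bit-by-bit version for illustration (a real client would want a table-driven or hardware-accelerated one); the `Crc` class is a hypothetical name:

```csharp
using System;

public static class Crc
{
    // Reflected polynomials for the two variants.
    public const uint Crc32Polynomial = 0xEDB88320;
    public const uint Crc32CPolynomial = 0x82F63B78;

    // Bitwise reflected CRC with initial value and final XOR of 0xFFFFFFFF.
    public static uint Compute(ReadOnlySpan<byte> data, uint reflectedPolynomial)
    {
        uint crc = 0xFFFFFFFF;
        foreach (byte b in data)
        {
            crc ^= b;
            for (int i = 0; i < 8; i++)
            {
                crc = (crc & 1) != 0
                    ? (crc >> 1) ^ reflectedPolynomial
                    : crc >> 1;
            }
        }
        return ~crc;
    }
}
```

Checking against the standard test string "123456789" is a quick way to confirm which variant a given implementation computes: CRC-32 gives 0xCBF43926 and CRC-32C gives 0xE3069283.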

Next step is using cluster metadata (=> multiple connections) and using JoinGroup properly. Which leads to the same kind of pre-fetching, internal queue malarkey that librdkafka has.

btw: "toppar" (seen all over librdkafka) means topic-partition tuple thing. Used as a key to queues and some internal housekeeping.

Anyway... getting Fetch to work feels like success 😀

AndyPook commented 4 years ago

added super simple (bordering on willfully dumb) Consumer. Discovers which broker the topic is hosted on, makes Fetch req to that/those brokers. Has ZERO understanding of consumer groups.

The DI makes new'ing a Consumer a little awkward compared to Confluent. Suggestions welcome.

AndyPook commented 4 years ago

To speak to the actual question asked (what to do about 400 payloads). Still a good question...

It seems that a lot of the versions for a lot of Requests are actually identical in format. In those cases it really just tells the broker what Response version to reply with. The Response versions just add extra fields (mostly). So just some if(version>x) conditional decoding ought to work.
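A sketch of what that conditional decoding might look like, using a made-up response layout rather than a real Kafka message: `ExampleResponse`, its fields, and the rule that the second field exists only from version 1 are all assumptions for illustration:

```csharp
using System;
using System.Buffers;

// Hypothetical response: ErrorCode is always present, ThrottleTimeMs is
// only on the wire for version >= 1 in this made-up layout.
public struct ExampleResponse
{
    public short ErrorCode;
    public int ThrottleTimeMs;
}

public static class ExampleResponseReader
{
    // One static reader handles every version; later versions only add
    // fields. On false the reader may be partially advanced, so the caller
    // is expected to retry from the start of the message.
    public static bool TryRead(
        ref SequenceReader<byte> reader, short version, out ExampleResponse response)
    {
        response = default;

        if (!reader.TryReadBigEndian(out short errorCode))
        {
            return false;
        }

        int throttleTimeMs = 0;
        if (version >= 1 && !reader.TryReadBigEndian(out throttleTimeMs))
        {
            return false;
        }

        response = new ExampleResponse
        {
            ErrorCode = errorCode,
            ThrottleTimeMs = throttleTimeMs
        };
        return true;
    }
}
```

With additive versions, one message type plus a few `version >= N` checks replaces a separate class per version, which is what shrinks the ~370 payloads back towards the 40ish requests.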

However, how to get that version into the IMessageReader? There's no "context" concept, so we'd need to new up a new reader for each message?
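One possible answer, sketched under assumptions: a Kafka response header carries only the correlation id, so the client has to remember which api key and version it sent anyway. A small lookup keyed by correlation id could hand the version to whatever reads the response. `PendingRequests` and `RequestInfo` are hypothetical types, not part of Bedrock:

```csharp
using System.Collections.Generic;

public readonly struct RequestInfo
{
    public RequestInfo(short apiKey, short apiVersion)
    {
        ApiKey = apiKey;
        ApiVersion = apiVersion;
    }

    public short ApiKey { get; }
    public short ApiVersion { get; }
}

// Tracks in-flight requests so the response reader can learn which
// version to decode. Not thread-safe; assumes a single connection loop.
public sealed class PendingRequests
{
    private readonly Dictionary<int, RequestInfo> pending =
        new Dictionary<int, RequestInfo>();
    private int nextCorrelationId;

    // Records the request and returns the correlation id to put on the wire.
    public int Register(short apiKey, short apiVersion)
    {
        int id = nextCorrelationId++;
        pending.Add(id, new RequestInfo(apiKey, apiVersion));
        return id;
    }

    // Removes and returns the request matching a response's correlation id.
    public bool TryTake(int correlationId, out RequestInfo info)
    {
        return pending.Remove(correlationId, out info);
    }
}
```

The reader itself can then stay static and take the version as a parameter, so nothing needs to be allocated per message beyond the dictionary entry.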

Also, it seems that librdkafka will use the version just before TAGGED_FIELDS are introduced.

NOTE: I don't think https://github.com/edenhill/librdkafka/blob/master/INTRODUCTION.md#supported-protocol-versions is completely up-to-date. eg I see it sending v11 FETCH requests when that page says it only supports v4.

AndyPook commented 4 years ago

I think what I have deals with the wire protocol in an "ok" fashion.

The problem now is dealing with the higher level coordination (eg Metadata and Consumer Groups) which isn't really a bedrock thing. Kafka is "interesting" in that the clients must be pretty smart and do quite a lot of client coordination while only having a req/resp with the broker. The brokers do not take any responsibility. It is even up to one of the clients in the group to assign partitions to the rest of the members. I guess this does mean that clients can then do "interesting" things without being constrained by broker opinions. But it does make writing good clients fairly tricky.

One thing I have found difficult is finding good, definitive doc for various things. It requires reading a lot of cwiki proposals, and it is unclear whether they are current. Plus tracking all the KIPs. I've been trying to follow some of the other clients (librdkafka, scala, a few in golang). They all have slightly different interpretations.

But then, if it was easy, everyone would be doing it 😄

ChadJessup commented 4 years ago

Just an fyi, I'll be a bit slow to respond here due to vacation. I also noticed the Kafka docs are hard to follow, and since the various clients only have partial implementations, it's doubly difficult to track down the right direction.

The Java client is pure Java, whereas the rest seem to be using librdkafka, which isn't as full featured as the Java client. :(

One thing I'm not sure on, is if Bedrock should contain an entire Kafka client, or just enough of the primitives and helper code to try and benefit other protocols? Perhaps @davidfowl has more thoughts on that aspect?

AndyPook commented 4 years ago

There are some decent looking pure go libraries out there. I've been looking at https://github.com/Shopify/sarama (because I think they actually use it) as much to learn a bit about golang as anything else. And https://github.com/confluentinc/kafka has some useful bits under the "clients" folder. But they are missing await so it gets a bit noisy with all the "listeners" and callbacks.

My assumption was that Bedrock was a bit of a playground for ideas that may (or may not) get pulled into System. (or Microsoft.) packages. Experimental being some "demos" to see if the patterns work out. Sort of the reason that I put my effort in a separate repo that just takes a package ref.

(maybe/probably making huge/inappropriate assumptions here)

ChadJessup commented 4 years ago

@AndyPook - can't add you directly to the PR for some reason. I'd love for some of your feedback on #68

henningms commented 4 years ago

Has there been any more progress on the Kafka initiative here? 😁 Very interesting!

AndyPook commented 4 years ago

Hey, thanks for checking in...

Both Tim and I are no longer working for clients that use Kafka. So a lot of the motivation has gone. Sad as I was learning things, but other pressures have pushed this way down on "the list".

Speaking only for my own work, https://github.com/AndyPook/A6k.Kafka made some decent progress. It can talk to a broker, process responses etc. I was working on the group management part. All of which was really "just code" nothing to do with bedrock. Trying to understand the Kafka message flows was a bit of a challenge :) I was in the process of testing the Consumer Group bits which would have meant it was close to having a "real" consumer that would have played nice with the Confluent.Kafka clients. I mean you could have had a6k and confluent clients in the same consumer group. Quite exciting!

From a bedrock pov, I think the approach was pretty reasonable. The IValueTaskSource stuff is interesting, though it needs pushing through to other parts. Otherwise, once you have an appropriate message loop working and you can move up an abstraction layer or two then it's really just "normal" coding. It's amazing being able to write that sentence. Being able to write a handful of classes to deal with raw sockets is a great sign that the underlying framework is in good shape.

The bits that were frustrating were down to the Kafka protocol. It needs a lot of blocks to be written so you can scan back and gzip or generate a crc, then write that via bedrock. It makes the code look quite messy. There isn't a "nice" pattern (at least there wasn't at that point).

Feel free to take whatever you want from a6k. Feel free to ask questions. Maybe I'll refind some motivation :)

One other thing that did come out of this small experience was an even greater respect for anyone that runs an OSS project. To anyone that reads this... please be nice to the maintainers!!