tchiotludo / akhq

Kafka GUI for Apache Kafka to manage topics, topics data, consumers group, schema registry, connect and more...
https://akhq.io/
Apache License 2.0

package custom serde classes #149

Closed jorgheymans closed 3 years ago

jorgheymans commented 4 years ago

We use protobuf as our message format and have custom serializers for it. Is building kafkahq from source and somehow including our jar in the Gradle build the only way to include these serializers?

tchiotludo commented 4 years ago

Hey @jorgheymans ,

Handling this case will require code changes in KafkaHQ, and I will need more details about what you need.

How do you see this?

Feel free to give me some advice on this!

jorgheymans commented 4 years ago

Thanks! I was thinking this could be done at the level of the consumer, because it has a standard configuration property for this, value.deserializer (http://kafka.apache.org/documentation/#consumerconfigs). Of course it could be more flexible, per topic etc., but then serde will end up in your problem space with interfaces to maintain, and I'm not sure you would want this. So maybe all that is needed is documentation on how to easily package a custom jar? Looking at the exec jar, all its jars are expanded into package directories, which makes it slightly more difficult to add custom ones.
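As a rough illustration of the consumer-level approach described here, the sketch below builds consumer properties that name a custom value deserializer. The property keys are standard Kafka consumer settings; the class name `com.example.ProtoDeserializer` and the helper `CustomSerdeConfig` are hypothetical, and the named class would still have to be on the application classpath.

```java
import java.util.Properties;

// Sketch only: collects the consumer settings that select a custom value deserializer.
final class CustomSerdeConfig {

    // valueDeserializerClass is the fully qualified name of a user-supplied class,
    // for example the hypothetical "com.example.ProtoDeserializer".
    static Properties consumerProperties(String bootstrapServers, String valueDeserializerClass) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", bootstrapServers);
        props.setProperty("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.setProperty("value.deserializer", valueDeserializerClass);
        return props;
    }
}
```

These properties only take effect if the consumer is created without an explicit deserializer instance.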

jorgheymans commented 4 years ago

Looked into this a bit deeper

This, together with manually copying the classes required for my proto serde into kafkahq.jar, gives me something that works. But it's cumbersome.

So if you agree that, in a first phase, custom serde should be done via the key and value serde properties of the Kafka consumer, I could clean up this part of my patch and propose a PR.

jorgheymans commented 4 years ago

@tchiotludo any thoughts on this ?

tchiotludo commented 4 years ago

@jorgheymans I will need to take a deeper look at that part.

If you want to use a custom deserializer through the value.deserializer config, KafkaHQ would have to stop passing its own deserializer everywhere in the codebase, since Kafka ignores the config if I pass one:

            if (valueDeserializer == null) {
                this.valueDeserializer = config.getConfiguredInstance(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, Deserializer.class);
                this.valueDeserializer.configure(config.originals(), false);
            } else {
                config.ignore(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG);
                this.valueDeserializer = valueDeserializer;
            }

I don't know what the impact would be for now, but I doubt that it can work. Using Object will also lead to ugly casts across the whole codebase.
I'm not really sure how to handle it right now. My first feeling is to add a custom config on KafkaHQ (not in the Kafka config) that allows implementing this kind of class:

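package org.kafkahq;

import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.serialization.Deserializer;

import java.nio.charset.StandardCharsets;
import java.util.Map;

// Example of a user-supplied class: it always returns a fixed JSON payload as bytes.
public class ValueDeserializer implements Deserializer<byte[]> {

    @Override
    public void configure(Map<String, ?> configs, boolean isKey) {
        // No configuration needed for this example.
    }

    @Override
    public byte[] deserialize(String topic, byte[] data) {
        // A real implementation would decode `data`; this one returns a fixed JSON document.
        return "{\"a\": 1, \"b\": 1}".getBytes(StandardCharsets.UTF_8);
    }

    @Override
    public byte[] deserialize(String topic, Headers headers, byte[] data) {
        // Headers are ignored; delegate to the variant without headers.
        return deserialize(topic, data);
    }

    @Override
    public void close() {
        // Nothing to release.
    }
}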

It is mostly the same as Kafka's own interface, except that it is forced to return byte[].
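One possible way such an application-level config could be wired up is to read a class name from the configuration and instantiate it reflectively. The sketch below is only an assumption about how that might look, not KafkaHQ code: it uses `java.util.function.Function<byte[], String>` as a stand-in for the deserializer interface so that it runs without Kafka on the classpath, and `ConfiguredDecoderLoader` and `UpperCaseDecoder` are hypothetical names.

```java
import java.nio.charset.StandardCharsets;
import java.util.Locale;
import java.util.function.Function;

// Sketch only: loads a user-supplied decoder class by its configured name.
final class ConfiguredDecoderLoader {

    // Hypothetical stand-in for a class that a user would ship in their own jar.
    public static final class UpperCaseDecoder implements Function<byte[], String> {
        @Override
        public String apply(byte[] data) {
            return new String(data, StandardCharsets.UTF_8).toUpperCase(Locale.ROOT);
        }
    }

    // Instantiates the named class through its no-argument constructor.
    // Rejects classes that are missing or do not implement Function.
    @SuppressWarnings("unchecked")
    static Function<byte[], String> load(String className) {
        try {
            Class<?> cls = Class.forName(className);
            if (!Function.class.isAssignableFrom(cls)) {
                throw new IllegalArgumentException(className + " does not implement Function");
            }
            return (Function<byte[], String>) cls.getDeclaredConstructor().newInstance();
        } catch (ReflectiveOperationException e) {
            throw new IllegalArgumentException("Cannot load decoder " + className, e);
        }
    }
}
```

The class still has to be visible to the application's class loader, so this does not remove the packaging question raised earlier in the thread.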

Not really for now.

xakassi commented 4 years ago

Hi, @tchiotludo! Our team also uses Protobuf as its message format, and we really like your AKHQ. It would be perfect to be able to pass a custom deserializer class (or a .proto file, as is done in Kafdrop). Do you plan to add this feature?

tchiotludo commented 4 years ago

I think this feature is more than welcome, even more so since Confluent now includes support for Protobuf in their schema registry.

From a quick look at Kafdrop, .proto files seem to be a better option than custom jar packaging.

Maybe another try/catch case here, looping over the proto files and trying to deserialize with each one, could do the trick: https://github.com/tchiotludo/akhq/blob/0a882c1979d6b9daf301e690494fa301c01f6e6c/src/main/java/org/akhq/models/Record.java#L100-L113

But to be honest, I'm not a pro with protobuf at all and have never used it personally. A PR from someone who knows it better than I do is clearly welcome.

@jorgheymans what do you think about that (simple proto / try-catch deserializer)? I have the feeling it's not perfect, but it avoids breaking the application with a change from byte[] to Object.

jorgheymans commented 4 years ago

We have a fork running in-house that adds proto serde based on protobin descriptors. I will check if we're allowed to clean it up and submit a PR.

xakassi commented 4 years ago

Hi, @jorgheymans ! Any updates? Are you going to submit a PR? Or should I start an investigation and propose a PR? Sorry for the rush, but we really need this functionality ASAP.

xakassi commented 4 years ago

Some additional thoughts on this topic. I think it would be nice to be able to choose a deserializer for each topic when you want to read it. But I'm not sure how this could be done nicely in the UI, because a topic is read immediately when you click on the magnifier icon. So maybe a pop-up should appear when you click on the magnifier?
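Independently of how the UI would expose it, a per-topic choice needs some lookup from topic name to deserializer. The sketch below shows one hypothetical shape for that lookup, using regular expressions on the topic name with the first matching rule winning. `TopicDeserializerRegistry` and the deserializer names are made up for illustration and are not part of AKHQ.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.regex.Pattern;

// Sketch only: maps topic names to a deserializer name via ordered regex rules.
final class TopicDeserializerRegistry {

    // One rule: topics fully matching the pattern use the named deserializer.
    private static final class Rule {
        final Pattern topicPattern;
        final String deserializerName;

        Rule(String topicRegex, String deserializerName) {
            this.topicPattern = Pattern.compile(topicRegex);
            this.deserializerName = deserializerName;
        }
    }

    private final List<Rule> rules = new ArrayList<>();

    // Rules are checked in registration order, so register specific patterns first.
    TopicDeserializerRegistry register(String topicRegex, String deserializerName) {
        rules.add(new Rule(topicRegex, deserializerName));
        return this;
    }

    // Returns the deserializer name of the first matching rule, if any.
    Optional<String> lookup(String topic) {
        for (Rule rule : rules) {
            if (rule.topicPattern.matcher(topic).matches()) {
                return Optional.of(rule.deserializerName);
            }
        }
        return Optional.empty();
    }
}
```

With such a lookup the topic could still be read immediately on click, and a pop-up would only be needed for topics without a matching rule.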

tchiotludo commented 4 years ago

Maybe a simple try/catch can do the trick: if custom serdes are configured, we try to deserialize with each of them. It should not be a really CPU-intensive task at 50 messages per page.
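The try/catch idea could be sketched roughly as follows: each candidate decoder is tried in turn, a failure moves on to the next one, and the raw bytes rendered as UTF-8 text are the last resort. This is only an illustration under assumptions: `FallbackDecoder` is a hypothetical name, and `Function<byte[], String>` stands in for whatever deserializer type the application would actually use.

```java
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.function.Function;

// Sketch only: tries each candidate decoder in order and falls back to plain text.
final class FallbackDecoder {

    static String decode(byte[] payload, List<Function<byte[], String>> candidates) {
        for (Function<byte[], String> candidate : candidates) {
            try {
                String decoded = candidate.apply(payload);
                if (decoded != null) {
                    return decoded; // first decoder that succeeds wins
                }
            } catch (RuntimeException e) {
                // This decoder does not understand the payload; try the next one.
            }
        }
        // No decoder succeeded: show the raw bytes as UTF-8 text.
        return new String(payload, StandardCharsets.UTF_8);
    }
}
```

One caveat of this approach is that a decoder may succeed on bytes it was not meant for, so the order of the candidates matters.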

xakassi commented 3 years ago

Hi, @tchiotludo! It seems no one is working on this. I'm going to start an investigation, and I hope to propose a PR soon.

tchiotludo commented 3 years ago

thanks @xakassi :+1: :pray: