LGouellec / streamiz

.NET Stream Processing Library for Apache Kafka 🚀
https://lgouellec.github.io/streamiz/
MIT License
470 stars 75 forks source link

Question: How to overwrite prefix of internal topics (application id fails) #253

Closed DatStorm closed 1 year ago

DatStorm commented 1 year ago

Hey.

First of thanks for a lot of great work!

Description

We have a problem with our internal topics they do not seem to follow the pattern described here: https://docs.confluent.io/platform/current/streams/developer-guide/manage-topics.html image

We are using Confluent cloud and to ensure no clashes between topics we have created an application id prefix that should be applied to all internal topics but this does not seem to be the case.

How to reproduce

  1. Start the example application
  2. Look at the initializing logs and see that internal stream topics are not prefixed with application id but with KSTREAM-MAPVALUES-ASYNC-0000000008-request

image

Checklist

More info can be provided if needed.

Please provide the following information:

LGouellec commented 1 year ago

Hi @DatStorm,

Can you share your topology please ?

Best regards,

DatStorm commented 1 year ago

Sure.

Our config

       var config = new StreamConfig<StringSerDes, StringSerDes>
            {

                ClientId = "<client_id>",
                ApplicationId = "<application_id>", // <--- Not being prefixed.
                Debug = "security,broker,topic,msg,consumer,cgrp,topic,fetch",
                BootstrapServers = "confluent.cloud....",
               // login removed...
                SaslMechanism = SaslMechanism.Plain,
                Logger = LoggerFactory.Create(builder => builder.AddLog4Net()),
                MetricsRecording = MetricsRecordingLevel.DEBUG
            };

        return config;

Our topology:

  var builder = new StreamBuilder();

        // re key stream 
        builder.Stream<string, string>(topicsConfig.SupplierInputTopicName())
            .Map((k, v) => KeyValuePair.Create(v, v))
            .To(topicsConfig.SupplierOutputTopicName());

        // transform stream 
        builder.Stream<string, string>(topicsConfig.SupplierOutputTopicName())
            .Peek((k, v) => _logger.LogInformation("key={k} value={v}", k,v))
            .MapValuesAsync<SupplierEventModel>(async (record, _) => await TransformationToSupplierEvent(record))
            .To(topicsConfig.SupplierTransformedTopic());

        var topology = builder.Build();

        return new KafkaStream(topology, streamConfig);
LGouellec commented 1 year ago

@DatStorm ,

I tried with this topology and the internal request and response topics are correctly prefixed with the application.id

image
      var config = new StreamConfig<StringSerDes, StringSerDes>();
            config.ApplicationId = "test-app";
            config.BootstrapServers = "localhost:9092";

            StreamBuilder builder = new StreamBuilder();

            builder.Stream<string, string>("inputs")
                .Peek((k, v) => logger.LogInformation("key={k} value={v}", k,v))
                .MapValuesAsync(async (record, _) => await Task.FromResult(record.Value.ToUpper()))
                .To("output");

            Topology t = builder.Build();
            KafkaStream stream1 = new KafkaStream(t, config);

            Console.CancelKeyPress += (_, _) => stream1.Dispose();

            await stream1.StartAsync();
DatStorm commented 1 year ago

I just ran it locally and you are right the topics created are in fact prefixed with the application id but the topology printed in the log (in the beginning) does not have them and that is a bit confusing.

image

LGouellec commented 1 year ago

The topology printed at the beginning is agnostic to the application id. Same for the others internal topics (changelog and repartition).

DatStorm commented 1 year ago

I was not aware. So for the misunderstanding.

The issue can be closed.