LGouellec / streamiz

.NET Stream Processing Library for Apache Kafka 🚀
https://lgouellec.github.io/streamiz/
MIT License

Not receiving data from Azure Event Hub at my consumer #198

Closed Shikha4599 closed 2 years ago

Shikha4599 commented 2 years ago

Description

I want to consume messages from Azure Event Hub and process them, but my consumer is not receiving any data. I cannot find the cause because no error or exception appears in my debug console. I can send data to Azure IoT Hub successfully and the data is flowing there, but nothing arrives at my consumer. This is my configuration for connecting to Azure Event Hub:

public async Task ConsumerAsync()
{
    // Stream configuration
    var config = new StreamConfig();
    config.ApplicationId = "testing-acc4";
    config.BootstrapServers = "xxxx";
    config.SecurityProtocol = SecurityProtocol.SaslSsl;
    config.SocketTimeoutMs = 60000;
    config.SessionTimeoutMs = 30000;
    config.SaslMechanism = SaslMechanism.Plain;
    config.SaslUsername = "$ConnectionString";
    config.SaslPassword = "xxxx";
    config.AutoOffsetReset = AutoOffsetReset.Earliest;
    config.BrokerVersionFallback = "1.0.0";

    config.DefaultKeySerDes = new StringSerDes();
    config.DefaultValueSerDes = new DataDto();

    StreamBuilder builder = new StreamBuilder();

    IKStream<string, DataDto> str = builder.Stream<string, DataDto>("xxxxx");
    str.Filter((k, v) => CheckFilterCriteria(v)).Print(Printed<string, DataDto>.ToOut());

    Topology t = builder.Build();

    // Create a stream instance with topology and configuration
    KafkaStream stream = new KafkaStream(t, config);

    // Dispose the stream when CTRL+C is pressed to quit the application
    Console.CancelKeyPress += (o, e) =>
    {
        stream.Dispose();
    };

    await stream.StartAsync();
}

This is the output I get on the debug console:

[Screenshots: azure1, azure2]

This is the output I get after I stop sending data to Azure IoT Hub:

[Screenshot: azure3]

The screenshots above show that the connection to Azure Event Hub was established, yet I received neither data nor error messages.

Can you please help me understand why I am not receiving any data from Event Hub? When I run the same code against a local Kafka server, everything works fine.

LGouellec commented 2 years ago

Hi @Shikha4599 ,

Could you enable the librdkafka debug log like this:

config.Debug = "debug,security,broker,protocol";
config.Logger = LoggerFactory.Create(builder =>
{
    builder.SetMinimumLevel(LogLevel.Debug);
    builder.AddLog4Net(); // add your own logger: console, log4net, whatever
});

Shikha4599 commented 2 years ago

Hi @LGouellec,

Even after adding the code above, the console logs are unchanged and my consumer still receives no data.

LGouellec commented 2 years ago

@Shikha4599 Which logger are you using?

Shikha4599 commented 2 years ago

@LGouellec I am using console logger.

LGouellec commented 2 years ago

Which Streamiz version are you using?

Can you share your full config and topology again, please?

Shikha4599 commented 2 years ago

I am using Streamiz version 1.3.2.

Below are my config and topology:

public async Task ConsumerAsync()
{
    // Stream configuration
    var config = new StreamConfig();
    config.ApplicationId = "testing-acc4";
    config.BootstrapServers = "xx";
    config.SecurityProtocol = SecurityProtocol.SaslSsl;
    config.SocketTimeoutMs = 60000;
    config.SessionTimeoutMs = 30000;
    config.SaslMechanism = SaslMechanism.Plain;
    config.SaslUsername = "$ConnectionString";
    config.SaslPassword = "xxxx";
    config.Debug = "debug,security,broker,protocol";
    config.Logger = LoggerFactory.Create(builder =>
    {
        builder.SetMinimumLevel(LogLevel.Information);
        builder.AddConsole();
    });
    config.AutoOffsetReset = AutoOffsetReset.Earliest;
    config.BrokerVersionFallback = "1.0.0";

    config.DefaultKeySerDes = new StringSerDes();
    config.DefaultValueSerDes = new DataDto();

    StreamBuilder builder = new StreamBuilder();

    IKStream<string, DataDto> str = builder.Stream<string, DataDto>("iothub");
    str.Filter((k, v) => CheckFilterCriteria(v)).Print(Printed<string, DataDto>.ToOut());

    Topology t = builder.Build();

    // Create a stream instance with topology and configuration
    KafkaStream stream = new KafkaStream(t, config);

    // Dispose the stream when CTRL+C is pressed to quit the application
    Console.CancelKeyPress += (o, e) =>
    {
        stream.Dispose();
    };

    await stream.StartAsync();
}

LGouellec commented 2 years ago

Please set the minimum log level to Debug instead of Information to get the librdkafka debug logs. Thanks.
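
As a minimal sketch of that change, the logger part of the configuration posted in this thread would look roughly like this. It assumes the `config` object from that code, the `Microsoft.Extensions.Logging` namespace, and the console logging provider package for `AddConsole()`; only the log level differs from what was posted.

```csharp
// Fragment: belongs inside ConsumerAsync(), after `config` is created.
// Assumes `using Microsoft.Extensions.Logging;` at the top of the file.

// Ask librdkafka to emit debug output for these contexts.
config.Debug = "debug,security,broker,protocol";

config.Logger = LoggerFactory.Create(builder =>
{
    // Debug instead of Information, so debug-level entries are not filtered out.
    builder.SetMinimumLevel(LogLevel.Debug);
    builder.AddConsole();
});
```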

Shikha4599 commented 2 years ago

I changed the log level to Debug, but I am still not receiving anything. Please look at the attached screenshot.

[Screenshot: azure4]

LGouellec commented 2 years ago

@Shikha4599 That looks strange. Could you update to 1.4.0-RC2, please?

Shikha4599 commented 2 years ago

It shows 1.3.2 as the latest version. Please look at the attached screenshot.

[Screenshot: azure5]

LGouellec commented 2 years ago

1.4.0-RC2 is a pre-release version. You have to tick the "Include prerelease" checkbox in the NuGet package manager view.

Shikha4599 commented 2 years ago

After updating to 1.4.0-RC2, I am still not receiving any data. Please look at the attached screenshot.

[Screenshot: azure6]

LGouellec commented 2 years ago

@Shikha4599,

I will try to reproduce asap.

Shikha4599 commented 2 years ago

Okay, Thanks!

LGouellec commented 2 years ago

@Shikha4599 ,

After looking at your configuration, I think you forgot to set this property:

config.SslCaLocation

You can find an example here

LGouellec commented 2 years ago

@Shikha4599,

I reproduced the issue.

I added some logs to better understand the issue. Before a Streamiz application starts, a Kafka admin client is created to list the broker configs and, if the application is stateful, to create internal topics (changelog and repartition).

Unfortunately, Event Hubs doesn't support DESCRIBE_CONFIGS. Please have a look at https://github.com/Azure/azure-event-hubs-for-kafka/issues/61.

I created a new pre-release package, 1.4.0-RC3, with more logs. Feel free to test it: you should see an error log saying the broker is unknown, because Azure Event Hubs does not support DESCRIBE_CONFIGS yet. Sample here
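
For illustration, the failing step can be approximated with the Confluent.Kafka admin client on its own. This is a minimal sketch, not Streamiz's actual start-up code: the connection values are the same placeholders used in the configuration earlier in this thread, and the broker id `"0"` is an assumption. Against a broker that does not support DESCRIBE_CONFIGS, the call is expected to fail with a `DescribeConfigsException` rather than return entries.

```csharp
using System;
using Confluent.Kafka;
using Confluent.Kafka.Admin;

// Placeholder connection settings (assumptions), mirroring the posted stream config.
var adminConfig = new AdminClientConfig
{
    BootstrapServers = "xxxx",
    SecurityProtocol = SecurityProtocol.SaslSsl,
    SaslMechanism = SaslMechanism.Plain,
    SaslUsername = "$ConnectionString",
    SaslPassword = "xxxx"
};

using var admin = new AdminClientBuilder(adminConfig).Build();

// Broker id "0" is an assumption made for this sketch.
var broker = new ConfigResource { Type = ResourceType.Broker, Name = "0" };

try
{
    // Sends a DescribeConfigs request for the broker resource.
    var results = await admin.DescribeConfigsAsync(new[] { broker });
    foreach (var result in results)
    {
        Console.WriteLine($"{result.ConfigResource.Name}: {result.Entries.Count} config entries");
    }
}
catch (DescribeConfigsException e)
{
    // One report per requested resource, each carrying its own error.
    foreach (var report in e.Results)
    {
        Console.WriteLine($"{report.ConfigResource.Name}: {report.Error.Reason}");
    }
}
```

Running a check like this outside of Streamiz separates a broker-side limitation from a problem in the topology or the SerDes configuration.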

Shikha4599 commented 2 years ago

Hi @LGouellec, after making the changes above, I am receiving the following error. Could you please advise?

Streamiz.Kafka.Net.KafkaStream[0]
      stream-application[testing-acc4] Error during initializing internal topics : An error occurred describing the following resources: [[Broker] 0]: [Broker: Unknown topic or partition].

LGouellec commented 2 years ago

Yes, exactly. Feel free to add your upvote to the Azure Event Hubs issue. It has been open since 2019 ... ☹️

So for now, Streamiz is not compatible with Azure Event Hubs, like some Kafka connectors, I suppose.

I'm closing this issue. If you have any other questions, feel free to reopen it.

Best regards,