confluentinc / confluent-kafka-dotnet

Confluent's Apache Kafka .NET client
https://github.com/confluentinc/confluent-kafka-dotnet/wiki
Apache License 2.0
10 stars 2 forks source link

Issue with Byte Deserializer #598

Open saibayyavarapu opened 5 years ago

saibayyavarapu commented 5 years ago

Description

I tried to connect to kafka servers(Consumer) by using SSL protocol. Successfully connected to servers but not receiving any messages. Here, Producers are pushing data in byte format.

PLease find the below logs and code for reference :-

7|2018-08-17 15:41:07.550|Test1#consumer-1|CONNECTED| [thrd:ssl://myserver1:9093/bootstrap]: ssl://myserver1:9093/1: Connected (#1) 7|2018-08-17 15:41:07.544|Test1#consumer-1|SSLVERIFY| [thrd:ssl://myserver1:9093/bootstrap]: ssl://myserver1:9093/1: Broker SSL certificate verified 7|2018-08-17 15:41:07.508|Test1#consumer-1|CONNECTED| [thrd:ssl://myserver2:9093/bootstrap]: ssl://myserver2:9093/bootstrap: Connected (#1) 7|2018-08-17 15:41:07.544|Test1#consumer-1|SSLVERIFY| [thrd:ssl://myserver2:9093/bootstrap]: ssl://myserver2:9093/1: Broker SSL certificate verified

Messages :-

7|2018-08-17 16:43:55.896|Test1#consumer-1|SEND| [thrd:ssl://myserver1:9093/bootstrap]: ssl://myserver1:9093/2: Sent MetadataRequest (v2, 23 bytes @ 0, CorrId 2) 7|2018-08-17 16:43:55.908|Test1#consumer-1|RECV| [thrd:ssl://myserver1:9093/bootstrap]: ssl://myserver1:9093/2: Received MetadataResponse (v2, 414 bytes, CorrId 2, rtt 11.94ms)

Consumer Code :-

Consumer<string, byte[]> consumer consumer = new Consumer<string, byte[]>(config,new StringDeserializer(Encoding.UTF8), new ByteArrayDeserializer()); Message<string, byte[]> msg; if (!consumer.Consume(out msg, TimeSpan.FromMilliseconds(100))) { continue; }

Here, Always getting msg as null.

Can you please help me why "msg" is always null even though we had messages against our topic? Based on the logs, assuming successfully connected to the servers and getting messages. Is my understanding correct ?

How to reproduce

Checklist

Please provide the following information:

mhowlett commented 5 years ago

you could try adding a handler to OnConsumeError. Note that in the 1.0 api there is no OnConsumeError and you'll get an exception instead - i'd recommend using the 1.0 api at this stage.

also, are you producing at the same time you are consuming and if not, did you set auto.offset.reset to earliest.

mhowlett commented 5 years ago

(OnConsumeError would occur if the string deserialization failed).

Otherwise, feel free to paste more comprehensive 'debug': 'all' logs.

saibayyavarapu commented 5 years ago

Hi, Sorry for late reply. Added "debug" : "all" and after that I am seeing the below error. From the below logs, it was like no topic but, I was able to read the data from command prompt by using consumer.bat file(Here, used JKS file to consume data)

Any pointers are really helpful.

7|2018-08-20 11:59:13.878|Test1#consumer-1|METADATA| [thrd:main]: ssl://server1:9093/3: Topic #0/1: "TopicName" with 0 partitions: Broker: Unknown topic or partition 7|2018-08-20 11:59:13.878|Test1#consumer-1|METADATA| [thrd:main]: ssl://server1:9093/3: 1/1 requested topic(s) seen in metadata 7|2018-08-20 11:59:13.878|Test1#consumer-1|METADATA| [thrd:main]: Purged 1/1 cached topic hint(s) 7|2018-08-20 11:59:13.878|Test1#consumer-1|SUBSCRIPTION| [thrd:main]: Group "Test1234": no topics in metadata matched subscription 7|2018-08-20 11:59:13.878|Test1#consumer-1|JOIN| [thrd:main]: Group "Test1234":join with 0 (1) subscribed topic(s) 7|2018-08-20 11:59:13.878|Test1#consumer-1|METADATA| [thrd:main]: Hinted cache of 1/1 topic(s) being queried

mhowlett commented 5 years ago

Maybe have a look at what Consumer.Metadata gives you, or see if AdminClient.DescribeConfigsAsync in the new API works. Maybe try creating a new topic with AdminClient.CreateTopicsAsync and see if you can consume from that. I don't have any good ideas as to what the problem might be atm if it's not that the topic doesn't exist.

saibayyavarapu commented 5 years ago

Hi, I looked into the metadata(Consumer.Metadata), it's not returning this topic. I was confused and checking with the Producer team on this. In between, Currently Producer team producing messages by using Java and authenticating by using JKS. There are some consumers for this and they are able to consume by Java.

We are trying to connect by using .NET. Do you have any pointers on what could be the reason ? Thanks in advance.

mhowlett commented 5 years ago

are they restricting access to the topic using ACLs? if you are not authorized, i suspect the result would be that you simply cannot view the topic (i'm not very familiar with Kafka ACLs though).

We need to write a cookbook like tutorial for converting java security configs to ones compatible with librdkaka. we haven't done this yet. I did start writing down how to set things up from scratch though here: https://github.com/mhowlett/confluent-kafka-dotnet/tree/security/examples/Security

anchitj commented 3 weeks ago

Is this still an issue?