Azure / azure-event-hubs-for-kafka

Azure Event Hubs for Apache Kafka Ecosystems
https://docs.microsoft.com/azure/event-hubs/event-hubs-for-kafka-ecosystem-overview

Error "The broker does not support DESCRIBE_CONFIGS" #61

Open yorek opened 5 years ago

yorek commented 5 years ago

Description

When using Debezium, which is a Kafka Connect connector, I get the error "The broker does not support DESCRIBE_CONFIGS" when trying to use Event Hubs to store schema changes. See issue #53, which has been closed; although a workaround (not saving the schema changes) was found, the underlying problem is still there and prevents full Debezium usage.

How to reproduce

Set up Debezium to use Event Hubs instead of Kafka.
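
For context, this is roughly the kind of database-history configuration involved - a sketch only, using Debezium 1.x property names; the topic name, namespace, and connection string are placeholders, not values from this issue:

"database.history": "io.debezium.relational.history.KafkaDatabaseHistory",
"database.history.kafka.topic": "dbhistory.mydb",
"database.history.kafka.bootstrap.servers": "your-event-hubs-namespace.servicebus.windows.net:9093",
"database.history.consumer.security.protocol": "SASL_SSL",
"database.history.consumer.sasl.mechanism": "PLAIN",
"database.history.consumer.sasl.jaas.config": "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"$ConnectionString\" password=\"<connection-string-goes-here>\";",
"database.history.producer.security.protocol": "SASL_SSL",
"database.history.producer.sasl.mechanism": "PLAIN",
"database.history.producer.sasl.jaas.config": "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"$ConnectionString\" password=\"<connection-string-goes-here>\";"

On startup Debezium initializes this history topic, and that is where the DESCRIBE_CONFIGS request rejected by the broker is made.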

Has it worked previously?

No

Checklist

IMPORTANT: We will close issues where the checklist has not been completed.

Please provide the following information:

arerlend commented 5 years ago

The error message is accurate: DESCRIBE_CONFIGS is in fact not supported. We're aware that it's a blocker for some Kafka applications (not just the Debezium connector), and it will be implemented in the near future. Note to readers: DESCRIBE_CONFIGS is not needed for base Kafka functionality.

yorek commented 5 years ago

Thanks a lot Arthur. Any timeframe that you can share? Thanks!

arerlend commented 5 years ago

There's no exact timeframe I can give you right now. We're aware it's heavily used by a large portion of Kafka extensions, so it's relatively high priority on the roadmap.

arerlend commented 4 years ago

@yorek DescribeConfigs is being investigated now.

@conorr thanks for the contribution on the other repo. If you've got the time I'd love to get a sample or working config, and I'll post it on this repo.

yorek commented 4 years ago

Yeah! :)

conorr commented 4 years ago

No problem. Do you mean a sample for running kafdrop UI against Event Hubs for Kafka? Or a sample consumer? I can provide either.

arerlend commented 4 years ago

@conorr I was thinking the kafdrop UI config. I'll be adding a repo folder with configs for management tooling and this looks like an interesting addition.

conorr commented 4 years ago

Save the following config to kafka.properties:

security.protocol=SASL_SSL
sasl.mechanism=PLAIN
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="$ConnectionString" password="<connection-string-goes-here>";

Pull the kafdrop Docker image (the DESCRIBE_CONFIGS fix is not yet in the latest tag, but I imagine it will be soon):

docker pull obsidiandynamics/kafdrop:3.22.0-SNAPSHOT

Run the container, referencing the kafka.properties file:

docker run -d --rm -p 9000:9000 \
    -e KAFKA_BROKERCONNECT=your-event-hubs-namespace.servicebus.windows.net:9093 \
    -e KAFKA_PROPERTIES=$(cat kafka.properties | base64) \
    obsidiandynamics/kafdrop:3.22.0-SNAPSHOT

This will run kafdrop UI at http://localhost:9000.
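
One caveat worth flagging: GNU base64 wraps its output at 76 characters by default (the macOS version does not), which can garble the KAFKA_PROPERTIES value for longer property files. If kafdrop doesn't pick up the settings, passing the value unwrapped is one option, for example:

    -e KAFKA_PROPERTIES="$(base64 -w 0 kafka.properties)" \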

arerlend commented 4 years ago

We've added a handler for DescribeConfigs. I'll try out kafdrop now.

conorr commented 4 years ago

I can confirm the DescribeConfigs fix. I tested with an older version of the kafdrop Docker image that didn't have my workaround fix, and it works. Thank you!

arerlend commented 4 years ago

Thanks @conorr. Closing for now, but pinged @yorek to see if he can try out his scenario as well.

For future readers - please open a new issue if you're seeing exceptions in DescribeConfigs paths.

arerlend commented 4 years ago

Broker-level configs still need support.

ywchang commented 4 years ago

Hi @arerlend, could you help me understand the current state of this issue? I am using the same Kafka Connect plugin (debezium/connect), and I am currently having an issue with Event Hubs.

The error looks like this, and I strongly suspect it fails when trying to get the broker config by calling the AdminClient describeConfigs method.

org.apache.kafka.connect.errors.ConnectException: Creation of database history topic failed, please create the topic manually
    at io.debezium.relational.history.KafkaDatabaseHistory.initializeStorage(KafkaDatabaseHistory.java:365)
    at io.debezium.relational.HistorizedRelationalDatabaseSchema.initializeStorage(HistorizedRelationalDatabaseSchema.java:63)
    at io.debezium.connector.sqlserver.SqlServerConnectorTask.start(SqlServerConnectorTask.java:82)
    at io.debezium.connector.common.BaseSourceTask.start(BaseSourceTask.java:77)
    at org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:208)
    at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:177)
    at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:227)
    at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
    at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    at java.base/java.lang.Thread.run(Thread.java:834)
Caused by: java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.UnknownTopicOrPartitionException: This server does not host this topic-partition.
    at org.apache.kafka.common.internals.KafkaFutureImpl.wrapAndThrow(KafkaFutureImpl.java:45)
    at org.apache.kafka.common.internals.KafkaFutureImpl.access$000(KafkaFutureImpl.java:32)
    at org.apache.kafka.common.internals.KafkaFutureImpl$SingleWaiter.await(KafkaFutureImpl.java:104)
    at org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:272)
    at io.debezium.relational.history.KafkaDatabaseHistory.getKafkaBrokerConfig(KafkaDatabaseHistory.java:403)
    at io.debezium.relational.history.KafkaDatabaseHistory.getDefaultTopicReplicationFactor(KafkaDatabaseHistory.java:371)
    at io.debezium.relational.history.KafkaDatabaseHistory.initializeStorage(KafkaDatabaseHistory.java:355)
    ... 11 more 

The relevant code snippet (from Debezium's KafkaDatabaseHistory):

String nodeId = nodes.iterator().next().idString();
Set<ConfigResource> resources = Collections.singleton(new ConfigResource(ConfigResource.Type.BROKER, nodeId));
final Map<ConfigResource, Config> configs = admin.describeConfigs(resources).all().get(
        KAFKA_QUERY_TIMEOUT.toMillis(), TimeUnit.MILLISECONDS);
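
For anyone who wants to try this call outside of Debezium, here is a rough, self-contained sketch of the same broker-level DescribeConfigs request using the plain Kafka AdminClient. The namespace and connection string are placeholders; this is not Debezium's code, just the equivalent request:

import java.util.Collections;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.TimeUnit;

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.Config;
import org.apache.kafka.common.config.ConfigResource;

public class DescribeBrokerConfigs {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG,
                "your-event-hubs-namespace.servicebus.windows.net:9093");
        props.put("security.protocol", "SASL_SSL");
        props.put("sasl.mechanism", "PLAIN");
        props.put("sasl.jaas.config",
                "org.apache.kafka.common.security.plain.PlainLoginModule required "
                + "username=\"$ConnectionString\" password=\"<connection-string-goes-here>\";");

        try (AdminClient admin = AdminClient.create(props)) {
            // Ask the cluster for any broker id, then request that broker's configs.
            String nodeId = admin.describeCluster().nodes().get()
                    .iterator().next().idString();
            ConfigResource broker =
                    new ConfigResource(ConfigResource.Type.BROKER, nodeId);
            Map<ConfigResource, Config> configs = admin
                    .describeConfigs(Collections.singleton(broker))
                    .all().get(30, TimeUnit.SECONDS);
            configs.get(broker).entries().forEach(entry ->
                    System.out.println(entry.name() + " = " + entry.value()));
        }
    }
}

If broker-level DescribeConfigs isn't supported, the get(...) call above is where the failure would surface, which lines up with the getKafkaBrokerConfig frame in the stack trace.
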
ywchang commented 4 years ago

Hi @arerlend

Good day!

Is there any update on this issue?

arerlend commented 4 years ago

@ywchang we haven't implemented returning broker-level configs in DescribeConfigs yet. Is this blocking a required production scenario for you?

ywchang commented 4 years ago

Hi @arerlend

For me, the scenario is that we are using the Debezium Kafka connector with Azure Event Hubs in a production environment to extract data from an external database into Event Hubs. Debezium queries the broker config for the replication settings before falling back to its default value.

Since Event Hubs doesn't support this query, an exception is thrown when using the official Debezium build. I had to fork the repo and make some code changes to get it working. So I'm just wondering if there is a timeline for this, so that we can avoid changing the source code.

Adpersonam commented 4 years ago

Hi @arerlend, Any update on this issue?

ssugar commented 3 years ago

Hi @arerlend - any recent update on this? I receive the same "org.apache.kafka.connect.errors.ConnectException: Creation of database history topic failed, please create the topic manually" error when connecting Debezium (Kafka Connect) to Azure Event Hubs.

There is a workaround listed in #53 which is to add the following to the connector configuration:

"database.history":"io.debezium.relational.history.MemoryDatabaseHistory"

My understanding is that this stores the database history in memory, which won't survive a restart. From some research, it looks like it may be possible to switch the above to:

"database.history":"io.debezium.relational.history.FileDatabaseHistory",
"database.history.file.filename":"history.dat"

I'm assuming the database history would then survive a restart (initial tests indicate it does survive a restart of the container).
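
For concreteness, here is a sketch of where those two properties sit in a full connector registration as posted to the Kafka Connect REST API. The connector class follows the SQL Server scenario mentioned earlier in this thread, and all connection details and the file path are illustrative placeholders:

{
  "name": "sqlserver-connector",
  "config": {
    "connector.class": "io.debezium.connector.sqlserver.SqlServerConnector",
    "database.hostname": "<db-host>",
    "database.port": "1433",
    "database.user": "<db-user>",
    "database.password": "<db-password>",
    "database.dbname": "<db-name>",
    "database.server.name": "myserver",
    "database.history": "io.debezium.relational.history.FileDatabaseHistory",
    "database.history.file.filename": "/path/to/history.dat"
  }
}

Note that the history file needs to live on storage that outlives the container (for example a mounted volume) for it to actually survive restarts.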

Overall, it would be ideal if no additional workaround was required.

Anyone have any thoughts on this?

yehudamakarov commented 3 years ago

@ssugar, what are you asking? Whether it's ideal that Azure doesn't completely support the Kafka endpoint, because we can use:

"database.history":"io.debezium.relational.history.FileDatabaseHistory",
"database.history.file.filename":"history.dat"
ssugar commented 3 years ago

@yehudamakarov - I wasn't asking anything; I was just saying it would be ideal if Event Hubs provided a Kafka endpoint that didn't require any workaround.

yehudamakarov commented 3 years ago

Ah!! :) ok great we are definitely on the same page!

alastairs commented 2 years ago

@yehudamakarov @arerlend Is there any update available on whether Azure Event Hubs now supports broker-level configs for DESCRIBE_CONFIGS requests? I'm trying to determine whether or not the workaround for Debezium is still required.