Closed · ivanpedersen closed this issue 1 year ago
Hello @ivanpedersen, thanks for reporting this. We will look at reproducing this on a Synapse cluster and report back if a fix is needed. Which version of the connector are you using? Also, can you scrub the credentials from the KafkaConnect configuration and share it with us?
I'm using 4.0.0
Using a docker build step to install from confluent-hub:
FROM confluentinc/cp-kafka-connect-base as cp
RUN confluent-hub install --no-prompt microsoftcorporation/kafka-sink-azure-kusto:4.0.0
Which I later copy into the image like this:
FROM quay.io/strimzi/kafka:0.32.0-kafka-3.2.1
...
COPY --from=cp /usr/share/confluent-hub-components/microsoftcorporation-kafka-sink-azure-kusto /opt/kafka/plugins/microsoftcorporation-kafka-sink-azure-kusto
What do you mean by scrubbing the credentials?
Sure, you can just leave the AAD creds blank and give us the basic config used in the connector.
Attaching output from the commands: k describe kctr kusto-sink-connector-test
Name: kusto-sink-connector-test
Namespace: subscription-manager
Labels: app=subscription-manager-connect-dev
argocd.argoproj.io/instance=connect-subscription-manager
strimzi.io/cluster=subscription-manager-connect-cluster
Annotations: <none>
API Version: kafka.strimzi.io/v1beta2
Kind: KafkaConnector
Metadata:
Creation Timestamp: 2023-05-02T12:22:52Z
Generation: 1
Managed Fields:
API Version: kafka.strimzi.io/v1beta2
Fields Type: FieldsV1
fieldsV1:
f:metadata:
f:annotations:
.:
f:kubectl.kubernetes.io/last-applied-configuration:
f:labels:
.:
f:app:
f:argocd.argoproj.io/instance:
f:strimzi.io/cluster:
f:spec:
.:
f:class:
f:config:
.:
f:aad.auth.appid:
f:aad.auth.appkey:
f:aad.auth.authority:
f:kusto.ingestion.url:
f:kusto.query.url:
f:kusto.tables.topics.mapping:
f:topics:
f:value.converter:
f:tasksMax:
Manager: argocd-application-controller
Operation: Update
Time: 2023-05-02T12:22:52Z
API Version: kafka.strimzi.io/v1beta2
Fields Type: FieldsV1
fieldsV1:
f:status:
.:
f:conditions:
f:connectorStatus:
.:
f:connector:
.:
f:state:
f:worker_id:
f:name:
f:tasks:
f:type:
f:observedGeneration:
f:tasksMax:
f:topics:
Manager: okhttp
Operation: Update
Subresource: status
Time: 2023-05-02T12:23:16Z
Resource Version: 691710594
UID: 00930466-22ca-44c1-b4f2-c6c837980f87
Spec:
Class: com.microsoft.azure.kusto.kafka.connect.sink.KustoSinkConnector
Config:
aad.auth.appid: APP_ID_UUID
aad.auth.appkey: ${secrets:subscription-manager/subscription-manager-connect-secrets:appkey}
aad.auth.authority: TENANT_ID_UUID
kusto.ingestion.url: https://ingest-CLUSTER.syn-weu-fdac-fdw-dev.kusto.azuresynapse.net
kusto.query.url: https://CLUSTER.syn-weu-fdac-fdw-dev.kusto.azuresynapse.net
kusto.tables.topics.mapping: [{'topic': 'TOPICNAME', 'db': 'SubscriptionManager','table': 'test', 'format': 'json','streaming':'false'}]
Topics: TOPICNAME
value.converter: org.apache.kafka.connect.storage.StringConverter
Tasks Max: 1
Status:
Conditions:
Last Transition Time: 2023-05-02T13:29:15.506976Z
Status: True
Type: Ready
Connector Status:
Connector:
State: RUNNING
worker_id: 10.48.138.179:8083
Name: kusto-sink-connector-test
Tasks:
Id: 0
State: FAILED
Trace: org.apache.kafka.connect.errors.ConnectException: Exiting WorkerSinkTask due to unrecoverable exception.
at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:611)
at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:333)
at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:234)
at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:203)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:188)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:243)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: org.apache.kafka.connect.errors.ConnectException: Error in flushByTime. Current file: kafka_TOPICNAME_0_0.MULTIJSON.gz, size: 144.
at com.microsoft.azure.kusto.kafka.connect.sink.FileWriter.writeData(FileWriter.java:283)
at com.microsoft.azure.kusto.kafka.connect.sink.TopicPartitionWriter.writeRecord(TopicPartitionWriter.java:212)
at com.microsoft.azure.kusto.kafka.connect.sink.KustoSinkTask.put(KustoSinkTask.java:450)
at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:581)
... 10 more
worker_id: 10.48.138.179:8083
Type: sink
Observed Generation: 1
Tasks Max: 1
Topics:
TOPICNAME
Events: <none>
And k get kctr kusto-sink-connector-test -o yaml
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnector
metadata:
annotations:
kubectl.kubernetes.io/last-applied-configuration: |
{"apiVersion":"kafka.strimzi.io/v1beta2","kind":"KafkaConnector","metadata":{"annotations":{},"labels":{"app":"subscription-manager-connect-dev","argocd.argoproj.io/instance":"connect-subscription-manager","strimzi.io/cluster":"subscription-manager-connect-cluster"},"name":"kusto-sink-connector-test","namespace":"subscription-manager"},"spec":{"class":"com.microsoft.azure.kusto.kafka.connect.sink.KustoSinkConnector","config":{"aad.auth.appid":"APP_ID_UUID","aad.auth.appkey":"${secrets:subscription-manager/subscription-manager-connect-secrets:appkey}","aad.auth.authority":"TENANT_ID_UUID","kusto.ingestion.url":"https://ingest-CLUSTER.syn-weu-fdac-fdw-dev.kusto.azuresynapse.net","kusto.query.url":"https://CLUSTER.syn-weu-fdac-fdw-dev.kusto.azuresynapse.net","kusto.tables.topics.mapping":"[{'topic': 'TOPICNAME', 'db': 'SubscriptionManager','table': 'test', 'format': 'json','streaming':'false'}]","topics":"TOPICNAME","value.converter":"org.apache.kafka.connect.storage.StringConverter"},"tasksMax":1}}
creationTimestamp: "2023-05-02T12:22:52Z"
generation: 1
labels:
app: subscription-manager-connect-dev
argocd.argoproj.io/instance: connect-subscription-manager
strimzi.io/cluster: subscription-manager-connect-cluster
name: kusto-sink-connector-test
namespace: subscription-manager
resourceVersion: "691710594"
uid: 00930466-22ca-44c1-b4f2-c6c837980f87
spec:
class: com.microsoft.azure.kusto.kafka.connect.sink.KustoSinkConnector
config:
aad.auth.appid: APP_ID_UUID
aad.auth.appkey: ${secrets:subscription-manager/subscription-manager-connect-secrets:appkey}
aad.auth.authority: TENANT_ID_UUID
kusto.ingestion.url: https://ingest-CLUSTER.syn-weu-fdac-fdw-dev.kusto.azuresynapse.net
kusto.query.url: https://CLUSTER.syn-weu-fdac-fdw-dev.kusto.azuresynapse.net
kusto.tables.topics.mapping: '[{''topic'': ''TOPICNAME'', ''db'':
''SubscriptionManager'',''table'': ''test'', ''format'': ''json'',''streaming'':''false''}]'
topics: TOPICNAME
value.converter: org.apache.kafka.connect.storage.StringConverter
tasksMax: 1
status:
conditions:
- lastTransitionTime: "2023-05-02T13:29:15.506976Z"
status: "True"
type: Ready
connectorStatus:
connector:
state: RUNNING
worker_id: 10.48.138.179:8083
name: kusto-sink-connector-test
tasks:
- id: 0
state: FAILED
trace: "org.apache.kafka.connect.errors.ConnectException: Exiting WorkerSinkTask
due to unrecoverable exception.\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:611)\n\tat
org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:333)\n\tat
org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:234)\n\tat
org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:203)\n\tat
org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:188)\n\tat
org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:243)\n\tat
java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)\n\tat
java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)\n\tat java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)\n\tat
java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)\n\tat
java.base/java.lang.Thread.run(Thread.java:829)\nCaused by: org.apache.kafka.connect.errors.ConnectException:
Error in flushByTime. Current file: kafka_TOPICNAME_0_0.MULTIJSON.gz,
size: 144. \n\tat com.microsoft.azure.kusto.kafka.connect.sink.FileWriter.writeData(FileWriter.java:283)\n\tat
com.microsoft.azure.kusto.kafka.connect.sink.TopicPartitionWriter.writeRecord(TopicPartitionWriter.java:212)\n\tat
com.microsoft.azure.kusto.kafka.connect.sink.KustoSinkTask.put(KustoSinkTask.java:450)\n\tat
org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:581)\n\t...
10 more\n"
worker_id: 10.48.138.179:8083
type: sink
observedGeneration: 1
tasksMax: 1
topics:
- TOPICNAME
Hello @ivanpedersen
This is great. Right now flush.interval.ms is not set in the config. Is this right?
aad.auth.appid: APP_ID_UUID
aad.auth.appkey: ${secrets:subscription-manager/subscription-manager-connect-secrets:appkey}
aad.auth.authority: TENANT_ID_UUID
kusto.ingestion.url: https://ingest-CLUSTER.syn-weu-fdac-fdw-dev.kusto.azuresynapse.net
kusto.query.url: https://CLUSTER.syn-weu-fdac-fdw-dev.kusto.azuresynapse.net
kusto.tables.topics.mapping: [{'topic': 'TOPICNAME', 'db': 'SubscriptionManager','table': 'test', 'format': 'json','streaming':'false'}]
Topics: TOPICNAME
value.converter: org.apache.kafka.connect.storage.StringConverter
So I am going to take this config (which seems to be the default) as-is and try to replicate the issue against a Synapse cluster. I will report back on this issue in a day or two if we find something.
Correct, I've not set anything for flush.interval.ms. The default is supposed to be 30s?
Thanks for looking into this!
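In case it's useful, pinning it explicitly in the KafkaConnector spec would look like this (30000 ms being what I understand the documented default to be; adjust as needed):

spec:
  config:
    # explicit flush interval instead of relying on the default
    flush.interval.ms: 30000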
It seems like we don't throw the complete exception. Can you look for the log without the ConnectException and send it here? We do have these lines:

catch (Exception e) {
    flushError = String.format("Error in flushByTime. Current file: %s, size: %d. ", fileName, currentSize); // This needs to be fixed to include the error
    log.error(flushError, e);
    ...
    throw new ConnectException(flushError);
}

So you see the exception without the inner exception.
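For reference, the fix I have in mind would pass the cause through so it surfaces in the Connect trace; roughly (fileName/currentSize as in the snippet above):

catch (Exception e) {
    String flushError = String.format("Error in flushByTime. Current file: %s, size: %d.", fileName, currentSize);
    log.error(flushError, e);
    // propagate the inner exception so WorkerSinkTask's trace includes the root cause
    throw new ConnectException(flushError, e);
}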
This is the complete log I see:
2023-05-05 07:29:04,416 ERROR [kusto-sink-connector-test|task-0] WorkerSinkTask{id=kusto-sink-connector-test-0} Task threw an uncaught and unrecoverable exception. Task is being killed and will not recover until manually restarted. Error: Error in flushByTime. Current file: kafka_topicname_0_0.MULTIJSON.gz, size: 160. (org.apache.kafka.connect.runtime.WorkerSinkTask) [task-thread-kusto-sink-connector-test-0]
org.apache.kafka.connect.errors.ConnectException: Error in flushByTime. Current file: kafka_topicname_0_0.MULTIJSON.gz, size: 160.
at com.microsoft.azure.kusto.kafka.connect.sink.FileWriter.writeData(FileWriter.java:283)
at com.microsoft.azure.kusto.kafka.connect.sink.TopicPartitionWriter.writeRecord(TopicPartitionWriter.java:212)
at com.microsoft.azure.kusto.kafka.connect.sink.KustoSinkTask.put(KustoSinkTask.java:450)
at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:581)
at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:333)
at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:234)
at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:203)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:188)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:243)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:829)
2023-05-05 07:29:04,416 WARN [kusto-sink-connector-test|task-0] Closing writers in KustoSinkTask (com.microsoft.azure.kusto.kafka.connect.sink.KustoSinkTask) [task-thread-kusto-sink-connector-test-0]
2023-05-05 07:29:04,417 ERROR [kusto-sink-connector-test|task-0] WorkerSinkTask{id=kusto-sink-connector-test-0} Task threw an uncaught and unrecoverable exception. Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerTask) [task-thread-kusto-sink-connector-test-0]
org.apache.kafka.connect.errors.ConnectException: Exiting WorkerSinkTask due to unrecoverable exception.
at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:611)
at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:333)
at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:234)
at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:203)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:188)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:243)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: org.apache.kafka.connect.errors.ConnectException: Error in flushByTime. Current file: kafka_topicname_0_0.MULTIJSON.gz, size: 160.
at com.microsoft.azure.kusto.kafka.connect.sink.FileWriter.writeData(FileWriter.java:283)
at com.microsoft.azure.kusto.kafka.connect.sink.TopicPartitionWriter.writeRecord(TopicPartitionWriter.java:212)
at com.microsoft.azure.kusto.kafka.connect.sink.KustoSinkTask.put(KustoSinkTask.java:450)
at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:581)
... 10 more
2023-05-05 07:29:04,417 WARN [kusto-sink-connector-test|task-0] Stopping KustoSinkTask (com.microsoft.azure.kusto.kafka.connect.sink.KustoSinkTask) [task-thread-kusto-sink-connector-test-0]
2023-05-05 07:29:04,418 INFO [kusto-sink-connector-test|task-0] [Consumer clientId=connector-consumer-kusto-sink-connector-test-0, groupId=connect-kusto-sink-connector-test] Revoke previously assigned partitions topicname-0 (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator) [task-thread-kusto-sink-connector-test-0]
2023-05-05 07:29:04,418 INFO [kusto-sink-connector-test|task-0] [Consumer clientId=connector-consumer-kusto-sink-connector-test-0, groupId=connect-kusto-sink-connector-test] Member connector-consumer-kusto-sink-connector-test-0-77c78321-e27c-4769-8228-9215c8087947 sending LeaveGroup request to coordinator 10.40.205.29:9094 (id: 2147483644 rack: null) due to the consumer is being closed (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator) [task-thread-kusto-sink-connector-test-0]
2023-05-05 07:29:04,418 INFO [kusto-sink-connector-test|task-0] [Consumer clientId=connector-consumer-kusto-sink-connector-test-0, groupId=connect-kusto-sink-connector-test] Resetting generation and member id due to: consumer pro-actively leaving the group (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator) [task-thread-kusto-sink-connector-test-0]
2023-05-05 07:29:04,418 INFO [kusto-sink-connector-test|task-0] [Consumer clientId=connector-consumer-kusto-sink-connector-test-0, groupId=connect-kusto-sink-connector-test] Request joining group due to: consumer pro-actively leaving the group (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator) [task-thread-kusto-sink-connector-test-0]
2023-05-05 07:29:04,422 INFO [kusto-sink-connector-test|task-0] Metrics scheduler closed (org.apache.kafka.common.metrics.Metrics) [task-thread-kusto-sink-connector-test-0]
2023-05-05 07:29:04,422 INFO [kusto-sink-connector-test|task-0] Closing reporter org.apache.kafka.common.metrics.JmxReporter (org.apache.kafka.common.metrics.Metrics) [task-thread-kusto-sink-connector-test-0]
2023-05-05 07:29:04,422 INFO [kusto-sink-connector-test|task-0] Metrics reporters closed (org.apache.kafka.common.metrics.Metrics) [task-thread-kusto-sink-connector-test-0]
2023-05-05 07:29:04,423 INFO [kusto-sink-connector-test|task-0] App info kafka.consumer for connector-consumer-kusto-sink-connector-test-0 unregistered (org.apache.kafka.common.utils.AppInfoParser) [task-thread-kusto-sink-connector-test-0]
Perhaps we got a bit further now by opening a firewall rule I wasn't aware of. Now I'm instead seeing this error:
I'm confused about the address ybwkstrlddeweufbddev01.blob.core.windows.net, which is different from the ingestion URL I set in the config. My K8s cluster appears to be unable to look up this particular hostname (locally I can look it up without issues). The question is: where does this come from?
2023-05-05 10:46:32,819 ERROR [kusto-sink-connector-test|task-0] Error in flushByTime. Current file: kafka_topicname_0_0.MULTIJSON.gz, size: 292. (com.microsoft.azure.kusto.kafka.connect.sink.FileWriter) [Timer-19]
reactor.core.Exceptions$ReactiveException: java.net.UnknownHostException: ybwkstrlddeweufbddev01.blob.core.windows.net: Name or service not known
at reactor.core.Exceptions.propagate(Exceptions.java:392)
at reactor.core.publisher.BlockingSingleSubscriber.blockingGet(BlockingSingleSubscriber.java:97)
at reactor.core.publisher.Mono.block(Mono.java:1707)
at com.azure.storage.common.implementation.StorageImplUtils.blockWithOptionalTimeout(StorageImplUtils.java:133)
at com.azure.storage.blob.BlobClient.uploadFromFileWithResponse(BlobClient.java:504)
at com.azure.storage.blob.BlobClient.uploadFromFile(BlobClient.java:451)
at com.azure.storage.blob.BlobClient.uploadFromFile(BlobClient.java:402)
at com.azure.storage.blob.BlobClient.uploadFromFile(BlobClient.java:364)
at com.microsoft.azure.kusto.ingest.AzureStorageClient.uploadFileToBlob(AzureStorageClient.java:83)
at com.microsoft.azure.kusto.ingest.AzureStorageClient.uploadLocalFileToBlob(AzureStorageClient.java:64)
at com.microsoft.azure.kusto.ingest.QueuedIngestClientImpl.ingestFromFile(QueuedIngestClientImpl.java:176)
at com.microsoft.azure.kusto.kafka.connect.sink.TopicPartitionWriter.handleRollFile(TopicPartitionWriter.java:94)
at com.microsoft.azure.kusto.kafka.connect.sink.FileWriter.finishFile(FileWriter.java:176)
at com.microsoft.azure.kusto.kafka.connect.sink.FileWriter.flushByTimeImpl(FileWriter.java:270)
at com.microsoft.azure.kusto.kafka.connect.sink.FileWriter$1.run(FileWriter.java:253)
at java.base/java.util.TimerThread.mainLoop(Timer.java:556)
at java.base/java.util.TimerThread.run(Timer.java:506)
Suppressed: java.lang.Exception: #block terminated with an error
at reactor.core.publisher.BlockingSingleSubscriber.blockingGet(BlockingSingleSubscriber.java:99)
... 15 more
Caused by: java.net.UnknownHostException: ybwkstrlddeweufbddev01.blob.core.windows.net: Name or service not known
at java.base/java.net.Inet6AddressImpl.lookupAllHostAddr(Native Method)
at java.base/java.net.InetAddress$PlatformNameService.lookupAllHostAddr(InetAddress.java:929)
at java.base/java.net.InetAddress.getAddressesFromNameService(InetAddress.java:1529)
at java.base/java.net.InetAddress$NameServiceAddresses.get(InetAddress.java:848)
at java.base/java.net.InetAddress.getAllByName0(InetAddress.java:1519)
at java.base/java.net.InetAddress.getAllByName(InetAddress.java:1378)
at java.base/java.net.InetAddress.getAllByName(InetAddress.java:1306)
at io.netty.util.internal.SocketUtils$9.run(SocketUtils.java:169)
at io.netty.util.internal.SocketUtils$9.run(SocketUtils.java:166)
at java.base/java.security.AccessController.doPrivileged(Native Method)
at io.netty.util.internal.SocketUtils.allAddressesByName(SocketUtils.java:166)
at io.netty.resolver.DefaultNameResolver.doResolveAll(DefaultNameResolver.java:50)
at io.netty.resolver.SimpleNameResolver.resolveAll(SimpleNameResolver.java:79)
at io.netty.resolver.SimpleNameResolver.resolveAll(SimpleNameResolver.java:71)
at io.netty.resolver.InetSocketAddressResolver.doResolveAll(InetSocketAddressResolver.java:73)
at io.netty.resolver.InetSocketAddressResolver.doResolveAll(InetSocketAddressResolver.java:31)
at io.netty.resolver.AbstractAddressResolver.resolveAll(AbstractAddressResolver.java:158)
at reactor.netty.transport.TransportConnector.doResolveAndConnect(TransportConnector.java:346)
at reactor.netty.transport.TransportConnector.lambda$connect$6(TransportConnector.java:165)
at reactor.core.publisher.MonoFlatMap$FlatMapMain.onNext(MonoFlatMap.java:125)
at reactor.netty.transport.TransportConnector$MonoChannelPromise._subscribe(TransportConnector.java:642)
at reactor.netty.transport.TransportConnector$MonoChannelPromise.lambda$subscribe$0(TransportConnector.java:562)
at io.netty.util.concurrent.AbstractEventExecutor.runTask(AbstractEventExecutor.java:174)
at io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:167)
at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:470)
at io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:403)
at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:997)
at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
at java.base/java.lang.Thread.run(Thread.java:829)
Suppressed: java.net.UnknownHostException: ybwkstrlddeweufbddev01.blob.core.windows.net: Name or service not known
... 30 more
Suppressed: java.net.UnknownHostException: ybwkstrlddeweufbddev01.blob.core.windows.net
at java.base/java.net.InetAddress$CachedAddresses.get(InetAddress.java:797)
... 26 more
Suppressed: java.net.UnknownHostException: ybwkstrlddeweufbddev01.blob.core.windows.net: Name or service not known
... 30 more
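For what it's worth, to test the lookup from inside the cluster I can run something like this (the busybox image is just for the test):

kubectl run dns-check --rm -it --image=busybox --restart=Never -- nslookup ybwkstrlddeweufbddev01.blob.core.windows.net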
Hello @ivanpedersen, this also seems to be a firewall issue. Ingestion into the Kusto tables happens through blob storage. The list of resources that are accessed can be found by running this query on the DM (the ingestion endpoint, e.g. ingest-cluster.kusto.windows.net):
.get ingestion resources
You will probably need to relax the firewall rules for these resources.
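Typically these are Azure Storage endpoints reached over HTTPS (443); the exact account names come from the query output, but the hostname shapes are:

<account>.blob.core.windows.net    temp storage containers for queued ingestion
<account>.queue.core.windows.net   ingestion queues
<account>.table.core.windows.net   ingestion status table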
@ag-ramachandran , I am working with @ivanpedersen on this... Could you please explain what you mean by "DM" in your comment?
@pmalanpr, sure, it would be https://ingest-CLUSTER.syn-weu-fdac-fdw-dev.kusto.azuresynapse.net. The query we'd have to run is:
.get ingestion resources
This should return you a set that resembles the following
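Illustratively (the account names and SAS tokens will be specific to your cluster, so treat this only as the shape of the output):

ResourceTypeName                  StorageRoot
SecuredReadyForAggregationQueue   https://<account>.queue.core.windows.net/readyforaggregation-...
TempStorage                       https://<account>.blob.core.windows.net/...-ingestdata-...
FailedIngestionsQueue             https://<account>.queue.core.windows.net/failedingestion-...
SuccessfulIngestionsQueue         https://<account>.queue.core.windows.net/successfulingestion-...
IngestionsStatusTable             https://<account>.table.core.windows.net/IngestionsStatus...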
Thanks, @ag-ramachandran. I got the output.
Hello @pmalanpr, good day! Did this work for you?
If so, can we close this issue? If you need any assistance, happy to help.
Hey @ag-ramachandran, sorry for the delay on this.
We're receiving a different error now.
com.microsoft.azure.kusto.ingest.exceptions.IngestionServiceException: Error refreshing IngestionAuthTokenIOError when trying to retrieve CloudInfo
    at com.microsoft.azure.kusto.ingest.ResourceManager.refreshIngestionAuthToken(ResourceManager.java:300)
    at com.microsoft.azure.kusto.ingest.ResourceManager$1RefreshIngestionAuthTokenTask.run(ResourceManager.java:145)
    at java.base/java.util.TimerThread.mainLoop(Timer.java:556)
    at java.base/java.util.TimerThread.run(Timer.java:506)
Caused by: com.microsoft.azure.kusto.data.exceptions.DataServiceException: IOError when trying to retrieve CloudInfo
    at com.microsoft.azure.kusto.data.auth.CloudInfo.retrieveCloudInfoForCluster(CloudInfo.java:125)
    at com.microsoft.azure.kusto.data.auth.CloudDependentTokenProviderBase.initialize(CloudDependentTokenProviderBase.java:31)
    at com.microsoft.azure.kusto.data.auth.CloudDependentTokenProviderBase.acquireAccessToken(CloudDependentTokenProviderBase.java:45)
    at com.microsoft.azure.kusto.data.ClientImpl.generateIngestAndCommandHeaders(ClientImpl.java:291)
    at com.microsoft.azure.kusto.data.ClientImpl.executeToJsonResult(ClientImpl.java:145)
    at com.microsoft.azure.kusto.data.ClientImpl.execute(ClientImpl.java:105)
    at com.microsoft.azure.kusto.data.ClientImpl.execute(ClientImpl.java:100)
    at com.microsoft.azure.kusto.data.ClientImpl.execute(ClientImpl.java:95)
    at com.microsoft.azure.kusto.ingest.ResourceManager.lambda$refreshIngestionAuthToken$e961bae5$1(ResourceManager.java:290)
    at io.github.resilience4j.retry.Retry.lambda$decorateCheckedSupplier$3f69f149$1(Retry.java:137)
    at com.microsoft.azure.kusto.ingest.ResourceManager.refreshIngestionAuthToken(ResourceManager.java:291)
    ... 3 more
Caused by: java.net.SocketException: Connection reset
    at java.base/java.net.SocketInputStream.read(SocketInputStream.java:186)
    at java.base/java.net.SocketInputStream.read(SocketInputStream.java:140)
    at java.base/sun.security.ssl.SSLSocketInputRecord.read(SSLSocketInputRecord.java:478)
    at java.base/sun.security.ssl.SSLSocketInputRecord.readHeader(SSLSocketInputRecord.java:472)
    at java.base/sun.security.ssl.SSLSocketInputRecord.decode(SSLSocketInputRecord.java:160)
    at java.base/sun.security.ssl.SSLTransport.decode(SSLTransport.java:111)
    at java.base/sun.security.ssl.SSLSocketImpl.decode(SSLSocketImpl.java:1506)
    at java.base/sun.security.ssl.SSLSocketImpl.readHandshakeRecord(SSLSocketImpl.java:1416)
    at java.base/sun.security.ssl.SSLSocketImpl.startHandshake(SSLSocketImpl.java:456)
    at java.base/sun.security.ssl.SSLSocketImpl.startHandshake(SSLSocketImpl.java:427)
    at org.apache.http.conn.ssl.SSLConnectionSocketFactory.createLayeredSocket(SSLConnectionSocketFactory.java:436)
    at org.apache.http.conn.ssl.SSLConnectionSocketFactory.connectSocket(SSLConnectionSocketFactory.java:384)
    at org.apache.http.impl.conn.DefaultHttpClientConnectionOperator.connect(DefaultHttpClientConnectionOperator.java:142)
    at org.apache.http.impl.conn.PoolingHttpClientConnectionManager.connect(PoolingHttpClientConnectionManager.java:376)
    at org.apache.http.impl.execchain.MainClientExec.establishRoute(MainClientExec.java:393)
    at org.apache.http.impl.execchain.MainClientExec.execute(MainClientExec.java:236)
    at org.apache.http.impl.execchain.ProtocolExec.execute(ProtocolExec.java:186)
    at org.apache.http.impl.execchain.RetryExec.execute(RetryExec.java:89)
    at org.apache.http.impl.execchain.RedirectExec.execute(RedirectExec.java:110)
    at org.apache.http.impl.client.InternalHttpClient.doExecute(InternalHttpClient.java:185)
    at org.apache.http.impl.client.CloseableHttpClient.execute(CloseableHttpClient.java:83)
    at org.apache.http.impl.client.CloseableHttpClient.execute(CloseableHttpClient.java:108)
    at org.apache.http.impl.client.CloseableHttpClient.execute(CloseableHttpClient.java:56)
    at com.microsoft.azure.kusto.data.auth.CloudInfo.retrieveCloudInfoForCluster(CloudInfo.java:98)
    ... 13 more
I'm guessing this is a permission issue where I'm not allowed to do some metadata (CloudInfo) call?
Hello @ivanpedersen, sure. Let us try the following. Can you try to reach this endpoint and see if you can get the metadata from Kusto?
https://
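Going by the azure-kusto-java source, the CloudInfo call goes to the cluster's /v1/rest/auth/metadata path (do verify this for your environment), so a quick probe from a pod would be something like:

curl -v https://CLUSTER.syn-weu-fdac-fdw-dev.kusto.azuresynapse.net/v1/rest/auth/metadata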
Thanks for the offer @ag-ramachandran, but we think we've got this working now! It was a corporate firewall issue that had to ping-pong through several teams before it was resolved. We're able to push messages through now. Thanks for the assistance, much appreciated!
I'm getting a connection error like:
org.apache.kafka.connect.errors.ConnectException: Exiting WorkerSinkTask due to unrecoverable exception.
    at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:611)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:333)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:234)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:203)
    at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:188)
    at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:243)
    at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
    at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: org.apache.kafka.connect.errors.ConnectException: Error in flushByTime. Current file: kafka_topicname_0_0.MULTIJSON.gz, size: 144.
    at com.microsoft.azure.kusto.kafka.connect.sink.FileWriter.writeData(FileWriter.java:283)
    at com.microsoft.azure.kusto.kafka.connect.sink.TopicPartitionWriter.writeRecord(TopicPartitionWriter.java:212)
    at com.microsoft.azure.kusto.kafka.connect.sink.KustoSinkTask.put(KustoSinkTask.java:450)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:581)
    ... 10 more
We're using a "kusto.azuresynapse.net" ingestion URL like:
https://ingest-[cluster].kusto.azuresynapse.net
Also setting aad.auth.appid, aad.auth.authority and aad.auth.appkey accordingly. Is it possible to use an Azure Synapse cluster with this connector? Or perhaps the issue is with some setting I'm not applying correctly. https://github.com/Azure/kafka-sink-azure-kusto#4-connect-worker-properties talks about some Connect worker-level settings, but I'm not using Confluent Cloud, and this blog post for example doesn't set anything special as far as I can see.