Closed: ankitag2013 closed this issue 5 years ago.
Debug logs:
10:31:33.706 [main] INFO o.a.k.c.producer.ProducerConfig - ProducerConfig values:
    acks = 1
    batch.size = 16384
    bootstrap.servers = [ankiteventhubnamespace.servicebus.windows.net:9093]
    buffer.memory = 33554432
    client.id = KafkaExampleProducer
    compression.type = none
    connections.max.idle.ms = 540000
    enable.idempotence = false
    interceptor.classes = null
    key.serializer = class org.apache.kafka.common.serialization.LongSerializer
    linger.ms = 0
    max.block.ms = 60000
    max.in.flight.requests.per.connection = 5
    max.request.size = 1048576
    metadata.max.age.ms = 300000
    metric.reporters = []
    metrics.num.samples = 2
    metrics.recording.level = INFO
    metrics.sample.window.ms = 30000
    partitioner.class = class org.apache.kafka.clients.producer.internals.DefaultPartitioner
    receive.buffer.bytes = 32768
    reconnect.backoff.max.ms = 1000
    reconnect.backoff.ms = 50
    request.timeout.ms = 30000
    retries = 0
    retry.backoff.ms = 100
    sasl.jaas.config = [hidden]
    sasl.kerberos.kinit.cmd = /usr/bin/kinit
    sasl.kerberos.min.time.before.relogin = 60000
    sasl.kerberos.service.name = null
    sasl.kerberos.ticket.renew.jitter = 0.05
    sasl.kerberos.ticket.renew.window.factor = 0.8
    sasl.mechanism = PLAIN
    security.protocol = SASL_SSL
    send.buffer.bytes = 131072
    ssl.cipher.suites = null
    ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
    ssl.endpoint.identification.algorithm = null
    ssl.key.password = null
    ssl.keymanager.algorithm = SunX509
    ssl.keystore.location = null
    ssl.keystore.password = null
    ssl.keystore.type = JKS
    ssl.protocol = TLS
    ssl.provider = null
    ssl.secure.random.implementation = null
    ssl.trustmanager.algorithm = PKIX
    ssl.truststore.location = null
    ssl.truststore.password = null
    ssl.truststore.type = JKS
    transaction.timeout.ms = 60000
    transactional.id = null
    value.serializer = class org.apache.kafka.common.serialization.StringSerializer
10:31:33.803 [main] DEBUG o.a.kafka.common.metrics.Metrics - Added sensor with name bufferpool-wait-time 10:31:33.809 [main] DEBUG o.a.kafka.common.metrics.Metrics - Added sensor with name buffer-exhausted-records 10:31:34.229 [main] DEBUG org.apache.kafka.clients.Metadata - Updated cluster metadata version 1 to Cluster(id = null, nodes = [ankiteventhubnamespace.servicebus.windows.net:9093 (id: -1 rack: null)], partitions = []) 10:31:34.299 [main] INFO o.a.k.c.s.a.AbstractLogin - Successfully logged in. 10:31:34.411 [main] DEBUG o.a.kafka.common.metrics.Metrics - Added sensor with name produce-throttle-time 10:31:34.420 [main] DEBUG o.a.kafka.common.metrics.Metrics - Added sensor with name connections-closed: 10:31:34.420 [main] DEBUG o.a.kafka.common.metrics.Metrics - Added sensor with name connections-created: 10:31:34.420 [main] DEBUG o.a.kafka.common.metrics.Metrics - Added sensor with name successful-authentication: 10:31:34.421 [main] DEBUG o.a.kafka.common.metrics.Metrics - Added sensor with name failed-authentication: 10:31:34.421 [main] DEBUG o.a.kafka.common.metrics.Metrics - Added sensor with name bytes-sent-received: 10:31:34.421 [main] DEBUG o.a.kafka.common.metrics.Metrics - Added sensor with name bytes-sent: 10:31:34.422 [main] DEBUG o.a.kafka.common.metrics.Metrics - Added sensor with name bytes-received: 10:31:34.424 [main] DEBUG o.a.kafka.common.metrics.Metrics - Added sensor with name select-time: 10:31:34.424 [main] DEBUG o.a.kafka.common.metrics.Metrics - Added sensor with name io-time: 10:31:34.427 [main] DEBUG o.a.kafka.common.metrics.Metrics - Added sensor with name batch-size 10:31:34.427 [main] DEBUG o.a.kafka.common.metrics.Metrics - Added sensor with name compression-rate 10:31:34.427 [main] DEBUG o.a.kafka.common.metrics.Metrics - Added sensor with name queue-time 10:31:34.427 [main] DEBUG o.a.kafka.common.metrics.Metrics - Added sensor with name request-time 10:31:34.427 [main] DEBUG o.a.kafka.common.metrics.Metrics - Added sensor with name records-per-request 10:31:34.428 [main] DEBUG o.a.kafka.common.metrics.Metrics - Added sensor with name record-retries 10:31:34.428 [main] DEBUG o.a.kafka.common.metrics.Metrics - Added sensor with name errors 10:31:34.428 [main] DEBUG o.a.kafka.common.metrics.Metrics - Added sensor with name record-size 10:31:34.429 [main] DEBUG o.a.kafka.common.metrics.Metrics - Added sensor with name batch-split-rate 10:31:34.429 [kafka-producer-network-thread | KafkaExampleProducer] DEBUG o.a.k.c.producer.internals.Sender - [Producer clientId=KafkaExampleProducer] Starting Kafka producer I/O thread. 
10:31:34.430 [main] INFO o.a.kafka.common.utils.AppInfoParser - Kafka version : 1.0.0 10:31:34.431 [main] INFO o.a.kafka.common.utils.AppInfoParser - Kafka commitId : aaa7af6d4a11b29d 10:31:34.432 [main] DEBUG o.a.k.clients.producer.KafkaProducer - [Producer clientId=KafkaExampleProducer] Kafka producer started Test Data #0 from thread #14 10:31:34.438 [kafka-producer-network-thread | KafkaExampleProducer] DEBUG o.apache.kafka.clients.NetworkClient - [Producer clientId=KafkaExampleProducer] Initialize connection to node ankiteventhubnamespace.servicebus.windows.net:9093 (id: -1 rack: null) for sending metadata request 10:31:34.438 [kafka-producer-network-thread | KafkaExampleProducer] DEBUG o.apache.kafka.clients.NetworkClient - [Producer clientId=KafkaExampleProducer] Initiating connection to node ankiteventhubnamespace.servicebus.windows.net:9093 (id: -1 rack: null) 10:31:34.505 [kafka-producer-network-thread | KafkaExampleProducer] DEBUG o.a.k.c.s.a.SaslClientAuthenticator - Set SASL client state to SEND_APIVERSIONS_REQUEST 10:31:34.507 [kafka-producer-network-thread | KafkaExampleProducer] DEBUG o.a.k.c.s.a.SaslClientAuthenticator - Creating SaslClient: client=null;service=kafka;serviceHostname=ankiteventhubnamespace.servicebus.windows.net;mechs=[PLAIN] 10:31:34.823 [kafka-producer-network-thread | KafkaExampleProducer] DEBUG o.a.kafka.common.metrics.Metrics - Added sensor with name node--1.bytes-sent 10:31:34.824 [kafka-producer-network-thread | KafkaExampleProducer] DEBUG o.a.kafka.common.metrics.Metrics - Added sensor with name node--1.bytes-received 10:31:34.824 [kafka-producer-network-thread | KafkaExampleProducer] DEBUG o.a.kafka.common.metrics.Metrics - Added sensor with name node--1.latency 10:31:34.828 [kafka-producer-network-thread | KafkaExampleProducer] DEBUG o.a.kafka.common.network.Selector - [Producer clientId=KafkaExampleProducer] Created socket with SO_RCVBUF = 33700, SO_SNDBUF = 132104, SO_TIMEOUT = 0 to node -1 10:31:34.886 [kafka-producer-network-thread | KafkaExampleProducer] DEBUG o.apache.kafka.clients.NetworkClient - [Producer clientId=KafkaExampleProducer] Completed connection to node -1. Fetching API versions. 10:31:35.394 [kafka-producer-network-thread | KafkaExampleProducer] DEBUG o.a.k.c.network.SslTransportLayer - SSL handshake completed successfully with peerHost 'ankiteventhubnamespace.servicebus.windows.net' peerPort 9093 peerPrincipal 'CN=servicebus.windows.net' cipherSuite 'TLS_ECDHE_RSA_WITH_AES_256_GCM_SHA384' 10:31:35.397 [kafka-producer-network-thread | KafkaExampleProducer] DEBUG o.a.k.c.s.a.SaslClientAuthenticator - Set SASL client state to RECEIVE_APIVERSIONS_RESPONSE 10:31:35.716 [kafka-producer-network-thread | KafkaExampleProducer] DEBUG o.a.k.c.s.a.SaslClientAuthenticator - Set SASL client state to SEND_HANDSHAKE_REQUEST 10:31:35.717 [kafka-producer-network-thread | KafkaExampleProducer] DEBUG o.a.k.c.s.a.SaslClientAuthenticator - Set SASL client state to RECEIVE_HANDSHAKE_RESPONSE 10:31:36.034 [kafka-producer-network-thread | KafkaExampleProducer] DEBUG o.a.k.c.s.a.SaslClientAuthenticator - Set SASL client state to INITIAL 10:31:36.035 [kafka-producer-network-thread | KafkaExampleProducer] DEBUG o.a.k.c.s.a.SaslClientAuthenticator - Set SASL client state to INTERMEDIATE 10:32:34.448 [pool-1-thread-1] DEBUG o.a.k.clients.producer.KafkaProducer - [Producer clientId=KafkaExampleProducer] Exception occurred during message send: org.apache.kafka.common.errors.TimeoutException: Failed to update metadata after 60000 ms. 
org.apache.kafka.common.errors.TimeoutException: Failed to update metadata after 60000 ms.
Process finished with exit code 1
Hi @ankitag2013 - I would check that your SASL setup is configured correctly. It looks like the SASL client never reaches the COMPLETE state.
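For reference, a minimal sketch of the producer configuration an Event Hubs Kafka endpoint expects over SASL_SSL/PLAIN, matching the ProducerConfig values logged above. The namespace and key are placeholders, and the class/method names here are illustrative, not taken from the quickstart:

```java
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.LongSerializer;
import org.apache.kafka.common.serialization.StringSerializer;

public class ProducerConfigSketch {
    static KafkaProducer<Long, String> create() {
        Properties props = new Properties();
        // FQDN of the Event Hubs namespace; the Kafka endpoint listens on port 9093
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "<namespace>.servicebus.windows.net:9093");
        props.put("security.protocol", "SASL_SSL");
        props.put("sasl.mechanism", "PLAIN");
        // Username is literally "$ConnectionString"; password is the full namespace connection string
        props.put("sasl.jaas.config",
            "org.apache.kafka.common.security.plain.PlainLoginModule required "
            + "username=\"$ConnectionString\" "
            + "password=\"Endpoint=sb://<namespace>.servicebus.windows.net/;"
            + "SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=<key>\";");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, LongSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        return new KafkaProducer<>(props);
    }
}
```

If the SASL state never reaches COMPLETE with settings equivalent to these, the remaining suspects are usually the connection string itself or something on the network path to port 9093.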
I resolved this with a local port forward from a bastion machine and am now able to produce to the event hub, but I am not able to consume with the same config:
bootstrap.servers=localhost:9093
request.timeout.ms=60000
security.protocol=SASL_SSL
sasl.mechanism=PLAIN
auto.offset.reset=earliest
group.id=$Default
session.timeout.ms=30000
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="$ConnectionString" password="Endpoint=sb://ankiteventhubnamespace.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=";
Debug logs:
13:43:15.867 [pool-1-thread-1] INFO o.a.k.c.consumer.ConsumerConfig - ConsumerConfig values:
    auto.commit.interval.ms = 5000
    auto.offset.reset = earliest
    bootstrap.servers = [localhost:9093]
    check.crcs = true
    client.id = KafkaExampleConsumer#0
    connections.max.idle.ms = 540000
    enable.auto.commit = true
    exclude.internal.topics = true
    fetch.max.bytes = 52428800
    fetch.max.wait.ms = 500
    fetch.min.bytes = 1
    group.id = $Default
    heartbeat.interval.ms = 3000
    interceptor.classes = null
    internal.leave.group.on.close = true
    isolation.level = read_uncommitted
    key.deserializer = class org.apache.kafka.common.serialization.LongDeserializer
    max.partition.fetch.bytes = 1048576
    max.poll.interval.ms = 300000
    max.poll.records = 500
    metadata.max.age.ms = 300000
    metric.reporters = []
    metrics.num.samples = 2
    metrics.recording.level = INFO
    metrics.sample.window.ms = 30000
    partition.assignment.strategy = [class org.apache.kafka.clients.consumer.RangeAssignor]
    receive.buffer.bytes = 65536
    reconnect.backoff.max.ms = 1000
    reconnect.backoff.ms = 50
    request.timeout.ms = 60000
    retry.backoff.ms = 100
    sasl.jaas.config = [hidden]
    sasl.kerberos.kinit.cmd = /usr/bin/kinit
    sasl.kerberos.min.time.before.relogin = 60000
    sasl.kerberos.service.name = null
    sasl.kerberos.ticket.renew.jitter = 0.05
    sasl.kerberos.ticket.renew.window.factor = 0.8
    sasl.mechanism = PLAIN
    security.protocol = SASL_SSL
    send.buffer.bytes = 131072
    session.timeout.ms = 30000
    ssl.cipher.suites = null
    ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
    ssl.endpoint.identification.algorithm = null
    ssl.key.password = null
    ssl.keymanager.algorithm = SunX509
    ssl.keystore.location = null
    ssl.keystore.password = null
    ssl.keystore.type = JKS
    ssl.protocol = TLS
    ssl.provider = null
    ssl.secure.random.implementation = null
    ssl.trustmanager.algorithm = PKIX
    ssl.truststore.location = null
    ssl.truststore.password = null
    ssl.truststore.type = JKS
    value.deserializer = class org.apache.kafka.common.serialization.StringDeserializer
13:43:15.870 [pool-1-thread-1] DEBUG o.a.k.clients.consumer.KafkaConsumer - [Consumer clientId=KafkaExampleConsumer#0, groupId=$Default] Initializing the Kafka consumer 13:43:15.930 [pool-1-thread-1] DEBUG org.apache.kafka.clients.Metadata - Updated cluster metadata version 1 to Cluster(id = null, nodes = [localhost:9093 (id: -1 rack: null)], partitions = []) 13:43:15.997 [pool-1-thread-1] INFO o.a.k.c.s.a.AbstractLogin - Successfully logged in. 13:43:16.136 [pool-1-thread-1] DEBUG o.a.kafka.common.metrics.Metrics - Added sensor with name fetch-throttle-time 13:43:16.146 [pool-1-thread-1] DEBUG o.a.kafka.common.metrics.Metrics - Added sensor with name connections-closed: 13:43:16.148 [pool-1-thread-1] DEBUG o.a.kafka.common.metrics.Metrics - Added sensor with name connections-created: 13:43:16.148 [pool-1-thread-1] DEBUG o.a.kafka.common.metrics.Metrics - Added sensor with name successful-authentication: 13:43:16.149 [pool-1-thread-1] DEBUG o.a.kafka.common.metrics.Metrics - Added sensor with name failed-authentication: 13:43:16.149 [pool-1-thread-1] DEBUG o.a.kafka.common.metrics.Metrics - Added sensor with name bytes-sent-received: 13:43:16.149 [pool-1-thread-1] DEBUG o.a.kafka.common.metrics.Metrics - Added sensor with name bytes-sent: 13:43:16.150 [pool-1-thread-1] DEBUG o.a.kafka.common.metrics.Metrics - Added sensor with name bytes-received: 13:43:16.151 [pool-1-thread-1] DEBUG o.a.kafka.common.metrics.Metrics - Added sensor with name select-time: 13:43:16.152 [pool-1-thread-1] DEBUG o.a.kafka.common.metrics.Metrics - Added sensor with name io-time: 13:43:16.166 [pool-1-thread-1] DEBUG o.a.kafka.common.metrics.Metrics - Added sensor with name heartbeat-latency 13:43:16.166 [pool-1-thread-1] DEBUG o.a.kafka.common.metrics.Metrics - Added sensor with name join-latency 13:43:16.166 [pool-1-thread-1] DEBUG o.a.kafka.common.metrics.Metrics - Added sensor with name sync-latency 13:43:16.168 [pool-1-thread-1] DEBUG o.a.kafka.common.metrics.Metrics - Added sensor with name commit-latency 13:43:16.171 [pool-1-thread-1] DEBUG o.a.kafka.common.metrics.Metrics - Added sensor with name bytes-fetched 13:43:16.171 [pool-1-thread-1] DEBUG o.a.kafka.common.metrics.Metrics - Added sensor with name records-fetched 13:43:16.172 [pool-1-thread-1] DEBUG o.a.kafka.common.metrics.Metrics - Added sensor with name fetch-latency 13:43:16.172 [pool-1-thread-1] DEBUG o.a.kafka.common.metrics.Metrics - Added sensor with name records-lag 13:43:16.174 [pool-1-thread-1] INFO o.a.kafka.common.utils.AppInfoParser - Kafka version : 1.0.0 13:43:16.174 [pool-1-thread-1] INFO o.a.kafka.common.utils.AppInfoParser - Kafka commitId : aaa7af6d4a11b29d 13:43:16.176 [pool-1-thread-1] DEBUG o.a.k.clients.consumer.KafkaConsumer - [Consumer clientId=KafkaExampleConsumer#0, groupId=$Default] Kafka consumer initialized 13:43:16.176 [pool-1-thread-1] DEBUG o.a.k.clients.consumer.KafkaConsumer - [Consumer clientId=KafkaExampleConsumer#0, groupId=$Default] Subscribed to topic(s): test Polling 13:43:16.176 [pool-1-thread-1] DEBUG o.a.k.c.c.i.AbstractCoordinator - [Consumer clientId=KafkaExampleConsumer#0, groupId=$Default] Sending GroupCoordinator request to broker localhost:9093 (id: -1 rack: null) 13:43:16.238 [pool-1-thread-1] DEBUG o.apache.kafka.clients.NetworkClient - [Consumer clientId=KafkaExampleConsumer#0, groupId=$Default] Initiating connection to node localhost:9093 (id: -1 rack: null) 13:43:16.288 [pool-1-thread-1] DEBUG o.a.k.c.s.a.SaslClientAuthenticator - Set SASL client state to SEND_APIVERSIONS_REQUEST 13:43:16.289 
[pool-1-thread-1] DEBUG o.a.k.c.s.a.SaslClientAuthenticator - Creating SaslClient: client=null;service=kafka;serviceHostname=localhost;mechs=[PLAIN] 13:43:16.294 [pool-1-thread-1] DEBUG o.a.kafka.common.metrics.Metrics - Added sensor with name node--1.bytes-sent 13:43:16.295 [pool-1-thread-1] DEBUG o.a.kafka.common.metrics.Metrics - Added sensor with name node--1.bytes-received 13:43:16.295 [pool-1-thread-1] DEBUG o.a.kafka.common.metrics.Metrics - Added sensor with name node--1.latency 13:43:16.298 [pool-1-thread-1] DEBUG o.a.kafka.common.network.Selector - [Consumer clientId=KafkaExampleConsumer#0, groupId=$Default] Created socket with SO_RCVBUF = 342972, SO_SNDBUF = 146988, SO_TIMEOUT = 0 to node -1 13:43:16.301 [pool-1-thread-1] DEBUG o.apache.kafka.clients.NetworkClient - [Consumer clientId=KafkaExampleConsumer#0, groupId=$Default] Completed connection to node -1. Fetching API versions. 13:43:17.494 [pool-1-thread-1] DEBUG o.a.k.c.network.SslTransportLayer - SSL handshake completed successfully with peerHost 'localhost' peerPort 9093 peerPrincipal 'CN=servicebus.windows.net' cipherSuite 'TLS_ECDHE_RSA_WITH_AES_256_GCM_SHA384' 13:43:17.499 [pool-1-thread-1] DEBUG o.a.k.c.s.a.SaslClientAuthenticator - Set SASL client state to RECEIVE_APIVERSIONS_RESPONSE 13:43:17.885 [pool-1-thread-1] DEBUG o.a.k.c.s.a.SaslClientAuthenticator - Set SASL client state to SEND_HANDSHAKE_REQUEST 13:43:17.886 [pool-1-thread-1] DEBUG o.a.k.c.s.a.SaslClientAuthenticator - Set SASL client state to RECEIVE_HANDSHAKE_RESPONSE 13:43:18.291 [pool-1-thread-1] DEBUG o.a.k.c.s.a.SaslClientAuthenticator - Set SASL client state to INITIAL 13:43:18.292 [pool-1-thread-1] DEBUG o.a.k.c.s.a.SaslClientAuthenticator - Set SASL client state to INTERMEDIATE 13:43:18.652 [pool-1-thread-1] DEBUG o.a.k.c.s.a.SaslClientAuthenticator - Set SASL client state to COMPLETE 13:43:18.652 [pool-1-thread-1] DEBUG o.apache.kafka.clients.NetworkClient - [Consumer clientId=KafkaExampleConsumer#0, groupId=$Default] Initiating API versions fetch from node -1. 
13:43:19.017 [pool-1-thread-1] DEBUG o.apache.kafka.clients.NetworkClient - [Consumer clientId=KafkaExampleConsumer#0, groupId=$Default] Recorded API versions for node -1: (Produce(0): 3 to 5 [usable: 5], Fetch(1): 4 to 6 [usable: 6], ListOffsets(2): 0 to 2 [usable: 2], Metadata(3): 0 to 5 [usable: 5], LeaderAndIsr(4): UNSUPPORTED, StopReplica(5): UNSUPPORTED, UpdateMetadata(6): UNSUPPORTED, ControlledShutdown(7): UNSUPPORTED, OffsetCommit(8): 0 to 3 [usable: 3], OffsetFetch(9): 0 to 3 [usable: 3], FindCoordinator(10): 0 to 1 [usable: 1], JoinGroup(11): 0 to 2 [usable: 2], Heartbeat(12): 0 to 1 [usable: 1], LeaveGroup(13): 0 to 1 [usable: 1], SyncGroup(14): 0 to 1 [usable: 1], DescribeGroups(15): 0 to 1 [usable: 1], ListGroups(16): 0 to 1 [usable: 1], SaslHandshake(17): 0 [usable: 0], ApiVersions(18): 0 to 1 [usable: 1], CreateTopics(19): 0 to 2 [usable: 2], DeleteTopics(20): 0 to 2 [usable: 1], DeleteRecords(21): UNSUPPORTED, InitProducerId(22): UNSUPPORTED, OffsetForLeaderEpoch(23): 0 [usable: 0], AddPartitionsToTxn(24): UNSUPPORTED, AddOffsetsToTxn(25): UNSUPPORTED, EndTxn(26): UNSUPPORTED, WriteTxnMarkers(27): UNSUPPORTED, TxnOffsetCommit(28): UNSUPPORTED, DescribeAcls(29): UNSUPPORTED, CreateAcls(30): UNSUPPORTED, DeleteAcls(31): UNSUPPORTED, DescribeConfigs(32): UNSUPPORTED, AlterConfigs(33): UNSUPPORTED, AlterReplicaLogDirs(34): UNSUPPORTED, DescribeLogDirs(35): UNSUPPORTED, SaslAuthenticate(36): UNSUPPORTED, CreatePartitions(37): UNSUPPORTED, UNKNOWN(42): 0) 13:43:19.018 [pool-1-thread-1] DEBUG o.apache.kafka.clients.NetworkClient - [Consumer clientId=KafkaExampleConsumer#0, groupId=$Default] Sending metadata request (type=MetadataRequest, topics=test) to node localhost:9093 (id: -1 rack: null) 13:43:19.423 [pool-1-thread-1] DEBUG org.apache.kafka.clients.Metadata - Updated cluster metadata version 2 to Cluster(id = null, nodes = [ankitEventHubNameSpace.servicebus.windows.net:9093 (id: 0 rack: null)], partitions = [Partition(topic = test, partition = 0, leader = 0, replicas = [], isr = [], offlineReplicas = [])]) 13:43:19.827 [pool-1-thread-1] DEBUG o.a.k.c.c.i.AbstractCoordinator - [Consumer clientId=KafkaExampleConsumer#0, groupId=$Default] Received GroupCoordinator response ClientResponse(receivedTimeMs=1568621599826, latencyMs=3590, disconnected=false, requestHeader=RequestHeader(apiKey=FIND_COORDINATOR, apiVersion=1, clientId=KafkaExampleConsumer#0, correlationId=0), responseBody=FindCoordinatorResponse(throttleTimeMs=0, errorMessage='null', error=NONE, node=ankitEventHubNameSpace.servicebus.windows.net:9093 (id: 0 rack: null))) 13:43:19.828 [pool-1-thread-1] INFO o.a.k.c.c.i.AbstractCoordinator - [Consumer clientId=KafkaExampleConsumer#0, groupId=$Default] Discovered coordinator ankitEventHubNameSpace.servicebus.windows.net:9093 (id: 2147483647 rack: null) 13:43:19.828 [pool-1-thread-1] DEBUG o.apache.kafka.clients.NetworkClient - [Consumer clientId=KafkaExampleConsumer#0, groupId=$Default] Initiating connection to node ankitEventHubNameSpace.servicebus.windows.net:9093 (id: 2147483647 rack: null) 13:43:20.081 [pool-1-thread-1] DEBUG o.a.k.c.s.a.SaslClientAuthenticator - Set SASL client state to SEND_APIVERSIONS_REQUEST 13:43:20.081 [pool-1-thread-1] DEBUG o.a.k.c.s.a.SaslClientAuthenticator - Creating SaslClient: client=null;service=kafka;serviceHostname=ankitEventHubNameSpace.servicebus.windows.net;mechs=[PLAIN] 13:43:20.083 [kafka-coordinator-heartbeat-thread | $Default] DEBUG o.a.k.c.c.i.AbstractCoordinator - [Consumer clientId=KafkaExampleConsumer#0, groupId=$Default] 
Heartbeat thread started 13:43:20.083 [pool-1-thread-1] DEBUG o.a.k.c.c.i.ConsumerCoordinator - [Consumer clientId=KafkaExampleConsumer#0, groupId=$Default] Sending synchronous auto-commit of offsets {} 13:43:20.084 [pool-1-thread-1] INFO o.a.k.c.c.i.ConsumerCoordinator - [Consumer clientId=KafkaExampleConsumer#0, groupId=$Default] Revoking previously assigned partitions [] 13:43:20.084 [pool-1-thread-1] DEBUG o.a.k.c.c.i.AbstractCoordinator - [Consumer clientId=KafkaExampleConsumer#0, groupId=$Default] Disabling heartbeat thread 13:43:20.084 [pool-1-thread-1] INFO o.a.k.c.c.i.AbstractCoordinator - [Consumer clientId=KafkaExampleConsumer#0, groupId=$Default] (Re-)joining group 13:43:20.085 [pool-1-thread-1] DEBUG o.a.k.c.c.i.AbstractCoordinator - [Consumer clientId=KafkaExampleConsumer#0, groupId=$Default] Sending JoinGroup ((type: JoinGroupRequest, groupId=$Default, sessionTimeout=30000, rebalanceTimeout=300000, memberId=, protocolType=consumer, groupProtocols=org.apache.kafka.common.requests.JoinGroupRequest$ProtocolMetadata@66a0b3d3)) to coordinator ankitEventHubNameSpace.servicebus.windows.net:9093 (id: 2147483647 rack: null) 13:43:20.412 [pool-1-thread-1] DEBUG o.a.kafka.common.metrics.Metrics - Added sensor with name node-2147483647.bytes-sent 13:43:20.414 [pool-1-thread-1] DEBUG o.a.kafka.common.metrics.Metrics - Added sensor with name node-2147483647.bytes-received 13:43:20.415 [pool-1-thread-1] DEBUG o.a.kafka.common.metrics.Metrics - Added sensor with name node-2147483647.latency 13:43:20.416 [pool-1-thread-1] DEBUG o.a.kafka.common.network.Selector - [Consumer clientId=KafkaExampleConsumer#0, groupId=$Default] Created socket with SO_RCVBUF = 66052, SO_SNDBUF = 132104, SO_TIMEOUT = 0 to node 2147483647 13:43:20.416 [pool-1-thread-1] DEBUG o.apache.kafka.clients.NetworkClient - [Consumer clientId=KafkaExampleConsumer#0, groupId=$Default] Completed connection to node 2147483647. Fetching API versions. 
13:43:20.988 [pool-1-thread-1] DEBUG o.a.k.c.network.SslTransportLayer - SSL handshake completed successfully with peerHost 'ankitEventHubNameSpace.servicebus.windows.net' peerPort 9093 peerPrincipal 'CN=servicebus.windows.net' cipherSuite 'TLS_ECDHE_RSA_WITH_AES_256_GCM_SHA384' 13:43:20.989 [pool-1-thread-1] DEBUG o.a.k.c.s.a.SaslClientAuthenticator - Set SASL client state to RECEIVE_APIVERSIONS_RESPONSE 13:43:21.305 [pool-1-thread-1] DEBUG o.a.k.c.s.a.SaslClientAuthenticator - Set SASL client state to SEND_HANDSHAKE_REQUEST 13:43:21.306 [pool-1-thread-1] DEBUG o.a.k.c.s.a.SaslClientAuthenticator - Set SASL client state to RECEIVE_HANDSHAKE_RESPONSE 13:43:21.621 [pool-1-thread-1] DEBUG o.a.k.c.s.a.SaslClientAuthenticator - Set SASL client state to INITIAL 13:43:21.621 [pool-1-thread-1] DEBUG o.a.k.c.s.a.SaslClientAuthenticator - Set SASL client state to INTERMEDIATE 13:44:20.191 [pool-1-thread-1] DEBUG o.a.k.c.c.i.AbstractCoordinator - [Consumer clientId=KafkaExampleConsumer#0, groupId=$Default] Disabling heartbeat thread 13:44:20.191 [pool-1-thread-1] INFO o.a.k.c.c.i.AbstractCoordinator - [Consumer clientId=KafkaExampleConsumer#0, groupId=$Default] (Re-)joining group 13:44:20.191 [pool-1-thread-1] DEBUG o.a.k.c.c.i.AbstractCoordinator - [Consumer clientId=KafkaExampleConsumer#0, groupId=$Default] Sending JoinGroup ((type: JoinGroupRequest, groupId=$Default, sessionTimeout=30000, rebalanceTimeout=300000, memberId=, protocolType=consumer, groupProtocols=org.apache.kafka.common.requests.JoinGroupRequest$ProtocolMetadata@41c40398)) to coordinator ankitEventHubNameSpace.servicebus.windows.net:9093 (id: 2147483647 rack: null) 13:45:20.368 [pool-1-thread-1] DEBUG o.a.k.c.c.i.AbstractCoordinator - [Consumer clientId=KafkaExampleConsumer#0, groupId=$Default] Disabling heartbeat thread 13:45:20.368 [pool-1-thread-1] INFO o.a.k.c.c.i.AbstractCoordinator - [Consumer clientId=KafkaExampleConsumer#0, groupId=$Default] (Re-)joining group 13:45:20.368 [pool-1-thread-1] DEBUG o.a.k.c.c.i.AbstractCoordinator - [Consumer clientId=KafkaExampleConsumer#0, groupId=$Default] Sending JoinGroup ((type: JoinGroupRequest, groupId=$Default, sessionTimeout=30000, rebalanceTimeout=300000, memberId=, protocolType=consumer, groupProtocols=org.apache.kafka.common.requests.JoinGroupRequest$ProtocolMetadata@3120cd0b)) to coordinator ankitEventHubNameSpace.servicebus.windows.net:9093 (id: 2147483647 rack: null) 13:46:20.479 [pool-1-thread-1] DEBUG o.a.k.c.c.i.AbstractCoordinator - [Consumer clientId=KafkaExampleConsumer#0, groupId=$Default] Disabling heartbeat thread 13:46:20.479 [pool-1-thread-1] INFO o.a.k.c.c.i.AbstractCoordinator - [Consumer clientId=KafkaExampleConsumer#0, groupId=$Default] (Re-)joining group 13:46:20.479 [pool-1-thread-1] DEBUG o.a.k.c.c.i.AbstractCoordinator - [Consumer clientId=KafkaExampleConsumer#0, groupId=$Default] Sending JoinGroup ((type: JoinGroupRequest, groupId=$Default, sessionTimeout=30000, rebalanceTimeout=300000, memberId=, protocolType=consumer, groupProtocols=org.apache.kafka.common.requests.JoinGroupRequest$ProtocolMetadata@3fe2dc6)) to coordinator ankitEventHubNameSpace.servicebus.windows.net:9093 (id: 2147483647 rack: null) 13:47:20.597 [pool-1-thread-1] DEBUG o.a.k.c.c.i.AbstractCoordinator - [Consumer clientId=KafkaExampleConsumer#0, groupId=$Default] Disabling heartbeat thread 13:47:20.597 [pool-1-thread-1] INFO o.a.k.c.c.i.AbstractCoordinator - [Consumer clientId=KafkaExampleConsumer#0, groupId=$Default] (Re-)joining group 13:47:20.598 [pool-1-thread-1] DEBUG 
o.a.k.c.c.i.AbstractCoordinator - [Consumer clientId=KafkaExampleConsumer#0, groupId=$Default] Sending JoinGroup ((type: JoinGroupRequest, groupId=$Default, sessionTimeout=30000, rebalanceTimeout=300000, memberId=, protocolType=consumer, groupProtocols=org.apache.kafka.common.requests.JoinGroupRequest$ProtocolMetadata@4f7d07b7)) to coordinator ankitEventHubNameSpace.servicebus.windows.net:9093 (id: 2147483647 rack: null) 13:48:19.424 [pool-1-thread-1] DEBUG o.apache.kafka.clients.NetworkClient - [Consumer clientId=KafkaExampleConsumer#0, groupId=$Default] Initialize connection to node ankitEventHubNameSpace.servicebus.windows.net:9093 (id: 0 rack: null) for sending metadata request 13:48:19.424 [pool-1-thread-1] DEBUG o.apache.kafka.clients.NetworkClient - [Consumer clientId=KafkaExampleConsumer#0, groupId=$Default] Initiating connection to node ankitEventHubNameSpace.servicebus.windows.net:9093 (id: 0 rack: null) 13:48:19.714 [pool-1-thread-1] DEBUG o.a.k.c.s.a.SaslClientAuthenticator - Set SASL client state to SEND_APIVERSIONS_REQUEST 13:48:19.714 [pool-1-thread-1] DEBUG o.a.k.c.s.a.SaslClientAuthenticator - Creating SaslClient: client=null;service=kafka;serviceHostname=ankitEventHubNameSpace.servicebus.windows.net;mechs=[PLAIN] 13:48:20.026 [pool-1-thread-1] DEBUG o.a.kafka.common.metrics.Metrics - Added sensor with name node-0.bytes-sent 13:48:20.027 [pool-1-thread-1] DEBUG o.a.kafka.common.metrics.Metrics - Added sensor with name node-0.bytes-received 13:48:20.028 [pool-1-thread-1] DEBUG o.a.kafka.common.metrics.Metrics - Added sensor with name node-0.latency 13:48:20.029 [pool-1-thread-1] DEBUG o.a.kafka.common.network.Selector - [Consumer clientId=KafkaExampleConsumer#0, groupId=$Default] Created socket with SO_RCVBUF = 66052, SO_SNDBUF = 132104, SO_TIMEOUT = 0 to node 0 13:48:20.029 [pool-1-thread-1] DEBUG o.apache.kafka.clients.NetworkClient - [Consumer clientId=KafkaExampleConsumer#0, groupId=$Default] Completed connection to node 0. Fetching API versions. 
13:48:20.774 [pool-1-thread-1] DEBUG o.a.k.c.c.i.AbstractCoordinator - [Consumer clientId=KafkaExampleConsumer#0, groupId=$Default] Disabling heartbeat thread 13:48:20.774 [pool-1-thread-1] INFO o.a.k.c.c.i.AbstractCoordinator - [Consumer clientId=KafkaExampleConsumer#0, groupId=$Default] (Re-)joining group 13:48:20.774 [pool-1-thread-1] DEBUG o.a.k.c.c.i.AbstractCoordinator - [Consumer clientId=KafkaExampleConsumer#0, groupId=$Default] Sending JoinGroup ((type: JoinGroupRequest, groupId=$Default, sessionTimeout=30000, rebalanceTimeout=300000, memberId=, protocolType=consumer, groupProtocols=org.apache.kafka.common.requests.JoinGroupRequest$ProtocolMetadata@425c4943)) to coordinator ankitEventHubNameSpace.servicebus.windows.net:9093 (id: 2147483647 rack: null) 13:48:21.592 [pool-1-thread-1] DEBUG o.a.k.c.network.SslTransportLayer - SSL handshake completed successfully with peerHost 'ankitEventHubNameSpace.servicebus.windows.net' peerPort 9093 peerPrincipal 'CN=servicebus.windows.net' cipherSuite 'TLS_ECDHE_RSA_WITH_AES_256_GCM_SHA384' 13:48:21.592 [pool-1-thread-1] DEBUG o.a.k.c.s.a.SaslClientAuthenticator - Set SASL client state to RECEIVE_APIVERSIONS_RESPONSE 13:48:21.906 [pool-1-thread-1] DEBUG o.a.k.c.s.a.SaslClientAuthenticator - Set SASL client state to SEND_HANDSHAKE_REQUEST 13:48:21.908 [pool-1-thread-1] DEBUG o.a.k.c.s.a.SaslClientAuthenticator - Set SASL client state to RECEIVE_HANDSHAKE_RESPONSE 13:48:22.226 [pool-1-thread-1] DEBUG o.a.k.c.s.a.SaslClientAuthenticator - Set SASL client state to INITIAL 13:48:22.227 [pool-1-thread-1] DEBUG o.a.k.c.s.a.SaslClientAuthenticator - Set SASL client state to INTERMEDIATE
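For context, a minimal sketch of the consumer loop that pairs with the config pasted above (the topic name "test" comes from the logs; the rest of the structure is an assumption and may differ from the quickstart consumer):

```java
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.LongDeserializer;
import org.apache.kafka.common.serialization.StringDeserializer;

public class ConsumerSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9093"); // bastion local forward, per the config above
        props.put("security.protocol", "SASL_SSL");
        props.put("sasl.mechanism", "PLAIN");
        props.put("sasl.jaas.config",
            "org.apache.kafka.common.security.plain.PlainLoginModule required "
            + "username=\"$ConnectionString\" password=\"<connection string>\";");
        props.put("group.id", "$Default");
        props.put("auto.offset.reset", "earliest");
        props.put("key.deserializer", LongDeserializer.class.getName());
        props.put("value.deserializer", StringDeserializer.class.getName());

        try (KafkaConsumer<Long, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("test"));
            while (true) {
                // Kafka 1.0.0 client signature: poll(long timeoutMs). The join will stall
                // if the group coordinator host returned in metadata is unreachable.
                ConsumerRecords<Long, String> records = consumer.poll(1000);
                for (ConsumerRecord<Long, String> record : records) {
                    System.out.printf("offset=%d key=%d value=%s%n",
                        record.offset(), record.key(), record.value());
                }
            }
        }
    }
}
```

Note in the logs above that after bootstrap the client discovers the coordinator at ankitEventHubNameSpace.servicebus.windows.net:9093 and connects to it directly, so a local forward that only covers localhost:9093 does not cover the coordinator connection.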
Can you try running with no ports blocked, disabling firewalls, etc.?
Why is your bootstrap.servers set to localhost:9093? Is that the bastion host doing the redirect to the EH cluster?
1) If I disable my firewall, SSL throws an exception:
14:57:00.017 [kafka-producer-network-thread | KafkaExampleProducer] DEBUG o.a.k.c.network.SslTransportLayer - SSLEngine.closeInBound() raised an exception.
Caused by: javax.net.ssl.SSLException: Inbound closed before receiving peer's close_notify: possible truncation attack?
    at sun.security.ssl.Alerts.getSSLException(Alerts.java:208) ~[na:1.8.0_221]
    at sun.security.ssl.SSLEngineImpl.fatal(SSLEngineImpl.java:1647) ~[na:1.8.0_221]
    at sun.security.ssl.SSLEngineImpl.fatal(SSLEngineImpl.java:1615) ~[na:1.8.0_221]
    at sun.security.ssl.SSLEngineImpl.closeInbound(SSLEngineImpl.java:1542) ~[na:1.8.0_221]
    at org.apache.kafka.common.network.SslTransportLayer.handshakeFailure(SslTransportLayer.java:797) [kafka-clients-1.0.0.jar:na]
    at org.apache.kafka.common.network.SslTransportLayer.handshake(SslTransportLayer.java:257) [kafka-clients-1.0.0.jar:na]
    at org.apache.kafka.common.network.KafkaChannel.prepare(KafkaChannel.java:79) [kafka-clients-1.0.0.jar:na]
    at org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:460) [kafka-clients-1.0.0.jar:na]
    at org.apache.kafka.common.network.Selector.poll(Selector.java:398) [kafka-clients-1.0.0.jar:na]
    at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:460) [kafka-clients-1.0.0.jar:na]
    at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:239) [kafka-clients-1.0.0.jar:na]
    at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:163) [kafka-clients-1.0.0.jar:na]
    at java.lang.Thread.run(Thread.java:748) [na:1.8.0_221]
14:57:00.017 [kafka-producer-network-thread | KafkaExampleProducer] DEBUG o.a.kafka.common.network.Selector - [Producer clientId=KafkaExampleProducer] Connection with ankiteventhubnamespace.servicebus.windows.net/104.208.16.0 disconnected due to authentication exception
org.apache.kafka.common.errors.SslAuthenticationException: SSL handshake failed
2) If my firewall is on and I redirect through the bastion, the producer works fine but the consumer doesn't.
3) Everything works fine if I jar both the producer and the consumer and run them on the bastion.
What is the UTC timestamp of the error you pasted above? I can check our traces to see if the connection was closed by Event Hubs or someone else.
It's around 9:27 AM UTC on the 23rd.
Closing - the configurations are correct; this was a local networking/firewall issue.
Description
Unable to produce to the event hub; it fails with org.apache.kafka.common.errors.TimeoutException: Failed to update metadata after 60000 ms, although I can see requests and successful requests in the event hub namespace metrics.
How to reproduce
Cloned azure-event-hubs-for-kafka and produced a message using the quickstart Java producer.
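For context, a minimal sketch of the kind of send call a quickstart-style producer makes and where this error surfaces (the topic name and message are placeholders; the actual quickstart code may differ):

```java
import java.util.concurrent.ExecutionException;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

public class SendSketch {
    static void send(KafkaProducer<Long, String> producer) throws InterruptedException {
        ProducerRecord<Long, String> record = new ProducerRecord<>("test", 0L, "Test Data #0");
        try {
            // send() first waits up to max.block.ms (60000 ms by default) for topic
            // metadata; if no broker connection completes authentication in that window,
            // the send fails with "Failed to update metadata after 60000 ms".
            RecordMetadata metadata = producer.send(record).get();
            System.out.printf("sent to partition %d at offset %d%n",
                metadata.partition(), metadata.offset());
        } catch (ExecutionException e) {
            // The TimeoutException and other send failures surface here
            // (or are thrown directly from send() while waiting for metadata).
            e.getCause().printStackTrace();
        }
    }
}
```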
Has it worked previously?
It's the first attempt.
Checklist
Please provide the relevant information for the following items:
- Java SDK
- Java quickstart
- 1.1.0
- Producer failure
- macOS Mojave