SumoLogic / sumologic-kafka-push

Utility for reading logs from a Kafka Topic and sending to Sumo Logic.
Apache License 2.0

TLS support for bootstrap server configuration #41

Open ashrivastavagit opened 2 years ago

ashrivastavagit commented 2 years ago

Currently it seems there is no support for using TLS with bootstrapServers. We have Kafka brokers where TLS and authentication are enabled for connecting to the broker.

Can we have this support, so that we can use this Helm chart to publish Kafka logs to Sumo Logic?

vsinghal13 commented 2 years ago

TLS support is already available. The extra volumes example refers to the truststore/keystore mounts needed and the settings just need to be put into the conf identified by endpointsSecret https://github.com/SumoLogic/sumologic-kafka-push/tree/main/helm#extra-volumes

vsinghal13 commented 2 years ago

you would include something like:

akka: {
  kafka.consumer: {
    security.protocol: SSL
    ssl.truststore.location: /opt/kafka/config/kafka.truststore.jks
    ssl.truststore.password: trustore_password
    ssl.keystore.location: /opt/kafka/config/client.keystore.jks
    ssl.keystore.password: keystore_password
    ssl.key.password: key_password
    ssl.enabled.protocols: TLSv1.2,TLSv1.1,TLSv1
    ssl.client.auth: required
  }
}

ashrivastavagit commented 2 years ago

Hi Vijit,

Thanks a lot for the response.

Is there an option to override the "akka" configuration in the Helm chart? I have not seen one.

If you could share the actual Helm chart configuration, that would help a lot.

Regards, Anup

vsinghal13 commented 2 years ago

you can take a look at: https://github.com/SumoLogic/sumologic-kafka-push/blob/main/helm/README.md#configuration

Basically, the additional conf needs to be put in a file and that file will be specified at endpointsSecret parameter in the helm chart.

ashrivastavagit commented 2 years ago

Thanks a lot Vijit,

I will work on it and will update on this thread for sure.

Regards, Anup

ashrivastavagit commented 2 years ago

@vsinghal13 I have added the following configuration.

Chart specification:

spec:
  chart: kafka-push
  version: "0.2.5"
  sourceRef:
    kind: HelmRepository
    name: sumologic-kafka-push

Below are the values that we are adding to override the values file.

extraVolumes:

extraVolumes and extraVolumeMounts are used to add the Kafka TLS certificate files.

For endpointsSecret, I have the configuration below in a single file, "akka.conf". It is created as a secret, since endpointsSecret is consumed as a secret in your Helm chart.

akka: {
  kafka.consumer: {
    security.protocol: SSL
    ssl.truststore.location: /etc/ssl/certs/kafka/truststore.jks
    ssl.truststore.password: instaclustr
    ssl.keystore.location: /etc/ssl/certs/kafka/keystore.jks
    ssl.keystore.password: instaclustr
    ssl.key.password: instaclustr
    ssl.enabled.protocols: TLSv1.2,TLSv1.1,TLSv1
    ssl.client.auth: required
    sasl.jaas.config: "org.apache.kafka.common.security.scram.ScramLoginModule required username="username" password="password";"
  }
}

After deploying these changes, I am getting the logs below. I think the connection to the Kafka cluster has not been established and there are still issues.

2022-04-27 00:51:28 WARN [default-dispatcher-21] o.a.k.c.NetworkClient - [Consumer clientId=consumer-kafka-push-logs-1, groupId=kafka-push-logs] Bootstrap broker 10.220.5.92:9093 (id: -4 rack: null) disconnected
2022-04-27 00:51:28 WARN [default-dispatcher-14] o.a.k.c.NetworkClient - [Consumer clientId=consumer-kafka-push-logs-1, groupId=kafka-push-logs] Bootstrap broker 10.220.5.100:9093 (id: -3 rack: null) disconnected
2022-04-27 00:51:28 WARN [default-dispatcher-17] o.a.k.c.NetworkClient - [Consumer clientId=consumer-kafka-push-logs-1, groupId=kafka-push-logs] Bootstrap broker 10.220.5.27:9093 (id: -1 rack: null) disconnected
2022-04-27 00:51:29 WARN [default-dispatcher-25] o.a.k.c.NetworkClient - [Consumer clientId=consumer-kafka-push-logs-1, groupId=kafka-push-logs] Bootstrap broker 10.220.5.137:9093 (id: -5 rack: null) disconnected
2022-04-27 00:51:29 WARN [default-dispatcher-26] o.a.k.c.NetworkClient - [Consumer clientId=consumer-kafka-push-logs-1, groupId=kafka-push-logs] Bootstrap broker 10.220.5.40:9093 (id: -2 rack: null) disconnected

Could you please verify and let me know if I am missing anything here, or if something needs to be changed? PS: SSL and authentication are enabled on the Kafka server.

vsinghal13 commented 2 years ago

were you able to run helm upgrade successfully after the changes that you made?

ashrivastavagit commented 2 years ago

Yes

ddaughertysl commented 2 years ago

On pod startup you should see in the logs the kafka consumer configuration. Does it match with what you are setting in the endpointsSecret? The secret should also be formatted as follows:

{ "application.conf": "{\"akka\":{\"kafka.consumer\": { ... } } }" }
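The nested quoting in that format is easy to get wrong by hand. A minimal Python sketch, assuming only the layout shown above (a JSON object whose "application.conf" value is itself a JSON-encoded string), lets the serializer do the escaping. The function name `build_endpoints_secret` and the sample passwords are hypothetical; whether the application accepts exactly this layout should be checked against the chart README.

```python
import json


def build_endpoints_secret(consumer_settings):
    """Return secret text whose "application.conf" value is a JSON-encoded string.

    Hypothetical helper: the layout mirrors the format quoted in this thread.
    """
    application_conf = json.dumps({"akka": {"kafka.consumer": consumer_settings}})
    # The second dumps() escapes every quote of the inner document.
    return json.dumps({"application.conf": application_conf})


# Placeholder values only; paths are taken from the example earlier in the thread.
settings = {
    "security.protocol": "SSL",
    "ssl.truststore.location": "/opt/kafka/config/kafka.truststore.jks",
    "ssl.truststore.password": "truststore_password",
    "sasl.jaas.config": (
        "org.apache.kafka.common.security.scram.ScramLoginModule "
        'required username="username" password="password";'
    ),
}

secret_text = build_endpoints_secret(settings)
print(secret_text)
```

Because the inner document is serialized before the outer one, quotes inside values such as sasl.jaas.config come out escaped without manual editing.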

ashrivastavagit commented 2 years ago

@ddaughertysl Here is my finding

This is the actual secret file configuration:

{
  "application.conf": "{
    "akka":{
      "kafka.consumer":
        "security.protocol": "SSL"
        "ssl.truststore.location": "/etc/ssl/certs/kafka/truststore.jks"
        "ssl.truststore.password": "instaclustr"
        "ssl.keystore.location": "/etc/ssl/certs/kafka/truststore.jks"
        "ssl.keystore.password": ""
        "ssl.key.password": ""
        "ssl.enabled.protocols": "TLSv1.2,TLSv1.1,TLSv1"
        "ssl.client.auth": "required"
        "sasl.jaas.config": "org.apache.kafka.common.security.scram.ScramLoginModule required username="username" password="password";"
    }"
}

An ExternalSecret is created with the name akka-conf. The akka.conf file is created and configured as endpointsSecret: akka-conf.

But I am still getting this in the pod logs:

2022-04-27 16:25:09 INFO [default-dispatcher-10] o.a.k.c.c.ConsumerConfig - ConsumerConfig values:
    allow.auto.create.topics = true
    auto.commit.interval.ms = 5000
    auto.offset.reset = earliest
    bootstrap.servers = [10.220.5.27:9093, 10.220.5.40:9093, 10.220.5.100:9093, 10.220.5.92:9093, 10.220.5.137:9093, 10.220.5.178:9093]
    check.crcs = true
    client.dns.lookup = use_all_dns_ips
    client.id = consumer-kafka-push-logs-1
    client.rack =
    connections.max.idle.ms = 540000
    default.api.timeout.ms = 60000
    enable.auto.commit = false
    exclude.internal.topics = true
    fetch.max.bytes = 52428800
    fetch.max.wait.ms = 500
    fetch.min.bytes = 1
    group.id = kafka-push-logs
    group.instance.id = null
    heartbeat.interval.ms = 3000
    interceptor.classes = []
    internal.leave.group.on.close = true
    internal.throw.on.fetch.stable.offset.unsupported = false
    isolation.level = read_uncommitted
    key.deserializer = class org.apache.kafka.common.serialization.StringDeserializer
    max.partition.fetch.bytes = 1048576
    max.poll.interval.ms = 300000
    max.poll.records = 500
    metadata.max.age.ms = 300000
    metric.reporters = []
    metrics.num.samples = 2
    metrics.recording.level = INFO
    metrics.sample.window.ms = 30000
    partition.assignment.strategy = [class org.apache.kafka.clients.consumer.RangeAssignor]
    receive.buffer.bytes = 65536
    reconnect.backoff.max.ms = 1000
    reconnect.backoff.ms = 50
    request.timeout.ms = 30000
    retry.backoff.ms = 100
    sasl.client.callback.handler.class = null
    sasl.jaas.config = null
    sasl.kerberos.kinit.cmd = /usr/bin/kinit
    sasl.kerberos.min.time.before.relogin = 60000
    sasl.kerberos.service.name = null
    sasl.kerberos.ticket.renew.jitter = 0.05
    sasl.kerberos.ticket.renew.window.factor = 0.8
    sasl.login.callback.handler.class = null
    sasl.login.class = null
    sasl.login.refresh.buffer.seconds = 300
    sasl.login.refresh.min.period.seconds = 60
    sasl.login.refresh.window.factor = 0.8
    sasl.login.refresh.window.jitter = 0.05
    sasl.mechanism = GSSAPI
    security.protocol = PLAINTEXT
    security.providers = null
    send.buffer.bytes = 131072
    session.timeout.ms = 10000
    socket.connection.setup.timeout.max.ms = 127000
    socket.connection.setup.timeout.ms = 10000
    ssl.cipher.suites = null
    ssl.enabled.protocols = [TLSv1.2, TLSv1.3]
    ssl.endpoint.identification.algorithm = https
    ssl.engine.factory.class = null
    ssl.key.password = null
    ssl.keymanager.algorithm = SunX509
    ssl.keystore.certificate.chain = null
    ssl.keystore.key = null
    ssl.keystore.location = null
    ssl.keystore.password = null
    ssl.keystore.type = JKS
    ssl.protocol = TLSv1.3
    ssl.provider = null
    ssl.secure.random.implementation = null
    ssl.trustmanager.algorithm = PKIX
    ssl.truststore.certificates = null
    ssl.truststore.location = null
    ssl.truststore.password = null
    ssl.truststore.type = JKS
    value.deserializer = class com.sumologic.sumopush.serde.KubernetesLogEventSerde$

2022-04-27 16:25:10 INFO [default-dispatcher-10] o.a.k.c.u.AppInfoParser - Kafka version: 2.7.0
2022-04-27 16:25:10 INFO [default-dispatcher-10] o.a.k.c.u.AppInfoParser - Kafka commitId: 448719dc99a19793
2022-04-27 16:25:10 INFO [default-dispatcher-10] o.a.k.c.u.AppInfoParser - Kafka startTimeMs: 1651076710027
2022-04-27 16:25:10 INFO [default-dispatcher-10] o.a.k.c.c.KafkaConsumer - [Consumer clientId=consumer-kafka-push-logs-1, groupId=kafka-push-logs] Subscribed to pattern: 'v2dev-instaclustr.*-logs'
2022-04-27 16:25:10 WARN [default-dispatcher-14] o.a.k.c.NetworkClient - [Consumer clientId=consumer-kafka-push-logs-1, groupId=kafka-push-logs] Bootstrap broker 10.220.5.100:9093 (id: -3 rack: null) disconnected

According to the log, the values are not being overridden.
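Comparing the startup ConsumerConfig dump with the intended overrides can be done mechanically. The following is a minimal Python sketch, assuming the dump is available with one `key = value` pair per line, as Kafka prints it. The function names `parse_consumer_config` and `unapplied_overrides` are hypothetical, and password keys should be left out of the comparison because Kafka logs them as `[hidden]`.

```python
def parse_consumer_config(log_text):
    """Collect 'key = value' pairs from a ConsumerConfig dump, one pair per line."""
    values = {}
    for line in log_text.splitlines():
        key, sep, value = line.partition("=")
        key = key.strip()
        # Skip log lines such as "[Consumer clientId=...]", whose left side has spaces.
        if sep and key and " " not in key:
            values[key] = value.strip()
    return values


def unapplied_overrides(expected, effective):
    """Return {key: (expected, effective)} for every override that did not take effect."""
    return {
        key: (wanted, effective.get(key))
        for key, wanted in expected.items()
        if effective.get(key) != wanted
    }


# Excerpt of the dump posted above.
dump = """\
2022-04-27 16:25:09 INFO [default-dispatcher-10] o.a.k.c.c.ConsumerConfig - ConsumerConfig values:
    security.protocol = PLAINTEXT
    ssl.truststore.location = null
    ssl.truststore.type = JKS
"""

expected = {
    "security.protocol": "SSL",
    "ssl.truststore.location": "/etc/ssl/certs/kafka/truststore.jks",
    "ssl.truststore.type": "JKS",
}

missing = unapplied_overrides(expected, parse_consumer_config(dump))
for key, (wanted, actual) in missing.items():
    print(f"{key}: expected {wanted!r}, effective {actual!r}")
```

With the excerpt above, the two keys that were not overridden are reported, which matches the observation that the consumer is still running with PLAINTEXT defaults.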

vsinghal13 commented 2 years ago

Please have a look at https://github.com/SumoLogic/sumologic-kafka-push/pull/45

aporwal3 commented 2 years ago

@vsinghal13 We have applied the same configuration as the one in your PR, but the SSL configuration still does not work in our environment. Other applications on our cluster use the same SSL configuration, and it works for them.

We also performed the troubleshooting steps, and everything looks correct on that front.

Can you please advise what might be going wrong?

Logs:

2022-06-24 11:28:34 INFO [.default-dispatcher-3] a.e.s.Slf4jLogger - Slf4jLogger started
2022-06-24 11:28:35 INFO [.default-dispatcher-7] a.k.i.SingleSourceLogic - [066ce] Starting. StageActor Actor[akka://sumo-push-main/system/Materializers/StreamSupervisor-0/$$c#97301754]
2022-06-24 11:28:35 INFO [default-dispatcher-10] o.a.k.c.c.ConsumerConfig - ConsumerConfig values:
    allow.auto.create.topics = true
    auto.commit.interval.ms = 5000
    auto.offset.reset = earliest
    bootstrap.servers = [10.2x0.x.27:9093, 10.2x0.x.40:9093, 10.2x0.x.137:9093, 10.2x0.x.178:9093]
    check.crcs = true
    client.dns.lookup = use_all_dns_ips
    client.id = consumer-kafka-push-logs-1
    client.rack =
    connections.max.idle.ms = 540000
    default.api.timeout.ms = 60000
    enable.auto.commit = false
    exclude.internal.topics = true
    fetch.max.bytes = 52428800
    fetch.max.wait.ms = 500
    fetch.min.bytes = 1
    group.id = kafka-push-logs
    group.instance.id = null
    heartbeat.interval.ms = 3000
    interceptor.classes = []
    internal.leave.group.on.close = true
    internal.throw.on.fetch.stable.offset.unsupported = false
    isolation.level = read_uncommitted
    key.deserializer = class org.apache.kafka.common.serialization.StringDeserializer
    max.partition.fetch.bytes = 1048576
    max.poll.interval.ms = 300000
    max.poll.records = 500
    metadata.max.age.ms = 300000
    metric.reporters = []
    metrics.num.samples = 2
    metrics.recording.level = INFO
    metrics.sample.window.ms = 30000
    partition.assignment.strategy = [class org.apache.kafka.clients.consumer.RangeAssignor]
    receive.buffer.bytes = 65536
    reconnect.backoff.max.ms = 1000
    reconnect.backoff.ms = 50
    request.timeout.ms = 30000
    retry.backoff.ms = 100
    sasl.client.callback.handler.class = null
    sasl.jaas.config = [hidden]
    sasl.kerberos.kinit.cmd = /usr/bin/kinit
    sasl.kerberos.min.time.before.relogin = 60000
    sasl.kerberos.service.name = null
    sasl.kerberos.ticket.renew.jitter = 0.05
    sasl.kerberos.ticket.renew.window.factor = 0.8
    sasl.login.callback.handler.class = null
    sasl.login.class = null
    sasl.login.refresh.buffer.seconds = 300
    sasl.login.refresh.min.period.seconds = 60
    sasl.login.refresh.window.factor = 0.8
    sasl.login.refresh.window.jitter = 0.05
    sasl.mechanism = GSSAPI
    security.protocol = SSL
    security.providers = null
    send.buffer.bytes = 131072
    session.timeout.ms = 10000
    socket.connection.setup.timeout.max.ms = 127000
    socket.connection.setup.timeout.ms = 10000
    ssl.cipher.suites = null
    ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
    ssl.endpoint.identification.algorithm = https
    ssl.engine.factory.class = null
    ssl.key.password = [hidden]
    ssl.keymanager.algorithm = SunX509
    ssl.keystore.certificate.chain = null
    ssl.keystore.key = null
    ssl.keystore.location = /etc/ssl/certs/kafka/truststore.jks
    ssl.keystore.password = [hidden]
    ssl.keystore.type = JKS
    ssl.protocol = TLSv1.3
    ssl.provider = null
    ssl.secure.random.implementation = null
    ssl.trustmanager.algorithm = PKIX
    ssl.truststore.certificates = null
    ssl.truststore.location = /etc/ssl/certs/kafka/truststore.jks
    ssl.truststore.password = [hidden]
    ssl.truststore.type = JKS
    value.deserializer = class com.sumologic.sumopush.serde.KubernetesLogEventSerde$

2022-06-24 11:28:35 WARN [default-dispatcher-10] o.a.k.c.c.ConsumerConfig - The configuration 'ssl.client.auth' was supplied but isn't a known config.
2022-06-24 11:28:35 INFO [default-dispatcher-10] o.a.k.c.u.AppInfoParser - Kafka version: 2.7.0
2022-06-24 11:28:35 INFO [default-dispatcher-10] o.a.k.c.u.AppInfoParser - Kafka commitId: 448719dc99a19793
2022-06-24 11:28:35 INFO [default-dispatcher-10] o.a.k.c.u.AppInfoParser - Kafka startTimeMs: 1656070115704
2022-06-24 11:28:35 INFO [default-dispatcher-10] o.a.k.c.c.KafkaConsumer - [Consumer clientId=consumer-kafka-push-logs-1, groupId=kafka-push-logs] Subscribed to pattern: 'v2dev-instaclustr.*-logs'
2022-06-24 11:28:36 WARN [default-dispatcher-24] o.a.k.c.NetworkClient - [Consumer clientId=consumer-kafka-push-logs-1, groupId=kafka-push-logs] Bootstrap broker 10.220.5.27:9093 (id: -1 rack: null) disconnected
2022-06-24 11:28:37 WARN [default-dispatcher-25] o.a.k.c.NetworkClient - [Consumer clientId=consumer-kafka-push-logs-1, groupId=kafka-push-logs] Bootstrap broker 10.220.5.100:9093 (id: -3 rack: null) disconnected
2022-06-24 11:28:37 WARN [default-dispatcher-10] o.a.k.c.NetworkClient - [Consumer clientId=consumer-kafka-push-logs-1, groupId=kafka-push-logs] Bootstrap broker 10.220.5.92:9093 (id: -4 rack: null) disconnected

vsinghal13 commented 2 years ago

@ddaughertysl any suggestions on this?

ddaughertysl commented 2 years ago

I think this shows progress, in that the SSL settings are now being applied and the security protocol is SSL. The question is why they aren't working, which is a bit harder for us to answer without comparing the settings that work in your other applications with the sumo-push settings. I also notice that your ssl.protocol is set to TLSv1.3 while ssl.enabled.protocols doesn't include that value, so you might also try including TLSv1.3 in the list of enabled protocols.
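The mismatch pointed out here can be checked directly from the logged values. A minimal Python sketch, assuming the two values are copied from the ConsumerConfig dump as strings; the helper names are hypothetical:

```python
def parse_protocol_list(value):
    """Split '[TLSv1.2, TLSv1.1, TLSv1]' or 'TLSv1.2,TLSv1.1' into protocol names."""
    return [item.strip() for item in value.strip("[] ").split(",") if item.strip()]


def ssl_protocol_is_enabled(config):
    """True when ssl.protocol also appears in ssl.enabled.protocols."""
    enabled = parse_protocol_list(config["ssl.enabled.protocols"])
    return config["ssl.protocol"] in enabled


# Values as they appear in the log posted on 2022-06-24.
logged = {
    "ssl.protocol": "TLSv1.3",
    "ssl.enabled.protocols": "[TLSv1.2, TLSv1.1, TLSv1]",
}
# The same settings with TLSv1.3 added to the enabled list, as suggested.
adjusted = dict(logged, **{"ssl.enabled.protocols": "TLSv1.3,TLSv1.2,TLSv1.1,TLSv1"})

print(ssl_protocol_is_enabled(logged))
print(ssl_protocol_is_enabled(adjusted))
```

This only tests consistency between the two settings; it says nothing about which protocol versions the brokers accept.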

aporwal3 commented 2 years ago

@ddaughertysl I made the requested change, but the issue still persists.

2022-07-01 09:27:03 INFO [.default-dispatcher-3] a.e.s.Slf4jLogger - Slf4jLogger started
2022-07-01 09:27:05 INFO [.default-dispatcher-5] a.k.i.SingleSourceLogic - [f1e6a] Starting. StageActor Actor[akka://sumo-push-main/system/Materializers/StreamSupervisor-0/$$c#-38628872]
2022-07-01 09:27:05 INFO [default-dispatcher-10] o.a.k.c.c.ConsumerConfig - ConsumerConfig values:
    allow.auto.create.topics = true
    auto.commit.interval.ms = 5000
    auto.offset.reset = earliest
    bootstrap.servers = [10.x.27:9093, 10.x.40:9093, 10.x.100:9093, 10.x.92:9093, 10.x.137:9093, 10.x.178:9093]
    check.crcs = true
    client.dns.lookup = use_all_dns_ips
    client.id = consumer-kafka-push-logs-1
    client.rack =
    connections.max.idle.ms = 540000
    default.api.timeout.ms = 60000
    enable.auto.commit = false
    exclude.internal.topics = true
    fetch.max.bytes = 52428800
    fetch.max.wait.ms = 500
    fetch.min.bytes = 1
    group.id = kafka-push-logs
    group.instance.id = null
    heartbeat.interval.ms = 3000
    interceptor.classes = []
    internal.leave.group.on.close = true
    internal.throw.on.fetch.stable.offset.unsupported = false
    isolation.level = read_uncommitted
    key.deserializer = class org.apache.kafka.common.serialization.StringDeserializer
    max.partition.fetch.bytes = 1048576
    max.poll.interval.ms = 300000
    max.poll.records = 500
    metadata.max.age.ms = 300000
    metric.reporters = []
    metrics.num.samples = 2
    metrics.recording.level = INFO
    metrics.sample.window.ms = 30000
    partition.assignment.strategy = [class org.apache.kafka.clients.consumer.RangeAssignor]
    receive.buffer.bytes = 65536
    reconnect.backoff.max.ms = 1000
    reconnect.backoff.ms = 50
    request.timeout.ms = 30000
    retry.backoff.ms = 100
    sasl.client.callback.handler.class = null
    sasl.jaas.config = [hidden]
    sasl.kerberos.kinit.cmd = /usr/bin/kinit
    sasl.kerberos.min.time.before.relogin = 60000
    sasl.kerberos.service.name = null
    sasl.kerberos.ticket.renew.jitter = 0.05
    sasl.kerberos.ticket.renew.window.factor = 0.8
    sasl.login.callback.handler.class = null
    sasl.login.class = null
    sasl.login.refresh.buffer.seconds = 300
    sasl.login.refresh.min.period.seconds = 60
    sasl.login.refresh.window.factor = 0.8
    sasl.login.refresh.window.jitter = 0.05
    sasl.mechanism = GSSAPI
    security.protocol = SSL
    security.providers = null
    send.buffer.bytes = 131072
    session.timeout.ms = 10000
    socket.connection.setup.timeout.max.ms = 127000
    socket.connection.setup.timeout.ms = 10000
    ssl.cipher.suites = null
    ssl.enabled.protocols = [TLSv1.3, TLSv1.2, TLSv1.1, TLSv1]
    ssl.endpoint.identification.algorithm = https
    ssl.engine.factory.class = null
    ssl.key.password = [hidden]
    ssl.keymanager.algorithm = SunX509
    ssl.keystore.certificate.chain = null
    ssl.keystore.key = null
    ssl.keystore.location = /etc/ssl/certs/kafka/truststore.jks
    ssl.keystore.password = [hidden]
    ssl.keystore.type = JKS
    ssl.protocol = TLSv1.3
    ssl.provider = null
    ssl.secure.random.implementation = null
    ssl.trustmanager.algorithm = PKIX
    ssl.truststore.certificates = null
    ssl.truststore.location = /etc/ssl/certs/kafka/truststore.jks
    ssl.truststore.password = [hidden]
    ssl.truststore.type = JKS
    value.deserializer = class com.sumologic.sumopush.serde.KubernetesLogEventSerde$

2022-07-01 09:27:05 WARN [default-dispatcher-10] o.a.k.c.c.ConsumerConfig - The configuration 'ssl.client.auth' was supplied but isn't a known config.
2022-07-01 09:27:05 INFO [default-dispatcher-10] o.a.k.c.u.AppInfoParser - Kafka version: 2.7.0
2022-07-01 09:27:05 INFO [default-dispatcher-10] o.a.k.c.u.AppInfoParser - Kafka commitId: 448719dc99a19793
2022-07-01 09:27:05 INFO [default-dispatcher-10] o.a.k.c.u.AppInfoParser - Kafka startTimeMs: 1656667625750
2022-07-01 09:27:05 INFO [default-dispatcher-10] o.a.k.c.c.KafkaConsumer - [Consumer clientId=consumer-kafka-push-logs-1, groupId=kafka-push-logs] Subscribed to pattern: 'v2dev-instaclustr.*-logs'
2022-07-01 09:27:06 WARN [default-dispatcher-24] o.a.k.c.NetworkClient - [Consumer clientId=consumer-kafka-push-logs-1, groupId=kafka-push-logs] Bootstrap broker 10.x.27:9093 (id: -1 rack: null) disconnected
2022-07-01 09:27:07 WARN [default-dispatcher-23] o.a.k.c.NetworkClient - [Consumer clientId=consumer-kafka-push-logs-1, groupId=kafka-push-logs] Bootstrap broker 10.x.100:9093 (id: -3 rack: null) disconnected
2022-07-01 09:27:07 WARN [default-dispatcher-24] o.a.k.c.NetworkClient - [Consumer clientId=consumer-kafka-push-logs-1, groupId=kafka-push-logs] Bootstrap broker 10.x.137:9093 (id: -5 rack: null) disconnected
2022-07-01 09:27:08 WARN [default-dispatcher-23] o.a.k.c.NetworkClient - [Consumer clientId=consumer-kafka-push-logs-1, groupId=kafka-push-logs] Bootstrap broker 10.x.40:9093 (id: -2 rack: null) disconnected

ddaughertysl commented 2 years ago

Please send the broker logs since they might have more information about what is happening. Also noticing that you are using the same file for both keystore and truststore. Is that intentional? Can you also send us the settings you are using in your working applications with the passwords redacted?

ddaughertysl commented 2 years ago

The sasl.jaas.config line in the config also looks a little strange. Can you escape the quotes in that config so it looks like the following:

sasl.jaas.config: "org.apache.kafka.common.security.scram.ScramLoginModule required username=\"username\" password=\"password\";"

aporwal3 commented 2 years ago

Thanks @ddaughertysl for the hint. We have corrected the JAAS config now, but the tool is now failing with a SASL client authenticator error. Several articles online suggest that this is a temporary error that should resolve once the app is restarted, but that is not happening in our case.

2022-07-06 13:28:34 WARN [default-dispatcher-20] o.a.k.c.NetworkClient - [Consumer clientId=consumer-kafka-push-logs-1, groupId=kafka-push-logs] Bootstrap broker xxxxx:9093 (id: -4 rack: null) disconnected
2022-07-06 13:28:34 INFO [default-dispatcher-24] o.a.k.c.n.SaslChannelBuilder - [Consumer clientId=consumer-kafka-push-logs-1, groupId=kafka-push-logs] Failed to create channel due to
org.apache.kafka.common.errors.SaslAuthenticationException: Failed to configure SaslClientAuthenticator
Caused by: org.apache.kafka.common.KafkaException: Principal could not be determined from Subject, this may be a transient failure due to Kerberos re-login
    at org.apache.kafka.common.security.authenticator.SaslClientAuthenticator.firstPrincipal(SaslClientAuthenticator.java:620)
    at org.apache.kafka.common.security.authenticator.SaslClientAuthenticator.<init>(SaslClientAuthenticator.java:200)
    at org.apache.kafka.common.network.SaslChannelBuilder.buildClientAuthenticator(SaslChannelBuilder.java:275)
    at org.apache.kafka.common.network.SaslChannelBuilder.lambda$buildChannel$1(SaslChannelBuilder.java:216)
    at org.apache.kafka.common.network.KafkaChannel.<init>(KafkaChannel.java:142)
    at org.apache.kafka.common.network.SaslChannelBuilder.buildChannel(SaslChannelBuilder.java:224)
    at org.apache.kafka.common.network.Selector.buildAndAttachKafkaChannel(Selector.java:338)
    at org.apache.kafka.common.network.Selector.registerChannel(Selector.java:329)
    at org.apache.kafka.common.network.Selector.connect(Selector.java:256)
    at org.apache.kafka.clients.NetworkClient.initiateConnect(NetworkClient.java:1004)
    at org.apache.kafka.clients.NetworkClient.ready(NetworkClient.java:311)
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.trySend(ConsumerNetworkClient.java:498)
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:255)
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:236)
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:215)
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureCoordinatorReady(AbstractCoordinator.java:244)
    at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:480)
    at org.apache.kafka.clients.consumer.KafkaConsumer.updateAssignmentMetadataIfNeeded(KafkaConsumer.java:1257)
    at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1226)
    at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1206)
    at akka.kafka.internal.KafkaConsumerActor.poll(KafkaConsumerActor.scala:529)
    at akka.kafka.internal.KafkaConsumerActor.commitAndPoll(KafkaConsumerActor.scala:515)
    at akka.kafka.internal.KafkaConsumerActor.akka$kafka$internal$KafkaConsumerActor$$receivePoll(KafkaConsumerActor.scala:499)
    at akka.kafka.internal.KafkaConsumerActor$$anonfun$regularReceive$1.applyOrElse(KafkaConsumerActor.scala:289)
    at akka.actor.Actor.aroundReceive(Actor.scala:537)
    at akka.actor.Actor.aroundReceive$(Actor.scala:535)
    at akka.kafka.internal.KafkaConsumerActor.akka$actor$Timers$$super$aroundReceive(KafkaConsumerActor.scala:205)
    at akka.actor.Timers.aroundReceive(Timers.scala:52)
    at akka.actor.Timers.aroundReceive$(Timers.scala:41)
    at akka.kafka.internal.KafkaConsumerActor.aroundReceive(KafkaConsumerActor.scala:205)
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:577)
    at akka.actor.ActorCell.invoke(ActorCell.scala:547)
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:270)
    at akka.dispatch.Mailbox.run(Mailbox.scala:231)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    at java.base/java.lang.Thread.run(Thread.java:829)

Can you please advise what the issue might be now?

ddaughertysl commented 2 years ago

have you configured your security.protocol and sasl.mechanism as follows? https://kafka.apache.org/documentation/#security_sasl_scram_clientconfig
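The linked Kafka documentation pairs a SCRAM login module with a SASL security protocol and a SCRAM mechanism. A minimal Python sketch of that consistency check follows; the function name `scram_config_problems` is hypothetical, the allowed values are the ones Kafka documents for SCRAM clients, and the default mechanism of GSSAPI is taken from the ConsumerConfig dumps in this thread.

```python
SCRAM_MECHANISMS = {"SCRAM-SHA-256", "SCRAM-SHA-512"}
SASL_PROTOCOLS = {"SASL_SSL", "SASL_PLAINTEXT"}


def scram_config_problems(config):
    """List inconsistencies between a SCRAM JAAS entry and the related settings."""
    problems = []
    if "ScramLoginModule" not in config.get("sasl.jaas.config", ""):
        return problems  # not a SCRAM setup, nothing to check here
    protocol = config.get("security.protocol", "PLAINTEXT")
    if protocol not in SASL_PROTOCOLS:
        problems.append(f"security.protocol is {protocol}, expected SASL_SSL or SASL_PLAINTEXT")
    # GSSAPI (Kerberos) is the client default when sasl.mechanism is not set.
    mechanism = config.get("sasl.mechanism", "GSSAPI")
    if mechanism not in SCRAM_MECHANISMS:
        problems.append(f"sasl.mechanism is {mechanism}, expected SCRAM-SHA-256 or SCRAM-SHA-512")
    return problems


jaas = (
    "org.apache.kafka.common.security.scram.ScramLoginModule "
    'required username="username" password="password";'
)
# Settings as posted earlier in the thread: SSL protocol, no explicit mechanism.
as_posted = {"security.protocol": "SSL", "sasl.jaas.config": jaas}
# One combination from the Kafka docs; the mechanism must match the broker's.
corrected = {
    "security.protocol": "SASL_SSL",
    "sasl.mechanism": "SCRAM-SHA-256",
    "sasl.jaas.config": jaas,
}

for problem in scram_config_problems(as_posted):
    print(problem)
print(scram_config_problems(corrected))
```

A mechanism left at GSSAPI would be consistent with the Kerberos wording in the error above, although the thread does not show the exact settings in use at that point.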

aporwal3 commented 2 years ago

@ddaughertysl Thanks again, the connection works now. However, log parsing is now failing with the error below. Do we need to configure serdeClass?

Error Logs:

2022-07-06 18:19:16 ERROR [default-dispatcher-28] c.s.s.a.LogProcessor$ - unable to parse log message payload: Jul 01 13:42:06 8f844057-a8f5-4f0d-9b66-3347d9815750 [2022-07-01 13:42:06,957] INFO [SocketServer listenerType=ZK_BROKER, nodeId=2] Failed authentication with /10.x.x.8 (Unexpected Kafka request of type METADATA during SASL handshake.) (org.apache.kafka.common.network.Selector)
java.lang.Exception: payload: Jul 01 13:42:06 8f844057-a8f5-4f0d-9b66-3347d9815750 [2022-07-01 13:42:06,957] INFO [SocketServer listenerType=ZK_BROKER, nodeId=2] Failed authentication with /10.x.x.8 (Unexpected Kafka request of type METADATA during SASL handshake.) (org.apache.kafka.common.network.Selector)
    at com.sumologic.sumopush.serde.KubernetesLogEventSerde$.deserialize(KubernetesLogEventSerde.scala:15)
    at com.sumologic.sumopush.serde.KubernetesLogEventSerde$.deserialize(KubernetesLogEventSerde.scala:8)
    at org.apache.kafka.common.serialization.Deserializer.deserialize(Deserializer.java:60)
    at com.sumologic.sumopush.serde.LogEventSerde.deserialize(LogEventSerde.scala:8)
    at org.apache.kafka.clients.consumer.internals.Fetcher.parseRecord(Fetcher.java:1387)
    at org.apache.kafka.clients.consumer.internals.Fetcher.access$3400(Fetcher.java:133)
    at org.apache.kafka.clients.consumer.internals.Fetcher$CompletedFetch.fetchRecords(Fetcher.java:1618)
    at org.apache.kafka.clients.consumer.internals.Fetcher$CompletedFetch.access$1700(Fetcher.java:1454)
    at org.apache.kafka.clients.consumer.internals.Fetcher.fetchRecords(Fetcher.java:687)
    at org.apache.kafka.clients.consumer.internals.Fetcher.fetchedRecords(Fetcher.java:638)
    at org.apache.kafka.clients.consumer.KafkaConsumer.pollForFetches(KafkaConsumer.java:1272)
    at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1233)
    at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1206)
    at akka.kafka.internal.KafkaConsumerActor.poll(KafkaConsumerActor.scala:549)
    at akka.kafka.internal.KafkaConsumerActor$$anonfun$regularReceive$1.applyOrElse(KafkaConsumerActor.scala:296)
    at akka.actor.Actor.aroundReceive(Actor.scala:537)
    at akka.actor.Actor.aroundReceive$(Actor.scala:535)
    at akka.kafka.internal.KafkaConsumerActor.akka$actor$Timers$$super$aroundReceive(KafkaConsumerActor.scala:205)
    at akka.actor.Timers.aroundReceive(Timers.scala:56)
    at akka.actor.Timers.aroundReceive$(Timers.scala:41)
    at akka.kafka.internal.KafkaConsumerActor.aroundReceive(KafkaConsumerActor.scala:205)
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:577)
    at akka.actor.ActorCell.invoke(ActorCell.scala:547)
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:270)
    at akka.dispatch.Mailbox.run(Mailbox.scala:231)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: org.json4s.ParserUtil$ParseException: unknown token J
Near: J
    at org.json4s.native.JsonParser$Parser.fail(JsonParser.scala:236)
    at org.json4s.native.JsonParser$Parser.nextToken(JsonParser.scala:324)
    at org.json4s.native.JsonParser$.$anonfun$astParser$1(JsonParser.scala:188)
    at org.json4s.native.JsonParser$.$anonfun$astParser$1$adapted(JsonParser.scala:145)
    at org.json4s.native.JsonParser$.parse(JsonParser.scala:133)
    at org.json4s.native.JsonParser$.parse(JsonParser.scala:71)
    at org.json4s.native.JsonParser$.parse(JsonParser.scala:50)
    at org.json4s.native.Serialization$.read(Serialization.scala:71)
    at org.json4s.Serialization.read(Serialization.scala:25)
    at org.json4s.Serialization.read$(Serialization.scala:25)
    at org.json4s.native.Serialization$.read(Serialization.scala:32)
    at com.sumologic.sumopush.model.KubernetesLogEventSerializer$.fromJson(KubernetesLogEvent.scala:47)
    at com.sumologic.sumopush.serde.KubernetesLogEventSerde$.deserialize(KubernetesLogEventSerde.scala:13)
    ... 27 common frames omitted

ddaughertysl commented 2 years ago

Yes you should be using the JsonLogEventSerde but it also looks like your messages aren't in Json format. Can you fix that?

aporwal3 commented 2 years ago

@ddaughertysl The logs are in text/string format. Is it mandatory to convert them to JSON format?

ddaughertysl commented 2 years ago

Yes per the docs https://github.com/SumoLogic/sumologic-kafka-push#supported-message-formats we only support json format at this time.
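Since only JSON messages are supported, it can help to test a sample of topic payloads before pointing the consumer at a topic. A minimal Python sketch, assuming payloads are available as strings; the function name `is_json_object` and the `"log"` field in the second sample are hypothetical, and the actual schema each serde expects is described in the README rather than here.

```python
import json


def is_json_object(payload):
    """True when the payload parses as a JSON object, False for plain text."""
    try:
        return isinstance(json.loads(payload), dict)
    except json.JSONDecodeError:
        return False


# Start of the plain-text broker log line that failed to parse in this thread.
text_payload = (
    "Jul 01 13:42:06 8f844057-a8f5-4f0d-9b66-3347d9815750 "
    "[2022-07-01 13:42:06,957] INFO [SocketServer listenerType=ZK_BROKER, nodeId=2] "
    "Failed authentication with /10.x.x.8"
)
# The same text wrapped in a JSON object; the "log" key is illustrative only.
json_payload = json.dumps({"log": text_payload})

print(is_json_object(text_payload))
print(is_json_object(json_payload))
```

The plain-text line fails at its first character, which corresponds to the "unknown token J" error reported by the parser above.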

ashrivastavagit commented 2 years ago

Yes. I had to adjust resource configuration.
