Closed LadyCodeSmith closed 5 months ago
@LGouellec can you help us to identify the issue ?
Hi @LadyCodeSmith ,
Streamiz and Kafka Streams as well use different internal topics to backup the state or materialize a repartition (meaning a key change). When the application starts, it will describe the topics present in the cluster via an Admin API Call to know if these internal topics are already created or not, if not, let create them.
So your application must be able to get the metadata from your cluster : https://docs.confluent.io/platform/current/kafka/authorization.html#cluster-resource-operations
Today the design is there, we can probably switch to request a metadata call per internal topic.
Thank you for this information @LGouellec . Do you have some sample code to request metadata call per internal topic @LGouellec ?
@LadyCodeSmith Today it's not available, but if you look at https://github.com/LGouellec/kafka-streams-dotnet/blob/7b10cbc6c8940845d6d105398d29845ad4a33d8e/core/Processors/DefaultTopicManager.cs#L52. We will replace by a metadata call per topic few lines after
Feel free to submit a PR
Description
When running the Streamiz Kafka Stream application, it attempts to describe all topics in the Kafka cluster instead of just the specified topic.The access logs show multiple attempts by the application to access topics for which the user does not have authorization.This behavior occurs consistently whenever the application is started, regardless of the configuration settings.
How to reproduce
Checklist
Please provide the following information:
[ ]public async Task StartAsync() { try { // Configure Stream var streamConfig = new StreamConfig<StringSerDes, StringSerDes>(); streamConfig.ApplicationId = KafkaConsumerGroup; streamConfig.ClientId = ConsumerId; streamConfig.BootstrapServers = KafkaBrokers; streamConfig.SslCertificateLocation = KafkaCertificateLocation; streamConfig.StateDir = Path.Combine(Path.GetTempPath(), Guid.NewGuid().ToString()); streamConfig.AutoOffsetReset = AutoOffsetReset.Earliest; streamConfig.SecurityProtocol = SecurityProtocol.SaslSsl; streamConfig.SaslMechanism = SaslMechanism.Gssapi; streamConfig.SaslKerberosServiceName = SaslKerberosServiceName; streamConfig.SaslKerberosPrincipal = SaslKerberosPrincipal;
} catch (Exception ex) { Log.Error($"Error while processing aggregate. Error: {ex.Message} - Stacktrace: {ex.StackTrace}"); throw; } } Streamiz.Kafka.Net version = "1.5.1"
stream-application [ASBCustomersDDNAggregate Consumers] Start creation of the stream application with this
configuration:
Stream property:
client.id: ASB.DDNAggregate ConsumerService
num.stream.threads: 1
default.key.serdes: Streamiz Kafka.Net.SerDes.StringSerDes
default value.serdes: Streamiz.Kafka.Net.SerDes.StringSerDes
default.timestamp.extractor: Streamiz Kafka.Net.Processors.Internal.FailOnInvalidTimestamp
commit.interval.ms: 30000
processing.guarantee: AT_LEAST_ONCE
transaction.timeout: 00:00:10
*poll.ms: 100
max.poll.records: 500
max.poll.restoring.records: 1000
max.task.idle.ms: 0
buffered.records.per.partition: 1000
inner.exception.handler. System.Func 2(System.Exception, Streamiz.Kafka.Net.ExceptionHandlerResponse]
production.exception.handler:
System.Func 2[Confluent.Kafka.Delivery Report 2[System.Byte[], System. Byte[]], Streamiz. Kafka.Net.ExceptionHandlerResponse)
deserialization.exception.handler:
System.Func 4[Streamiz Kafka.Net.ProcessorContext, Confluent.Kafka.ConsumeResult 2[System. Byte[], System. Byte[]], System.Exce
ption, Streamiz Kafka.Net.ExceptionHandlerResponse]
rocksdb.config.setter: System.Action 2[System.String, Streamiz. Kafka.Net.State. RocksDb, RocksDbOptions]
follow.metadata: False
state.dir: C:\Users\dev.anijam\AppData\Local\Temp\73bdf55d-af0b-4196-874f-96ee3146cd6d
replication.factor 1
windowstore.changelog.additional.retention.ms: 86400000
offset.checkpoint.manager.
metrics.interval.ms: 30000
metrics.recording.level: INFO
metrics.reporter. System. Action 1 [System.Collections.Generic.IEnumerable 1 [Streamiz Kafka.Net.Metrics.Sensor]]
expose.librdkafka.stats: False
start.task.delay.ms: 5000 max.degree.of.parallelism: 8
application.id:
ASBCustomers DDNAggregate Consumers
Client property:
bootstrap.servers:
Inxatca6895.prod.asbgroup.co.nz:9093,Inxcomp6892.prod.asbgroup.co.nz:9093,Inxcomp6893.prod.asbgroup.co.nz:9093
ssl.certificate.location: KafkaCerts/webappkafkabrokerdev.asbbank.co.nz.pem
security.protocol:
sasl_ssl
*Sasl.mechanism:
GSSAPI
sasl.kerberos.service.name:
kafka
sasl.kerberos.principal:
kafka
Consumer property:
max.poll.interval.ms: 300000
enable.auto.commit: False
enable.auto.offset.store: False
partition.assignment strategy: cooperative-sticky
auto.offset.reset:
earliest
Producer property:
None
Admin client property:
None
info
: Streamiz.Kafka.Net. Processors.Stream Thread[0]
info
stream-thread [ASB.DDNAggregate ConsumerService-stream-thread-0] Creating shared producer client
: Streamiz Kafka.Net. Processors.Stream Thread[0]
stream-thread(ASB.DDNAggregateConsumerService-stream-thread-0] Creating consumer client
info:
Streamiz.Kafka.Net.KafkaStream [0]
stream-application [ASBCustomers DDNAggregate Consumers] State transition from CREATED to REBALANCING
info: Streamiz Kafka.Net KafkaStream [0] info: Streamiz Kafka.Net.KafkaStream [0]
stream-application[ASBCustomers DDNAggregateConsumers] Starting Streams client with this topology: Topologies: Sub-topology: 0
Source: KSTREAM-SOURCE-0000000000 (topics: [ASB.FileNet. Document-Aggregated])
--> KSTREAM-MAPVALUES-0000000001
Processor. KSTREAM-MAPVALUES-0000000001 (stores: [])
--> KSTREAM-AGGREGATE-0000000002
<--KSTREAM-SOURCE-0000000000
Processor: KSTREAM-AGGREGATE-0000000002 (stores: [agg-store])
--> KTABLE-TOSTREAM-0000000003
<-- KSTREAM-MAPVALUES-0000000001
Processor: KTABLE-TOSTREAM-0000000003 (stores: [])
--> KSTREAM-PEEK-0000000004
<-- KSTREAM-AGGREGATE-0000000002
Processor: KSTREAM-PEEK-0000000004 (stores: [])
none
-- KTABLE-TOSTREAM-0000000003
<
[14:53:43 [Infomation) Consumer started, Ctrl-C to stop consuming
info: Streamiz.Kafka.Net Processors. Stream Thread[0]
stream-thread [ASB.DDNAggregate ConsumerService-stream-thread-0] Starting
info: Streamiz Kafka.Net.Processors.Stream Thread[0]
stream-thread[ASB.DDNAggregateConsumerService-stream-thread-0] State transition from CREATED to STARTING
info: Streamiz Kafka.Net. Processors. Stream Thread[0]
stream-thread[ASB.DDNAggregateConsumerService-stream-thread-0] State transition from STARTING to
PARTITIONS ASSIGNED
info: Streamiz Kafka.Net.Kafka.Internal Streams RebalanceListener[0]
Partition assignment took 00:00:00.1470479 ms. Currently assigned active tasks: 0-11,0-10,0-9,0-8,0-7,0-6,0-5,0-4,0-3,0-2,0-1,0-0
Revoked assigned active tasks:
info: Streamiz Kafka.Net.Processors.Stream Thread [0]
stream-thread [ASB.DDNAggregateConsumerService-stream-thread-0] State transition from PARTITIONS_ASSIGNED to
RUNNING
info: Streamiz Kafka. Net KafkaStream [0]
stream-application[ASBCustomers DDNAggregateConsumers] State transition from REBALANCING to RUNNING
info: Streamiz Kafka.Net. Processors Internal. ProcessorStateManager[0] PARTITIONS_ASSIGNED
info: Streamiz. Kafka.Net.Kafka.Internal. Streams RebalanceListener[0]
Partition assignment took 00:00:00.1470479 ms.
Currently assigned active tasks: 0-11,0-10,0-9,0-8,0-7,0-6,0-5,0-4,0-3,0-2,0-1,0-0
Revoked assigned active tasks:
info: Streamiz Kafka.Net.Processors.Stream Thread[0]
stream-thread ASB.DDNAggregateConsumerService-stream-thread-0] State transition from PARTITIONS_ASSIGNED to
RUNNING
info: Streamiz Kafka.Net.KafkaStream [0]
stream-application[ASBCustomers DDNAggregate Consumers] State transition from REBALANCING to RUNNING
info: Streamiz Kafka.Net Processors.Internal ProcessorStateManager[0]
Initializing to the starting offset for changelog ASBCustomers DDNAggregateConsumers-agg-store-changelog [[11]] of in-
memory state store agg-store
info: Streamiz Kafka.Net. Processors.Stream Task[0]
stream-task[0/11] Task 0-11 state transition from CREATED to RESTORING
info: Streamiz Kafka.Net Processors.Stream Task[0]
stream-task[0/11] Restoration will start soon.
info: Streamiz Kafka.Net. Processors.Internal. ProcessorStateManager[0]
Initializing to the starting offset for changelog ASBCustomers DDNAggregateConsumers-agg-store-changelog [[10]] of in-
memory state store agg-store
info: Streamiz.Kafka.Net. Processors.Stream Task [0]
stream-task[0/10] Task 0-10 state transition from CREATED to RESTORING
info
: Streamiz Kafka.Net. Processors.Stream Task[0]
stream-task[0/10] Restoration will start soon.
info: Streamiz Kafka.Net.Processors.Internal. ProcessorStateManager[0]
Initializing to the starting offset for changelog ASBCustomers DDNAggregate Consumers-agg-store-changelog [[9]] of in-
memory state store agg-store
info: Streamiz Kafka.Net Processors. Stream Task [0]
stream-task[019] Task 0-9 state transition from CREATED to RESTORING
info: Streamiz Kafka.Net. Processors.Stream Task[0]
stream-task[019] Restoration will start soon.
info: Streamiz Kafka.Net.Processors.Internal. ProcessorStateManager[0]
Initializing to the starting offset for changelog ASBCustomersDDNAggregateConsumers-agg-store-changelog [[8]] of in-
memory state store agg-store
info: Streamiz Kafka.Net.Processors.Stream Task[0]
stream-task[018] Task 0-8 state transition from CREATED to RESTORING PARTITIONS_ASSIGNED
info: Streamiz. Kafka.Net.Kafka.Internal. Streams RebalanceListener[0]
Partition assignment took 00:00:00.1470479 ms.
Currently assigned active tasks: 0-11,0-10,0-9,0-8,0-7,0-6,0-5,0-4,0-3,0-2,0-1,0-0
Revoked assigned active tasks:
info: Streamiz Kafka.Net.Processors.Stream Thread[0]
stream-thread ASB.DDNAggregateConsumerService-stream-thread-0] State transition from PARTITIONS_ASSIGNED to
RUNNING
info: Streamiz Kafka.Net.KafkaStream [0]
stream-application[ASBCustomers DDNAggregate Consumers] State transition from REBALANCING to RUNNING
info: Streamiz Kafka.Net Processors.Internal ProcessorStateManager[0]
Initializing to the starting offset for changelog ASBCustomers DDNAggregateConsumers-agg-store-changelog [[11]] of in-
memory state store agg-store
info: Streamiz Kafka.Net. Processors.Stream Task[0]
stream-task[0/11] Task 0-11 state transition from CREATED to RESTORING
info: Streamiz Kafka.Net Processors.Stream Task[0]
stream-task[0/11] Restoration will start soon.
info: Streamiz Kafka.Net. Processors.Internal. ProcessorStateManager[0]
Initializing to the starting offset for changelog ASBCustomers DDNAggregateConsumers-agg-store-changelog [[10]] of in-
memory state store agg-store
info: Streamiz.Kafka.Net. Processors.Stream Task [0]
stream-task[0/10] Task 0-10 state transition from CREATED to RESTORING
info
: Streamiz Kafka.Net. Processors.Stream Task[0]
stream-task[0/10] Restoration will start soon.
info: Streamiz Kafka.Net.Processors.Internal. ProcessorStateManager[0]
Initializing to the starting offset for changelog ASBCustomers DDNAggregate Consumers-agg-store-changelog [[9]] of in-
memory state store agg-store
info: Streamiz Kafka.Net Processors. Stream Task [0]
stream-task[019] Task 0-9 state transition from CREATED to RESTORING
info: Streamiz Kafka.Net. Processors.Stream Task[0]
stream-task[019] Restoration will start soon.
info: Streamiz Kafka.Net.Processors.Internal. ProcessorStateManager[0]
Initializing to the starting offset for changelog ASBCustomersDDNAggregateConsumers-agg-store-changelog [[8]] of in-
memory state store agg-store
info: Streamiz Kafka.Net.Processors.Stream Task[0]
stream-task[018] Task 0-8 state transition from CREATED to RESTORING PARTITIONS_ASSIGNED
info: Streamiz. Kafka.Net.Kafka.Internal. Streams RebalanceListener[0]
Partition assignment took 00:00:00.1470479 ms.
Currently assigned active tasks: 0-11,0-10,0-9,0-8,0-7,0-6,0-5,0-4,0-3,0-2,0-1,0-0
Revoked assigned active tasks:
info: Streamiz Kafka.Net.Processors.Stream Thread[0]
stream-thread ASB.DDNAggregateConsumerService-stream-thread-0] State transition from PARTITIONS_ASSIGNED to
RUNNING
info: Streamiz Kafka.Net.KafkaStream [0]
stream-application[ASBCustomers DDNAggregate Consumers] State transition from REBALANCING to RUNNING
info: Streamiz Kafka.Net Processors.Internal ProcessorStateManager[0]
Initializing to the starting offset for changelog ASBCustomers DDNAggregateConsumers-agg-store-changelog [[11]] of in-
memory state store agg-store
info: Streamiz Kafka.Net. Processors.Stream Task[0]
stream-task[0/11] Task 0-11 state transition from CREATED to RESTORING
info: Streamiz Kafka.Net Processors.Stream Task[0]
stream-task[0/11] Restoration will start soon.
info: Streamiz Kafka.Net. Processors.Internal. ProcessorStateManager[0]
Initializing to the starting offset for changelog ASBCustomers DDNAggregateConsumers-agg-store-changelog [[10]] of in-
memory state store agg-store
info: Streamiz.Kafka.Net. Processors.Stream Task [0]
stream-task[0/10] Task 0-10 state transition from CREATED to RESTORING
info
: Streamiz Kafka.Net. Processors.Stream Task[0]
stream-task[0/10] Restoration will start soon.
info: Streamiz Kafka.Net.Processors.Internal. ProcessorStateManager[0]
Initializing to the starting offset for changelog ASBCustomers DDNAggregate Consumers-agg-store-changelog [[9]] of in-
memory state store agg-store
info: Streamiz Kafka.Net Processors. Stream Task [0]
stream-task[019] Task 0-9 state transition from CREATED to RESTORING
info: Streamiz Kafka.Net. Processors.Stream Task[0]
stream-task[019] Restoration will start soon.
info: Streamiz Kafka.Net.Processors.Internal. ProcessorStateManager[0]
Initializing to the starting offset for changelog ASBCustomersDDNAggregateConsumers-agg-store-changelog [[8]] of in-
memory state store agg-store
info: Streamiz Kafka.Net.Processors.Stream Task[0]
stream-task[018] Task 0-8 state transition from CREATED to RESTORING a total number of O records
info: Streamiz Kafka.Net. Processors.Internal.StoreChangelogReader[0]
Finished restoring changelog agg-store to store ASBCustomers DDNAggregateConsumers-agg-store-changelog [[7]] with
a total number of 0 records
info: Streamiz Kafka.Net.Processors.Internal.StoreChangelogReader[0]
Finished restoring changelog agg-store to store ASBCustomers DDNAggregateConsumers-agg-store-changelog [[8]] with
a total number of 0 records
info: Streamiz. Kafka.Net Processors.Internal.StoreChangelogReader[0]
Finished restoring changelog agg-store to store ASBCustomers DDNAggregate Consumers-agg-store-changelog [[10]] with
a total number of O records
info: Streamiz Kafka.Net. Processors.Internal.StoreChangelogReader[0]
Finished restoring changelog agg-store to store ASBCustomers DDNAggregateConsumers-agg-store-changelog [[11]] with
a total number of 0 records
info: Streamiz Kafka.Net. Processors.Stream Task[0]
stream-task[0/11] Task 0-11 state transition from RESTORING to RUNNING
info: Streamiz Kafka.Net. Processors.StreamTask[0]
stream-task[0/11] Restored and ready to run
info: Streamiz Kafka, Net. Processors.Stream Task[0]
stream-task[0/10) Task 0-10 state transition from RESTORING to RUNNING
info: Streamiz Kafka.Net. Processors.Stream Task[0]
stream-task[0/10] Restored and ready to run
info: Streamiz Kafka.Net.Processors.Stream Task [0]
stream-task[018] Task 0-8 state transition from RESTORING to RUNNING
info: Streamiz Kafka.Net.Processors. Stream Task[0]
stream-task[0/8] Restored and ready to run
info: Streamiz Kafka.Net.Processors.Stream Task[0]
stream-task[0/7] Task 0-7 state transition from RESTORING to RUNNING
info: Streamiz Kafka.Net.Processors. Stream Task[0]
stream-task[0[7] Restored and ready to run
info: Streamiz Kafka.Net. Processors.StreamTask[0]
stream-task[0/6] Task 0-6 state transition from RESTORING to RUNNING
info: Streamiz Kafka.Net. Processors.Stream Task [0]
stream-task[0/6] Restored and ready to run
info: Streamiz Kafka.Net. Processors.Stream Task[0] Task [0]
stream-task[0/5) Task 0-5 state transition from RESTORING to RUNNING
info: Streamiz Kafka.Net Processors.Stream Task[0]
This issue poses a security risk as it allows unauthorized access to Kafka topics
Check ACL all the permissions and the consumer configs are setup correctly