grafana / alloy

OpenTelemetry Collector distribution with programmable pipelines
https://grafana.com/oss/alloy
Apache License 2.0

otelcol.exporter.kafka #2033

Closed elysiumHL closed 3 weeks ago

elysiumHL commented 4 weeks ago

What's wrong?

No response

Steps to reproduce

Here's the scenario: we're using Grafana Alloy 1.4.3 to collect logs, process them, and then send them to both Loki and Kafka. We want to use `tenant_id` as the tenant value for Loki and as the topic name for Kafka to ensure data isolation.

However, although we're adding the `tenant_id` attribute with the `otelcol.processor.attributes` component, the `topic_from_attribute` parameter of `otelcol.exporter.kafka` doesn't seem to work: the topic created in Kafka remains `otlp_logs` instead of dynamically reflecting `tenant_id`.

It’s unclear whether this issue stems from a problem with otelcol.exporter.kafka or from modifications made in Grafana Alloy.

```alloy
loki.process "crm" {
  forward_to = [loki.write.crm.receiver]

  stage.tenant {
    value = "crm"
  }

  stage.static_labels {
    values = {
      tenant = "新CRM",
    }
  }

  stage.labeldrop {
    values = ["stream", "job"]
  }

  stage.drop {
    source     = "service"
    expression = "^infra.*"
  }
}
```

```alloy
otelcol.processor.attributes "crm" {
  output {
    logs = [otelcol.exporter.kafka.crm.input]
  }

  action {
    key    = "tenant_id"
    action = "upsert"
    value  = "crm"
  }

  action {
    key    = "tenant"
    action = "upsert"
    value  = "新CRM"
  }
}

otelcol.processor.filter "crm" {
  output {
    logs = [otelcol.processor.attributes.crm.input]
  }

  logs {
    logrecord = [
      "IsMatch(attributes[\"service\"], \"^infra.*\")",
    ]
  }
}
```

```alloy
otelcol.receiver.loki "crm" {
  output {
    logs = [otelcol.processor.filter.crm.input]
  }
}
```

```alloy
loki.write "crm" {
  endpoint {
    url = "http://uat.loki.sre.zclocal/loki/api/v1/push"
  }
}

otelcol.exporter.kafka "crm" {
  protocol_version     = "3.5.2"
  brokers              = ["x.x.x.x:9092", "x.x.x.x:9092", "x.x.x.x:9092"]
  topic_from_attribute = "tenant_id"
  encoding             = "otlp_json"

  sending_queue {
    enabled = false
  }
}
```

- Here is a log entry from the Kafka `otlp_logs` topic. Based on its content, the `tenant_id` attribute has been appended to the log record's attributes, while the `resource` object remains empty.

```json
{
  "resourceLogs": [
    {
      "resource": {},
      "scopeLogs": [
        {
          "scope": {},
          "logRecords": [
            {
              "timeUnixNano": "1730772433594340000",
              "observedTimeUnixNano": "1730772433607872872",
              "body": {
                "stringValue": "\tat java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)"
              },
              "attributes": [
                { "key": "loki.attribute.labels", "value": { "stringValue": "service" } },
                { "key": "service", "value": { "stringValue": "auth" } },
                { "key": "tenant_id", "value": { "stringValue": "crm" } },
                { "key": "tenant", "value": { "stringValue": "新CRM" } }
              ],
              "traceId": "",
              "spanId": ""
            }
          ]
        }
      ]
    }
  ]
}
```



### System information

CentOS 7.9

### Software version

Alloy 1.4.3

### Configuration

_No response_

### Logs

_No response_
elysiumHL commented 3 weeks ago

You can use the Resource Detection Processor to achieve this; however, I think the Resource Processor is a better fit, since `topic_from_attribute` appears to be resolved from resource-level attributes, whereas `otelcol.processor.attributes` only sets log-record attributes.
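
Since the topic lookup seems to operate on resource attributes, one way to express the Resource Processor's upsert in Alloy is `otelcol.processor.transform` on the `resource` context. A minimal, untested sketch, assuming that component is available in this Alloy version and reusing the `crm` exporter from the issue:

```alloy
// Hypothetical sketch: promote tenant_id to a resource-level attribute so
// otelcol.exporter.kafka can resolve it via topic_from_attribute.
otelcol.processor.transform "crm" {
  error_mode = "ignore"

  log_statements {
    context    = "resource"
    statements = [
      "set(attributes[\"tenant_id\"], \"crm\")",
    ]
  }

  output {
    logs = [otelcol.exporter.kafka.crm.input]
  }
}
```

Wiring the pipeline through this component instead of (or after) `otelcol.processor.attributes` should make the `tenant_id` value visible to the exporter's topic lookup.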