jaegertracing / jaeger

CNCF Jaeger, a Distributed Tracing Platform
https://www.jaegertracing.io/
Apache License 2.0

[Bug]: Kafka integration test failing #4979

Closed james-ryans closed 1 year ago

james-ryans commented 1 year ago

What happened?

I'm trying to run the Kafka e2e integration test, but it keeps failing at the GetTrace subtest.

$ ./scripts/kafka-integration-test.sh
Connection to localhost port 9092 [tcp/XmlIpcRegSvc] succeeded!
# Expire tests results for storage integration tests since the environment might change
# even though the code remains the same.
go clean -testcache
bash -c "set -e; set -o pipefail; GOCACHE=/Users/james_ryans/go/src/github.com/james-ryans/jaeger/.gocache go test -race -v -coverpkg=./... -coverprofile cover.out ./plugin/storage/integration/... | sed 's/PASS/✅ PASS/g' | sed 's/FAIL/❌ FAIL/g' | sed 's/SKIP/☠️ SKIP/g'"
...
=== RUN   TestKafkaStorage
=== RUN   TestKafkaStorage/GetTrace
    integration.go:115: Waiting for storage backend to update documents, iteration 1 out of 100
    integration.go:221: trace not found
    ...
    integration.go:115: Waiting for storage backend to update documents, iteration 99 out of 100
    integration.go:221: trace not found
    integration.go:115: Waiting for storage backend to update documents, iteration 100 out of 100
    integration.go:221: trace not found
    integration.go:221: trace not found
    integration.go:225: 
            Error Trace:    /Users/james_ryans/go/src/github.com/james-ryans/jaeger/plugin/storage/integration/integration.go:225
            Error:          Should be true
            Test:           TestKafkaStorage/GetTrace
    trace_compare.go:57: 
            Error Trace:    /Users/james_ryans/go/src/github.com/james-ryans/jaeger/plugin/storage/integration/trace_compare.go:57
                                        /Users/james_ryans/go/src/github.com/james-ryans/jaeger/plugin/storage/integration/integration.go:226
            Error:          Expected value not to be nil.
            Test:           TestKafkaStorage/GetTrace
--- ❌ FAIL: TestKafkaStorage (100.12s)
    --- ❌ FAIL: TestKafkaStorage/GetTrace (100.09s)
❌ FAIL
...
❌ FAIL  github.com/jaegertracing/jaeger/plugin/storage/integration  100.816s
❌ FAIL

I initially thought my Kafka setup was wrong, but the test successfully produced the traces, and an active consumer was attached to the topic.

# get the produced traces
$ kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic jaeger-kafka-integration-test-1701363535572905000 --from-beginning
{"traceId":"AAAAAAAAAAAAAAAAAAAAEQ==","spanId":"AAAAAAAAAAM=","operationName":"example-operation-1","references":[],"startTime":"2023-11-29T16:46:31.639875Z","duration":"0.000100s","tags":[],"logs":[{"timestamp":"2023-11-29T16:46:31.639875Z","fields":[]},{"timestamp":"2023-11-29T16:46:31.639875Z","fields":[]}],"process":{"serviceName":"example-service-1","tags":[]}}
{"traceId":"AAAAAAAAAAAAAAAAAAAAEQ==","spanId":"AAAAAAAAAAQ=","operationName":"example-operation-2","references":[],"startTime":"2023-11-29T16:46:31.639875Z","duration":"0.000100s","tags":[],"logs":[{"timestamp":"2023-11-29T16:46:31.639875Z","fields":[]},{"timestamp":"2023-11-29T16:46:31.639875Z","fields":[]}],"process":{"serviceName":"example-service-2","tags":[]}}
{"traceId":"AAAAAAAAAAAAAAAAAAAAEQ==","spanId":"AAAAAAAAAAU=","operationName":"example-operation-1","references":[],"startTime":"2023-11-29T16:46:31.639875Z","duration":"0.000100s","tags":[],"logs":[{"timestamp":"2023-11-29T16:46:31.639875Z","fields":[]},{"timestamp":"2023-11-29T16:46:31.639875Z","fields":[]}],"process":{"serviceName":"example-service-3","tags":[]}}
{"traceId":"AAAAAAAAAAAAAAAAAAAAEQ==","spanId":"AAAAAAAAAAY=","operationName":"example-operation-3","references":[],"startTime":"2023-11-29T16:46:31.639875Z","duration":"0.000100s","tags":[{"key":"span.kind","vStr":"server"}],"logs":[{"timestamp":"2023-11-29T16:46:31.639875Z","fields":[]},{"timestamp":"2023-11-29T16:46:31.639875Z","fields":[]}],"process":{"serviceName":"example-service-1","tags":[]}}
{"traceId":"AAAAAAAAAAAAAAAAAAAAEQ==","spanId":"AAAAAAAAAAc=","operationName":"example-operation-4","references":[],"startTime":"2023-11-29T16:46:31.639875Z","duration":"0.000100s","tags":[{"key":"span.kind","vStr":"client"}],"logs":[{"timestamp":"2023-11-29T16:46:31.639875Z","fields":[]},{"timestamp":"2023-11-29T16:46:31.639875Z","fields":[]}],"process":{"serviceName":"example-service-1","tags":[]}}
^CProcessed a total of 5 messages

# get the active consumer
$ kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group kafka-integration-test
GROUP                  TOPIC                                             PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             CONSUMER-ID                                                 HOST            CLIENT-ID
kafka-integration-test jaeger-kafka-integration-test-1701363535572905000 0          -               5               -               kafka-integration-test-34f1d5cd-7e82-4bbc-b41c-1843f63b24d5 /192.168.65.1   kafka-integration-test

After diving into the test code, it seems that the messages were produced while the consumer was still starting up. So, when the consumer became ready, it began consuming only new messages, because the Shopify/sarama package's default initial offset is set to the newest.

I was able to work around this bug by adding a 30-second sleep before producing the messages, to give the consumer time to start up. However, there might be a better solution.
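For reference, the sarama setting involved is Consumer.Offsets.Initial. A minimal configuration sketch of the alternative fix (starting the consumer from the earliest offset instead of sleeping) might look like this; it assumes the github.com/Shopify/sarama dependency, and wiring this into the test harness is not shown:

```go
package main

import (
	"fmt"

	"github.com/Shopify/sarama"
)

func main() {
	config := sarama.NewConfig()

	// By default sarama starts a consumer group at OffsetNewest, so any
	// message produced before the consumer joins the group is skipped.
	// This is the race the integration test hits.
	fmt.Println(config.Consumer.Offsets.Initial == sarama.OffsetNewest)

	// Starting from OffsetOldest makes the consumer replay messages that
	// were produced before it was ready.
	config.Consumer.Offsets.Initial = sarama.OffsetOldest
}
```

This is a configuration fragment only; the surrounding consumer-group setup is unchanged.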

Steps to reproduce

  1. Start a Kafka server:
     docker run --name kafka --network jaeger -p 9092:9092 -e KAFKA_CFG_NODE_ID=0 -e KAFKA_CFG_PROCESS_ROLES=controller,broker -e KAFKA_CFG_CONTROLLER_QUORUM_VOTERS=0@kafka:9093 -e KAFKA_CFG_LISTENERS=PLAINTEXT://:9092,CONTROLLER://:9093 -e KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://localhost:9092 -e KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT -e KAFKA_CFG_CONTROLLER_LISTENER_NAMES=CONTROLLER -e KAFKA_CFG_INTER_BROKER_LISTENER_NAME=PLAINTEXT bitnami/kafka:3.6
  2. Run the Kafka integration test:
     ./scripts/kafka-integration-test.sh

Expected behavior

The Kafka integration GetTrace subtest should pass. Adding time.Sleep(30 * time.Second) to plugin/storage/integration/integration.go at line 210 makes the test pass:

...
=== RUN   TestKafkaStorage
=== RUN   TestKafkaStorage/GetTrace
    integration.go:115: Waiting for storage backend to update documents, iteration 1 out of 100
    integration.go:221: trace not found
    integration.go:115: Waiting for storage backend to update documents, iteration 2 out of 100
=== RUN   TestKafkaStorage/GetTrace/NotFound_error
--- ✅ PASS: TestKafkaStorage (31.66s)
    --- ✅ PASS: TestKafkaStorage/GetTrace (31.01s)
        --- ✅ PASS: TestKafkaStorage/GetTrace/NotFound_error (0.00s)
✅ PASS
...

Relevant log output

No response

Screenshot

No response

Additional context

No response

Jaeger backend version

v1.51

SDK

No response

Pipeline

No response

Storage backend

No response

Operating system

No response

Deployment model

No response

Deployment configs

No response

yurishkuro commented 1 year ago

So, when the consumer became ready, it began consuming only new messages, because the Shopify/sarama package's default initial offset is set to the newest.

Rather than a 30s wait, perhaps we should configure the consumer to always start from the earliest message? We just need to make sure this will not create interference across tests (ideally they should use different topic names).

yurishkuro commented 1 year ago

Also, good to see this works with Kafka 3.6; we currently have Kafka pinned in the workflow: https://github.com/jaegertracing/jaeger/blob/1dd4b60a640cd1b81f5bf5e942f003e0572685e8/.github/workflows/ci-kafka.yml#L65

james-ryans commented 1 year ago

Rather than a 30s wait, perhaps we should configure the consumer to always start from the earliest message? We just need to make sure this will not create interference across tests (ideally they should use different topic names).

Currently, the ingester doesn't have the ability to configure the initial offset (the Shopify/sarama package supports starting at either the newest or the oldest offset). Should we add a flag to configure the initial offset (e.g. --kafka.consumer.initial-offset), or do we just need to add a field solely to be configurable by the test?
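If the flag route were taken, the parsing could look roughly like this. The flag name comes from the comment above; the sentinel values -1 and -2 are sarama's OffsetNewest and OffsetOldest (inlined here so the sketch is self-contained); the helper name and accepted values are made up for illustration:

```go
package main

import (
	"flag"
	"fmt"
)

// Sarama's sentinel offsets, inlined for the sketch. In real code these
// would be sarama.OffsetNewest and sarama.OffsetOldest.
const (
	offsetNewest int64 = -1 // start at the next message produced (default)
	offsetOldest int64 = -2 // start at the oldest retained message
)

// parseInitialOffset maps a human-readable flag value to the sentinel
// offset that would be assigned to Consumer.Offsets.Initial.
func parseInitialOffset(v string) (int64, error) {
	switch v {
	case "latest":
		return offsetNewest, nil
	case "earliest":
		return offsetOldest, nil
	default:
		return 0, fmt.Errorf("invalid initial offset %q, expected earliest or latest", v)
	}
}

func main() {
	initial := flag.String("kafka.consumer.initial-offset", "latest",
		"offset to start consuming from: earliest or latest")
	flag.Parse()

	off, err := parseInitialOffset(*initial)
	if err != nil {
		panic(err)
	}
	fmt.Println(off)
}
```

Defaulting to "latest" would preserve today's behavior, while the test could pass "earliest" to close the startup race.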

yurishkuro commented 1 year ago

Just the internal configuration available to the test would be enough, I think.