argoproj / argo-events

Event-driven Automation Framework for Kubernetes
https://argoproj.github.io/argo-events/
Apache License 2.0

Kafka event source: no active event server running #1212

Open jonathanyuechun opened 3 years ago

jonathanyuechun commented 3 years ago

Describe the bug

Hi, I'm testing out the Kafka event source and Kafka sensor examples: https://argoproj.github.io/argo-events/eventsources/setup/kafka.

Event Source

{"level":"info","ts":1620814014.4780571,"logger":"argo-events.eventsource","caller":"cmd/main.go:63","msg":"starting eventsource server","eventSourceName":"kafka","version":"vv1.3.0+7591146.dirty"}
{"level":"info","ts":1620814014.4782612,"logger":"argo-events.eventsource","caller":"metrics/metrics.go:172","msg":"starting metrics server","eventSourceName":"kafka"}
{"level":"info","ts":1620814014.4810283,"logger":"argo-events.eventsource","caller":"leaderelection/leaderelection.go:122","msg":"Not the LEADER, stand by ...","eventSourceName":"kafka"}
{"level":"info","ts":1620814015.3544254,"logger":"argo-events.eventsource","caller":"leaderelection/leaderelection.go:131","msg":"Becoming a Candidate, stand by ...","eventSourceName":"kafka"}
{"level":"info","ts":1620814015.355255,"logger":"argo-events.eventsource","caller":"leaderelection/leaderelection.go:128","msg":"I'm the LEADER, starting ...","eventSourceName":"kafka"}
{"level":"info","ts":1620814015.3553903,"logger":"argo-events.eventsource","caller":"eventsources/eventing.go:309","msg":"Starting event source server...","eventSourceName":"kafka"}
{"level":"info","ts":1620814015.3563788,"logger":"argo-events.eventsource","caller":"driver/nats.go:93","msg":"NATS auth strategy: Token","eventSourceName":"kafka","clientID":"client-kafka-eventsource-g984s-58ff45bb8d-qpdgl-302"}
{"level":"info","ts":1620814015.3588736,"logger":"argo-events.eventsource","caller":"driver/nats.go:105","msg":"Connected to NATS server.","eventSourceName":"kafka","clientID":"client-kafka-eventsource-g984s-58ff45bb8d-qpdgl-302"}
{"level":"info","ts":1620814015.3620908,"logger":"argo-events.eventsource","caller":"driver/nats.go:118","msg":"Connected to NATS streaming server.","eventSourceName":"kafka","clientID":"client-kafka-eventsource-g984s-58ff45bb8d-qpdgl-302"}
{"level":"info","ts":1620814015.3621883,"logger":"argo-events.eventsource","caller":"eventsources/eventing.go:422","msg":"Eventing server started.","eventSourceName":"kafka"}
{"level":"info","ts":1620814015.3622308,"logger":"argo-events.eventsource","caller":"eventsources/eventing.go:332","msg":"starting eventbus connection daemon...","eventSourceName":"kafka"}
{"level":"info","ts":1620814015.362501,"logger":"argo-events.eventsource","caller":"kafka/start.go:78","msg":"start kafka event source...","eventSourceName":"kafka","eventSourceType":"kafka","eventName":"example"}
{"level":"info","ts":1620814015.3625581,"logger":"argo-events.eventsource","caller":"kafka/start.go:161","msg":"start kafka event source...","eventSourceName":"kafka","eventSourceType":"kafka","eventName":"example"}
{"level":"info","ts":1620814015.3625734,"logger":"argo-events.eventsource","caller":"kafka/start.go:165","msg":"connecting to Kafka cluster...","eventSourceName":"kafka","eventSourceType":"kafka","eventName":"example"}
{"level":"info","ts":1620814015.389459,"logger":"argo-events.eventsource","caller":"kafka/start.go:186","msg":"parsing the partition value...","eventSourceName":"kafka","eventSourceType":"kafka","eventName":"example","partition-id":"1"}
{"level":"info","ts":1620814015.3895614,"logger":"argo-events.eventsource","caller":"kafka/start.go:193","msg":"getting available partitions...","eventSourceName":"kafka","eventSourceType":"kafka","eventName":"example","partition-id":"1"}
{"level":"info","ts":1620814015.6654599,"logger":"argo-events.eventsource","caller":"kafka/start.go:199","msg":"verifying the partition exists within available partitions...","eventSourceName":"kafka","eventSourceType":"kafka","eventName":"example","partition-id":"1"}
{"level":"error","ts":1620814015.6655567,"logger":"argo-events.eventsource","caller":"eventsources/eventing.go:439","msg":"Erroring out, no active event server running","eventSourceName":"kafka","stacktrace":"github.com/argoproj/argo-events/eventsources.(*EventSourceAdaptor).run\n\t/home/runner/work/argo-events/argo-events/eventsources/eventing.go:439\ngithub.com/argoproj/argo-events/eventsources.(*EventSourceAdaptor).Start.func1\n\t/home/runner/work/argo-events/argo-events/eventsources/eventing.go:295"}
{"level":"info","ts":1620814015.665716,"logger":"argo-events.eventsource","caller":"eventsources/eventing.go:337","msg":"exiting eventbus connection daemon...","eventSourceName":"kafka"}
{"level":"error","ts":1620814015.6686988,"logger":"argo-events.eventsource","caller":"driver/nats.go:84","msg":"NATS connection lost","eventSourceName":"kafka","clientID":"client-kafka-eventsource-g984s-58ff45bb8d-qpdgl-302","stacktrace":"github.com/argoproj/argo-events/eventbus/driver.(*natsStreaming).Connect.func1\n\t/home/runner/work/argo-events/argo-events/eventbus/driver/nats.go:84\ngithub.com/nats-io/nats%2ego.(*Conn).close.func1\n\t/home/runner/go/pkg/mod/github.com/nats-io/nats.go@v1.10.0/nats.go:3802\ngithub.com/nats-io/nats%2ego.(*asyncCallbacksHandler).asyncCBDispatcher\n\t/home/runner/go/pkg/mod/github.com/nats-io/nats.go@v1.10.0/nats.go:2082"}
{"level":"error","ts":1620814015.6687121,"logger":"argo-events.eventsource","caller":"eventsources/eventing.go:296","msg":"failed to start","eventSourceName":"kafka","error":"no active event server running","errorVerbose":"no active event server running\ngithub.com/argoproj/argo-events/eventsources.(*EventSourceAdaptor).run\n\t/home/runner/work/argo-events/argo-events/eventsources/eventing.go:442\ngithub.com/argoproj/argo-events/eventsources.(*EventSourceAdaptor).Start.func1\n\t/home/runner/work/argo-events/argo-events/eventsources/eventing.go:295\nruntime.goexit\n\t/opt/hostedtoolcache/go/1.14.15/x64/src/runtime/asm_amd64.s:1373","stacktrace":"github.com/argoproj/argo-events/eventsources.(*EventSourceAdaptor).Start.func1\n\t/home/runner/work/argo-events/argo-events/eventsources/eventing.go:296"}

To Reproduce

Steps to reproduce the behavior:

Used minikube as the POC environment.

Kafka installation

https://strimzi.io/docs/operators/in-development/quickstart.html#assembly-evaluation-str

Tried on argo-events and eventbus:

Expected behavior

Events are published to the event source and the Kafka sensor is triggered.

Screenshots

  1. Missing Kafka sensor pod
  2. Wrong status for kafka-eventsource?
$ kubectl get pods

NAME                                       READY   STATUS    RESTARTS   AGE
argo-server-6fb6d4b7fb-2lq5l               1/1     Running   0          8m19s
eventbus-controller-96f86d586-nd52t        1/1     Running   0          8m33s
eventbus-default-stan-0                    2/2     Running   0          8m21s
eventbus-default-stan-1                    2/2     Running   0          8m6s
eventbus-default-stan-2                    2/2     Running   0          8m4s
eventsource-controller-5b9b5dd4b-6t96h     1/1     Running   0          8m33s
kafka-eventsource-97lqq-7f9dc97bfc-8gjtt   1/1     Running   0          5m54s
sensor-controller-77dbfb77bf-4hn8j         1/1     Running   0          8m33s
workflow-controller-d9cbfcc86-l2r8v        1/1     Running   0          8m19s

Environment (please complete the following information):

Additional context

I can see that the topic was created when I inspect my broker with kafka-topics.sh.


Message from the maintainers:

If you wish to see this enhancement implemented please add a 👍 reaction to this issue! We often sort issues this way to know what to prioritize.

jonathanyuechun commented 3 years ago

I updated the issue: the sensor problem was caused by insufficient default RBAC permissions for the argo-workflow-sa service account.

whynowy commented 3 years ago

The failure is most likely caused by the partition ID not being available on the topic. However, the fact that the underlying error is not printed out is an issue in itself.
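A minimal Go sketch of the check described above, assuming the topic's partitions have already been fetched as a []int32 (for example via Sarama's Consumer.Partitions). validatePartition and its error text are hypothetical, not Argo Events' actual code. The point is that the returned error names the topic, the requested partition and the partitions that exist, so the real cause would reach the logs instead of only "no active event server running".

```go
package main

import (
	"fmt"
	"strconv"
)

// validatePartition parses the partition ID from the event source spec and
// checks it against the partitions the broker reports for the topic.
// Hypothetical helper: the error names the topic, the requested partition and
// the available partitions so the cause is visible in the logs.
func validatePartition(topic, partition string, available []int32) (int32, error) {
	id, err := strconv.ParseInt(partition, 10, 32)
	if err != nil {
		// Wrap the parse error with %w so the lower-level cause is kept.
		return 0, fmt.Errorf("invalid partition %q for topic %q: %w", partition, topic, err)
	}
	for _, p := range available {
		if p == int32(id) {
			return p, nil
		}
	}
	return 0, fmt.Errorf("partition %d does not exist for topic %q (available: %v)", id, topic, available)
}

func main() {
	// Hypothetical case: partition "1" requested, but the topic only has partition 0.
	if _, err := validatePartition("topic1", "1", []int32{0}); err != nil {
		fmt.Println("error:", err)
	}
}
```

Returning the error to the caller (rather than only logging "Erroring out") is what lets the adaptor surface it in its "failed to start" message.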

cbuckley01 commented 3 years ago

> The failure should be because the partition ID is not available. However, it's an issue that the error is not printed out.

Right, that is the real (and larger) issue: trying to debug connection and other Sarama/Kafka problems without the lower-level errors is not fun.

github-actions[bot] commented 3 years ago

This issue has been automatically marked as stale because it has not had any activity in the last 60 days. It will be closed if no further activity occurs. Thank you for your contributions.

MATRIX4284 commented 3 years ago

This issue happens because the partition configured in the event source has not been created in the Kafka topic that the event source uses.

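Get the correct partition ID: open a shell in your Kafka pod and check which partitions the topic has.

1. kubectl exec -it kafka-server-pod-id -- /bin/bash

2. Go to the $KAFKA_HOME/bin folder.

3. Run the command below to see which partition IDs are available:

./kafka-topics.sh --describe --zookeeper k8s-svc-host:port --topic topic1

(On Kafka 2.2 and later you can pass --bootstrap-server <broker-host>:9092 instead of --zookeeper; Kafka 3.0 removed the --zookeeper option from kafka-topics.sh.)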

Topic: topic1 Partition: 0 Leader: 1001 Replicas: 1001 Isr: 1001
Topic: topic1 Partition: 1 Leader: 1001 Replicas: 1001 Isr: 1001

Check whether the partition set in your Kafka event source manifest exists for the topic that event source uses.

In this case, the partition set in the event source's kafka.yaml is not present, and that is why the Kafka event source stops running.

If it does not exist, use a partition that is available for the topic; for example, partitions 0 and 1 are available for topic1.
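The manual check above can also be scripted. This is a minimal Go sketch, assuming the plain-text output format of kafka-topics.sh --describe shown in this comment (space- or tab-separated "Partition: N" pairs). partitionsFromDescribe is a hypothetical helper; parsing CLI text is more brittle than asking the broker through a Kafka client.

```go
package main

import (
	"fmt"
	"strconv"
	"strings"
)

// partitionsFromDescribe extracts partition IDs from `kafka-topics.sh --describe`
// output. It finds every "Partition:" token and parses the number after it.
// The topic summary line uses "PartitionCount:", a different token, so it is skipped.
func partitionsFromDescribe(output string) ([]int32, error) {
	var ids []int32
	fields := strings.Fields(output) // splits on spaces, tabs and newlines
	for i := 0; i < len(fields)-1; i++ {
		if fields[i] != "Partition:" {
			continue
		}
		id, err := strconv.ParseInt(fields[i+1], 10, 32)
		if err != nil {
			return nil, fmt.Errorf("unexpected partition value %q: %w", fields[i+1], err)
		}
		ids = append(ids, int32(id))
	}
	return ids, nil
}

func main() {
	// Sample output in the format shown in this comment.
	out := `Topic: topic1 Partition: 0 Leader: 1001 Replicas: 1001 Isr: 1001
Topic: topic1 Partition: 1 Leader: 1001 Replicas: 1001 Isr: 1001`
	ids, err := partitionsFromDescribe(out)
	if err != nil {
		panic(err)
	}
	fmt.Println(ids) // [0 1]
}
```

Any value from this list (as a string, e.g. "0") is a valid choice for the partition field in the event source.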

Edit the Kafka event source manifest https://github.com/argoproj/argo-events/tree/master/examples/event-sources/kafka.yaml as follows:

MATRIX4284 commented 3 years ago

apiVersion: argoproj.io/v1alpha1
kind: EventSource
metadata:
  name: kafka
spec:
  kafka:
    example:
      # kafka broker url
      url: kafka.argo-events:9092
      # name of the kafka topic
      topic: topic1
      # jsonBody specifies that all event body payload coming from this
      # source will be JSON
      jsonBody: true
      # partition id
      # both partition 0 and partition 1 exist for topic1, so either "0" or "1" works here
      partition: "0"
      # optional backoff time for connection retries.
      # if not provided, default connection backoff time will be used.
      connectionBackoff:
        # duration in nanoseconds, or strings like "3s", "2m". following value is 10 seconds
        duration: 10000000000
        # how many backoffs
        steps: 5
        # factor to increase on each step.
        # setting factor > 1 makes backoff exponential.
        factor: 2
        jitter: 0.2