flyteorg / flyte

Scalable and flexible workflow orchestration platform that seamlessly unifies data, ML and analytics stacks.
https://flyte.org
Apache License 2.0

[BUG] flyteadmin kafka sender panic #3117

Closed. Apophyllite closed this issue 1 year ago

Apophyllite commented 1 year ago

Describe the bug

Hello all, this looks like a bug.

In https://github.com/flyteorg/flyteadmin/blob/master/pkg/async/cloudevent/factory.go#L71, the Kafka sender/producer is closed immediately after NewCloudEventsPublisher initializes it, so every later send goes to an already-closed producer.

I am getting the following error:

panic: send on closed channel

goroutine 1574 [running]:
github.com/Shopify/sarama.(*syncProducer).SendMessage(0xc000f352d8, 0xc00069ad20)
    /go/pkg/mod/github.com/!shopify/sarama@v1.26.4/sync_producer.go:95 +0x78
github.com/cloudevents/sdk-go/protocol/kafka_sarama/v2.(*Sender).Send(0xc001a36700, {0x2bf7a70, 0xc000f2daa0}, {0x2bf7cd8?, 0xc0006e4800}, {0x0, 0x0, 0x0})
    /go/pkg/mod/github.com/cloudevents/sdk-go/protocol/kafka_sarama/v2@v2.8.0/sender.go:74 +0x224
github.com/cloudevents/sdk-go/v2/client.(*ceClient).Send(0xc000c85860, {0x2bf7a70, 0xc000f2daa0}, {{0x2c09130, 0xc001ac67e0}, {0xc001acea20, 0x11b, 0x11b}, 0x1, 0x0})
    /go/pkg/mod/github.com/cloudevents/sdk-go/v2@v2.8.0/client/client.go:133 +0x2ee
github.com/flyteorg/flyteadmin/pkg/async/cloudevent/implementations.(*KafkaSender).Send(0xc00139b7c0, {0x2bf7a70, 0xc000ba9560}, {0x0?, 0x0?}, {{0x2c09130, 0xc001ac67e0}, {0xc001acea20, 0x11b, 0x11b}, ...})
    /go/src/github.com/flyteorg/flyteadmin/pkg/async/cloudevent/implementations/sender.go:46 +0x12b

Expected behavior

The Kafka publisher should be able to send CloudEvents without panicking.

Additional context to reproduce

No response

Screenshots

No response


welcome[bot] commented 1 year ago

Thank you for opening your first issue here! 🛠

cosmicBboy commented 1 year ago

@pingsutw can we close this? Does #499 address this issue?