cloudevents / sdk-go

Go SDK for CloudEvents
https://cloudevents.github.io/sdk-go/
Apache License 2.0

Confluent kafka binding #988

Closed yanmxa closed 8 months ago

yanmxa commented 11 months ago

Signed-off-by: myan <myan@redhat.com>

Adding the Kafka binding with https://github.com/confluentinc/confluent-kafka-go. This implementation will support the following features compared to the sarama binding.

Note: The Confluent samples are configured with WithPollGoroutines(1) to preserve the order of messages within a partition.

Since Kafka already natively utilizes consumer groups to enhance consumption efficiency, there is no need to use multiple goroutines to boost consumption speed.
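To illustrate the ordering point, here is a minimal, standard-library-only sketch. It does not use the SDK or the Confluent client; `process` and the integer "messages" are hypothetical stand-ins for a receiver loop draining one partition. With a single worker the handling order matches the arrival order, while several workers may interleave.

```go
package main

import (
	"fmt"
	"sync"
)

// process drains msgs with the given number of worker goroutines and
// returns the order in which the messages were handled.
// Hypothetical stand-in for a receiver polling a single partition.
func process(msgs []int, workers int) []int {
	in := make(chan int)
	var (
		mu      sync.Mutex
		handled []int
		wg      sync.WaitGroup
	)
	for w := 0; w < workers; w++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for m := range in {
				mu.Lock()
				handled = append(handled, m)
				mu.Unlock()
			}
		}()
	}
	for _, m := range msgs {
		in <- m
	}
	close(in)
	wg.Wait()
	return handled
}

func main() {
	msgs := []int{0, 1, 2, 3, 4, 5, 6, 7}
	fmt.Println("1 worker: ", process(msgs, 1)) // same order as msgs
	fmt.Println("4 workers:", process(msgs, 4)) // order may differ between runs
}
```

The single-worker case is the analogue of WithPollGoroutines(1): there is only one goroutine handing messages to the handler, so nothing can overtake anything else.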

Resolve: https://github.com/cloudevents/sdk-go/issues/918

yanmxa commented 11 months ago

/cc @embano1 @duglin @lionelvillard @clyang82

yanmxa commented 10 months ago

@embano1 Thanks for your review!

embano1 commented 10 months ago

Thx @yanmxa and sorry for my slow responses. I'm underwater at work at the moment and can't give you a concrete date for when I'll be able to review. Perhaps @duglin and @lionelvillard can take another look?

lionelvillard commented 10 months ago

@yanmxa can you update the documentation: https://github.com/cloudevents/sdk-go/blob/main/docs/protocol_implementations.md?

Also I wonder if it makes sense to indicate (maybe in the documentation) this is a new implementation and it hasn't been widely tested in the wild. WDYT?

yanmxa commented 10 months ago

> @yanmxa can you update the documentation: https://github.com/cloudevents/sdk-go/blob/main/docs/protocol_implementations.md?
>
> Also I wonder if it makes sense to indicate (maybe in the documentation) this is a new implementation and it hasn't been widely tested in the wild. WDYT?

Thanks @lionelvillard! I updated the protocol binding documentation in https://github.com/cloudevents/sdk-go/pull/1008. PTAL~

yanmxa commented 10 months ago

Thanks @duglin !

yanmxa commented 8 months ago

@duglin @embano1 @lionelvillard PTAL~

yanmxa commented 8 months ago

> I haven't checked the whole PR because for some parts, especially structured/binary handling, I'd like to see more tests for non-happy-path scenarios and misuse if possible. Could you please also add Confluent to the GitHub integration tests so we run this against Confluent images?

Thanks @embano1!

I added some tests for the misuse cases when trying to initialize the client. If others come up with scenarios beyond these, we can continually improve it in the future.

Also, I updated the GitHub integration test workflow to add a kafka_confluent service using the Confluent image. It runs on port 9192, while the sarama service runs on 9092. The Confluent Kafka protocol tests now run against it.

yanmxa commented 8 months ago

@embano1 Thanks for your patient reviews all along! Several multi-cloud management projects in our team depend on this, so it would be a great help if you could provide feedback and suggestions on this PR as soon as possible.

yanmxa commented 8 months ago

> Some minor things left, sorry...had some time to dig a bit deeper. Should be easy to address. Then please squash your commits.

Never mind! Done.

yanmxa commented 8 months ago

> Looked closer at the Close/Send/OpenInbound implementations and there are still races/chances for panics. Left comments on how to simplify.

Thanks a lot! Done!

embano1 commented 8 months ago

@yanmxa

I spent some time looking at the OpenInbound/Send/Receive/Close functions and am suggesting this patch to remove some unneeded code and tidy things up:

diff --git a/protocol/kafka_confluent/v2/protocol.go b/protocol/kafka_confluent/v2/protocol.go
index 249855c..7c83087 100644
--- a/protocol/kafka_confluent/v2/protocol.go
+++ b/protocol/kafka_confluent/v2/protocol.go
@@ -99,10 +99,8 @@ func (p *Protocol) Send(ctx context.Context, in binding.Message, transformers ..

    p.closerMux.Lock()
    defer p.closerMux.Unlock()
+   
    if p.producer.IsClosed() {
-       if p.producerDeliveryChan != nil && !isClosed(p.producerDeliveryChan) {
-           close(p.producerDeliveryChan)
-       }
        return errors.New("producer is closed")
    }

@@ -177,9 +175,7 @@ func (p *Protocol) OpenInbound(ctx context.Context) error {
                logger.Errorf("failed to close the consumer: %v", err)
            }
        }
-       if p.consumerIncoming != nil && !isClosed(p.producerDeliveryChan) {
-           close(p.consumerIncoming)
-       }
+       close(p.consumerIncoming)
    }()

    for {
@@ -228,21 +224,14 @@ func (p *Protocol) Close(ctx context.Context) error {
    p.closerMux.Lock()
    defer p.closerMux.Unlock()

-   if p.consumerCancel != nil {
-       p.consumerCancel()
-   }
-
    if p.producer != nil && !p.producer.IsClosed() {
        p.producer.Close()
+       close(p.producerDeliveryChan)
    }
-   return nil
-}

-func isClosed(ch <-chan kafka.Event) bool {
-   select {
-   case <-ch:
-       return true
-   default:
+   if p.consumerCancel != nil {
+       p.consumerCancel()
    }
-   return false
+
+   return nil
 }

embano1 commented 8 months ago

Let me know if you have questions.
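One reason a helper like the removed isClosed is fragile can be shown with a small sketch. It assumes a string channel in place of the `kafka.Event` channel from the patch; everything else mirrors the removed function. A non-blocking receive cannot tell "closed" apart from "open with a pending value", and it consumes that value as a side effect.

```go
package main

import "fmt"

// isClosed mirrors the helper removed by the patch, using a string
// channel (an assumption) instead of a kafka.Event channel.
func isClosed(ch <-chan string) bool {
	select {
	case <-ch:
		return true
	default:
	}
	return false
}

func main() {
	ch := make(chan string, 1)
	ch <- "delivery report"

	// The channel is open, but a value is pending, so the receive succeeds.
	fmt.Println(isClosed(ch)) // true, although ch was never closed
	fmt.Println(len(ch))      // 0: the check consumed the pending value

	// An open, empty channel reports false, so the answer depends on timing.
	fmt.Println(isClosed(ch)) // false

	close(ch)
	fmt.Println(isClosed(ch)) // true
}
```

Closing the channel in exactly one place, as the patch does, removes the need for such a check.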

yanmxa commented 8 months ago

> Let me know if you have questions.

@embano1 Thanks! Your advice makes the code clearer.

I have a slight concern, which is perhaps unnecessary and can be ignored: what if the producer isn't closed through Close? In that case producerDeliveryChan would never be closed.

embano1 commented 8 months ago

> I have a slight concern, which is perhaps unnecessary and can be ignored: what if the producer isn't closed through Close? In that case producerDeliveryChan would never be closed.

Please add a comment to the Close(ctx) method that protocol users must call Close(ctx) to properly clean up resources after use. Also, add defer p.Close(ctx) to all your examples.

yanmxa commented 8 months ago

> > I have a slight concern, which is perhaps unnecessary and can be ignored: what if the producer isn't closed through Close? In that case producerDeliveryChan would never be closed.
>
> Please add a comment to the Close(ctx) method that protocol users must call Close(ctx) to properly clean up resources after use. Also, add defer p.Close(ctx) to all your examples.

Done!
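The agreed shape (Send only checks state, Close is the single place that releases resources, callers defer Close) can be sketched roughly as follows. This is not the binding's code: `protocol`, `newProtocol`, the `closed` flag, and the string channel are hypothetical stand-ins, and the context parameter is omitted for brevity.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// protocol is a hypothetical stand-in for the binding's Protocol type.
type protocol struct {
	mu       sync.Mutex
	closed   bool
	delivery chan string // stand-in for producerDeliveryChan
}

func newProtocol() *protocol {
	return &protocol{delivery: make(chan string, 1)}
}

// Send fails once Close has run; it never closes the channel itself.
func (p *protocol) Send(msg string) error {
	p.mu.Lock()
	defer p.mu.Unlock()
	if p.closed {
		return errors.New("producer is closed")
	}
	p.delivery <- msg // hand the message over
	<-p.delivery      // pretend the delivery report came back
	return nil
}

// Close must be called after use to release resources. It is the only
// place that closes the delivery channel, and the closed flag makes
// repeated calls safe.
func (p *protocol) Close() error {
	p.mu.Lock()
	defer p.mu.Unlock()
	if !p.closed {
		p.closed = true
		close(p.delivery)
	}
	return nil
}

func main() {
	p := newProtocol()
	defer p.Close() // safe even though Close is also called below

	fmt.Println(p.Send("hello")) // <nil>
	p.Close()
	fmt.Println(p.Send("again")) // producer is closed
}
```

Because Send and Close share one mutex, a Send can never write to a channel that Close is closing at the same time, which is the panic the review was worried about.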

embano1 commented 8 months ago

Please add a doc string to the Close() method as well, advising users to call Close() after use. Please directly squash your commits so we can merge. Also, are any related documentation/README updates needed to further highlight this new protocol implementation?

embano1 commented 8 months ago

Also, why did you not use WithPollGoroutines in all receiver examples? It should be clear why this is advised, e.g., by adding a comment to the example explaining that this won't have a perf impact and is only required if the user cares about in-order processing when using the SDK.

yanmxa commented 8 months ago

> Also, why did you not use WithPollGoroutines in all receiver examples? It should be clear why this is advised, e.g., by adding a comment to the example explaining that this won't have a perf impact and is only required if the user cares about in-order processing when using the SDK.

@embano1 Thanks for your prompt feedback!

  1. Added comments for the Close method and for WithPollGoroutines in all receiver examples.
  2. Updated the related document in this repository.
  3. Squashed the commits.

Could you please help me to check if the wording in the above document is appropriate?

embano1 commented 8 months ago

Can you please check your integration tests? They're not looking positive, but passing: https://github.com/cloudevents/sdk-go/actions/runs/8420863202/job/23056453953?pr=988#step:5:284 (example)

embano1 commented 8 months ago

This unit test is also not looking ok: https://github.com/cloudevents/sdk-go/actions/runs/8420863203/job/23056453818#step:4:3883

yanmxa commented 8 months ago

> This unit test is also not looking ok: https://github.com/cloudevents/sdk-go/actions/runs/8420863203/job/23056453818#step:4:3883

The error log no longer appears in the unit test.

yanmxa commented 8 months ago

> Can you please check your integration tests? They're not looking positive, but passing: https://github.com/cloudevents/sdk-go/actions/runs/8420863202/job/23056453953?pr=988#step:5:284 (example)

Done! The integration test error log is from here.

embano1 commented 8 months ago

@duglin can you please take a final look just overall whether we missed a README or general requirements for a new protocol binding?

duglin commented 8 months ago

Sorry, been swamped. I'll try to give it a look today

duglin commented 8 months ago

@lionelvillard too :-)

duglin commented 8 months ago

LGTM

I'll let @embano1 hit the button.