cloudevents / sdk-go

Go SDK for CloudEvents
https://cloudevents.github.io/sdk-go/
Apache License 2.0
776 stars 216 forks source link

Provide report channel `Events()` for confluent kafka producer #1031

Closed yanmxa closed 2 months ago

yanmxa commented 2 months ago

Signed-off-by: myan myan@redhat.com Resolved: https://github.com/cloudevents/sdk-go/issues/1030

yanmxa commented 2 months ago

I was just starting the review until I noticed we are already passing an error channel to Produce which according to Confluent docs is an option to not have to poll Events() channel and should not lead to memory leak. Do you see memory leaks in our implementation?

It is also possible to direct delivery reports to alternate channels by providing a non-nil chan Event channel to .Produce()

https://docs.confluent.io/platform/current/clients/confluent-kafka-go/index.html

After running a service with our current implementation for a few days, The memory usage is from 75 MB to about 180 MB

image

Then I check the memory through pprof, found it mainly caused by the func _Cfunc_GoBytes and the related issues: https://github.com/confluentinc/confluent-kafka-go/issues/578#issuecomment-931642924, https://github.com/confluentinc/confluent-kafka-go/issues/1043#issuecomment-1691038426

image

Note: All the confluent producer samples initialize a goroutine to retrieve the event from the producer.Events() chan which I missed before.

embano1 commented 2 months ago

Sorry for being a bit pedantic here, but since we follow Confluent's producer logic and pass an event channel, either something is fundamentally wrong with the Confluent library or we are not using it correctly. I don't see adding another goroutine to handle this solving the problem IF the Confluent library behaves as expected, allowing a return channel on produce which we use accordingly: https://github.com/cloudevents/sdk-go/blob/e6a74efbacbf4ac70fc8ed598e678f94b6be7a6d/protocol/kafka_confluent/v2/protocol.go#L136

Can you please debug further whether that channel is not drained correctly and whether you see events also in Event() which should not be the case then?

embano1 commented 2 months ago

Producing is an asynchronous operation so the client notifies the application of per-message produce success or failure through something called delivery reports. Delivery reports are by default emitted on the .Events() channel as *kafka.Message and you should check msg.TopicPartition.Error for nil to find out if the message was succesfully delivered or not. It is also possible to direct delivery reports to alternate channels by providing a non-nil chan Event channel to .Produce(). If no delivery reports are wanted they can be completely disabled by setting configuration property "go.delivery.reports": false.

Btw (unrelated to this, but still important): we aren't calling Flush(), are we?

When you are done producing messages you will need to make sure all messages are indeed delivered to the broker (or failed), remember that this is an asynchronous client so some of your messages may be lingering in internal channels or tranmission queues. To do this you can either keep track of the messages you've produced and wait for their corresponding delivery reports, or call the convenience function .Flush() that will block until all message deliveries are done or the provided timeout elapses.

yanmxa commented 2 months ago

Sorry for being a bit pedantic here, but since we follow Confluent's producer logic and pass an event channel, either something is fundamentally wrong with the Confluent library or we are not using it correctly. I don't see adding another goroutine to handle this solving the problem IF the Confluent library behaves as expected, allowing a return channel on produce which we use accordingly:

https://github.com/cloudevents/sdk-go/blob/e6a74efbacbf4ac70fc8ed598e678f94b6be7a6d/protocol/kafka_confluent/v2/protocol.go#L136

Can you please debug further whether that channel is not drained correctly and whether you see events also in Event() which should not be the case then?

@embano1 You get the point. It does seem wired to use deliveryChan and producer.Events() at the same time. Let's hold this PR. I need to get more information to understand why then doing this in the sample

yanmxa commented 2 months ago

Producing is an asynchronous operation so the client notifies the application of per-message produce success or failure through something called delivery reports. Delivery reports are by default emitted on the .Events() channel as *kafka.Message and you should check msg.TopicPartition.Error for nil to find out if the message was succesfully delivered or not. It is also possible to direct delivery reports to alternate channels by providing a non-nil chan Event channel to .Produce(). If no delivery reports are wanted they can be completely disabled by setting configuration property "go.delivery.reports": false.

Btw (unrelated to this, but still important): we aren't calling Flush(), are we?

When you are done producing messages you will need to make sure all messages are indeed delivered to the broker (or failed), remember that this is an asynchronous client so some of your messages may be lingering in internal channels or tranmission queues. To do this you can either keep track of the messages you've produced and wait for their corresponding delivery reports, or call the convenience function .Flush() that will block until all message deliveries are done or the provided timeout elapses.

Yes, We don't need to call the Flush(). Cause each time we send a message, then wait directly for the result from the deliveryChan.

yanmxa commented 2 months ago

Yes, We don't need to call the Flush(). Cause each time we send a message, then wait directly for the result from the deliveryChan.

Makes sense.

However, I wonder what happens if a user constructs the producer with the following option:

If no delivery reports are wanted they can be completely disabled by setting configuration property "go.delivery.reports": false.

Would the producer block indefinitely or panic on nil event? Can you please test this and make sure your Send still works?

Sure! I verified the following cases:

1. What happens when set both producer.Events() chan and the customized derliveryChan?

2. What happens with the configuration "go.delivery.reports": false based on the current implementation?

3. What happens with the configuration "go.delivery.reports": false and the producer.Events() chan?

I also found that this issue, https://github.com/cloudevents/sdk-go/issues/1030 is not necessarily related to our current implementation, so I am planning to close the issue and the current PR. But I will keep tracing why the confluent sample uses the producer.Events() and deliveryChan at the same time. And once the conclusion is reached, I will update it here.

@embano1 What do you think?

embano1 commented 2 months ago

What happens when set both producer.Events() chan and the customized derliveryChan?

That result is what we are expecting, thanks.

What happens with the configuration "go.delivery.reports": false based on the current implementation?

Interesting, seems custom event delivery channel overwrites this setting then. Weird configuration behavior, but in-line with the underlying Kafka callback implementation so perhaps reasonable.

What happens with the configuration "go.delivery.reports": false and the producer.Events() chan?

OK, so that setting works then.

Thx for your investigations, proposed next steps SGTM!

yanmxa commented 2 months ago

@embano1 I posted an issue in the community, looks like the producer.Events() is also provided to handle some events not associated with the sending message. https://github.com/confluentinc/confluent-kafka-go/issues/1163

It does report the connection event from the producer.Events() chan, even if the deliveryChan exist:

{"level":"info","ts":1712455974.5608,"logger":"fallback","caller":"v2/protocol.go:291","msg":"get an error event from producer.Events() chan: 127.0.0.1:9093/bootstrap: Connect to ipv4#127.0.0.1:9093 failed: Connection refused (after 0ms in state CONNECT, 1 identical error(s) suppressed)"}
embano1 commented 2 months ago

@yanmxa thx for verifying! So I suggest implementing a select {} pattern on both channels then. Assuming producer.Events() only fires if there's an error which is not related to the provided send/produce call channel, we should be good not leaking resources, i.e., there should never be an event on the other channel we pass to produce/send and we can close it on return.

yanmxa commented 2 months ago

@yanmxa thx for verifying! So I suggest implementing a select {} pattern on both channels then.

@embano1 I'm worried that switching deliverChan to select{} pattern would make the Sender an asynchronous method, which means it will always return success immediately?

Assuming producer.Events() only fires if there's an error which is not related to the provided send/produce call channel, we should be good not leaking resources, i.e., there should never be an event on the other channel we pass to produce/send and we can close it on return.

Yes, We release the deliveryChan passed to the producer/send once the producer is done.

Is the current implementation consistent with your ideas?

embano1 commented 2 months ago

Select blocks, ie, is not making the code async. Let me see if I can propose something to simplify the current PR.

embano1 commented 2 months ago

Quick question: what was the reason we decided to use a custom event delivery channel instead of reading everything from producer.Events()?

yanmxa commented 2 months ago

Quick question: what was the reason we decided to use a custom event delivery channel instead of reading everything from producer.Events()?

@embano1 use the "delieryChan" to get the result after each send so that we can change the asynchronous method Produce into a synchronous method to adapt to the Sender.

I choose deliveryChan instead of producer.Events(). Because I don't find any official examples that use producer.Events() chan to block and wait for the result of produce/send. They all [use producer.Events() in a separate gourontine](https://github.com/search?q=repo%3Aconfluentinc%2Fconfluent-kafka-go%20p.Events()&type=code). But it does provide many cases where produce is blocked and waiting for a result to be sent using deliveryChan.

Now we know that they do this because producer.Events() also contains events that aren't related to Produce itself.

embano1 commented 2 months ago

This is really non-intuitive API design in the Confluent go SDK if you ask me :/

I'm proposing a slightly different implementation to

1) Simplify the developer experience for sdk-go with Confluent Kafka users 1) Simplify our implementation/complexity/maintenance

Produce() is fundamentally async and we would need to wrap sync behavior around it because Send has synchronous semantics in sdk-go today. However, wrapping is not easy because Produce() uses buffers and batching under the covers so that we'd need success and error handlers so users can correlate whether a Send() was successful, i.e., inspect each event in Events() and correlate to the appropriate Send() call. Instead, we could just surface that Confluent SDK behavior to users, given they're familiar with that SDK/behavior, and giving them full control to follow SDK best practices/usage without complicating our implementation and the developer experience.

diff --git a/protocol/kafka_confluent/v2/protocol.go b/protocol/kafka_confluent/v2/protocol.go
index 8aa9068..e98ab51 100644
--- a/protocol/kafka_confluent/v2/protocol.go
+++ b/protocol/kafka_confluent/v2/protocol.go
@@ -12,9 +12,10 @@
    "io"
    "sync"

+   "github.com/confluentinc/confluent-kafka-go/v2/kafka"
+
    "github.com/cloudevents/sdk-go/v2/binding"
    "github.com/cloudevents/sdk-go/v2/protocol"
-   "github.com/confluentinc/confluent-kafka-go/v2/kafka"

    cecontext "github.com/cloudevents/sdk-go/v2/context"
 )
@@ -33,15 +34,14 @@ type Protocol struct {
    consumerTopics       []string
    consumerRebalanceCb  kafka.RebalanceCb                          // optional
    consumerPollTimeout  int                                        // optional
-   consumerErrorHandler func(ctx context.Context, err kafka.Error) //optional
+   consumerErrorHandler func(ctx context.Context, err kafka.Error) // optional
    consumerMux          sync.Mutex
    consumerIncoming     chan *kafka.Message
    consumerCtx          context.Context
    consumerCancel       context.CancelFunc

    producer             *kafka.Producer
-   producerDeliveryChan chan kafka.Event // optional
-   producerDefaultTopic string           // optional
+   producerDefaultTopic string // optional

    closerMux sync.Mutex
 }
@@ -85,9 +85,6 @@ func New(opts ...Option) (*Protocol, error) {
    if p.kafkaConfigMap == nil && p.producer == nil && p.consumer == nil {
        return nil, errors.New("at least one of the following to initialize the protocol must be set: config, producer, or consumer")
    }
-   if p.producer != nil {
-       p.producerDeliveryChan = make(chan kafka.Event)
-   }
    return p, nil
 }

@@ -128,23 +125,25 @@ func (p *Protocol) Send(ctx context.Context, in binding.Message, transformers ..
        kafkaMsg.Key = []byte(messageKey)
    }

-   err = WriteProducerMessage(ctx, in, kafkaMsg, transformers...)
-   if err != nil {
-       return err
+   if err = WriteProducerMessage(ctx, in, kafkaMsg, transformers...); err != nil {
+       return fmt.Errorf("create producer message: %w", err)
    }

-   err = p.producer.Produce(kafkaMsg, p.producerDeliveryChan)
-   if err != nil {
-       return err
-   }
-   e := <-p.producerDeliveryChan
-   m := e.(*kafka.Message)
-   if m.TopicPartition.Error != nil {
-       return m.TopicPartition.Error
+   if err = p.producer.Produce(kafkaMsg, nil); err != nil {
+       return fmt.Errorf("send message: %w", err)
    }
+
    return nil
 }

+func (p *Protocol) Events() (chan kafka.Event, error) {
+   if p.producer == nil {
+       return nil, errors.New("producer not set")
+   }
+
+   return p.producer.Events(), nil
+}
+
 func (p *Protocol) OpenInbound(ctx context.Context) error {
    if p.consumer == nil {
        return errors.New("the consumer client must be set")
@@ -238,7 +237,6 @@ func (p *Protocol) Close(ctx context.Context) error {

    if p.producer != nil && !p.producer.IsClosed() {
        p.producer.Close()
-       close(p.producerDeliveryChan)
    }

    return nil
embano1 commented 2 months ago

If we agree on the above, needs code comments for public methods and usage example

yanmxa commented 2 months ago

Hello, @embano1!

I'd like to consult your opinion on something related to kubeCon China. Since I couldn't find your contact information, I'm reaching out to you here.

Over the past year, we've brought some updates about the message queue to this repos. If you're interested, I can draft a proposal for the enhancements of cloudevents message queue to the KubeCon China. The main people involved would be the two of us. The main topics covered include bringing MQTT binding to support IoT cases, and some improvements related to Kafka such as supporting message confirm, consumption from a specific point, batch subscribing to topics, and supporting ordered partition offsets, etc.

How do you feel about it?

yanmxa commented 2 months ago

If we agree on the above, needs code comments for public methods and usage example

It's a good choice to give the producer.Events() to the user!

If we deprecate the deliveryChan, then we need to think of another question: when to invoke the producer.Flush() to deliver all the buffered messages to the Kafka broker? invoke it in Sender or Closer? I agree with the above once this question is settled.

  • When you are done producing messages you will need to make sure all messages are indeed delivered to the broker (or failed), remember that this is an asynchronous client so some of your messages may be lingering in internal channels or tranmission queues. To do this you can either keep track of the messages you've produced and wait for their corresponding delivery reports, or call the convenience function .Flush() that will block until all message deliveries are done or the provided timeout elapses.
embano1 commented 2 months ago

If we deprecate the deliveryChan, then we need to think of another question: when to invoke the producer.Flush() to deliver all the buffered messages to the Kafka broker? invoke it in Sender or Closer? I agree with the above once this question is settled.

Do we need formal deprecation of deliveryChan since we don't expose it? Or do you just mean removing that code? I thought about Sender calling Flush too, but that might be too much (perf). So Close() might be the better option since it would also release Events() eventually (would need to check the Confluent SDK on the behavior again).

embano1 commented 2 months ago

I'd like to consult your opinion on something related to kubeCon China. Since I couldn't find your contact information, I'm reaching out to you here.

Can you please reach out on the CloudEvents Slack channel and ping me there?

yanmxa commented 2 months ago

If we deprecate the deliveryChan, then we need to think of another question: when to invoke the producer.Flush() to deliver all the buffered messages to the Kafka broker? invoke it in Sender or Closer? I agree with the above once this question is settled.

Do we need formal deprecation of deliveryChan since we don't expose it? Or do you just mean removing that code? I thought about Sender calling Flush too, but that might be too much (perf). So Close() might be the better option since it would also release Events() eventually (would need to check the Confluent SDK on the behavior again).

I think we can just remove the code cause the user cannot sense it.

Yes. Calling Flush() in Sender definitely affects the performance. We do it in Closer.

It looks like we have reached a consensus. Then I will proceed with these changes and add some examples.

embano1 commented 2 months ago

Thx @yanmxa ! Definitely easier code on our side - however, that Confluent SDK is really hard to use given all the asynchrony and error handling one has to perform to catch all possible scenarios - guess not much further we can do here :/

yanmxa commented 2 months ago

Thx @yanmxa ! Definitely easier code on our side - however, that Confluent SDK is really hard to use given all the asynchrony and error handling one has to perform to catch all possible scenarios - guess not much further we can do here :/

Absolutely! It seems like they traded ease of use for improved performance with their asynchronous approach. That's made us put in extra work to handle it.

I'm thinking we might consider adding an option in the future to switch the producer to a synchronous method, avoiding users having to watch producer.Events() channel additionally. Like the following snippet:

err = producer.Produce(msg, nil)
if err != nil {
    return fmt.Errorf("failed to produce message: %w", err)
}
event := <-producer.Events()
embano1 commented 2 months ago

I'm thinking we might consider adding an option in the future to switch the producer to a synchronous method, avoiding users having to watch producer.Events() channel additionally. Like the following snippet:

err = producer.Produce(msg, nil)
if err != nil {
    return fmt.Errorf("failed to produce message: %w", err)
}
event := <-producer.Events()

Initially I tried that, but IMHO it's not that easy. Because Produce to Kafka is async, you don't know which event you're getting/returning here, making it problematic for callers to understand whether the returned event/error was for the same CloudEvent just sent with Send(). We'd need additional correlation logic and also handle errors not caused be a specific event, but also returned on this channel (as I understood the Confluent docs).

yanmxa commented 2 months ago

I'm thinking we might consider adding an option in the future to switch the producer to a synchronous method, avoiding users having to watch producer.Events() channel additionally. Like the following snippet:

err = producer.Produce(msg, nil)
if err != nil {
    return fmt.Errorf("failed to produce message: %w", err)
}
event := <-producer.Events()

Initially I tried that, but IMHO it's not that easy. Because Produce to Kafka is async, you don't know which event you're getting/returning here, making it problematic for callers to understand whether the returned event/error was for the same CloudEvent just sent with Send(). We'd need additional correlation logic and also handle errors not caused be a specific event, but also returned on this channel (as I understood the Confluent docs).

Exactly! Making the Producer synchronous also has 3 options currently:

  1. Using producer.Events() just like the above snippet. As you said need additional logic to handle the errors not associated with the produced events.
  2. Using the customized deliveryChan to handle the produced events, and watch producer.Events() channel in a separate goroutine to handle other errors
  3. Using producer. Flush() after sending the message, which also is discussed.

However, should we convert it to synchronous? Which option should we use? If it is necessary, we can discuss it later.

embano1 commented 2 months ago

Which option should we use? If it is necessary, we can discuss it later.

Yeah, let's keep this out of this PR for now