Closed blindspotbounty closed 1 year ago
Generally a good start! However, I think we should not have a statistics closure and all these new initializers, but rather an AsyncSequence, as we do with KafkaConsumer.messages or the producer acknowledgements. The implementation of statistics should therefore follow a similar pattern. Feel free to reach out to me if you need any more pointers for that!
Thank you for the feedback. I think I got you. Basically, we add a type to KafkaEvent, e.g. statistics, and use it across the code.
Let me try it out; it sounds promising in terms of how it should look.
Yes, so you would probably want to add that to KafkaEvent, receive the statistics event through KafkaClient.eventPoll and then yield the statistics to some new AsyncSequence that is exposed by KafkaConsumer + KafkaProducer.
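To make the suggested flow concrete, here is a minimal sketch of the pattern: a statistics case on the event enum, a poll that returns events, and an AsyncStream that the consumer yields into. Every name in it (SketchEvent, SketchClient, SketchConsumer, pollOnce, demo) is a hypothetical stand-in invented for illustration and not the actual swift-kafka-client API; the stand-in client returns pre-canned events instead of calling librdkafka.

```swift
// Hypothetical sketch only: none of these types are the real swift-kafka-client API.

/// Stand-in for KafkaEvent with an added statistics case.
enum SketchEvent {
    case statistics(String) // librdkafka reports statistics as a JSON string
    case other
}

/// Stand-in for the client; hands out pre-canned events instead of polling librdkafka.
struct SketchClient {
    var pending: [SketchEvent]

    /// Returns all pending events and clears the queue.
    mutating func eventPoll() -> [SketchEvent] {
        defer { pending.removeAll() }
        return pending
    }
}

/// Stand-in for a consumer that exposes statistics as an AsyncSequence.
struct SketchConsumer {
    let statistics: AsyncStream<String>
    private let continuation: AsyncStream<String>.Continuation
    private var client: SketchClient

    init(client: SketchClient) {
        self.client = client
        var continuation: AsyncStream<String>.Continuation!
        self.statistics = AsyncStream<String> { continuation = $0 }
        self.continuation = continuation
    }

    /// Polls once and yields every statistics event into the stream.
    mutating func pollOnce() {
        for event in client.eventPoll() {
            if case .statistics(let json) = event {
                continuation.yield(json)
            }
        }
    }

    /// Ends the stream so that iteration over `statistics` terminates.
    func finish() {
        continuation.finish()
    }
}

/// Usage: poll once, finish the stream, then collect what was yielded.
func demo() async -> [String] {
    let client = SketchClient(pending: [.statistics("{\"name\":\"demo\"}"), .other])
    var consumer = SketchConsumer(client: client)
    consumer.pollOnce()
    consumer.finish()

    var received: [String] = []
    for await json in consumer.statistics {
        received.append(json)
    }
    return received
}
```

Compared with a closure passed to an initializer, this keeps the configuration surface unchanged and lets callers consume statistics with for await, in the same way as messages.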
I pushed a draft for the consumer (not touching the producer so far), just to be sure that I got the idea correctly. If it is okay, I will do the remaining code cleanup and add the same code for the producer as well.
Yes, it is going in the right direction!
@blindspotbounty If you are ready and can push up the changes addressing the review, I am happy to get this over the line
@FranzBusch I've pushed a commit addressing the conversations, but I left two conversations open in case they should be handled differently.
@swift-server-bot add to allowlist
@swift-ci please test
@blindspotbounty can you take a look at the CI failures?
@FranzBusch Sure, but I think it is better to restart CI after resolving the conflicts.
@swift-ci please test
It seems to be the same failure as in the backpressure PR:
11:12:15 /code/Sources/Kafka/RDKafka/RDKafkaClient.swift:33:17: error: stored property 'kafkaHandle' of 'Sendable'-conforming class 'RDKafkaClient' has non-sendable type 'OpaquePointer'
https://github.com/swift-server/swift-kafka-client/pull/139#issuecomment-1772685064
This is an approximate solution for exposing the librdkafka statistics callback (https://github.com/swift-server/swift-kafka-gsoc/issues/79)
Main ideas: