Open rverma-nikiai opened 5 years ago
Hello @rverma-nikiai
I don't really know the Kinesis API. At first look, the "outgoing" side sounds rather easy: you get a CompletionStage, so you would use the same kind of approach as for Kafka.
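To make the outgoing idea concrete, here is a minimal sketch using only the JDK. It assumes a hypothetical `send` function standing in for whatever call on the Kinesis client returns a CompletionStage; the class and method names are illustrative and not part of SmallRye or the AWS SDK. Each send is chained after the previous one, so the returned stage only completes once every message has been acknowledged.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.function.Function;

final class OutgoingSketch {
    /**
     * Sends the messages one after another using the given send function
     * (hypothetical stand-in for an asynchronous client call).
     * The returned stage completes with the number of acknowledged sends.
     */
    static <T> CompletionStage<Integer> sendAll(List<T> messages,
                                                Function<T, CompletionStage<?>> send) {
        CompletionStage<Integer> chain = CompletableFuture.completedFuture(0);
        for (T message : messages) {
            // Start the next send only after the previous one has completed
            chain = chain.thenCompose(count ->
                    send.apply(message).thenApply(ack -> count + 1));
        }
        return chain;
    }
}
```

If any send fails, the chain completes exceptionally and the remaining messages are not sent, which is one possible policy among several.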
For the "incoming" side, yes, it might be a bit more tricky. There are several ways. One is to create a PublisherBuilder using (be aware this is pseudo-code):
    PublisherBuilder.fromPublisher(Flowable.generate(emitter -> {
        KinesisMessage msg = queue.get();
        if (msg == null) {
            // Nothing buffered: fetch more and emit the next record directly
            pollMoreFromKinesis(emitter);
        } else {
            emitter.onNext(msg);
            if (queue.size() < threshold) {
                // Running low: refill the queue without emitting
                pollMoreFromKinesis(null);
            }
        }
    }));
pollMoreFromKinesis(Emitter) retrieves the records from Kinesis. If the passed emitter is not null, the next record needs to be sent (using onNext) instead of queued.
The queue is a storage of the records. It is filled by pollMoreFromKinesis and consumed using the get method.
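The queue-and-refill logic described above can be sketched with plain JDK types. This is a simplified, synchronous, single-threaded version: `fetchBatch` is a hypothetical stand-in for the Kinesis poll, and the class name is made up for illustration. A real connector would poll asynchronously and emit through the generator's emitter.

```java
import java.util.ArrayDeque;
import java.util.List;
import java.util.Queue;
import java.util.function.Supplier;

/** Serves records from a local queue and refills it when it runs low. */
final class BufferedRecordSource<T> {
    private final Queue<T> queue = new ArrayDeque<>();
    private final Supplier<List<T>> fetchBatch; // hypothetical stand-in for a Kinesis poll
    private final int threshold;

    BufferedRecordSource(Supplier<List<T>> fetchBatch, int threshold) {
        this.fetchBatch = fetchBatch;
        this.threshold = threshold;
    }

    /** Returns the next record, or null if the queue is empty and the poll returned nothing. */
    T next() {
        if (queue.isEmpty()) {
            queue.addAll(fetchBatch.get()); // nothing buffered: poll right away
        }
        T record = queue.poll();
        if (record != null && queue.size() < threshold) {
            queue.addAll(fetchBatch.get()); // running low: prefetch the next batch
        }
        return record;
    }

    /** Number of records currently buffered. */
    int buffered() {
        return queue.size();
    }
}
```

Inside a generator callback, `next()` would supply the value passed to `onNext`.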
BTW, if you want to contribute this connector to the smallrye implementation, you are more than welcome.
@cescoffier I intend to add this to the SmallRye implementation only, and thanks for your suggestions; they are very helpful.
I have a question. Since the Kinesis library uses HTTP/2 with RxJava, what advantages would Vert.x provide over that?
Also, on the outgoing side: since both Kafka and Kinesis recommend writing messages in batches, I didn't find anything in KafkaSink that suggests a batch workflow. Do you have a recommended approach for this? I did a crude hack at https://github.com/nikiai/smallrye-reactive-messaging/blob/master/smallrye-reactive-messaging-kinesis/src/main/java/io/smallrye/reactive/messaging/kinesis/KinesisSink.java but I believe the batch process should have a fixed flush interval and a maximum batch size, and should flush when either limit is hit.
E.g. with a flush interval of 1 second and a batch size of 25, we should flush either every second or once we have received 25 messages from the stream, whichever happens first.
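A minimal sketch of that "size or time, whichever comes first" rule, using only the JDK. The `sink` callback is a hypothetical stand-in for a batched put call, and the class name is illustrative. Note that the timer here ticks at a fixed rate and is not reset by a size-triggered flush, which is a simplification.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

/** Buffers items and flushes them when the batch is full or the timer fires. */
final class BatchingBuffer<T> implements AutoCloseable {
    private final List<T> pending = new ArrayList<>();
    private final int maxBatchSize;
    private final Consumer<List<T>> sink; // hypothetical: e.g. wraps a batched write
    private final ScheduledExecutorService timer =
            Executors.newSingleThreadScheduledExecutor();

    BatchingBuffer(int maxBatchSize, long flushIntervalMillis, Consumer<List<T>> sink) {
        this.maxBatchSize = maxBatchSize;
        this.sink = sink;
        // Time-based flush: runs on every tick, does nothing if the buffer is empty
        timer.scheduleAtFixedRate(this::flush, flushIntervalMillis,
                flushIntervalMillis, TimeUnit.MILLISECONDS);
    }

    synchronized void add(T item) {
        pending.add(item);
        if (pending.size() >= maxBatchSize) {
            flush(); // size-based flush
        }
    }

    synchronized void flush() {
        if (pending.isEmpty()) {
            return;
        }
        List<T> batch = new ArrayList<>(pending);
        pending.clear();
        sink.accept(batch);
    }

    /** Stops the timer and flushes whatever is left. */
    @Override
    public void close() {
        timer.shutdownNow();
        flush();
    }
}
```

With `new BatchingBuffer<>(25, 1000, sink)`, the sink receives a batch as soon as 25 items have accumulated, or on the next one-second tick if fewer are pending. If the sink throws from the timer thread, the scheduled task stops repeating, so a real implementation would need error handling there.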
@rverma-nikiai Vert.x is not used in all the connectors (the camel connector does not use it), so no worries.
You can use the RxJava 2 API provided by the library. BTW, do you have a link to this lib? (I'm a real Kinesis noob.)
About batching, it's something I need to fix in the KafkaConnector. But my idea is very close to yours: keep a number of in-flight messages in a queue and send them "once in a while" (by size or timeout).
The library is

    <dependency>
      <groupId>software.amazon.awssdk</groupId>
      <artifactId>kinesis</artifactId>
    </dependency>
And some sample code examples are at
I'm still stuck, since the Kinesis client expects a Consumer and we are expected to return a PublisherBuilder<Message<?>>. I think we have to pass a consumer that copies the messages to a Flowable. Does this make sense? Any idea how we can do this?
As a first step, you can use an AsyncProcessor, and call onNext with the messages received by the consumer. Then, we can iterate to add proper back pressure.
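A sketch of that bridging idea follows. One caveat: RxJava's AsyncProcessor only relays the last item once the source completes, so a processor that relays every item (for example PublishProcessor or UnicastProcessor) is probably closer to what is needed here. To stay dependency-free, the sketch below swaps in the JDK's SubmissionPublisher instead; the class name `CallbackBridge` is made up, and the resulting `Flow.Publisher` would still need adapting to the type the connector has to return.

```java
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.function.Consumer;

/** Exposes a callback on one side and a publisher of the same items on the other. */
final class CallbackBridge<T> implements AutoCloseable {
    private final SubmissionPublisher<T> publisher = new SubmissionPublisher<>();

    /** The callback to hand to the client library; every record is forwarded downstream. */
    Consumer<T> asConsumer() {
        // submit() blocks the caller while a subscriber's buffer is full
        return publisher::submit;
    }

    /** The stream side of the bridge. */
    Flow.Publisher<T> asPublisher() {
        return publisher;
    }

    /** Signals completion to current subscribers. */
    @Override
    public void close() {
        publisher.close();
    }
}
```

Items submitted while nobody is subscribed are dropped, and blocking in `submit` is only a crude form of back pressure, so this is a starting point rather than a finished source.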
Hi team,
I have started working on supporting the Kinesis client as part of reactive-messaging: https://github.com/nikiai/smallrye-reactive-messaging/tree/master/smallrye-reactive-messaging-kinesis. I am having some difficulty with the Kinesis source, though: the Kinesis library provides a subscribeToShard method, and I couldn't figure out how to bind the PublisherBuilder<? extends Message<?>> source to that subscriber.
Any leads are warmly appreciated.