streamnative / pulsar-hub

The canonical source of StreamNative Hub.
https://hub.streamnative.io
Apache License 2.0

feat(mongo-connector): support copy.existing in source and topics/topic.override in sink #156

Open ericsyh opened 2 years ago

ericsyh commented 2 years ago

Motivation

Compared to mongodb-kafka-connector, pulsar-kafka-connector is quite simple. It would be better to enhance pulsar-kafka-connector with some useful features, namely copy.existing in the source and topics/topic.override in the sink.

Please refer to the open-source code of mongo-kafka.

lzqdename commented 2 years ago

Okay, I will read the mongo-kafka code first!

lzqdename commented 2 years ago

I will read the Pulsar IO source code next week.

lzqdename commented 2 years ago

In the Pulsar IO connector for MongoDB, the offset is not saved. Should this feature be supported as part of this issue or not? @ericsyh

ericsyh commented 2 years ago

@lzqdename Sorry, I am not the maintainer of the MongoDB Pulsar IO connector. Can you give more details about the offset that you say is not saved? Is it in the source or the sink connector? And in which process is it not saved?

lzqdename commented 2 years ago

Okay, I see. The offset is the position from which the consumer resumes consuming after a restart. See com.mongodb.kafka.connect.source.MongoSourceTask.createCursor (please refer to the open-source code of mongo-kafka). This function calls getResumeToken(sourceConfig), which reads the last consumed offset position. In a production environment, we should save the offset of each consumed record to avoid consuming it again.

Okay, I will write a basic implementation of "copy.existing" due to lack of time. @ericsyh

ericsyh commented 2 years ago

@lzqdename A Pulsar consumer has no need to save the offset, because the offset info is saved by the Pulsar broker. For example, a Pulsar consumer uses the subscription name sub-1 to consume for a while and then exits for some reason. The consumer just needs to reconnect to Pulsar with the same subscription name sub-1, and the Pulsar broker will send it the next message after the last consumed position.
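
The broker-side bookkeeping described above can be illustrated with a small in-memory model. This is only a sketch under stated assumptions: ToyBroker and its methods are hypothetical names invented for illustration and are not the Pulsar client API, and real Pulsar tracks acknowledgments in a more detailed way than a single integer per subscription.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Toy model only: NOT the Pulsar client API. It shows why a consumer that
// reconnects with the same subscription name does not need to store its own offset.
class ToyBroker {
    private final List<String> log = new ArrayList<>();
    // One cursor per subscription name, kept by the broker rather than the consumer.
    private final Map<String, Integer> cursors = new HashMap<>();

    void publish(String message) {
        log.add(message);
    }

    // Returns the next unacknowledged message for the subscription, or null if caught up.
    String receive(String subscription) {
        int position = cursors.getOrDefault(subscription, 0);
        return position < log.size() ? log.get(position) : null;
    }

    // Advances the subscription cursor past the message last returned by receive().
    void acknowledge(String subscription) {
        int position = cursors.getOrDefault(subscription, 0);
        if (position < log.size()) {
            cursors.put(subscription, position + 1);
        }
    }
}

class SubscriptionCursorSketch {
    public static void main(String[] args) {
        ToyBroker broker = new ToyBroker();
        broker.publish("msg-1");
        broker.publish("msg-2");
        broker.publish("msg-3");

        // First session: consume and acknowledge one message, then "exit".
        System.out.println("first session got: " + broker.receive("sub-1"));
        broker.acknowledge("sub-1");

        // Second session: the consumer kept no state of its own; the same
        // subscription name is enough for the broker to continue where it stopped.
        System.out.println("after reconnect got: " + broker.receive("sub-1"));

        // A different subscription name has its own cursor and starts from the beginning.
        System.out.println("other subscription got: " + broker.receive("sub-2"));
    }
}
```

The point of the model is that the position is keyed by the subscription name on the broker side, so a consumer that exits and comes back holds no state of its own.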

lzqdename commented 2 years ago

I mean consuming from MongoDB, not from Pulsar. You can see https://github.com/mongodb/mongo-kafka/blob/master/src/main/java/com/mongodb/kafka/connect/source/MongoSourceTask.java

Line 397, getResumeToken(sourceConfig): the resume token is a MongoDB token. @ericsyh
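
To make the resume-token idea concrete, here is a minimal stdlib-only sketch of a source task that persists the token of the last record it emitted and resumes after it on restart. All class names here (ChangeEvent, ToyChangeStream, TokenStore, ToySourceTask) are hypothetical and invented for illustration. This is neither the mongo-kafka implementation nor the MongoDB driver API, and the in-memory TokenStore merely stands in for whatever durable offset storage a real connector would use.

```java
import java.util.ArrayList;
import java.util.List;

// A change event paired with the opaque token that identifies its position.
class ChangeEvent {
    final String resumeToken;
    final String payload;

    ChangeEvent(String resumeToken, String payload) {
        this.resumeToken = resumeToken;
        this.payload = payload;
    }
}

// Toy stand-in for a MongoDB change stream (hypothetical, in-memory).
class ToyChangeStream {
    private final List<ChangeEvent> events = new ArrayList<>();

    void append(ChangeEvent event) {
        events.add(event);
    }

    // Returns the events after the given token; a null token means "from the start".
    // In this toy model an unknown token also falls back to the start.
    List<ChangeEvent> readAfter(String resumeToken) {
        int start = 0;
        if (resumeToken != null) {
            for (int i = 0; i < events.size(); i++) {
                if (events.get(i).resumeToken.equals(resumeToken)) {
                    start = i + 1;
                    break;
                }
            }
        }
        return new ArrayList<>(events.subList(start, events.size()));
    }
}

// Stand-in for durable offset storage that survives a task restart.
class TokenStore {
    private String token;

    String load() {
        return token;
    }

    void save(String newToken) {
        token = newToken;
    }
}

class ToySourceTask {
    private final ToyChangeStream stream;
    private final TokenStore store;

    ToySourceTask(ToyChangeStream stream, TokenStore store) {
        this.stream = stream;
        this.store = store;
    }

    // Emits every event after the saved token and saves the token of each emitted event.
    List<String> poll() {
        List<String> payloads = new ArrayList<>();
        for (ChangeEvent event : stream.readAfter(store.load())) {
            payloads.add(event.payload);
            store.save(event.resumeToken);
        }
        return payloads;
    }
}

class ResumeTokenSketch {
    public static void main(String[] args) {
        ToyChangeStream stream = new ToyChangeStream();
        TokenStore store = new TokenStore();
        stream.append(new ChangeEvent("t1", "insert A"));
        stream.append(new ChangeEvent("t2", "insert B"));

        System.out.println("first run: " + new ToySourceTask(stream, store).poll());

        // Simulated restart: a new task instance sharing only the token store.
        stream.append(new ChangeEvent("t3", "update A"));
        System.out.println("after restart: " + new ToySourceTask(stream, store).poll());
    }
}
```

Without the saved token, the restarted task would read the stream from the beginning and emit the first two events again, which is the repeated consumption described above.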

ericsyh commented 2 years ago

@lzqdename OK, I see. I think there is no need for you to support saving the MongoDB offset in this task.