apache / rocketmq-connect

A tool for scalably and reliably streaming data between Apache RocketMQ and other systems.
https://rocketmq.apache.org/
Apache License 2.0
122 stars, 118 forks

Support message synchronization down to the queue level #137

Open Slideee opened 2 years ago

Slideee commented 2 years ago

Message synchronization is currently supported only at the topic level. The runtime does not specify a target queue when sending a message, so the queue that a message lands in within the target topic is effectively random. As a result, messages from a given source queue do not end up in the corresponding queue of the target topic, and consumption becomes confused once the ConsumerOffset is synchronized, because an offset in a source queue no longer refers to the same message in the target queue. To address this, SourceTask and SinkTask could be wrapped by WorkerDirectTask: the SinkTask would be told which queue each source message came from and would send the message to that same queue in the target topic. I would like to add a SinkTask to RmqSourceReplicator to solve this problem. What do you think?

This way, topic-level synchronization remains available through the current configuration, and queue-level synchronization can be enabled by configuring WorkerDirectTask.
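To make the proposal concrete, here is a minimal in-memory sketch of queue-preserving replication. `TopicSketch` and `QueueLevelReplicationSketch` are hypothetical names invented for illustration; they are not rocketmq-connect or RocketMQ classes, and the sketch assumes the target topic starts empty and has at least as many queues as the source.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical in-memory stand-in for a topic; not a RocketMQ class. */
final class TopicSketch {
    final List<List<String>> queues = new ArrayList<>();

    TopicSketch(int queueCount) {
        for (int i = 0; i < queueCount; i++) {
            queues.add(new ArrayList<>());
        }
    }

    /** Appends a message to one queue and returns the offset it was stored at. */
    long append(int queueId, String body) {
        List<String> queue = queues.get(queueId);
        queue.add(body);
        return queue.size() - 1;
    }
}

final class QueueLevelReplicationSketch {
    /** Copies every message to the target queue that has the same queueId as its source queue. */
    static void replicate(TopicSketch source, TopicSketch target) {
        if (target.queues.size() < source.queues.size()) {
            throw new IllegalArgumentException(
                "target topic needs at least as many queues as the source");
        }
        for (int queueId = 0; queueId < source.queues.size(); queueId++) {
            for (String body : source.queues.get(queueId)) {
                target.append(queueId, body);
            }
        }
    }
}
```

Under these assumptions, offset N in queue q refers to the same message on both sides, which is what makes a synchronized ConsumerOffset meaningful. With the real client, the equivalent step would presumably be choosing the queue explicitly at send time, for example through the `send(Message, MessageQueue)` overload of the RocketMQ producer, rather than letting the producer pick one.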


Slideee commented 2 years ago

@odbozhou what do you think :)

hellodake commented 2 years ago

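The Replicator currently uses the Target MQ directly as the MQ it depends on, so there is no sink. Introducing a sink would require adding another MQ cluster, which wastes resources. For the problem above, my current approach is to put the queueId into payload[] and, when the message is sent to the Target MQ, place it in the specified queue.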
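The comment does not say how the queueId is laid out inside payload[], so the following is only one possible shape of that workaround. The slot positions (queueId in slot 0, body in slot 1) and the class `PayloadQueueIdSketch` are assumptions made for illustration.

```java
/** Hypothetical sketch of carrying the source queueId inside the record payload. */
final class PayloadQueueIdSketch {
    /** Builds a payload array: slot 0 holds the source queueId, slot 1 the message body. */
    static Object[] wrap(int sourceQueueId, byte[] body) {
        return new Object[] {sourceQueueId, body};
    }

    /** Reads the queueId back out when the record is about to be sent to the target MQ. */
    static int targetQueueId(Object[] payload, int targetQueueCount) {
        int queueId = (Integer) payload[0];
        if (queueId < 0 || queueId >= targetQueueCount) {
            throw new IllegalArgumentException("no queue " + queueId + " in target topic");
        }
        return queueId;
    }

    /** Returns the original message body carried in slot 1. */
    static byte[] body(Object[] payload) {
        return (byte[]) payload[1];
    }
}
```

The range check matters in practice: if the target topic was created with fewer queues than the source, the sender has to fail or fall back instead of silently writing to a different queue.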

Slideee commented 2 years ago

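> The Replicator currently uses the Target MQ directly as the MQ it depends on, so there is no sink. Introducing a sink would require adding another MQ cluster, which wastes resources. For the problem above, my current approach is to put the queueId into payload[] and, when the message is sent to the Target MQ, place it in the specified queue.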

In fact, there is no need to add another MQ cluster; WorkerDirectTask solves this problem.

odbozhou commented 2 years ago
  1. Queue-level synchronization: start multiple tasks that all use the same consumerGroup, so that a single topic is consumed by multiple tasks in cluster mode.
  2. The confusion in message consumption after synchronizing the ConsumerOffset might be solved by using timestamps.
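The second point does not spell out how timestamps would be used. One possible reading is to translate a source offset into a target offset by timestamp instead of copying the offset number. The sketch below assumes that store timestamps within a target queue are non-decreasing and comparable with source timestamps; `TimestampOffsetSketch` is a hypothetical name.

```java
/** Hypothetical sketch: locate a resume offset in the target queue by timestamp. */
final class TimestampOffsetSketch {
    /**
     * Returns the first offset in the target queue whose store timestamp is
     * greater than or equal to the given timestamp, or the queue length if
     * every stored message is older.
     */
    static long offsetForTimestamp(long[] targetStoreTimestamps, long timestamp) {
        int lo = 0;
        int hi = targetStoreTimestamps.length;
        // Binary search for the leftmost entry that is not older than the timestamp.
        while (lo < hi) {
            int mid = (lo + hi) >>> 1;
            if (targetStoreTimestamps[mid] < timestamp) {
                lo = mid + 1;
            } else {
                hi = mid;
            }
        }
        return lo;
    }
}
```

A consumer on the target side would then resume from the returned offset. RocketMQ's admin interface exposes a timestamp-based `searchOffset` lookup that could presumably play this role against a real broker; whether its precision is sufficient to avoid duplicate or skipped messages is not addressed in this thread.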