apache / seatunnel

SeaTunnel is a next-generation super high-performance, distributed, massive data integration tool.
https://seatunnel.apache.org/
Apache License 2.0

[Bug] [Source-Rocketmq] DUPLICATE CONSUME in streaming mode #7225

Closed ssfve closed 2 weeks ago

ssfve commented 1 month ago


What happened

I am trying to consume from the RocketMQ source. I sent only 1 message to the topic, but it is consumed again and again.

The job is set to streaming mode, so it keeps running. My topic has 4 queues, so 4 consumer threads are created.

ConsumeBroadcastEnable is set to true. Does this mean clustering mode has been changed to broadcast mode (which I do not need)?

The offset is not updated, whether or not commit.on.checkpoint=true is set, so the count in the screenshot grows as 4, 8, 12, ...

The cause is possibly in RocketMqSourceReader.java, in the pollNext() function, where sourceSplit.setStartOffset() is updated.
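
To illustrate the suspected failure mode, here is a minimal, self-contained sketch. It is not SeaTunnel or RocketMQ code: the `Split` class, `pull`, and `simulate` are hypothetical stand-ins. It only shows that if a reader pulls from a split's start offset and that offset is never advanced after a poll, a single message is emitted again on every poll.

```java
import java.util.ArrayList;
import java.util.List;

public class OffsetSketch {
    // Hypothetical stand-in for a source split; not the SeaTunnel class.
    static final class Split {
        long startOffset = 0;
    }

    // Return up to batchSize messages from the queue, starting at offset `from`.
    static List<String> pull(List<String> queue, long from, int batchSize) {
        int start = (int) Math.min(from, queue.size());
        int end = Math.min(start + batchSize, queue.size());
        return new ArrayList<>(queue.subList(start, end));
    }

    // Poll the queue `polls` times and return the total number of records emitted.
    static int simulate(boolean advanceOffset, int polls) {
        List<String> queue = List.of("msg-0"); // a single message, as in the report
        Split split = new Split();
        int emitted = 0;
        for (int i = 0; i < polls; i++) {
            List<String> batch = pull(queue, split.startOffset, 1); // batch.size = 1
            emitted += batch.size();
            if (advanceOffset) {
                // Move the start offset past the records just read.
                split.startOffset += batch.size();
            }
        }
        return emitted;
    }

    public static void main(String[] args) {
        System.out.println("offset not advanced: " + simulate(false, 3) + " records");
        System.out.println("offset advanced:     " + simulate(true, 3) + " records");
    }
}
```

With the offset left unchanged, three polls emit the same message three times; with the offset advanced, it is emitted once. Whether the actual reader behaves this way would need to be confirmed against the connector source.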

SeaTunnel Version

2.3.5

SeaTunnel Config

env {
  execution.parallelism = 1
  job.mode = "STREAMING"
  job.name = "v2.streaming.config.rocketmq-paimon.track"
  checkpoint.interval = 1000
}

source {
  Rocketmq {
    name.srv.addr = "10.30.21.52:9876"
    topics = "TEST-TRACK-DL-BTN-webhook-T"
    #topics = "SeaTunnel-Consumer-Group"
    #start.mode = "CONSUME_FROM_GROUP_OFFSETS"
    #consumer.group = "TEST-TRACK-DL-BTN-webhook-T-datahub-C"
    #partition.discovery.interval.millis = 1000
    #commit.on.checkpoint=false
    batch.size = 1
    format = "json"
    schema = {
      fields {
        eventProps = "map<string, string>"
        deviceProps = "map<string, string>"
        uuidWebsite = string
        uuidApp = string
        userId = int
        source = string
      }
    }
  }
}

sink {
  Console {
  }
}

Running Command

$SEATUNNEL_HOME/bin/seatunnel.sh --config /home/user488/seatunnel/v2.streaming.config.rocketmq-console.track

Error Exception

Duplicate consumption: ingestion works, but consumerOffset is not updated in time.
Running in cluster mode.
Using the Zeta engine.

Zeta or Flink or Spark Version

2.3.5

Java or Scala Version

RocketMQ and SeaTunnel both run on Java 17

Screenshots

Screenshot 2024-07-17 170237


github-actions[bot] commented 3 weeks ago

This issue has been automatically marked as stale because it has not had recent activity for 30 days. It will be closed in next 7 days if no further activity occurs.

github-actions[bot] commented 2 weeks ago

This issue has been closed because it has not received response for too long time. You could reopen it if you encountered similar problems in the future.