bytedance / bitsail

BitSail is a distributed high-performance data integration engine which supports batch, streaming and incremental scenarios. BitSail is widely used to synchronize hundreds of trillions of data every day.
https://bytedance.github.io/bitsail/
Apache License 2.0
1.62k stars 331 forks source link

[Bug][Connector] RocketMQ Source Get Start Offset #458

Closed liuxiaocs7 closed 1 year ago

liuxiaocs7 commented 1 year ago

What happened

The logic seems not right, when conf is earliest, it should be seekToBegin instead, vice versa?

  private long getStartOffset(MessageQueue messageQueue) throws MQClientException {
    switch (consumerOffsetMode) {
      case RocketMQSourceOptions.CONSUMER_OFFSET_EARLIEST_KEY:
        consumer.seekToEnd(messageQueue);
        return consumer.getOffsetStore()
            .readOffset(messageQueue, READ_FROM_MEMORY);
      case RocketMQSourceOptions.CONSUMER_OFFSET_LATEST_KEY:
        consumer.seekToBegin(messageQueue);
        return consumer.getOffsetStore()
            .readOffset(messageQueue, READ_FROM_MEMORY);
      case RocketMQSourceOptions.CONSUMER_OFFSET_TIMESTAMP_KEY:
        return consumer.offsetForTimestamp(messageQueue, consumerOffsetTimestamp);
      default:
        throw BitSailException.asBitSailException(
            RocketMQErrorCode.CONSUMER_FETCH_OFFSET_FAILED,
            String.format("Consumer offset mode = %s not support right now.", consumerOffsetMode));
    }
  }

What do you expect to happen

No response

How to reproduce

No.

Build Environment

No response

Execution Environment

No response

BitSail version

master

BitSail Component or Code Module

BitSail Connector

Are you willing to submit PR?

Code of Conduct