streamnative / pulsar-spark

Spark Connector to read and write with Pulsar
Apache License 2.0

Fix the issue of getting initial offset with user provided start #149

Closed chaoqin-li1123 closed 1 year ago

chaoqin-li1123 commented 1 year ago

Motivation

Fix the following bugs in getUserProvidedMessageId:

  1. consumer.asInstanceOf[ConsumerImpl[GenericRecord]].hasMessageAvailable may throw an exception if the consumer has just been started.
  2. The initial message ID is not wrapped in the UserProvidedMessageId class, which causes the first message to be skipped.
  3. When the consumer calls seek() and then reads, it skips the start message if that message is not a batch message.

Modifications

No longer call the hasMessageAvailable() method. Wrap the initial offset in UserProvidedMessageId. Use a reader instead of a consumer.
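
To illustrate why the wrapping matters, here is a minimal, self-contained sketch. It does not use the connector's real classes: `StartOffset`, `CommittedOffset`, `UserProvidedOffset`, and `readFrom` are hypothetical names, and the inclusive/exclusive semantics are an assumption inferred from the bug description above (an unwrapped start offset causes the first message to be skipped).

```scala
// Hypothetical, simplified model of the start-offset semantics.
// These are NOT the connector's real types; names are for illustration only.
sealed trait StartOffset { def position: Long }

// An offset recorded after processing: the message at `position` was already read.
final case class CommittedOffset(position: Long) extends StartOffset

// An offset supplied by the user: the message at `position` must be read first.
final case class UserProvidedOffset(position: Long) extends StartOffset

object InitialOffsetSketch {
  // Returns the messages a read starting from `start` would yield.
  def readFrom(log: Vector[String], start: StartOffset): Vector[String] =
    start match {
      case UserProvidedOffset(p) => log.drop(p.toInt)     // inclusive start
      case CommittedOffset(p)    => log.drop(p.toInt + 1) // exclusive start
    }

  def main(args: Array[String]): Unit = {
    val log = Vector("m0", "m1", "m2", "m3")
    // Wrapped as user provided: the start message m1 is included.
    println(readFrom(log, UserProvidedOffset(1)))
    // Not wrapped: the start message m1 is skipped.
    println(readFrom(log, CommittedOffset(1)))
  }
}
```

In this model, passing the user's start position without the wrapper makes it indistinguishable from an already-processed offset, so the first requested message is dropped.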

chaoqin-li1123 commented 1 year ago

I found another issue: when the consumer seeks to an offset that is not a batch message ID, it skips a message. Previously this method used a reader instead of a consumer, and the reader does not have this issue. I have verified this in the test.