streamnative / pulsar-spark

Spark Connector to read and write with Pulsar
Apache License 2.0

[BUG] Data loss and data duplication when the Pulsar reader experiences network connectivity issues #181

Open neilramaswamy opened 2 weeks ago

neilramaswamy commented 2 weeks ago

Describe the bug

I have observed that the Pulsar Spark connector occasionally drops data and occasionally duplicates it. This seems to happen when the underlying Pulsar client has connectivity issues with the Pulsar broker.

To Reproduce

I have a unit test that writes 10 numbers, 11 through 20, to a Pulsar topic and then attempts to read all the data back from that topic using Structured Streaming. Occasionally the test reads more than 10 messages and, less frequently, fewer than 10. For example, this very simple test has returned:

  == Results ==
  !== Correct Answer - 10 ==   == Spark Answer - 11 ==
   struct<value:string>        struct<value:string>
   [11]                        [11]
  ![12]                        [11]
  ![13]                        [12]
  ![14]                        [13]
  ![15]                        [14]
  ![16]                        [15]
  ![17]                        [16]
  ![18]                        [17]
  ![19]                        [18]
  ![20]                        [19]
  !                            [20]

This doesn't always happen; it's flaky. To reproduce it, you might have to run the test hundreds of times in an environment that can have network slowdowns or failures. I ran it on an internal cloud environment.

Expected behavior

The connector should always read back exactly 10 messages.
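
When triaging flaky runs, it can help to classify each failure as duplication or loss rather than only comparing row counts. The following is a minimal sketch in Python, not part of the actual test suite; the helper name diff_results is hypothetical, and the sample data is the "Spark Answer" column from the failing run above.

```python
from collections import Counter


def diff_results(expected, actual):
    """Return (duplicated, missing) values, each as a sorted list."""
    expected_counts = Counter(expected)
    actual_counts = Counter(actual)
    # Values read more often than they were written: duplication.
    duplicated = sorted((actual_counts - expected_counts).elements())
    # Values read less often than they were written: loss.
    missing = sorted((expected_counts - actual_counts).elements())
    return duplicated, missing


# The test writes 11 through 20 as strings.
expected = [str(n) for n in range(11, 21)]
# The failing run shown above: 11 appears twice, nothing is missing.
actual = ["11", "11"] + [str(n) for n in range(12, 21)]

duplicated, missing = diff_results(expected, actual)
print("duplicated:", duplicated)
print("missing:", missing)
```

Because the comparison works on multisets, it reports a run that both drops one value and duplicates another, even though the row count in that case would still be 10.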

Additional context

I added logs to the pulsar-spark library and re-ran my test hundreds of times. I then saw the following logs:

24/11/05 02:21:11 INFO PulsarSourceRDD: [DBG] Reading currentMessage from the reader
24/11/05 02:21:11 INFO PulsarSourceRDD: [DBG] Got currentMessage=11
24/11/05 02:21:11 INFO PulsarSourceUtils: Returning false for enteredEnd with end=1:12:-1 and current=1:10:-1:0
24/11/05 02:21:11 INFO ConnectionHandler: [persistent://public/default/topic1] [spark-pulsar-2b74618d-ff6a-421e-8468-afdf7be846ec-reader-0f5dc824b4] Reconnecting after timeout
24/11/05 02:21:11 INFO ConsumerImpl: [persistent://public/default/topic1][spark-pulsar-2b74618d-ff6a-421e-8468-afdf7be846ec-reader-0f5dc824b4] Subscribing to topic on cnx [id: 0x456bd5bc, L:/127.0.0.1:55998 - R:localhost/127.0.0.1:32803], consumerId 1
24/11/05 02:21:11 INFO pulsar-spark-test-logger: 2024-11-05T02:21:11,196+0000 [pulsar-io-19-13] INFO  org.apache.pulsar.broker.service.ServerCnx - [/172.17.0.1:37088] Subscribing on topic persistent://public/default/topic1 / spark-pulsar-2b74618d-ff6a-421e-8468-afdf7be846ec-reader-0f5dc824b4
24/11/05 02:21:11 INFO pulsar-spark-test-logger: 2024-11-05T02:21:11,196+0000 [pulsar-io-19-13] INFO  org.apache.pulsar.broker.service.persistent.PersistentTopic - [persistent://public/default/topic1][spark-pulsar-2b74618d-ff6a-421e-8468-afdf7be846ec-reader-0f5dc824b4] Creating non-durable subscription at msg id 1:10:-1:-1 - {}
24/11/05 02:21:11 INFO pulsar-spark-test-logger: 2024-11-05T02:21:11,198+0000 [pulsar-io-19-13] INFO  org.apache.bookkeeper.mledger.impl.ManagedCursorImpl - [public/default/persistent/topic1-spark-pulsar-2b74618d-ff6a-421e-8468-afdf7be846ec-reader-0f5dc824b4] Rewind from 1:10 to 1:10
24/11/05 02:21:11 INFO pulsar-spark-test-logger: 2024-11-05T02:21:11,199+0000 [pulsar-io-19-13] INFO  org.apache.pulsar.broker.service.ServerCnx - [/172.17.0.1:37088] Created subscription on topic persistent://public/default/topic1 / spark-pulsar-2b74618d-ff6a-421e-8468-afdf7be846ec-reader-0f5dc824b4
24/11/05 02:21:11 INFO ConsumerImpl: [persistent://public/default/topic1][spark-pulsar-2b74618d-ff6a-421e-8468-afdf7be846ec-reader-0f5dc824b4] Subscribed to topic on localhost/127.0.0.1:32803 -- consumer: 1
24/11/05 02:21:11 INFO CodeGenerator: Code generated in 33.914034 ms
24/11/05 02:21:11 INFO PulsarSourceRDD: [DBG] Reading currentMessage from the reader
24/11/05 02:21:11 INFO PulsarSourceRDD: [DBG] Got currentMessage=11

The lines that start with [DBG] are my own; I added them at the end of getNext in PulsarSourceRDDBase. Notice that record 11 is read, the Pulsar client then logs "Reconnecting after timeout", and then 11 is read again. I have also seen the original read return arbitrary values, like 13.
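
To make the duplication in the logs concrete: the reader was at current=1:10:-1:0 when it reconnected, the broker re-created the subscription at 1:10:-1:-1, and the same entry was delivered again. The toy model below is a sketch in Python under stated assumptions, not the connector's code and not the upstream fix. It assumes the colon-separated fields are ledger, entry, partition and an optional batch index, and it shows how a guard that only accepts strictly advancing positions would drop a redelivered message. The function names and the third delivery are hypothetical.

```python
def parse_message_id(text):
    """Parse 'ledger:entry:partition[:batch]' into a comparable tuple.

    Assumption: this field order matches the IDs printed in the logs.
    """
    parts = [int(p) for p in text.split(":")]
    ledger, entry = parts[0], parts[1]
    batch = parts[3] if len(parts) > 3 else -1
    return (ledger, entry, batch)


def drop_redelivered(messages):
    """Yield values whose position is strictly past the last one yielded."""
    last = None
    for message_id, value in messages:
        position = parse_message_id(message_id)
        if last is not None and position <= last:
            # Same or earlier position: treat it as a redelivery and skip it.
            continue
        last = position
        yield value


# First two deliveries mirror the logs (11 read, reconnect, 11 read again).
# The third delivery is hypothetical, added only to show progress resuming.
deliveries = [
    ("1:10:-1:0", "11"),
    ("1:10:-1:0", "11"),
    ("1:10:-1:1", "12"),
]
print(list(drop_redelivered(deliveries)))
```

A guard like this would only mask the duplication symptom. It would not help with the lost messages or with a read that returns an unexpected value such as 13.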

I dug a little into the Pulsar codebase and found that there are known issues around the Pulsar reader's seek method; see this issue for example. This other issue has the same symptom that I am seeing here. I believe that this PR fixes the underlying issue, so my recommendation is to upgrade this library's Pulsar client to a version that includes the fix, i.e. ^3.0.4.

ericm-db commented 2 weeks ago

Addressing this here: https://github.com/streamnative/pulsar-spark/pull/180