streamnative / pulsar-spark

Spark Connector to read and write with Pulsar
Apache License 2.0

[BUG] Pulsar Backlog Pileup - No Message Acknowledgements to Pulsar #178

Open sbandaru opened 1 month ago

sbandaru commented 1 month ago

Describe the bug
While using connector 3.4.0.3 to read from a Pulsar topic, the backlog on the topic piles up and is not cleared until the TTL expires.

- Spark version: 3.5.0 (Scala 2.12)
- Pulsar version: 2.10.0.7
- StreamNative connector: pulsar-spark-connector_2.12-3.4.0.3.jar

To Reproduce
Steps to reproduce the behavior: running this script as a notebook processes messages fine, but leaves a backlog in Pulsar that is not cleared until the TTL expires.

```scala
val pulsarDF = spark.readStream
  .format("pulsar")
  .option("service.url", serviceUrl)
  .option("topic", topicName)
  .option("predefinedSubscription", subscriptionName)
  .option("pulsar.reader.receiverQueueSize", "10000")
  .option("pulsar.client.operationTimeoutMs", "60000") // set to 60 seconds
  .option("failOnDataLoss", "false")
  .load()
```

```scala
// Process the binary data and rename columns
val processedDF = pulsarDF.selectExpr(
  "CAST(value AS STRING) as message",
  "CAST(__publishTime AS STRING) as pulsarPublishTime"
)
```

```scala
val query = processedDF.writeStream
  .format("delta")
  .outputMode("append")
  .option("checkpointLocation", checkpointLocation)
  .table(tablePath) // writes to a managed table governed by Unity Catalog
```

Expected behavior
The Pulsar backlog stays at or close to zero while the stream is running.
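For reference, the backlog can be checked directly on the broker with `pulsar-admin`; the tenant, namespace, and topic below are placeholders. In the JSON output, the `msgBacklog` field under the subscription should stay near zero if acknowledgements are being sent:

```shell
# Inspect per-subscription stats for the topic (placeholder topic name).
# Look at "subscriptions" -> "<subscription-name>" -> "msgBacklog".
pulsar-admin topics stats persistent://public/default/my-topic
```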

Additional context
It appears that the StreamNative connector is not sending message acknowledgements to the Pulsar broker, which causes the backlog to grow. Doesn't the StreamNative connector use the Consumer API to read and acknowledge messages?
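For comparison, this is a minimal sketch of the behavior the question refers to: a plain Pulsar Java client consumer (used here from Scala) acknowledges each message, and it is that `acknowledge()` call which advances the subscription cursor so the broker can clear the backlog. The service URL, topic, and subscription names are placeholders:

```scala
import org.apache.pulsar.client.api.{PulsarClient, SubscriptionType}

// Minimal sketch, assuming a broker reachable at the placeholder URL.
val client = PulsarClient.builder()
  .serviceUrl("pulsar://localhost:6650") // placeholder
  .build()

val consumer = client.newConsumer()
  .topic("my-topic")                     // placeholder
  .subscriptionName("my-subscription")   // placeholder
  .subscriptionType(SubscriptionType.Shared)
  .subscribe()

// Each acknowledge() tells the broker the message is processed,
// so it no longer counts toward the subscription's backlog.
val msg = consumer.receive()
try {
  // ... process msg.getData() ...
  consumer.acknowledge(msg)
} catch {
  case _: Exception => consumer.negativeAcknowledge(msg) // redeliver later
}

consumer.close()
client.close()
```

If the connector instead reads with a Reader (which has no acknowledgement semantics) while a subscription exists on the topic, the subscription cursor would never advance, which would match the observed pileup.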