Azure / azure-event-hubs-spark

Enabling Continuous Data Processing with Apache Spark and Azure Event Hubs
Apache License 2.0

Missing events during write to event hub, unable to catch exceptions #642

Open Silverutm opened 2 years ago

Silverutm commented 2 years ago

We are using readStream to consume from Event Hubs; on the write side we use writeStream with foreachBatch, and within each micro-batch we do some transformations and write the results to another set of event hubs.

During the write, a few events go missing at random. We are confident our own processing is not dropping them, because of the data-quality (DQ) checks we have in place.

A second issue: if we supply a wrong event hub name (one that doesn't exist), the write does not throw an exception; the query simply moves on to the next micro-batch.

Here is the sample code:

val inputconnectionString = ConnectionStringBuilder("Endpoint=sb://xyz.servicebus.windows.net/;SharedAccessKeyName=SendAuthorizationRule;SharedAccessKey=Xyz=;EntityPath=TestInputEventHub").build

val inputehConf = EventHubsConf(inputconnectionString)
          .setConsumerGroup("XYZ")
          .setStartingPosition(EventPosition.fromEnqueuedTime("XYZ"))
          .setMaxEventsPerTrigger("XYZ")
          .setReceiverTimeout("XYZ")

val reader = sparkSession
        .readStream
        .format("eventhubs")
        .options(inputehConf.toMap)
        .load()

val streamingQueryHandle = reader
        .writeStream
        .foreachBatch { (batchDF: DataFrame, batchId: Long) =>
          logger.log("Starting StreamProcessor", isDebug = false)
          EventStreamProcessor.process(batchDF, batchId, scdProviders)
        }
        .outputMode("update")
        .option("checkpointLocation", AppConfigs.checkpointLocation)
        .trigger(Trigger.ProcessingTime(TimeUnit.SECONDS.toMillis(AppConfigs.triggerIntervalDurationInSeconds)))       
        .start()

// Executed for each micro-batch by foreachBatch
def process(batchDF: DataFrame, batchId: Long, scdProviders: mutable.Map[String, SCDProvider]): Unit = {

  batchDF.persist()
  val outputDF = <some operations>

  val outputconnectionString = "Endpoint=sb://xyz.servicebus.windows.net/;SharedAccessKeyName=SendAuthorizationRule;SharedAccessKey=Xyz=;EntityPath=TestOutputEventHub"
  val outputehConf = EventHubsConf(outputconnectionString)

  try {
    outputDF.toJSON.selectExpr("value as body")
      .write
      .format("eventhubs")
      .options(outputehConf.toMap)
      .save()
  }
  catch {
    case ex: Exception =>
      println("Unable to save the events to EH " + outputehConf.toMap)
      ex.printStackTrace()
      throw ex
  }
}

I need to be able to catch an exception when some events are not persisted to the event hub during the write. Also, why don't I get an exception when I write to an event hub that doesn't even exist?
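
For context, this is roughly the per-batch accounting we would like to end up with. It is only a sketch: writeBatch is a hypothetical helper, logger and outputehConf are the names from the sample above, and the count bookkeeping is our own DQ-style check, not something the connector provides.

import org.apache.spark.sql.DataFrame

// Sketch: log how many events each micro-batch is supposed to write, and rethrow any
// failure so the streaming query itself fails and awaitTermination() surfaces it.
def writeBatch(outputDF: DataFrame, batchId: Long): Unit = {
  val expected = outputDF.count() // events we expect to land in the output event hub
  logger.log(s"Batch $batchId: writing $expected events", isDebug = false)

  try {
    outputDF.toJSON.selectExpr("value as body")
      .write
      .format("eventhubs")
      .options(outputehConf.toMap)
      .save()
    logger.log(s"Batch $batchId: write call completed for $expected events", isDebug = false)
  } catch {
    case ex: Exception =>
      // Rethrowing makes this micro-batch fail, which fails the whole streaming query.
      logger.log(s"Batch $batchId: write failed ($expected events at risk)", isDebug = false)
      throw ex
  }
}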

Any help or pointers are appreciated.

wallystart commented 2 years ago

Hello! The same thing happens to me in Python; in my case it affects write, read, writeStream, and readStream.

The bug is probably on the Scala side: the exceptions are not thrown there, and Python doesn't throw them either (according to the source code, the Python path depends on the Scala one).

With my team we built this code for our streaming process; we force the error to surface with awaitTermination(). Maybe something like this will work in your batch process:

try:
    stream = read_sDF.writeStream \
                     .format("eventhubs") \
                     .options(**eventhubConfigurations) \
                     .option("checkpointLocation", eventhubCheckpointPath)

    # start streaming
    streamManager = stream.start()
    # force the error to surface here, if any
    streamManager.awaitTermination()

except StreamingQueryException as error:
    if error.desc.startswith("Detected schema change"):  # match the error message from the source code
        print("Schema changed, await...")
        if streamManager.status['isTriggerActive']:
            # force stop
            streamManager.stop()
    else:
        # unrecognized error, re-raise the exception
        raise StreamingQueryException(error.desc, error.stackTrace)

Hope it helps

Silverutm commented 2 years ago

We already have this:

streamingQueryHandle.awaitTermination()
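
For reference, the Scala equivalent of the snippet above is just a try/catch around that call (a minimal sketch, reusing the logger from the sample code; nothing connector-specific):

import org.apache.spark.sql.streaming.StreamingQueryException

try {
  // Any exception rethrown inside foreachBatch fails the query and shows up here,
  // wrapped in a StreamingQueryException.
  streamingQueryHandle.awaitTermination()
} catch {
  case ex: StreamingQueryException =>
    logger.log("Streaming query failed: " + ex.getMessage, isDebug = false)
    throw ex
}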

Also, @nyaghma, why is it that when we use this connection string

connStr="Endpoint=sb://xyzEHN.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=WrongKey;EntityPath=NonExistingEH"

we don't get any errors when doing

...
.write
.format("eventhubs")
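
For now, one possible fail-fast check is to validate the connection string with the underlying com.microsoft.azure.eventhubs client that the connector depends on, before starting the query. This is only a sketch under that assumption; the exact createFromConnectionStringSync overload may differ by SDK version, and connStr is the connection string shown above.

import java.util.concurrent.Executors
import com.microsoft.azure.eventhubs.EventHubClient

// Try to open a client and fetch the hub's metadata up front, so a bad key or a
// non-existent entity path fails immediately instead of being silently swallowed
// during the write.
val executor = Executors.newSingleThreadScheduledExecutor()
try {
  val client = EventHubClient.createFromConnectionStringSync(connStr, executor)
  val info = client.getRuntimeInformation.get() // fails if the hub doesn't exist or auth is wrong
  println(s"Validated event hub '${info.getPath}' with ${info.getPartitionCount} partitions")
  client.closeSync()
} finally {
  executor.shutdown()
}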