Azure / azure-event-hubs-spark

Enabling Continuous Data Processing with Apache Spark and Azure Event Hubs
Apache License 2.0

message_id not populated in systemProperties when reading from eventhub #607

Open jimtodd92 opened 3 years ago

jimtodd92 commented 3 years ago

Bug Report:

I can see the message_id in the Function App, along with the body. But when reading through Spark with the Event Hubs library, it is populated as blank ({}).

```python
import json

# Assumes a Databricks notebook where `spark`, `dbutils`, `key_vault_name`,
# `parameters`, `offset`, `sequence_no`, `start_time` and `end_time` are
# already defined.
start_position = {
    "offset": offset,
    "seqNo": sequence_no,
    "enqueuedTime": start_time,
    "isInclusive": True,
}

eventhub_conf = {
    # The connector expects an encrypted connection string.
    "eventhubs.connectionString": spark._jvm.org.apache.spark.eventhubs.EventHubsUtils.encrypt(
        dbutils.secrets.get(key_vault_name, parameters.get("EVENTHUB_SECRET_NAME"))
    ),
    "eventhubs.consumerGroup": "yyyy",
    "eventhubs.startingPosition": json.dumps(start_position),
}

# Optional ending position for a bounded batch read.
end_position = {
    "offset": None,
    "seqNo": -1,
    "enqueuedTime": end_time,
    "isInclusive": True,
}
eventhub_conf["eventhubs.endingPosition"] = json.dumps(end_position)

# Batch read; the resulting DataFrame includes the systemProperties column.
payload = (
    spark.read
    .format("eventhubs")
    .options(**eventhub_conf)
    .load()
)
```
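The connector's PySpark guide builds each position as a JSON dictionary in which only one of `offset`, `seqNo`, or `enqueuedTime` is meant to be in use, with the others left at placeholder values (`None` or `-1`). The snippet above sets all three in `start_position`. A small helper that enforces one field at a time can make the intent explicit. This is a minimal sketch: the helper name `event_position_json` is hypothetical, and both the one-field rule and the `%Y-%m-%dT%H:%M:%S.%fZ` time format are assumptions taken from the connector's PySpark guide, not something confirmed in this thread.

```python
import datetime as dt
import json
from typing import Optional

# Assumed enqueuedTime string format (from the connector's PySpark guide).
ENQUEUED_TIME_FORMAT = "%Y-%m-%dT%H:%M:%S.%fZ"


def event_position_json(
    offset: Optional[str] = None,
    seq_no: int = -1,
    enqueued_time: Optional[dt.datetime] = None,
    is_inclusive: bool = True,
) -> str:
    """Hypothetical helper: build the JSON string for
    "eventhubs.startingPosition" / "eventhubs.endingPosition".

    Assumption: exactly one of offset, seq_no, enqueued_time is in use;
    the unused ones keep their placeholder values (None / -1).
    """
    in_use = [offset is not None, seq_no != -1, enqueued_time is not None]
    if sum(in_use) != 1:
        raise ValueError("set exactly one of offset, seq_no, enqueued_time")

    return json.dumps(
        {
            "offset": offset,
            "seqNo": seq_no,
            "enqueuedTime": (
                enqueued_time.strftime(ENQUEUED_TIME_FORMAT)
                if enqueued_time is not None
                else None
            ),
            "isInclusive": is_inclusive,
        }
    )
```

For example, `event_position_json(offset="-1")` describes the start of the stream, and `event_position_json(enqueued_time=dt.datetime(2021, 6, 1, 12, 0, 0))` serializes the time as `"2021-06-01T12:00:00.000000Z"`. Raising on ambiguous input was chosen over silently picking one field, so a position that sets several fields is caught before it reaches the connector.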


nyaghma commented 2 years ago

@jimtodd92 I tried to reproduce the issue on my side, but my app received the message_id in systemProperties without any problem. I slightly modified this sample to add message_id to telemetry messages and wrote a simple PySpark app to read messages from the built-in event hub, using connector version 2.3.18 and Spark 3.1.
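To check whether the message ID actually reaches Spark, one can look it up per row in the `systemProperties` map and, failing that, in the application `properties` map. The exact key name is not shown in this thread; the sketch below assumes the AMQP property name `message-id` and falls back to `message_id` in case the sender stored the value as a custom application property instead. The function name and the fallback order are hypothetical. It works on plain dicts so it can be tried without a cluster; on the DataFrame, a comparable lookup would be something like `payload.select(payload.systemProperties.getItem("message-id"))`.

```python
from typing import Mapping, Optional, Tuple

# Assumed key names: "message-id" for the AMQP system property,
# "message_id" for an ID sent as a custom application property.
CANDIDATE_KEYS = ("message-id", "message_id")


def find_message_id(
    system_properties: Optional[Mapping[str, str]],
    properties: Optional[Mapping[str, str]] = None,
) -> Tuple[Optional[str], Optional[str]]:
    """Hypothetical helper: return (value, where_found) for the first
    non-empty candidate key, checking systemProperties before properties.
    Returns (None, None) when no candidate key is present."""
    for column_name, mapping in (
        ("systemProperties", system_properties),
        ("properties", properties),
    ):
        if not mapping:  # None or an empty map, like the blank {} reported above
            continue
        for key in CANDIDATE_KEYS:
            if mapping.get(key):
                return mapping[key], f"{column_name}[{key!r}]"
    return None, None
```

Reporting where the value was found, not just the value, helps tell apart the two situations discussed here: the ID arriving as a system property (as in the maintainer's reproduction) versus only as an application property or not at all.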