apache / airflow

Apache Airflow - A platform to programmatically author, schedule, and monitor workflows
https://airflow.apache.org/
Apache License 2.0

Inconsistent XCom message format when using PubSubPullSensor with deferrable=True #41877

Closed arpit-maheshwari1 closed 1 month ago

arpit-maheshwari1 commented 2 months ago

Apache Airflow Provider(s)

google

Versions of Apache Airflow Providers

No response

Apache Airflow version

2.9.1

Operating System

Google Cloud Composer 2 (managed Airflow environment on Google Cloud)

Deployment

Google Cloud Composer

Deployment details

No response

What happened

I'm using PubSubPullSensor in Apache Airflow on Google Cloud Composer 2. With deferrable=True, the messages the sensor stores in XCom have a different format than with deferrable=False.

When deferrable=True:
The messages are stored in XCom as a single string that resembles the protobuf text format:

"[ack_id: 'RFAGFixdRkhRNxkIaFEOT14jPzUgKEUSCQdPAihdeTFTLUFdfWhRDRlyfWB8a1MbBgNGBysOURsHaE5tdR_L0ZL0S0NUa1gSBwVCVnldUhwPbF1ZdQN58b3b8qzgnn8JOjrfj_XZbTuLvKsbZiM9XhJLLD5-LzlFQV5AEkwkDERJUytDCypYEU4EISE-MD5FUw' message { data: 'hello from cloud console!' message_id: '12167757297241504' publish_time { seconds: 1724839443 nanos: 948000000 } } ]"

When deferrable=False:
The messages are stored in XCom as a list of standard Python dictionaries, with the data field base64-encoded:

[{'ack_id': 'UAYWLF1GSFE3GQhoUQ5PXiM_NSAoRRIJB08CKF15MU0sQVhwaFENGXJ9YHxrUxsDV0ECel1RGQdoTm11H4GglfRLQ1RrWBIHB01Vel5TEwxoX11wBnm4vPO6v8vgfwk9OpX-8tltO6ywsP9GZiM9XhJLLD5-LzlFQV5AEkwkDERJUytDCypYEU4EISE-MD5FU0Q', 'message': {'data': 'aGkgZnJvbSBjbG91ZCBjb25zb2xlIQ==', 'message_id': '12165864188103151', 'publish_time': '2024-08-28T11:49:50.962Z', 'attributes': {}, 'ordering_key': ''}, 'delivery_attempt': 0}]

This difference in formats causes issues when processing the messages downstream, as the structure is inconsistent and the protobuf-like strings require additional parsing.
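As a minimal sketch of the downstream processing that relies on the dictionary format, the snippet below decodes the payload from the deferrable=False value shown above. The helper name `decode_payloads` is hypothetical, and the ack_id is replaced with a placeholder for brevity.

```python
import base64

# XCom value in the deferrable=False format (ack_id replaced by a placeholder).
xcom_value = [
    {
        "ack_id": "<<ACK_ID>>",
        "message": {
            "data": "aGkgZnJvbSBjbG91ZCBjb25zb2xlIQ==",
            "message_id": "12165864188103151",
            "publish_time": "2024-08-28T11:49:50.962Z",
            "attributes": {},
            "ordering_key": "",
        },
        "delivery_attempt": 0,
    }
]


def decode_payloads(messages):
    """Hypothetical helper: base64-decode the data field of each message."""
    return [
        base64.b64decode(m["message"]["data"]).decode("utf-8")
        for m in messages
    ]


print(decode_payloads(xcom_value))
```

Passing the deferrable=True string to the same helper raises a TypeError, because iterating over a string yields single characters that cannot be indexed by key.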

What you think should happen instead

Messages should be stored in XCom in a consistent format regardless of whether deferrable=True or deferrable=False is set, ideally as standard Python dictionaries.

How to reproduce

Steps:

  1. Run PubSubPullSensor once with deferrable=True and once with deferrable=False.
  2. Compare the messages stored in XCom by each run.

You can reproduce this issue using the following DAG:

from airflow.decorators import dag
from airflow.providers.google.cloud.operators.pubsub import (
    PubSubCreateSubscriptionOperator,
    PubSubDeleteSubscriptionOperator,
)
from airflow.providers.google.cloud.sensors.pubsub import PubSubPullSensor
from airflow.utils.dates import days_ago

# Define default arguments
default_args = {
    'start_date': days_ago(1),
    'retries': 1,
}

# Define the DAG with the @dag decorator
@dag(
    dag_id='pubsub_pull_sensor_example',
    default_args=default_args,
    schedule_interval=None,
    catchup=False,
)
def pubsub_dag():
    project_id = "<<PROJECT_ID>>"
    topic_name = "<<TOPIC_NAME>>"
    subscription_name = "testing-airflow-subscription"

    # Task to create a Pub/Sub subscription
    create_subscription = PubSubCreateSubscriptionOperator(
        task_id="create_subscription",
        project_id=project_id,
        topic=topic_name,
        subscription=subscription_name
    )

    # PubSubPullSensor to wait for a message on the Pub/Sub subscription
    wait_for_message = PubSubPullSensor(
        task_id='wait_for_message',
        project_id=project_id,
        subscription=subscription_name,
        max_messages=1,  # We only want to pull one message
        ack_messages=True,  # Acknowledge the messages immediately after receiving
        deferrable=True,
    )

    # Task to delete the Pub/Sub subscription
    delete_subscription = PubSubDeleteSubscriptionOperator(
        task_id="delete_subscription",
        project_id=project_id,
        subscription=subscription_name
    )

    # Define the task dependencies
    create_subscription >> wait_for_message >> delete_subscription

# Instantiate the DAG by calling the decorated function
pubsub_dag()

Note:

  1. This code requires an existing Pub/Sub topic.
  2. The DAG creates a new subscription to that topic.
  3. After triggering the DAG, publish a message to the topic and inspect the XCom value of the "wait_for_message" task to see how the message is stored.
  4. Replace the project_id and topic_name placeholders in the code with your actual Google Cloud project ID and Pub/Sub topic name.
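To make the comparison in step 3 less dependent on eyeballing the UI, a small check like the one below could be run against the pulled XCom value. This is only a sketch; `describe_xcom_format` is a hypothetical helper and not part of Airflow or the Google provider.

```python
def describe_xcom_format(value):
    """Hypothetical helper: classify the value returned by the sensor."""
    if isinstance(value, list) and all(isinstance(m, dict) for m in value):
        return "list of dicts"
    if isinstance(value, str):
        return "string"
    # Anything else is reported by its type name.
    return type(value).__name__


# Shapes matching the two cases in this report (contents shortened).
print(describe_xcom_format([{"ack_id": "<<ACK_ID>>", "message": {}}]))
print(describe_xcom_format("[ack_id: '<<ACK_ID>>' message { } ]"))
```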

Anything else

No response

boring-cyborg[bot] commented 2 months ago

Thanks for opening your first issue here! Be sure to follow the issue template! If you are willing to raise a PR to address this issue, please do so; there is no need to wait for approval.

gopidesupavan commented 2 months ago

Looking into this one. It seems the conversion of received messages to JSON-serializable dicts is missing when deferrable=True. Let me check and update.