apache / airflow

Apache Airflow - A platform to programmatically author, schedule, and monitor workflows
https://airflow.apache.org/
Apache License 2.0

Cannot use Spanner Client inside a PythonOperator because of incompatibility issues #30494

Closed piffarettig closed 1 year ago

piffarettig commented 1 year ago

Apache Airflow Provider(s)

google

Versions of Apache Airflow Providers

apache-airflow-providers-google==8.11.0

Apache Airflow version

2.5.2

Operating System

Debian GNU/Linux

Deployment

Docker-Compose

Deployment details

No response

What happened

I'm using a PythonOperator that executes some SQL and makes the results available to a subsequent task via XComs. I do it this way because there is no built-in Cloud Spanner operator that provides this out of the box.

Why do I need to do this? Because I need to DROP all Spanner tables that match a given prefix, so I can't know the table names in advance. They're dynamic (I only know their prefix).

However, my code fails when it reaches `with database.snapshot() as snapshot:`. I get:

```
TypeError: Expected int, bytes, or behavior, got <class 'grpc_gcp_pb2.ApiConfig'>
```

That's the exact same error mentioned here.

And given my Airflow version is 2.5.2, the Google Provider requires:

Which resolves to:

And those are the exact same dependency versions mentioned in the issue above. These dependencies don't seem to change in more recent Airflow versions.

What you think should happen instead

No response

How to reproduce

Generate the following task in a DAG:

```python
from airflow.exceptions import AirflowException
from airflow.operators.python import PythonOperator
from airflow.providers.google.cloud.hooks.spanner import SpannerHook


def my_task() -> PythonOperator:
    return PythonOperator(
        task_id="generate-spanner-drop-statements",
        python_callable=_delete_old_spanner_tables,
        # provide_context is deprecated in Airflow 2; the context is passed
        # to the callable automatically
        op_kwargs={
            "table_prefix": "your table prefix",
            "project_id": "your project id",
            "instance_id": "your instance id",
            "database_id": "your database id",
        },
    )


def _delete_old_spanner_tables(table_prefix: str, project_id: str, instance_id: str, database_id: str, **kwargs):
    hook = SpannerHook()
    database = hook.get_database(
        project_id=project_id,
        instance_id=instance_id,
        database_id=database_id,
    )

    if not database:
        raise AirflowException(
            f"The Cloud Spanner database '{database_id}' in project '{project_id}' "
            f"and instance '{instance_id}' is missing."
        )

    # SELECT statement that yields the DROP statements for the tables and
    # indexes I want to delete
    with open("sql/drop_old_spanner_tables.sql", "r") as f:
        contents = f.read()
    statement = contents.replace("{{ table_prefix }}", table_prefix)

    # This is the line that fails with the TypeError above
    with database.snapshot() as snapshot:
        rows = list(snapshot.execute_sql(statement))
        task_instance = kwargs["task_instance"]
        task_instance.xcom_push(key="drop_statements", value=rows)
```

Anything else

No response

Are you willing to submit PR?

Code of Conduct

boring-cyborg[bot] commented 1 year ago

Thanks for opening your first issue here! Be sure to follow the issue template! If you are willing to raise PR to address this issue please do so, no need to wait for approval.

eladkal commented 1 year ago

If your question is why we don't use newer versions of the Google libraries, it's because the newer versions conflict with other dependencies. There is a campaign to update them: https://github.com/apache/airflow/pull/30067

If you need newer versions for a specific task you should use PythonVirtualenvOperator.