WordPress / openverse

Openverse is a search engine for openly-licensed media. This monorepo includes all application code.
https://openverse.org
MIT License

Rekognition DAG fails on upsert #4947

Closed. AetherUnbound closed this issue 2 months ago

AetherUnbound commented 2 months ago

Airflow log link

Note: Airflow is currently only accessible to maintainers & those given access. If you would like access to Airflow, please reach out to a member of @WordPress/openverse-maintainers.

https://airflow.openverse.org/dags/add_rekognition_labels/grid?dag_run_id=manual__2024-09-06T21%3A46%3A15.268305%2B00%3A00&task_id=parse_and_insert_labels&base_date=2024-09-06T21%3A46%3A15%2B0000&tab=logs

Description

We're seeing this error in production when running the add_rekognition_labels DAG:

[2024-09-17, 18:41:54 UTC] {taskinstance.py:3310} ERROR - Task failed with exception
Traceback (most recent call last):
  File "/home/airflow/.local/lib/python3.12/site-packages/airflow/models/taskinstance.py", line 762, in _execute_task
    result = _execute_callable(context=context, **execute_callable_kwargs)
             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/airflow/.local/lib/python3.12/site-packages/airflow/models/taskinstance.py", line 733, in _execute_callable
    return ExecutionCallableRunner(
           ^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/airflow/.local/lib/python3.12/site-packages/airflow/utils/operator_helpers.py", line 252, in run
    return self.func(*args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/airflow/.local/lib/python3.12/site-packages/airflow/models/baseoperator.py", line 406, in wrapper
    return func(self, *args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/airflow/.local/lib/python3.12/site-packages/airflow/decorators/base.py", line 266, in execute
    return_value = super().execute(context)
                   ^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/airflow/.local/lib/python3.12/site-packages/airflow/models/baseoperator.py", line 406, in wrapper
    return func(self, *args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/airflow/.local/lib/python3.12/site-packages/airflow/operators/python.py", line 238, in execute
    return_value = self.execute_callable()
                   ^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/airflow/.local/lib/python3.12/site-packages/airflow/operators/python.py", line 256, in execute_callable
    return runner.run(*self.op_args, **self.op_kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/airflow/.local/lib/python3.12/site-packages/airflow/utils/operator_helpers.py", line 252, in run
    return self.func(*args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/airflow/catalog/dags/data_augmentation/rekognition/add_rekognition_labels.py", line 131, in parse_and_insert_labels
    _insert_tags(tags_buffer, postgres_conn_id)
  File "/opt/airflow/catalog/dags/data_augmentation/rekognition/add_rekognition_labels.py", line 58, in _insert_tags
    postgres.insert_rows(
  File "/home/airflow/.local/lib/python3.12/site-packages/airflow/providers/common/sql/hooks/sql.py", line 621, in insert_rows
    sql = self._generate_insert_sql(table, values[0], target_fields, replace, **kwargs)
          ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/airflow/.local/lib/python3.12/site-packages/airflow/providers/postgres/hooks/postgres.py", line 329, in _generate_insert_sql
    raise ValueError("PostgreSQL ON CONFLICT upsert syntax requires column names")
ValueError: PostgreSQL ON CONFLICT upsert syntax requires column names
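
For context, the last frames show insert_rows delegating to the Postgres hook's _generate_insert_sql, which refuses to build an upsert when it has no column names to work with. Below is a minimal sketch of that kind of guard, as I understand it. It is a simplified stand-in and not the provider's actual implementation: the function name, the replace_index parameter handling, the second error message, and the table and column names (image, identifier, tags) are all assumptions for illustration.

```python
from typing import Optional, Sequence


def generate_insert_sql(
    table: str,
    row: Sequence[object],
    target_fields: Optional[Sequence[str]] = None,
    replace: bool = False,
    replace_index: Optional[Sequence[str]] = None,
) -> str:
    """Build an INSERT (optionally an upsert) statement.

    Hypothetical helper that mimics the shape of the failing call;
    it is NOT Airflow's code.
    """
    placeholders = ", ".join(["%s"] * len(row))
    columns = f" ({', '.join(target_fields)})" if target_fields else ""
    sql = f"INSERT INTO {table}{columns} VALUES ({placeholders})"

    # A plain insert works without column names.
    if not replace:
        return sql

    # An upsert needs column names to build the ON CONFLICT clause.
    # This is the message seen in the traceback above.
    if not target_fields:
        raise ValueError(
            "PostgreSQL ON CONFLICT upsert syntax requires column names"
        )

    # Assumed wording: an upsert also needs a conflict target.
    if not replace_index:
        raise ValueError("upsert requires a conflict target (replace_index)")

    conflict = ", ".join(replace_index)
    updates = ", ".join(
        f"{col} = excluded.{col}"
        for col in target_fields
        if col not in replace_index
    )
    action = f"DO UPDATE SET {updates}" if updates else "DO NOTHING"
    return f"{sql} ON CONFLICT ({conflict}) {action}"
```

Calling the sketch with replace=True and no target_fields raises the same ValueError text as the log, while passing target_fields=["identifier", "tags"] and replace_index=["identifier"] yields a complete ON CONFLICT statement. That suggests the first thing to check is whether column names actually reach the hook on this code path.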

Reproduction

I'm able to reproduce this by adding this line to image_analysis_labels.jsonl:

{"image_uuid":"b840de61-fb9d-4ec5-9572-8d778875869f","response":{"Labels":[]},"LabelModelVersion":"2.0","ResponseMetadata":{"RequestId":"0e9cd4f1-055e-4d4f-af3b-9a1884268f68","HTTPStatusCode":200,"HTTPHeaders":{"content-type":"application\/x-amz-json-1.1","date":"Thu, 29 Oct 2020 19:46:03 GMT","x-amzn-requestid":"0e9cd4f1-055e-4d4f-af3b-9a1884268f68","content-length":"1109","connection":"keep-alive"},"RetryAttempts":0}}

DAG status

AetherUnbound commented 2 months ago

I believe this is an issue with the SQL provider in Airflow. I'll dig into it.