apache / airflow

Apache Airflow - A platform to programmatically author, schedule, and monitor workflows
https://airflow.apache.org/
Apache License 2.0

Uploading logs to Google Cloud Storage fails after a task completes #37809

Closed lihan closed 8 months ago

lihan commented 8 months ago

Apache Airflow version

Other Airflow 2 version (please specify below)

If "Other Airflow 2 version" selected, which one?

2.7.3

What happened?

I use Google Cloud Storage as the logging backend, and this error occurs when uploading logs to GCS after a task completes. See the traceback below.

[2024-03-01T06:13:18.454+0000] {local_task_job_runner.py:228} INFO - Task exited with return code 0
[2024-03-01T06:13:18.642+0000] {connection.py:232} WARNING - Connection schemes (type: google_cloud_platform) shall not contain '_' according to RFC3986.
[2024-03-01T06:13:18.644+0000] {base.py:73} INFO - Using connection ID 'gcscloudlogging' for task execution.
[2024-03-01T06:13:18.645+0000] {credentials_provider.py:353} INFO - Getting connection using `google.auth.default()` since no explicit credentials are provided.
File "/usr/local/lib/python3.10/copy.py", line 231, in _deepcopy_dict
    y[deepcopy(key, memo)] = deepcopy(value, memo)
  File "/usr/local/lib/python3.10/copy.py", line 153, in deepcopy
    y = copier(memo)
  File "/home/airflow/.local/lib/python3.10/site-packages/airflow/models/baseoperator.py", line 1213, in __deepcopy__
    setattr(result, k, copy.deepcopy(v, memo))
  File "/usr/local/lib/python3.10/copy.py", line 146, in deepcopy
    y = copier(x, memo)
  File "/usr/local/lib/python3.10/copy.py", line 231, in _deepcopy_dict
    y[deepcopy(key, memo)] = deepcopy(value, memo)
  File "/usr/local/lib/python3.10/copy.py", line 153, in deepcopy
    y = copier(memo)
  File "/home/airflow/.local/lib/python3.10/site-packages/airflow/models/dag.py", line 2330, in __deepcopy__
    setattr(result, k, copy.deepcopy(v, memo))
  File "/usr/local/lib/python3.10/copy.py", line 146, in deepcopy
    y = copier(x, memo)
  File "/usr/local/lib/python3.10/copy.py", line 231, in _deepcopy_dict
    y[deepcopy(key, memo)] = deepcopy(value, memo)
  File "/usr/local/lib/python3.10/copy.py", line 153, in deepcopy
    y = copier(memo)
  File "/home/airflow/.local/lib/python3.10/site-packages/airflow/models/baseoperator.py", line 1213, in __deepcopy__
    setattr(result, k, copy.deepcopy(v, memo))
  File "/usr/local/lib/python3.10/copy.py", line 172, in deepcopy
    y = _reconstruct(x, memo, *rv)
  File "/usr/local/lib/python3.10/copy.py", line 271, in _reconstruct
    state = deepcopy(state, memo)
  File "/usr/local/lib/python3.10/copy.py", line 146, in deepcopy
    y = copier(x, memo)
  File "/usr/local/lib/python3.10/copy.py", line 231, in _deepcopy_dict
    y[deepcopy(key, memo)] = deepcopy(value, memo)
  File "/usr/local/lib/python3.10/copy.py", line 146, in deepcopy
    y = copier(x, memo)
  File "/usr/local/lib/python3.10/copy.py", line 231, in _deepcopy_dict
    y[deepcopy(key, memo)] = deepcopy(value, memo)
  File "/usr/local/lib/python3.10/copy.py", line 153, in deepcopy
    y = copier(memo)
  File "/home/airflow/.local/lib/python3.10/site-packages/airflow/models/baseoperator.py", line 1213, in __deepcopy__
    setattr(result, k, copy.deepcopy(v, memo))
  File "/usr/local/lib/python3.10/copy.py", line 146, in deepcopy
    y = copier(x, memo)
  File "/usr/local/lib/python3.10/copy.py", line 231, in _deepcopy_dict
    y[deepcopy(key, memo)] = deepcopy(value, memo)
  File "/usr/local/lib/python3.10/copy.py", line 146, in deepcopy
    y = copier(x, memo)
  File "/usr/local/lib/python3.10/copy.py", line 231, in _deepcopy_dict
    y[deepcopy(key, memo)] = deepcopy(value, memo)
  File "/usr/local/lib/python3.10/copy.py", line 161, in deepcopy
    rv = reductor(4)
  File "/home/airflow/.local/lib/python3.10/site-packages/google/cloud/client/__init__.py", line 194, in __getstate__
    raise PicklingError(
_pickle.PicklingError: Pickling client objects is explicitly not supported.
Clients have non-trivial state that is local and unpickleable.
Sentry is attempting to send 1 pending events
Waiting up to 2 seconds
Press Ctrl-C to quit

What you think should happen instead?

No response

How to reproduce

I cannot reproduce it consistently; it happens when some tasks complete. We are using the KubernetesExecutor.

Operating System

Linux

Versions of Apache Airflow Providers

pip freeze | grep google-

google-ads==22.0.0
google-api-core==2.12.0
google-api-python-client==2.102.0
google-auth==2.23.2
google-auth-httplib2==0.1.1
google-auth-oauthlib==1.1.0
google-cloud-aiplatform==1.34.0
google-cloud-appengine-logging==1.3.2
google-cloud-audit-log==0.2.5
google-cloud-automl==2.11.2
google-cloud-batch==0.17.1
google-cloud-bigquery==3.12.0
google-cloud-bigquery-datatransfer==3.12.1
google-cloud-bigquery-storage==2.22.0
google-cloud-bigtable==2.21.0
google-cloud-build==3.20.0
google-cloud-compute==1.14.1
google-cloud-container==2.32.0
google-cloud-core==2.3.3
google-cloud-datacatalog==3.16.0
google-cloud-dataflow-client==0.8.4
google-cloud-dataform==0.5.3
google-cloud-dataplex==1.6.3
google-cloud-dataproc==5.6.0
google-cloud-dataproc-metastore==1.13.0
google-cloud-dlp==3.12.3
google-cloud-kms==2.19.1
google-cloud-language==2.11.1
google-cloud-logging==3.8.0
google-cloud-memcache==1.7.3
google-cloud-monitoring==2.15.1
google-cloud-orchestration-airflow==1.9.2
google-cloud-os-login==2.10.0
google-cloud-pubsub==2.18.4
google-cloud-redis==2.13.2
google-cloud-resource-manager==1.10.4
google-cloud-run==0.9.1
google-cloud-secret-manager==2.16.4
google-cloud-spanner==3.40.1
google-cloud-speech==2.21.0
google-cloud-storage==2.11.0
google-cloud-storage-transfer==1.9.2
google-cloud-tasks==2.14.2
google-cloud-texttospeech==2.14.2
google-cloud-translate==3.12.0
google-cloud-videointelligence==2.11.4
google-cloud-vision==3.4.4
google-cloud-workflows==1.12.1
google-crc32c==1.5.0
google-re2==1.1
google-resumable-media==2.6.0
grpc-google-iam-v1==0.12.6
keyrings.google-artifactregistry-auth==1.0.0
pydata-google-auth==1.8.2

Deployment

Official Apache Airflow Helm Chart

Deployment details

Deployed on GKE

Anything else?

No response

Are you willing to submit PR?

Code of Conduct

potiuk commented 8 months ago

Some of your operators are storing fields which are unpicklable, but it's hard to say which ones. Unfortunately the error does not reveal which fields or objects they are - you need to track down which tasks / operators are doing it and find the "unpicklable" objects they set. This is Python, so if custom DAG code or operators set state that cannot be pickled, this can happen. You will have to look for it in the code of your DAGs, I am afraid.
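To illustrate the failure mode without Airflow or google-cloud installed, here is a minimal stand-alone sketch. `FakeClient` is a hypothetical stand-in that mimics `google.cloud.client.Client.__getstate__` raising `PicklingError`; `BadOperator`/`GoodOperator` are illustrative names, not real Airflow classes:

```python
import copy
import pickle


class FakeClient:
    """Stand-in for a google-cloud client: refuses to be pickled or
    deep-copied, mimicking google.cloud.client.Client.__getstate__."""

    def __getstate__(self):
        raise pickle.PicklingError(
            "Pickling client objects is explicitly not supported."
        )


class BadOperator:
    """Anti-pattern: the client is created in __init__ and stored as an
    attribute, so deep-copying the DAG (as Airflow does) walks into the
    client object and __getstate__ raises."""

    def __init__(self):
        self.client = FakeClient()


class GoodOperator:
    """Fix: store only serializable config on the task and build the
    client lazily inside execute(), which runs on the worker."""

    def __init__(self, bucket: str):
        self.bucket = bucket  # plain string: deep-copies fine

    def execute(self):
        # Created per-run and never stored on the task instance.
        client = FakeClient()
        return client


try:
    copy.deepcopy(BadOperator())  # raises, as in the reported traceback
except pickle.PicklingError as exc:
    print(f"BadOperator: {exc}")

copy.deepcopy(GoodOperator("my-bucket"))  # succeeds: nothing unpicklable stored
print("GoodOperator: deepcopy OK")
```

The same pattern applies to real operators: create hooks and clients inside `execute()` (or behind a cached property) rather than in `__init__`, so the task object holds only plain, copyable state.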