Closed: dmateusp closed this issue 2 years ago.
@dimberman, does it ring a bell?
This issue also results in BranchPythonOperator not working with KubernetesExecutor.
Apache Airflow version: 1.10.12
What happened: The BranchPythonOperator task succeeds, but the task in the followed branch never gets scheduled. The scheduler throws UnicodeDecodeError: 'utf-8' codec can't decode byte 0x80 in position 0: invalid start byte
Stack Trace:
```
Process DagFileProcessor4425-Process:
Traceback (most recent call last):
  File "/usr/local/lib/python3.7/multiprocessing/process.py", line 297, in _bootstrap
    self.run()
  File "/usr/local/lib/python3.7/multiprocessing/process.py", line 99, in run
    self._target(*self._args, **self._kwargs)
  File "/usr/local/lib/python3.7/site-packages/airflow/jobs/scheduler_job.py", line 159, in _run_file_processor
    pickle_dags)
  File "/usr/local/lib/python3.7/site-packages/airflow/utils/db.py", line 74, in wrapper
    return func(*args, **kwargs)
  File "/usr/local/lib/python3.7/site-packages/airflow/jobs/scheduler_job.py", line 1637, in process_file
    self._process_dags(dagbag, dags, ti_keys_to_schedule)
  File "/usr/local/lib/python3.7/site-packages/airflow/jobs/scheduler_job.py", line 1316, in _process_dags
    self._process_task_instances(dag, tis_out)
  File "/usr/local/lib/python3.7/site-packages/airflow/utils/db.py", line 74, in wrapper
    return func(*args, **kwargs)
  File "/usr/local/lib/python3.7/site-packages/airflow/jobs/scheduler_job.py", line 793, in _process_task_instances
    run.update_state(session=session)
  File "/usr/local/lib/python3.7/site-packages/airflow/utils/db.py", line 70, in wrapper
    return func(*args, **kwargs)
  File "/usr/local/lib/python3.7/site-packages/airflow/models/dagrun.py", line 296, in update_state
    session=session)
  File "/usr/local/lib/python3.7/site-packages/airflow/utils/db.py", line 70, in wrapper
    return func(*args, **kwargs)
  File "/usr/local/lib/python3.7/site-packages/airflow/models/taskinstance.py", line 659, in are_dependencies_met
    session=session):
  File "/usr/local/lib/python3.7/site-packages/airflow/models/taskinstance.py", line 683, in get_failed_dep_statuses
    dep_context):
  File "/usr/local/lib/python3.7/site-packages/airflow/ti_deps/deps/base_ti_dep.py", line 106, in get_dep_statuses
    for dep_status in self._get_dep_statuses(ti, session, dep_context):
  File "/usr/local/lib/python3.7/site-packages/airflow/ti_deps/deps/not_previously_skipped_dep.py", line 58, in _get_dep_statuses
    task_ids=parent.task_id, key=XCOM_SKIPMIXIN_KEY
  File "/usr/local/lib/python3.7/site-packages/airflow/models/taskinstance.py", line 1564, in xcom_pull
    return pull_fn(task_id=task_ids)
  File "/usr/local/lib/python3.7/site-packages/airflow/utils/db.py", line 74, in wrapper
    return func(*args, **kwargs)
  File "/usr/local/lib/python3.7/site-packages/airflow/models/xcom.py", line 165, in get_one
    return json.loads(result.value.decode('UTF-8'))
UnicodeDecodeError: 'utf-8' codec can't decode byte 0x80 in position 0: invalid start byte
```
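A note of context on the error itself: byte 0x80 at position 0 is the telltale signature of a pickled payload, because pickle protocol 2 and later begin every stream with the \x80 PROTO opcode. That failure pattern is consistent with an XCom value that was written as a pickle (for example under `[core] enable_xcom_pickling = True`, or by an older Airflow version) being read by code that expects UTF-8 JSON, as `json.loads(result.value.decode('UTF-8'))` does in the trace above. A self-contained sketch of the mismatch:

```python
import json
import pickle

# What a pickled XCom value looks like at rest: pickle protocol 2+
# streams start with the PROTO opcode, byte 0x80.
value = pickle.dumps({"hello": "world"})
print(hex(value[0]))  # 0x80

# A reader that assumes the stored bytes are UTF-8 encoded JSON
# (as airflow/models/xcom.py does above) fails exactly like the trace.
try:
    json.loads(value.decode("UTF-8"))
except UnicodeDecodeError as err:
    print(err)  # 'utf-8' codec can't decode byte 0x80 in position 0 ...
```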
Does it happen on 2.0.0 too?
@dmateusp have you tried this on 2.0?
Sorry folks, we haven't attempted to migrate to 2.0 yet!
We're getting the same error in Airflow 2.0.1, and it happens with XComs produced by the PythonOperator too. Tasks that try to xcom_pull upstream results that were created before the 1.10.14 → 2.0.1 migration break (see below). Pulling upstream XCom results that were created after the 2.0.1 upgrade works just fine.
```
[2021-04-17 14:28:35,344] {taskinstance.py:1455} ERROR - 'utf-8' codec can't decode byte 0x80 in position 0: invalid start byte
Traceback (most recent call last):
  File "/opt/conda/lib/python3.7/site-packages/airflow/models/taskinstance.py", line 1112, in _run_raw_task
    self._prepare_and_execute_task_with_callbacks(context, task)
  File "/opt/conda/lib/python3.7/site-packages/airflow/models/taskinstance.py", line 1285, in _prepare_and_execute_task_with_callbacks
    result = self._execute_task(context, task_copy)
  File "/opt/conda/lib/python3.7/site-packages/airflow/models/taskinstance.py", line 1315, in _execute_task
    result = task_copy.execute(context=context)
  File "/opt/conda/lib/python3.7/site-packages/airflow/operators/python.py", line 117, in execute
    return_value = self.execute_callable()
  File "/opt/conda/lib/python3.7/site-packages/airflow/operators/python.py", line 128, in execute_callable
    return self.python_callable(*self.op_args, **self.op_kwargs)
  File "/usr/local/airflow/dags/flow.py", line 91, in _run
    upstream_result = task_instance.xcom_pull(task_ids=[taskid])[0]
  File "/opt/conda/lib/python3.7/site-packages/airflow/utils/session.py", line 65, in wrapper
    return func(*args, session=session, **kwargs)
  File "/opt/conda/lib/python3.7/site-packages/airflow/models/taskinstance.py", line 1960, in xcom_pull
    for result in query.with_entities(XCom.task_id, XCom.value)
  File "/opt/conda/lib/python3.7/site-packages/airflow/models/taskinstance.py", line 1960, in <dictcomp>
    for result in query.with_entities(XCom.task_id, XCom.value)
  File "/opt/conda/lib/python3.7/site-packages/airflow/models/xcom.py", line 255, in deserialize_value
    return json.loads(result.value.decode('UTF-8'))
UnicodeDecodeError: 'utf-8' codec can't decode byte 0x80 in position 0: invalid start byte
```
Interestingly, the XCom data shows up just fine on the webserver:
@Dr-Denzy @ephraimbuddy would either of you be interested in this ticket?
Let's collaborate on this @ephraimbuddy
Hi @Dr-Denzy @ephraimbuddy, any update on this?
I was not able to reproduce this on master. Here's my dag:
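(The DAG attachment itself did not survive in this thread; what follows is a minimal hypothetical stand-in of the kind of DAG being described, not the commenter's original. It assumes Airflow 2.x with the cncf.kubernetes provider installed; the names `xcom_k8s_test`, `write_xcom`, and `read_xcom` are illustrative only.)

```python
# Hypothetical stand-in DAG (the original was not captured in this thread).
# Assumes Airflow 2.x with the cncf.kubernetes provider installed.
from datetime import datetime

from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.providers.cncf.kubernetes.operators.kubernetes_pod import (
    KubernetesPodOperator,
)

with DAG("xcom_k8s_test", start_date=datetime(2021, 1, 1), schedule_interval=None) as dag:
    # The pod writes JSON to /airflow/xcom/return.json; the xcom sidecar
    # container pushes it as the task's XCom.
    write_xcom = KubernetesPodOperator(
        task_id="write_xcom",
        name="write-xcom",
        namespace="default",
        image="alpine:3.13",
        cmds=["sh", "-c"],
        arguments=["mkdir -p /airflow/xcom && echo '[1, 2, 3]' > /airflow/xcom/return.json"],
        do_xcom_push=True,
    )

    # Downstream task that pulls the XCom back out.
    read_xcom = PythonOperator(
        task_id="read_xcom",
        python_callable=lambda ti: print(ti.xcom_pull(task_ids="write_xcom")),
    )

    write_xcom >> read_xcom
```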
Here are the logs for the first task:
And the logs for the second task:
@ephraimbuddy both of your tasks ran with Airflow 2.1.0? In my case the first task ran with Airflow 1.10.14 and the second with 2.0.1. They were PythonOperators and the return value was a simple dictionary. But I can't tell if that is the same problem as with the KubernetesPodOperator.
@michaelosthege, I tested with KubernetesPodOperator. I have not tested your case. Can you add include_prior_dates=True to the xcom_pull?
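For reference, include_prior_dates is an existing parameter of TaskInstance.xcom_pull that makes the pull also consider XComs from earlier execution dates. A minimal sketch of what was being suggested, with `my_upstream_task` as a placeholder task id:

```python
from datetime import datetime

from airflow import DAG
from airflow.operators.python import PythonOperator


def _pull_with_prior_dates(ti, **_):
    # include_prior_dates=True also considers XComs from earlier
    # execution dates, not just the current DAG run's.
    # "my_upstream_task" is a placeholder task id.
    return ti.xcom_pull(task_ids="my_upstream_task", include_prior_dates=True)


with DAG("prior_dates_demo", start_date=datetime(2021, 1, 1), schedule_interval=None) as dag:
    pull_task = PythonOperator(
        task_id="pull_with_prior_dates",
        python_callable=_pull_with_prior_dates,
    )
```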
@ephraimbuddy the tasks were part of the same DAG. (I cleared the downstream one after the 2.0.1 upgrade, but not the upstream task that had run on 1.10.14.)
Therefore it doesn't sound like include_prior_dates should make any difference, right?
I wonder if this might be a wider problem than just the KubernetesPod/Python operators. I have seen this problem on 1.10.12 with a custom operator (derived from BaseOperator) - but it happens only very intermittently.
Our production cluster (AWS ECS, using docker + CeleryExecutor) has been running for ~11 months without issue, and I just saw this bug for the first time. A single xcom JSON couldn't be loaded, and that stopped all DAGs from making any progress. Deleting the relevant record from the xcom table restored everything to working order.
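For anyone needing the same cleanup, it amounts to deleting the offending row(s) from the xcom table. A rough sketch using Airflow 1.10's own session helper; the dag_id/task_id values are placeholders, and you should narrow the filter (e.g. by execution_date) to just the record that fails to decode:

```python
# Rough cleanup sketch for Airflow 1.10: delete the XCom row(s) that can no
# longer be JSON-decoded. "my_dag" and "my_task" are placeholders.
from airflow.models import XCom
from airflow.utils.db import create_session

with create_session() as session:
    session.query(XCom).filter(
        XCom.dag_id == "my_dag",
        XCom.task_id == "my_task",
    ).delete(synchronize_session=False)
```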
During development (running locally with docker + SequentialExecutor) the bug happened a few times. It seemed to be more likely to happen when the system was under heavy load, but I've not really got enough data to be sure.
If someone experiences this issue, please add a reproducible example. Since there have been no comments on this issue for months and ephraim was unable to reproduce it, I think it's safe to close this issue.
Since my last comment, we've experienced this bug twice on our production cluster in 6 months of otherwise trouble-free running. So it is highly intermittent, and we have not found any way to reliably reproduce it. It is also a very problematic bug, because it completely disables the DAGs - but I appreciate that 1.10 is a pretty old version now.
Apache Airflow version: 1.10.13
Kubernetes version (if you are using kubernetes) (use kubectl version): v1.15.11-eks
Environment:
Kernel (e.g. uname -a): Linux ddac867b589a 4.19.76-linuxkit #1 SMP Tue May 26 11:42:35 UTC 2020 x86_64 GNU/Linux
Python 3.7.9
What happened: xcom_pull(...) throws a UnicodeDecodeError when the task that produced the XCom is a KubernetesPodOperator (full stack trace below).
Note that this is happening across all of our DAGs when I try to migrate from 1.10.10 to 1.10.13.
How to reproduce it:
Here are 2 tasks that should reproduce it (they are part of a DAG I use to test K8s features while making an update internally):
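(The two task definitions were not captured here; the following is a hypothetical minimal pair in the same spirit, assuming Airflow 1.10.13 contrib import paths: a KubernetesPodOperator that pushes an XCom result, followed by the bash_echo_2 task referenced in the trace below. The `bash_echo_1` name, image, and namespace are illustrative.)

```python
# Hypothetical stand-in for the two tasks (the originals were not captured here).
# Assumes Airflow 1.10.13 contrib import paths and the KubernetesExecutor.
from datetime import datetime

from airflow import DAG
from airflow.contrib.operators.kubernetes_pod_operator import KubernetesPodOperator
from airflow.operators.bash_operator import BashOperator

with DAG("k8s_xcom_repro", start_date=datetime(2020, 1, 1), schedule_interval=None) as dag:
    # The pod writes JSON to /airflow/xcom/return.json and the xcom sidecar
    # pushes it; the stored value is what later trips up json.loads().
    bash_echo_1 = KubernetesPodOperator(
        task_id="bash_echo_1",
        name="bash-echo-1",
        namespace="default",
        image="alpine:3.12",
        cmds=["sh", "-c"],
        arguments=["mkdir -p /airflow/xcom && echo '{\"hello\": \"world\"}' > /airflow/xcom/return.json"],
        do_xcom_push=True,
    )

    # This is the task that never gets scheduled once the scheduler hits
    # the UnicodeDecodeError while evaluating its dependencies.
    bash_echo_2 = BashOperator(
        task_id="bash_echo_2",
        bash_command="echo {{ ti.xcom_pull(task_ids='bash_echo_1') }}",
    )

    bash_echo_1 >> bash_echo_2
```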
stack trace
```
Process DagFileProcessor324119-Process:
Traceback (most recent call last):
  File "/usr/local/lib/python3.7/multiprocessing/process.py", line 297, in _bootstrap
    self.run()
  File "/usr/local/lib/python3.7/multiprocessing/process.py", line 99, in run
    self._target(*self._args, **self._kwargs)
  File "/home/airflow/.local/lib/python3.7/site-packages/airflow/jobs/scheduler_job.py", line 159, in _run_file_processor
    pickle_dags)
  File "/home/airflow/.local/lib/python3.7/site-packages/airflow/utils/db.py", line 74, in wrapper
    return func(*args, **kwargs)
  File "/home/airflow/.local/lib/python3.7/site-packages/airflow/jobs/scheduler_job.py", line 1620, in process_file
    self._process_dags(dagbag, dags, ti_keys_to_schedule)
  File "/home/airflow/.local/lib/python3.7/site-packages/airflow/jobs/scheduler_job.py", line 1299, in _process_dags
    self._process_task_instances(dag, tis_out)
  File "/home/airflow/.local/lib/python3.7/site-packages/airflow/utils/db.py", line 74, in wrapper
    return func(*args, **kwargs)
  File "/home/airflow/.local/lib/python3.7/site-packages/airflow/jobs/scheduler_job.py", line 793, in _process_task_instances
    ready_tis = run.update_state(session=session)
  File "/home/airflow/.local/lib/python3.7/site-packages/airflow/utils/db.py", line 70, in wrapper
    return func(*args, **kwargs)
  File "/home/airflow/.local/lib/python3.7/site-packages/airflow/models/dagrun.py", line 281, in update_state
    ready_tis, changed_tis = self._get_ready_tis(scheduleable_tasks, finished_tasks, session)
  File "/home/airflow/.local/lib/python3.7/site-packages/airflow/models/dagrun.py", line 340, in _get_ready_tis
    session=session):
  File "/home/airflow/.local/lib/python3.7/site-packages/airflow/utils/db.py", line 70, in wrapper
    return func(*args, **kwargs)
  File "/home/airflow/.local/lib/python3.7/site-packages/airflow/models/taskinstance.py", line 659, in are_dependencies_met
    session=session):
  File "/home/airflow/.local/lib/python3.7/site-packages/airflow/models/taskinstance.py", line 683, in get_failed_dep_statuses
    dep_context):
  File "/home/airflow/.local/lib/python3.7/site-packages/airflow/ti_deps/deps/base_ti_dep.py", line 106, in get_dep_statuses
    for dep_status in self._get_dep_statuses(ti, session, dep_context):
  File "/home/airflow/.local/lib/python3.7/site-packages/airflow/ti_deps/deps/not_previously_skipped_dep.py", line 58, in _get_dep_statuses
    task_ids=parent.task_id, key=XCOM_SKIPMIXIN_KEY
  File "/home/airflow/.local/lib/python3.7/site-packages/airflow/models/taskinstance.py", line 1564, in xcom_pull
    return pull_fn(task_id=task_ids)
  File "/home/airflow/.local/lib/python3.7/site-packages/airflow/utils/db.py", line 74, in wrapper
    return func(*args, **kwargs)
  File "/home/airflow/.local/lib/python3.7/site-packages/airflow/models/xcom.py", line 161, in get_one
    return json.loads(result.value.decode('UTF-8'))
UnicodeDecodeError: 'utf-8' codec can't decode byte 0x80 in position 0: invalid start byte
```
Note that bash_echo_2 simply does not get scheduled; the stack trace here comes from the scheduler Pod (we run Airflow with the KubernetesExecutor).