geosolutions-it / evo-odas

Code Repository for the EVO-ODAS
https://waffle.io/geosolutions-it/evo-odas
MIT License

Triggering a DAG fails with (_mysql_exceptions.IntegrityError) Duplicate entry 'xxxxx' for key 'dag_id' #285

Open: YouZhengChuan opened this issue 4 years ago

YouZhengChuan commented 4 years ago

I have a DAG, "pcdn_export_agg_peak_daily", that triggers the DAG "pcdn_export_agg_peak" three times, each time with a different run_id and dag_run.conf.

But when "pcdn_export_agg_peak_daily" runs, its log shows the following error and the task exits with code 1:

[2019-11-21 17:47:56,825] {base_task_runner.py:115} INFO - Job 2: Subtask peak_agg.daily_device_app_tx sqlalchemy.exc.IntegrityError: (_mysql_exceptions.IntegrityError) (1062, "Duplicate entry 'pcdn_export_agg_peak.split_to_agg_9.pcdn_agg-2019-11-21 09:47:00' for key 'dag_id'")
[2019-11-21 17:47:56,825] {base_task_runner.py:115} INFO - Job 2: Subtask peak_agg.daily_device_app_tx [SQL: INSERT INTO dag_run (dag_id, execution_date, start_date, end_date, state, run_id, external_trigger, conf) VALUES (%s, %s, %s, %s, %s, %s, %s, %s)]
[2019-11-21 17:47:56,825] {base_task_runner.py:115} INFO - Job 2: Subtask peak_agg.daily_device_app_tx [parameters: ('pcdn_export_agg_peak.split_to_agg_9.pcdn_agg', <Pendulum [2019-11-21T09:47:00+00:00]>, datetime.datetime(2019, 11, 21, 9, 47, 56, 409081, tzinfo=<Timezone [UTC]>), None, 'running', 'tri_peak_agg-daily_device_app_tx-for:2019-11-20-on:20191120013000.000000', 1, b'\x80\x04\x95&\x01\x00\x00\x00\x00\x00\x00}\x94(\x8c\x03env\x94\x8c\x03dev\x94\x8c\x08start_ts\x94J\x80=\xd5]\x8c\x06end_ts\x94J\xa4K\xd5]\x8c\tstat_ ... (275 characters truncated) ... \x8c\x06device\x94as\x8c\tlog_level\x94\x8c\x04INFO\x94\x8c\rseries_chunks\x94Kd\x8c\tsp_chunks\x94J@B\x0f\x00\x8c\nsp_schunks\x94J\xa0\x86\x01\x00u.')]
[2019-11-21 17:47:56,825] {base_task_runner.py:115} INFO - Job 2: Subtask peak_agg.daily_device_app_tx (Background on this error at: http://sqlalche.me/e/gkpj)

The error message reports a duplicate entry, "pcdn_export_agg_peak.split_to_agg_9.pcdn_agg-2019-11-21 09:47:00", for key 'dag_id'. That value comes from the row I am inserting, whose ("dag_id", "execution_date") is ("pcdn_export_agg_peak.split_to_agg_9.pcdn_agg", <Pendulum [2019-11-21T09:47:00+00:00]>).

My Airflow version is v1.10.5, and the backend metadata database is MySQL.

In the MySQL database "airflow", running show create table dag_run; gives:

CREATE TABLE `dag_run` (
  `id` int(11) NOT NULL AUTO_INCREMENT,
  `dag_id` varchar(250) DEFAULT NULL,
  `execution_date` timestamp(6) NULL DEFAULT NULL,
  `state` varchar(50) DEFAULT NULL,
  `run_id` varchar(250) DEFAULT NULL,
  `external_trigger` tinyint(1) DEFAULT NULL,
  `conf` blob,
  `end_date` timestamp(6) NULL DEFAULT NULL,
  `start_date` timestamp(6) NULL DEFAULT NULL,
  PRIMARY KEY (`id`),
  UNIQUE KEY `dag_id` (`dag_id`,`execution_date`),
  UNIQUE KEY `dag_id_2` (`dag_id`,`run_id`),
  KEY `dag_id_state` (`dag_id`,`state`)
) ENGINE=InnoDB AUTO_INCREMENT=27 DEFAULT CHARSET=latin1

This table has UNIQUE KEY dag_id (dag_id, execution_date), so the same (dag_id, execution_date) pair cannot appear twice.
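The effect of a composite unique key can be reproduced without a MySQL server. This minimal sketch uses Python's built-in sqlite3 module purely as a stand-in (SQLite is not the backend in this setup) and keeps only the columns that make up the two unique keys. The run ids "run_a", "run_b" and "run_c" are hypothetical. A second row with the same dag_id and execution_date is rejected even though its run_id differs, while a different execution_date is accepted.

```python
import sqlite3

# In-memory SQLite stand-in for the MySQL `dag_run` table, keeping only the
# columns used by its two unique keys. SQLite enforces UNIQUE constraints too,
# so the example runs without a MySQL server.
conn = sqlite3.connect(":memory:")
conn.execute(
    """
    CREATE TABLE dag_run (
        id INTEGER PRIMARY KEY AUTOINCREMENT,
        dag_id TEXT,
        execution_date TEXT,
        run_id TEXT,
        UNIQUE (dag_id, execution_date),
        UNIQUE (dag_id, run_id)
    )
    """
)


def insert_run(dag_id, execution_date, run_id):
    """Insert one run; return None on success or the error text on a key clash."""
    try:
        with conn:  # commits on success, rolls back if the INSERT raises
            conn.execute(
                "INSERT INTO dag_run (dag_id, execution_date, run_id) VALUES (?, ?, ?)",
                (dag_id, execution_date, run_id),
            )
        return None
    except sqlite3.IntegrityError as exc:
        return str(exc)


dag_id = "pcdn_export_agg_peak.split_to_agg_9.pcdn_agg"
first = insert_run(dag_id, "2019-11-21 09:47:00", "run_a")   # accepted
second = insert_run(dag_id, "2019-11-21 09:47:00", "run_b")  # same pair: rejected
third = insert_run(dag_id, "2019-11-21 09:48:00", "run_c")   # new execution_date: accepted
print(first, second, third)
```

Different run_ids alone therefore do not prevent the clash; only a different (dag_id, execution_date) pair does.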

However, when I queried for rows whose "execution_date" is "2019-11-21 09:47:00", nothing was returned.
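An empty result here is consistent with how MySQL handles TIMESTAMP columns: values are converted from the session time zone to UTC for storage and back to the session time zone when read, and a literal compared against a TIMESTAMP column is interpreted in the session time zone. The sketch below assumes a session time zone of UTC+8, inferred from the 8-hour gap between the UTC value in the log and the local times in the SELECT results, not taken from any configuration shown. It only checks the arithmetic with the standard library.

```python
from datetime import datetime, timedelta, timezone

# Assumption: the MySQL session time zone is UTC+8 (inferred, not configured here).
SESSION_TZ = timezone(timedelta(hours=8))

# The execution_date Airflow tried to insert, from the log (an aware UTC value).
execution_date_utc = datetime(2019, 11, 21, 9, 47, 0, tzinfo=timezone.utc)

# How that instant is displayed when read back through a UTC+8 session.
as_session_time = execution_date_utc.astimezone(SESSION_TZ)
print(as_session_time.strftime("%Y-%m-%d %H:%M:%S"))  # 2019-11-21 17:47:00

# How the literal '2019-11-21 09:47:00' in a WHERE clause is read in that session:
# as 09:47 local time, i.e. a different instant, so it matches no stored row.
literal_instant = datetime(2019, 11, 21, 9, 47, 0, tzinfo=SESSION_TZ)
print(literal_instant.astimezone(timezone.utc).strftime("%Y-%m-%d %H:%M:%S"))  # 2019-11-21 01:47:00
print(literal_instant == execution_date_utc)  # False
```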

Note, though, that the "execution_date" of the row being inserted is 2019-11-21T09:47:00+00:00 (UTC). Converted to my local time, that is "2019-11-21 17:47:00", and querying for that value returns 4 rows:

mysql> select `id`,`dag_id`,`execution_date`,`state`,`run_id`,`external_trigger`,`end_date`,`start_date` from `dag_run` where `execution_date`='2019-11-21 17:47:00';
+----+--------------------------------------------------+----------------------------+---------+--------------------------------------------------------------------------+------------------+----------+----------------------------+
| id | dag_id                                           | execution_date             | state   | run_id                                                                   | external_trigger | end_date | start_date                 |
+----+--------------------------------------------------+----------------------------+---------+--------------------------------------------------------------------------+------------------+----------+----------------------------+
|  8 | pcdn_export_agg_peak                             | 2019-11-21 17:47:00.000000 | running | tri_peak_agg-daily_device_app_tx-for:2019-11-20-on:20191120013000.000000 |                1 | NULL     | 2019-11-21 17:47:53.924978 |
| 13 | pcdn_export_agg_peak.split_to_agg_9.pcdn_agg     | 2019-11-21 17:47:00.000000 | running | tri_peak_agg-daily_device_app_tx-for:2019-11-20-on:20191120013000.000000 |                1 | NULL     | 2019-11-21 17:47:54.530647 |
| 18 | pcdn_export_agg_peak.split_to_agg_9.partial_sort | 2019-11-21 17:47:00.000000 | running | tri_peak_agg-daily_device_app_tx-for:2019-11-20-on:20191120013000.000000 |                1 | NULL     | 2019-11-21 17:47:55.154685 |
| 23 | pcdn_export_agg_peak.split_to_agg_9              | 2019-11-21 17:47:00.000000 | running | tri_peak_agg-daily_device_app_tx-for:2019-11-20-on:20191120013000.000000 |                1 | NULL     | 2019-11-21 17:47:55.788638 |
+----+--------------------------------------------------+----------------------------+---------+--------------------------------------------------------------------------+------------------+----------+----------------------------+
4 rows in set (0.05 sec)
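Lining up the failing INSERT against these rows makes the clash explicit. The sketch below copies the four rows and the failing parameters from the log, with the execution_date written as 17:47 session time (assuming UTC+8, as above), and checks them against both unique keys. It suggests the new row repeats row 13 exactly: MySQL names only key 'dag_id' in the error, but the same INSERT would also violate 'dag_id_2', because dag_id and run_id both match as well.

```python
# Rows returned by the SELECT above, reduced to the columns used by the two
# unique keys. execution_date is shown in the session time zone (UTC+8).
RUN_ID = "tri_peak_agg-daily_device_app_tx-for:2019-11-20-on:20191120013000.000000"
existing_rows = [
    (8, "pcdn_export_agg_peak", "2019-11-21 17:47:00", RUN_ID),
    (13, "pcdn_export_agg_peak.split_to_agg_9.pcdn_agg", "2019-11-21 17:47:00", RUN_ID),
    (18, "pcdn_export_agg_peak.split_to_agg_9.partial_sort", "2019-11-21 17:47:00", RUN_ID),
    (23, "pcdn_export_agg_peak.split_to_agg_9", "2019-11-21 17:47:00", RUN_ID),
]

# The failing INSERT from the log: 09:47:00 UTC, i.e. 17:47:00 in UTC+8.
new_row = ("pcdn_export_agg_peak.split_to_agg_9.pcdn_agg", "2019-11-21 17:47:00", RUN_ID)


def find_collisions(rows, candidate):
    """Return (row id, key name) for every unique key the candidate would violate."""
    dag_id, execution_date, run_id = candidate
    hits = []
    for row_id, row_dag_id, row_execution_date, row_run_id in rows:
        if row_dag_id != dag_id:
            continue
        if row_execution_date == execution_date:
            hits.append((row_id, "dag_id"))    # UNIQUE KEY dag_id (dag_id, execution_date)
        if row_run_id == run_id:
            hits.append((row_id, "dag_id_2"))  # UNIQUE KEY dag_id_2 (dag_id, run_id)
    return hits


print(find_collisions(existing_rows, new_row))  # [(13, 'dag_id'), (13, 'dag_id_2')]
```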

So, judging from the error log, I have a guess at how the bug causes the failure.
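One hypothesis that fits both the log and the table, not verified against the Airflow source here: the traceback shows the INSERT failing inside trigger_dag / _trigger_dag while calling create_dagrun, i.e. during the trigger itself. If that code creates a run for a DAG and then pushes that DAG's full, already recursive list of SubDAGs onto a worklist, nested SubDAGs (here the two under split_to_agg_9) would be pushed twice, once by the top-level DAG and once by split_to_agg_9, and the second create_dagrun for them would reuse the same dag_id, execution_date and run_id. The sketch below models only that assumed pattern with a hypothetical Dag class (not Airflow's API); the order of the two nested SubDAGs is also an assumption.

```python
class Dag:
    """Hypothetical stand-in for a DAG that may contain SubDAGs (not Airflow's class)."""

    def __init__(self, dag_id, children=()):
        self.dag_id = dag_id
        self.children = list(children)  # directly nested subdags, in task order

    @property
    def subdags(self):
        """Every nested subdag, recursively: each child followed by its own subdags."""
        result = []
        for child in self.children:
            result.append(child)
            result += child.subdags
        return result


def trigger_with_worklist(root):
    """Assumed pattern: create a run, then push *all* recursive subdags of that DAG."""
    created = []
    stack = [root]
    while stack:
        dag = stack.pop()
        if dag.dag_id in created:
            # Same dag_id and execution_date as a run created moments ago:
            # this is where UNIQUE KEY dag_id would reject the INSERT.
            return created, dag.dag_id
        created.append(dag.dag_id)
        stack.extend(dag.subdags)
    return created, None


def trigger_flat(root):
    """Alternative: visit the root and its recursive subdag list exactly once each."""
    return [dag.dag_id for dag in [root] + root.subdags]


# Structure implied by the dag_ids in the table; the child order is assumed.
pcdn_agg = Dag("pcdn_export_agg_peak.split_to_agg_9.pcdn_agg")
partial_sort = Dag("pcdn_export_agg_peak.split_to_agg_9.partial_sort")
split = Dag("pcdn_export_agg_peak.split_to_agg_9", [partial_sort, pcdn_agg])
root = Dag("pcdn_export_agg_peak", [split])

order, clash = trigger_with_worklist(root)
print(order)
print(clash)  # pcdn_export_agg_peak.split_to_agg_9.pcdn_agg
print(trigger_flat(root))
```

Under these assumptions the worklist version creates runs in the order pcdn_export_agg_peak, pcdn_agg, partial_sort, split_to_agg_9, which matches ids 8, 13, 18 and 23, and then tries pcdn_agg again, the dag_id in the error. In this model, building the list once ([root] + root.subdags) visits each DAG exactly once.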

For reference, here is the complete error log:

*** Reading local file: /home/work/airflow/logs/pcdn_export_agg_peak_daily/peak_agg.daily_device_app_tx/2019-11-20T01:30:00+00:00/1.log
[2019-11-21 17:47:42,165] {taskinstance.py:620} INFO - Dependencies all met for <TaskInstance: pcdn_export_agg_peak_daily.peak_agg.daily_device_app_tx 2019-11-20T01:30:00+00:00 [queued]>
[2019-11-21 17:47:42,251] {taskinstance.py:620} INFO - Dependencies all met for <TaskInstance: pcdn_export_agg_peak_daily.peak_agg.daily_device_app_tx 2019-11-20T01:30:00+00:00 [queued]>
[2019-11-21 17:47:42,251] {taskinstance.py:838} INFO - 
--------------------------------------------------------------------------------
[2019-11-21 17:47:42,251] {taskinstance.py:839} INFO - Starting attempt 1 of 1
[2019-11-21 17:47:42,251] {taskinstance.py:840} INFO - 
--------------------------------------------------------------------------------
[2019-11-21 17:47:42,475] {taskinstance.py:859} INFO - Executing <Task(TriggerDagRunOperator): peak_agg.daily_device_app_tx> on 2019-11-20T01:30:00+00:00
[2019-11-21 17:47:42,475] {base_task_runner.py:133} INFO - Running: ['airflow', 'run', 'pcdn_export_agg_peak_daily', 'peak_agg.daily_device_app_tx', '2019-11-20T01:30:00+00:00', '--job_id', '2', '--pool', 'default_pool', '--raw', '-sd', 'DAGS_FOLDER/pcdn_export_agg_peak_daily.py', '--cfg_path', '/tmp/tmpvw0s95my']
[2019-11-21 17:47:43,148] {base_task_runner.py:115} INFO - Job 2: Subtask peak_agg.daily_device_app_tx [2019-11-21 17:47:43,148] {settings.py:213} INFO - settings.configure_orm(): Using pool settings. pool_size=10, max_overflow=20, pool_recycle=1800, pid=7815
[2019-11-21 17:47:43,460] {base_task_runner.py:115} INFO - Job 2: Subtask peak_agg.daily_device_app_tx [2019-11-21 17:47:43,458] {__init__.py:51} INFO - Using executor CeleryExecutor
[2019-11-21 17:47:44,505] {base_task_runner.py:115} INFO - Job 2: Subtask peak_agg.daily_device_app_tx [2019-11-21 17:47:44,504] {dagbag.py:90} INFO - Filling up the DagBag from /home/work/pcdn/offline_agg/dags/pcdn_export_agg_peak_daily.py
[2019-11-21 17:47:45,367] {base_task_runner.py:115} INFO - Job 2: Subtask peak_agg.daily_device_app_tx /home/work/pcdn/src/lib/conf_manager.py:21: YAMLLoadWarning: calling yaml.load() without Loader=... is deprecated, as the default Loader is unsafe. Please read https://msg.pyyaml.org/load for full details.
[2019-11-21 17:47:45,367] {base_task_runner.py:115} INFO - Job 2: Subtask peak_agg.daily_device_app_tx   self._main_cf = yaml.load(fstream)
[2019-11-21 17:47:45,389] {base_task_runner.py:115} INFO - Job 2: Subtask peak_agg.daily_device_app_tx /home/work/pcdn/src/lib/conf_manager.py:24: YAMLLoadWarning: calling yaml.load() without Loader=... is deprecated, as the default Loader is unsafe. Please read https://msg.pyyaml.org/load for full details.
[2019-11-21 17:47:45,389] {base_task_runner.py:115} INFO - Job 2: Subtask peak_agg.daily_device_app_tx   self._base_cf = yaml.load(bstream) or {}
[2019-11-21 17:47:46,885] {base_task_runner.py:115} INFO - Job 2: Subtask peak_agg.daily_device_app_tx [2019-11-21 17:47:46,884] {cli.py:516} INFO - Running <TaskInstance: pcdn_export_agg_peak_daily.peak_agg.daily_device_app_tx 2019-11-20T01:30:00+00:00 [running]> on host jh02-db2.jh02
[2019-11-21 17:47:47,293] {logging_mixin.py:95} INFO - context: {'dag': <DAG: pcdn_export_agg_peak_daily>, 'ds': '2019-11-20', 'next_ds': '2019-11-21', 'next_ds_nodash': '20191121', 'prev_ds': '2019-11-19', 'prev_ds_nodash': '20191119', 'ds_nodash': '20191120', 'ts': '2019-11-20T01:30:00+00:00', 'ts_nodash': '20191120T013000', 'ts_nodash_with_tz': '20191120T013000+0000', 'yesterday_ds': '2019-11-19', 'yesterday_ds_nodash': '20191119', 'tomorrow_ds': '2019-11-21', 'tomorrow_ds_nodash': '20191121', 'END_DATE': '2019-11-20', 'end_date': '2019-11-20', 'dag_run': <DagRun pcdn_export_agg_peak_daily @ 2019-11-20 01:30:00+00:00: scheduled__2019-11-20T01:30:00+00:00, externally triggered: False>, 'run_id': 'scheduled__2019-11-20T01:30:00+00:00', 'execution_date': <Pendulum [2019-11-20T01:30:00+00:00]>, 'prev_execution_date': <Pendulum [2019-11-19T01:30:00+00:00]>, 'prev_execution_date_success': <Proxy at 0x7fda2a4d7548 with factory <function TaskInstance.get_template_context.<locals>.<lambda> at 0x7fda3f7d02f0>>, 'prev_start_date_success': <Proxy at 0x7fda2a4d7508 with factory <function TaskInstance.get_template_context.<locals>.<lambda> at 0x7fda2a434d90>>, 'next_execution_date': <Pendulum [2019-11-21T01:30:00+00:00]>, 'latest_date': '2019-11-20', 'macros': <module 'airflow.macros' from '/home/work/pcdn/venv/lib/python3.6/site-packages/airflow/macros/__init__.py'>, 'params': {'stat_cfg': {'env': 'dev', 'start_ts': 1574256000, 'end_ts': 1574259620, 'stat_name': 'daily_device_app_tx', 'exporter': {'database': 'aegis', 'retention': 'one_week', 'measurements': 'docker_container', 'fields': 'tx,rx'}, 'order_by': {'tags': ['device']}, 'log_level': 'INFO', 'series_chunks': 100, 'sp_chunks': 1000000, 'sp_schunks': 100000}}, 'tables': None, 'task': <Task(TriggerDagRunOperator): peak_agg.daily_device_app_tx>, 'task_instance': <TaskInstance: pcdn_export_agg_peak_daily.peak_agg.daily_device_app_tx 2019-11-20T01:30:00+00:00 [running]>, 'ti': <TaskInstance: pcdn_export_agg_peak_daily.peak_agg.daily_device_app_tx 2019-11-20T01:30:00+00:00 [running]>, 'task_instance_key_str': 'pcdn_export_agg_peak_daily__peak_agg.daily_device_app_tx__20191120', 'conf': <module 'airflow.configuration' from '/home/work/pcdn/venv/lib/python3.6/site-packages/airflow/configuration.py'>, 'test_mode': False, 'var': {'value': None, 'json': None}, 'inlets': [], 'outlets': []}
[2019-11-21 17:47:47,293] {logging_mixin.py:95} INFO - dag config: {'env': 'dev', 'start_ts': 1574256000, 'end_ts': 1574259620, 'stat_name': 'daily_device_app_tx', 'exporter': {'database': 'aegis', 'retention': 'one_week', 'measurements': 'docker_container', 'fields': 'tx,rx'}, 'order_by': {'tags': ['device']}, 'log_level': 'INFO', 'series_chunks': 100, 'sp_chunks': 1000000, 'sp_schunks': 100000}
[2019-11-21 17:47:47,293] {logging_mixin.py:95} INFO - run_dag_id: tri_peak_agg-daily_device_app_tx-for:2019-11-20-on:20191120013000.000000
[2019-11-21 17:47:47,498] {logging_mixin.py:95} INFO - [2019-11-21 17:47:47,498] {dagbag.py:90} INFO - Filling up the DagBag from /home/work/pcdn/offline_agg/dags/pcdn_export_agg_peak_once.py
[2019-11-21 17:47:56,478] {taskinstance.py:1051} ERROR - (_mysql_exceptions.IntegrityError) (1062, "Duplicate entry 'pcdn_export_agg_peak.split_to_agg_9.pcdn_agg-2019-11-21 09:47:00' for key 'dag_id'")
[SQL: INSERT INTO dag_run (dag_id, execution_date, start_date, end_date, state, run_id, external_trigger, conf) VALUES (%s, %s, %s, %s, %s, %s, %s, %s)]
[parameters: ('pcdn_export_agg_peak.split_to_agg_9.pcdn_agg', <Pendulum [2019-11-21T09:47:00+00:00]>, datetime.datetime(2019, 11, 21, 9, 47, 56, 409081, tzinfo=<Timezone [UTC]>), None, 'running', 'tri_peak_agg-daily_device_app_tx-for:2019-11-20-on:20191120013000.000000', 1, b'\x80\x04\x95&\x01\x00\x00\x00\x00\x00\x00}\x94(\x8c\x03env\x94\x8c\x03dev\x94\x8c\x08start_ts\x94J\x80=\xd5]\x8c\x06end_ts\x94J\xa4K\xd5]\x8c\tstat_ ... (275 characters truncated) ... \x8c\x06device\x94as\x8c\tlog_level\x94\x8c\x04INFO\x94\x8c\rseries_chunks\x94Kd\x8c\tsp_chunks\x94J@B\x0f\x00\x8c\nsp_schunks\x94J\xa0\x86\x01\x00u.')]
(Background on this error at: http://sqlalche.me/e/gkpj)
Traceback (most recent call last):
  File "/home/work/pcdn/venv/lib/python3.6/site-packages/sqlalchemy/engine/base.py", line 1244, in _execute_context
    cursor, statement, parameters, context
  File "/home/work/pcdn/venv/lib/python3.6/site-packages/sqlalchemy/engine/default.py", line 552, in do_execute
    cursor.execute(statement, parameters)
  File "/home/work/pcdn/venv/lib/python3.6/site-packages/MySQLdb/cursors.py", line 255, in execute
    self.errorhandler(self, exc, value)
  File "/home/work/pcdn/venv/lib/python3.6/site-packages/MySQLdb/connections.py", line 50, in defaulterrorhandler
    raise errorvalue
  File "/home/work/pcdn/venv/lib/python3.6/site-packages/MySQLdb/cursors.py", line 252, in execute
    res = self._query(query)
  File "/home/work/pcdn/venv/lib/python3.6/site-packages/MySQLdb/cursors.py", line 378, in _query
    db.query(q)
  File "/home/work/pcdn/venv/lib/python3.6/site-packages/MySQLdb/connections.py", line 280, in query
    _mysql.connection.query(self, query)
_mysql_exceptions.IntegrityError: (1062, "Duplicate entry 'pcdn_export_agg_peak.split_to_agg_9.pcdn_agg-2019-11-21 09:47:00' for key 'dag_id'")

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/home/work/pcdn/venv/lib/python3.6/site-packages/airflow/models/taskinstance.py", line 926, in _run_raw_task
    result = task_copy.execute(context=context)
  File "/home/work/pcdn/venv/lib/python3.6/site-packages/airflow/operators/dagrun_operator.py", line 95, in execute
    replace_microseconds=False)
  File "/home/work/pcdn/venv/lib/python3.6/site-packages/airflow/api/common/experimental/trigger_dag.py", line 126, in trigger_dag
    replace_microseconds=replace_microseconds,
  File "/home/work/pcdn/venv/lib/python3.6/site-packages/airflow/api/common/experimental/trigger_dag.py", line 90, in _trigger_dag
    external_trigger=True,
  File "/home/work/pcdn/venv/lib/python3.6/site-packages/airflow/utils/db.py", line 74, in wrapper
    return func(*args, **kwargs)
  File "/home/work/pcdn/venv/lib/python3.6/site-packages/airflow/models/dag.py", line 1286, in create_dagrun
    session.commit()
  File "/home/work/pcdn/venv/lib/python3.6/site-packages/sqlalchemy/orm/session.py", line 1027, in commit
    self.transaction.commit()
  File "/home/work/pcdn/venv/lib/python3.6/site-packages/sqlalchemy/orm/session.py", line 494, in commit
    self._prepare_impl()
  File "/home/work/pcdn/venv/lib/python3.6/site-packages/sqlalchemy/orm/session.py", line 473, in _prepare_impl
    self.session.flush()
  File "/home/work/pcdn/venv/lib/python3.6/site-packages/sqlalchemy/orm/session.py", line 2459, in flush
    self._flush(objects)
  File "/home/work/pcdn/venv/lib/python3.6/site-packages/sqlalchemy/orm/session.py", line 2597, in _flush
    transaction.rollback(_capture_exception=True)
  File "/home/work/pcdn/venv/lib/python3.6/site-packages/sqlalchemy/util/langhelpers.py", line 68, in __exit__
    compat.reraise(exc_type, exc_value, exc_tb)
  File "/home/work/pcdn/venv/lib/python3.6/site-packages/sqlalchemy/util/compat.py", line 153, in reraise
    raise value
  File "/home/work/pcdn/venv/lib/python3.6/site-packages/sqlalchemy/orm/session.py", line 2557, in _flush
    flush_context.execute()
  File "/home/work/pcdn/venv/lib/python3.6/site-packages/sqlalchemy/orm/unitofwork.py", line 422, in execute
    rec.execute(self)
  File "/home/work/pcdn/venv/lib/python3.6/site-packages/sqlalchemy/orm/unitofwork.py", line 589, in execute
    uow,
  File "/home/work/pcdn/venv/lib/python3.6/site-packages/sqlalchemy/orm/persistence.py", line 245, in save_obj
    insert,
  File "/home/work/pcdn/venv/lib/python3.6/site-packages/sqlalchemy/orm/persistence.py", line 1138, in _emit_insert_statements
    statement, params
  File "/home/work/pcdn/venv/lib/python3.6/site-packages/sqlalchemy/engine/base.py", line 988, in execute
    return meth(self, multiparams, params)
  File "/home/work/pcdn/venv/lib/python3.6/site-packages/sqlalchemy/sql/elements.py", line 287, in _execute_on_connection
    return connection._execute_clauseelement(self, multiparams, params)
  File "/home/work/pcdn/venv/lib/python3.6/site-packages/sqlalchemy/engine/base.py", line 1107, in _execute_clauseelement
    distilled_params,
  File "/home/work/pcdn/venv/lib/python3.6/site-packages/sqlalchemy/engine/base.py", line 1248, in _execute_context
    e, statement, parameters, cursor, context
  File "/home/work/pcdn/venv/lib/python3.6/site-packages/sqlalchemy/engine/base.py", line 1466, in _handle_dbapi_exception
    util.raise_from_cause(sqlalchemy_exception, exc_info)
  File "/home/work/pcdn/venv/lib/python3.6/site-packages/sqlalchemy/util/compat.py", line 398, in raise_from_cause
    reraise(type(exception), exception, tb=exc_tb, cause=cause)
  File "/home/work/pcdn/venv/lib/python3.6/site-packages/sqlalchemy/util/compat.py", line 152, in reraise
    raise value.with_traceback(tb)
  File "/home/work/pcdn/venv/lib/python3.6/site-packages/sqlalchemy/engine/base.py", line 1244, in _execute_context
    cursor, statement, parameters, context
  File "/home/work/pcdn/venv/lib/python3.6/site-packages/sqlalchemy/engine/default.py", line 552, in do_execute
    cursor.execute(statement, parameters)
  File "/home/work/pcdn/venv/lib/python3.6/site-packages/MySQLdb/cursors.py", line 255, in execute
    self.errorhandler(self, exc, value)
  File "/home/work/pcdn/venv/lib/python3.6/site-packages/MySQLdb/connections.py", line 50, in defaulterrorhandler
    raise errorvalue
  File "/home/work/pcdn/venv/lib/python3.6/site-packages/MySQLdb/cursors.py", line 252, in execute
    res = self._query(query)
  File "/home/work/pcdn/venv/lib/python3.6/site-packages/MySQLdb/cursors.py", line 378, in _query
    db.query(q)
  File "/home/work/pcdn/venv/lib/python3.6/site-packages/MySQLdb/connections.py", line 280, in query
    _mysql.connection.query(self, query)
sqlalchemy.exc.IntegrityError: (_mysql_exceptions.IntegrityError) (1062, "Duplicate entry 'pcdn_export_agg_peak.split_to_agg_9.pcdn_agg-2019-11-21 09:47:00' for key 'dag_id'")
[SQL: INSERT INTO dag_run (dag_id, execution_date, start_date, end_date, state, run_id, external_trigger, conf) VALUES (%s, %s, %s, %s, %s, %s, %s, %s)]
[parameters: ('pcdn_export_agg_peak.split_to_agg_9.pcdn_agg', <Pendulum [2019-11-21T09:47:00+00:00]>, datetime.datetime(2019, 11, 21, 9, 47, 56, 409081, tzinfo=<Timezone [UTC]>), None, 'running', 'tri_peak_agg-daily_device_app_tx-for:2019-11-20-on:20191120013000.000000', 1, b'\x80\x04\x95&\x01\x00\x00\x00\x00\x00\x00}\x94(\x8c\x03env\x94\x8c\x03dev\x94\x8c\x08start_ts\x94J\x80=\xd5]\x8c\x06end_ts\x94J\xa4K\xd5]\x8c\tstat_ ... (275 characters truncated) ... \x8c\x06device\x94as\x8c\tlog_level\x94\x8c\x04INFO\x94\x8c\rseries_chunks\x94Kd\x8c\tsp_chunks\x94J@B\x0f\x00\x8c\nsp_schunks\x94J\xa0\x86\x01\x00u.')]
(Background on this error at: http://sqlalche.me/e/gkpj)
[2019-11-21 17:47:56,485] {taskinstance.py:1082} INFO - Marking task as FAILED.
[2019-11-21 17:47:56,816] {base_task_runner.py:115} INFO - Job 2: Subtask peak_agg.daily_device_app_tx Traceback (most recent call last):
[2019-11-21 17:47:56,816] {base_task_runner.py:115} INFO - Job 2: Subtask peak_agg.daily_device_app_tx   File "/home/work/pcdn/venv/lib/python3.6/site-packages/sqlalchemy/engine/base.py", line 1244, in _execute_context
[2019-11-21 17:47:56,816] {base_task_runner.py:115} INFO - Job 2: Subtask peak_agg.daily_device_app_tx     cursor, statement, parameters, context
[2019-11-21 17:47:56,816] {base_task_runner.py:115} INFO - Job 2: Subtask peak_agg.daily_device_app_tx   File "/home/work/pcdn/venv/lib/python3.6/site-packages/sqlalchemy/engine/default.py", line 552, in do_execute
[2019-11-21 17:47:56,817] {base_task_runner.py:115} INFO - Job 2: Subtask peak_agg.daily_device_app_tx     cursor.execute(statement, parameters)
[2019-11-21 17:47:56,817] {base_task_runner.py:115} INFO - Job 2: Subtask peak_agg.daily_device_app_tx   File "/home/work/pcdn/venv/lib/python3.6/site-packages/MySQLdb/cursors.py", line 255, in execute
[2019-11-21 17:47:56,817] {base_task_runner.py:115} INFO - Job 2: Subtask peak_agg.daily_device_app_tx     self.errorhandler(self, exc, value)
[2019-11-21 17:47:56,817] {base_task_runner.py:115} INFO - Job 2: Subtask peak_agg.daily_device_app_tx   File "/home/work/pcdn/venv/lib/python3.6/site-packages/MySQLdb/connections.py", line 50, in defaulterrorhandler
[2019-11-21 17:47:56,817] {base_task_runner.py:115} INFO - Job 2: Subtask peak_agg.daily_device_app_tx     raise errorvalue
[2019-11-21 17:47:56,817] {base_task_runner.py:115} INFO - Job 2: Subtask peak_agg.daily_device_app_tx   File "/home/work/pcdn/venv/lib/python3.6/site-packages/MySQLdb/cursors.py", line 252, in execute
[2019-11-21 17:47:56,817] {base_task_runner.py:115} INFO - Job 2: Subtask peak_agg.daily_device_app_tx     res = self._query(query)
[2019-11-21 17:47:56,817] {base_task_runner.py:115} INFO - Job 2: Subtask peak_agg.daily_device_app_tx   File "/home/work/pcdn/venv/lib/python3.6/site-packages/MySQLdb/cursors.py", line 378, in _query
[2019-11-21 17:47:56,818] {base_task_runner.py:115} INFO - Job 2: Subtask peak_agg.daily_device_app_tx     db.query(q)
[2019-11-21 17:47:56,818] {base_task_runner.py:115} INFO - Job 2: Subtask peak_agg.daily_device_app_tx   File "/home/work/pcdn/venv/lib/python3.6/site-packages/MySQLdb/connections.py", line 280, in query
[2019-11-21 17:47:56,818] {base_task_runner.py:115} INFO - Job 2: Subtask peak_agg.daily_device_app_tx     _mysql.connection.query(self, query)
[2019-11-21 17:47:56,818] {base_task_runner.py:115} INFO - Job 2: Subtask peak_agg.daily_device_app_tx _mysql_exceptions.IntegrityError: (1062, "Duplicate entry 'pcdn_export_agg_peak.split_to_agg_9.pcdn_agg-2019-11-21 09:47:00' for key 'dag_id'")
[2019-11-21 17:47:56,818] {base_task_runner.py:115} INFO - Job 2: Subtask peak_agg.daily_device_app_tx 
[2019-11-21 17:47:56,818] {base_task_runner.py:115} INFO - Job 2: Subtask peak_agg.daily_device_app_tx The above exception was the direct cause of the following exception:
[2019-11-21 17:47:56,818] {base_task_runner.py:115} INFO - Job 2: Subtask peak_agg.daily_device_app_tx 
[2019-11-21 17:47:56,818] {base_task_runner.py:115} INFO - Job 2: Subtask peak_agg.daily_device_app_tx Traceback (most recent call last):
[2019-11-21 17:47:56,819] {base_task_runner.py:115} INFO - Job 2: Subtask peak_agg.daily_device_app_tx   File "/home/work/pcdn/venv/bin/airflow", line 32, in <module>
[2019-11-21 17:47:56,819] {base_task_runner.py:115} INFO - Job 2: Subtask peak_agg.daily_device_app_tx     args.func(args)
[2019-11-21 17:47:56,819] {base_task_runner.py:115} INFO - Job 2: Subtask peak_agg.daily_device_app_tx   File "/home/work/pcdn/venv/lib/python3.6/site-packages/airflow/utils/cli.py", line 74, in wrapper
[2019-11-21 17:47:56,819] {base_task_runner.py:115} INFO - Job 2: Subtask peak_agg.daily_device_app_tx     return f(*args, **kwargs)
[2019-11-21 17:47:56,819] {base_task_runner.py:115} INFO - Job 2: Subtask peak_agg.daily_device_app_tx   File "/home/work/pcdn/venv/lib/python3.6/site-packages/airflow/bin/cli.py", line 522, in run
[2019-11-21 17:47:56,819] {base_task_runner.py:115} INFO - Job 2: Subtask peak_agg.daily_device_app_tx     _run(args, dag, ti)
[2019-11-21 17:47:56,819] {base_task_runner.py:115} INFO - Job 2: Subtask peak_agg.daily_device_app_tx   File "/home/work/pcdn/venv/lib/python3.6/site-packages/airflow/bin/cli.py", line 440, in _run
[2019-11-21 17:47:56,819] {base_task_runner.py:115} INFO - Job 2: Subtask peak_agg.daily_device_app_tx     pool=args.pool,
[2019-11-21 17:47:56,819] {base_task_runner.py:115} INFO - Job 2: Subtask peak_agg.daily_device_app_tx   File "/home/work/pcdn/venv/lib/python3.6/site-packages/airflow/utils/db.py", line 74, in wrapper
[2019-11-21 17:47:56,820] {base_task_runner.py:115} INFO - Job 2: Subtask peak_agg.daily_device_app_tx     return func(*args, **kwargs)
[2019-11-21 17:47:56,820] {base_task_runner.py:115} INFO - Job 2: Subtask peak_agg.daily_device_app_tx   File "/home/work/pcdn/venv/lib/python3.6/site-packages/airflow/models/taskinstance.py", line 926, in _run_raw_task
[2019-11-21 17:47:56,820] {base_task_runner.py:115} INFO - Job 2: Subtask peak_agg.daily_device_app_tx     result = task_copy.execute(context=context)
[2019-11-21 17:47:56,820] {base_task_runner.py:115} INFO - Job 2: Subtask peak_agg.daily_device_app_tx   File "/home/work/pcdn/venv/lib/python3.6/site-packages/airflow/operators/dagrun_operator.py", line 95, in execute
[2019-11-21 17:47:56,820] {base_task_runner.py:115} INFO - Job 2: Subtask peak_agg.daily_device_app_tx     replace_microseconds=False)
[2019-11-21 17:47:56,820] {base_task_runner.py:115} INFO - Job 2: Subtask peak_agg.daily_device_app_tx   File "/home/work/pcdn/venv/lib/python3.6/site-packages/airflow/api/common/experimental/trigger_dag.py", line 126, in trigger_dag
[2019-11-21 17:47:56,820] {base_task_runner.py:115} INFO - Job 2: Subtask peak_agg.daily_device_app_tx     replace_microseconds=replace_microseconds,
[2019-11-21 17:47:56,820] {base_task_runner.py:115} INFO - Job 2: Subtask peak_agg.daily_device_app_tx   File "/home/work/pcdn/venv/lib/python3.6/site-packages/airflow/api/common/experimental/trigger_dag.py", line 90, in _trigger_dag
[2019-11-21 17:47:56,821] {base_task_runner.py:115} INFO - Job 2: Subtask peak_agg.daily_device_app_tx     external_trigger=True,
[2019-11-21 17:47:56,821] {base_task_runner.py:115} INFO - Job 2: Subtask peak_agg.daily_device_app_tx   File "/home/work/pcdn/venv/lib/python3.6/site-packages/airflow/utils/db.py", line 74, in wrapper
[2019-11-21 17:47:56,821] {base_task_runner.py:115} INFO - Job 2: Subtask peak_agg.daily_device_app_tx     return func(*args, **kwargs)
[2019-11-21 17:47:56,821] {base_task_runner.py:115} INFO - Job 2: Subtask peak_agg.daily_device_app_tx   File "/home/work/pcdn/venv/lib/python3.6/site-packages/airflow/models/dag.py", line 1286, in create_dagrun
[2019-11-21 17:47:56,821] {base_task_runner.py:115} INFO - Job 2: Subtask peak_agg.daily_device_app_tx     session.commit()
[2019-11-21 17:47:56,821] {base_task_runner.py:115} INFO - Job 2: Subtask peak_agg.daily_device_app_tx   File "/home/work/pcdn/venv/lib/python3.6/site-packages/sqlalchemy/orm/session.py", line 1027, in commit
[2019-11-21 17:47:56,821] {base_task_runner.py:115} INFO - Job 2: Subtask peak_agg.daily_device_app_tx     self.transaction.commit()
[2019-11-21 17:47:56,821] {base_task_runner.py:115} INFO - Job 2: Subtask peak_agg.daily_device_app_tx   File "/home/work/pcdn/venv/lib/python3.6/site-packages/sqlalchemy/orm/session.py", line 494, in commit
[2019-11-21 17:47:56,821] {base_task_runner.py:115} INFO - Job 2: Subtask peak_agg.daily_device_app_tx     self._prepare_impl()
[2019-11-21 17:47:56,822] {base_task_runner.py:115} INFO - Job 2: Subtask peak_agg.daily_device_app_tx   File "/home/work/pcdn/venv/lib/python3.6/site-packages/sqlalchemy/orm/session.py", line 473, in _prepare_impl
[2019-11-21 17:47:56,822] {base_task_runner.py:115} INFO - Job 2: Subtask peak_agg.daily_device_app_tx     self.session.flush()
[2019-11-21 17:47:56,822] {base_task_runner.py:115} INFO - Job 2: Subtask peak_agg.daily_device_app_tx   File "/home/work/pcdn/venv/lib/python3.6/site-packages/sqlalchemy/orm/session.py", line 2459, in flush
[2019-11-21 17:47:56,822] {base_task_runner.py:115} INFO - Job 2: Subtask peak_agg.daily_device_app_tx     self._flush(objects)
[2019-11-21 17:47:56,822] {base_task_runner.py:115} INFO - Job 2: Subtask peak_agg.daily_device_app_tx   File "/home/work/pcdn/venv/lib/python3.6/site-packages/sqlalchemy/orm/session.py", line 2597, in _flush
[2019-11-21 17:47:56,822] {base_task_runner.py:115} INFO - Job 2: Subtask peak_agg.daily_device_app_tx     transaction.rollback(_capture_exception=True)
[2019-11-21 17:47:56,822] {base_task_runner.py:115} INFO - Job 2: Subtask peak_agg.daily_device_app_tx   File "/home/work/pcdn/venv/lib/python3.6/site-packages/sqlalchemy/util/langhelpers.py", line 68, in __exit__
[2019-11-21 17:47:56,822] {base_task_runner.py:115} INFO - Job 2: Subtask peak_agg.daily_device_app_tx     compat.reraise(exc_type, exc_value, exc_tb)
[2019-11-21 17:47:56,822] {base_task_runner.py:115} INFO - Job 2: Subtask peak_agg.daily_device_app_tx   File "/home/work/pcdn/venv/lib/python3.6/site-packages/sqlalchemy/util/compat.py", line 153, in reraise
[2019-11-21 17:47:56,822] {base_task_runner.py:115} INFO - Job 2: Subtask peak_agg.daily_device_app_tx     raise value
[2019-11-21 17:47:56,823] {base_task_runner.py:115} INFO - Job 2: Subtask peak_agg.daily_device_app_tx   File "/home/work/pcdn/venv/lib/python3.6/site-packages/sqlalchemy/orm/session.py", line 2557, in _flush
[2019-11-21 17:47:56,823] {base_task_runner.py:115} INFO - Job 2: Subtask peak_agg.daily_device_app_tx     flush_context.execute()
[2019-11-21 17:47:56,823] {base_task_runner.py:115} INFO - Job 2: Subtask peak_agg.daily_device_app_tx   File "/home/work/pcdn/venv/lib/python3.6/site-packages/sqlalchemy/orm/unitofwork.py", line 422, in execute
[2019-11-21 17:47:56,823] {base_task_runner.py:115} INFO - Job 2: Subtask peak_agg.daily_device_app_tx     rec.execute(self)
[2019-11-21 17:47:56,823] {base_task_runner.py:115} INFO - Job 2: Subtask peak_agg.daily_device_app_tx   File "/home/work/pcdn/venv/lib/python3.6/site-packages/sqlalchemy/orm/unitofwork.py", line 589, in execute
[2019-11-21 17:47:56,823] {base_task_runner.py:115} INFO - Job 2: Subtask peak_agg.daily_device_app_tx     uow,
[2019-11-21 17:47:56,823] {base_task_runner.py:115} INFO - Job 2: Subtask peak_agg.daily_device_app_tx   File "/home/work/pcdn/venv/lib/python3.6/site-packages/sqlalchemy/orm/persistence.py", line 245, in save_obj
[2019-11-21 17:47:56,823] {base_task_runner.py:115} INFO - Job 2: Subtask peak_agg.daily_device_app_tx     insert,
[2019-11-21 17:47:56,823] {base_task_runner.py:115} INFO - Job 2: Subtask peak_agg.daily_device_app_tx   File "/home/work/pcdn/venv/lib/python3.6/site-packages/sqlalchemy/orm/persistence.py", line 1138, in _emit_insert_statements
[2019-11-21 17:47:56,823] {base_task_runner.py:115} INFO - Job 2: Subtask peak_agg.daily_device_app_tx     statement, params
[2019-11-21 17:47:56,823] {base_task_runner.py:115} INFO - Job 2: Subtask peak_agg.daily_device_app_tx   File "/home/work/pcdn/venv/lib/python3.6/site-packages/sqlalchemy/engine/base.py", line 988, in execute
[2019-11-21 17:47:56,823] {base_task_runner.py:115} INFO - Job 2: Subtask peak_agg.daily_device_app_tx     return meth(self, multiparams, params)
[2019-11-21 17:47:56,823] {base_task_runner.py:115} INFO - Job 2: Subtask peak_agg.daily_device_app_tx   File "/home/work/pcdn/venv/lib/python3.6/site-packages/sqlalchemy/sql/elements.py", line 287, in _execute_on_connection
[2019-11-21 17:47:56,823] {base_task_runner.py:115} INFO - Job 2: Subtask peak_agg.daily_device_app_tx     return connection._execute_clauseelement(self, multiparams, params)
[2019-11-21 17:47:56,823] {base_task_runner.py:115} INFO - Job 2: Subtask peak_agg.daily_device_app_tx   File "/home/work/pcdn/venv/lib/python3.6/site-packages/sqlalchemy/engine/base.py", line 1107, in _execute_clauseelement
[2019-11-21 17:47:56,823] {base_task_runner.py:115} INFO - Job 2: Subtask peak_agg.daily_device_app_tx     distilled_params,
[2019-11-21 17:47:56,824] {base_task_runner.py:115} INFO - Job 2: Subtask peak_agg.daily_device_app_tx   File "/home/work/pcdn/venv/lib/python3.6/site-packages/sqlalchemy/engine/base.py", line 1248, in _execute_context
[2019-11-21 17:47:56,824] {base_task_runner.py:115} INFO - Job 2: Subtask peak_agg.daily_device_app_tx     e, statement, parameters, cursor, context
[2019-11-21 17:47:56,824] {base_task_runner.py:115} INFO - Job 2: Subtask peak_agg.daily_device_app_tx   File "/home/work/pcdn/venv/lib/python3.6/site-packages/sqlalchemy/engine/base.py", line 1466, in _handle_dbapi_exception
[2019-11-21 17:47:56,824] {base_task_runner.py:115} INFO - Job 2: Subtask peak_agg.daily_device_app_tx     util.raise_from_cause(sqlalchemy_exception, exc_info)
[2019-11-21 17:47:56,824] {base_task_runner.py:115} INFO - Job 2: Subtask peak_agg.daily_device_app_tx   File "/home/work/pcdn/venv/lib/python3.6/site-packages/sqlalchemy/util/compat.py", line 398, in raise_from_cause
[2019-11-21 17:47:56,824] {base_task_runner.py:115} INFO - Job 2: Subtask peak_agg.daily_device_app_tx     reraise(type(exception), exception, tb=exc_tb, cause=cause)
[2019-11-21 17:47:56,824] {base_task_runner.py:115} INFO - Job 2: Subtask peak_agg.daily_device_app_tx   File "/home/work/pcdn/venv/lib/python3.6/site-packages/sqlalchemy/util/compat.py", line 152, in reraise
[2019-11-21 17:47:56,824] {base_task_runner.py:115} INFO - Job 2: Subtask peak_agg.daily_device_app_tx     raise value.with_traceback(tb)
[2019-11-21 17:47:56,824] {base_task_runner.py:115} INFO - Job 2: Subtask peak_agg.daily_device_app_tx   File "/home/work/pcdn/venv/lib/python3.6/site-packages/sqlalchemy/engine/base.py", line 1244, in _execute_context
[2019-11-21 17:47:56,824] {base_task_runner.py:115} INFO - Job 2: Subtask peak_agg.daily_device_app_tx     cursor, statement, parameters, context
[2019-11-21 17:47:56,824] {base_task_runner.py:115} INFO - Job 2: Subtask peak_agg.daily_device_app_tx   File "/home/work/pcdn/venv/lib/python3.6/site-packages/sqlalchemy/engine/default.py", line 552, in do_execute
[2019-11-21 17:47:56,824] {base_task_runner.py:115} INFO - Job 2: Subtask peak_agg.daily_device_app_tx     cursor.execute(statement, parameters)
[2019-11-21 17:47:56,824] {base_task_runner.py:115} INFO - Job 2: Subtask peak_agg.daily_device_app_tx   File "/home/work/pcdn/venv/lib/python3.6/site-packages/MySQLdb/cursors.py", line 255, in execute
[2019-11-21 17:47:56,824] {base_task_runner.py:115} INFO - Job 2: Subtask peak_agg.daily_device_app_tx     self.errorhandler(self, exc, value)
[2019-11-21 17:47:56,824] {base_task_runner.py:115} INFO - Job 2: Subtask peak_agg.daily_device_app_tx   File "/home/work/pcdn/venv/lib/python3.6/site-packages/MySQLdb/connections.py", line 50, in defaulterrorhandler
[2019-11-21 17:47:56,825] {base_task_runner.py:115} INFO - Job 2: Subtask peak_agg.daily_device_app_tx     raise errorvalue
[2019-11-21 17:47:56,825] {base_task_runner.py:115} INFO - Job 2: Subtask peak_agg.daily_device_app_tx   File "/home/work/pcdn/venv/lib/python3.6/site-packages/MySQLdb/cursors.py", line 252, in execute
[2019-11-21 17:47:56,825] {base_task_runner.py:115} INFO - Job 2: Subtask peak_agg.daily_device_app_tx     res = self._query(query)
[2019-11-21 17:47:56,825] {base_task_runner.py:115} INFO - Job 2: Subtask peak_agg.daily_device_app_tx   File "/home/work/pcdn/venv/lib/python3.6/site-packages/MySQLdb/cursors.py", line 378, in _query
[2019-11-21 17:47:56,825] {base_task_runner.py:115} INFO - Job 2: Subtask peak_agg.daily_device_app_tx     db.query(q)
[2019-11-21 17:47:56,825] {base_task_runner.py:115} INFO - Job 2: Subtask peak_agg.daily_device_app_tx   File "/home/work/pcdn/venv/lib/python3.6/site-packages/MySQLdb/connections.py", line 280, in query
[2019-11-21 17:47:56,825] {base_task_runner.py:115} INFO - Job 2: Subtask peak_agg.daily_device_app_tx     _mysql.connection.query(self, query)
[2019-11-21 17:47:56,825] {base_task_runner.py:115} INFO - Job 2: Subtask peak_agg.daily_device_app_tx sqlalchemy.exc.IntegrityError: (_mysql_exceptions.IntegrityError) (1062, "Duplicate entry 'pcdn_export_agg_peak.split_to_agg_9.pcdn_agg-2019-11-21 09:47:00' for key 'dag_id'")
[2019-11-21 17:47:56,825] {base_task_runner.py:115} INFO - Job 2: Subtask peak_agg.daily_device_app_tx [SQL: INSERT INTO dag_run (dag_id, execution_date, start_date, end_date, state, run_id, external_trigger, conf) VALUES (%s, %s, %s, %s, %s, %s, %s, %s)]
[2019-11-21 17:47:56,825] {base_task_runner.py:115} INFO - Job 2: Subtask peak_agg.daily_device_app_tx [parameters: ('pcdn_export_agg_peak.split_to_agg_9.pcdn_agg', <Pendulum [2019-11-21T09:47:00+00:00]>, datetime.datetime(2019, 11, 21, 9, 47, 56, 409081, tzinfo=<Timezone [UTC]>), None, 'running', 'tri_peak_agg-daily_device_app_tx-for:2019-11-20-on:20191120013000.000000', 1, b'\x80\x04\x95&\x01\x00\x00\x00\x00\x00\x00}\x94(\x8c\x03env\x94\x8c\x03dev\x94\x8c\x08start_ts\x94J\x80=\xd5]\x8c\x06end_ts\x94J\xa4K\xd5]\x8c\tstat_ ... (275 characters truncated) ... \x8c\x06device\x94as\x8c\tlog_level\x94\x8c\x04INFO\x94\x8c\rseries_chunks\x94Kd\x8c\tsp_chunks\x94J@B\x0f\x00\x8c\nsp_schunks\x94J\xa0\x86\x01\x00u.')]
[2019-11-21 17:47:56,825] {base_task_runner.py:115} INFO - Job 2: Subtask peak_agg.daily_device_app_tx (Background on this error at: http://sqlalche.me/e/gkpj)
[2019-11-21 17:47:57,393] {logging_mixin.py:95} INFO - [2019-11-21 17:47:57,392] {local_task_job.py:105} INFO - Task exited with return code 1
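The traceback ends in a duplicate-key error. The rejected `dag_run` row reuses an existing (`dag_id`, `execution_date`) pair, even though its `run_id` is new. The sketch below reproduces that behaviour with an in-memory SQLite table standing in for Airflow's MySQL `dag_run` table. The schema is an assumption: it keeps only the columns involved and is not Airflow's actual DDL. The helper `trigger` is hypothetical and only illustrates the insert.

```python
import sqlite3

# Hypothetical stand-in for Airflow's dag_run table (assumption: only the
# columns relevant here, with a UNIQUE constraint on (dag_id, execution_date)
# as the MySQL "Duplicate entry ... for key 'dag_id'" error implies).
conn = sqlite3.connect(":memory:")
conn.execute(
    """
    CREATE TABLE dag_run (
        id INTEGER PRIMARY KEY,
        dag_id TEXT NOT NULL,
        execution_date TEXT NOT NULL,
        run_id TEXT NOT NULL,
        UNIQUE (dag_id, execution_date)
    )
    """
)


def trigger(dag_id, execution_date, run_id):
    """Hypothetical helper: insert a dag_run row.

    Returns True on success, False if the unique key is violated."""
    try:
        with conn:  # commits on success, rolls back on error
            conn.execute(
                "INSERT INTO dag_run (dag_id, execution_date, run_id) "
                "VALUES (?, ?, ?)",
                (dag_id, execution_date, run_id),
            )
        return True
    except sqlite3.IntegrityError:
        return False


dag_id = "pcdn_export_agg_peak.split_to_agg_9.pcdn_agg"

# A different run_id does not help: the second row collides on
# (dag_id, execution_date), like the INSERT in the log.
first = trigger(dag_id, "2019-11-21 09:47:00", "run-a")
second = trigger(dag_id, "2019-11-21 09:47:00", "run-b")

# A distinct execution_date for each triggered run avoids the collision.
third = trigger(dag_id, "2019-11-21 09:47:01", "run-c")

print(first, second, third)  # True False True
```

In Airflow terms, this suggests that each of the three triggered runs needs a distinct execution date, not just a distinct `run_id`. Whether you can arrange that depends on how your triggering operator (and any SubDAG, which reuses its parent's execution date) sets `execution_date` in your Airflow version.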
MatteoMart1994 commented 2 years ago

Hi @YouZhengChuan, any news on this issue? I have the same problem on Composer 1.10.15 with Airflow 1.10.12.