apache / airflow

Apache Airflow - A platform to programmatically author, schedule, and monitor workflows
https://airflow.apache.org/
Apache License 2.0

Task processes killed with WARNING - Recorded pid does not match the current pid #17507

Closed iasoon closed 1 year ago

iasoon commented 3 years ago

Apache Airflow version: 2.1.3

Apache Airflow Provider versions (please include all providers that are relevant to your bug):

apache-airflow-providers-amazon==2.0.0
apache-airflow-providers-celery==2.0.0
apache-airflow-providers-cncf-kubernetes==2.0.0
apache-airflow-providers-docker==2.0.0
apache-airflow-providers-elasticsearch==2.0.2
apache-airflow-providers-ftp==2.0.0
apache-airflow-providers-google==4.0.0
apache-airflow-providers-grpc==2.0.0
apache-airflow-providers-hashicorp==2.0.0
apache-airflow-providers-http==2.0.0
apache-airflow-providers-imap==2.0.0
apache-airflow-providers-microsoft-azure==3.0.0
apache-airflow-providers-mysql==2.0.0
apache-airflow-providers-postgres==2.0.0
apache-airflow-providers-redis==2.0.0
apache-airflow-providers-sendgrid==2.0.0
apache-airflow-providers-sftp==2.0.0
apache-airflow-providers-slack==4.0.0
apache-airflow-providers-sqlite==2.0.0
apache-airflow-providers-ssh==2.0.0

Environment:

I'm using the airflow-2.1.2 container from Docker Hub.

What happened:

When using the EMRStepSensor (set to reschedule mode) to monitor EMR steps, the task will sometimes fail even though the EMR step ran successfully. Most of the time the sensor works fine, but every so often this issue occurs (on the same DAG, without modifications).
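
For reference, a minimal sketch of how such a sensor is typically wired up (the DAG id, task id, schedule, and EMR ids below are hypothetical placeholders, not the actual DAG from this report):

```
# Minimal sketch of an EMR step sensor in reschedule mode (hypothetical ids).
from datetime import datetime

from airflow import DAG
from airflow.providers.amazon.aws.sensors.emr_step import EmrStepSensor

with DAG(
    dag_id="example_emr_step_watch",    # hypothetical DAG id
    start_date=datetime(2021, 8, 1),
    schedule_interval="@daily",
    catchup=False,
) as dag:
    watch_step = EmrStepSensor(
        task_id="watch_emr_step",
        job_flow_id="j-XXXXXXXXXXXX",   # EMR cluster id (often pulled from XCom)
        step_id="s-XXXXXXXXXXXX",       # EMR step id (often pulled from XCom)
        aws_conn_id="aws_default",
        mode="reschedule",              # frees the worker slot between pokes
        poke_interval=60,               # seconds between DescribeStep calls
        timeout=60 * 60,                # give up after an hour
    )
```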

EMRStepSensor task instance debug log ``` *** Reading local file: /opt/airflow/logs/derived.adobe_target_catalog_sporza/watch_adobe_target_catalog_sporza_job_emr_step/2021-08-07T05:28:00+00:00/1.log [2021-08-08 05:29:20,367] {__init__.py:51} DEBUG - Loading core task runner: StandardTaskRunner [2021-08-08 05:29:21,594] {base_task_runner.py:62} DEBUG - Planning to run as the user [2021-08-08 05:29:21,597] {taskinstance.py:614} DEBUG - Refreshing TaskInstance from DB [2021-08-08 05:29:22,086] {taskinstance.py:649} DEBUG - Refreshed TaskInstance [2021-08-08 05:29:22,086] {taskinstance.py:911} DEBUG - dependency 'Previous Dagrun State' PASSED: True, The task did not have depends_on_past set. [2021-08-08 05:29:22,086] {taskinstance.py:911} DEBUG - dependency 'Task Instance State' PASSED: True, Task state queued was valid. [2021-08-08 05:29:22,128] {taskinstance.py:911} DEBUG - dependency 'Ready To Reschedule' PASSED: True, The task instance is not in State_UP_FOR_RESCHEDULE or NONE state. [2021-08-08 05:29:22,128] {taskinstance.py:911} DEBUG - dependency 'Not In Retry Period' PASSED: True, The task instance was not marked for retrying. [2021-08-08 05:29:22,129] {taskinstance.py:911} DEBUG - dependency 'Task Instance Not Running' PASSED: True, Task is not in running state. [2021-08-08 05:29:22,130] {taskinstance.py:896} INFO - Dependencies all met for [2021-08-08 05:29:22,130] {taskinstance.py:911} DEBUG - dependency 'Previous Dagrun State' PASSED: True, The task did not have depends_on_past set. [2021-08-08 05:29:22,165] {taskinstance.py:911} DEBUG - dependency 'Ready To Reschedule' PASSED: True, The task instance is not in State_UP_FOR_RESCHEDULE or NONE state. [2021-08-08 05:29:22,206] {taskinstance.py:911} DEBUG - dependency 'Not In Retry Period' PASSED: True, The task instance was not marked for retrying. [2021-08-08 05:29:22,327] {taskinstance.py:911} DEBUG - dependency 'Pool Slots Available' PASSED: True, ('There are enough open slots in %s to execute the task', 'default_pool') [2021-08-08 05:29:22,327] {taskinstance.py:911} DEBUG - dependency 'Task Concurrency' PASSED: True, Task concurrency is not set. 
[2021-08-08 05:29:22,327] {taskinstance.py:896} INFO - Dependencies all met for [2021-08-08 05:29:22,327] {taskinstance.py:1087} INFO - -------------------------------------------------------------------------------- [2021-08-08 05:29:22,327] {taskinstance.py:1088} INFO - Starting attempt 1 of 1 [2021-08-08 05:29:22,327] {taskinstance.py:1089} INFO - -------------------------------------------------------------------------------- [2021-08-08 05:29:24,417] {taskinstance.py:1107} INFO - Executing on 2021-08-07T05:28:00+00:00 [2021-08-08 05:29:24,421] {standard_task_runner.py:52} INFO - Started process 7426 to run task [2021-08-08 05:29:24,427] {standard_task_runner.py:76} INFO - Running: ['airflow', 'tasks', 'run', 'derived.adobe_target_catalog_sporza', 'watch_adobe_target_catalog_sporza_job_emr_step', '2021-08-07T05:28:00+00:00', '--job-id', '98940', '--pool', 'default_pool', '--raw', '--subdir', 'DAGS_FOLDER/adobe_target_catalog_sporza_wf.py', '--cfg-path', '/tmp/tmpucsqr5ep', '--error-file', '/tmp/tmpi1wh32jz'] [2021-08-08 05:29:24,427] {standard_task_runner.py:77} INFO - Job 98940: Subtask watch_adobe_target_catalog_sporza_job_emr_step [2021-08-08 05:29:24,428] {cli_action_loggers.py:66} DEBUG - Calling callbacks: [] [2021-08-08 05:29:26,223] {settings.py:208} DEBUG - Setting up DB connection pool (PID 7426) [2021-08-08 05:29:26,223] {settings.py:244} DEBUG - settings.prepare_engine_args(): Using NullPool [2021-08-08 05:29:26,226] {taskinstance.py:614} DEBUG - Refreshing TaskInstance from DB [2021-08-08 05:29:27,030] {taskinstance.py:649} DEBUG - Refreshed TaskInstance [2021-08-08 05:29:27,523] {logging_mixin.py:104} INFO - Running on host ip-172-31-58-121.eu-west-1.compute.internal [2021-08-08 05:29:27,523] {taskinstance.py:614} DEBUG - Refreshing TaskInstance from DB [2021-08-08 05:29:28,218] {taskinstance.py:649} DEBUG - Refreshed TaskInstance [2021-08-08 05:29:28,871] {taskinstance.py:677} DEBUG - Clearing XCom data [2021-08-08 05:29:29,515] {taskinstance.py:684} DEBUG - XCom data cleared [2021-08-08 05:29:32,916] {taskinstance.py:1302} INFO - Exporting the following env vars: AIRFLOW_CTX_DAG_OWNER=vrt AIRFLOW_CTX_DAG_ID=derived.adobe_target_catalog_sporza AIRFLOW_CTX_TASK_ID=watch_adobe_target_catalog_sporza_job_emr_step AIRFLOW_CTX_EXECUTION_DATE=2021-08-07T05:28:00+00:00 AIRFLOW_CTX_DAG_RUN_ID=scheduled__2021-08-07T05:28:00+00:00 [2021-08-08 05:29:32,916] {__init__.py:146} DEBUG - Preparing lineage inlets and outlets [2021-08-08 05:29:32,916] {__init__.py:190} DEBUG - inlets: [], outlets: [] [2021-08-08 05:29:33,486] {taskinstance.py:614} DEBUG - Refreshing TaskInstance from DB [2021-08-08 05:29:36,437] {base_aws.py:368} INFO - Airflow Connection: aws_conn_id=aws_default [2021-08-08 05:29:36,521] {taskinstance.py:649} DEBUG - Refreshed TaskInstance [2021-08-08 05:29:36,562] {base_job.py:227} DEBUG - [heartbeat] [2021-08-08 05:29:38,654] {base_aws.py:179} INFO - No credentials retrieved from Connection [2021-08-08 05:29:38,654] {base_aws.py:87} INFO - Creating session with aws_access_key_id=None region_name=None [2021-08-08 05:29:38,655] {hooks.py:417} DEBUG - Changing event name from creating-client-class.iot-data to creating-client-class.iot-data-plane [2021-08-08 05:29:38,656] {hooks.py:417} DEBUG - Changing event name from before-call.apigateway to before-call.api-gateway [2021-08-08 05:29:38,657] {hooks.py:417} DEBUG - Changing event name from request-created.machinelearning.Predict to request-created.machine-learning.Predict [2021-08-08 05:29:38,659] {hooks.py:417} DEBUG - 
Changing event name from before-parameter-build.autoscaling.CreateLaunchConfiguration to before-parameter-build.auto-scaling.CreateLaunchConfiguration [2021-08-08 05:29:38,659] {hooks.py:417} DEBUG - Changing event name from before-parameter-build.route53 to before-parameter-build.route-53 [2021-08-08 05:29:38,659] {hooks.py:417} DEBUG - Changing event name from request-created.cloudsearchdomain.Search to request-created.cloudsearch-domain.Search [2021-08-08 05:29:38,660] {hooks.py:417} DEBUG - Changing event name from docs.*.autoscaling.CreateLaunchConfiguration.complete-section to docs.*.auto-scaling.CreateLaunchConfiguration.complete-section [2021-08-08 05:29:38,663] {hooks.py:417} DEBUG - Changing event name from before-parameter-build.logs.CreateExportTask to before-parameter-build.cloudwatch-logs.CreateExportTask [2021-08-08 05:29:38,663] {hooks.py:417} DEBUG - Changing event name from docs.*.logs.CreateExportTask.complete-section to docs.*.cloudwatch-logs.CreateExportTask.complete-section [2021-08-08 05:29:38,663] {hooks.py:417} DEBUG - Changing event name from before-parameter-build.cloudsearchdomain.Search to before-parameter-build.cloudsearch-domain.Search [2021-08-08 05:29:38,663] {hooks.py:417} DEBUG - Changing event name from docs.*.cloudsearchdomain.Search.complete-section to docs.*.cloudsearch-domain.Search.complete-section [2021-08-08 05:29:38,664] {base_aws.py:157} INFO - role_arn is None [2021-08-08 05:29:38,667] {utils.py:364} DEBUG - IMDS ENDPOINT: http://169.254.169.254/ [2021-08-08 05:29:38,670] {credentials.py:1974} DEBUG - Looking for credentials via: env [2021-08-08 05:29:38,670] {credentials.py:1974} DEBUG - Looking for credentials via: assume-role [2021-08-08 05:29:38,670] {credentials.py:1974} DEBUG - Looking for credentials via: assume-role-with-web-identity [2021-08-08 05:29:38,670] {credentials.py:1974} DEBUG - Looking for credentials via: sso [2021-08-08 05:29:38,670] {credentials.py:1974} DEBUG - Looking for credentials via: shared-credentials-file [2021-08-08 05:29:38,670] {credentials.py:1974} DEBUG - Looking for credentials via: custom-process [2021-08-08 05:29:38,670] {credentials.py:1974} DEBUG - Looking for credentials via: config-file [2021-08-08 05:29:38,670] {credentials.py:1974} DEBUG - Looking for credentials via: ec2-credentials-file [2021-08-08 05:29:38,670] {credentials.py:1974} DEBUG - Looking for credentials via: boto-config [2021-08-08 05:29:38,670] {credentials.py:1974} DEBUG - Looking for credentials via: container-role [2021-08-08 05:29:38,671] {connectionpool.py:230} DEBUG - Starting new HTTP connection (1): 169.254.170.2:80 [2021-08-08 05:29:38,673] {connectionpool.py:442} DEBUG - http://169.254.170.2:80 "GET /v2/credentials/c5f7099a-d46e-4472-a48f-6c137db9e75d HTTP/1.1" 200 1307 [2021-08-08 05:29:38,674] {loaders.py:174} DEBUG - Loading JSON file: /home/airflow/.local/lib/python3.6/site-packages/botocore/data/endpoints.json [2021-08-08 05:29:38,683] {hooks.py:210} DEBUG - Event choose-service-name: calling handler [2021-08-08 05:29:38,691] {loaders.py:174} DEBUG - Loading JSON file: /home/airflow/.local/lib/python3.6/site-packages/botocore/data/emr/2009-03-31/service-2.json [2021-08-08 05:29:38,695] {hooks.py:210} DEBUG - Event creating-client-class.emr: calling handler [2021-08-08 05:29:38,697] {endpoint.py:292} DEBUG - Setting elasticmapreduce timeout as (60, 60) [2021-08-08 05:29:38,698] {loaders.py:174} DEBUG - Loading JSON file: /home/airflow/.local/lib/python3.6/site-packages/botocore/data/_retry.json [2021-08-08 05:29:38,698] 
{client.py:166} DEBUG - Registering retry handlers for service: emr [2021-08-08 05:29:38,699] {emr_step.py:73} INFO - Poking step s-581MKMYKRED7 on cluster j-XDTNJDHR23RQ [2021-08-08 05:29:38,699] {hooks.py:210} DEBUG - Event before-parameter-build.emr.DescribeStep: calling handler [2021-08-08 05:29:38,699] {hooks.py:210} DEBUG - Event before-call.emr.DescribeStep: calling handler [2021-08-08 05:29:38,699] {endpoint.py:101} DEBUG - Making request for OperationModel(name=DescribeStep) with params: {'url_path': '/', 'query_string': '', 'method': 'POST', 'headers': {'X-Amz-Target': 'ElasticMapReduce.DescribeStep', 'Content-Type': 'application/x-amz-json-1.1', 'User-Agent': 'Boto3/1.17.107 Python/3.6.13 Linux/4.14.238-182.422.amzn2.x86_64 exec-env/AWS_ECS_FARGATE Botocore/1.20.107'}, 'body': b'{"ClusterId": "j-XDTNJDHR23RQ", "StepId": "s-581MKMYKRED7"}', 'url': 'https://eu-west-1.elasticmapreduce.amazonaws.com/', 'context': {'client_region': 'eu-west-1', 'client_config': , 'has_streaming_input': False, 'auth_type': None}} [2021-08-08 05:29:38,700] {hooks.py:210} DEBUG - Event request-created.emr.DescribeStep: calling handler > [2021-08-08 05:29:38,700] {hooks.py:210} DEBUG - Event choose-signer.emr.DescribeStep: calling handler [2021-08-08 05:29:38,700] {auth.py:380} DEBUG - Calculating signature using v4 auth. [2021-08-08 05:29:38,700] {auth.py:381} DEBUG - CanonicalRequest: POST / content-type:application/x-amz-json-1.1 host:eu-west-1.elasticmapreduce.amazonaws.com x-amz-date:20210808T052938Z x-amz-security-token:IQoJb3JpZ2luX2VjEGwaCWV1LXdlc3QtMSJHMEUCIQCZ4NXI3+3YJKZwyu+L4vTyY4swRSd7+zHkDO3Nwc0JiwIgdDuzR2+qJ5vFE03B1RRhzZYGV4Iy/pt7jVXRr0YjXTUqogQIhP//////////ARABGgwyODM3NzQxNDgzNTciDDl+9YdFHetmruZnfSr2A4vFCZwFNe0LEigdz0ayjIhvTZMsjY6CClml27JRBbUllyWz6t3b5SjafRWERgfEe1xIjnFFvp/UFSDIrZVuOefX94uUcgWQKV5zhz/dHdpCzpPltF+syapd2IeY5lgaUZWr+ax+Pfs75JILyfTpa7u43hg1M6P9L07zP/IUWdrXvyvu0wVh07r6/bHUDKJGy52Ok2q6Hr4ltT4MVAY/EZIEDZA8dlNdd7/UDddecp+/7/QIMftzZsgQJVyO+IIB+cfHCIV+1g/ACnsSbBkJuH2fn/B0YeP6e1135yTFknzNKVUxf6OeBIuPrkcu78HnaOXcjv+JSKb9AcFP+2zOo9JYmr040XGVqesZiAc+uDWbL74LDQ7ze5b4sLSNXfCCEvqsYVy9Nt7UlopCXoIS2lBrWgXNhsppDQ976Gr+WgyV1Wbjx2NZlJFCtSNarwF0qDPBmm6yhQxyKMYarOrUpy9aNl/18ujIhJZ2/JMUYlzH0TOBc1YCKIuZLNPwYfxZW6JmosfKXjAFUK7eDn4tU6LSXyo2GNLYzjY71OyCOpllJYtxg/NDAQBlmMCbFPpFCCTjPECU0sGnhfPAdlsyYngT3L0bSyjAZpPRfYrfPd4i3j2niPq9BwF/iOIR95T2CK7zqxXB4TIpporSE0pItmFgynGTh+IwtZm9iAY6pgENShKQaHxJvNj1O5ddYeCwW3H2SqjrHroeOt82xGkH5stZ7g28Uy7iPIPEehlkrB7ry/KjLq9DAo4zVneHhfKJBUBTBxz944AZhd0d9VvSRGgEXnmxzTsHma9hnkwQfGdbXwH4ja582o7hPEI98AAgoo5OCuV/vyMHHgrJBa7z+7eRtGnCeg+EJZy9WlsilHHGze4QGDEDc4NJypNZM4JON503hXON x-amz-target:ElasticMapReduce.DescribeStep content-type;host;x-amz-date;x-amz-security-token;x-amz-target 2a131454435119560c6fa09b6b1eeeb557aba91e61af62fe4883ccc24890c4f3 [2021-08-08 05:29:38,700] {auth.py:383} DEBUG - StringToSign: AWS4-HMAC-SHA256 20210808T052938Z 20210808/eu-west-1/elasticmapreduce/aws4_request 29f0d229d76daf02f334b8c7d193cb06de4619f4adb7b3057683f53b2ea79c80 [2021-08-08 05:29:38,700] {auth.py:385} DEBUG - Signature: 66e13f874221cb3c1b275a052ac56a7e827d431e940bfe9b14414104e0a3b162 [2021-08-08 05:29:38,701] {endpoint.py:187} DEBUG - Sending http request: [2021-08-08 05:29:38,702] {httpsession.py:50} DEBUG - Certificate path: /home/airflow/.local/lib/python3.6/site-packages/certifi/cacert.pem [2021-08-08 05:29:38,702] {connectionpool.py:943} DEBUG - Starting new HTTPS connection (1): eu-west-1.elasticmapreduce.amazonaws.com:443 [2021-08-08 05:29:38,752] {connectionpool.py:442} DEBUG - 
https://eu-west-1.elasticmapreduce.amazonaws.com:443 "POST / HTTP/1.1" 200 799 [2021-08-08 05:29:38,752] {parsers.py:233} DEBUG - Response headers: {'x-amzn-RequestId': 'ecacd9e3-44b2-4a00-a4f0-48efe0d65847', 'Content-Type': 'application/x-amz-json-1.1', 'Content-Length': '799', 'Date': 'Sun, 08 Aug 2021 05:29:38 GMT'} [2021-08-08 05:29:38,752] {parsers.py:234} DEBUG - Response body: b'{"Step":{"ActionOnFailure":"CONTINUE","Config":{"Args":["spark-submit","--deploy-mode","cluster","--master","yarn","--driver-memory","2G","--executor-memory","8G","--num-executors","5","--executor-cores","3","--packages","io.delta:delta-core_2.11:0.6.1","--py-files","s3://vrt-datalake-artifacts-prod/adobe-target-catalog-sporza/python/adobe_target_catalog_sporza-1.0.0-py3.7_6.egg,s3://vrt-datalake-artifacts-prod/python-datalake-helpers/vrt_datalake_helpers-21-py3-none-any.whl","s3://vrt-datalake-artifacts-prod/adobe-target-catalog-sporza/python/run_6.py","prod","2021-08-08 05:28:00"],"Jar":"command-runner.jar","Properties":{}},"Id":"s-581MKMYKRED7","Name":"adobe-target-catalog-sporza-job","Status":{"State":"PENDING","StateChangeReason":{},"Timeline":{"CreationDateTime":1.628400523922E9}}}}' [2021-08-08 05:29:38,753] {hooks.py:210} DEBUG - Event needs-retry.emr.DescribeStep: calling handler [2021-08-08 05:29:38,753] {retryhandler.py:187} DEBUG - No retry needed. [2021-08-08 05:29:38,753] {emr_base.py:66} INFO - Job flow currently PENDING [2021-08-08 05:29:38,753] {taskinstance.py:614} DEBUG - Refreshing TaskInstance from DB [2021-08-08 05:29:39,718] {taskinstance.py:649} DEBUG - Refreshed TaskInstance [2021-08-08 05:29:39,748] {taskinstance.py:1906} DEBUG - Task Duration set to 17.617928 [2021-08-08 05:29:42,695] {taskinstance.py:1484} INFO - Rescheduling task, marking task as UP_FOR_RESCHEDULE [2021-08-08 05:29:42,696] {cli_action_loggers.py:84} DEBUG - Calling callbacks: [] [2021-08-08 05:29:44,944] {taskinstance.py:614} DEBUG - Refreshing TaskInstance from DB [2021-08-08 05:29:48,268] {taskinstance.py:649} DEBUG - Refreshed TaskInstance [2021-08-08 05:29:48,674] {base_job.py:227} DEBUG - [heartbeat] [2021-08-08 05:29:48,674] {local_task_job.py:149} INFO - Task exited with return code 0 [2021-08-08 05:29:48,675] {taskinstance.py:614} DEBUG - Refreshing TaskInstance from DB [2021-08-08 05:29:49,740] {taskinstance.py:649} DEBUG - Refreshed TaskInstance [2021-08-08 05:30:49,437] {__init__.py:51} DEBUG - Loading core task runner: StandardTaskRunner [2021-08-08 05:30:51,569] {base_task_runner.py:62} DEBUG - Planning to run as the user [2021-08-08 05:30:51,572] {taskinstance.py:614} DEBUG - Refreshing TaskInstance from DB [2021-08-08 05:30:52,465] {taskinstance.py:649} DEBUG - Refreshed TaskInstance [2021-08-08 05:30:52,465] {taskinstance.py:911} DEBUG - dependency 'Previous Dagrun State' PASSED: True, The task did not have depends_on_past set. [2021-08-08 05:30:52,465] {taskinstance.py:911} DEBUG - dependency 'Task Instance State' PASSED: True, Task state queued was valid. [2021-08-08 05:30:52,544] {taskinstance.py:911} DEBUG - dependency 'Ready To Reschedule' PASSED: True, The task instance is not in State_UP_FOR_RESCHEDULE or NONE state. [2021-08-08 05:30:52,544] {taskinstance.py:911} DEBUG - dependency 'Not In Retry Period' PASSED: True, The task instance was not marked for retrying. [2021-08-08 05:30:52,545] {taskinstance.py:911} DEBUG - dependency 'Task Instance Not Running' PASSED: True, Task is not in running state. 
[2021-08-08 05:30:52,546] {taskinstance.py:896} INFO - Dependencies all met for [2021-08-08 05:30:52,546] {taskinstance.py:911} DEBUG - dependency 'Previous Dagrun State' PASSED: True, The task did not have depends_on_past set. [2021-08-08 05:30:52,593] {taskinstance.py:911} DEBUG - dependency 'Ready To Reschedule' PASSED: True, The task instance is not in State_UP_FOR_RESCHEDULE or NONE state. [2021-08-08 05:30:52,593] {taskinstance.py:911} DEBUG - dependency 'Not In Retry Period' PASSED: True, The task instance was not marked for retrying. [2021-08-08 05:30:52,756] {taskinstance.py:911} DEBUG - dependency 'Pool Slots Available' PASSED: True, ('There are enough open slots in %s to execute the task', 'default_pool') [2021-08-08 05:30:52,756] {taskinstance.py:911} DEBUG - dependency 'Task Concurrency' PASSED: True, Task concurrency is not set. [2021-08-08 05:30:52,756] {taskinstance.py:896} INFO - Dependencies all met for [2021-08-08 05:30:52,756] {taskinstance.py:1087} INFO - -------------------------------------------------------------------------------- [2021-08-08 05:30:52,756] {taskinstance.py:1088} INFO - Starting attempt 1 of 1 [2021-08-08 05:30:52,756] {taskinstance.py:1089} INFO - -------------------------------------------------------------------------------- [2021-08-08 05:30:53,648] {taskinstance.py:1107} INFO - Executing on 2021-08-07T05:28:00+00:00 [2021-08-08 05:30:53,651] {standard_task_runner.py:52} INFO - Started process 7972 to run task [2021-08-08 05:30:53,655] {standard_task_runner.py:76} INFO - Running: ['airflow', 'tasks', 'run', 'derived.adobe_target_catalog_sporza', 'watch_adobe_target_catalog_sporza_job_emr_step', '2021-08-07T05:28:00+00:00', '--job-id', '98953', '--pool', 'default_pool', '--raw', '--subdir', 'DAGS_FOLDER/adobe_target_catalog_sporza_wf.py', '--cfg-path', '/tmp/tmppdcqbh0o', '--error-file', '/tmp/tmpdky_uy36'] [2021-08-08 05:30:53,655] {standard_task_runner.py:77} INFO - Job 98953: Subtask watch_adobe_target_catalog_sporza_job_emr_step [2021-08-08 05:30:53,656] {cli_action_loggers.py:66} DEBUG - Calling callbacks: [] [2021-08-08 05:30:54,515] {settings.py:208} DEBUG - Setting up DB connection pool (PID 7972) [2021-08-08 05:30:54,515] {settings.py:244} DEBUG - settings.prepare_engine_args(): Using NullPool [2021-08-08 05:30:54,517] {taskinstance.py:614} DEBUG - Refreshing TaskInstance from DB [2021-08-08 05:30:57,090] {taskinstance.py:649} DEBUG - Refreshed TaskInstance [2021-08-08 05:30:58,979] {logging_mixin.py:104} INFO - Running on host ip-172-31-58-121.eu-west-1.compute.internal [2021-08-08 05:30:58,979] {taskinstance.py:614} DEBUG - Refreshing TaskInstance from DB [2021-08-08 05:30:59,588] {taskinstance.py:649} DEBUG - Refreshed TaskInstance [2021-08-08 05:31:03,931] {taskinstance.py:614} DEBUG - Refreshing TaskInstance from DB [2021-08-08 05:31:04,181] {taskinstance.py:677} DEBUG - Clearing XCom data [2021-08-08 05:31:04,914] {taskinstance.py:684} DEBUG - XCom data cleared [2021-08-08 05:31:05,896] {taskinstance.py:649} DEBUG - Refreshed TaskInstance [2021-08-08 05:31:05,934] {base_job.py:227} DEBUG - [heartbeat] [2021-08-08 05:31:12,815] {taskinstance.py:1302} INFO - Exporting the following env vars: AIRFLOW_CTX_DAG_OWNER=vrt AIRFLOW_CTX_DAG_ID=derived.adobe_target_catalog_sporza AIRFLOW_CTX_TASK_ID=watch_adobe_target_catalog_sporza_job_emr_step AIRFLOW_CTX_EXECUTION_DATE=2021-08-07T05:28:00+00:00 AIRFLOW_CTX_DAG_RUN_ID=scheduled__2021-08-07T05:28:00+00:00 [2021-08-08 05:31:12,815] {__init__.py:146} DEBUG - Preparing lineage inlets and 
outlets [2021-08-08 05:31:12,816] {__init__.py:190} DEBUG - inlets: [], outlets: [] [2021-08-08 05:31:13,431] {taskinstance.py:614} DEBUG - Refreshing TaskInstance from DB [2021-08-08 05:31:13,718] {base_aws.py:368} INFO - Airflow Connection: aws_conn_id=aws_default [2021-08-08 05:31:14,251] {taskinstance.py:649} DEBUG - Refreshed TaskInstance [2021-08-08 05:31:14,293] {base_job.py:227} DEBUG - [heartbeat] [2021-08-08 05:31:14,780] {base_aws.py:179} INFO - No credentials retrieved from Connection [2021-08-08 05:31:14,780] {base_aws.py:87} INFO - Creating session with aws_access_key_id=None region_name=None [2021-08-08 05:31:14,781] {hooks.py:417} DEBUG - Changing event name from creating-client-class.iot-data to creating-client-class.iot-data-plane [2021-08-08 05:31:14,783] {hooks.py:417} DEBUG - Changing event name from before-call.apigateway to before-call.api-gateway [2021-08-08 05:31:14,784] {hooks.py:417} DEBUG - Changing event name from request-created.machinelearning.Predict to request-created.machine-learning.Predict [2021-08-08 05:31:14,786] {hooks.py:417} DEBUG - Changing event name from before-parameter-build.autoscaling.CreateLaunchConfiguration to before-parameter-build.auto-scaling.CreateLaunchConfiguration [2021-08-08 05:31:14,786] {hooks.py:417} DEBUG - Changing event name from before-parameter-build.route53 to before-parameter-build.route-53 [2021-08-08 05:31:14,786] {hooks.py:417} DEBUG - Changing event name from request-created.cloudsearchdomain.Search to request-created.cloudsearch-domain.Search [2021-08-08 05:31:14,787] {hooks.py:417} DEBUG - Changing event name from docs.*.autoscaling.CreateLaunchConfiguration.complete-section to docs.*.auto-scaling.CreateLaunchConfiguration.complete-section [2021-08-08 05:31:14,790] {hooks.py:417} DEBUG - Changing event name from before-parameter-build.logs.CreateExportTask to before-parameter-build.cloudwatch-logs.CreateExportTask [2021-08-08 05:31:14,790] {hooks.py:417} DEBUG - Changing event name from docs.*.logs.CreateExportTask.complete-section to docs.*.cloudwatch-logs.CreateExportTask.complete-section [2021-08-08 05:31:14,790] {hooks.py:417} DEBUG - Changing event name from before-parameter-build.cloudsearchdomain.Search to before-parameter-build.cloudsearch-domain.Search [2021-08-08 05:31:14,790] {hooks.py:417} DEBUG - Changing event name from docs.*.cloudsearchdomain.Search.complete-section to docs.*.cloudsearch-domain.Search.complete-section [2021-08-08 05:31:14,791] {base_aws.py:157} INFO - role_arn is None [2021-08-08 05:31:14,794] {utils.py:364} DEBUG - IMDS ENDPOINT: http://169.254.169.254/ [2021-08-08 05:31:14,796] {credentials.py:1974} DEBUG - Looking for credentials via: env [2021-08-08 05:31:14,796] {credentials.py:1974} DEBUG - Looking for credentials via: assume-role [2021-08-08 05:31:14,796] {credentials.py:1974} DEBUG - Looking for credentials via: assume-role-with-web-identity [2021-08-08 05:31:14,797] {credentials.py:1974} DEBUG - Looking for credentials via: sso [2021-08-08 05:31:14,797] {credentials.py:1974} DEBUG - Looking for credentials via: shared-credentials-file [2021-08-08 05:31:14,797] {credentials.py:1974} DEBUG - Looking for credentials via: custom-process [2021-08-08 05:31:14,797] {credentials.py:1974} DEBUG - Looking for credentials via: config-file [2021-08-08 05:31:14,797] {credentials.py:1974} DEBUG - Looking for credentials via: ec2-credentials-file [2021-08-08 05:31:14,797] {credentials.py:1974} DEBUG - Looking for credentials via: boto-config [2021-08-08 05:31:14,797] {credentials.py:1974} 
DEBUG - Looking for credentials via: container-role [2021-08-08 05:31:14,798] {connectionpool.py:230} DEBUG - Starting new HTTP connection (1): 169.254.170.2:80 [2021-08-08 05:31:14,799] {connectionpool.py:442} DEBUG - http://169.254.170.2:80 "GET /v2/credentials/c5f7099a-d46e-4472-a48f-6c137db9e75d HTTP/1.1" 200 1307 [2021-08-08 05:31:14,800] {loaders.py:174} DEBUG - Loading JSON file: /home/airflow/.local/lib/python3.6/site-packages/botocore/data/endpoints.json [2021-08-08 05:31:14,810] {hooks.py:210} DEBUG - Event choose-service-name: calling handler [2021-08-08 05:31:14,817] {loaders.py:174} DEBUG - Loading JSON file: /home/airflow/.local/lib/python3.6/site-packages/botocore/data/emr/2009-03-31/service-2.json [2021-08-08 05:31:14,822] {hooks.py:210} DEBUG - Event creating-client-class.emr: calling handler [2021-08-08 05:31:14,824] {endpoint.py:292} DEBUG - Setting elasticmapreduce timeout as (60, 60) [2021-08-08 05:31:14,825] {loaders.py:174} DEBUG - Loading JSON file: /home/airflow/.local/lib/python3.6/site-packages/botocore/data/_retry.json [2021-08-08 05:31:14,825] {client.py:166} DEBUG - Registering retry handlers for service: emr [2021-08-08 05:31:14,826] {emr_step.py:73} INFO - Poking step s-581MKMYKRED7 on cluster j-XDTNJDHR23RQ [2021-08-08 05:31:14,826] {hooks.py:210} DEBUG - Event before-parameter-build.emr.DescribeStep: calling handler [2021-08-08 05:31:14,826] {hooks.py:210} DEBUG - Event before-call.emr.DescribeStep: calling handler [2021-08-08 05:31:14,826] {endpoint.py:101} DEBUG - Making request for OperationModel(name=DescribeStep) with params: {'url_path': '/', 'query_string': '', 'method': 'POST', 'headers': {'X-Amz-Target': 'ElasticMapReduce.DescribeStep', 'Content-Type': 'application/x-amz-json-1.1', 'User-Agent': 'Boto3/1.17.107 Python/3.6.13 Linux/4.14.238-182.422.amzn2.x86_64 exec-env/AWS_ECS_FARGATE Botocore/1.20.107'}, 'body': b'{"ClusterId": "j-XDTNJDHR23RQ", "StepId": "s-581MKMYKRED7"}', 'url': 'https://eu-west-1.elasticmapreduce.amazonaws.com/', 'context': {'client_region': 'eu-west-1', 'client_config': , 'has_streaming_input': False, 'auth_type': None}} [2021-08-08 05:31:14,826] {hooks.py:210} DEBUG - Event request-created.emr.DescribeStep: calling handler > [2021-08-08 05:31:14,827] {hooks.py:210} DEBUG - Event choose-signer.emr.DescribeStep: calling handler [2021-08-08 05:31:14,827] {auth.py:380} DEBUG - Calculating signature using v4 auth. 
[2021-08-08 05:31:14,827] {auth.py:381} DEBUG - CanonicalRequest: POST / content-type:application/x-amz-json-1.1 host:eu-west-1.elasticmapreduce.amazonaws.com x-amz-date:20210808T053114Z x-amz-security-token:IQoJb3JpZ2luX2VjEGwaCWV1LXdlc3QtMSJHMEUCIQCZ4NXI3+3YJKZwyu+L4vTyY4swRSd7+zHkDO3Nwc0JiwIgdDuzR2+qJ5vFE03B1RRhzZYGV4Iy/pt7jVXRr0YjXTUqogQIhP//////////ARABGgwyODM3NzQxNDgzNTciDDl+9YdFHetmruZnfSr2A4vFCZwFNe0LEigdz0ayjIhvTZMsjY6CClml27JRBbUllyWz6t3b5SjafRWERgfEe1xIjnFFvp/UFSDIrZVuOefX94uUcgWQKV5zhz/dHdpCzpPltF+syapd2IeY5lgaUZWr+ax+Pfs75JILyfTpa7u43hg1M6P9L07zP/IUWdrXvyvu0wVh07r6/bHUDKJGy52Ok2q6Hr4ltT4MVAY/EZIEDZA8dlNdd7/UDddecp+/7/QIMftzZsgQJVyO+IIB+cfHCIV+1g/ACnsSbBkJuH2fn/B0YeP6e1135yTFknzNKVUxf6OeBIuPrkcu78HnaOXcjv+JSKb9AcFP+2zOo9JYmr040XGVqesZiAc+uDWbL74LDQ7ze5b4sLSNXfCCEvqsYVy9Nt7UlopCXoIS2lBrWgXNhsppDQ976Gr+WgyV1Wbjx2NZlJFCtSNarwF0qDPBmm6yhQxyKMYarOrUpy9aNl/18ujIhJZ2/JMUYlzH0TOBc1YCKIuZLNPwYfxZW6JmosfKXjAFUK7eDn4tU6LSXyo2GNLYzjY71OyCOpllJYtxg/NDAQBlmMCbFPpFCCTjPECU0sGnhfPAdlsyYngT3L0bSyjAZpPRfYrfPd4i3j2niPq9BwF/iOIR95T2CK7zqxXB4TIpporSE0pItmFgynGTh+IwtZm9iAY6pgENShKQaHxJvNj1O5ddYeCwW3H2SqjrHroeOt82xGkH5stZ7g28Uy7iPIPEehlkrB7ry/KjLq9DAo4zVneHhfKJBUBTBxz944AZhd0d9VvSRGgEXnmxzTsHma9hnkwQfGdbXwH4ja582o7hPEI98AAgoo5OCuV/vyMHHgrJBa7z+7eRtGnCeg+EJZy9WlsilHHGze4QGDEDc4NJypNZM4JON503hXON x-amz-target:ElasticMapReduce.DescribeStep content-type;host;x-amz-date;x-amz-security-token;x-amz-target 2a131454435119560c6fa09b6b1eeeb557aba91e61af62fe4883ccc24890c4f3 [2021-08-08 05:31:14,827] {auth.py:383} DEBUG - StringToSign: AWS4-HMAC-SHA256 20210808T053114Z 20210808/eu-west-1/elasticmapreduce/aws4_request a2f4ed4536a8bb05d35fc345f45eec9b42f7e071c230286854f034ea4878ced2 [2021-08-08 05:31:14,827] {auth.py:385} DEBUG - Signature: 90da79c86917ea04d8aed50abba5b1b3d5152e5327941eda3bf485e6af620e6b [2021-08-08 05:31:14,827] {endpoint.py:187} DEBUG - Sending http request: [2021-08-08 05:31:14,828] {httpsession.py:50} DEBUG - Certificate path: /home/airflow/.local/lib/python3.6/site-packages/certifi/cacert.pem [2021-08-08 05:31:14,828] {connectionpool.py:943} DEBUG - Starting new HTTPS connection (1): eu-west-1.elasticmapreduce.amazonaws.com:443 [2021-08-08 05:31:15,021] {connectionpool.py:442} DEBUG - https://eu-west-1.elasticmapreduce.amazonaws.com:443 "POST / HTTP/1.1" 200 799 [2021-08-08 05:31:15,021] {parsers.py:233} DEBUG - Response headers: {'x-amzn-RequestId': '12e9220a-4dc3-4ca0-9ec9-93a529032f9b', 'Content-Type': 'application/x-amz-json-1.1', 'Content-Length': '799', 'Date': 'Sun, 08 Aug 2021 05:31:14 GMT'} [2021-08-08 05:31:15,021] {parsers.py:234} DEBUG - Response body: b'{"Step":{"ActionOnFailure":"CONTINUE","Config":{"Args":["spark-submit","--deploy-mode","cluster","--master","yarn","--driver-memory","2G","--executor-memory","8G","--num-executors","5","--executor-cores","3","--packages","io.delta:delta-core_2.11:0.6.1","--py-files","s3://vrt-datalake-artifacts-prod/adobe-target-catalog-sporza/python/adobe_target_catalog_sporza-1.0.0-py3.7_6.egg,s3://vrt-datalake-artifacts-prod/python-datalake-helpers/vrt_datalake_helpers-21-py3-none-any.whl","s3://vrt-datalake-artifacts-prod/adobe-target-catalog-sporza/python/run_6.py","prod","2021-08-08 05:28:00"],"Jar":"command-runner.jar","Properties":{}},"Id":"s-581MKMYKRED7","Name":"adobe-target-catalog-sporza-job","Status":{"State":"PENDING","StateChangeReason":{},"Timeline":{"CreationDateTime":1.628400523922E9}}}}' [2021-08-08 05:31:15,022] {hooks.py:210} DEBUG - Event needs-retry.emr.DescribeStep: calling handler [2021-08-08 05:31:15,022] 
{retryhandler.py:187} DEBUG - No retry needed. [2021-08-08 05:31:15,023] {emr_base.py:66} INFO - Job flow currently PENDING [2021-08-08 05:31:15,023] {taskinstance.py:614} DEBUG - Refreshing TaskInstance from DB [2021-08-08 05:31:17,730] {taskinstance.py:649} DEBUG - Refreshed TaskInstance [2021-08-08 05:31:17,771] {taskinstance.py:1906} DEBUG - Task Duration set to 25.225634 [2021-08-08 05:31:21,580] {taskinstance.py:1484} INFO - Rescheduling task, marking task as UP_FOR_RESCHEDULE [2021-08-08 05:31:21,581] {cli_action_loggers.py:84} DEBUG - Calling callbacks: [] [2021-08-08 05:31:24,405] {taskinstance.py:614} DEBUG - Refreshing TaskInstance from DB [2021-08-08 05:31:26,452] {taskinstance.py:649} DEBUG - Refreshed TaskInstance [2021-08-08 05:31:26,531] {base_job.py:227} DEBUG - [heartbeat] [2021-08-08 05:31:26,531] {local_task_job.py:149} INFO - Task exited with return code 0 [2021-08-08 05:31:26,531] {taskinstance.py:614} DEBUG - Refreshing TaskInstance from DB [2021-08-08 05:31:27,269] {taskinstance.py:649} DEBUG - Refreshed TaskInstance [2021-08-08 05:32:35,215] {__init__.py:51} DEBUG - Loading core task runner: StandardTaskRunner [2021-08-08 05:32:36,815] {base_task_runner.py:62} DEBUG - Planning to run as the user [2021-08-08 05:32:36,817] {taskinstance.py:614} DEBUG - Refreshing TaskInstance from DB [2021-08-08 05:32:38,537] {taskinstance.py:649} DEBUG - Refreshed TaskInstance [2021-08-08 05:32:38,537] {taskinstance.py:911} DEBUG - dependency 'Previous Dagrun State' PASSED: True, The task did not have depends_on_past set. [2021-08-08 05:32:38,538] {taskinstance.py:911} DEBUG - dependency 'Task Instance State' PASSED: True, Task state queued was valid. [2021-08-08 05:32:38,578] {taskinstance.py:911} DEBUG - dependency 'Ready To Reschedule' PASSED: True, The task instance is not in State_UP_FOR_RESCHEDULE or NONE state. [2021-08-08 05:32:38,578] {taskinstance.py:911} DEBUG - dependency 'Not In Retry Period' PASSED: True, The task instance was not marked for retrying. [2021-08-08 05:32:38,579] {taskinstance.py:911} DEBUG - dependency 'Task Instance Not Running' PASSED: True, Task is not in running state. [2021-08-08 05:32:38,579] {taskinstance.py:896} INFO - Dependencies all met for [2021-08-08 05:32:38,579] {taskinstance.py:911} DEBUG - dependency 'Previous Dagrun State' PASSED: True, The task did not have depends_on_past set. [2021-08-08 05:32:39,309] {taskinstance.py:911} DEBUG - dependency 'Ready To Reschedule' PASSED: True, The task instance is not in State_UP_FOR_RESCHEDULE or NONE state. [2021-08-08 05:32:39,310] {taskinstance.py:911} DEBUG - dependency 'Not In Retry Period' PASSED: True, The task instance was not marked for retrying. [2021-08-08 05:32:39,477] {taskinstance.py:911} DEBUG - dependency 'Pool Slots Available' PASSED: True, ('There are enough open slots in %s to execute the task', 'default_pool') [2021-08-08 05:32:39,477] {taskinstance.py:911} DEBUG - dependency 'Task Concurrency' PASSED: True, Task concurrency is not set. 
[2021-08-08 05:32:39,477] {taskinstance.py:896} INFO - Dependencies all met for [2021-08-08 05:32:39,477] {taskinstance.py:1087} INFO - -------------------------------------------------------------------------------- [2021-08-08 05:32:39,477] {taskinstance.py:1088} INFO - Starting attempt 1 of 1 [2021-08-08 05:32:39,477] {taskinstance.py:1089} INFO - -------------------------------------------------------------------------------- [2021-08-08 05:32:43,976] {taskinstance.py:1107} INFO - Executing on 2021-08-07T05:28:00+00:00 [2021-08-08 05:32:43,979] {standard_task_runner.py:52} INFO - Started process 8339 to run task [2021-08-08 05:32:43,984] {standard_task_runner.py:76} INFO - Running: ['airflow', 'tasks', 'run', 'derived.adobe_target_catalog_sporza', 'watch_adobe_target_catalog_sporza_job_emr_step', '2021-08-07T05:28:00+00:00', '--job-id', '98962', '--pool', 'default_pool', '--raw', '--subdir', 'DAGS_FOLDER/adobe_target_catalog_sporza_wf.py', '--cfg-path', '/tmp/tmpbqux_rdl', '--error-file', '/tmp/tmpyb9gdojg'] [2021-08-08 05:32:43,984] {standard_task_runner.py:77} INFO - Job 98962: Subtask watch_adobe_target_catalog_sporza_job_emr_step [2021-08-08 05:32:43,985] {cli_action_loggers.py:66} DEBUG - Calling callbacks: [] [2021-08-08 05:32:51,719] {settings.py:208} DEBUG - Setting up DB connection pool (PID 8339) [2021-08-08 05:32:51,720] {settings.py:244} DEBUG - settings.prepare_engine_args(): Using NullPool [2021-08-08 05:32:51,722] {taskinstance.py:614} DEBUG - Refreshing TaskInstance from DB [2021-08-08 05:32:53,315] {taskinstance.py:649} DEBUG - Refreshed TaskInstance [2021-08-08 05:32:54,868] {taskinstance.py:614} DEBUG - Refreshing TaskInstance from DB [2021-08-08 05:32:55,812] {logging_mixin.py:104} INFO - Running on host ip-172-31-58-121.eu-west-1.compute.internal [2021-08-08 05:32:55,813] {taskinstance.py:614} DEBUG - Refreshing TaskInstance from DB [2021-08-08 05:32:57,199] {taskinstance.py:649} DEBUG - Refreshed TaskInstance [2021-08-08 05:32:57,203] {local_task_job.py:194} WARNING - Recorded pid 7972 does not match the current pid 8339 [2021-08-08 05:32:57,206] {process_utils.py:100} INFO - Sending Signals.SIGTERM to GPID 8339 [2021-08-08 05:32:57,210] {process_utils.py:66} INFO - Process psutil.Process(pid=8339, status='terminated', exitcode=, started='05:32:43') (8339) terminated with exit code Negsignal.SIGTERM ```

What you expected to happen:

I'd expect the EMRStepSensor to run until the EMR step succeeded, and to report a successful run.

If my understanding is correct, the final lines in the log show the runner terminating the task process. If I'm reading the log correctly, 8339 is the correct PID for the task, and the recorded pid 7972 is the PID of a previous run. Could it be that this pid is not being updated correctly?

[2021-08-08 05:32:57,203] {local_task_job.py:194} WARNING - Recorded pid 7972 does not match the current pid 8339
[2021-08-08 05:32:57,206] {process_utils.py:100} INFO - Sending Signals.SIGTERM to GPID 8339
[2021-08-08 05:32:57,210] {process_utils.py:66} INFO - Process psutil.Process(pid=8339, status='terminated', exitcode=<Negsignal.SIGTERM: -15>, started='05:32:43') (8339) terminated with exit code Negsignal.SIGTERM
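
Conceptually (this is a simplified paraphrase of the behaviour implied by the log above, not the actual Airflow source), the supervising local task job compares the pid recorded on the task instance row with the pid of the process it just started, and kills the process group when they differ:

```
# Simplified paraphrase of the pid check implied by the warning above.
# Not the actual Airflow implementation, just an illustration of the failure mode.
import os
import signal


def heartbeat_callback(recorded_pid, current_pid):
    # recorded_pid: the pid stored on the task instance row in the metadata DB
    # current_pid: the pid of the raw task process this job actually started
    if recorded_pid is not None and recorded_pid != current_pid:
        # If the DB still holds the pid of a previous try (e.g. an earlier
        # reschedule), this trips even though the current process is healthy.
        print(f"Recorded pid {recorded_pid} does not match the current pid {current_pid}")
        os.killpg(os.getpgid(current_pid), signal.SIGTERM)
```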

Anything else we need to know:

The symptoms look very similar to #17394, but I'm not using run_as_user, and the reported pids are not the same, so I'm not sure whether this is the same issue.

boring-cyborg[bot] commented 3 years ago

Thanks for opening your first issue here! Be sure to follow the issue template!

ephraimbuddy commented 3 years ago

We added a fix in https://github.com/apache/airflow/pull/16301 that will be released in 2.1.3. Can you check whether you can reproduce this with the current main branch?

crazyproger commented 2 years ago

We're using 2.1.3 and sometimes see exactly the same message in task runs. Using the Celery executor.

ephraimbuddy commented 2 years ago

@crazyproger Do you have a DAG that reproduces this behaviour? Please share.

crazyproger commented 2 years ago

We hit this on different DAGs. Clearing the DAG run helps in all cases. Task log:

*** Reading local file: /opt/airflow/logs/<Dag id>/wait_cache_ready/2021-08-31T17:00:00+00:00/1.log
[2021-08-31 18:14:43,582] {taskinstance.py:903} INFO - Dependencies all met for <TaskInstance:<Dag id>.wait_cache_ready 2021-08-31T17:00:00+00:00 [queued]>
[2021-08-31 18:14:43,608] {taskinstance.py:903} INFO - Dependencies all met for <TaskInstance: <Dag id>.wait_cache_ready 2021-08-31T17:00:00+00:00 [queued]>
[2021-08-31 18:14:43,609] {taskinstance.py:1094} INFO - 
--------------------------------------------------------------------------------
[2021-08-31 18:14:43,609] {taskinstance.py:1095} INFO - Starting attempt 1 of 1
[2021-08-31 18:14:43,609] {taskinstance.py:1096} INFO - 
--------------------------------------------------------------------------------
[2021-08-31 18:14:43,631] {taskinstance.py:1114} INFO - Executing <Task(PythonSensor): wait_cache_ready> on 2021-08-31T17:00:00+00:00
[2021-08-31 18:14:43,638] {standard_task_runner.py:52} INFO - Started process 25320 to run task
[2021-08-31 18:14:43,643] {standard_task_runner.py:76} INFO - Running: ['airflow', 'tasks', 'run', '<Dag id>', 'wait_cache_ready', '2021-08-31T17:00:00+00:00', '--job-id', '45735641', '--pool', 'default_pool', '--raw', '--subdir', 'DAGS_FOLDER/<integration>/upload_dags/installs_hourly_2.py', '--cfg-path', '/tmp/tmp90yuh86s', '--error-file', '/tmp/tmp3s15tqza']
[2021-08-31 18:14:43,644] {standard_task_runner.py:77} INFO - Job 45735641: Subtask wait_cache_ready
[2021-08-31 18:14:48,697] {local_task_job.py:204} WARNING - Recorded pid 23138 does not match the current pid 25320
[2021-08-31 18:14:48,699] {process_utils.py:100} INFO - Sending Signals.SIGTERM to GPID 25320
[2021-08-31 18:14:48,707] {process_utils.py:66} INFO - Process psutil.Process(pid=25320, status='terminated', exitcode=<Negsignal.SIGTERM: -15>, started='18:14:42') (25320) terminated with exit code Negsignal.SIGTERM

The function that creates the DAG is here: https://gist.github.com/crazyproger/a2a516f8e6b757b88d29f6fccca16990 It failed on the first task, a sensor; the implementation is at the bottom (def check_file_exists). I've omitted the other tasks' implementations.
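
The gist itself is not reproduced in this thread; for context, a sensor task like the failing wait_cache_ready would look roughly like the sketch below (the DAG id, schedule, and the callable body are hypothetical placeholders for what the gist actually defines):

```
# Rough sketch of a PythonSensor like wait_cache_ready (hypothetical details).
from datetime import datetime

from airflow import DAG
from airflow.sensors.python import PythonSensor


def check_file_exists(**context):
    # Placeholder: the real callable in the gist checks whether the expected
    # file/cache is ready and returns True or False.
    return False


with DAG(
    dag_id="installs_hourly_2_example",   # hypothetical DAG id
    start_date=datetime(2021, 8, 31),
    schedule_interval="@hourly",
    catchup=False,
) as dag:
    wait_cache_ready = PythonSensor(
        task_id="wait_cache_ready",
        python_callable=check_file_exists,
        poke_interval=60,
        timeout=60 * 60,
    )
```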

noelmcloughlin commented 2 years ago

We're using 2.1.3 and sometimes see exactly the same message in task runs. Using the Celery executor.

Yes. I downgraded to 2.1.2 and the issue has gone away for now. See related https://github.com/apache/airflow/issues/10026

ashb commented 2 years ago

@noelmcloughlin In the issue you mention 2.1.2 -- did you mean 2.1.3?

noelmcloughlin commented 2 years ago

Sorry, I was on 2.1.2 (okay), upgraded to 2.1.3 (issue seen), then downgraded to 2.1.2 (okay again).

ashb commented 2 years ago

That's what I thought, just wanted to confirm.

nmehraein commented 2 years ago

Hi, could this be linked to PR #17333? The problem seems very similar.

laserpedro commented 2 years ago

For info, I incorporated the fix proposed by @nmehraein here, but unfortunately I am still getting the same error...

laserpedro commented 2 years ago

Also, as mentioned here, it seems that run_as_user is part of the issue: https://www.gitmemory.com/issue/apache/airflow/17394/892065842

As @potiuk mentioned, it is related, but setting airflow as the default impersonation user in the config did not help.

noelmcloughlin commented 2 years ago

it seems that the run_as_user is part of the issue

I was not using run_as_user (https://github.com/apache/airflow/issues/10026#issuecomment-906811467), so I think we need to collect more data. There is probably a common attribute, but I'm not sure run_as_user is the important one.

laserpedro commented 2 years ago

Is there any workaround, since my DAGs are no longer working properly? Thanks.

kiwy42 commented 2 years ago

I tried disabling run_as_user and disabling some other things. I'm using Spark, other Java programs, and py4j, and I'm using the TriggerMultiDagRun operator. I'm currently adapting old DAGs to Airflow 2, and as soon as I clear a task to update my code and try new things, I get these strange errors as well. I'm working on Docker with Airflow 2.1.2 and Python 3.9. So far this release seems very unstable.

laserpedro commented 2 years ago

cf https://github.com/apache/airflow/issues/18041#issuecomment-915383493

For me, for the moment, the problem occurs in backfill mode. Tonight my processes will run on this new Airflow session and I will see if I get the same errors.

UPDATE: I have changed the scheduler_heartbeat_sec parameter to 60 seconds instead of 5 seconds and it is better, so maybe a performance problem in the backend (PostgreSQL) creates this issue... @kiwy42, what backend are you using to store the task instances?

george-zubrienko commented 2 years ago

cf #18041 (comment)

For me, for the moment, the problem occurs in backfill mode. Tonight my processes will run on this new Airflow session and I will see if I get the same errors.

UPDATE: I have changed the scheduler_heartbeat_sec parameter to 60 seconds instead of 5 seconds and it is better, so maybe a performance problem in the backend (PostgreSQL) creates this issue... @kiwy42, what backend are you using to store the task instances?

In our case, with the Kubernetes executor, it definitely seems scheduler related. In a DAG with 55 tasks, around a third receive SIGTERM shortly after starting and then go into a retry loop with "Recorded pid X does not match the current pid Y". It was fixed after I reduced the pool size from 128 (all tasks queued at the same time) to 32, so 23 tasks were left in the scheduled state. After I reverted the pool change, the issue came back.

laserpedro commented 2 years ago

I will also revert to the latest 2.0 version available; however, I do not know what impact that will have on the metadata DB (it may need a downgrade or something like that)...

Does anyone know if there is an official guide for downgrading?

So what I did is:

  • set up a dedicated PostgreSQL server on Linux for the metadata database
  • downgrade to Airflow 2.0.2
  • increase the heartbeat interval (so that tasks do not get killed by the heartbeat check, mainly for sensors), and incorporate https://github.com/apache/airflow/pull/16289/files tomorrow in order to fix the race condition

With this setup my DAGs are working "properly" (apart from the bug below)! I no longer have PID mismatches and no longer get random SIGTERMs on task executions with relatively large queries.

ridwantahir commented 2 years ago

This problem is still there in Airflow 2.1.3!!

MarvinSchenkel commented 2 years ago

I will also revert to the latest 2.0 version available; however, I do not know what impact that will have on the metadata DB (it may need a downgrade or something like that)...

Does anyone know if there is an official guide for downgrading?

So what I did is:

  • set up a dedicated PostgreSQL server on Linux for the metadata database
  • downgrade to Airflow 2.0.2
  • increase the heartbeat interval (so that tasks do not get killed by the heartbeat check, mainly for sensors), and incorporate https://github.com/apache/airflow/pull/16289/files tomorrow in order to fix the race condition

With this setup my DAGs are working "properly" (apart from the bug below)! I no longer have PID mismatches and no longer get random SIGTERMs on task executions with relatively large queries.

How did you increase the heartbeat signal? I am running Airflow on a Kubernetes cluster and run into this issue on 2.1.3, 2.1.2 and 2.0.2.

laserpedro commented 2 years ago

Hello @MarvinSchenkel, to increase the scheduler heartbeat you can either export it as an environment variable when installing Airflow, or, if Airflow is already installed, modify the parameter in your airflow.cfg file:

  • bash profile: export AIRFLOW__SCHEDULER__SCHEDULER_HEARTBEAT_SEC=120
  • airflow.cfg: scheduler_heartbeat_sec=120

By the way, to make sure the values are set correctly you can use airflow config get-value scheduler scheduler_heartbeat_sec.

And then restart the scheduler and the webserver.
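
If it is more convenient than the CLI in your deployment, the effective value can also be checked from Python (a small sketch using the standard airflow.configuration API, run in the same environment as the scheduler):

```
# Print the effective scheduler heartbeat, equivalent to the CLI command above.
from airflow.configuration import conf

print(conf.getint("scheduler", "scheduler_heartbeat_sec"))
```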

MarvinSchenkel commented 2 years ago

Hello @MarvinSchenkel, to increase the scheduler heartbeat you can either export it as an environment variable when installing Airflow, or, if Airflow is already installed, modify the parameter in your airflow.cfg file:

  • bash profile: export AIRFLOW__SCHEDULER__SCHEDULER_HEARTBEAT_SEC=120
  • airflow.cfg: scheduler_heartbeat_sec=120

By the way, to make sure the values are set correctly you can use airflow config get-value scheduler scheduler_heartbeat_sec.

And then restart the scheduler and the webserver.

Thanks for your reply. I tried this and it seems to be sorta working. I am not receiving any more SIGTERM errors. However, I constantly receive the message "The scheduler does not seem to be running". If I look at my k8s cluster overview, I can see that the scheduler pod constantly gets restarted because it seems offline. So it feels more like a happy coincidence, rather than a proper fix :-).

I also tried setting AIRFLOW__CORE__KILLED_TASK_CLEANUP_TIME to 604800 and AIRFLOW__SCHEDULER__SCHEDULE_AFTER_TASK_EXECUTION to False, but unfortunately I am still receiving SIGTERMs.

laserpedro commented 2 years ago

On my side I also receive this message, which seems correct: if the heartbeat is X, the message should be

The scheduler does not appear to be running. Last heartbeat was received X / 60 minutes ago.

The DAGs list may not update, and new tasks will not be scheduled.

Sorry, I did not mean that it was a proper fix. I think the proper fix is: https://github.com/apache/airflow/pull/16289/files

MarvinSchenkel commented 2 years ago

On my side I also receive this message, which seems correct: if the heartbeat is X, the message should be

The scheduler does not appear to be running. Last heartbeat was received X / 60 minutes ago.

The DAGs list may not update, and new tasks will not be scheduled.

Also, as I said above, that is NOT a proper fix. "Increase the heartbeat interval (so that tasks do not get killed by the heartbeat check, mainly for sensors), and incorporate https://github.com/apache/airflow/pull/16289/files in order to fix the race condition" => that is the proper fix, but since it was not available in 2.0.2 I have to modify my source code with the commit changes.

Fair enough, so with the fix applied we wouldn't need to set any additional configs (like scheduler_heartbeat, killed_task_cleanup_time and schedule_task_after_task_execution)? Is there any way to check when this PR is going to be released? The PR mentions 2.1.3, but I am still running into these issues using 2.1.3.

laserpedro commented 2 years ago

This was already released, but only in versions > 2.0.2, and for me those versions create the PID mismatch issue, so for the moment I'll stick to 2.0.2. What you could do is fork the 2.0.2 source code and change the files. I will go for this solution.

Honestly, I would keep the other settings as they are and, once your session is fixed this way, maybe gradually revert the parameters...

UPDATE: @MarvinSchenkel I have decreased the heartbeat to 60 seconds, which is less than my sensor timeout (3 minutes), and for the moment the task is no longer receiving SIGTERMs. So incorporating the fix above seems to be working, and my Airflow session is behaving as expected.

MarvinSchenkel commented 2 years ago

This was already released, but only in versions > 2.0.2, and for me those versions create the PID mismatch issue, so for the moment I'll stick to 2.0.2. What you could do is fork the 2.0.2 source code and change the files. I will go for this solution.

Honestly, I would keep the other settings as they are and, once your session is fixed this way, maybe gradually revert the parameters...

UPDATE: @MarvinSchenkel I have decreased the heartbeat to 60 seconds, which is less than my sensor timeout (3 minutes), and for the moment the task is no longer receiving SIGTERMs. So incorporating the fix above seems to be working, and my Airflow session is behaving as expected.

Thanks for the update! I think I am gonna wait until this is fixed in an upcoming release. Our production environment is still running 1.10.15, which is running fine at the moment. I don't want to jump through all these hoops just to get it working :-).

ridwantahir commented 2 years ago

One trick that worked for me was to delete the task instance (or all task instances of a particular DAG run) from the Airflow UI.

nmehraein commented 2 years ago

Hello, release 2.1.4 was published yesterday with #17333. Could you test with that release?

IMO it is really related, because all the conditions and symptoms are similar (the kill happens at the first process heartbeat, and the task is not on its first run because there is a DB cleanup bug between task runs; it therefore occurs more often on sensors, which naturally re-run in reschedule mode, but it can also occur on a normal task when it fails on the first attempt).

As a first fix attempt on our prod I'm also trying to increase the scheduler heartbeat, which attenuates the problem because the process has more time to start and update the pid in the database before the timeout; but it can still occur when the process takes too long to start (especially under load), and I also see the scheduler warning "The scheduler does not appear to be running".

felipeangelimvieira commented 2 years ago

@nmehraein I've updated to 2.1.4 but the problem still appears. This issue is related to #18041.

jaimefr commented 2 years ago

Hello, we are also facing this issue on 2.1.4

stephenonethree commented 2 years ago

Hi, I am also experiencing this issue in 2.1.2 (the latest version currently available in Composer). I just wanted to note that I posted reproducer code/instructions in https://github.com/apache/airflow/issues/18041 (that GitHub issue might be a duplicate of this one or at least related). My code reproduces the issue 100% of the time in 2.1.2. Just wanted you to know in case it helps.

MhmtErsy commented 2 years ago

Hello, I was facing this issue only within PythonVirtualEnvOperator tasks in 2.1.3. After upgrading to 2.1.4, I started facing it in other operators too. Also, some tasks are being randomly killed and marked as failed before execution. I can't see the logs in the web UI.

*** Log file does not exist: /opt/airflow/logs/airflow2/mydag/mytask/2021-10-05T14:00:34.768196+00:00/1.log
*** Fetching from: http://:8793/log/mydag/mytask/2021-10-05T14:00:34.768196+00:00/1.log
*** Failed to fetch log file from worker. Request URL missing either an 'http://' or 'https://' protocol.

kondla commented 2 years ago

Thank you for this thread! I got the same problem running composer-1.17.1-airflow-2.1.2 on GCP:

[2021-10-06 13:05:57,236] {local_task_job.py:194} WARNING - Recorded pid 68674 does not match the current pid 141358
[2021-10-06 13:05:57,239] {process_utils.py:100} INFO - Sending Signals.SIGTERM to GPID 141358
[2021-10-06 13:05:57,254] {process_utils.py:66} INFO - Process psutil.Process(pid=141358, status='terminated', exitcode=<Negsignal.SIGTERM: -15>, started='13:05:51') (141358) terminated with exit code Negsignal.SIGTERM

faycal-merouane commented 2 years ago

Hello, I'm facing the same issue, running Airflow version 2.1.4 with Python version 3.8.10:

[2021-10-06 15:44:35,118] {local_task_job.py:203} WARNING - Recorded pid 3761164 does not match the current pid 3761425
[2021-10-06 15:44:35,143] {process_utils.py:100} INFO - Sending Signals.SIGTERM to GPID 3761425
[2021-10-06 15:44:35,153] {process_utils.py:66} INFO - Process psutil.Process(pid=3761425, status='terminated', exitcode=<Negsignal.SIGTERM: -15>, started='15:44:29') (3761425) terminated with exit code Negsignal.SIGTERM

dariusjonda commented 2 years ago

We're having the same issue after updating from airflow version 2.0.1 to 2.1.4. Most of our jobs are orchestrated on a single instance running local task jobs.

After running some tests we found out that the issue occurs whenever multiple new jobs are being spawned at the same time. Unfortunately, increasing scheduler_heartbeat_sec as well as scheduler_health_check_threshold didn't solve the issue. However, we decreased parallelism in airflow.cfg from 32 to 5 and the issue seems to be gone for now...

aladinoss commented 2 years ago

@ashb & @ephraimbuddy We faced this issue when using two schedulers running at the same time under Airflow 1.10.X with sensors. Can this be a side effect within Airflow 2.X, since it supports multiple schedulers?

ephraimbuddy commented 2 years ago

@ashb & @ephraimbuddy We faced this issue when using two schedulers running at the same time under Airflow 1.10.X with sensors. Can this be a side effect within Airflow 2.X, since it supports multiple schedulers?

That actually gave me a clue, but I haven't reproduced it. This https://github.com/apache/airflow/issues/17507#issuecomment-939750967 too.

stephenonethree commented 2 years ago

@ephraimbuddy I posted code which can reproduce the error in #18041 if you haven't seen it yet.

ephraimbuddy commented 2 years ago

@ephraimbuddy I posted code which can reproduce the error in #18041 if you haven't seen it yet.

I tried your code but couldn't reproduce it locally. I do not have a Composer account.

ryan-hartigan commented 2 years ago

Facing this same issue running Airflow 2.1.x-2.2.x using this repo's helm chart on AKS running two schedulers.

ephraimbuddy commented 2 years ago

For those having this issue while using multiple schedulers, I added a fix for an issue I hit while running two schedulers. You can try the patch. See https://github.com/apache/airflow/pull/18975

FurcyPin commented 2 years ago

+1, I'm seeing task failures with similar messages on Cloud Composer 2 (composer-2.0.0-preview.3-airflow-2.1.2). I had 2 schedulers enabled and downscaled to just 1 scheduler, but I'm still seeing the same error.

brosenkrantz commented 2 years ago

I'm seeing the same failure on 2.1.3 and 2.1.4. I do not see the issue in 2.1.2, but I need to upgrade to 2.1.3 or later to resolve the CVE-2021-38540 vulnerability.

imamdigmi commented 2 years ago

Hi @ephraimbuddy, I had the same problem in versions 2.1.3 and 2.2.1 with Python 3.8.

uranusjr commented 2 years ago

Tagging an issue as high priority is not particularly meaningful, since this project largely runs on volunteers and you can't simply expect volunteers to prioritise your problem. You need to provide motivation (commercially or otherwise), or find someone whose priorities align with yours to fix the problem (e.g. yourself!)

potiuk commented 2 years ago

One other comment @imamdigmi @stephenonethree -> if you see the problem in 2.2.1, can you provide some logs? That would help with narrowing down the issue. I believe some of the earlier manifestations of this problem have been fixed, but maybe there are other cases that we have not seen? Logs would help speed up fixing much more than tagging the issue.

potiuk commented 2 years ago

Ideally please upgrade to 2.2.2 and get us a fresh log with this error - this might be immensely helpful to diagnose and fix the issue @brosenkrantz @imamdigmi @stephenonethree and anyone else facing the problem.

The best way to do it is to create a gist in GitHub and link to it from this issue.

PApostol commented 2 years ago

For those having this issue while using multiple schedulers, I added a fix for an issue I hit while running two schedulers. You can try the patch. See #18975

I think this patch has solved the issue for me (or at least reduced its occurrence substantially) - Airflow 2.1.4, Python 3.7.9, using 2 schedulers

stephenonethree commented 2 years ago

As of right now I'm on 2.1.2. I'll probably upgrade shortly to 2.1.4 which is the highest version currently supported by Composer. Once they get into the 2.2.x versions I will reassess and provide more information including fresh logs.

I did spend about a day and a half pulling together code which reproduces the error 100% of the time in a Cloud Composer setting, which I provided earlier. I had been hoping that would help people who are more familiar with Airflow code than I am look at it. Unfortunately it looks like when @ephraimbuddy tried it in his setup, it no longer reproduced. Perhaps the issue is that this is the type of error that may not show up when you are only testing locally.

Thanks to all for your ongoing work on Airflow - other than this bug I love the software immensely!

stephenonethree commented 2 years ago

My earlier code and analysis is here: https://github.com/apache/airflow/issues/18041#issuecomment-932564234