PrefectHQ / prefect

Prefect is a workflow orchestration framework for building resilient data pipelines in Python.
https://prefect.io
Apache License 2.0

Prefect API incident disrupted state handler notifications #5261

Closed speedyturkey closed 1 year ago

speedyturkey commented 2 years ago

Description

Yesterday (20 December 2021), I noticed that a couple of my flows failed because the Prefect API was unavailable. Some tasks also failed for the same reason, which left a couple of flows in a Running state for more than 90 minutes even though they had died much earlier. Some of the flows that ran for 90+ minutes ultimately succeeded, although one failed. I posted about this on Slack, and @madkinsz suggested I open this issue.

I was happy to see that the incident was posted on https://prefect.status.io and resolved quickly, and I have now subscribed my team's alerting channel to the RSS feed. This is great, thank you very much!

My issue, however, is that the flows that failed during the outage did not trigger my failure state handlers, which send a Slack notification. Is there anything I can do differently to make my setup more resilient to these kinds of disruptions? If a flow is not running successfully, I really want to know ASAP. If there isn't anything I can do, I hope sharing this helps in some way. Please let me know if I can provide any further details.
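One way to get an alert that does not depend on the failing run's own process is a watchdog that runs somewhere else and flags runs stuck in Running for too long. The sketch below is hypothetical: it assumes you already have some way to list flow runs with their current state and the time they entered it (fetching that from the backend is not shown), and `RunSnapshot`, `find_stuck_runs`, and the 20-minute threshold are illustrative names and values, not Prefect APIs.

```python
from dataclasses import dataclass
from datetime import datetime, timedelta
from typing import Iterable, List


@dataclass
class RunSnapshot:
    """Hypothetical record of one flow run, as fetched by an external monitor."""
    flow_run_id: str
    state: str             # e.g. "Running", "Success", "Failed"
    state_since: datetime  # when the run entered its current state


def find_stuck_runs(
    runs: Iterable[RunSnapshot], now: datetime, max_running: timedelta
) -> List[str]:
    """Return the IDs of runs that have been Running for longer than max_running."""
    return [
        r.flow_run_id
        for r in runs
        if r.state == "Running" and now - r.state_since > max_running
    ]


# Example based on the late run in this report: its first attempt logged
# "Flow run RUNNING: terminal tasks are incomplete." at 09:05:29 EST.
runs = [
    RunSnapshot("6b67f9e1-8588-4171-86b0-a390750ac98d", "Running",
                datetime(2021, 12, 20, 9, 5, 29)),
    # Hypothetical healthy run, included only to show it is not flagged.
    RunSnapshot("some-healthy-run", "Success", datetime(2021, 12, 20, 9, 0)),
]
stuck = find_stuck_runs(runs, now=datetime(2021, 12, 20, 9, 35),
                        max_running=timedelta(minutes=20))
print(stuck)  # ['6b67f9e1-8588-4171-86b0-a390750ac98d']
```

Because the check runs outside the flow-runner process, it would still fire if that process died or could not reach the API, which is exactly the case where an in-flow state handler stays silent.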

Below are details on two failed flow runs for which I did not receive a Slack failure notification, plus one flow run that ultimately succeeded but ran very late, possibly for a related reason.

Failed Flow 1

flow id: f4949835-c4a8-492d-9ef1-0b279b999a6c
flow run id: 15fc28d3-75e1-43e0-afbf-e27099629f83

Scheduled to start at 8:15am EST. Failed at 8:22am EST. Complete logs:

[{
    "date": "20 December 2021",
    "time": "08:22:52 ",
    "level": "ERROR",
    "name": "agent",
    "message": "HTTPSConnectionPool(host='api.prefect.io', port=443): Max retries exceeded with url: / (Caused by ReadTimeoutError(\"HTTPSConnectionPool(host='api.prefect.io', port=443): Read timed out. (read timeout=15)\"))"
}, {
    "date": "20 December 2021",
    "time": "08:23:38 ",
    "level": "ERROR",
    "name": "agent",
    "message": "HTTPSConnectionPool(host='api.prefect.io', port=443): Max retries exceeded with url: / (Caused by ReadTimeoutError(\"HTTPSConnectionPool(host='api.prefect.io', port=443): Read timed out. (read timeout=15)\"))"
}]

Failed Flow 2

flow id: e7d3b5af-28d8-4c71-ad94-75a676cf12b4
flow run id: cb635fbb-2160-4662-8b97-dc2051570f23

Scheduled to start at 8:10am EST. Started at 8:21am and failed at 8:28am.

Complete logs:

[
{ "date": "20 December 2021", "time": "08:21:13 ", "level": "INFO", "name": "prefect.S3", "message": "Downloading flow from s3://my-s3-bucket/allaccounts/2021-12-16t14-25-51-898616-00-00" },
{ "date": "20 December 2021", "time": "08:21:13 ", "level": "INFO", "name": "prefect.S3", "message": "Downloading flow from s3://my-s3-bucket/allaccounts/2021-12-16t14-25-51-898616-00-00" },
{ "date": "20 December 2021", "time": "08:21:13 ", "level": "INFO", "name": "prefect.S3", "message": "Downloading flow from s3://my-s3-bucket/allaccounts/2021-12-16t14-25-51-898616-00-00" },
{ "date": "20 December 2021", "time": "08:21:13 ", "level": "INFO", "name": "prefect.S3", "message": "Downloading flow from s3://my-s3-bucket/allaccounts/2021-12-16t14-25-51-898616-00-00" },
{ "date": "20 December 2021", "time": "08:21:13 ", "level": "INFO", "name": "prefect.S3", "message": "Downloading flow from s3://my-s3-bucket/allaccounts/2021-12-16t14-25-51-898616-00-00" },
{ "date": "20 December 2021", "time": "08:21:14 ", "level": "INFO", "name": "prefect.S3", "message": "Flow successfully downloaded. ETag: \"39784b2e1c991abc70fbc0a779ce6a76\", LastModified: 2021-12-16T14:25:52+00:00, VersionId: None" },
{ "date": "20 December 2021", "time": "08:21:14 ", "level": "INFO", "name": "prefect.S3", "message": "Flow successfully downloaded. ETag: \"39784b2e1c991abc70fbc0a779ce6a76\", LastModified: 2021-12-16T14:25:52+00:00, VersionId: None" },
{ "date": "20 December 2021", "time": "08:21:14 ", "level": "INFO", "name": "prefect.S3", "message": "Flow successfully downloaded. ETag: \"39784b2e1c991abc70fbc0a779ce6a76\", LastModified: 2021-12-16T14:25:52+00:00, VersionId: None" },
{ "date": "20 December 2021", "time": "08:21:14 ", "level": "INFO", "name": "prefect.S3", "message": "Flow successfully downloaded. ETag: \"39784b2e1c991abc70fbc0a779ce6a76\", LastModified: 2021-12-16T14:25:52+00:00, VersionId: None" },
{ "date": "20 December 2021", "time": "08:21:14 ", "level": "INFO", "name": "prefect.S3", "message": "Flow successfully downloaded. ETag: \"39784b2e1c991abc70fbc0a779ce6a76\", LastModified: 2021-12-16T14:25:52+00:00, VersionId: None" },
{ "date": "20 December 2021", "time": "08:21:14 ", "level": "INFO", "name": "prefect.CloudFlowRunner", "message": "Beginning Flow run for 'AllAccounts'" },
{ "date": "20 December 2021", "time": "08:21:14 ", "level": "INFO", "name": "prefect.CloudFlowRunner", "message": "Beginning Flow run for 'AllAccounts'" },
{ "date": "20 December 2021", "time": "08:21:14 ", "level": "INFO", "name": "prefect.CloudFlowRunner", "message": "Beginning Flow run for 'AllAccounts'" },
{ "date": "20 December 2021", "time": "08:21:14 ", "level": "INFO", "name": "prefect.CloudFlowRunner", "message": "Beginning Flow run for 'AllAccounts'" },
{ "date": "20 December 2021", "time": "08:21:14 ", "level": "INFO", "name": "prefect.CloudFlowRunner", "message": "Beginning Flow run for 'AllAccounts'" },
{ "date": "20 December 2021", "time": "08:21:48 ", "level": "INFO", "name": "agent", "message": "Submitted for execution: Job prefect-job-ab0ff53a" },
{ "date": "20 December 2021", "time": "08:22:03 ", "level": "INFO", "name": "agent", "message": "Submitted for execution: Job prefect-job-ab0ff53a" },
{ "date": "20 December 2021", "time": "08:22:19 ", "level": "INFO", "name": "agent", "message": "Submitted for execution: Job prefect-job-ab0ff53a" },
{ "date": "20 December 2021", "time": "08:22:26 ", "level": "INFO", "name": "agent", "message": "Submitted for execution: Job prefect-job-ab0ff53a" },
{ "date": "20 December 2021", "time": "08:23:16 ", "level": "INFO", "name": "agent", "message": "Submitted for execution: Job prefect-job-ab0ff53a" },
{ "date": "20 December 2021", "time": "08:23:41 ", "level": "INFO", "name": "prefect.CloudTaskRunner", "message": "Task 'query_name': Starting task run..." },
{ "date": "20 December 2021", "time": "08:23:41 ", "level": "INFO", "name": "prefect.CloudTaskRunner", "message": "Task 'query_name': Starting task run..." },
{ "date": "20 December 2021", "time": "08:24:13 ", "level": "ERROR", "name": "agent", "message": "HTTPSConnectionPool(host='api.prefect.io', port=443): Max retries exceeded with url: / (Caused by ReadTimeoutError(\"HTTPSConnectionPool(host='api.prefect.io', port=443): Read timed out. (read timeout=15)\"))" },
{ "date": "20 December 2021", "time": "08:24:18 ", "level": "INFO", "name": "agent", "message": "Submitted for execution: Job prefect-job-ab0ff53a" },
{ "date": "20 December 2021", "time": "08:24:36 ", "level": "INFO", "name": "agent", "message": "Submitted for execution: Job prefect-job-ab0ff53a" },
{ "date": "20 December 2021", "time": "08:25:09 ", "level": "ERROR", "name": "agent", "message": "HTTPSConnectionPool(host='api.prefect.io', port=443): Max retries exceeded with url: / (Caused by ReadTimeoutError(\"HTTPSConnectionPool(host='api.prefect.io', port=443): Read timed out. (read timeout=15)\"))" },
{ "date": "20 December 2021", "time": "08:25:20 ", "level": "WARNING", "name": "prefect.CloudFlowRunner", "message": "Flow run is no longer in a running state; the current state is: " },
{ "date": "20 December 2021", "time": "08:25:20 ", "level": "WARNING", "name": "prefect.CloudFlowRunner", "message": "Flow run is no longer in a running state; the current state is: " },
{ "date": "20 December 2021", "time": "08:25:20 ", "level": "WARNING", "name": "prefect.CloudFlowRunner", "message": "Flow run is no longer in a running state; the current state is: " },
{ "date": "20 December 2021", "time": "08:25:20 ", "level": "WARNING", "name": "prefect.CloudFlowRunner", "message": "Flow run is no longer in a running state; the current state is: " },
{ "date": "20 December 2021", "time": "08:25:20 ", "level": "WARNING", "name": "prefect.CloudFlowRunner", "message": "Flow run is no longer in a running state; the current state is: " },
{ "date": "20 December 2021", "time": "08:25:41 ", "level": "WARNING", "name": "prefect.CloudFlowRunner", "message": "Flow run is no longer in a running state; the current state is: " },
{ "date": "20 December 2021", "time": "08:25:41 ", "level": "WARNING", "name": "prefect.CloudFlowRunner", "message": "Flow run is no longer in a running state; the current state is: " },
{ "date": "20 December 2021", "time": "08:25:47 ", "level": "INFO", "name": "prefect.CloudTaskRunner", "message": "Task 'query_name': Finished task run for task with final state: 'Success'" },
{ "date": "20 December 2021", "time": "08:25:47 ", "level": "INFO", "name": "prefect.CloudTaskRunner", "message": "Task 'query_name': Finished task run for task with final state: 'Success'" },
{ "date": "20 December 2021", "time": "08:25:56 ", "level": "WARNING", "name": "prefect.CloudFlowRunner", "message": "Flow run is no longer in a running state; the current state is: " },
{ "date": "20 December 2021", "time": "08:25:56 ", "level": "WARNING", "name": "prefect.CloudFlowRunner", "message": "Flow run is no longer in a running state; the current state is: " },
{ "date": "20 December 2021", "time": "08:26:22 ", "level": "WARNING", "name": "prefect.CloudFlowRunner", "message": "Flow run is no longer in a running state; the current state is: " },
{ "date": "20 December 2021", "time": "08:26:22 ", "level": "WARNING", "name": "prefect.CloudFlowRunner", "message": "Flow run is no longer in a running state; the current state is: " },
{ "date": "20 December 2021", "time": "08:26:43 ", "level": "WARNING", "name": "prefect.CloudFlowRunner", "message": "Flow run is no longer in a running state; the current state is: " },
{ "date": "20 December 2021", "time": "08:26:43 ", "level": "WARNING", "name": "prefect.CloudFlowRunner", "message": "Flow run is no longer in a running state; the current state is: " },
{ "date": "20 December 2021", "time": "08:26:49 ", "level": "INFO", "name": "prefect.CloudTaskRunner", "message": "Task 'execute_query': Starting task run..." },
{ "date": "20 December 2021", "time": "08:26:49 ", "level": "INFO", "name": "prefect.CloudTaskRunner", "message": "Task 'execute_query': Starting task run..." },
{ "date": "20 December 2021", "time": "08:26:59 ", "level": "WARNING", "name": "prefect.CloudFlowRunner", "message": "Flow run is no longer in a running state; the current state is: " },
{ "date": "20 December 2021", "time": "08:26:59 ", "level": "WARNING", "name": "prefect.CloudFlowRunner", "message": "Flow run is no longer in a running state; the current state is: " },
{ "date": "20 December 2021", "time": "08:27:21 ", "level": "WARNING", "name": "prefect.CloudFlowRunner", "message": "Flow run is no longer in a running state; the current state is: " },
{ "date": "20 December 2021", "time": "08:27:21 ", "level": "WARNING", "name": "prefect.CloudFlowRunner", "message": "Flow run is no longer in a running state; the current state is: " },
{ "date": "20 December 2021", "time": "08:27:43 ", "level": "WARNING", "name": "prefect.CloudFlowRunner", "message": "Flow run is no longer in a running state; the current state is: " },
{ "date": "20 December 2021", "time": "08:27:43 ", "level": "WARNING", "name": "prefect.CloudFlowRunner", "message": "Flow run is no longer in a running state; the current state is: " },
{ "date": "20 December 2021", "time": "08:27:43 ", "level": "WARNING", "name": "prefect.CloudFlowRunner", "message": "Flow run is no longer in a running state; the current state is: " },
{ "date": "20 December 2021", "time": "08:27:43 ", "level": "WARNING", "name": "prefect.CloudFlowRunner", "message": "Flow run is no longer in a running state; the current state is: " },
{ "date": "20 December 2021", "time": "08:27:43 ", "level": "WARNING", "name": "prefect.CloudFlowRunner", "message": "Flow run is no longer in a running state; the current state is: " },
{ "date": "20 December 2021", "time": "08:27:43 ", "level": "WARNING", "name": "prefect.CloudFlowRunner", "message": "Flow run is no longer in a running state; the current state is: " },
{ "date": "20 December 2021", "time": "08:27:43 ", "level": "WARNING", "name": "prefect.CloudFlowRunner", "message": "Flow run is no longer in a running state; the current state is: " },
{ "date": "20 December 2021", "time": "08:27:43 ", "level": "WARNING", "name": "prefect.CloudFlowRunner", "message": "Flow run is no longer in a running state; the current state is: " },
{ "date": "20 December 2021", "time": "08:27:43 ", "level": "WARNING", "name": "prefect.CloudFlowRunner", "message": "Flow run is no longer in a running state; the current state is: " },
{ "date": "20 December 2021", "time": "08:28:06 ", "level": "WARNING", "name": "prefect.CloudFlowRunner", "message": "Flow run is no longer in a running state; the current state is: " },
{ "date": "20 December 2021", "time": "08:28:06 ", "level": "WARNING", "name": "prefect.CloudFlowRunner", "message": "Flow run is no longer in a running state; the current state is: " },
{ "date": "20 December 2021", "time": "08:28:28 ", "level": "WARNING", "name": "prefect.CloudFlowRunner", "message": "Flow run is no longer in a running state; the current state is: " },
{ "date": "20 December 2021", "time": "08:28:28 ", "level": "WARNING", "name": "prefect.CloudFlowRunner", "message": "Flow run is no longer in a running state; the current state is: " },
{ "date": "20 December 2021", "time": "08:28:35 ", "level": "ERROR", "name": "prefect.CloudTaskRunner", "message": "Failed to set task state with error: ClientError([{'path': ['set_task_run_states'], 'message': 'State update failed for task run ID f6d2eead-24e5-4f86-8535-c1dbeab12cb2: provided a running state but associated flow run cb635fbb-2160-4662-8b97-dc2051570f23 is not in a running state.', 'extensions': {'code': 'INTERNAL_SERVER_ERROR'}}])\nTraceback (most recent call last):\n File \"/usr/local/lib/python3.9/site-packages/prefect/engine/cloud/task_runner.py\", line 91, in call_runner_target_handlers\n state = self.client.set_task_run_state(\n File \"/usr/local/lib/python3.9/site-packages/prefect/client/client.py\", line 1917, in set_task_run_state\n result = self.graphql(\n File \"/usr/local/lib/python3.9/site-packages/prefect/client/client.py\", line 569, in graphql\n raise ClientError(result[\"errors\"])\nprefect.exceptions.ClientError: [{'path': ['set_task_run_states'], 'message': 'State update failed for task run ID f6d2eead-24e5-4f86-8535-c1dbeab12cb2: provided a running state but associated flow run cb635fbb-2160-4662-8b97-dc2051570f23 is not in a running state.', 'extensions': {'code': 'INTERNAL_SERVER_ERROR'}}]" },
{ "date": "20 December 2021", "time": "08:28:35 ", "level": "ERROR", "name": "prefect.CloudTaskRunner", "message": "Failed to set task state with error: ClientError([{'path': ['set_task_run_states'], 'message': 'State update failed for task run ID f6d2eead-24e5-4f86-8535-c1dbeab12cb2: provided a running state but associated flow run cb635fbb-2160-4662-8b97-dc2051570f23 is not in a running state.', 'extensions': {'code': 'INTERNAL_SERVER_ERROR'}}])\nTraceback (most recent call last):\n File \"/usr/local/lib/python3.9/site-packages/prefect/engine/cloud/task_runner.py\", line 91, in call_runner_target_handlers\n state = self.client.set_task_run_state(\n File \"/usr/local/lib/python3.9/site-packages/prefect/client/client.py\", line 1917, in set_task_run_state\n result = self.graphql(\n File \"/usr/local/lib/python3.9/site-packages/prefect/client/client.py\", line 569, in graphql\n raise ClientError(result[\"errors\"])\nprefect.exceptions.ClientError: [{'path': ['set_task_run_states'], 'message': 'State update failed for task run ID f6d2eead-24e5-4f86-8535-c1dbeab12cb2: provided a running state but associated flow run cb635fbb-2160-4662-8b97-dc2051570f23 is not in a running state.', 'extensions': {'code': 'INTERNAL_SERVER_ERROR'}}]" },
{ "date": "20 December 2021", "time": "08:28:35 ", "level": "INFO", "name": "prefect.CloudTaskRunner", "message": "Task 'execute_query': Finished task run for task with final state: 'ClientFailed'" },
{ "date": "20 December 2021", "time": "08:28:35 ", "level": "INFO", "name": "prefect.CloudTaskRunner", "message": "Task 'execute_query': Finished task run for task with final state: 'ClientFailed'" },
{ "date": "20 December 2021", "time": "08:28:35 ", "level": "WARNING", "name": "prefect.CloudFlowRunner", "message": "Flow run is no longer in a running state; the current state is: " },
{ "date": "20 December 2021", "time": "08:28:35 ", "level": "WARNING", "name": "prefect.CloudFlowRunner", "message": "Flow run is no longer in a running state; the current state is: " },
{ "date": "20 December 2021", "time": "08:28:35 ", "level": "INFO", "name": "prefect.CloudFlowRunner", "message": "Flow run RUNNING: terminal tasks are incomplete." },
{ "date": "20 December 2021", "time": "08:28:35 ", "level": "INFO", "name": "prefect.CloudFlowRunner", "message": "Flow run RUNNING: terminal tasks are incomplete." }
]

Successful but very late flow

flow id:
flow run id: 6b67f9e1-8588-4171-86b0-a390750ac98d

Scheduled to start at 8:58am EST. Started at 9:04am, completed at 10:40am. Complete logs:

[
{ "date": "20 December 2021", "time": "09:04:36 ", "level": "INFO", "name": "agent", "message": "Submitted for execution: Job prefect-job-e28423a1" },
{ "date": "20 December 2021", "time": "09:04:39 ", "level": "INFO", "name": "prefect.CloudFlowRunner", "message": "Beginning Flow run for 'Nautilus File Drop'" },
{ "date": "20 December 2021", "time": "09:04:39 ", "level": "INFO", "name": "prefect.CloudFlowRunner", "message": "Beginning Flow run for 'Nautilus File Drop'" },
{ "date": "20 December 2021", "time": "09:05:04 ", "level": "INFO", "name": "prefect.CloudTaskRunner", "message": "Task 'run_query_and_write_results_to_s3': Starting task run..." },
{ "date": "20 December 2021", "time": "09:05:29 ", "level": "ERROR", "name": "prefect.CloudTaskRunner", "message": "Failed to set task state with error: ClientError([{'path': ['set_task_run_states'], 'message': 'State update failed for task run ID bbae880a-17d0-4183-a74e-695a9581875e: provided a running state but associated flow run 6b67f9e1-8588-4171-86b0-a390750ac98d is not in a running state.', 'extensions': {'code': 'INTERNAL_SERVER_ERROR'}}])\nTraceback (most recent call last):\n File \"/usr/local/lib/python3.8/site-packages/prefect/engine/cloud/task_runner.py\", line 91, in call_runner_target_handlers\n state = self.client.set_task_run_state(\n File \"/usr/local/lib/python3.8/site-packages/prefect/client/client.py\", line 1917, in set_task_run_state\n result = self.graphql(\n File \"/usr/local/lib/python3.8/site-packages/prefect/client/client.py\", line 569, in graphql\n raise ClientError(result[\"errors\"])\nprefect.exceptions.ClientError: [{'path': ['set_task_run_states'], 'message': 'State update failed for task run ID bbae880a-17d0-4183-a74e-695a9581875e: provided a running state but associated flow run 6b67f9e1-8588-4171-86b0-a390750ac98d is not in a running state.', 'extensions': {'code': 'INTERNAL_SERVER_ERROR'}}]" },
{ "date": "20 December 2021", "time": "09:05:29 ", "level": "INFO", "name": "prefect.CloudTaskRunner", "message": "Task 'run_query_and_write_results_to_s3': Finished task run for task with final state: 'ClientFailed'" },
{ "date": "20 December 2021", "time": "09:05:29 ", "level": "INFO", "name": "prefect.CloudFlowRunner", "message": "Flow run RUNNING: terminal tasks are incomplete." },
{ "date": "20 December 2021", "time": "10:40:00 ", "level": "INFO", "name": "prefect-server.Lazarus.FlowRun", "message": "Rescheduled by a Lazarus process. This is attempt 1." },
{ "date": "20 December 2021", "time": "10:40:08 ", "level": "INFO", "name": "agent", "message": "Submitted for execution: Job prefect-job-43c6248e" },
{ "date": "20 December 2021", "time": "10:40:11 ", "level": "INFO", "name": "prefect.CloudFlowRunner", "message": "Beginning Flow run for 'Nautilus File Drop'" },
{ "date": "20 December 2021", "time": "10:40:11 ", "level": "INFO", "name": "prefect.CloudTaskRunner", "message": "Task 'run_query_and_write_results_to_s3': Starting task run..." },
{ "date": "20 December 2021", "time": "10:40:11 ", "level": "INFO", "name": "prefect.run_query_and_write_results_to_s3", "message": "About to execute /home/flycatcher/src/queries/extract_nautilus.sql" },
{ "date": "20 December 2021", "time": "10:40:13 ", "level": "INFO", "name": "prefect.run_query_and_write_results_to_s3", "message": "Finished executing /home/flycatcher/src/queries/extract_nautilus.sql" },
{ "date": "20 December 2021", "time": "10:40:13 ", "level": "INFO", "name": "prefect.CloudTaskRunner", "message": "Task 'run_query_and_write_results_to_s3': Finished task run for task with final state: 'Success'" },
{ "date": "20 December 2021", "time": "10:40:14 ", "level": "INFO", "name": "prefect.CloudFlowRunner", "message": "Flow run SUCCESS: all reference tasks succeeded" }
]
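The delays described in these three reports can be cross-checked with a little arithmetic. This small sketch uses only the times quoted in this issue (EST, 20 December 2021); `at` and `minutes` are illustrative helpers. Among other things, it shows that the late flow's first attempt produced no log lines for about 94.5 minutes, between its 09:05:29 'ClientFailed' message and the 10:40:00 Lazarus reschedule, which is consistent with the "more than 90 minutes" described at the top.

```python
from datetime import datetime, timedelta

DAY = "2021-12-20"  # all times below are EST, as quoted in this report


def at(clock: str) -> datetime:
    """Parse an 'HH:MM' or 'HH:MM:SS' clock time on the incident day."""
    fmt = "%H:%M:%S" if clock.count(":") == 2 else "%H:%M"
    return datetime.strptime(f"{DAY} {clock}", f"%Y-%m-%d {fmt}")


def minutes(delta: timedelta) -> float:
    """Convert a timedelta to (possibly fractional) minutes."""
    return delta.total_seconds() / 60


# Failed Flow 1: scheduled 8:15, failed 8:22.
flow1_scheduled_to_failed = minutes(at("08:22") - at("08:15"))
# Failed Flow 2: scheduled 8:10, started 8:21, failed 8:28.
flow2_start_delay = minutes(at("08:21") - at("08:10"))
flow2_run_time = minutes(at("08:28") - at("08:21"))
# Successful but late flow: scheduled 8:58, started 9:04, completed 10:40.
flow3_start_delay = minutes(at("09:04") - at("08:58"))
flow3_start_to_done = minutes(at("10:40") - at("09:04"))
# Silence between the first attempt's last log line and the Lazarus reschedule.
flow3_stall = at("10:40:00") - at("09:05:29")

print(flow1_scheduled_to_failed, flow2_start_delay, flow2_run_time)  # 7.0 11.0 7.0
print(flow3_start_delay, flow3_start_to_done, flow3_stall)  # 6.0 96.0 1:34:31
```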

Expected Behavior

A failed flow should always trigger the failure state handler, resulting in a Slack notification.

Reproduction

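from prefect import Flow, task
from prefect.tasks.notifications import SlackTask
from prefect.utilities.notifications import callback_factory

def _construct_failure_message(obj):
    # Hypothetical stand-in for the message helper omitted from this report.
    return f"Flow '{obj.name}' failed"

def _slack_failure_notification(obj, new_state):
    msg = _construct_failure_message(obj)
    # Placeholder webhook URL, as in the original report.
    SlackTask().run(message=msg, webhook_url="slack.com/foobar")

# Only calls the notifier when the flow's new state is a failure.
slack_failure_notification = callback_factory(
    fn=_slack_failure_notification,
    check=lambda state: state.is_failed(),
)

@task
def my_task():
    pass  # placeholder for the real work

with Flow(
    "My Flow",
    state_handlers=[slack_failure_notification],
) as flow:
    my_task()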
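Why the handler can stay silent is easier to see with a simplified, self-contained model of the `callback_factory` pattern used in the reproduction. This is not Prefect's implementation. It assumes only that a flow state handler receives `(obj, old_state, new_state)` and that the wrapped `fn` runs only when `check(new_state)` is true; `State` here is a hypothetical stand-in class. In the logs above, the flow runner's last message is "Flow run RUNNING: terminal tasks are incomplete.", so under this model the failure check never passes.

```python
from dataclasses import dataclass
from typing import Any, Callable, List


@dataclass
class State:
    """Hypothetical stand-in for a Prefect 1.x state; only the name matters here."""
    name: str

    def is_failed(self) -> bool:
        return self.name == "Failed"


def callback_factory(fn: Callable[[Any, State], None],
                     check: Callable[[State], bool]):
    """Simplified model: build a handler that calls fn only if check(new_state)."""
    def state_handler(obj: Any, old_state: State, new_state: State) -> State:
        if check(new_state):
            fn(obj, new_state)
        return new_state
    return state_handler


sent: List[str] = []
handler = callback_factory(
    fn=lambda obj, state: sent.append(f"{obj}: {state.name}"),  # stands in for Slack
    check=lambda state: state.is_failed(),
)

# 1. The runner reaches Failed and invokes the handler: a notification is sent.
handler("My Flow", State("Running"), State("Failed"))
# 2. The runner's last transition leaves the run Running, as in
#    "Flow run RUNNING: terminal tasks are incomplete.": check() is False.
handler("My Flow", State("Running"), State("Running"))
# 3. If the runner process stops or cannot report a state, the handler is
#    simply never called, so there is nothing to run here.
print(sent)  # ['My Flow: Failed']
```

Under this model, a handler attached to the flow can only react to transitions the runner actually processes, which is why an out-of-process check is a useful complement.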

Environment

I am running a KubernetesAgent. My flows use DockerStorage and Kubernetes RunConfigs with a custom job template.

Diagnostics from the Agent:

{
  "config_overrides": {},
  "env_vars": [
    "PREFECT__CLOUD__API",
    "PREFECT__CLOUD__AGENT__LABELS",
    "PREFECT__CLOUD__API_KEY",
    "PREFECT__CLOUD__AGENT__AGENT_ADDRESS",
    "PREFECT__BACKEND"
  ],
  "system_information": {
    "platform": "Linux-5.4.129-63.229.amzn2.x86_64-x86_64-with-debian-10.10",
    "prefect_backend": "cloud",
    "prefect_version": "0.15.3",
    "python_version": "3.7.11"
  }
}

Thanks in advance for any help or insight you can offer!

github-actions[bot] commented 1 year ago

This issue is stale because it has been open 30 days with no activity. To keep this issue open remove stale label or comment.

github-actions[bot] commented 1 year ago

This issue was closed because it has been stale for 14 days with no activity. If this issue is important or you have more to add feel free to re-open it.