DalgoT4D / DDP_backend

Django app for the DDP platform
GNU Affero General Public License v3.0

flow_run without deployment hitting webhook #625

Closed: fatchat closed this issue 5 months ago

fatchat commented 5 months ago
{'id': 'a0d89150-d46a-4572-99c4-3da3f7bf46e0',
 'created': '2024-05-26T02:00:13.811541+00:00',
 'updated': '2024-05-26T02:59:02.429269+00:00',
 'name': 'colossal-bloodhound',
 'flow_id': '4fa25858-fcba-406c-974a-ca19e559bc4b',
 'state_id': '37d531af-d40b-4dee-8c10-297c06cbedb2',
 'deployment_id': None,
 'work_queue_id': None,
 'work_queue_name': None,
 'flow_version': '8a8fd9c5f7d96408286533e822ba7f84',
 'parameters': {'payload': {'seq': 0,
   'slug': 'airbyte-sync',
   'type': 'Airbyte Connection',
   'timeout': 15,
   'flow_name': None,
   'orgtask_uuid': 'None',
   'connection_id': '5e098cf9-e045-4726-8a3c-340fc6e1bb5f',
   'flow_run_name': None,
   'airbyte_server_block': 'antarang-airbyte-server'}},
 'idempotency_key': None,
 'context': {},
 'empirical_policy': {'max_retries': 0,
  'retry_delay_seconds': 0.0,
  'retries': 0,
  'retry_delay': 0,
  'pause_keys': [],
  'resuming': False},
 'tags': [],
 'parent_task_run_id': 'b096e3e4-bd8c-41e7-9dc0-da8b6d09aed1',
 'state_type': 'COMPLETED',
 'state_name': 'Completed',
 'run_count': 1,
 'expected_start_time': '2024-05-26T02:00:13.810813+00:00',
 'next_scheduled_start_time': None,
 'start_time': '2024-05-26T02:00:13.982772+00:00',
 'end_time': '2024-05-26T02:59:02.422905+00:00',
 'total_run_time': 3528.440133,
 'estimated_run_time': 3528.440133,
 'estimated_start_time_delta': 0.171959,
 'auto_scheduled': False,
 'infrastructure_document_id': None,
 'infrastructure_pid': None,
 'created_by': None,
 'work_pool_id': None,
 'work_pool_name': None,
 'state': {'id': '37d531af-d40b-4dee-8c10-297c06cbedb2',
  'type': 'COMPLETED',
  'name': 'Completed',
  'timestamp': '2024-05-26T02:59:02.422905+00:00',
  'message': None,
  'data': {'type': 'unpersisted'},
  'state_details': {'flow_run_id': 'a0d89150-d46a-4572-99c4-3da3f7bf46e0',
   'task_run_id': 'b096e3e4-bd8c-41e7-9dc0-da8b6d09aed1',
   'child_flow_run_id': None,
   'scheduled_time': None,
   'cache_key': None,
   'cache_expiration': None,
   'untrackable_result': False,
   'pause_timeout': None,
   'pause_reschedule': False,
   'pause_key': None,
   'refresh_cache': None}},
 'status': 'COMPLETED'}
fatchat commented 5 months ago

From the parent_task_run_id, we query "task_runs/filter/" with the body {'task_runs': {'id': {'any_': ['b096e3e4-bd8c-41e7-9dc0-da8b6d09aed1']}}} to get:

{'id': 'b096e3e4-bd8c-41e7-9dc0-da8b6d09aed1',
 'created': '2024-05-26T02:00:13.716976+00:00',
 'updated': '2024-05-26T02:59:02.429269+00:00',
 'name': 'tangerine-albatross',
 'flow_run_id': 'a5be711c-657f-484f-be71-adf505db18cb',
 'task_key': '__prefect_loader__.run_airbyte_connection_flow_v1',
 'dynamic_key': '0',
 'cache_key': None,
 'cache_expiration': None,
 'task_version': '8a8fd9c5f7d96408286533e822ba7f84',
 'empirical_policy': {'max_retries': 0,
  'retry_delay_seconds': 0.0,
  'retries': 0,
  'retry_delay': 0,
  'retry_jitter_factor': None},
 'tags': [],
 'state_id': '3c6a2d27-8659-4c18-bbe3-fb052d44dba9',
 'task_inputs': {'payload': []},
 'state_type': 'COMPLETED',
 'state_name': 'Completed',
 'run_count': 1,
 'flow_run_run_count': 0,
 'expected_start_time': '2024-05-26T02:00:13.716412+00:00',
 'next_scheduled_start_time': None,
 'start_time': '2024-05-26T02:00:14.018787+00:00',
 'end_time': '2024-05-26T02:59:02.449981+00:00',
 'total_run_time': 3528.431194,
 'estimated_run_time': 3528.431194,
 'estimated_start_time_delta': 0.302375,
 'state': {'id': '3c6a2d27-8659-4c18-bbe3-fb052d44dba9',
  'type': 'COMPLETED',
  'name': 'Completed',
  'timestamp': '2024-05-26T02:59:02.449981+00:00',
  'message': None,
  'data': {'type': 'unpersisted'},
  'state_details': {'flow_run_id': 'a5be711c-657f-484f-be71-adf505db18cb',
   'task_run_id': 'b096e3e4-bd8c-41e7-9dc0-da8b6d09aed1',
   'child_flow_run_id': 'a0d89150-d46a-4572-99c4-3da3f7bf46e0',
   'scheduled_time': None,
   'cache_key': None,
   'cache_expiration': None,
   'untrackable_result': False,
   'pause_timeout': None,
   'pause_reschedule': False,
   'pause_key': None,
   'refresh_cache': None}}}

This task run also has no deployment_id, but it does have state_details['flow_run_id'] = 'a5be711c-657f-484f-be71-adf505db18cb'. Looking up that flow run gives us deployment_id = '3050c965-1881-457a-ba00-7588a7842fab'.
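The lookup chain above (child flow run, then parent_task_run_id, then task run, then state_details['flow_run_id'], then the parent flow run's deployment_id) can be sketched as a small loop. This is a minimal sketch, not the backend's actual code: fetch_task_runs and fetch_flow_run are hypothetical callables assumed to wrap the Prefect "task_runs/filter/" query and a single flow run lookup, and max_depth is an illustrative guard.

```python
from typing import Callable, List, Optional

# Prefect returns plain JSON objects here; these aliases are only for readability.
FlowRun = dict
TaskRun = dict


def task_run_filter_body(task_run_id: str) -> dict:
    """Body for the task_runs/filter/ query used in this thread."""
    return {"task_runs": {"id": {"any_": [task_run_id]}}}


def resolve_deployment_id(
    flow_run: FlowRun,
    fetch_task_runs: Callable[[dict], List[TaskRun]],
    fetch_flow_run: Callable[[str], FlowRun],
    max_depth: int = 10,
) -> Optional[str]:
    """Climb from a (possibly nested) flow run to the first ancestor with a deployment_id.

    fetch_task_runs and fetch_flow_run are hypothetical wrappers around the
    Prefect API; injecting them keeps the logic testable without a server.
    """
    current = flow_run
    for _ in range(max_depth):  # guard against unexpectedly deep nesting
        if current.get("deployment_id"):
            return current["deployment_id"]
        parent_task_run_id = current.get("parent_task_run_id")
        if parent_task_run_id is None:
            return None  # top-level run that was not started from a deployment
        task_runs = fetch_task_runs(task_run_filter_body(parent_task_run_id))
        if not task_runs:
            return None  # parent task run not found
        # The parent task run's state_details.flow_run_id is the enclosing flow run
        parent_flow_run_id = task_runs[0]["state"]["state_details"]["flow_run_id"]
        current = fetch_flow_run(parent_flow_run_id)
    return None
```

With stubs that return the two objects shown above, starting from flow run a0d89150-... walks through task run b096e3e4-... to flow run a5be711c-... and yields deployment_id 3050c965-.... Injecting the fetchers is a design choice for testability; a real caller would pass functions that hit the Prefect API.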

fatchat commented 5 months ago

Question: do we put the flow run with id a0d89150-d46a-4572-99c4-3da3f7bf46e0 into our PrefectFlowRun table against this deployment_id, or do we ignore it?

i.e., should only the flow run with id a5be711c-657f-484f-be71-adf505db18cb go into PrefectFlowRun?

fatchat commented 5 months ago

Since PrefectFlowRun is used to show the pipeline overview, it should only contain top-level flow runs, not children of tasks or of top-level flow runs.

Ishankoradia commented 5 months ago

Since PrefectFlowRun is used to show the pipeline overview, it should only contain top-level flow runs, not children of tasks or of top-level flow runs.

Agreed. Otherwise, the pipeline overview will display the wrong count.
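The agreed rule (only top-level flow runs go into PrefectFlowRun) can be sketched as a filter on parent_task_run_id, which is None for the top-level run and set for a child run like a0d89150-.... This is a minimal sketch under assumptions: the helper names are hypothetical, and requiring a non-null deployment_id reflects the webhook storing runs against a deployment, not confirmed backend code.

```python
from typing import Iterable, List


def is_top_level(flow_run: dict) -> bool:
    """True when no task run spawned this flow run (parent_task_run_id is None)."""
    return flow_run.get("parent_task_run_id") is None


def should_store_in_prefect_flow_run(flow_run: dict) -> bool:
    """Hypothetical webhook guard: keep only top-level runs tied to a deployment."""
    return is_top_level(flow_run) and flow_run.get("deployment_id") is not None


def pipeline_overview_runs(flow_runs: Iterable[dict]) -> List[dict]:
    """Filter a batch of flow runs down to the ones the overview should count."""
    return [fr for fr in flow_runs if should_store_in_prefect_flow_run(fr)]
```

Applied to the two runs in this issue, only a5be711c-... survives, so the overview counts one run instead of two.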

Ishankoradia commented 5 months ago

So initially, we were using this to populate our DB with flow runs:

Backend: prefect_service.get_flow_runs_by_deployment_id, which internally calls Proxy: service.get_flow_runs_by_deployment_id. This function already sets "status": flow_run["state"]["type"].

After the last release, we now fetch flow runs (in the webhook) and populate the DB using Backend: prefect_service.get_flow_run (Proxy: service.get_flow_run). By default, Prefect's flow run doesn't have a status field, so it needs to be translated as above. This is what was throwing the Grafana errors KeyError: "status". A PR in the proxy fixes this.
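A minimal sketch of the translation, assuming the proxy receives Prefect's flow run JSON as a dict. add_status is a hypothetical helper name, not the actual proxy code; it applies the same status = state.type mapping that get_flow_runs_by_deployment_id already uses, so a single-run lookup returns the same shape.

```python
def add_status(flow_run: dict) -> dict:
    """Return a copy of a raw Prefect flow run with a top-level "status" key.

    Prefect's flow run object carries state.type but no "status" field, so code
    that reads flow_run["status"] raises KeyError: 'status' unless it is added.
    """
    state = flow_run.get("state") or {}  # tolerate a missing or null state
    return {**flow_run, "status": state.get("type")}


raw = {
    "id": "a0d89150-d46a-4572-99c4-3da3f7bf46e0",
    "state": {"type": "COMPLETED", "name": "Completed"},
}
normalized = add_status(raw)
print(normalized["status"])  # COMPLETED
```

Returning a copy rather than mutating the input keeps the raw Prefect response untouched, which is convenient if it is also logged or forwarded.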