PrefectHQ / prefect

Prefect is a workflow orchestration framework for building resilient data pipelines in Python.
https://prefect.io
Apache License 2.0

Wait time between tasks is proportional to the size of the data passed between - Kubernetes #9934

Closed josh-gree closed 1 year ago

josh-gree commented 1 year ago


Bug summary

When running flows on Kubernetes, I am seeing multi-minute delays between one task ending and the next task starting. The delay seems to be proportional to the size of the data being passed between tasks. With the following flow I am able to reproduce this issue consistently:

import pandas as pd
import prefect
import json

from typing import Dict, List
from prefect import flow, task

from faker import Faker

@task
def t1(N: int) -> List[Dict]:
    Faker.seed(42)

    fake = Faker()

    out = fake.json(
        data_columns={
            "Spec": "@1.0.1",
            "ID": "pyint",
            "x1": "address",
            "x2": "address",
            "x3": "address",
            "x4": "address",
            "x5": "address",
            "x6": "address",
            "x7": "address",
            "x8": "address",
            "x9": "address",
            "x10": "address",
        },
        num_rows=N,
    )
    return json.loads(out)

@task
def t2(in_: List[Dict]) -> pd.DataFrame:
    df = pd.DataFrame(in_)

    logger = prefect.get_run_logger()

    logger.info(df.head())
    return df

@flow
def mvp(N: int, M: int):
    out1 = t1(N)
    out1_df = t2(out1)

    out2 = t1(M)
    out2_df = t2(out2)

This flow consists of two pairs of tasks that can pass varying sizes of data between them.
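To put numbers on the delays, a small helper can scan Prefect's debug output for large gaps between consecutive log entries. This is only a minimal sketch, not part of Prefect: `find_gaps`, `ENTRY_RE`, and the 60-second threshold are hypothetical names and values chosen for illustration, and `SAMPLE` is four entries abridged from the log of this run. It assumes every entry starts with an `HH:MM:SS.mmm | LEVEL` prefix and that the run does not cross midnight.

```python
import re
from datetime import datetime

# Matches the "HH:MM:SS.mmm | LEVEL" prefix that starts each log entry.
# ISO timestamps inside messages are not followed by " | LEVEL", so they are skipped.
ENTRY_RE = re.compile(
    r"(\d{2}:\d{2}:\d{2}\.\d{3}) \| (?:DEBUG|INFO|WARNING|ERROR|CRITICAL)\b"
)


def find_gaps(log_text, threshold_seconds=60.0):
    """Return (start, end, seconds) for consecutive entries at least threshold_seconds apart.

    Assumption: all entries fall on the same day (no midnight wrap-around).
    """
    stamps = [m.group(1) for m in ENTRY_RE.finditer(log_text)]
    times = [datetime.strptime(s, "%H:%M:%S.%f") for s in stamps]
    gaps = []
    for i in range(1, len(times)):
        delta = (times[i] - times[i - 1]).total_seconds()
        # Entries can be slightly out of order; negative deltas are simply ignored.
        if delta >= threshold_seconds:
            gaps.append((stamps[i - 1], stamps[i], delta))
    return gaps


# Four entries abridged from the log of this run ("{...}" marks elided content).
SAMPLE = (
    "13:13:06.606 | DEBUG | GlobalEventLoopThread | prefect._internal.concurrency - entered "
    "13:15:21.392 | DEBUG | GlobalEventLoopThread | prefect._internal.concurrency - Service processing batch of size 401 "
    "13:15:21.645 | DEBUG | GlobalEventLoopThread | prefect._internal.concurrency - Service added item {...} to batch (size 432/3000000) "
    "13:18:42.999 | DEBUG | GlobalEventLoopThread | prefect._internal.concurrency - Service processing batch of size 432"
)

if __name__ == "__main__":
    for start, end, seconds in find_gaps(SAMPLE):
        print(f"{start} -> {end}: {seconds:.3f} s")
```

The helper scans the whole text with `finditer` rather than splitting on newlines, so it also works when the log has been pasted as one long line.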

The logs for a run of this flow, invoked as mvp(10, 500000):

Logs ``` 12:51:23.925 | DEBUG | prefect.profiles - Using profile 'default' /usr/local/lib/python3.9/runpy.py:127: RuntimeWarning: 'prefect.engine' found in sys.modules after import of package 'prefect', but prior to execution of 'prefect.engine'; this may result in unpredictable behaviour warn(RuntimeWarning(msg)) 12:51:23.977 | DEBUG | MainThread | prefect._internal.concurrency - Waiter watching for callbacks 12:51:23.978 | DEBUG | GlobalEventLoopThread | prefect._internal.concurrency - Running call retrieve_flow_then_begin_flow_run(UUID('853debb8-0e06-4327-b4f5-9b3cdd6a671b'), user_thread=<_MainThread(MainThread, started 140659442...) in thread 'GlobalEventLoopThread' with timeout None 12:51:23.978 | DEBUG | GlobalEventLoopThread | prefect._internal.concurrency - entered 12:51:23.978 | DEBUG | GlobalEventLoopThread | prefect._internal.concurrency - exited 12:51:23.979 | DEBUG | GlobalEventLoopThread | prefect._internal.concurrency - Scheduling coroutine for call retrieve_flow_then_begin_flow_run(UUID('853debb8-0e06-4327-b4f5-9b3cdd6a671b'), user_thread=<_MainThread(MainThread, started 140659442...) 
in running loop <_UnixSelectorEventLoop running=True closed=False debug=False> 12:51:23.979 | DEBUG | GlobalEventLoopThread | prefect._internal.concurrency - entered 12:51:25.484 | DEBUG | prefect.client - Connecting to API at https://api.prefect.cloud/api/accounts/e8ae2ca5-703a-4f0a-b3c7-d1dd3c8cbabd/workspaces/e8da6c6a-ac8c-4de9-9fad-12d5a2103129/ 12:51:25.888 | DEBUG | GlobalEventLoopThread | prefect._internal.concurrency - Starting service 12:51:25.889 | DEBUG | GlobalEventLoopThread | prefect._internal.concurrency - Service enqueing item {'name': 'prefect.flow_runs', 'level': 20, 'message': "Downloading flow code from storage at '/usr/local/lib/python3.9/site-packages/'", 'timestamp': '2023-06-13T12:51:25.887624+00:00', 'flow_run_id': '853debb8-0e06-4327-b4f5-9b3cdd6a671b', 'task_run_id': None, '__payload_size__': 261} 12:51:25.887 | INFO | Flow run 'convivial-caracal' - Downloading flow code from storage at '/usr/local/lib/python3.9/site-packages/' 12:51:25.890 | DEBUG | GlobalEventLoopThread | prefect._internal.concurrency - --> return coroutine for internal await 12:51:33.715 | DEBUG | GlobalEventLoopThread | prefect._internal.concurrency - Service enqueing item {'name': 'prefect.flow_runs', 'level': 10, 'message': "Importing flow code from 'flows/mvp/flow.py:mvp'", 'timestamp': '2023-06-13T12:51:33.715121+00:00', 'flow_run_id': '853debb8-0e06-4327-b4f5-9b3cdd6a671b', 'task_run_id': None, '__payload_size__': 230} 12:51:33.715 | DEBUG | Flow run 'convivial-caracal' - Importing flow code from 'flows/mvp/flow.py:mvp' 12:51:33.728 | DEBUG | prefect.client - Connecting to API at https://api.prefect.cloud/api/accounts/e8ae2ca5-703a-4f0a-b3c7-d1dd3c8cbabd/workspaces/e8da6c6a-ac8c-4de9-9fad-12d5a2103129/ 12:51:33.733 | DEBUG | GlobalEventLoopThread | prefect._internal.concurrency - Service added item {'name': 'prefect.flow_runs', 'level': 20, 'message': "Downloading flow code from storage at '/usr/local/lib/python3.9/site-packages/'", 'timestamp': 
'2023-06-13T12:51:25.887624+00:00', 'flow_run_id': '853debb8-0e06-4327-b4f5-9b3cdd6a671b', 'task_run_id': None} to batch (size 261/3000000) 12:51:33.734 | DEBUG | GlobalEventLoopThread | prefect._internal.concurrency - Service added item {'name': 'prefect.flow_runs', 'level': 10, 'message': "Importing flow code from 'flows/mvp/flow.py:mvp'", 'timestamp': '2023-06-13T12:51:33.715121+00:00', 'flow_run_id': '853debb8-0e06-4327-b4f5-9b3cdd6a671b', 'task_run_id': None} to batch (size 491/3000000) 12:51:34.703 | DEBUG | GlobalEventLoopThread | prefect._internal.concurrency - Service enqueing item {'name': 'prefect.flow_runs', 'level': 10, 'message': "Starting 'ConcurrentTaskRunner'; submitted tasks will be run concurrently...", 'timestamp': '2023-06-13T12:51:34.702500+00:00', 'flow_run_id': '853debb8-0e06-4327-b4f5-9b3cdd6a671b', 'task_run_id': None, '__payload_size__': 258} 12:51:34.702 | DEBUG | Flow run 'convivial-caracal' - Starting 'ConcurrentTaskRunner'; submitted tasks will be run concurrently... 12:51:34.704 | DEBUG | prefect.task_runner.concurrent - Starting task runner... 
12:51:34.705 | DEBUG | GlobalEventLoopThread | prefect._internal.concurrency - --> return coroutine for internal await 12:51:34.707 | DEBUG | GlobalEventLoopThread | prefect._internal.concurrency - Service added item {'name': 'prefect.flow_runs', 'level': 10, 'message': "Starting 'ConcurrentTaskRunner'; submitted tasks will be run concurrently...", 'timestamp': '2023-06-13T12:51:34.702500+00:00', 'flow_run_id': '853debb8-0e06-4327-b4f5-9b3cdd6a671b', 'task_run_id': None} to batch (size 749/3000000) 12:51:35.732 | DEBUG | GlobalEventLoopThread | prefect._internal.concurrency - Service processing batch of size 749 12:51:35.858 | DEBUG | GlobalEventLoopThread | prefect._internal.concurrency - Service enqueing item {'name': 'prefect.flow_runs', 'level': 10, 'message': "Executing flow 'mvp' for flow run 'convivial-caracal'...", 'timestamp': '2023-06-13T12:51:35.857620+00:00', 'flow_run_id': '853debb8-0e06-4327-b4f5-9b3cdd6a671b', 'task_run_id': None, '__payload_size__': 238} 12:51:35.857 | DEBUG | Flow run 'convivial-caracal' - Executing flow 'mvp' for flow run 'convivial-caracal'... 
12:51:35.859 | DEBUG | GlobalEventLoopThread | prefect._internal.concurrency - Service enqueing item {'name': 'prefect.flow_runs', 'level': 10, 'message': 'Executing mvp(10, 500000)', 'timestamp': '2023-06-13T12:51:35.859108+00:00', 'flow_run_id': '853debb8-0e06-4327-b4f5-9b3cdd6a671b', 'task_run_id': None, '__payload_size__': 207} 12:51:35.859 | DEBUG | Flow run 'convivial-caracal' - Executing mvp(10, 500000) 12:51:35.860 | DEBUG | MainThread | prefect._internal.concurrency - Running call mvp(10, 500000) in thread 'MainThread' with timeout None 12:51:35.860 | DEBUG | MainThread | prefect._internal.concurrency - entered 12:51:35.861 | DEBUG | MainThread | prefect._internal.concurrency - Waiter , flow_run_context=FlowRunContext(start_time=DateT...), owner='MainThread'> watching for callbacks 12:51:35.862 | DEBUG | GlobalEventLoopThread | prefect._internal.concurrency - Running call get_task_call_return_value(task=, flow_run_context=FlowRunContext(start_time=DateT...) in thread 'GlobalEventLoopThread' with timeout None 12:51:35.863 | DEBUG | GlobalEventLoopThread | prefect._internal.concurrency - entered 12:51:35.863 | DEBUG | GlobalEventLoopThread | prefect._internal.concurrency - exited 12:51:35.863 | DEBUG | GlobalEventLoopThread | prefect._internal.concurrency - Scheduling coroutine for call get_task_call_return_value(task=, flow_run_context=FlowRunContext(start_time=DateT...) 
in running loop <_UnixSelectorEventLoop running=True closed=False debug=False> 12:51:35.864 | DEBUG | GlobalEventLoopThread | prefect._internal.concurrency - entered 12:51:36.142 | DEBUG | GlobalEventLoopThread | prefect._internal.concurrency - Service added item {'name': 'prefect.flow_runs', 'level': 10, 'message': "Executing flow 'mvp' for flow run 'convivial-caracal'...", 'timestamp': '2023-06-13T12:51:35.857620+00:00', 'flow_run_id': '853debb8-0e06-4327-b4f5-9b3cdd6a671b', 'task_run_id': None} to batch (size 238/3000000) 12:51:36.143 | DEBUG | GlobalEventLoopThread | prefect._internal.concurrency - Service added item {'name': 'prefect.flow_runs', 'level': 10, 'message': 'Executing mvp(10, 500000)', 'timestamp': '2023-06-13T12:51:35.859108+00:00', 'flow_run_id': '853debb8-0e06-4327-b4f5-9b3cdd6a671b', 'task_run_id': None} to batch (size 445/3000000) 12:51:36.149 | DEBUG | GlobalEventLoopThread | prefect._internal.concurrency - Service enqueing item {'name': 'prefect.flow_runs', 'level': 20, 'message': "Created task run 't1-0' for task 't1'", 'timestamp': '2023-06-13T12:51:36.148762+00:00', 'flow_run_id': '853debb8-0e06-4327-b4f5-9b3cdd6a671b', 'task_run_id': None, '__payload_size__': 219} 12:51:36.148 | INFO | Flow run 'convivial-caracal' - Created task run 't1-0' for task 't1' 12:51:36.150 | DEBUG | GlobalEventLoopThread | prefect._internal.concurrency - Service enqueing item {'name': 'prefect.flow_runs', 'level': 20, 'message': "Executing 't1-0' immediately...", 'timestamp': '2023-06-13T12:51:36.150413+00:00', 'flow_run_id': '853debb8-0e06-4327-b4f5-9b3cdd6a671b', 'task_run_id': None, '__payload_size__': 213} 12:51:36.150 | INFO | Flow run 'convivial-caracal' - Executing 't1-0' immediately... 
12:51:36.154 | DEBUG | GlobalEventLoopThread | prefect._internal.concurrency - Service added item {'name': 'prefect.flow_runs', 'level': 20, 'message': "Created task run 't1-0' for task 't1'", 'timestamp': '2023-06-13T12:51:36.148762+00:00', 'flow_run_id': '853debb8-0e06-4327-b4f5-9b3cdd6a671b', 'task_run_id': None} to batch (size 664/3000000) 12:51:36.156 | DEBUG | GlobalEventLoopThread | prefect._internal.concurrency - Service added item {'name': 'prefect.flow_runs', 'level': 20, 'message': "Executing 't1-0' immediately...", 'timestamp': '2023-06-13T12:51:36.150413+00:00', 'flow_run_id': '853debb8-0e06-4327-b4f5-9b3cdd6a671b', 'task_run_id': None} to batch (size 877/3000000) 12:51:36.562 | DEBUG | GlobalEventLoopThread | prefect._internal.concurrency - Starting service 12:51:36.566 | DEBUG | EventsWorkerThread | prefect._internal.concurrency - Running call get() in thread 'EventsWorkerThread' with timeout None 12:51:36.567 | DEBUG | EventsWorkerThread | prefect._internal.concurrency - entered 12:51:36.984 | DEBUG | GlobalEventLoopThread | prefect._internal.concurrency - Service enqueing item {'name': 'prefect.task_runs', 'level': 10, 'message': 'Executing t1(10)', 'timestamp': '2023-06-13T12:51:36.984121+00:00', 'flow_run_id': '853debb8-0e06-4327-b4f5-9b3cdd6a671b', 'task_run_id': 'e4673919-7b0f-4599-be4b-bcb001226f4d', '__payload_size__': 232} 12:51:36.984 | DEBUG | Task run 't1-0' - Executing t1(10) 12:51:36.986 | DEBUG | WorkerThread-0 | prefect._internal.concurrency - Running call t1(10) in thread 'WorkerThread-0' with timeout None 12:51:36.986 | DEBUG | WorkerThread-0 | prefect._internal.concurrency - entered 12:51:36.988 | DEBUG | GlobalEventLoopThread | prefect._internal.concurrency - Service added item {'name': 'prefect.task_runs', 'level': 10, 'message': 'Executing t1(10)', 'timestamp': '2023-06-13T12:51:36.984121+00:00', 'flow_run_id': '853debb8-0e06-4327-b4f5-9b3cdd6a671b', 'task_run_id': 'e4673919-7b0f-4599-be4b-bcb001226f4d'} to batch (size 
1109/3000000) 12:51:37.060 | DEBUG | WorkerThread-0 | prefect._internal.concurrency - exited 12:51:37.060 | DEBUG | WorkerThread-0 | prefect._internal.concurrency - Finished call t1(10) 12:51:37.061 | DEBUG | GlobalEventLoopThread | prefect._internal.concurrency - --> return coroutine for internal await 12:51:37.062 | INFO | WorkerThread-0 | prefect._internal.concurrency - Exiting worker thread 'WorkerThread-0' 12:51:37.062 | DEBUG | GlobalEventLoopThread | prefect._internal.concurrency - --> return coroutine for internal await 12:51:37.064 | DEBUG | GlobalEventLoopThread | prefect._internal.concurrency - --> return coroutine for internal await 12:51:37.064 | DEBUG | GlobalEventLoopThread | prefect._internal.concurrency - --> return coroutine for internal await 12:51:37.604 | DEBUG | GlobalEventLoopThread | prefect._internal.concurrency - Service enqueing item {'name': 'prefect.task_runs', 'level': 20, 'message': "Finished in state Completed(message=None, type=COMPLETED, result=UnpersistedResult(type='unpersisted', artifact_type='result', artifact_description='Unpersisted result of type `list`'))", 'timestamp': '2023-06-13T12:51:37.604141+00:00', 'flow_run_id': '853debb8-0e06-4327-b4f5-9b3cdd6a671b', 'task_run_id': 'e4673919-7b0f-4599-be4b-bcb001226f4d', '__payload_size__': 401} 12:51:37.604 | INFO | Task run 't1-0' - Finished in state Completed(message=None, type=COMPLETED, result=UnpersistedResult(type='unpersisted', artifact_type='result', artifact_description='Unpersisted result of type `list`')) 12:51:37.606 | DEBUG | GlobalEventLoopThread | prefect._internal.concurrency - Service added item {'name': 'prefect.task_runs', 'level': 20, 'message': "Finished in state Completed(message=None, type=COMPLETED, result=UnpersistedResult(type='unpersisted', artifact_type='result', artifact_description='Unpersisted result of type `list`'))", 'timestamp': '2023-06-13T12:51:37.604141+00:00', 'flow_run_id': '853debb8-0e06-4327-b4f5-9b3cdd6a671b', 'task_run_id': 
'e4673919-7b0f-4599-be4b-bcb001226f4d'} to batch (size 1510/3000000) 12:51:37.607 | DEBUG | GlobalEventLoopThread | prefect._internal.concurrency - --> return coroutine for internal await 12:51:37.607 | DEBUG | GlobalEventLoopThread | prefect._internal.concurrency - --> return coroutine for internal await 12:51:37.607 | DEBUG | GlobalEventLoopThread | prefect._internal.concurrency - exited 12:51:37.608 | DEBUG | GlobalEventLoopThread | prefect._internal.concurrency - Finished async call get_task_call_return_value(task=, flow_run_context=FlowRunContext(start_time=DateT...) 12:51:37.609 | DEBUG | MainThread | prefect._internal.concurrency - Waiter , flow_run_context=FlowRunContext(start_time=DateT...), owner='MainThread'> watching for callbacks 12:51:37.610 | DEBUG | GlobalEventLoopThread | prefect._internal.concurrency - Running call get_task_call_return_value(task=, flow_run_context=FlowRunContext(start_time=DateT...) in thread 'GlobalEventLoopThread' with timeout None 12:51:37.611 | DEBUG | GlobalEventLoopThread | prefect._internal.concurrency - entered 12:51:37.611 | DEBUG | GlobalEventLoopThread | prefect._internal.concurrency - exited 12:51:37.611 | DEBUG | GlobalEventLoopThread | prefect._internal.concurrency - Scheduling coroutine for call get_task_call_return_value(task=, flow_run_context=FlowRunContext(start_time=DateT...) 
in running loop <_UnixSelectorEventLoop running=True closed=False debug=False> 12:51:37.612 | DEBUG | GlobalEventLoopThread | prefect._internal.concurrency - entered 12:51:37.783 | DEBUG | GlobalEventLoopThread | prefect._internal.concurrency - Service enqueing item {'name': 'prefect.flow_runs', 'level': 20, 'message': "Created task run 't2-0' for task 't2'", 'timestamp': '2023-06-13T12:51:37.782913+00:00', 'flow_run_id': '853debb8-0e06-4327-b4f5-9b3cdd6a671b', 'task_run_id': None, '__payload_size__': 219} 12:51:37.782 | INFO | Flow run 'convivial-caracal' - Created task run 't2-0' for task 't2' 12:51:37.785 | DEBUG | GlobalEventLoopThread | prefect._internal.concurrency - Service enqueing item {'name': 'prefect.flow_runs', 'level': 20, 'message': "Executing 't2-0' immediately...", 'timestamp': '2023-06-13T12:51:37.784903+00:00', 'flow_run_id': '853debb8-0e06-4327-b4f5-9b3cdd6a671b', 'task_run_id': None, '__payload_size__': 213} 12:51:37.784 | INFO | Flow run 'convivial-caracal' - Executing 't2-0' immediately... 
12:51:37.790 | DEBUG | GlobalEventLoopThread | prefect._internal.concurrency - Service added item {'name': 'prefect.flow_runs', 'level': 20, 'message': "Created task run 't2-0' for task 't2'", 'timestamp': '2023-06-13T12:51:37.782913+00:00', 'flow_run_id': '853debb8-0e06-4327-b4f5-9b3cdd6a671b', 'task_run_id': None} to batch (size 1729/3000000) 12:51:37.792 | DEBUG | GlobalEventLoopThread | prefect._internal.concurrency - Service added item {'name': 'prefect.flow_runs', 'level': 20, 'message': "Executing 't2-0' immediately...", 'timestamp': '2023-06-13T12:51:37.784903+00:00', 'flow_run_id': '853debb8-0e06-4327-b4f5-9b3cdd6a671b', 'task_run_id': None} to batch (size 1942/3000000) 12:51:38.144 | DEBUG | GlobalEventLoopThread | prefect._internal.concurrency - Service processing batch of size 1942 12:51:38.701 | DEBUG | GlobalEventLoopThread | prefect._internal.concurrency - Service enqueing item {'name': 'prefect.task_runs', 'level': 10, 'message': "Executing t2([{'Spec': '1.0.1', 'ID': 1824, 'x1': '32181 Johnson Course Apt. 389\\nNew Jamesside, MT 29394', 'x2':...)", 'timestamp': '2023-06-13T12:51:38.700868+00:00', 'flow_run_id': '853debb8-0e06-4327-b4f5-9b3cdd6a671b', 'task_run_id': 'ecb017a4-8ee9-4858-ab9b-e18b17436fdc', '__payload_size__': 334} 12:51:38.700 | DEBUG | Task run 't2-0' - Executing t2([{'Spec': '1.0.1', 'ID': 1824, 'x1': '32181 Johnson Course Apt. 389\nNew Jamesside, MT 29394', 'x2':...) 12:51:38.703 | DEBUG | WorkerThread-1 | prefect._internal.concurrency - Running call t2([{'Spec': '1.0.1', 'ID': 1824, 'x1': '32181 Johnson Course Apt. 389\nNew Jamesside, MT 29394', 'x2':...) in thread 'WorkerThread-1' with timeout None 12:51:38.704 | DEBUG | WorkerThread-1 | prefect._internal.concurrency - entered 12:51:38.709 | DEBUG | GlobalEventLoopThread | prefect._internal.concurrency - Service added item {'name': 'prefect.task_runs', 'level': 10, 'message': "Executing t2([{'Spec': '1.0.1', 'ID': 1824, 'x1': '32181 Johnson Course Apt. 
389\\nNew Jamesside, MT 29394', 'x2':...)", 'timestamp': '2023-06-13T12:51:38.700868+00:00', 'flow_run_id': '853debb8-0e06-4327-b4f5-9b3cdd6a671b', 'task_run_id': 'ecb017a4-8ee9-4858-ab9b-e18b17436fdc'} to batch (size 334/3000000) 12:51:38.733 | DEBUG | WorkerThread-1 | prefect._internal.concurrency - Service enqueing item {'name': 'prefect.task_runs', 'level': 20, 'message': ' Spec ... x10\n0 1.0.1 ... 71822 Arroyo Expressway\\nAllisonchester, IL 71187\n1 1.0.1 ... 48740 Cynthia Village Suite 005\\nLake Tina, GA...\n2 1.0.1 ... 7738 Leon Underpass Apt. 148\\nClarencebury, TX...\n3 1.0.1 ... 003 Mary Road\\nEast David, TN 15494\n4 1.0.1 ... 427 Monique Ports\\nTaylormouth, MA 24665\n\n[5 rows x 12 columns]', 'timestamp': '2023-06-13T12:51:38.708290+00:00', 'flow_run_id': '853debb8-0e06-4327-b4f5-9b3cdd6a671b', 'task_run_id': 'ecb017a4-8ee9-4858-ab9b-e18b17436fdc', '__payload_size__': 640} 12:51:38.754 | DEBUG | GlobalEventLoopThread | prefect._internal.concurrency - Service added item {'name': 'prefect.task_runs', 'level': 20, 'message': ' Spec ... x10\n0 1.0.1 ... 71822 Arroyo Expressway\\nAllisonchester, IL 71187\n1 1.0.1 ... 48740 Cynthia Village Suite 005\\nLake Tina, GA...\n2 1.0.1 ... 7738 Leon Underpass Apt. 148\\nClarencebury, TX...\n3 1.0.1 ... 003 Mary Road\\nEast David, TN 15494\n4 1.0.1 ... 427 Monique Ports\\nTaylormouth, MA 24665\n\n[5 rows x 12 columns]', 'timestamp': '2023-06-13T12:51:38.708290+00:00', 'flow_run_id': '853debb8-0e06-4327-b4f5-9b3cdd6a671b', 'task_run_id': 'ecb017a4-8ee9-4858-ab9b-e18b17436fdc'} to batch (size 974/3000000) 12:51:38.708 | INFO | Task run 't2-0' - Spec ... x10 0 1.0.1 ... 71822 Arroyo Expressway\nAllisonchester, IL 71187 1 1.0.1 ... 48740 Cynthia Village Suite 005\nLake Tina, GA... 2 1.0.1 ... 7738 Leon Underpass Apt. 148\nClarencebury, TX... 3 1.0.1 ... 003 Mary Road\nEast David, TN 15494 4 1.0.1 ... 
427 Monique Ports\nTaylormouth, MA 24665 [5 rows x 12 columns] 12:51:38.764 | DEBUG | WorkerThread-1 | prefect._internal.concurrency - exited 12:51:38.764 | DEBUG | WorkerThread-1 | prefect._internal.concurrency - Finished call t2([{'Spec': '1.0.1', 'ID': 1824, 'x1': '32181 Johnson Course Apt. 389\nNew Jamesside, MT 29394', 'x2':...) 12:51:38.765 | INFO | WorkerThread-1 | prefect._internal.concurrency - Exiting worker thread 'WorkerThread-1' 12:51:38.765 | DEBUG | GlobalEventLoopThread | prefect._internal.concurrency - --> return coroutine for internal await 12:51:38.766 | DEBUG | GlobalEventLoopThread | prefect._internal.concurrency - --> return coroutine for internal await 12:51:38.766 | DEBUG | GlobalEventLoopThread | prefect._internal.concurrency - --> return coroutine for internal await 12:51:38.766 | DEBUG | GlobalEventLoopThread | prefect._internal.concurrency - --> return coroutine for internal await 12:51:38.966 | DEBUG | GlobalEventLoopThread | prefect._internal.concurrency - Service enqueing item {'name': 'prefect.task_runs', 'level': 20, 'message': "Finished in state Completed(message=None, type=COMPLETED, result=UnpersistedResult(type='unpersisted', artifact_type='result', artifact_description='Unpersisted result of type `DataFrame`'))", 'timestamp': '2023-06-13T12:51:38.966453+00:00', 'flow_run_id': '853debb8-0e06-4327-b4f5-9b3cdd6a671b', 'task_run_id': 'ecb017a4-8ee9-4858-ab9b-e18b17436fdc', '__payload_size__': 406} 12:51:38.966 | INFO | Task run 't2-0' - Finished in state Completed(message=None, type=COMPLETED, result=UnpersistedResult(type='unpersisted', artifact_type='result', artifact_description='Unpersisted result of type `DataFrame`')) 12:51:38.969 | DEBUG | GlobalEventLoopThread | prefect._internal.concurrency - Service added item {'name': 'prefect.task_runs', 'level': 20, 'message': "Finished in state Completed(message=None, type=COMPLETED, result=UnpersistedResult(type='unpersisted', artifact_type='result', artifact_description='Unpersisted 
result of type `DataFrame`'))", 'timestamp': '2023-06-13T12:51:38.966453+00:00', 'flow_run_id': '853debb8-0e06-4327-b4f5-9b3cdd6a671b', 'task_run_id': 'ecb017a4-8ee9-4858-ab9b-e18b17436fdc'} to batch (size 1380/3000000) 12:51:38.969 | DEBUG | GlobalEventLoopThread | prefect._internal.concurrency - --> return coroutine for internal await 12:51:38.969 | DEBUG | GlobalEventLoopThread | prefect._internal.concurrency - --> return coroutine for internal await 12:51:38.970 | DEBUG | GlobalEventLoopThread | prefect._internal.concurrency - exited 12:51:38.970 | DEBUG | GlobalEventLoopThread | prefect._internal.concurrency - Finished async call get_task_call_return_value(task=, flow_run_context=FlowRunContext(start_time=DateT...) 12:51:38.971 | DEBUG | MainThread | prefect._internal.concurrency - Waiter , flow_run_context=FlowRunContext(start_time=DateT...), owner='MainThread'> watching for callbacks 12:51:38.972 | DEBUG | GlobalEventLoopThread | prefect._internal.concurrency - Running call get_task_call_return_value(task=, flow_run_context=FlowRunContext(start_time=DateT...) in thread 'GlobalEventLoopThread' with timeout None 12:51:38.973 | DEBUG | GlobalEventLoopThread | prefect._internal.concurrency - entered 12:51:38.973 | DEBUG | GlobalEventLoopThread | prefect._internal.concurrency - exited 12:51:38.974 | DEBUG | GlobalEventLoopThread | prefect._internal.concurrency - Scheduling coroutine for call get_task_call_return_value(task=, flow_run_context=FlowRunContext(start_time=DateT...) 
in running loop <_UnixSelectorEventLoop running=True closed=False debug=False> 12:51:38.975 | DEBUG | GlobalEventLoopThread | prefect._internal.concurrency - entered 12:51:39.157 | DEBUG | GlobalEventLoopThread | prefect._internal.concurrency - Service enqueing item {'name': 'prefect.flow_runs', 'level': 20, 'message': "Created task run 't1-1' for task 't1'", 'timestamp': '2023-06-13T12:51:39.157327+00:00', 'flow_run_id': '853debb8-0e06-4327-b4f5-9b3cdd6a671b', 'task_run_id': None, '__payload_size__': 219} 12:51:39.157 | INFO | Flow run 'convivial-caracal' - Created task run 't1-1' for task 't1' 12:51:39.159 | DEBUG | GlobalEventLoopThread | prefect._internal.concurrency - Service enqueing item {'name': 'prefect.flow_runs', 'level': 20, 'message': "Executing 't1-1' immediately...", 'timestamp': '2023-06-13T12:51:39.159419+00:00', 'flow_run_id': '853debb8-0e06-4327-b4f5-9b3cdd6a671b', 'task_run_id': None, '__payload_size__': 213} 12:51:39.159 | INFO | Flow run 'convivial-caracal' - Executing 't1-1' immediately... 
12:51:39.163 | DEBUG | GlobalEventLoopThread | prefect._internal.concurrency - Service added item {'name': 'prefect.flow_runs', 'level': 20, 'message': "Created task run 't1-1' for task 't1'", 'timestamp': '2023-06-13T12:51:39.157327+00:00', 'flow_run_id': '853debb8-0e06-4327-b4f5-9b3cdd6a671b', 'task_run_id': None} to batch (size 1599/3000000) 12:51:39.165 | DEBUG | GlobalEventLoopThread | prefect._internal.concurrency - Service added item {'name': 'prefect.flow_runs', 'level': 20, 'message': "Executing 't1-1' immediately...", 'timestamp': '2023-06-13T12:51:39.159419+00:00', 'flow_run_id': '853debb8-0e06-4327-b4f5-9b3cdd6a671b', 'task_run_id': None} to batch (size 1812/3000000) 12:51:40.185 | DEBUG | GlobalEventLoopThread | prefect._internal.concurrency - Service enqueing item {'name': 'prefect.task_runs', 'level': 10, 'message': 'Executing t1(500000)', 'timestamp': '2023-06-13T12:51:40.185488+00:00', 'flow_run_id': '853debb8-0e06-4327-b4f5-9b3cdd6a671b', 'task_run_id': 'fca7892a-c8a6-490b-abe0-08bcdd264d41', '__payload_size__': 236} 12:51:40.185 | DEBUG | Task run 't1-1' - Executing t1(500000) 12:51:40.188 | DEBUG | GlobalEventLoopThread | prefect._internal.concurrency - Service added item {'name': 'prefect.task_runs', 'level': 10, 'message': 'Executing t1(500000)', 'timestamp': '2023-06-13T12:51:40.185488+00:00', 'flow_run_id': '853debb8-0e06-4327-b4f5-9b3cdd6a671b', 'task_run_id': 'fca7892a-c8a6-490b-abe0-08bcdd264d41'} to batch (size 2048/3000000) 12:51:40.188 | DEBUG | WorkerThread-2 | prefect._internal.concurrency - Running call t1(500000) in thread 'WorkerThread-2' with timeout None 12:51:40.189 | DEBUG | WorkerThread-2 | prefect._internal.concurrency - entered 12:51:40.315 | DEBUG | GlobalEventLoopThread | prefect._internal.concurrency - Service processing batch of size 2048 13:12:31.573 | DEBUG | WorkerThread-2 | prefect._internal.concurrency - exited 13:12:31.574 | DEBUG | WorkerThread-2 | prefect._internal.concurrency - Finished call t1(500000) 
13:12:31.574 | INFO | WorkerThread-2 | prefect._internal.concurrency - Exiting worker thread 'WorkerThread-2' 13:12:32.520 | DEBUG | GlobalEventLoopThread | prefect._internal.concurrency - --> return coroutine for internal await 13:12:32.521 | DEBUG | GlobalEventLoopThread | prefect._internal.concurrency - --> return coroutine for internal await 13:12:32.521 | DEBUG | GlobalEventLoopThread | prefect._internal.concurrency - --> return coroutine for internal await 13:12:32.521 | DEBUG | GlobalEventLoopThread | prefect._internal.concurrency - --> return coroutine for internal await 13:12:33.994 | DEBUG | GlobalEventLoopThread | prefect._internal.concurrency - Service enqueing item {'name': 'prefect.task_runs', 'level': 20, 'message': "Finished in state Completed(message=None, type=COMPLETED, result=UnpersistedResult(type='unpersisted', artifact_type='result', artifact_description='Unpersisted result of type `list`'))", 'timestamp': '2023-06-13T13:12:33.994210+00:00', 'flow_run_id': '853debb8-0e06-4327-b4f5-9b3cdd6a671b', 'task_run_id': 'fca7892a-c8a6-490b-abe0-08bcdd264d41', '__payload_size__': 401} 13:12:33.994 | INFO | Task run 't1-1' - Finished in state Completed(message=None, type=COMPLETED, result=UnpersistedResult(type='unpersisted', artifact_type='result', artifact_description='Unpersisted result of type `list`')) 13:12:33.996 | DEBUG | GlobalEventLoopThread | prefect._internal.concurrency - --> return coroutine for internal await 13:12:33.996 | DEBUG | GlobalEventLoopThread | prefect._internal.concurrency - --> return coroutine for internal await 13:12:33.996 | DEBUG | GlobalEventLoopThread | prefect._internal.concurrency - exited 13:12:33.997 | DEBUG | GlobalEventLoopThread | prefect._internal.concurrency - Finished async call get_task_call_return_value(task=, flow_run_context=FlowRunContext(start_time=DateT...) 
13:12:34.002 | DEBUG | MainThread | prefect._internal.concurrency - Waiter , flow_run_context=FlowRunContext(start_time=DateT...), owner='MainThread'> watching for callbacks 13:12:38.650 | DEBUG | GlobalEventLoopThread | prefect._internal.concurrency - Service added item {'name': 'prefect.task_runs', 'level': 20, 'message': "Finished in state Completed(message=None, type=COMPLETED, result=UnpersistedResult(type='unpersisted', artifact_type='result', artifact_description='Unpersisted result of type `list`'))", 'timestamp': '2023-06-13T13:12:33.994210+00:00', 'flow_run_id': '853debb8-0e06-4327-b4f5-9b3cdd6a671b', 'task_run_id': 'fca7892a-c8a6-490b-abe0-08bcdd264d41'} to batch (size 401/3000000) 13:12:47.836 | DEBUG | GlobalEventLoopThread | prefect._internal.concurrency - Running call get_task_call_return_value(task=, flow_run_context=FlowRunContext(start_time=DateT...) in thread 'GlobalEventLoopThread' with timeout None 13:12:57.528 | DEBUG | GlobalEventLoopThread | prefect._internal.concurrency - entered 13:12:57.528 | DEBUG | GlobalEventLoopThread | prefect._internal.concurrency - exited 13:12:57.529 | DEBUG | GlobalEventLoopThread | prefect._internal.concurrency - Scheduling coroutine for call get_task_call_return_value(task=, flow_run_context=FlowRunContext(start_time=DateT...) 
in running loop <_UnixSelectorEventLoop running=True closed=False debug=False> 13:13:06.606 | DEBUG | GlobalEventLoopThread | prefect._internal.concurrency - entered 13:15:21.392 | DEBUG | GlobalEventLoopThread | prefect._internal.concurrency - Service processing batch of size 401 13:15:21.607 | DEBUG | GlobalEventLoopThread | prefect._internal.concurrency - Service enqueing item {'name': 'prefect.flow_runs', 'level': 20, 'message': "Created task run 't2-1' for task 't2'", 'timestamp': '2023-06-13T13:15:21.607242+00:00', 'flow_run_id': '853debb8-0e06-4327-b4f5-9b3cdd6a671b', 'task_run_id': None, '__payload_size__': 219} 13:15:21.607 | INFO | Flow run 'convivial-caracal' - Created task run 't2-1' for task 't2' 13:15:21.609 | DEBUG | GlobalEventLoopThread | prefect._internal.concurrency - Service enqueing item {'name': 'prefect.flow_runs', 'level': 20, 'message': "Executing 't2-1' immediately...", 'timestamp': '2023-06-13T13:15:21.608905+00:00', 'flow_run_id': '853debb8-0e06-4327-b4f5-9b3cdd6a671b', 'task_run_id': None, '__payload_size__': 213} 13:15:21.608 | INFO | Flow run 'convivial-caracal' - Executing 't2-1' immediately... 
13:15:21.644 | DEBUG | GlobalEventLoopThread | prefect._internal.concurrency - Service added item {'name': 'prefect.flow_runs', 'level': 20, 'message': "Created task run 't2-1' for task 't2'", 'timestamp': '2023-06-13T13:15:21.607242+00:00', 'flow_run_id': '853debb8-0e06-4327-b4f5-9b3cdd6a671b', 'task_run_id': None} to batch (size 219/3000000) 13:15:21.645 | DEBUG | GlobalEventLoopThread | prefect._internal.concurrency - Service added item {'name': 'prefect.flow_runs', 'level': 20, 'message': "Executing 't2-1' immediately...", 'timestamp': '2023-06-13T13:15:21.608905+00:00', 'flow_run_id': '853debb8-0e06-4327-b4f5-9b3cdd6a671b', 'task_run_id': None} to batch (size 432/3000000) 13:18:42.999 | DEBUG | GlobalEventLoopThread | prefect._internal.concurrency - Service processing batch of size 432 13:18:47.013 | DEBUG | GlobalEventLoopThread | prefect._internal.concurrency - Service enqueing item {'name': 'prefect.task_runs', 'level': 10, 'message': "Executing t2([{'Spec': '1.0.1', 'ID': 1824, 'x1': '32181 Johnson Course Apt. 389\\nNew Jamesside, MT 29394', 'x2':...)", 'timestamp': '2023-06-13T13:18:47.012797+00:00', 'flow_run_id': '853debb8-0e06-4327-b4f5-9b3cdd6a671b', 'task_run_id': 'f6665d4a-a7d2-4089-8484-cdcd408da6a9', '__payload_size__': 334} 13:18:47.012 | DEBUG | Task run 't2-1' - Executing t2([{'Spec': '1.0.1', 'ID': 1824, 'x1': '32181 Johnson Course Apt. 389\nNew Jamesside, MT 29394', 'x2':...) 13:18:47.016 | DEBUG | WorkerThread-3 | prefect._internal.concurrency - Running call t2([{'Spec': '1.0.1', 'ID': 1824, 'x1': '32181 Johnson Course Apt. 389\nNew Jamesside, MT 29394', 'x2':...) in thread 'WorkerThread-3' with timeout None 13:18:50.535 | DEBUG | WorkerThread-3 | prefect._internal.concurrency - entered 13:18:50.536 | DEBUG | GlobalEventLoopThread | prefect._internal.concurrency - Service added item {'name': 'prefect.task_runs', 'level': 10, 'message': "Executing t2([{'Spec': '1.0.1', 'ID': 1824, 'x1': '32181 Johnson Course Apt. 
389\\nNew Jamesside, MT 29394', 'x2':...)", 'timestamp': '2023-06-13T13:18:47.012797+00:00', 'flow_run_id': '853debb8-0e06-4327-b4f5-9b3cdd6a671b', 'task_run_id': 'f6665d4a-a7d2-4089-8484-cdcd408da6a9'} to batch (size 334/3000000) 13:18:50.558 | DEBUG | GlobalEventLoopThread | prefect._internal.concurrency - Service processing batch of size 334 13:18:52.400 | DEBUG | WorkerThread-3 | prefect._internal.concurrency - Service enqueing item {'name': 'prefect.task_runs', 'level': 20, 'message': ' Spec ... x10\n0 1.0.1 ... 71822 Arroyo Expressway\\nAllisonchester, IL 71187\n1 1.0.1 ... 48740 Cynthia Village Suite 005\\nLake Tina, GA...\n2 1.0.1 ... 7738 Leon Underpass Apt. 148\\nClarencebury, TX...\n3 1.0.1 ... 003 Mary Road\\nEast David, TN 15494\n4 1.0.1 ... 427 Monique Ports\\nTaylormouth, MA 24665\n\n[5 rows x 12 columns]', 'timestamp': '2023-06-13T13:18:52.377292+00:00', 'flow_run_id': '853debb8-0e06-4327-b4f5-9b3cdd6a671b', 'task_run_id': 'f6665d4a-a7d2-4089-8484-cdcd408da6a9', '__payload_size__': 640} 13:18:52.411 | DEBUG | GlobalEventLoopThread | prefect._internal.concurrency - Service added item {'name': 'prefect.task_runs', 'level': 20, 'message': ' Spec ... x10\n0 1.0.1 ... 71822 Arroyo Expressway\\nAllisonchester, IL 71187\n1 1.0.1 ... 48740 Cynthia Village Suite 005\\nLake Tina, GA...\n2 1.0.1 ... 7738 Leon Underpass Apt. 148\\nClarencebury, TX...\n3 1.0.1 ... 003 Mary Road\\nEast David, TN 15494\n4 1.0.1 ... 427 Monique Ports\\nTaylormouth, MA 24665\n\n[5 rows x 12 columns]', 'timestamp': '2023-06-13T13:18:52.377292+00:00', 'flow_run_id': '853debb8-0e06-4327-b4f5-9b3cdd6a671b', 'task_run_id': 'f6665d4a-a7d2-4089-8484-cdcd408da6a9'} to batch (size 640/3000000) 13:18:52.377 | INFO | Task run 't2-1' - Spec ... x10 0 1.0.1 ... 71822 Arroyo Expressway\nAllisonchester, IL 71187 1 1.0.1 ... 48740 Cynthia Village Suite 005\nLake Tina, GA... 2 1.0.1 ... 7738 Leon Underpass Apt. 148\nClarencebury, TX... 3 1.0.1 ... 003 Mary Road\nEast David, TN 15494 4 1.0.1 ... 
427 Monique Ports\nTaylormouth, MA 24665 [5 rows x 12 columns] 13:18:52.422 | DEBUG | WorkerThread-3 | prefect._internal.concurrency - exited 13:18:52.423 | DEBUG | WorkerThread-3 | prefect._internal.concurrency - Finished call t2([{'Spec': '1.0.1', 'ID': 1824, 'x1': '32181 Johnson Course Apt. 389\nNew Jamesside, MT 29394', 'x2':...) 13:18:55.976 | DEBUG | GlobalEventLoopThread | prefect._internal.concurrency - --> return coroutine for internal await 13:18:55.998 | DEBUG | GlobalEventLoopThread | prefect._internal.concurrency - --> return coroutine for internal await 13:18:55.998 | INFO | WorkerThread-3 | prefect._internal.concurrency - Exiting worker thread 'WorkerThread-3' 13:18:55.999 | DEBUG | GlobalEventLoopThread | prefect._internal.concurrency - --> return coroutine for internal await 13:18:56.000 | DEBUG | GlobalEventLoopThread | prefect._internal.concurrency - --> return coroutine for internal await 13:18:56.002 | DEBUG | GlobalEventLoopThread | prefect._internal.concurrency - Service processing batch of size 640 13:18:56.254 | DEBUG | GlobalEventLoopThread | prefect._internal.concurrency - Service enqueing item {'name': 'prefect.task_runs', 'level': 20, 'message': "Finished in state Completed(message=None, type=COMPLETED, result=UnpersistedResult(type='unpersisted', artifact_type='result', artifact_description='Unpersisted result of type `DataFrame`'))", 'timestamp': '2023-06-13T13:18:56.254193+00:00', 'flow_run_id': '853debb8-0e06-4327-b4f5-9b3cdd6a671b', 'task_run_id': 'f6665d4a-a7d2-4089-8484-cdcd408da6a9', '__payload_size__': 406} 13:18:56.254 | INFO | Task run 't2-1' - Finished in state Completed(message=None, type=COMPLETED, result=UnpersistedResult(type='unpersisted', artifact_type='result', artifact_description='Unpersisted result of type `DataFrame`')) 13:18:56.443 | DEBUG | GlobalEventLoopThread | prefect._internal.concurrency - Service added item {'name': 'prefect.task_runs', 'level': 20, 'message': "Finished in state Completed(message=None, 
type=COMPLETED, result=UnpersistedResult(type='unpersisted', artifact_type='result', artifact_description='Unpersisted result of type `DataFrame`'))", 'timestamp': '2023-06-13T13:18:56.254193+00:00', 'flow_run_id': '853debb8-0e06-4327-b4f5-9b3cdd6a671b', 'task_run_id': 'f6665d4a-a7d2-4089-8484-cdcd408da6a9'} to batch (size 406/3000000) 13:18:56.444 | DEBUG | GlobalEventLoopThread | prefect._internal.concurrency - --> return coroutine for internal await 13:18:56.444 | DEBUG | GlobalEventLoopThread | prefect._internal.concurrency - --> return coroutine for internal await 13:18:56.444 | DEBUG | GlobalEventLoopThread | prefect._internal.concurrency - exited 13:18:56.444 | DEBUG | GlobalEventLoopThread | prefect._internal.concurrency - Finished async call get_task_call_return_value(task=, flow_run_context=FlowRunContext(start_time=DateT...) 13:18:56.450 | DEBUG | MainThread | prefect._internal.concurrency - exited 13:19:05.609 | DEBUG | MainThread | prefect._internal.concurrency - Finished call mvp(10, 500000) 13:19:05.612 | DEBUG | GlobalEventLoopThread | prefect._internal.concurrency - Running call _wait() in thread 'GlobalEventLoopThread' with timeout None 13:19:05.612 | DEBUG | GlobalEventLoopThread | prefect._internal.concurrency - entered 13:19:05.612 | DEBUG | GlobalEventLoopThread | prefect._internal.concurrency - exited 13:19:05.612 | DEBUG | GlobalEventLoopThread | prefect._internal.concurrency - Scheduling coroutine for call _wait() in running loop <_UnixSelectorEventLoop running=True closed=False debug=False> 13:19:05.612 | DEBUG | GlobalEventLoopThread | prefect._internal.concurrency - Running call _wait() in thread 'GlobalEventLoopThread' with timeout None 13:19:05.612 | DEBUG | GlobalEventLoopThread | prefect._internal.concurrency - entered 13:19:05.613 | DEBUG | GlobalEventLoopThread | prefect._internal.concurrency - exited 13:19:05.613 | DEBUG | GlobalEventLoopThread | prefect._internal.concurrency - Scheduling coroutine for call _wait() in running loop 
<_UnixSelectorEventLoop running=True closed=False debug=False> 13:19:05.613 | DEBUG | GlobalEventLoopThread | prefect._internal.concurrency - Running call _wait() in thread 'GlobalEventLoopThread' with timeout None 13:19:05.613 | DEBUG | GlobalEventLoopThread | prefect._internal.concurrency - entered 13:19:05.613 | DEBUG | GlobalEventLoopThread | prefect._internal.concurrency - exited 13:19:05.613 | DEBUG | GlobalEventLoopThread | prefect._internal.concurrency - Scheduling coroutine for call _wait() in running loop <_UnixSelectorEventLoop running=True closed=False debug=False> 13:19:05.613 | DEBUG | GlobalEventLoopThread | prefect._internal.concurrency - Running call _wait() in thread 'GlobalEventLoopThread' with timeout None 13:19:05.613 | DEBUG | GlobalEventLoopThread | prefect._internal.concurrency - entered 13:19:05.613 | DEBUG | GlobalEventLoopThread | prefect._internal.concurrency - exited 13:19:05.613 | DEBUG | GlobalEventLoopThread | prefect._internal.concurrency - Scheduling coroutine for call _wait() in running loop <_UnixSelectorEventLoop running=True closed=False debug=False> 13:19:05.614 | DEBUG | GlobalEventLoopThread | prefect._internal.concurrency - entered 13:19:05.614 | DEBUG | GlobalEventLoopThread | prefect._internal.concurrency - entered 13:19:05.614 | DEBUG | GlobalEventLoopThread | prefect._internal.concurrency - entered 13:19:05.614 | DEBUG | GlobalEventLoopThread | prefect._internal.concurrency - entered 13:19:05.614 | DEBUG | GlobalEventLoopThread | prefect._internal.concurrency - exited 13:19:05.614 | DEBUG | GlobalEventLoopThread | prefect._internal.concurrency - Finished async call _wait() 13:19:05.614 | DEBUG | GlobalEventLoopThread | prefect._internal.concurrency - exited 13:19:05.615 | DEBUG | GlobalEventLoopThread | prefect._internal.concurrency - Finished async call _wait() 13:19:05.615 | DEBUG | GlobalEventLoopThread | prefect._internal.concurrency - exited 13:19:05.615 | DEBUG | GlobalEventLoopThread | 
prefect._internal.concurrency - Finished async call _wait() 13:19:05.615 | DEBUG | GlobalEventLoopThread | prefect._internal.concurrency - exited 13:19:05.615 | DEBUG | GlobalEventLoopThread | prefect._internal.concurrency - Finished async call _wait() 13:19:05.621 | DEBUG | GlobalEventLoopThread | prefect._internal.concurrency - --> return coroutine for internal await 13:19:05.621 | DEBUG | GlobalEventLoopThread | prefect._internal.concurrency - --> return coroutine for internal await 13:19:05.622 | DEBUG | GlobalEventLoopThread | prefect._internal.concurrency - --> return coroutine for internal await 13:19:05.622 | DEBUG | GlobalEventLoopThread | prefect._internal.concurrency - --> return coroutine for internal await 13:19:06.224 | DEBUG | prefect.task_runner.concurrent - Shutting down task runner... 13:19:06.225 | DEBUG | GlobalEventLoopThread | prefect._internal.concurrency - Service enqueing item {'name': 'prefect.flow_runs', 'level': 20, 'message': "Finished in state Completed(message='All states completed.', type=COMPLETED, result=UnpersistedResult(type='unpersisted', artifact_type='result', artifact_description='Unpersisted result of type `list`'))", 'timestamp': '2023-06-13T13:19:06.225133+00:00', 'flow_run_id': '853debb8-0e06-4327-b4f5-9b3cdd6a671b', 'task_run_id': None, '__payload_size__': 386} 13:19:06.225 | INFO | Flow run 'convivial-caracal' - Finished in state Completed(message='All states completed.', type=COMPLETED, result=UnpersistedResult(type='unpersisted', artifact_type='result', artifact_description='Unpersisted result of type `list`')) 13:19:06.226 | DEBUG | GlobalEventLoopThread | prefect._internal.concurrency - Draining service 13:19:06.226 | DEBUG | GlobalEventLoopThread | prefect._internal.concurrency - Stopping service 13:19:06.227 | DEBUG | GlobalEventLoopThread | prefect._internal.concurrency - Draining service 13:19:06.227 | DEBUG | GlobalEventLoopThread | prefect._internal.concurrency - Stopping service 13:19:06.227 | DEBUG | 
EventsWorkerThread | prefect._internal.concurrency - exited 13:19:06.228 | DEBUG | EventsWorkerThread | prefect._internal.concurrency - Finished call get() 13:19:06.228 | DEBUG | GlobalEventLoopThread | prefect._internal.concurrency - Service added item {'name': 'prefect.flow_runs', 'level': 20, 'message': "Finished in state Completed(message='All states completed.', type=COMPLETED, result=UnpersistedResult(type='unpersisted', artifact_type='result', artifact_description='Unpersisted result of type `list`'))", 'timestamp': '2023-06-13T13:19:06.225133+00:00', 'flow_run_id': '853debb8-0e06-4327-b4f5-9b3cdd6a671b', 'task_run_id': None} to batch (size 792/3000000) 13:19:06.229 | DEBUG | GlobalEventLoopThread | prefect._internal.concurrency - Exiting service 13:19:06.230 | INFO | EventsWorkerThread | prefect._internal.concurrency - Exiting worker thread 'EventsWorkerThread' 13:19:06.230 | DEBUG | GlobalEventLoopThread | prefect._internal.concurrency - Service processing batch of size 792 13:19:06.354 | INFO | APILogWorkerThread | prefect._internal.concurrency - Exiting worker thread 'APILogWorkerThread' 13:19:06.356 | DEBUG | GlobalEventLoopThread | prefect._internal.concurrency - exited 13:19:06.356 | DEBUG | GlobalEventLoopThread | prefect._internal.concurrency - Finished async call retrieve_flow_then_begin_flow_run(UUID('853debb8-0e06-4327-b4f5-9b3cdd6a671b'), user_thread=<_MainThread(MainThread, started 140659442...) ```

NB: Please do let me know if I should redact any of the above logs?

The relevant parts of the logs are as follows. The first pair of tasks passes 10 JSON records between them:

12:51:37.604 | INFO    | Task run 't1-0' - Finished in state Completed(message=None, type=COMPLETED, result=UnpersistedResult(type='unpersisted', artifact_type='result', artifact_description='Unpersisted result of type `list`'))
...
12:51:37.782 | INFO    | Flow run 'convivial-caracal' - Created task run 't2-0' for task 't2' ### less than a second later

There is a gap of less than a second between the end of the first task and the start of the next.

The second pair of tasks passes 500k records between them:

13:12:33.994 | INFO    | Task run 't1-1' - Finished in state Completed(message=None, type=COMPLETED, result=UnpersistedResult(type='unpersisted', artifact_type='result', artifact_description='Unpersisted result of type `list`')) ### TASK FINISHES

...

13:15:21.608 | INFO    | Flow run 'convivial-caracal' - Executing 't2-1' immediately... ### NEXT STARTS ~2.5mins later

In this case there is a gap of over 2.5 minutes between the end of one task and the start of the next.
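As a quick check on the two gaps, the timestamps quoted above can be subtracted directly. This is a minimal sketch that assumes both log lines in each pair come from the same day; `gap_seconds` is a hypothetical helper name, not part of Prefect.

```python
from datetime import datetime


def gap_seconds(end: str, start: str) -> float:
    """Seconds between two HH:MM:SS.mmm log timestamps on the same day."""
    fmt = "%H:%M:%S.%f"
    delta = datetime.strptime(start, fmt) - datetime.strptime(end, fmt)
    return delta.total_seconds()


# Timestamps copied from the log excerpts above.
small_gap = gap_seconds("12:51:37.604", "12:51:37.782")  # t1-0 finished -> t2-0 created
large_gap = gap_seconds("13:12:33.994", "13:15:21.608")  # t1-1 finished -> t2-1 executing

print(f"10 records:   {small_gap:.3f} s")
print(f"500k records: {large_gap:.3f} s")
```

For these timestamps the gaps come out at 0.178 s and 167.614 s respectively.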

Since I am running on GKE Autopilot, this 2.5 mins of dead time is extremely costly across many flows. As things stand, my only solution is to dispense with tasks entirely and just wrap vanilla Python functions in a flow, which doesn't feel right!

Reproduction

Above

Error

Above

Versions

Version:             2.10.13
API version:         0.8.4
Python version:      3.9.9
Git commit:          179edeac
Built:               Thu, Jun 8, 2023 4:10 PM
OS/Arch:             darwin/arm64
Profile:             default
Server type:         cloud

Not sure this helps much, since this is occurring on a k8s cluster - see additional context

Additional context

Manifests used for the agent in the cluster;

apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRole
metadata:
  name: prefect-agent
rules:
  - apiGroups: [""]
    resources: ["namespaces"]
    verbs: ["get", "list"]
---
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRoleBinding
metadata:
  name: prefect-agent-cluster-role-binding
subjects:
  - kind: ServiceAccount
    name: default
    namespace: prefect2
roleRef:
  kind: ClusterRole
  name: prefect-agent
  apiGroup: rbac.authorization.k8s.io
---
apiVersion: apps/v1
kind: Deployment
metadata:
  name: prefect-agent
  namespace: prefect2
  labels:
    app: prefect-agent
spec:
  selector:
    matchLabels:
      app: prefect-agent
  replicas: 1
  template:
    metadata:
      labels:
        app: prefect-agent
    spec:
      containers:
        - name: agent
          image: prefecthq/prefect:2.10.13-python3.9
          command: ["prefect", "agent", "start", "-q", "kubernetes"]
          imagePullPolicy: "IfNotPresent"
          resources:
            requests:
              cpu: 250m
              ephemeral-storage: 1Gi
              memory: 512Mi
            limits:
              cpu: 250m
              ephemeral-storage: 1Gi
              memory: 512Mi
          env:
            - name: PREFECT_API_URL
              value: https://api.prefect.cloud/api/accounts/e8ae2ca5-703a-4f0a-b3c7-d1dd3c8cbabd/workspaces/e8da6c6a-ac8c-4de9-9fad-12d5a2103129
            - name: PREFECT_EXPERIMENTAL_ENABLE_EVENTS_CLIENT
              value: "False"
            - name: PREFECT_API_KEY
              valueFrom:
                secretKeyRef:
                  name: prefect2-api-key
                  key: prefect2_api_key
---
kind: Namespace
apiVersion: v1
metadata:
  name: prefect2
---
apiVersion: rbac.authorization.k8s.io/v1
kind: Role
metadata:
  name: prefect-agent
  namespace: prefect2
rules:
  - apiGroups: [""]
    resources: ["pods", "pods/log", "pods/status"]
    verbs: ["get", "watch", "list"]
  - apiGroups: ["batch"]
    resources: ["jobs"]
    verbs: [ "get", "list", "watch", "create", "update", "patch", "delete" ]
---
apiVersion: rbac.authorization.k8s.io/v1
kind: RoleBinding
metadata:
  name: prefect-agent-role-binding
  namespace: prefect2
subjects:
  - kind: ServiceAccount
    name: default
    namespace: prefect2
roleRef:
  kind: Role
  name: prefect-agent
  apiGroup: rbac.authorization.k8s.io

Dockerfile that pod runs;

FROM python:3.9.9

COPY . /opt/src

WORKDIR /opt/src
RUN pip install --upgrade pip && pip install poetry && poetry build && pip install --verbose ./dist/*.whl

The pyproject.toml of the project installed in the container might be important too;

[tool.poetry]
name = "prefect2_flows"
version = "0.1.0"
description = ""
authors = ["---"]
readme = "README.md"
packages = [{ include = "flows" }]

[tool.poetry.dependencies]
python = ">=3.9,<3.10.0"
prefect = "^2.10.13"
gql = "^3.4.1"
requests-toolbelt = "^1.0.0"
pandas = "^2.0.2"
jsonlines = "^3.1.0"
google-cloud-storage = "^2.9.0"
pandas-gbq = "^0.19.2"
google-cloud-bigquery = "^3.11.0"
faker = "^18.10.1"

[tool.poetry.group.dev.dependencies]
black = "^23.3.0"
ruff = "^0.0.270"
ipython = "^8.14.0"

[build-system]
requires = ["poetry-core"]
build-backend = "poetry.core.masonry.api"

desertaxle commented 1 year ago

Hey @josh-gree, the delay might be related to the result persistence for your tasks. Can you set persist_result=False on your tasks and see if you still see a large delay between tasks?

zanieb commented 1 year ago

Possible regression related to https://github.com/PrefectHQ/prefect/issues/7065

jakekaplan commented 1 year ago

I did some digging into this. Not surprisingly, most of the time is spent in create_task_run_then_submit, split between create_task_run and submit_task_run.

Wrapping the parameter in quote() almost eliminates the submit time (about 14.2s down to about 0.55s in the output below), although the task creation time still scales with the size of the parameter. For example, if the JSON size is doubled to 100_000 rows, create task run takes about 14s.

(There are some extra prints in the source here to show how long create/submit take specifically.)

Output:

(demo-flows) ➜  demo-flows git:(main) ✗ python big_flows.py
11:52:17.884 | INFO    | prefect.engine - Created flow run 'remarkable-urchin' for flow 'my-flow'
11:52:17.890 | INFO    | Flow run 'remarkable-urchin' - View at https://app.prefect.cloud/account/3cf6b38f-5244-474a-9554-302144506e43/workspace/ce8b1412-01b7-4700-a508-8dbd1f43f623/flow-runs/flow-run/8329c2f0-6f84-4532-92b5-abb29b97e49f
No `quote()`:
11:53:58.932 | INFO    | Flow run 'remarkable-urchin' - Created task run 'my_task-0' for task 'my_task'
Create Task Run: 6.912395715713501
11:53:58.940 | INFO    | Flow run 'remarkable-urchin' - Executing 'my_task-0' immediately...
11:54:13.146 | INFO    | Task run 'my_task-0' - Finished in state Completed()
Submit Task Run: 14.208080768585205
Total task time: 21.138995885849
With `quote()`:
11:54:20.471 | INFO    | Flow run 'remarkable-urchin' - Created task run 'my_task-1' for task 'my_task'
Create Task Run: 7.3107006549835205
11:54:20.473 | INFO    | Flow run 'remarkable-urchin' - Executing 'my_task-1' immediately...
11:54:21.018 | INFO    | Task run 'my_task-1' - Finished in state Completed()
Submit Task Run: 0.5463552474975586
Total task time: 7.868835687637329
11:54:21.163 | INFO    | Flow run 'remarkable-urchin' - Finished in state Completed('All states completed.')
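Per-stage timings like the "Create Task Run" and "Submit Task Run" lines above could be collected with a small helper along these lines. This is only a sketch of the general approach, not the actual prints added to the Prefect source; `timed` and `timings` are hypothetical names, and the `time.sleep` calls stand in for the real work.

```python
import time
from contextlib import contextmanager


@contextmanager
def timed(label: str, results: dict):
    """Record and print the wall-clock duration of the wrapped block."""
    start = time.perf_counter()
    try:
        yield
    finally:
        elapsed = time.perf_counter() - start
        results[label] = elapsed
        print(f"{label}: {elapsed}")


timings = {}

with timed("Create Task Run", timings):
    time.sleep(0.01)  # placeholder for the task-run creation call

with timed("Submit Task Run", timings):
    time.sleep(0.02)  # placeholder for the task-run submission call

print("Total task time:", sum(timings.values()))
```

Keeping the durations in a dict as well as printing them makes it easy to compare the two stages across parameter sizes.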

Repro script:

import pandas as pd
import json
from prefect.utilities.annotations import quote
from typing import Dict, List
from prefect import flow, task
from faker import Faker
import time

def make_a_giant_json(N: int):
    Faker.seed(42)
    fake = Faker()
    out = fake.json(
        data_columns={
            "Spec": "@1.0.1",
            "ID": "pyint",
            "x1": "address",
            "x2": "address",
            "x3": "address",
            "x4": "address",
            "x5": "address",
            "x6": "address",
            "x7": "address",
            "x8": "address",
            "x9": "address",
            "x10": "address",
        },
        num_rows=N,
    )
    x = json.loads(out)
    return x

@task
def my_task(big_json):
    pass

@flow
def my_flow(n: int):
    big_json = make_a_giant_json(n)
    print("No `quote()`:")
    s = time.time()
    my_task(big_json)
    e = time.time()
    print("Total task time:", e - s)

    print("With `quote()`:")
    s = time.time()
    my_task(quote(big_json))
    e = time.time()
    print("Total task time:", e - s)

if __name__ == '__main__':
    my_flow(n=50000)
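One plausible reading of the submit-time difference is that an unquoted parameter is walked element by element (for example, to look for upstream dependencies), while a quoted one is treated as opaque. The sketch below illustrates only that general idea with plain Python. It is an assumption for illustration, not Prefect's implementation, and `Quoted` and `count_visited` are hypothetical names.

```python
import time


class Quoted:
    """Hypothetical marker meaning 'do not look inside this value'."""

    def __init__(self, value):
        self.value = value


def count_visited(obj) -> int:
    """Count the nodes a naive recursive scan of `obj` would touch."""
    if isinstance(obj, Quoted):
        return 1  # opaque: the wrapped value is never traversed
    if isinstance(obj, dict):
        return 1 + sum(count_visited(v) for v in obj.values())
    if isinstance(obj, (list, tuple, set)):
        return 1 + sum(count_visited(v) for v in obj)
    return 1


# 50_000 records with 11 fields each, mirroring the shape of the repro data.
records = [
    {"ID": i, **{f"x{j}": "address" for j in range(1, 11)}}
    for i in range(50_000)
]

start = time.perf_counter()
plain_nodes = count_visited(records)
plain_seconds = time.perf_counter() - start

start = time.perf_counter()
quoted_nodes = count_visited(Quoted(records))
quoted_seconds = time.perf_counter() - start

print(f"plain:  {plain_nodes} nodes in {plain_seconds:.4f} s")
print(f"quoted: {quoted_nodes} nodes in {quoted_seconds:.6f} s")
```

The plain scan touches every record and every field, so its cost grows linearly with the input, while the quoted scan touches a single node regardless of size.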

josh-gree commented 1 year ago

Thanks for the responses here, much appreciated. As suggested, I tried the quote annotation, but in my real-world case it does not reduce the delay much. I am currently just running all my flows without tasks: they now run in under 2 mins in total, compared to 10+ mins using tasks. This will probably just have to do for now, which is unfortunate :-(

I will keep an eye on this issue for any developments.

josh-gree commented 1 year ago

@desertaxle - I cannot try this today but will get back to you tomorrow and let you know if this helps. I did assume that persistence is opt-in anyway, right? The UI for the runs does seem to suggest unpersisted results...

josh-gree commented 1 year ago

Hi @zhen0 - what details do you need from me? It seems that @jakekaplan has a clear repro above?

zhen0 commented 1 year ago

@josh-gree - I added it for this line "I cannot try this today but will get back to you tomorrow and let you know if this helps" - I am guessing it didn't help!

Adding to our backlog as it looks like this is still an issue.