PrefectHQ / prefect

Prefect is a workflow orchestration framework for building resilient data pipelines in Python.
https://prefect.io
Apache License 2.0

prefect.get_run_logger() throws MissingContextError when using concurrent.futures.ThreadPoolExecutor #8652

Closed: austinweisgrau closed this issue 1 year ago

austinweisgrau commented 1 year ago

Bug summary

prefect.get_run_logger() raises MissingContextError when called from a function that a task submits to a concurrent.futures.ThreadPoolExecutor.

Reproduction

from concurrent.futures import ThreadPoolExecutor

from prefect import flow, get_run_logger, task

def concurrent_subtask() -> None:
    # This raises MissingContextError
    get_run_logger()

@task
def basic_task():
    get_run_logger().info("This works.")
    with ThreadPoolExecutor(max_workers=2) as executor:
        futures = []
        for _ in range(2):
            future = executor.submit(concurrent_subtask)
            futures.append(future)

        for future in futures:
            future.result()

@flow
def helloworld() -> None:
    basic_task()

if __name__ == "__main__":
    helloworld()

Error

$ python flows/helloworld/helloworld_flow.py 
14:24:04.006 | INFO    | prefect.engine - Created flow run 'flying-civet' for flow 'helloworld'
14:24:05.257 | INFO    | Flow run 'flying-civet' - Created task run 'basic_task-0' for task 'basic_task'
14:24:05.258 | INFO    | Flow run 'flying-civet' - Executing 'basic_task-0' immediately...
14:24:05.773 | INFO    | Task run 'basic_task-0' - This works.
14:24:05.776 | ERROR   | Task run 'basic_task-0' - Encountered exception during execution:
Traceback (most recent call last):
  File "/home/aradox/.pyenv/versions/prefect/lib/python3.10/site-packages/prefect/engine.py", line 1512, in orchestrate_task_run
    result = await run_sync(task.fn, *args, **kwargs)
  File "/home/aradox/.pyenv/versions/prefect/lib/python3.10/site-packages/prefect/utilities/asyncutils.py", line 154, in run_sync_in_interruptible_worker_thread
    async with anyio.create_task_group() as tg:
  File "/home/aradox/.pyenv/versions/prefect/lib/python3.10/site-packages/anyio/_backends/_asyncio.py", line 662, in __aexit__
    raise exceptions[0]
  File "/home/aradox/.pyenv/versions/prefect/lib/python3.10/site-packages/anyio/to_thread.py", line 31, in run_sync
    return await get_asynclib().run_sync_in_worker_thread(
  File "/home/aradox/.pyenv/versions/prefect/lib/python3.10/site-packages/anyio/_backends/_asyncio.py", line 937, in run_sync_in_worker_thread
    return await future
  File "/home/aradox/.pyenv/versions/prefect/lib/python3.10/site-packages/anyio/_backends/_asyncio.py", line 867, in run
    result = context.run(func, *args)
  File "/home/aradox/.pyenv/versions/prefect/lib/python3.10/site-packages/prefect/utilities/asyncutils.py", line 135, in capture_worker_thread_and_result
    result = __fn(*args, **kwargs)
  File "/home/aradox/code/wfp/wfp-prefect/flows/helloworld/helloworld_flow.py", line 21, in basic_task
    future.result()
  File "/home/aradox/.pyenv/versions/3.10.9/lib/python3.10/concurrent/futures/_base.py", line 451, in result
    return self.__get_result()
  File "/home/aradox/.pyenv/versions/3.10.9/lib/python3.10/concurrent/futures/_base.py", line 403, in __get_result
    raise self._exception
  File "/home/aradox/.pyenv/versions/3.10.9/lib/python3.10/concurrent/futures/thread.py", line 58, in run
    result = self.fn(*self.args, **self.kwargs)
  File "/home/aradox/code/wfp/wfp-prefect/flows/helloworld/helloworld_flow.py", line 8, in concurrent_subtask
    get_run_logger()
  File "/home/aradox/.pyenv/versions/prefect/lib/python3.10/site-packages/prefect/logging/loggers.py", line 114, in get_run_logger
    raise MissingContextError("There is no active flow or task run context.")
prefect.exceptions.MissingContextError: There is no active flow or task run context.
14:24:05.974 | ERROR   | Task run 'basic_task-0' - Finished in state Failed('Task run encountered an exception: MissingContextError: There is no active flow or task run context.\n')
14:24:05.976 | ERROR   | Flow run 'flying-civet' - Encountered exception during execution:
Traceback (most recent call last):
  File "/home/aradox/.pyenv/versions/prefect/lib/python3.10/site-packages/prefect/engine.py", line 651, in orchestrate_flow_run
    result = await run_sync(flow_call)
  File "/home/aradox/.pyenv/versions/prefect/lib/python3.10/site-packages/prefect/utilities/asyncutils.py", line 154, in run_sync_in_interruptible_worker_thread
    async with anyio.create_task_group() as tg:
  File "/home/aradox/.pyenv/versions/prefect/lib/python3.10/site-packages/anyio/_backends/_asyncio.py", line 662, in __aexit__
    raise exceptions[0]
  File "/home/aradox/.pyenv/versions/prefect/lib/python3.10/site-packages/anyio/to_thread.py", line 31, in run_sync
    return await get_asynclib().run_sync_in_worker_thread(
  File "/home/aradox/.pyenv/versions/prefect/lib/python3.10/site-packages/anyio/_backends/_asyncio.py", line 937, in run_sync_in_worker_thread
    return await future
  File "/home/aradox/.pyenv/versions/prefect/lib/python3.10/site-packages/anyio/_backends/_asyncio.py", line 867, in run
    result = context.run(func, *args)
  File "/home/aradox/.pyenv/versions/prefect/lib/python3.10/site-packages/prefect/utilities/asyncutils.py", line 135, in capture_worker_thread_and_result
    result = __fn(*args, **kwargs)
  File "/home/aradox/code/wfp/wfp-prefect/flows/helloworld/helloworld_flow.py", line 26, in helloworld
    basic_task()
  File "/home/aradox/.pyenv/versions/prefect/lib/python3.10/site-packages/prefect/tasks.py", line 456, in __call__
    return enter_task_run_engine(
  File "/home/aradox/.pyenv/versions/prefect/lib/python3.10/site-packages/prefect/engine.py", line 941, in enter_task_run_engine
    return run_async_from_worker_thread(begin_run)
  File "/home/aradox/.pyenv/versions/prefect/lib/python3.10/site-packages/prefect/utilities/asyncutils.py", line 177, in run_async_from_worker_thread
    return anyio.from_thread.run(call)
  File "/home/aradox/.pyenv/versions/prefect/lib/python3.10/site-packages/anyio/from_thread.py", line 49, in run
    return asynclib.run_async_from_thread(func, *args)
  File "/home/aradox/.pyenv/versions/prefect/lib/python3.10/site-packages/anyio/_backends/_asyncio.py", line 970, in run_async_from_thread
    return f.result()
  File "/home/aradox/.pyenv/versions/3.10.9/lib/python3.10/concurrent/futures/_base.py", line 458, in result
    return self.__get_result()
  File "/home/aradox/.pyenv/versions/3.10.9/lib/python3.10/concurrent/futures/_base.py", line 403, in __get_result
    raise self._exception
  File "/home/aradox/.pyenv/versions/prefect/lib/python3.10/site-packages/prefect/engine.py", line 1090, in get_task_call_return_value
    return await future._result()
  File "/home/aradox/.pyenv/versions/prefect/lib/python3.10/site-packages/prefect/futures.py", line 237, in _result
    return await final_state.result(raise_on_failure=raise_on_failure, fetch=True)
  File "/home/aradox/.pyenv/versions/prefect/lib/python3.10/site-packages/prefect/states.py", line 89, in _get_state_result
    raise await get_state_exception(state)
  File "/home/aradox/.pyenv/versions/prefect/lib/python3.10/site-packages/prefect/engine.py", line 1512, in orchestrate_task_run
    result = await run_sync(task.fn, *args, **kwargs)
  File "/home/aradox/.pyenv/versions/prefect/lib/python3.10/site-packages/prefect/utilities/asyncutils.py", line 154, in run_sync_in_interruptible_worker_thread
    async with anyio.create_task_group() as tg:
  File "/home/aradox/.pyenv/versions/prefect/lib/python3.10/site-packages/anyio/_backends/_asyncio.py", line 662, in __aexit__
    raise exceptions[0]
  File "/home/aradox/.pyenv/versions/prefect/lib/python3.10/site-packages/anyio/to_thread.py", line 31, in run_sync
    return await get_asynclib().run_sync_in_worker_thread(
  File "/home/aradox/.pyenv/versions/prefect/lib/python3.10/site-packages/anyio/_backends/_asyncio.py", line 937, in run_sync_in_worker_thread
    return await future
  File "/home/aradox/.pyenv/versions/prefect/lib/python3.10/site-packages/anyio/_backends/_asyncio.py", line 867, in run
    result = context.run(func, *args)
  File "/home/aradox/.pyenv/versions/prefect/lib/python3.10/site-packages/prefect/utilities/asyncutils.py", line 135, in capture_worker_thread_and_result
    result = __fn(*args, **kwargs)
  File "/home/aradox/code/wfp/wfp-prefect/flows/helloworld/helloworld_flow.py", line 21, in basic_task
    future.result()
  File "/home/aradox/.pyenv/versions/3.10.9/lib/python3.10/concurrent/futures/_base.py", line 451, in result
    return self.__get_result()
  File "/home/aradox/.pyenv/versions/3.10.9/lib/python3.10/concurrent/futures/_base.py", line 403, in __get_result
    raise self._exception
  File "/home/aradox/.pyenv/versions/3.10.9/lib/python3.10/concurrent/futures/thread.py", line 58, in run
    result = self.fn(*self.args, **self.kwargs)
  File "/home/aradox/code/wfp/wfp-prefect/flows/helloworld/helloworld_flow.py", line 8, in concurrent_subtask
    get_run_logger()
  File "/home/aradox/.pyenv/versions/prefect/lib/python3.10/site-packages/prefect/logging/loggers.py", line 114, in get_run_logger
    raise MissingContextError("There is no active flow or task run context.")
prefect.exceptions.MissingContextError: There is no active flow or task run context.
14:24:06.125 | ERROR   | Flow run 'flying-civet' - Finished in state Failed('Flow run encountered an exception. MissingContextError: There is no active flow or task run context.\n')
Traceback (most recent call last):
  File "/home/aradox/code/wfp/wfp-prefect/flows/helloworld/helloworld_flow.py", line 30, in <module>
    helloworld()
  File "/home/aradox/.pyenv/versions/prefect/lib/python3.10/site-packages/prefect/flows.py", line 456, in __call__
    return enter_flow_run_engine_from_flow_call(
  File "/home/aradox/.pyenv/versions/prefect/lib/python3.10/site-packages/prefect/engine.py", line 170, in enter_flow_run_engine_from_flow_call
    return anyio.run(begin_run)
  File "/home/aradox/.pyenv/versions/prefect/lib/python3.10/site-packages/anyio/_core/_eventloop.py", line 70, in run
    return asynclib.run(func, *args, **backend_options)
  File "/home/aradox/.pyenv/versions/prefect/lib/python3.10/site-packages/anyio/_backends/_asyncio.py", line 292, in run
    return native_run(wrapper(), debug=debug)
  File "/home/aradox/.pyenv/versions/3.10.9/lib/python3.10/asyncio/runners.py", line 44, in run
    return loop.run_until_complete(main)
  File "/home/aradox/.pyenv/versions/3.10.9/lib/python3.10/asyncio/base_events.py", line 649, in run_until_complete
    return future.result()
  File "/home/aradox/.pyenv/versions/prefect/lib/python3.10/site-packages/anyio/_backends/_asyncio.py", line 287, in wrapper
    return await func(*args)
  File "/home/aradox/.pyenv/versions/prefect/lib/python3.10/site-packages/prefect/client/utilities.py", line 47, in with_injected_client
    return await fn(*args, **kwargs)
  File "/home/aradox/.pyenv/versions/prefect/lib/python3.10/site-packages/prefect/engine.py", line 250, in create_then_begin_flow_run
    return await state.result(fetch=True)
  File "/home/aradox/.pyenv/versions/prefect/lib/python3.10/site-packages/prefect/states.py", line 89, in _get_state_result
    raise await get_state_exception(state)
  File "/home/aradox/.pyenv/versions/prefect/lib/python3.10/site-packages/prefect/engine.py", line 651, in orchestrate_flow_run
    result = await run_sync(flow_call)
  File "/home/aradox/.pyenv/versions/prefect/lib/python3.10/site-packages/prefect/utilities/asyncutils.py", line 154, in run_sync_in_interruptible_worker_thread
    async with anyio.create_task_group() as tg:
  File "/home/aradox/.pyenv/versions/prefect/lib/python3.10/site-packages/anyio/_backends/_asyncio.py", line 662, in __aexit__
    raise exceptions[0]
  File "/home/aradox/.pyenv/versions/prefect/lib/python3.10/site-packages/anyio/to_thread.py", line 31, in run_sync
    return await get_asynclib().run_sync_in_worker_thread(
  File "/home/aradox/.pyenv/versions/prefect/lib/python3.10/site-packages/anyio/_backends/_asyncio.py", line 937, in run_sync_in_worker_thread
    return await future
  File "/home/aradox/.pyenv/versions/prefect/lib/python3.10/site-packages/anyio/_backends/_asyncio.py", line 867, in run
    result = context.run(func, *args)
  File "/home/aradox/.pyenv/versions/prefect/lib/python3.10/site-packages/prefect/utilities/asyncutils.py", line 135, in capture_worker_thread_and_result
    result = __fn(*args, **kwargs)
  File "/home/aradox/code/wfp/wfp-prefect/flows/helloworld/helloworld_flow.py", line 26, in helloworld
    basic_task()
  File "/home/aradox/.pyenv/versions/prefect/lib/python3.10/site-packages/prefect/tasks.py", line 456, in __call__
    return enter_task_run_engine(
  File "/home/aradox/.pyenv/versions/prefect/lib/python3.10/site-packages/prefect/engine.py", line 941, in enter_task_run_engine
    return run_async_from_worker_thread(begin_run)
  File "/home/aradox/.pyenv/versions/prefect/lib/python3.10/site-packages/prefect/utilities/asyncutils.py", line 177, in run_async_from_worker_thread
    return anyio.from_thread.run(call)
  File "/home/aradox/.pyenv/versions/prefect/lib/python3.10/site-packages/anyio/from_thread.py", line 49, in run
    return asynclib.run_async_from_thread(func, *args)
  File "/home/aradox/.pyenv/versions/prefect/lib/python3.10/site-packages/anyio/_backends/_asyncio.py", line 970, in run_async_from_thread
    return f.result()
  File "/home/aradox/.pyenv/versions/3.10.9/lib/python3.10/concurrent/futures/_base.py", line 458, in result
    return self.__get_result()
  File "/home/aradox/.pyenv/versions/3.10.9/lib/python3.10/concurrent/futures/_base.py", line 403, in __get_result
    raise self._exception
  File "/home/aradox/.pyenv/versions/prefect/lib/python3.10/site-packages/prefect/engine.py", line 1090, in get_task_call_return_value
    return await future._result()
  File "/home/aradox/.pyenv/versions/prefect/lib/python3.10/site-packages/prefect/futures.py", line 237, in _result
    return await final_state.result(raise_on_failure=raise_on_failure, fetch=True)
  File "/home/aradox/.pyenv/versions/prefect/lib/python3.10/site-packages/prefect/states.py", line 89, in _get_state_result
    raise await get_state_exception(state)
  File "/home/aradox/.pyenv/versions/prefect/lib/python3.10/site-packages/prefect/engine.py", line 1512, in orchestrate_task_run
    result = await run_sync(task.fn, *args, **kwargs)
  File "/home/aradox/.pyenv/versions/prefect/lib/python3.10/site-packages/prefect/utilities/asyncutils.py", line 154, in run_sync_in_interruptible_worker_thread
    async with anyio.create_task_group() as tg:
  File "/home/aradox/.pyenv/versions/prefect/lib/python3.10/site-packages/anyio/_backends/_asyncio.py", line 662, in __aexit__
    raise exceptions[0]
  File "/home/aradox/.pyenv/versions/prefect/lib/python3.10/site-packages/anyio/to_thread.py", line 31, in run_sync
    return await get_asynclib().run_sync_in_worker_thread(
  File "/home/aradox/.pyenv/versions/prefect/lib/python3.10/site-packages/anyio/_backends/_asyncio.py", line 937, in run_sync_in_worker_thread
    return await future
  File "/home/aradox/.pyenv/versions/prefect/lib/python3.10/site-packages/anyio/_backends/_asyncio.py", line 867, in run
    result = context.run(func, *args)
  File "/home/aradox/.pyenv/versions/prefect/lib/python3.10/site-packages/prefect/utilities/asyncutils.py", line 135, in capture_worker_thread_and_result
    result = __fn(*args, **kwargs)
  File "/home/aradox/code/wfp/wfp-prefect/flows/helloworld/helloworld_flow.py", line 21, in basic_task
    future.result()
  File "/home/aradox/.pyenv/versions/3.10.9/lib/python3.10/concurrent/futures/_base.py", line 451, in result
    return self.__get_result()
  File "/home/aradox/.pyenv/versions/3.10.9/lib/python3.10/concurrent/futures/_base.py", line 403, in __get_result
    raise self._exception
  File "/home/aradox/.pyenv/versions/3.10.9/lib/python3.10/concurrent/futures/thread.py", line 58, in run
    result = self.fn(*self.args, **self.kwargs)
  File "/home/aradox/code/wfp/wfp-prefect/flows/helloworld/helloworld_flow.py", line 8, in concurrent_subtask
    get_run_logger()
  File "/home/aradox/.pyenv/versions/prefect/lib/python3.10/site-packages/prefect/logging/loggers.py", line 114, in get_run_logger
    raise MissingContextError("There is no active flow or task run context.")
prefect.exceptions.MissingContextError: There is no active flow or task run context.

Versions

Version:             2.8.2
API version:         0.8.4
Python version:      3.10.9
Git commit:          afbed19d
Built:               Fri, Feb 17, 2023 10:02 AM
OS/Arch:             linux/x86_64
Profile:             default
Server type:         cloud

zanieb commented 1 year ago

@austinweisgrau you'll need to propagate context variables yourself if you want to use the Prefect context in threads you manage, e.g. https://github.com/PrefectHQ/prefect/blob/main/src/prefect/_internal/concurrency/executor.py#L76-L77
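
To illustrate what propagating context variables means, here is a minimal standard-library sketch. It does not use Prefect: current_run is a hypothetical stand-in for the run context, and the behaviour shown assumes a default CPython build, where pool worker threads do not inherit the submitting thread's context.

```python
import contextvars
from concurrent.futures import ThreadPoolExecutor

# Hypothetical stand-in for a run context stored in a context variable.
current_run = contextvars.ContextVar("current_run", default=None)


def read_run():
    # Returns whatever value is visible in the calling thread's context.
    return current_run.get()


def demo():
    current_run.set("basic_task-0")
    with ThreadPoolExecutor(max_workers=1) as executor:
        # Plain submit: the worker thread does not see the caller's value.
        plain = executor.submit(read_run).result()
        # Snapshot the caller's context and run the function inside it.
        snapshot = contextvars.copy_context()
        propagated = executor.submit(snapshot.run, read_run).result()
    return plain, propagated
```

Here demo() returns the default for the plain submission and the caller's value for the one that runs inside the copied context, which is the difference between the failing and the working call in this issue.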

austinweisgrau commented 1 year ago

@madkinsz Can you demonstrate how that would be accomplished? If I replace the import in my example with from prefect._internal.concurrency.executor import ThreadPoolExecutor, I still get the same error.

austinweisgrau commented 1 year ago

A meta-question here is, is using ThreadPoolExecutor within a prefect task an anti-pattern? Is there a preferred implementation for nested concurrency within a Prefect task? I'm confused about how to implement nontrivially structured nested concurrent functions within Prefect; the docs don't seem to discuss it.

zanieb commented 1 year ago

@austinweisgrau you could import the Executor from there; if you import ThreadPoolExecutor from that module, you're just importing the standard library one. Those are internal modules though, so I can't recommend you use them in production. You could accomplish this with your previous example like so:

from concurrent.futures import ThreadPoolExecutor
import contextvars

from prefect import flow, get_run_logger, task

def concurrent_subtask() -> None:
    # Runs inside a copy of the task's context, so this no longer raises
    get_run_logger()

@task
def basic_task():
    get_run_logger().info("This works.")
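    with ThreadPoolExecutor(max_workers=2) as executor:
        futures = []
        for _ in range(2):
            # Copy the context per submission: a single Context object
            # cannot be entered by two threads at the same time.
            context = contextvars.copy_context()
            future = executor.submit(context.run, concurrent_subtask)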
            futures.append(future)

        for future in futures:
            future.result()

@flow
def helloworld() -> None:
    basic_task()

if __name__ == "__main__":
    helloworld()

A meta-question here is, is using ThreadPoolExecutor within a prefect task an anti-pattern?

Basically, yeah. Prefect tasks are intended to be the smallest level of concurrency. We provide task runners to manage concurrent execution of tasks. Using additional concurrency mechanisms within tasks isn't recommended, but it's probably okay if you're just running your tasks sequentially / locally. You just won't have access to the Prefect context while managing concurrency yourself unless you copy it explicitly.
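
If copying the context at every call site gets repetitive, one option is to wrap it in a small executor subclass. The sketch below is standard library only; ContextThreadPoolExecutor and request_id are hypothetical names for illustration, not part of Prefect.

```python
import contextvars
from concurrent.futures import ThreadPoolExecutor


class ContextThreadPoolExecutor(ThreadPoolExecutor):
    """Hypothetical helper: runs each call in a copy of the submitter's context."""

    def submit(self, fn, /, *args, **kwargs):
        # Take a fresh snapshot per call; one Context object cannot be
        # entered by two threads at the same time.
        snapshot = contextvars.copy_context()
        return super().submit(snapshot.run, fn, *args, **kwargs)


# Hypothetical context variable standing in for the Prefect run context.
request_id = contextvars.ContextVar("request_id", default="unset")


def demo():
    request_id.set("run-42")
    with ContextThreadPoolExecutor(max_workers=4) as executor:
        futures = [executor.submit(request_id.get) for _ in range(8)]
        return [future.result() for future in futures]
```

Each submitted call sees the value that was set in the submitting thread, and because every call gets its own snapshot, the workers can run concurrently without sharing a Context object.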

austinweisgrau commented 1 year ago

Got it, thanks!