python / cpython

The Python programming language
https://www.python.org
Other
63.81k stars 30.55k forks source link

asyncio.from_thread #88472

Open f82f2f79-bfff-44c7-86e7-c93f7a9fd1fe opened 3 years ago

f82f2f79-bfff-44c7-86e7-c93f7a9fd1fe commented 3 years ago
BPO 44306
Nosy @asvetlov, @1st1, @graingert

Note: these values reflect the state of the issue at the time it was migrated and might not reflect the current state.

Show more details

GitHub fields: ```python assignee = None closed_at = None created_at = labels = ['expert-asyncio'] title = 'asyncio.from_thread' updated_at = user = 'https://github.com/graingert' ``` bugs.python.org fields: ```python activity = actor = 'asvetlov' assignee = 'none' closed = False closed_date = None closer = None components = ['asyncio'] creation = creator = 'graingert' dependencies = [] files = [] hgrepos = [] issue_num = 44306 keywords = [] message_count = 3.0 messages = ['395054', '395055', '415911'] nosy_count = 3.0 nosy_names = ['asvetlov', 'yselivanov', 'graingert'] pr_nums = [] priority = 'normal' resolution = None stage = None status = 'open' superseder = None type = None url = 'https://bugs.python.org/issue44306' versions = [] ```

f82f2f79-bfff-44c7-86e7-c93f7a9fd1fe commented 3 years ago

create a asyncio.from_thread shortcut to run async functions from a thread started with asyncio.to_thread

def from_thread(async_func, /, *args, **kwargs):
    """Synchronously run function *async_func* in the event loop thread.

    Any *args and **kwargs supplied for this function are directly passed
    to *func*. Also, the current :class:`contextvars.Context` is propogated,
    allowing context variables from the main thread to be accessed in the
    separate thread.

    Return a concurrent.futures.Future to wait for the result from the event
    loop thread.
f82f2f79-bfff-44c7-86e7-c93f7a9fd1fe commented 3 years ago
"""High-level support for working with threads in asyncio"""

import functools
import contextvars

from . import events
from . import tasks

__all__ = "to_thread", "from_thread"

class _Local(threading.local):
    loop = None

_local = _Local()

def _with_loop(loop, func, /, *args, **kwargs):
    _loop.loop = loop
    try:
        return func(*args, **kwargs)
    finally:
        _loop.loop = None

async def to_thread(func, /, *args, **kwargs):
    """Asynchronously run function *func* in a separate thread.

    Any *args and **kwargs supplied for this function are directly passed
    to *func*. Also, the current :class:`contextvars.Context` is propogated,
    allowing context variables from the main thread to be accessed in the
    separate thread.

    Return a coroutine that can be awaited to get the eventual result of *func*.
    """
    loop = events.get_running_loop()
    ctx = contextvars.copy_context()
    func_call = functools.partial(_with_loop, loop, ctx.run, func, *args, **kwargs)
    return await loop.run_in_executor(None, func_call)

def _create_task(async_func, /, *args, **kwargs):
    return events.create_task(async_func(*args, **kwargs))

async def _with_context(ctx, async_func, /, *args, **kwargs):
    return await ctx.run(_create_task, async_func, *args, **kwargs)

def from_thread(async_func, /, *args, **kwargs):
    """Synchronously run function *async_func* in the event loop thread.

    Any *args and **kwargs supplied for this function are directly passed
    to *func*. Also, the current :class:`contextvars.Context` is propogated,
    allowing context variables from the main thread to be accessed in the
    separate thread.

    Return a concurrent.futures.Future to wait for the result from the event
    loop thread.
    """
    loop = _loop.loop
    if loop is None:
        raise RuntimeError(
            "asyncio.from_thread can only be run in a thread started by "
            "asyncio.to_thread"
        )

    ctx = contextvars.copy_context()
    return tasks.run_coroutine_threadsafe(loop, _with_context(ctx, async_func, *args, **kwargs))
asvetlov commented 2 years ago

How is it better than passing the loop instance explicitly? What is the real use case?

kumaraditya303 commented 2 years ago

Closing as the use case is unclear. Also one can easily implement something like this by passing loop and call_soon_threadsafe.

gvanrossum commented 2 years ago

Let's keep this open.

For more about the use case, this seems a design copied from Trio which has both to_thread() and from_thread(). (I found the Trio design by following some links from the PR that added to_thread(). Maybe there was a plan to add more? Several core folks on the issue that led to the PR were enthusiastic about Trio's design. (The Trio page also explains why the loop doesn't need to be passed in directly.)

@graingert seems to have a small improvement to to_thread(). FWIW maybe we can use the context to pass the loop instead of using a thread-local?

gvanrossum commented 2 years ago

FWIW, asyncio.threads.to_thread() is just a shorthand for run_in_executor but running the function in the current context (using contextvars.copy_context().run()), plus a little trampoline to be able to pass **kwargs (which I should have stopped in the code review -- asyncio doesn't bother with **kwargs when taking a callable and *args).

It's too bad that we didn't think of adding the contextvars.copy_context().run call to the default run_in_executor, then we wouldn't have needed to_thread(). I'm guessing that's too late now -- although perhaps we could add a keyword argument to enable this behavior. (Adding such flags is the reason asyncio doesn't take **kwargs to pass along BTW. :-)

It seems call_soon_threadsafe() already uses the current context, and hence so does run_coroutine_threadsafe() -- and the proposed from_thread() is a very thin wrapper around the latter. Maybe we can make everything here just go away in favor of the new flag to run_in_executor() and the existing run_coroutine_threadsafe()?