fractal-analytics-platform / fractal-server

Fractal backend
https://fractal-analytics-platform.github.io/fractal-server/
BSD 3-Clause "New" or "Revised" License

SSH-command execution blocks API worker #1605

Closed tcompa closed 1 day ago

tcompa commented 5 days ago

If we start fractal-server with a single worker and trigger background collection of a task package, the API does not respond until the ongoing SSH command completes.

EDIT: see our latest understanding of the problem below: https://github.com/fractal-analytics-platform/fractal-server/issues/1605#issuecomment-2213418931.

tcompa commented 4 days ago

Note: this is the reason why we had to set a very large gunicorn-worker timeout in the current experimental deployment, namely to avoid timeout errors during the pip install fractal-tasks-core[fractal-tasks] step.

tcompa commented 4 days ago

With @mfranzon

import asyncio
from devtools import debug
import asyncssh
import fabric
import time
from functools import partial
from functools import wraps

# Credentials of the remote host used for this test
host = "172.20.0.2"
username = "fractal"
password = "fractal"
CMD = "sleep 3"

# A single fabric connection, opened once and reused by all fabric-based variants
connection_fabric = fabric.Connection(
    host=host,
    user=username,
    connect_kwargs={"password": password},
)
connection_fabric.open()

def async_wrap(func: callable) -> callable:
    """
    Wrap a synchronous callable in an async task.
    """

    @wraps(func)
    async def async_wrapper(*args, loop=None, executor=None, **kwargs):
        if loop is None:
            loop = asyncio.get_event_loop()
        pfunc = partial(func, *args, **kwargs)
        # executor=None -> the blocking call runs in the loop's default executor
        return await loop.run_in_executor(executor, pfunc)

    return async_wrapper

async def connect_and_cmd_asyncssh():
    # Variant 1: asyncssh, fully async - `await connection.run` yields to the loop
    async with asyncssh.connect(
        host,
        username=username,
        password=password,
    ) as connection:
        print("SSH command - START")
        await connection.run(CMD, check=True)
        print("SSH command - END")

async def connect_and_cmd_fabric_vanilla():
    # Variant 2: fabric inside an async def - the blocking run() stalls the loop
    print("SSH command - START")
    connection_fabric.run(CMD)
    print("SSH command - END")

def connect_and_cmd_fabric_vanilla_sync():
    # Variant 4 (used via async_wrap): same blocking call, but as a plain def
    print("SSH command - START")
    connection_fabric.run(CMD)
    print("SSH command - END")

async def connect_and_cmd_fabric_asynchronous():
    # Variant 3: fabric's asynchronous=True - promise.join() still blocks the loop
    print("SSH command - START")
    promise = connection_fabric.run(CMD, asynchronous=True)
    promise.join()
    print("SSH command - END")

async def print_during_sleep():
    for i in range(3):
        print(f"Sleep {i} - START")
        await asyncio.sleep(1)
        print(f"Sleep {i} - END")

async def run(function_to_review: callable):
    """
    Run function together with non-blocking function, to review concurrency
    """
    res1 = asyncio.create_task(function_to_review())
    res2 = asyncio.create_task(print_during_sleep())
    await res1
    await res2

# Run each variant alongside the non-blocking sleeps, and time it
if __name__ == "__main__":
    for NAME, FUNCTION in [
        ("ASYNCSSH", connect_and_cmd_asyncssh),
        ("FABRIC/vanilla", connect_and_cmd_fabric_vanilla),
        ("FABRIC/asynchronous", connect_and_cmd_fabric_asynchronous),
        ("FABRIC/asynchronous+wrap", async_wrap(connect_and_cmd_fabric_vanilla_sync)),
    ]:
        debug(NAME)
        t0 = time.perf_counter()
        asyncio.run(run(FUNCTION))
        t1 = time.perf_counter()
        debug(NAME, t1 - t0)
        print()

Output

test_async.py:97 <module>
    NAME: 'ASYNCSSH' (str) len=8
Sleep 0 - START
SSH command - START
Sleep 0 - END
Sleep 1 - START
Sleep 1 - END
Sleep 2 - START
Sleep 2 - END
SSH command - END
test_async.py:101 <module>
    NAME: 'ASYNCSSH' (str) len=8
    t1 - t0: 3.313834198001132 (float)

test_async.py:97 <module>
    NAME: 'FABRIC/vanilla' (str) len=14
SSH command - START
SSH command - END
Sleep 0 - START
Sleep 0 - END
Sleep 1 - START
Sleep 1 - END
Sleep 2 - START
Sleep 2 - END
test_async.py:101 <module>
    NAME: 'FABRIC/vanilla' (str) len=14
    t1 - t0: 6.025104853000812 (float)

test_async.py:97 <module>
    NAME: 'FABRIC/asynchronous' (str) len=19
SSH command - START
SSH command - END
Sleep 0 - START
Sleep 0 - END
Sleep 1 - START
Sleep 1 - END
Sleep 2 - START
Sleep 2 - END
test_async.py:101 <module>
    NAME: 'FABRIC/asynchronous' (str) len=19
    t1 - t0: 6.014284078999481 (float)

test_async.py:97 <module>
    NAME: 'FABRIC/asynchronous+wrap' (str) len=24
SSH command - START
Sleep 0 - START
Sleep 0 - END
Sleep 1 - START
Sleep 1 - END
Sleep 2 - START
Sleep 2 - END
SSH command - END
test_async.py:101 <module>
    NAME: 'FABRIC/asynchronous+wrap' (str) len=24
    t1 - t0: 3.008613319001597 (float)

tcompa commented 4 days ago

Concerning this block

    @wraps(func)
    async def async_wrapper(*args, loop=None, executor=None, **kwargs):
        if loop is None:
            loop = asyncio.get_event_loop()
        pfunc = partial(func, *args, **kwargs)
        return await loop.run_in_executor(executor, pfunc)

This is executed with executor=None, which (see https://docs.python.org/3/library/asyncio-eventloop.html#asyncio.loop.run_in_executor) runs the function in the default executor, namely a ThreadPoolExecutor:

    def run_in_executor(self, executor, func, *args):
        self._check_closed()
        if self._debug:
            self._check_callback(func, 'run_in_executor')
        if executor is None:
            executor = self._default_executor
            # Only check when the default executor is being used
            self._check_default_executor()
            if executor is None:
                executor = concurrent.futures.ThreadPoolExecutor(
                    thread_name_prefix='asyncio'
                )
                self._default_executor = executor
        return futures.wrap_future(
            executor.submit(func, *args), loop=self)

This likely explains the observed concurrency: it is not a property of the event loop itself (in this case), but rather comes from running the blocking call in a separate thread.
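A minimal stdlib-only check of this reading (no fabric involved): a blocking function submitted with executor=None runs on a worker thread whose name carries the "asyncio" prefix set in the CPython snippet above, not on the main thread.

```python
import asyncio
import threading
import time


def blocking_call() -> str:
    # Stand-in for a blocking SSH command; returns the executing thread's name
    time.sleep(0.1)
    return threading.current_thread().name


async def main() -> tuple[str, str]:
    loop = asyncio.get_running_loop()
    # executor=None -> asyncio lazily creates a ThreadPoolExecutor with
    # thread_name_prefix="asyncio" (see the run_in_executor source above)
    worker_name = await loop.run_in_executor(None, blocking_call)
    return threading.current_thread().name, worker_name


main_thread, worker_thread = asyncio.run(main())
print(main_thread, worker_thread)
```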

tcompa commented 4 days ago

TL;DR As of our current understanding, the only way to make fabric truly non-blocking is to spawn an additional thread for each call. This is not a big issue, and we can verify that things remain under control. AsyncSSH, on the other hand, seems to offer concurrency with no special configuration; we haven't fully explored the internal mechanism yet, but after following the stack of calls it appears to go through https://docs.python.org/3/library/asyncio-future.html#asyncio.ensure_future

tcompa commented 4 days ago

Immediately actionable: apply async_wrap to the current fabric run-command function.
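A sketch of that change (names hypothetical; time.sleep stands in for the blocking connection_fabric.run, so no SSH server is needed): wrapping the blocking runner with async_wrap lets two commands overlap instead of running back to back.

```python
import asyncio
import time
from functools import partial, wraps


def async_wrap(func):
    """Same wrapper as above: run a blocking callable in the default executor."""

    @wraps(func)
    async def async_wrapper(*args, loop=None, executor=None, **kwargs):
        if loop is None:
            loop = asyncio.get_event_loop()
        pfunc = partial(func, *args, **kwargs)
        return await loop.run_in_executor(executor, pfunc)

    return async_wrapper


def run_command(cmd: str) -> str:
    # Hypothetical stand-in for the blocking connection_fabric.run(cmd)
    time.sleep(0.5)
    return f"ran: {cmd}"


async def main() -> float:
    t0 = time.perf_counter()
    # Both 0.5 s "commands" overlap, so total time stays close to 0.5 s
    results = await asyncio.gather(
        async_wrap(run_command)("sleep 3"),
        async_wrap(run_command)("hostname"),
    )
    assert results == ["ran: sleep 3", "ran: hostname"]
    return time.perf_counter() - t0


elapsed = asyncio.run(main())
print(f"{elapsed:.2f} s")
```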

tcompa commented 1 day ago

With @mfranzon, we reviewed once again the way FastAPI and Starlette submit background tasks. Depending on how the background-task function is defined, it is either awaited on the app event loop (for async def functions) or run in a separate thread (for plain def functions). This is clearly visible in https://github.com/encode/starlette/blob/6bfe9fea04fe129d20dce25d29ee112e0f60cb43/starlette/background.py#L17-L30.

In the current use of background tasks within fractal-server we have always used asynchronous functions, which were then executed within the same event loop that handles API requests. For this reason, any blocking operation in a background task would slow down API responses. To mitigate this issue we introduced async_wrap, which effectively runs the process_workflow function in another thread. A much simpler way to proceed is to define the background-task functions as standard def functions, so that Starlette immediately executes them in a different thread, with no need for a custom wrapper.
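The Starlette dispatch referenced above boils down to roughly this stdlib-only sketch (using inspect.iscoroutinefunction and run_in_executor in place of Starlette's is_async_callable and run_in_threadpool): an async def task stays on the event-loop thread, a plain def task lands on a worker thread.

```python
import asyncio
import inspect
import threading


async def run_background_task(func, *args, **kwargs):
    # Sketch of starlette/background.py's BackgroundTask.__call__:
    # async def tasks are awaited on the loop, def tasks go to a thread pool
    if inspect.iscoroutinefunction(func):
        return await func(*args, **kwargs)
    loop = asyncio.get_running_loop()
    return await loop.run_in_executor(None, lambda: func(*args, **kwargs))


async def async_task() -> str:
    return threading.current_thread().name


def sync_task() -> str:
    return threading.current_thread().name


async def main() -> tuple[str, str]:
    return (
        await run_background_task(async_task),
        await run_background_task(sync_task),
    )


async_thread, sync_thread = asyncio.run(main())
print(async_thread, sync_thread)
```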

For the moment we are only adopting this simpler solution for the SSH task collection. Later on, we may apply it to the other background tasks as well.