Closed · tcompa closed this 1 week ago
Some preliminary work on asyncssh.

First of all, since AsyncSSH is designed around Python's asyncio library, it is NOT thread-safe (https://docs.python.org/3/library/asyncio-sync.html). If we need to use AsyncSSH in a multi-threaded environment (BackgroundTasks), we should ensure that each thread has its own event loop, like this:
```python
from fastapi import FastAPI, BackgroundTasks
from fastapi import Request
import asyncssh
import asyncio
from contextlib import asynccontextmanager

host = '172.17.0.2'
username = 'test01'
password = 'test01'
command = 'sleep 10'


@asynccontextmanager
async def lifespan(app: FastAPI):
    print("Setup START")
    app.state.connection = await asyncssh.connect(
        host,
        username=username,
        password=password,
        agent_path=None,
    )
    print(f"Startup OK, {app.state.connection.is_closed()=}")
    yield
    print("Shutdown START")
    app.state.connection.close()
    await app.state.connection.wait_closed()
    print(f"Shutdown OK, {app.state.connection.is_closed()=}")


app = FastAPI(lifespan=lifespan)


async def run_ssh_command(conn):
    result = await conn.run(command)
    print(f"Command output: {result.stdout}")


def start_ssh_command(conn):
    # Each background task runs in a worker thread; asyncio.run gives it
    # its own event loop.
    asyncio.run(run_ssh_command(conn))


@app.post("/execute-command/")
async def execute_command(background_tasks: BackgroundTasks, request: Request):
    background_tasks.add_task(start_ssh_command, request.app.state.connection)
    return {"message": "Command execution started"}


if __name__ == "__main__":
    import uvicorn

    uvicorn.run(app, host="0.0.0.0", port=8000)
```
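The per-thread-loop pattern above can be seen in isolation with a minimal sketch, with a dummy coroutine standing in for the SSH call (`fake_task`, `worker`, and `results` are illustration-only names, not part of the example above):

```python
import asyncio
import threading

results = {}


async def fake_task(name: str) -> str:
    # Stand-in for an async SSH command; no real connection involved.
    await asyncio.sleep(0.01)
    return f"done-{name}"


def worker(name: str) -> None:
    # asyncio.run() creates (and tears down) a fresh event loop for this
    # thread, so no event loop is ever shared across threads.
    results[name] = asyncio.run(fake_task(name))


threads = [threading.Thread(target=worker, args=(f"t{i}",)) for i in range(3)]
for t in threads:
    t.start()
for t in threads:
    t.join()

print(results)  # each thread ran its coroutine on its own loop
```

This only isolates the loops from each other; it does not make a single AsyncSSH connection object safe to share across those loops.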
The main problem was observed with SFTP methods, rather than SSH. This example allows us to both consistently reproduce the error and to provisionally fix it with an additional lock. Not clear: why do `Connection` methods keep working when the SFTP client is broken?

```python
from contextlib import asynccontextmanager
import asyncio
import threading
import datetime
from fabric import Connection
from fastapi import FastAPI
from fastapi import Request
from fastapi import BackgroundTasks
from fractal_server.ssh._fabric import check_connection
from fractal_server.ssh._fabric import run_command_over_ssh
import time
from contextlib import contextmanager


@contextmanager
def acquire_timeout(lock, timeout):
    result = lock.acquire(timeout=timeout)
    try:
        print("Trying to acquire lock")
        yield result
    finally:
        if result:
            lock.release()
            print("Lock was acquired, and now released")


class OurOwnLock(object):
    lock: threading.Lock
    connection: Connection

    def __init__(self, connection: Connection):
        self.lock = threading.Lock()
        self.conn = connection

    def put(self, *args, **kwargs):
        with acquire_timeout(self.lock, timeout=100):
            return self.conn.put(*args, **kwargs)

    def get(self, *args, **kwargs):
        with acquire_timeout(self.lock, timeout=100):
            return self.conn.get(*args, **kwargs)

    def run(self, *args, **kwargs):
        with acquire_timeout(self.lock, timeout=100):
            return self.conn.run(*args, **kwargs)


@asynccontextmanager
async def lifespan(app: FastAPI):
    print("Setup START")
    app.state.connection = Connection(
        host="172.18.0.2",
        user="test01",
        connect_kwargs={"password": "test01", "allow_agent": False},
    )
    app.state.our_own_lock = OurOwnLock(connection=app.state.connection)
    check_connection(app.state.connection)
    print(f"Startup OK, {app.state.connection.is_connected=}")
    yield
    print("Shutdown START")
    app.state.connection.close()
    print(f"Shutdown OK, {app.state.connection.is_connected=}")


app = FastAPI(lifespan=lifespan)


def run_sftp_command(conn, i):
    print(f"{threading.current_thread()} - SFTP COMMAND - {i}")
    conn.put(local="/tmp/test", remote=f"/tmp/AAA/test.container{i:06d}")
    time.sleep(1)
    print(f"Command SFTP: {datetime.datetime.now()} - {i}")


@app.post("/sftp/{i}")
async def sftp(background_tasks: BackgroundTasks, request: Request, i: int):
    background_tasks.add_task(run_sftp_command, request.app.state.our_own_lock, i)
    return {"message": f"SFTP {i}"}


@app.post("/still_there")
async def still_there(request: Request):
    print(app.state.connection)
    print(app.state.connection.sftp())
    print(vars(app.state.connection.sftp()))
    print(app.state.connection.is_connected)
    res = app.state.connection.run("whoami")
    print(f"{res=}")


if __name__ == "__main__":
    import uvicorn

    uvicorn.run(app, host="0.0.0.0", port=8000)
```
If we do not acquire the lock for the `put` method, we can "break" the SFTP client via

```bash
for i in {0..200}; do curl -X POST http://localhost:8000/sftp/$i; done
```

whereas if we use the example as is, even requesting 200 SFTP operations does not break the SFTP client. Note that the SSH commands can still run, even when the SFTP client is in a broken state.
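The provisional fix hinges on `threading.Lock.acquire(timeout=...)` returning a boolean instead of blocking forever. A standalone sketch of the same `acquire_timeout` pattern (no SSH or FastAPI involved) shows both outcomes:

```python
import threading
from contextlib import contextmanager


@contextmanager
def acquire_timeout(lock, timeout):
    # Same pattern as in the example above: yield whether the lock was
    # obtained, and release it only if we actually acquired it.
    result = lock.acquire(timeout=timeout)
    try:
        yield result
    finally:
        if result:
            lock.release()


lock = threading.Lock()

with acquire_timeout(lock, timeout=1) as ok_first:
    # The lock is free, so acquisition succeeds immediately.
    assert ok_first is True
    with acquire_timeout(lock, timeout=0.1) as ok_second:
        # The lock is already held (threading.Lock is non-reentrant), so
        # this times out instead of deadlocking, and the finally clause
        # does NOT release a lock it never acquired.
        assert ok_second is False

# After the outer block exits, the lock is free again.
assert lock.acquire(blocking=False)
lock.release()
```

Callers should still check the yielded boolean before using the connection; with the 100-second timeout above, a `False` means another SSH/SFTP operation held the lock for the whole window.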
Closed with #1618
If we run two SSH task collection background tasks from the same app worker (e.g. by having a single uvicorn/gunicorn worker) and they happen to perform SSH operations simultaneously, this sometimes leads to hanging/broken behavior.
One solution we are exploring is to add a

```python
connection_lock: threading.Lock
```

object that is acquired and released whenever needed. Another option is to look, once again, at asyncssh, to understand whether its connections would be thread-safe (or not). TBD.
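A minimal sketch of the first option, with a stand-in object instead of a real `fabric.Connection` (`FakeConnection` and `locked_run` are hypothetical names for illustration, not fractal-server API):

```python
import threading
import time


class FakeConnection:
    """Stand-in for a fabric.Connection, just to exercise the locking."""

    def __init__(self):
        self.active = 0      # operations currently in flight
        self.max_active = 0  # peak concurrency observed

    def run(self, cmd):
        self.active += 1
        self.max_active = max(self.max_active, self.active)
        time.sleep(0.01)  # simulate a slow SSH round trip
        self.active -= 1
        return f"ran {cmd}"


connection = FakeConnection()
connection_lock = threading.Lock()


def locked_run(cmd):
    # Serialize every SSH operation issued by this app worker: only one
    # thread may touch the shared connection at a time.
    with connection_lock:
        return connection.run(cmd)


threads = [
    threading.Thread(target=locked_run, args=(f"cmd-{i}",)) for i in range(5)
]
for t in threads:
    t.start()
for t in threads:
    t.join()

print(connection.max_active)  # 1: the lock prevented concurrent use
```

This trades concurrency for safety: with a single lock, simultaneous task-collection background tasks queue up behind each other instead of interleaving operations on one connection.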
Ref #1597