falconry / falcon

The no-magic web data plane API and microservices framework for Python developers, with a focus on reliability, correctness, and performance at scale.
https://falcon.readthedocs.io/en/stable/
Apache License 2.0

bug: sse-stream not properly consumed #1956

Closed ccwienk closed 2 years ago

ccwienk commented 3 years ago

I recently implemented an async route, where I "awaited" some result, but had to send a "keep-alive"-message in case the result takes longer than a certain time (1m in my case).

The exact same code works when setting the generator as value for response.stream, but will get stuck if instead setting it as response.sse (will work for small data, though).

This is my code:

# works:
async def wait_for_result_and_yield_keepalives():
    nonlocal awaitable
    while True:
        done, pending = await asyncio.wait((awaitable,), timeout=30)
        if not done:
            logger.info('sending keep-alive')
            awaitable = pending.pop()
            yield b' '
            continue
        break
    result = done.pop().result()
    logger.info(f'scan done {result=}')
    yield json.dumps(result).encode('utf-8')

resp.stream = wait_for_result_and_yield_keepalives()

# does not work (will get stuck):
async def wait_for_result_and_yield_keepalives():
    nonlocal awaitable
    while True:
        done, pending = await asyncio.wait((awaitable,), timeout=30)
        if not done:
            logger.info('sending keep-alive')
            awaitable = pending.pop()
            yield falcon.asgi.SSEvent(event='keep-alive')
            continue
        break
    result = done.pop().result()
    logger.info(f'scan done {result=}')
    yield falcon.asgi.SSEvent(event='result', json=result)

resp.sse = wait_for_result_and_yield_keepalives()

To be fair, I am so far not very familiar w/ "async", so maybe my code is conceptually flawed (feedback welcome..). However, I would still expect the behaviour of response.stream and response.sse to be consistent.

Maybe there are more elegant ways of sending those "keep-alives"? I have to send those, because in my (kubernetes) environment, stale HTTP connections are closed (I think by Load-Balancer) after about 1m of inactivity (just in case you wonder why I want to do such a thing in the first place).

CaselIT commented 3 years ago

Hi,

What server are you using to serve your API? Trying with uvicorn, it seems to be working. How long is the wait time usually?

Test application

import asyncio
import json
from datetime import datetime, timedelta

from falcon.asgi import App, SSEvent

async def thing(wait):
    print('waiting for', wait)
    await asyncio.sleep(wait)
    return {'ok': 1}

class Res:
    async def on_get_stream(self, req, resp):
        print('on_get_stream')
        # works:
        async def wait_for_result_and_yield_keepalives():
            awaitable = thing(wait := req.get_param_as_int('wait', default=10))
            while True:
                done, pending = await asyncio.wait((awaitable,), timeout=wait // 10)
                if not done:
                    print('sending keep-alive')
                    awaitable = pending.pop()
                    yield b' '
                    continue
                break
            result = done.pop().result()
            print(f'scan done {result=}')
            yield json.dumps(result).encode('utf-8')

        resp.stream = wait_for_result_and_yield_keepalives()

    async def on_get_event(self, req, resp):
        print('on_get_event')
        # does not work (will get stuck):
        async def wait_for_result_and_yield_keepalives():
            awaitable = thing(wait := req.get_param_as_int('wait', default=10))
            start = datetime.utcnow()
            end = start + timedelta(seconds=wait)
            yield SSEvent(event='scheduled', json={'end': end.isoformat()})
            while True:
                done, pending = await asyncio.wait(
                    (awaitable,), timeout=min(wait // 10, 30)
                )
                if not done:
                    print('sending keep-alive')
                    awaitable = pending.pop()
                    eta = end - datetime.utcnow()
                    yield SSEvent(event='keep-alive', json={'eta': eta.total_seconds()})
                    # yield SSEvent(event='keep-alive')
                    continue
                break
            result = done.pop().result()
            print(f'scan done {result=}')
            yield SSEvent(event='result', json=result)

        resp.sse = wait_for_result_and_yield_keepalives()

app = App()
res = Res()
app.add_route('/stream', res, suffix='stream')
app.add_route('/event', res, suffix='event')

vytas7 commented 3 years ago

Hi @ccwienk ! It is indeed interesting that it works with resp.stream, but hangs with resp.sse.

I understand the whole business logic behind the awaitable variable is probably too complex to share and/or relies on other services. But could you create some sort of mock that still illustrates the issue, but is simple enough to share with us for debugging? Ideally something as simple as one or more asyncio.sleep()s, and returning a fake result fixture.

Adding to @CaselIT 's answer above, could it be that your awaitable is a coroutine object, not a task/future? This could be the cause of things hanging non-deterministically, as explained here for asyncio.wait: confusing behaviour.
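To make the pitfall above concrete, here is a minimal sketch of the keep-alive loop with the coroutine wrapped in a task exactly once. `slow_job`, `result_with_keepalives` and `collect` are hypothetical names for illustration, and the sleep stands in for the real work. As far as I know, recent Python versions reject bare coroutines passed to `asyncio.wait` altogether, so wrapping is needed there anyway.

```python
import asyncio


async def slow_job(delay):
    # Hypothetical stand-in for the real work (the thread's job talks to clamd).
    await asyncio.sleep(delay)
    return {'ok': 1}


async def result_with_keepalives(coro, interval):
    # Wrap the coroutine once; the same Task object is waited on in every pass,
    # so there is no need to pull a replacement out of the `pending` set.
    task = asyncio.ensure_future(coro)
    while True:
        done, _pending = await asyncio.wait({task}, timeout=interval)
        if task in done:
            break
        # The timeout expired; the task keeps running, it is not cancelled.
        yield 'keep-alive'
    yield task.result()


async def collect(delay=0.25, interval=0.1):
    # Gather everything the generator yields, keep-alives first, result last.
    return [item async for item in result_with_keepalives(slow_job(delay), interval)]


if __name__ == '__main__':
    print(asyncio.run(collect()))
```

Checking `task in done` only works because the task was created up front; with a bare coroutine, the sets returned by `asyncio.wait` contain the wrapping task rather than the object that was passed in.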

vytas7 commented 3 years ago

@CaselIT your awaitable may also be subject to the mentioned pitfall 😈

CaselIT commented 3 years ago

@CaselIT your awaitable may also be subject to the mentioned pitfall 😈

I like the risk :)

CaselIT commented 3 years ago

So far I've successfully tried 10 minutes with SSE; now trying 20 minutes.

CaselIT commented 3 years ago

20min was fine with sse

ccwienk commented 3 years ago

dear @CaselIT and @vytas7 : thanks for your swift replies!

I just ran another (more extensive) test, and now also run into the same issue when using response.stream.

I can share the full code, I think (basically, I open an async socket in order to stream contents to clamd, the ClamAV daemon).

I am using uvicorn. You are correct, I think - my awaitable is a coroutine, I think

this is the function I need to await (I removed some irrelevant parts)

async def _run_scan(self, req, resp):
    reader, writer = await asyncio.open_unix_connection(_lookup_clamd_so

    writer.write(b'zINSTREAM\x00') # this tells clamd that we will send it a stream

    async for chunk in req.stream:
        leng = len(chunk)
        size = struct.pack(b'!L', len(chunk))
        writer.write(size)
        writer.write(chunk)
        await writer.drain()

    # tell clamd we are done 
    writer.write(struct.pack(b'!L', 0))
    await writer.drain()

    async def read_response():
        result = await reader.read()
        result = result.decode('utf-8')
        return result

    result = await read_response()
    return result # actually, I parse this before returning, but this is not relevant for my issue, so I removed this
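The wire format used in the snippet above can be isolated into a pure function, which makes it testable without a running clamd. This is a sketch based on my reading of the clamd `INSTREAM` command (a 4-byte big-endian length before each chunk, and a zero-length chunk as terminator); `frame_instream` is a hypothetical helper, and skipping empty chunks is my own addition.

```python
import struct


def frame_instream(chunks):
    """Yield the byte sequences of one clamd INSTREAM session."""
    # NUL-terminated command, as in the snippet above.
    yield b'zINSTREAM\x00'
    for chunk in chunks:
        if not chunk:
            # A zero-length chunk would end the stream early, so skip it.
            continue
        # Length prefix in network byte order, followed by the data itself.
        yield struct.pack('!L', len(chunk)) + chunk
    # Zero-length chunk: tells clamd that the stream is complete.
    yield struct.pack('!L', 0)


if __name__ == '__main__':
    print(b''.join(frame_instream([b'abc', b'', b'de'])))
```

In the handler, each yielded frame would be passed to `writer.write()`, with `await writer.drain()` after each one to apply backpressure.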
ccwienk commented 3 years ago

oh dear. So basically I just need to wrap this into a Task? will check

CaselIT commented 3 years ago

oh dear. So basically I just need to wrap this into a Task? will check

I think wait does that for you

ccwienk commented 3 years ago

@CaselIT : just tried. to avoid mis-understandings:

I replaced

awaitable = self._run_scan(req, resp)

with

awaitable = asyncio.create_task(self._run_scan(req, resp))

-> same behaviour :-(

update: I just re-ran my test-code multiple times. It succeeded once, and got stuck at different places (I send a sequence of single files to be scanned - something like 100-ish, summing up to roughly 5 MiBs in total).

not sure whether this helps: the server will always accept the contents (so my client always successfully can write; it stalls upon response-sending)

forgot to answer one more question: typical wait-time is a couple of seconds (depends on file-size, and contents). I saw a few minutes, up to maybe 1h in some cases

will try to strip this down into a "working" reproducer

vytas7 commented 3 years ago

Aha, I see, you are streaming both the request and response at the same time, maybe some complications arise from that 🤔 And yeah, it would be awesome if you could isolate the problem in a smaller reproducible case (easier to dig into this for us).

ccwienk commented 3 years ago

@vytas7 : yes, I am a great fan of streaming :-). asamof, also the client interacting w/ the server retrieves and uploads the contents using streaming

not sure whether this helps: I just tested again using curl rather than the actual client - this consistently works. I suppose, however, I should rather continue debugging tomorrow. Thanks so much for your help so far!

vytas7 commented 3 years ago

Looking at the relevant piece of falcon.asgi.app's source code, I think I see a potential cause though.

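resp.sse has a simple disconnect watcher, and while it is handy to have, there are at least two unwanted side effects for your scenario:

* The watcher is awaiting ASGI `receive()` continuously, which will effectively drain your request stream, robbing you of some input data.

* The watcher stops iterating upon the client's premature disconnect and leaves behind your SSE event generator, which may or may not make it hang given the pattern you are using.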

ccwienk commented 3 years ago

@vytas7 :

maybe this is not even actually related to falcon, but rather to asyncio? 🤔

vytas7 commented 3 years ago

@ccwienk Tbh, I'm thinking to myself that your use case doesn't really fit the spirit of the SSE spec 🤔

SSE are most commonly consumed by the browser, and there you cannot really specify which request method is used (it's GET, or maybe a preflight), nor can you send a request body; see also this discussion on Stack Overflow: Can Server Sent Events (SSE) with EventSource pass parameter by POST?

vytas7 commented 3 years ago

in my test-setup there were not premature disconnects. however, when I did abort the client, the code on server-side eventually finished (I have a log/debug-print in place); so to me it seems there is some deadlock condition (maybe related to GIL?); the fact that it occurs often, but not always, somewhat indicates to me it could be a race-condition

Aha, interesting 🤔 That makes it even more intriguing; we really need to get our hands on a reproducible example 😈

maybe this is not even actually related to falcon, but rather to asyncio? 🤔

Yeah, asyncio isn't always easy to reason about...

Just to clarify though, have you seen any problems with resp.stream, or is it still strictly confined to the SSE case?

ccwienk commented 3 years ago

@vytas7 : ever since my initial posting, I exclusively tested using resp.stream (where I now also see issues). I promise I shall provide you w/ a working reproducer today :-)

asamof, I could also share the full code - it is just ~170 lines of code and has just one dependency - a running instance of clamd

ccwienk commented 3 years ago

the only runtime-dependency required, as mentioned, would be a running clamd (https://github.com/Cisco-Talos/clamav); for many Linux distributions there are official packages. If this is too cumbersome for you, I will see that I keep my promise and create a stripped-down reproducer..

#!/usr/bin/env python3

import argparse
import asyncio
import functools
import json
import logging
import os
import struct
import sys

import falcon.asgi
import uvicorn

# this is an optional dependency to make uvicorn more verbose; `pip install gardener-cicd-base`, if you want it..
# import ci.log
# ci.log.configure_default_logging()

logger = logging.getLogger('malware_scanner')

@functools.cache
def _lookup_clamd_socket(path='/run/clamav/clamd.sock'):
    if not os.path.exists(path):
        if not os.path.isfile(clamd_cfg_path := '/etc/clamav/clamd.conf'):
            raise ValueError(f'clamd socket does not exist at {path=}')
        # lookup in clamd.conf
        path = _read_cfg('LocalSocket')

        if not path:
            raise ValueError(f'did not find `LocalSocket` directive in {clamd_cfg_path=}')

    if not os.path.exists(path):
        raise ValueError(f'clamd socket not found at {path=}')

    return path

@functools.cache
def _max_chunk_leng_octets() -> int:
    max_leng = _read_cfg('StreamMaxLength')
    if not max_leng:
        return 25 * 1024 * 1024 # 25 MiB

    try:
        return int(max_leng)
    except ValueError:
        pass

    if not max_leng.endswith('M'):
        raise NotImplementedError(f'cannot parse {max_leng=}')

    return int(max_leng[:-1]) * 1024 * 1024 # hard-code amount of MiBs

class MalwareScanner:
    async def on_post(self, req, resp):
        size = req.content_length or 0
        max_chunk = _max_chunk_leng_octets()

        if size and size > max_chunk:
            raise falcon.HTTPPayloadTooLarge(
                title=f'too large - {max_chunk=}',
            )

        async def wait_for_result_and_yield_keepalives():
            awaitable = asyncio.create_task(self._run_scan(req, resp))
            #nonlocal awaitable
            while True:
                done, pending = await asyncio.wait((awaitable,), timeout=30)
                if not done:
                    logger.info('sending keep-alive')
                    awaitable = pending.pop()
                    yield b' '
                    continue
                break
            result = done.pop().result()
            logger.info(f'scan done {result=}')
            yield json.dumps(result).encode('utf-8')

        resp.stream = wait_for_result_and_yield_keepalives()

    async def _run_scan(self, req, resp):
        max_chunk = _max_chunk_leng_octets()
        reader, writer = await asyncio.open_unix_connection(_lookup_clamd_socket())

        writer.write(b'zINSTREAM\x00')

        total = 0

        async for chunk in req.stream:
            leng = len(chunk)
            total += leng
            if total > max_chunk:
                raise falcon.HTTPPayloadTooLarge(
                    title=f'too large - {max_chunk=}',
                )
            size = struct.pack(b'!L', len(chunk))
            writer.write(size)
            writer.write(chunk)
            # await writer.drain()

        writer.write(struct.pack(b'!L', 0))
        await writer.drain()

        async def read_response():
            result = await reader.read()
            result = result.decode('utf-8')
            return result

        result = await read_response()

        writer.close()

        # format: <path>:<remainder> - in streaming, path is always `stream`
        result = result.split(':', 1)[1][:-1].strip()

        if result == 'OK':
            result = {
                'result': 'OK',
                'message': 'no malware was found',
            }
        else:
            result = {
                'result': 'FOUND_MALWARE',
                'message': 'malware was found',
                'details': result,
            }

        return result

def _read_cfg(cfg_name: str):
    if not os.path.exists(clamd_cfg_path := '/etc/clamav/clamd.conf'):
        raise ValueError(f'no cfg file at {clamd_cfg_path=}')

    with open(clamd_cfg_path, 'r') as f:
        for line in f.readlines():
            if not line.startswith(cfg_name):
                continue

            value = line.split(' ', 1)[-1].strip()
            return value

        return None # rather raise?

def app():
    app = falcon.asgi.App()
    app.add_route('/scan', MalwareScanner())

    return app

def main():
    parser = argparse.ArgumentParser()
    parser.add_argument('--port', type=int, default=5001)
    parser.add_argument('--workers', type=int, default=4)

    parsed = parser.parse_args()

    uvicorn.run(
        'malware_scanner:app',
        port=parsed.port,
        host='0.0.0.0', # allow external connections
        workers=parsed.workers,
        debug=True,
        log_level='debug',
        factory=True,
    )

if __name__ == '__main__':
    main()
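The reply handling at the end of `_run_scan` above can also be pulled into a pure function, so it can be checked without a daemon. This is a sketch, not the code actually deployed: `parse_clamd_reply` is a hypothetical name, the separate `SCAN_ERROR` outcome is my addition (the original reports every non-`OK` reply as malware), and it assumes clamd marks failures with a trailing `ERROR`.

```python
def parse_clamd_reply(raw: bytes) -> dict:
    """Turn a raw INSTREAM reply into the result dict used by the handler."""
    # z-prefixed commands get a NUL-terminated reply such as b'stream: OK\x00'.
    text = raw.decode('utf-8').rstrip('\x00').strip()
    # Format: <path>:<remainder>; for streamed input the path is `stream`.
    _path, sep, verdict = text.partition(':')
    if not sep:
        raise ValueError(f'unexpected clamd reply: {text!r}')
    verdict = verdict.strip()

    if verdict == 'OK':
        return {'result': 'OK', 'message': 'no malware was found'}
    if verdict.endswith('ERROR'):
        # Assumption: failures (e.g. a size limit) are reported as '<reason> ERROR'.
        return {
            'result': 'SCAN_ERROR',
            'message': 'clamd reported an error',
            'details': verdict,
        }
    return {
        'result': 'FOUND_MALWARE',
        'message': 'malware was found',
        'details': verdict,
    }
```

The signature name in any sample reply is illustrative only; real names depend on the installed signature database.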
vytas7 commented 3 years ago

asamof, I could also share the full code - it is just ~170 lines of code and has just one dependency - a running instance of clamd

In a pinch, that's doable too 👍

I need to hit the hay now though, it's getting late here (err, early). Hopefully I'll have more time to take a look at this on Friday.

ccwienk commented 3 years ago

just for the sake of completeness, I would also like to share the client I built for the server:

https://github.com/gardener/cc-utils/blob/master/clamav/client_asgi.py

While it's not strictly related to this ticket, I would like to give you some context. I intend to use the server as a malware-scanning service to scan our ever-growing bom of OCI Container Images.

Considering the load pattern (a full scan consists of roughly 25 GiB of data, distributed among some 1000-ish layer-blobs, each containing some 100-ish or 1000-ish files, with a configurable concurrency), I assume we do not really benefit that much from async, and could probably achieve a comparable performance using WSGI. However, I wanted to try async + ASGI :-)

--

I just checked again.

If using response.sse, it will work neither w/ curl nor w/ "my" client (to be precise: the keep-alives are properly sent; however, the actual awaitable (response from clamd) is never resolved / returned, thus resulting in an endless sequence of keep-alives).

If using response.stream (as from the code I shared above), it will consistently work using curl, and sometimes work using "my" client.

So my initial observation still holds true: there is a difference in behaviour (however, both variants are broken, which may of course be "my fault").

CaselIT commented 3 years ago

20min was fine with sse

I've also done 1h, so the normal logic should not be at fault. Not sure about the issue found by @vytas7

CaselIT commented 3 years ago

Looking at the relevant piece of falcon.asgi.app's source code, I think I see a potential cause though.

resp.sse has a simple disconnect watcher, and while it is handy to have, there are at least two unwanted side effects for your scenario:

* The watcher is awaiting ASGI `receive()` continuously, which will effectively drain your request stream, robbing you of some input data.

* The watcher stops iterating upon the client's premature disconnect and leaves behind your SSE event generator, which may or may not make it hang given the pattern you are using.
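The first side effect listed above can be reproduced without Falcon or a server. The sketch below is a simplified stand-in, not Falcon's actual implementation: a queue plays the role of ASGI `receive()`, and `watch_disconnect` and `read_body` are hypothetical names for the watcher and for a responder that consumes the request body.

```python
import asyncio


async def demo(with_watcher):
    # Fake ASGI receive(): all events come off one shared queue.
    events = asyncio.Queue()
    for event in (
        {'type': 'http.request', 'body': b'aa', 'more_body': True},
        {'type': 'http.request', 'body': b'bb', 'more_body': True},
        {'type': 'http.request', 'body': b'cc', 'more_body': False},
    ):
        events.put_nowait(event)
    receive = events.get

    async def watch_disconnect():
        # Simplified watcher: keeps calling receive() until a disconnect shows up,
        # silently discarding any body events it happens to pick up.
        while True:
            event = await receive()
            if event['type'] == 'http.disconnect':
                return
            await asyncio.sleep(0)

    async def read_body():
        # Simplified responder: concatenates the request body chunks.
        body = b''
        while True:
            event = await receive()
            body += event.get('body', b'')
            if not event.get('more_body', False):
                return body
            await asyncio.sleep(0)  # give other tasks a turn, as real I/O would

    watcher = asyncio.ensure_future(watch_disconnect()) if with_watcher else None
    reader = asyncio.ensure_future(read_body())
    try:
        # A reader that never sees the final event would wait forever, so cap it.
        return await asyncio.wait_for(reader, timeout=0.2)
    except asyncio.TimeoutError:
        return None
    finally:
        if watcher is not None:
            watcher.cancel()


if __name__ == '__main__':
    print('without watcher:', asyncio.run(demo(False)))
    print('with watcher:   ', asyncio.run(demo(True)))
```

Without the watcher the reader receives the complete body. With it, the reader ends up with a truncated body or times out waiting for a final event that the watcher already consumed, which resembles the hang described in this issue.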

not sure if we maybe should activate the watcher after the first SSE event is sent. so move https://github.com/falconry/falcon/blob/e255bff9ae5a90d0cb3fe9af7c16917f18a92dc3/falcon/asgi/app.py#L548 before this line https://github.com/falconry/falcon/blob/e255bff9ae5a90d0cb3fe9af7c16917f18a92dc3/falcon/asgi/app.py#L584 (or maybe before sending the SSE event)

but I guess to be sure we should not touch receive at all, since theoretically a responder could start returning data before finishing reading the request stream.

If we move the watcher after the first sse event we could probably document this behavior

cc @kgriffs

CaselIT commented 3 years ago

Looking at the relevant piece of falcon.asgi.app's source code, I think I see a potential cause though.

resp.sse has a simple disconnect watcher, and while it is handy to have, there are at least two unwanted side effects for your scenario:

* The watcher is awaiting ASGI `receive()` continuously, which will effectively drain your request stream, robbing you of some input data.

* The watcher stops iterating upon the client's premature disconnect and leaves behind your SSE event generator, which may or may not make it hang given the pattern you are using.

I was reading the asgi spec https://asgi.readthedocs.io/en/latest/specs/www.html#disconnect-receive-event

Disconnect - receive event Sent to the application when a HTTP connection is closed or if receive is called after a response has been sent. This is mainly useful for long-polling, where you may want to trigger cleanup code if the connection closes early.

Am I correct in reading that receive can only yield a disconnect event after a body has been sent? If a partial body (`more_body=True`) is valid, it seems that this case could be avoided if the watcher is moved after the sending of the first SSE event.

But I have no clue in this case where any remaining incoming request body would end up. Discarded by the server?
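For reference, this is what a "partial body" looks like at the ASGI level: a raw application (no Falcon involved) that sends `http.response.start` followed by several `http.response.body` events with `more_body=True`, and a final one with `more_body=False`. It is only a sketch of the message sequence; `streaming_app` and `run_once` are hypothetical names, and it does not answer what a server does with an unread request body.

```python
import asyncio


async def streaming_app(scope, receive, send):
    # Raw ASGI application: streams two SSE-style payloads, then closes the body.
    assert scope['type'] == 'http'
    await send({
        'type': 'http.response.start',
        'status': 200,
        'headers': [(b'content-type', b'text/event-stream')],
    })
    payloads = (
        b': keep-alive\n\n',                      # SSE comment line
        b'event: result\ndata: {"ok": 1}\n\n',    # the actual event
    )
    for payload in payloads:
        # more_body=True marks a partial body; more events will follow.
        await send({'type': 'http.response.body', 'body': payload, 'more_body': True})
    # An empty chunk with more_body=False completes the response.
    await send({'type': 'http.response.body', 'body': b'', 'more_body': False})


async def run_once():
    # Drive the app with fake send/receive callables and record what it sends.
    sent = []

    async def send(event):
        sent.append(event)

    async def receive():
        return {'type': 'http.disconnect'}

    await streaming_app({'type': 'http'}, receive, send)
    return sent


if __name__ == '__main__':
    for event in asyncio.run(run_once()):
        print(event)
```

Note that the application never calls `receive()` here; whether and when to call it while the response is still streaming is exactly the design question discussed in this comment.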

vytas7 commented 2 years ago

Hi again @ccwienk, and sorry for taking time to get back to you regarding this bug. As discussed before, we don't really handle the SSE case with request body, as it isn't really in the spirit of the standard.

However, I've now found another fairly serious bug which might have been causing this for the non-SSE case, and also causing weird behaviour depending on the HTTP client used. I've filed this as https://github.com/falconry/falcon/issues/2024. I'll try to fix that one ASAP, and if you are still interested in this investigation, you could try whether it resolves your issues.

vytas7 commented 2 years ago

@ccwienk could you possibly retest your scenario with Falcon 3.1.0? A fix for the mentioned bug (#2024) has been shipped, so it would be good to rule out the possibility of this one simply being a duplicate before continuing this investigation.

vytas7 commented 2 years ago

Closing this. Don't hesitate to reopen or file a new issue if this is still a problem in Falcon 3.1+.