amphp / http-server

An advanced async HTTP server library for PHP, perfect for real-time apps and APIs with high concurrency demands.
https://amphp.org/http-server
MIT License

Compression is incompatible with streaming responses #324

Closed Nek- closed 2 years ago

Nek- commented 2 years ago

I tried to build a very simple SSE server (code below). It did not work until I disabled compression. Since I assume streamed responses should work with compression enabled, I am opening an issue about it.

use Amp\ByteStream\ResourceOutputStream;
use Amp\Http\Server\HttpServer;
use Amp\Http\Server\RequestHandler\CallableRequestHandler;
use Amp\Http\Server\Response;
use Amp\Http\Status;
use Amp\Log\ConsoleFormatter;
use Amp\Log\StreamHandler;
use Amp\Socket;
use Amp\Http\Server\Request;
use Monolog\Logger;
use Amp\ByteStream\IteratorStream;
use Amp\Producer;

function newEvent() {
    $id = mt_rand(1, 1000);
    return ['id' => $id, 'title' => 'title ' . $id, 'content' => 'content ' . $id];
}

$events = [
    newEvent(),
    newEvent(),
];

Amp\Loop::run(static function () use (&$events) {
    $cert = new Socket\Certificate(__DIR__ . '/../ssl/cert.pem', __DIR__ . '/../ssl/key.pem');

    $context = (new Socket\BindContext)
        ->withTlsContext((new Socket\ServerTlsContext)->withDefaultCertificate($cert));

    $servers = [
        Socket\Server::listen("0.0.0.0:1337"),
        Socket\Server::listen("[::]:1337"),
        Socket\Server::listen("0.0.0.0:1338", $context),
        Socket\Server::listen("[::]:1338", $context),
    ];

    $logHandler = new StreamHandler(new ResourceOutputStream(STDOUT));
    $logHandler->setFormatter(new ConsoleFormatter);
    $logger = new Logger('server');
    $logger->pushHandler($logHandler);

    $server = new HttpServer($servers, new CallableRequestHandler(static function (Request $request) use (&$events) {
        if ($request->getUri()->getPath() === '/') {
            return new Response(
                Status::OK,
                [
                    "content-type" => "text/html; charset=utf-8"
                ],
                <<<FRONT
                <!DOCTYPE html>
                <html lang="en">
                    <head>
                    <title>Yo</title>
                    </head>
                    <body>
                        <h1>Hello World!</h1>
                        <div id="news"></div>
                        <script>
                        //*
                        var news = document.getElementById('news');
                        const evtSource = new EventSource("/sse");
                        evtSource.addEventListener('news', function (event) {
                            news.innerHTML = news.innerHTML + "<p>"+event.data+"</p>";
                        });
                        //*/
                        </script>
                    </body>
                </html>
                FRONT
            );
        }

        if ($request->getUri()->getPath() === '/sse') {
            return new Response(
                Status::OK,
                [
                    'Access-Control-Allow-Origin' => '*',
                    'Content-Type' => 'text/event-stream',
                    'Cache-Control' => 'no-cache',
                    'X-Accel-Buffering' => 'no'
                ],
                new IteratorStream(new Producer(function (callable $emit) use (&$events) {
                    while (true) {
                        if (empty($events)) {
                            // No pending events: wait 10 ms before checking again.
                            yield new \Amp\Delayed(10);
                        } else {
                            // Emit the next pending event as one SSE message.
                            $data = json_encode(array_pop($events));
                            yield $emit("event: news\ndata: $data\n\n");
                        }
                    }
                }))
            );
        }

        return new Response(Status::NOT_FOUND, ["content-type" => "text/plain; charset=utf-8"], '404 Not found');

    // uncomment the option part to make it work
    }), $logger/*, (new \Amp\Http\Server\Options())->withoutCompression()*/);

    yield $server->start();

    // Stop the server when SIGINT is received (this is technically optional, but it is best to call Server::stop()).
    Amp\Loop::onSignal(\SIGINT, static function (string $watcherId) use ($server) {
        Amp\Loop::cancel($watcherId);
        yield $server->stop();
    });
});

Using this works:

use function Amp\Http\Server\Middleware\stack;

$server = new HttpServer($servers, stack(new CallableRequestHandler(static function (Request $request) use (&$events) {
    // ...
}), new \Amp\Http\Server\Middleware\CompressionMiddleware(12, 1)), $logger);
// Note: 12 is the minimum size of an SSE message.

trowski commented 2 years ago

Sorry for the delay in responding – though it was clear you figured this out quickly, which is probably why none of us felt the need to reply.

CompressionMiddleware buffers a certain number of bytes before compressing them to be more efficient. I wonder if using compression with SSE is wise: for small payloads, the compressed output can be larger than the original data. If you are sending larger payloads on average, then reducing the number of bytes buffered via the constructor is exactly the solution you needed.
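
The buffering effect can be sketched with PHP's built-in zlib functions. This is only an illustration under stated assumptions, not the CompressionMiddleware code: the message content and all variable names are made up. It shows that a compressor holds back a small input until it is flushed, and that a sync flush makes the chunk decodable straight away at the cost of a few extra bytes.

```php
<?php
// Hypothetical illustration using PHP's zlib extension; this is not the
// library's implementation, and the message below is invented.
$message = "event: news\ndata: {\"id\":1}\n\n";

// Without a flush, the compressor keeps a small input in its internal buffer.
$buffering = deflate_init(ZLIB_ENCODING_RAW);
$held = deflate_add($buffering, $message, ZLIB_NO_FLUSH);

// A sync flush forces the pending bytes out so a client can decode them now.
$flushing = deflate_init(ZLIB_ENCODING_RAW);
$sent = deflate_add($flushing, $message, ZLIB_SYNC_FLUSH);

// The receiver can decode the flushed chunk without waiting for stream end.
$decoded = inflate_add(inflate_init(ZLIB_ENCODING_RAW), $sent, ZLIB_SYNC_FLUSH);

// Compare sizes: small messages often gain little or nothing from compression.
printf(
    "plain: %d bytes, no flush: %d bytes, sync flush: %d bytes\n",
    strlen($message),
    strlen($held),
    strlen($sent)
);
```

Running this with different message sizes gives a rough feel for when compressing individual events pays off.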

Nek- commented 2 years ago

The best solution would actually be to buffer only the message. I think the current behavior is far from optimal.

FYI I was looking into that because I was trying to build a mercure.rocks implementation on my own.

trowski commented 2 years ago

I missed the obvious here and @kelunik pointed it out in chat. I guess we should change the behavior of CompressionMiddleware for text/event-stream rather than forcing users to disable compression for certain routes.

trowski commented 2 years ago

We also have the stream threshold setting that gets in the way of event streaming. Looks like it would be advantageous to add an event-stream example too.

MBauerDC commented 2 years ago

In the merged change, bufferTimeout is a constructor parameter for CompressionMiddleware, but the property is not set in the constructor. This leads to bufferTimeout always being null.

I have prepared the (single-line) change in a PR here: https://github.com/amphp/http-server/pull/339
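
For readers unfamiliar with this kind of bug, here is a minimal sketch of the pattern. The class names, the `float` type, and the default value are assumptions made for illustration; this is not the library's source.

```php
<?php
// Hypothetical classes for illustration only; not the library's source.
final class BrokenMiddleware
{
    private ?float $bufferTimeout = null;

    public function __construct(float $bufferTimeout = 0.1)
    {
        // Bug: the argument is accepted but never stored on the object.
    }

    public function getBufferTimeout(): ?float
    {
        return $this->bufferTimeout;
    }
}

final class FixedMiddleware
{
    private ?float $bufferTimeout = null;

    public function __construct(float $bufferTimeout = 0.1)
    {
        // The single-line fix: assign the parameter to the property.
        $this->bufferTimeout = $bufferTimeout;
    }

    public function getBufferTimeout(): ?float
    {
        return $this->bufferTimeout;
    }
}

$broken = (new BrokenMiddleware(0.5))->getBufferTimeout();
$fixed = (new FixedMiddleware(0.5))->getBufferTimeout();

var_dump($broken); // NULL
var_dump($fixed);  // float(0.5)
```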