amphp / http-client

An advanced async HTTP client library for PHP, enabling efficient, non-blocking, and concurrent requests and responses.
https://amphp.org/http-client
MIT License
706 stars 66 forks source link

Response streaming during request upload #353

Open danog opened 1 year ago

danog commented 1 year ago

@trowski @kelunik There's an issue with http-client: when the request body is uploaded from a stream (and only then), `request()` won't return a `Response` until the request body stream is closed, even if the server has already started streaming a response.

Here's an example that demonstrates the issue:

<?php

require dirname(__DIR__) . "/vendor/autoload.php";

use Amp\ByteStream;
use Amp\ByteStream\Pipe;
use Amp\ByteStream\ReadableIterableStream;
use Amp\Http\Client\HttpClientBuilder;
use Amp\Http\Client\HttpContent;
use Amp\Http\Client\Request as ClientRequest;
use Amp\Http\Client\StreamedContent;
use Amp\Http\HttpStatus;
use Amp\Http\Server\DefaultErrorHandler;
use Amp\Http\Server\Request;
use Amp\Http\Server\RequestHandler\ClosureRequestHandler;
use Amp\Http\Server\Response;
use Amp\Http\Server\SocketHttpServer;
use Amp\Log\ConsoleFormatter;
use Amp\Log\StreamHandler;
use Monolog\Logger;
use Monolog\Processor\PsrLogMessageProcessor;
use Revolt\EventLoop;

use function Amp\delay;
use function Amp\now;
use function Amp\trapSignal;

// Run this script directly: it starts an HTTP server on port 1337 and, further down,
// sends a client request to that server from the same process.

// Logging: server log records go through a stream handler wrapping stdout.
$stdout = ByteStream\getStdout();
$logHandler = new StreamHandler($stdout);
$logHandler->setFormatter(new ConsoleFormatter());
$logHandler->pushProcessor(new PsrLogMessageProcessor());

$logger = new Logger('server');
$logger->pushHandler($logHandler);

// Expose the server on port 1337 for both the IPv4 and the IPv6 wildcard address.
$server = SocketHttpServer::createForDirectAccess($logger);
foreach (["0.0.0.0:1337", "[::]:1337"] as $address) {
    $server->expose($address);
}

// Unix timestamp (whole seconds) taken at script start; l() reports offsets relative to it.
$start = time();

/**
 * Dump a message prefixed with the number of whole seconds elapsed since $start.
 *
 * The dumped string has the form "+<seconds>: <message>" and is emitted via var_dump(),
 * so each line appears as string(N) "...".
 *
 * @param string $v Message to print after the elapsed-time prefix.
 */
function l(string $v) {
    global $start;

    $elapsed = time() - $start;
    $line = '+' . $elapsed . ': ' . $v;

    var_dump($line);
}

// Server side of the reproduction: the handler returns its Response right away, with a
// body that is only written (and closed) by a separately queued callback later on.
$server->start(new ClosureRequestHandler(function (Request $request): Response {
    l("server: Got request");

    // The pipe's source becomes the response body; the sink is fed by the callback below.
    // NOTE(review): 1024 is presumably the pipe's buffer size - confirm against Amp\ByteStream\Pipe.
    $p = new Pipe(1024);
    $sink = $p->getSink();
    EventLoop::queue(function () use ($sink) {
        // Hold the body back (the recorded output shows the write at +4), then write it
        // in one go and close the sink so the response body ends.
        delay(4.0);
        l("server: Writing full response");
        $sink->write("done");
        $sink->close();
        l("server: Wrote full response");
    });

    // Returned before anything has been written to the sink: in the recorded output
    // this line is logged at +0, while the body is written at +4.
    l("server: Return response");
    return new Response(HttpStatus::OK, [], $p->getSource());
}), new DefaultErrorHandler());

// Client side of the reproduction: POST a streamed body to the local server and log
// the moment request() returns a response.
EventLoop::queue(function () {
    // Instantiate the HTTP client
    $client = HttpClientBuilder::buildDefault();

    // The pipe's source is used as the request body; the sink is fed by the callback below.
    $p = new Pipe(1024);
    $sink = $p->getSink();
    EventLoop::queue(function () use ($sink) {
        // Write the whole request body first (logged at +2 in the recorded output) ...
        delay(2.0);
        l("client: Writing full request");
        $sink->write("done");
        l("client: Wrote full request");
        // ... but keep the body stream open for a while before closing it (logged at +8).
        delay(6.0);
        $sink->close();
        l("client: Closed full request");
    });

    // Toggle for the comparison described in the issue: keep the streamed body as is, or
    // uncomment the string assignment below to send a plain string body instead.
    $body = StreamedContent::fromStream($p->getSource());
    //$body = "test";
    $request = new ClientRequest('http://localhost:1337', "POST", $body);
    $request->setBodySizeLimit(128 * 1024 * 1024); // 128 MB
    $request->setTransferTimeout(120); // 120 seconds

    // Make the HTTP request. With the streamed body, the recorded output shows this call
    // returning only at +8 (when the sink is closed); with a string body it returns at +0.
    l("client: Made request");
    $response = $client->request($request);
    l("client: !!!! Got response !!!!");

    $bytes = 0;

    // Drain the response body chunk by chunk until read() returns null.
    // $bytes accumulates the total length read but is never printed.
    while (null !== $chunk = $response->getBody()->read()) {
        $bytes += strlen($chunk);
    }

    l("client: Got full response");
});

// Wait until one of the termination signals is received, log it, then stop the server.
$receivedSignal = trapSignal([\SIGHUP, \SIGINT, \SIGQUIT, \SIGTERM]);

$shutdownMessage = sprintf("Received signal %d, stopping HTTP server", $receivedSignal);
$logger->info($shutdownMessage);

$server->stop();

Results in:

string(24) "+0: client: Made request"
string(23) "+0: server: Got request"
string(27) "+0: server: Return response"
string(32) "+2: client: Writing full request"
string(30) "+2: client: Wrote full request"
string(33) "+4: server: Writing full response"
string(31) "+4: server: Wrote full response"
string(31) "+8: client: Closed full request"
string(34) "+8: client: !!!! Got response !!!!"
string(29) "+8: client: Got full response"

if `$body` is the `StreamedContent` (the response is only returned at +8, once the request body stream is closed), and in:

string(24) "+0: client: Made request"
string(23) "+0: server: Got request"
string(27) "+0: server: Return response"
string(34) "+0: client: !!!! Got response !!!!"
string(33) "+4: server: Writing full response"
string(31) "+4: server: Wrote full response"
string(29) "+4: client: Got full response"

if `$body` is a string (the response is returned immediately, at +0).