aws / aws-lambda-nodejs-runtime-interface-client

Apache License 2.0
188 stars 55 forks source link

Override `highWaterMark` didn't work #94

Open H4ad opened 10 months ago

H4ad commented 10 months ago

The following code will send the data BEFORE 6s:

const util = require('node:util');
const HANDLER_HIGHWATERMARK = Symbol.for(
  'aws.lambda.runtime.handler.streaming.highWaterMark',
);

const handler = async (event, responseStream, context) => {
        responseStream.setContentType('text/html; charset=utf-8');

        responseStream.write("Thinking...");
        responseStream.write("a".repeat(64 * 1024));

        setTimeout(function() {
            responseStream.write("b".repeat(64 * 1024));
        }, 3000);

        setTimeout(function() {
            responseStream.write("After 6s");
            responseStream.end();
        }, 6000);
    };

handler[HANDLER_HIGHWATERMARK] = 512;

exports.handler = awslambda.streamifyResponse(handler, {
        highWaterMark: 512
    }
);

The following code will only send data AFTER 6s.

const util = require('node:util');
const HANDLER_HIGHWATERMARK = Symbol.for(
  'aws.lambda.runtime.handler.streaming.highWaterMark',
);

const handler = async (event, responseStream, context) => {
        responseStream.setContentType('text/html; charset=utf-8');

        responseStream.write("Thinking...");
        responseStream.write("a".repeat(1 * 1024));

        setTimeout(function() {
            responseStream.write("b".repeat(1 * 1024));
        }, 3000);

        setTimeout(function() {
            responseStream.write("After 6s");
            responseStream.end();
        }, 6000);
    };

handler[HANDLER_HIGHWATERMARK] = 512;

exports.handler = awslambda.streamifyResponse(handler, {
        highWaterMark: 512
    }
);

From what I tested, the default highWaterMark is 64Kb, but there is no way to override it to be lower, I tried two different ways to do it.

This causes weird issues where I can't send some tiny text and then wait 6s to send more data, it will be sent as a single chunk only after 6s.

This issue was initially discovered at https://github.com/H4ad/serverless-adapter/issues/166

conico974 commented 9 months ago

From what i've found the threshold seems to be around 16Kb to get reliable streaming, but even that seems to be pretty inconsistent. Calling responseStream.cork() before write and responseStream.uncork() after seems to help a little bit as well.

The things i've noticed is that even for the same data sent, for small chunks, sometimes the stream is buffering and no data is sent, and sometimes it just works well. I assume it might be when the data are close to this highWaterMark or the receiving server are not ready (but this doesn't seem to happen for big chunks)

Potentially the issue might be on the other hand of the stream, they create a request to an external server https://github.com/aws/aws-lambda-nodejs-runtime-interface-client/blob/eeac0b3f2590bc5a3a6172faa04def74cc5ff598/src/ResponseStream.js#L160 and it might be on this server that the buffering is happening. It might also explain why the highWaterMark has no effect locally even though it seems to be applied.

stackia commented 9 months ago

I found the first 2 writes are always bufferer:

const handler = awslambda.streamifyResponse(async (event, responseStream, context)=>{
    for (let i = 1; i <= 10; i++) {
        responseStream.write(i.toString());
        await new Promise((r) => setTimeout(r, 1000));
    }
    responseStream.end();
});

export { handler };

https://github.com/aws/aws-lambda-nodejs-runtime-interface-client/assets/5107241/80ddb647-96c0-4118-a275-238fc148a252

stackia commented 9 months ago

Found a tricky workaround

function sleep(ms) {
    return new Promise(resolve => setTimeout(resolve, ms))
}

const handler = awslambda.streamifyResponse(async (event, responseStream, context)=>{
    async function flush() {
        await sleep(0);
        responseStream.write(" ")
        await sleep(0);
    }

    await flush();

    responseStream.write("a".repeat(1 * 1024))
    await flush();

    await sleep(1000)

    responseStream.write("b".repeat(1 * 1024));
    await flush();

    await sleep(1000)

    responseStream.write("c".repeat(1 * 1024));
    await flush();

    await sleep(1000)

    responseStream.write("d".repeat(1 * 1024));
    await flush();

    await sleep(1000)

    responseStream.write("e".repeat(1 * 1024));
    await flush();

    await sleep(1000)

    responseStream.write("f".repeat(1 * 1024));
    await flush();

    responseStream.end();
});

export { handler };

It looks like every write will flush the previous buffer (starting from the second write).

conico974 commented 8 months ago

I’ve delved into some experimentation with this and i found some very interesting things.

The responseStream is only a restricted version of an http.request, and streaming can be achieved without awslambda.streamifyResponse by creating the request ourselves (This is what i've done to eliminate some potential issues)

Moreover, it seems that neither pipe nor pipeline are affected by the minimum size constraint. The example lambda below works fine even with small chunks of data. I think there is still a minimum size but it's far less than with write

Last things i've discovered is that using awslambda.HttpResponseStream.from can also trigger some other buffering issues that can cause the stream to wait for the end. This doesn’t occur consistently, only about 15-20% of the time. I've found a workaround for this, if you let enough time (roughly 25ms has proven effective most of the time, but still not 100%) after the last write from awslambda.HttpResponseStream.from (the req.write(new Uint8Array(8))) and your first write, the stream seems to work fine.

My guess as to why this happen is that if your first write and the req.write(new Uint8Array(8)) happens to end up on the same chunk, then the underlying implementation think that it has not received the end marker and then wait for the full response before returning anything. It's possible that the underlying implementation expect a chunk to end with new Uint8Array(8) to signify the headers’ termination and thus the start of the stream.

For those interested here is the lambda i used

function sleep(ms) {
    return new Promise(resolve => setTimeout(resolve, ms))
}
import {request} from "http"
import {setDefaultHighWaterMark, Readable} from "stream"

// Changing this doesn't seem to have any effect
setDefaultHighWaterMark(false,16384)

const handler = async (event, context) => {

    const req = request(
        {
          host: "127.0.0.1",
          port: 9001,
          path: `/2018-06-01/runtime/invocation/${context.awsRequestId}/response`,
          method: 'POST', 
          headers: {
              "Lambda-Runtime-Function-Response-Mode": "streaming",
              "transfer-encoding": "chunked",
              "Content-Type": "application/vnd.awslambda.http-integration-response"
          },
        },
        res => {
            var result = ''
            res.on('data', function (chunk) {
                result += chunk;
            });

            res.on('end', function () {
                console.log(result);
            });
        }
    ) 
    let _resolve
    const promise = new Promise((resolve) => {
        _resolve = resolve
    })
    const size = 64
    console.log(req.writableHighWatermark)

    // This is equivalent to running awslambda.HttpResponseStream.from
    req.write(
        JSON.stringify({
            statusCode: 200,
            cookies: [],
            headers: {
                "content-type": "text/html",
                "x-test": "1"
            }
        }))
    req.write(new Uint8Array(8))
    await sleep(10)

    async function * generate() {
        yield "<html>"
        await sleep(1)
        yield "<p>initial </p>"
        await sleep(1500)
        // yield "<html>"
        for(let i = 0; i <10; i++){
            yield `<p>${i}</p>`
            await sleep(200)
        }
        yield "</html>"
    }
    const dataStream = Readable.from(generate())

    dataStream.pipe(req)
    req.on('end',() => _resolve())
    await promise
    return {}
};

export { handler };
stackia commented 8 months ago

From my guess, the gateway between awslambda.HttpResponseStream and the actual user request is the root cause of all these buffer issues.

Anything writes to awslambda.HttpResponseStream are first streamed to the gateway. The gateway decides when to send its internal buffer to the user. For example, the gateway might flush its buffer every time it receives new data after some idle time.