oakserver / oak

A middleware framework for handling HTTP with Deno, Node, Bun and Cloudflare Workers 🐿️ 🦕
https://oakserver.org
MIT License

Returning response body as an async-iterable or a readable stream #267

Closed: ducaale closed this issue 3 years ago

ducaale commented 3 years ago

Apart from the usual data types (string, number, object, etc.), Deno.Reader is the only other type oak supports as a response body. It would be good if oak also accepted async iterables or readable streams as a response body, since they are easy to generate and compose with generators.

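// Streams rows from the users table (knex returns a Node.js readable stream,
// which is async-iterable).
function getUsers() {
  return knex('users').stream()
}

// Runs the expensive operation per user and yields each result as soon as it is ready.
async function* reports() {
  for await (const user of getUsers()) {
    yield expensiveOperation(user.id)
  }
}

// Converts each object into one CSV line (the trailing newline separates rows).
async function* json2csv(iterable) {
  for await (const obj of iterable) {
    yield Object.values(obj).join(',') + '\n'
  }
}

// Proposed API: assign an async iterable directly as the response body.
router.get('/reports', (ctx) => {
  ctx.response.body = json2csv(reports())
})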
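The generator pipeline above can be exercised without a database. The TypeScript sketch below is only a stand-in under stated assumptions: `getUsers` yields hard-coded rows instead of calling knex, `expensiveOperation` is a hypothetical async function, and `collect` is a hypothetical helper that drains the iterable into one string, roughly what a server does when it writes the body chunk by chunk.

```typescript
// Hypothetical stand-in for knex('users').stream(): yields rows one at a time.
async function* getUsers(): AsyncGenerator<{ id: number }> {
  for (const id of [1, 2, 3]) {
    yield { id };
  }
}

// Hypothetical async work per user; a real report would query a DB or API here.
async function expensiveOperation(id: number): Promise<{ id: number; score: number }> {
  await new Promise((resolve) => setTimeout(resolve, 10));
  return { id, score: id * 10 };
}

// Yields each report as soon as it has been computed.
async function* reports(): AsyncGenerator<{ id: number; score: number }> {
  for await (const user of getUsers()) {
    yield await expensiveOperation(user.id);
  }
}

// Turns each object into one CSV line; the trailing newline separates rows.
async function* json2csv(iterable: AsyncIterable<Record<string, unknown>>): AsyncGenerator<string> {
  for await (const obj of iterable) {
    yield Object.values(obj).join(",") + "\n";
  }
}

// Drains an async iterable of strings into a single string.
async function collect(iterable: AsyncIterable<string>): Promise<string> {
  let out = "";
  for await (const chunk of iterable) {
    out += chunk;
  }
  return out;
}
```

With these stand-ins, `collect(json2csv(reports()))` resolves to `"1,10\n2,20\n3,30\n"`. Each stage only pulls the next item when the stage after it asks for one, which is what makes this shape attractive as a streamed response body.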
ducaale commented 3 years ago

In my quest to get oak working with streamed data, I wrote a class that adapts an AsyncIterator to a Deno.Reader. However, the response does not seem to be streamed when I test it with curl --no-buffer localhost:8000/stream: instead, the whole response arrives at once after about 7 seconds.

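import { Application, Router } from "https://deno.land/x/oak/mod.ts";
import { delay } from "https://deno.land/std@0.84.0/async/delay.ts";

// Yields one JSON line per second.
async function* streamData(): AsyncIterator<string> {
  const names = ['Joshua', 'Rodriguez', 'James', 'Martinez', 'Harry', 'Thomas', 'Davis'];
  for (const name of names) {
    await delay(1000);
    yield JSON.stringify({ name }) + '\n';
  }
}

// Adapts an AsyncIterator<string> to Deno.Reader: each read() pulls one item.
// Simplification: assumes every encoded item fits in p (p.set throws a RangeError otherwise).
class AsyncIterReader implements Deno.Reader {
  private encoder = new TextEncoder();
  constructor(public asyncIter: AsyncIterator<string>) { }

  async read(p: Uint8Array): Promise<number | null> {
    const { value, done } = await this.asyncIter.next();
    if (done) {
      return null;
    }

    const encoded = this.encoder.encode(value);
    p.set(encoded);
    return encoded.length;
  }
}

const router = new Router();
router.get('/stream', (ctx) => {
  ctx.response.body = new AsyncIterReader(streamData());
});

const app = new Application();
app.use(router.routes());
await app.listen({ port: 8000 });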
ducaale commented 3 years ago

It seems that the Deno.Reader body is held back until all items from streamData() are available; only then are they sent, one by one. Timing each chunk with a small Rust client confirms this:

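// Assumed Cargo dependencies: reqwest, plus tokio with the "macros" and "rt-multi-thread" features.
use std::time::Instant;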

#[tokio::main]
async fn main() -> std::result::Result<(), Box<dyn std::error::Error>> {
    let start = Instant::now();
    let mut res = reqwest::get("http://localhost:8000/stream").await?;

    while let Some(chunk) = res.chunk().await? {
        println!("[{:?}] Chunk: {:?}", start.elapsed(), chunk);
    }

    Ok(())
}

# All seven chunks arrive together, roughly 7.4 seconds after the request
[7.3896007s] Chunk: b"{\"name\":\"Joshua\"}\n"
[7.3900469s] Chunk: b"{\"name\":\"Rodriguez\"}\n"
[7.3903625s] Chunk: b"{\"name\":\"James\"}\n"
[7.3907648s] Chunk: b"{\"name\":\"Martinez\"}\n"
[7.3911339s] Chunk: b"{\"name\":\"Harry\"}\n"
[7.3914712s] Chunk: b"{\"name\":\"Thomas\"}\n"
[7.3918822s] Chunk: b"{\"name\":\"Davis\"}\n"
kitsonk commented 3 years ago

oak should support WHATWG streams as well as async iterables.
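One way to bridge the two is to wrap the async iterator in a WHATWG `ReadableStream` whose `pull()` fetches the next item. This is a sketch, not oak's implementation; `iterableToStream` is a hypothetical helper name, and the strings are UTF-8 encoded with `TextEncoder`.

```typescript
// Wraps an async iterable of strings in a WHATWG ReadableStream of bytes.
// pull() is called as the consumer reads, so items are produced on demand
// rather than all up front.
function iterableToStream(iterable: AsyncIterable<string>): ReadableStream<Uint8Array> {
  const encoder = new TextEncoder();
  const iterator = iterable[Symbol.asyncIterator]();
  return new ReadableStream<Uint8Array>({
    async pull(controller) {
      const { value, done } = await iterator.next();
      if (done) {
        controller.close();
      } else {
        controller.enqueue(encoder.encode(value));
      }
    },
    // Forward cancellation (for example a client disconnect) to the generator.
    async cancel(reason) {
      await iterator.return?.(reason);
    },
  });
}
```

Since the standard `Response` constructor accepts a `ReadableStream` body, something like `new Response(iterableToStream(json2csv(reports())))` would stream the generator's output; whether a particular oak version accepts such a stream directly as `ctx.response.body` is not assumed here.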

ducaale commented 3 years ago

I tested the same AsyncIterReader class with Deno's std http/server, and there the response is streamed as I expected.

import { serve } from "https://deno.land/std@0.84.0/http/server.ts";

async function* streamData() : AsyncIterator<string> { /* code omitted */ }
class AsyncIterReader implements Deno.Reader { /* code omitted */ }

const server = serve({ port: 8000 });
console.log("http://localhost:8000/");
for await (const req of server) {
  const body = new AsyncIterReader(streamData());
  req.respond({ body });
}

# Notice how each chunk now arrives about 1 second after the previous one
[1.3479045s] Chunk: b"{\"name\":\"Joshua\"}\n"
[2.3538616s] Chunk: b"{\"name\":\"Rodriguez\"}\n"
[3.3683234s] Chunk: b"{\"name\":\"James\"}\n"
[4.3769208s] Chunk: b"{\"name\":\"Martinez\"}\n"
[5.3870532s] Chunk: b"{\"name\":\"Harry\"}\n"
[6.3943863s] Chunk: b"{\"name\":\"Thomas\"}\n"
[7.4102371s] Chunk: b"{\"name\":\"Davis\"}\n"

Edit: I suspect this line is what prevents the data from being streamed.

kitsonk commented 3 years ago

@ducaale there are a few challenges with this approach. Ultimately it needs to work like BufReader: a single item from the iterator may have to be spread over several read() calls, depending on the .byteLength of the buffer passed in to each read.

I have a patch about half done that will fix this and allow oak's response.body to accept async iterables.
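The BufReader-like behaviour described above can be sketched as a reader that keeps the unreturned bytes of the current item and hands them out across as many `read()` calls as the caller's buffer size requires. This is an illustration under assumptions, not the patch mentioned above: `BufferedAsyncIterReader` is a hypothetical name, and `Reader` is a local interface mirroring the `Deno.Reader` signature so the sketch does not depend on the `Deno` namespace.

```typescript
// Same shape as Deno.Reader: fill at most p.byteLength bytes, return the count, or null at EOF.
// Declared locally because Deno.Reader is not available in every runtime/version.
interface Reader {
  read(p: Uint8Array): Promise<number | null>;
}

// Adapts an AsyncIterator<string> to a Reader that respects the caller's buffer size:
// an item that encodes to more than p.byteLength bytes is handed out over several reads.
class BufferedAsyncIterReader implements Reader {
  private readonly asyncIter: AsyncIterator<string>;
  private readonly encoder = new TextEncoder();
  // Encoded bytes of the current item that have not been returned yet.
  private pending: Uint8Array = new Uint8Array(0);

  constructor(asyncIter: AsyncIterator<string>) {
    this.asyncIter = asyncIter;
  }

  async read(p: Uint8Array): Promise<number | null> {
    if (p.byteLength === 0) {
      return 0;
    }
    // Pull the next item only once the previous one is fully consumed
    // (the loop also skips items that encode to zero bytes).
    while (this.pending.byteLength === 0) {
      const { value, done } = await this.asyncIter.next();
      if (done) {
        return null;
      }
      this.pending = this.encoder.encode(value);
    }
    const n = Math.min(p.byteLength, this.pending.byteLength);
    p.set(this.pending.subarray(0, n));
    this.pending = this.pending.subarray(n);
    return n;
  }
}
```

`read()` returns as soon as some bytes of the current item are available instead of waiting to fill `p`, so each item can be passed on without waiting for the generator to finish; whether those bytes actually reach the client promptly still depends on how the server writes and flushes the body.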

NfNitLoop commented 3 months ago

Related issue: #659