whatwg / streams

Streams Standard
https://streams.spec.whatwg.org/
Other
1.35k stars 160 forks source link

Support reading bytes into buffers allocated by user code on platforms where only async read is available #253

Closed domenic closed 9 years ago

domenic commented 9 years ago

Had a thought-provoking conversation with @piscisaureus today. He works on libuv and was giving me their perspective on how they do I/O and how it interfaces with Node.js's streams.

He pointed out that resources like files and pipes shared with other processes (seekable resources, he called them) don't have the epoll + read interface that ReadableByteStream's current design is based on. In Node.js, they do blocking I/O in a thread pool with pre-allocated buffers.

This works for a ready + read() JS interface, since then the implementation can pre-allocate (say) a 1 MiB buffer, read into it off-thread using blocking I/O, then fulfill the ready promise. When JS calls read(), it just returns the pre-allocated buffer.

It doesn't work as well with ReadableByteStream's ready + readInto(ab, offset, sizeDesired) interface, since we don't know what size buffer to allocate until too late. If we pre-allocate something too small, readInto will always be returning a number below the desired size, which is a bit user-hostile. If we pre-allocate too large, we are at the least wasting memory, and at the worst getting ourselves into a bunch of trouble as we need to figure out how to merge the "leftovers" with other chunks we read in the future.

(I am assuming here it is easy in V8 to map the [offset, offset + max(sizeRead, sizeDesired)] portion of ab onto a given C++ backing buffer.)

A readIntoAsync(ab, offset, desiredSize) model would work a bit better, since then we'd know to pre-allocate a buffer of size desiredSize. But, I was curious if you guys had any other thoughts? Did you think of this issue when designing the API, @tyoshino? I imagine @willchan could be helpful here too.

tyoshino commented 9 years ago

If the blocking I/O:

Then, we want to allow the user code to allocate a buffer with size determined based on their status. To realize this, yes, we need some API like readIntoAsync(). But between ready + read() and ready + readInto(ab, offset, sizeDesired), I don't think there's so much difference. ready + read() doesn't know what size is appropriate. 1MiB may be too big or too small. The only disadvantage of ready + readInto() for this kind of I/O is an extra copy (internal buffer to user allocated buffer).

In what form can we provide readIntoAsync() together with read()? To allow sync read() we have on the ReadableStream, we need to pull data from the source at least when we're in the "waiting" state. Otherwise, we never enter "readable" state. But this spoils the readIntoAsync()'s benefit. We'll have some data already pulled (to serve for read() user) when readIntoAsync() is called.

So, we need to add some interface to suppress automatic pull when we want to use readIntoAsync() style reading?


If the blocking I/O:

Then, ready + read()'s

(I assumed that the unblocking event leads to fulfillment of ready, and read() allocates a buffer with suitable size and populates it)

For this case, there's no big difference between readIntoAsync() and readInto(), I think.

domenic commented 9 years ago

Sorry for the delay on this. I had a hard time paging it back in.

Unfortunately I think it is the first case:

  • accepts a buffer
  • based on the size of the given buffer, controls how much data it pulls from the source

This can be seen by looking at e.g. Win32 ReadFile or Unix fread and pread.

In what form can we provide readIntoAsync() together with read()? To allow sync read() we have on the ReadableStream, we need to pull data from the source at least when we're in the "waiting" state. Otherwise, we never enter "readable" state. But this spoils the readIntoAsync()'s benefit. We'll have some data already pulled (to serve for read() user) when readIntoAsync() is called.

Yes, I agree, this is a problem. The only thing I can think of immediately is to remove sync read() and the whole waiting/readable state transition :(. Assuming that we would carry that over to ReadableStream that would be a lot of churn this late in the game which is frustrating. Maybe we can think of a better solution. E.g. maybe SeekableReadableByteStream only matches ReadableStream in its pipeTo API? :-/

tyoshino commented 9 years ago

Shall we add a new mode to ReadableByteStream, say, manual feed mode, named after manual paper feed of printers.

ReadableByteStream would have a method named .feedArrayBuffer(arrayBuffer, offset, size). Via this method, the user feeds a tuple of an ArrayBuffer, offset and size. The tuples are queued and used by the ReadableByteStream to create the result ArrayBuffers to return from the .read() method. How to interpret the size argument is up to the API that returns the ReadableByteStream. The state and read() are kept the same as ReadableStream but ArrayBuffer feeding happens asynchronous to them.

tyoshino commented 9 years ago

PR #282 is quick prototype of the manual feedable stream.

wanderview commented 9 years ago

Warning: naming bikeshed ahead... feel free to ignore.

I think an interface for manual allocation might be useful, but not sure the "feedable" metaphor resonates. I've seen this kind of thing called an "allocation strategy" other places, etc.

It seems this would definitely be a power user feature, though. So lets make sure its optional?

domenic commented 9 years ago

@tyoshino this is sounding promising. I think I need a bit more detail to understand how it's supposed to work though. Especially how it interacts with readInto. I think I like the general approach though, of a more-specialized variant of ReadableByteStream that helps with this case.

Would love if you could flesh out the PR with some examples of consumer code, using read/readInto/feedArrayBuffer.

As for naming, I don't have any strong preferences. "Strategy" is kind of already taken, but something about allocation might be nice.

tyoshino commented 9 years ago

wanderview: I agree that this kind of stuff is called allocation something or allocator. I used feeding since it's not passive (asked to allocate) but the user tells it to pull by putting a new ArrayBuffer via the feedArrayBuffer() method. I don't care so much about the naming but that's my feeling. Feedable might be misleading. ManualArrayBufferFeeding... is the most descriptive name I come up with but too long.

wanderview commented 9 years ago

Could we use the revealing constructor pattern here again? Something like:

var rbs = new ReadableByteStream({
  allocate: function() { return new ArrayBuffer(myPreferredSize); }
});

Then we don't need a new stream name. :-)

If we add a free hook, then you could build a block pool allocator strategy:

// use ping pong buffers
var blocks = [];
blocks[0] = new ArrayBuffer(4096);
blocks[1] = new ArrayBuffer(4096);

var rbs = new ReadableByteStream({
  allocate: function() { return blocks.pop(); },
  free: function(buf) { return blocks.push(buf); }
});

If we don't like confusing these attributes with the constructor "source" concept, then it could be set in a different method. Something like:

var rbs = new ReadableByteStream(/*...*/);

// use ping pong buffers
var blocks = [];
blocks[0] = new ArrayBuffer(4096);
blocks[1] = new ArrayBuffer(4096);

rbs.setAllocator({
  allocate: function() { return blocks.pop(); },
  free: function(buf) { return blocks.push(buf); }
});

Would this address the problem?

domenic commented 9 years ago

I don't think so, unless I'm misunderstanding. The scenario I think we're trying to solve is that the consumer of the stream wants to be able to do "fill up count bytes of ab starting at offset" and then the stream will use this to do an async (i.e., blocking in another thread) operation like fread.

I guess setAllocator would do that, since the consumer gets access to it. The revealing constructor would not though, because it assumes the consumer is the same as the creator of the stream, which is rarely the case.

domenic commented 9 years ago

What would be most helpful for me to understand for all these proposed ideas is, how would a consumer---given a (Feedable?)ReadableByteStream from the UA---use the stream.

For example, if we took a drastic approach and overhauled everything so that readInto and read became async, it would be pretty clear:

// Passes ab, 0, CHUNK_SIZE to fread, which uses it
rbs.readInto(ab, 0, CHUNK_SIZE).then(...)

// Uses the default chunk size, currently specified by the stream constructor (although we could allow it to be specified by consumers, hmm).
rbs.read().then(...)

What would it look like for the feedBuffer and setAllocator situations?

wanderview commented 9 years ago

Actually, I had not thought about setAllocator() in the context of a DOM API producing the stream. I'm not sure we want to make the UA depend on content script to allocate buffers. Seems like a recipe for a bad time.

Async read seems ok to me, but I think it opens some questions. What does it do for concurrent read() calls? Do we allow overlap? Ordering guaranteed?

domenic commented 9 years ago

Well, that was kind of the entire point of ReadableByteStream, was to make it possible to do zero- or one-copy reads into user-allocated ArrayBuffers. For example you could do repeated readInto()s into the same array buffer that was pre-allocated to be the size of the file (determined by stat-ing) or response (determined by Content-Length).

Async read was more of a strawman, and I'd rather not dive into it right now because it'd be a lot of spec churn. (I previously explored it here, although at the time I wasn't aware of this particular case.) It's also less efficient for read(2)-style streams like HTTP streams, at least in theory. If we do decide to go that way then ... I'd feel bad about the churn at what seems like a pretty late stage, but it'd be OK I guess. But you guys had some interesting ideas for meshing the existing wait-for-ready-then-sync-read(Into) design with this use case, and I mainly wanted to see what those would look like in practice.

wanderview commented 9 years ago

Sorry. You're right. Sorry for my confusion. I do think setAllocator() could work.

To answer your question, I think an allocator function could handle this like so:

var rbs = new ReadableByteStream(/* ... */);

var bufferSize = 4096;
rbs.setAllocator({
  allocate: function() { new ArrayBuffer(bufferSize); }
});

rbs.read(); // uses 4096 byte buffer;

bufferSize = 136;
rbs.read() // uses 136 byte buffer

I don't really like depending on state between two statements like that, but it seems the feedBuffer() approach has the same problem.

domenic commented 9 years ago

I think I see.

OK, I think what we need to make this more concrete now is a series of use cases that we want to write sample code for.

Here is what I have off the top of my head. Please give me more :).

Here is my sample code for setAllocator, let me know if I got this right

// Common prelude
const rbs = getReadableByteStreamForFile("/path/to/file.flac");
const ONE_MIB = 1024 * 1024;
const TEN_MIB = 10 * ONE_MIB;
// (all)
const ab = new ArrayBuffer(TEN_MIB);

// This begins the fread(ab, 0, ab.byteLength) call
rbs.setAllocator({
  allocate() { return ab; }
});

// fulfills when the fread() call finishes
rbs.ready.then(() => {
  const readValue = rbs.read();
  assert(readValue === ab);
  console.log("done!", ab);

  // stream is now closed
});
// (chunkwise)
const ab = new ArrayBuffer(ONE_MIB);

// This begins the fread(ab, 0, ab.byteLength) call
rbs.setAllocator({
  allocate() { return ab; }
});

// fulfills when the first fread(ab, 0, ONE_MIB) call finishes
rbs.ready.then(() => {
  const readChunk1 = rbs.read();
  assert(readChunk1 === ab);
  console.log("first chunk", ab);

  // queue is now empty but we're not done, so stream automatically kicks off
  // fread(ab, ONE_MIB, ONE_MIB). This next rbs.ready will fulfill when that finishes

  rbs.ready.then(() => {
    const readChunk2 = rbs.read();
    assert(readChunk2 === readChunk1); // as desired; re-using the same buffer
    console.log("second chunk", ab);

    // Etc., until chunk 10, at which point the stream is closed.
    // Recursion left as an exercise for the reader.
  });
});
// (ping-pong)
const abs = [new ArrayBuffer(ONE_MIB), new ArrayBuffer(ONE_MIB)];
const counter = 0;

// This begins the fread(abs[0], 0, abs[0].byteLength) call
rbs.setAllocator({
  allocate() {
    return abs[counter++ % 2];
  }
});

// fulfills when the first fread(abs[0], 0, abs[0].byteLength) call finishes
rbs.ready.then(() => {
  const readChunk1 = rbs.read();
  assert(readChunk1 === abs[0]);
  console.log("first chunk", readChunk1);

  // after the rbs.read() call, the stream's queue is now empty but we're not
  // done, so stream automatically kicks off fread(abs[1], ONE_MIB, ONE_MIB).
  // This next rbs.ready will fulfill when that finishes

  Promise.all([
    processChunkAsync(readChunk1),
    rbs.ready
  ])
  .then(() => {
    const readChunk2 = rbs.read();
    assert(readChunk2 === abs[1]); // as desired
    console.log("second chunk", readChunk2);

    // Note that after calling rbs.read() this second time the stream is doing
    // fread(abs[0], 2 * ONE_MIB, ONE_MIB), so abs[0] === readChunk1 is starting
    // to be reused.

    // Anyway, keep going until chunk 10, abstract into recursive function, etc.
  });
});

Notably I didn't use your free hook for the (ping-pong) case, so maybe I'm missing something?

wanderview commented 9 years ago

I think you're right that free does not make sense. If anything, the consumer would need to be doing the free'ing not the stream. So it can just handle that logic itself if necessary.

I guess that raises the question if allocate() can return a promise to delay the next fread() until a re-usable block is ready to be processed again.

I didn't consider that fread() would actually be triggered by setAllocator(). I understand that a bit better now. Would we want a better name?

Its not completely clear to me how this would interact with readInto(), though. Consider:

rbs.setAllocator(function() { return new ArrayBuffer(1024);}
rbs.readInto(buf, 0, 512);
rbs.read();  // what exactly does this return?  a slice of the allocator returned buffer? 
domenic commented 9 years ago

I didn't consider that fread() would actually be triggered by setAllocator(). I understand that a bit better now. Would we want a better name?

I think maybe that is the motivation behind @tyoshino's feedArrayBuffer() paradigm. setAllocator() is a bit more automatic since you call it once instead of every time, but the ideas are starting to converge I think :).

Its not completely clear to me how this would interact with readInto(), though.

Hmm, yeah, readInto() makes less sense here. Maybe the right way to phrase it is, I'm not sure what use case it would address. Perhaps readInto() makes more sense for the read(2)-esque streams instead of the fread-based streams, and wouldn't exist on AllocatorControlledReadableByteStream (or whatever)?

domenic commented 9 years ago

One advantage of setAllocator over feedArrayBuffer is that once the allocator is set, you can pass off the ReadableByteStream to someone else and they can consume it as if it's a normal readable stream. I think with feedArrayBuffer they need to be aware of the feed/read cycle and can't just do ready/read like normal.

wanderview commented 9 years ago

It would be nice if this didn't require a new type.

I think setAllocator + readInto would be ok. It would just copy the data an extra time. The fread into the allocator returned buffer and then again into the readInto buffer. Not recommended, but not breaking.

Any readInto call would be limited to the size of the allocator returned buffer. If readInto completely drains the buffer, then the allocate() function is called again.

domenic commented 9 years ago

I don't have much against new types, as long as they are duck-compatible on a large subset of the API. shrug

wanderview commented 9 years ago

Well yea, but you were talking about removing a method... so a duck with only one foot. :-)

tyoshino commented 9 years ago

Hmm, yeah, readInto() makes less sense here.

Yeah. Streams with fread style underlying source would be queue-backed stream (though it's likely to be a single element queue) (see https://github.com/whatwg/streams/pull/275#issuecomment-73470112). So, readInto() is not useful but introduces an extra copy. Ben's https://github.com/whatwg/streams/issues/253#issuecomment-73938444 should just work.


One advantage of setAllocator over feedArrayBuffer is

I agree that the new user doesn't need to do the feeding, but the new user needs to:

These were not required for normal ReadableStream user. So, I think streams to pass to someone else should have an allocator that doesn't reuse ArrayBuffers.

For use cases in which the consumer code wants to adjust the size to pull (pass to fread()):

So, I suggest that we encapsulate fread() style underlying byte source by say, ManualBufferFeedReadableStream that has .feedArrayBuffer() but also has some switch to make it "auto-pull" or even .setAllocator() method to accept custom ArrayBuffer allocator so that it can be passed to ReadableStream consumers. Or, we could also introduce kind of "joint" (like transform-stream concept) that encapsulates the fread() style ReadableStream, calls .feedArrayBuffer appropriately and exports ReadableStream interface.

Hmm, but now I wonder if this is something we should define in the Streams standard or not. Maybe we should just include some advice how to encapsulate blocking I/Os with the ReadableStream interface?

domenic commented 9 years ago

Hmm, but now I wonder if this is something we should define in the Streams standard or not. Maybe we should just include some advice how to encapsulate blocking I/Os with the ReadableStream interface?

What is really worrying to me is that we seem to have painted ourselves into a corner, so that now all the ideas that we're considering involve passing off the resulting complexity to authors. Whether it be feedArrayBuffer() or setAllocator() or "follow these guidelines in the spec," in all cases it feels like this kind of sucks. All of the stuff we're discussing now seems too low-level---it's more stuff that the stream creator should be concerned with, when developing their underlying source object, and not stuff the consumer should be concerned with.

Stated another way. One of the things the current design does really well, I think, is focus on unifying push and pull sources into a single author-facing API: a "readable stream." I think it should be within our charter to unify read(2)-backed and fread-backed readable byte streams. (And, I also think it's important to make readable byte streams a special case of readable streams---or an extension of them, depending on how you look at it.)

At this point I really don't see any truly good solution except moving to async read---everywhere. But I feel really bad about this. One of the very original points of contention between your W3C streams draft and my WHATWG streams draft, inspired by Node streams, was saying that sync read was important. Now, I don't think at the time either of us had a complete understanding of all the issues involved, and how it would impact the byte-stream case due to this fread vs. read(2) dichotomy combined with the desire to specify up front how many bytes you want to read. (Or maybe you did and I just wasn't listening? :-/) We've learned a lot about the problem domain since and in general it's good to revise APIs in light of new information. But it's late! You guys have an implementation in Chrome that you want to ship soon! And making this big of a change this late gives off instability signals that make me quite sad.

I guess what I'm asking is---if I can draft up complete semantics for a switch to async read over the next week, how disruptive do you think this would be? To the implementation you guys are hoping to ship soon for Fetch, and to the ecosystem overall? Here's a tentative sketch of what I'm thinking, happy to revise:

ReadableStream

ReadableByteStream

Please let me know ASAP if you think I should work on this. If you think it's too late for such a big change and we should explore other options, I can understand. But if you think it's worth doing then I want to get it ready ASAP so as to minimize disruption.

tyoshino commented 9 years ago

I'd like to do some survey and create a table of APIs and their characteristics (sync/async, blocking/nonblocking, bring our own buffer/returns API allocated buffer, pull and read are separate/pull and read are combined, ...). Includes some non software API stuff for metaphor.

tyoshino commented 9 years ago

Domenic: Oh, OK. I'll think about revisiting everything. But for now, I'm optimistic about sync read(). Like .feedArrayBuffer(), we could add, say .feedReadToken() to ReadableStream to allow users to manually control how much objects to pull. In the "precise flow control" world I envisioned, these pull/read separated interfaces are the base than the current ReadableStream in the spec.

Current ReadableStream is relieving users from doing this manually by having an automatic token (without size arg) feeder named "strategy". Current ReadableByteStream is relieving users from allocating ArrayBuffers manually by allocating them automatically but with size=underlyingByteSource.readBufferSize. I'd view them as easy to use variants of the base interface, ManualBufferFeed.*.

It looks that the pros/cons of the promise-returning version you've sketched in https://github.com/whatwg/streams/issues/253#issuecomment-74114680 are:

domenic commented 9 years ago

Current ReadableStream is relieving users from doing this manually by having an automatic token (without size arg) feeder named "strategy". Current ReadableByteStream is relieving users from allocating ArrayBuffers manually by allocating them automatically but with size=underlyingByteSource.readBufferSize. I'd view them as easy to use variants of the base interface, ManualBufferFeed.*.

This does make me feel better. If we can make everything fit into one conceptual model, maybe we can even add async read/readInto on top as new methods (named e.g. readWhenReady()) for people who don't want to have to worry about it. Would love to get some more fleshed out examples from you about how to fit this all together. I like the idea of strategy being a special case of a more general feeder concept, as a way of unifying things.

domenic commented 9 years ago

I'm not fully sure if there're real use cases, but separation of pull and read basically gives users more degree of freedom. This approach loses it since pull and read are associated.

I can see the advantage of having the separation as being a more direct mapping to underlying system resources and thus giving you more control for the cases where you might need it. This is where stuff like your survey list come in handy.

tyoshino commented 9 years ago

Still thinking, but I started feeling that .readInto() shouldn't be on the ReadableByteStream but rather be a part of the pulling tools such as .feedArrayBuffer(). Current ReadableStream is a nicely-simple and easy-to-understand queue representer. It may receive pulling signal implicitly via .read() call, but except that point, it's just a queue. Maybe we should decouple pulling methods/attributes when designing custom pulling tool than complicating the simple ReadableStream interface more.

For POSIX socket,

class ByteSource {
  // true when the socket has any data for read.
  get pullable()
  // Returns the associated ReadableStream.
  get stream()
  // Returns a promise that fulfills when pullable value changes from
  // one on the watch() call.
  watch()
  // Calls read(2) on the given Uint8Array. Queues a new Uint8Array
  // representing the written region to the associated ReadableStream.
  readInto(abv)
}

For blocking or async I/O interfaces which takes a buffer on invocation,

class ByteSource {
  // Returns the number of bytes being pulled from the I/O but not finished.
  get bytesPulling()
  // Returns the number of bytes queued on the associated ReadableStream.
  get bytesReadable()
  // Returns the associated ReadableStream.
  get stream()
  // Returns a promise that fulfills when
  watch()
  // Kicks the async I/O with the given Uint8Array. Queues a new Uint8Array
  // representing the written region to the associated ReadableStream
  // on completion.
  pull(abv)
}
domenic commented 9 years ago

(Wow, when do you sleep @tyoshino!?)

Hmm yeah that is pretty interesting. I would probably use it :). I'm worried it might not be author-friendly though, even if it is good for implementers and gives a good mapping down to lower-level stuff.

Maybe it would help to more explicitly lay out the author use cases I am thinking of:

(1) Author who wants to operate on a generic stream without worrying about where it comes from or what's in it. In the current design they'll use rs.pipeTo in the simplest case and rs.ready/rs.state/rs.read, or maybe the reader versions of those, for more fine-grained control. In general this class of use cases is about generic compositionality---i.e., in the same way people write libraries and functions that are useful for all promises, we want people to be able to do so for all readable streams.

(2) Authors who are concerned about efficiency, at least at a high-level. Mainly, this would mean authors who want to avoid buffer copies, I think. For these authors I think a rs.readInto API, or something similar, is ideal.

(3) Authors who want direct control over the underlying sources as much as possible. These authors also presumably want some abstraction, otherwise they would not be using streams but instead would be manipulating the underlying source directly.

From my perspective your ByteSource ideas are pretty nice for (3). However I feel like async read/readInto address (1) and (2) more nicely. And I think maybe async read/readInto addresses (3) when combined with a strategy (or perhaps with something more like your precise flow control). The main advantage though is that it keeps the number of APIs small so that we can all agree on a single one that many specs can start returning.

Note that having a single API doesn't necessarily mean having a single backing model. As you noted the existing design is very queue-based. I think what we've learned here is that queue-based works well for (1). However I don't think it works as well for (2). The current thinking is that ReadableByteStream and ReadableStream would share the same API so that (1) authors could use both interchangeably. Whereas (2) authors would use the extra readInto method, and would depend on ReadableByteStream not being backed by a queue-based model at all.

tyoshino commented 9 years ago

Thanks for review, Domenic. I'll reply to your analysis later. Here's an attempt to keep sync read(). Follow up discussion coming today, maybe.

class ReadableStream {
  ...
  // Controls how much data to pull. For queue-backed ones, this corresponds to
  // the high water mark. Set a positive integer to pull or 0 not to pull. It
  // depends on each implementation whether the size of the positive integer is
  // interpreted or not. Initially set to 0.
  get window()
  set window(value)
  ...
}
class ReadableByteStream extends ReadableStream {
  // Returns the number of bytes available for synchronous read by readInto()
  // method.
  get buffered()
  // Feeds an ArrayBuffer with region specification. The next pull is done with
  // the specified buffer. If window has ever been set to non 0, throws.
  feedBuffer(uint8Array)
  // Reads bytes available for synchronous reading, stores them into the given
  // Uint8Array and returns the number of bytes stored (can be less than the
  // size of the specified region).
  readInto(uint8Array)
}
yutakahirano commented 9 years ago

I'm not strongly opposing to async read, but reforming ReadableStream's API largely because of a platform issue on ReadableByteStream seems not correct.

domenic commented 9 years ago

@yutakahirano

reforming ReadableStream's API largely because of a platform issue on ReadableByteStream seems not correct.

I understand where you're coming from. But I think it's a bit more than that. In general we design APIs are around platform limitations. The unfortunate discovery of this thread was that sync read is less flexible than async read. This is actually fairly common, to design something async even though some platforms could make it sync---e.g. navigator.getBattery() is promise-returning because for multi-process browsers it would require IPC, whereas single-process browsers (like the original implementation, in Firefox) used non-promise-returning navigator.battery. So in retrospect I feel kind of silly for not realizing it.

Regarding ReadableByteStream's impact on ReadableStream, I think it's pretty important to think of them as related. If we were in Java for example, there might be an IReadableStream interface defining the base contract, and then both ReadableStream and ReadableByteStream would implement IReadableStream. (Maybe it would be clearer if ReadableStream were named ReadableQueueStream!?) In JS we don't have a formal concept of an interface but the conceptual concept is still there: we want APIs to be able to accept "IReadableStream"s and process them, no matter whether they be ReadableQueueStreams or ReadableByteStreams. (Use case (1) in my previous post.) APIs that only accept ReadableByteStreams (use case (2)) can use the extra APIs. But we still want to enable ReadableByteStreams to be passed to IReadableStream consumers.

@tyoshino OK this is very interesting. Fairly minimal and gives more power. So it might win over simple async read()/readInto(). Let's flesh it out.

  // Controls how much data to pull. For queue-backed ones, this corresponds to
  // the high water mark. Set a positive integer to pull or 0 not to pull. It
  // depends on each implementation whether the size of the positive integer is
  // interpreted or not. Initially set to 0.

Can we tie the value to the stream's notion of "size", as governed by the strategy? So, we still let the stream creator judge size of objects. But now we let the stream consumer set the high water mark.

  // Feeds an ArrayBuffer with region specification. The next pull is done with
 // the specified buffer. If window has ever been set to non 0, throws.
 feedBuffer(uint8Array)

Is this supposed to take a "region specification" (= offset, count, I think?)? Or does it reuse the window that was set? If so maybe it still needs to take an offset? Or are you using Uint8Array instead of ArrayBuffer specifically in order to get a region specification built in? If so that's interesting, not sure what I think of it...

  // Reads bytes available for synchronous reading, stores them into the given
 // Uint8Array and returns the number of bytes stored (can be less than the
 // size of the specified region).
 readInto(uint8Array)

How does this interact with the buffers it was fed? I am unsure whether readInto is actually necessary at all given feedBuffer() + read(). See https://github.com/whatwg/streams/issues/253#issuecomment-73929164.

Other big questions:


Separate question, @tyoshino: in https://github.com/whatwg/streams/pull/275#issuecomment-73470112 and https://github.com/whatwg/streams/issues/253#issuecomment-74052102 you said you think read(2)-backed ReadableByteStreams will be backed by a one-element queue. I don't quite understand why this would be the case? It seems better to have readInto() (or, maybe, feedArrayBuffer()) directly call to read(2), instead of having the queue.

domenic commented 9 years ago

I created a gist that uses the (all), (chunkwise), and (ping-pong) use cases to demonstrate how different APIs work. So far I have async readInto and setAllocator. Please see https://gist.github.com/domenic/e251e37a300e51c5321f. It is a much-expanded version of my above comment, and demonstrates how with setAllocator you actually need pause()/resume(). I would love to see a version that demonstrates how your latest design would work @tyoshino.

tyoshino commented 9 years ago

@domenic

Can we tie the value to the stream's notion of "size", as governed by the strategy? So, we still let the stream creator judge size of objects. But now we let the stream consumer set the high water mark.

Yes, that's what I was thinking. window adjusts high water mark when there's a strategy and queue behind the stream.

Is this supposed to take a "region specification" (= offset, count, I think?)? Or does it reuse the window that was set?

As it's specified to throw when window has ever been set to 0, I made feedBuffer and window exclusive.

If so maybe it still needs to take an offset? Or are you using Uint8Array instead of ArrayBuffer specifically in order to get a region specification built in? If so that's interesting, not sure what I think of it...

So, right. I chose to represent region via the offset and size info held by the Uint8Array object. I don't have strong opinion about this. Separate topic. Sorry for confusing.

How does this interact with the buffers it was fed? I am unsure whether readInto is actually necessary at all given feedBuffer() + read(). See #253 (comment).

Right. Sorry it was just not brushed up well. I'll summarize discussion I had with Yutaka and read your gist to republish our current ideas.

tyoshino commented 9 years ago

Domenic,

Separate question, @tyoshino: in #275 (comment) and #253 (comment) you said you think read(2)-backed ReadableByteStreams will be backed by a one-element queue. I don't quite understand why this would be the case? It seems better to have readInto() (or, maybe, feedArrayBuffer()) directly call to read(2), instead of having the queue.

In the first comment, I introduced .amountReadable for I/Os that can tell us how many bytes can be read. I didn't mean that we should call read(2) in advance and determine how many bytes can be read. I think for read(2) backed stream, I'd return undefined on .amountReadable call.

In the second comment, I tried to unify interface for read(2) backed one and fread(3) backed one by using only one method feedBuffer(), and I said feedBuffer() will put the given buffer to a queue. We can make the ReadableStream's state "readable" synchronous to feedBuffer() call by completing read(2) immediately after the push of the fed buffer to the queue. So, this is equivalent to readInto() in terms of efficiency (though cumbersome that we need two steps. readInto vs. feedBuffer+read).

tyoshino commented 9 years ago

Domenic,

Here's ByteSource version of use case study (https://github.com/whatwg/streams/issues/253#issuecomment-74765051). https://github.com/tyoshino/streams_integration/blob/master/ByteSource.md (merged into PR #287)

tyoshino commented 9 years ago

BTW, the async readInto interface looks very close to the WritableStream interface. I'm trying to build something based on that in parallel.

tyoshino commented 9 years ago

WIP, but here's the repo of the attempt. https://github.com/tyoshino/streams_integration/blob/master/reference-implementation/operation-stream.js (see the next message)

calvaris commented 9 years ago

As suggested by Domenic, I read this whole thread and I still have to digest it because I still haven't thought about the possible implications of the implementations in WebKit, which is the one that I am doing.

Some questions remain open though (maybe I read too fast and skipped some important matters and I misunderstood the whole thing). I think the introduction of ReadableByteStream, though logical because of efficiency reasons, might disrupt a bit the idea of having a common interface to read data and specially the way we would pipe that data to other streams. Would we need to create WritableByteStreams and would they operate only with ReadableByteStreams or will we just let the loosily typed JavaScript do its magic and let a common WritableStream deal with the ArrayBuffer coming from a ReadableByteStream and let the garbage collector act after that? I think I require a bit of clarification for my understanding here.

About the read()+ready() vs the promise based read() implementation, I have no preference, both make sense to me (though it is true we would have to reimplement the second option)

tyoshino commented 9 years ago

Added network read example to operation stream and put them into a PR: https://github.com/whatwg/streams/pull/287

yutakahirano commented 9 years ago

@domenic thanks for the clarification.

From a ReadableByteStream POV, it is desirable to hide uncommitted data (a.k.a dirty read). So I think we should detach the given buffer. ArrayBuffer.transfer has been proposed to ES7, so we can use it when it gets stable enough. What do you think?

If we do so, the buffer returned by read is not the buffer we gave, regardless whether we use sync read + pull or async read.

tyoshino commented 9 years ago

I've rewritten the concept introduced in ByteSource.md (https://github.com/whatwg/streams/issues/253#issuecomment-74871167) and merged into https://github.com/whatwg/streams/pull/287

Domenic: I'll take a look at your proposal soon.

domenic commented 9 years ago

@yutakahirano the detaching thing seemed pretty good. But then I realized that I'm no longer sure how to read the entire stream into one big array buffer. E.g. as in this gist. Maybe this is just another missing ES7 API, to create an array buffer from many smaller array buffers?

domenic commented 9 years ago

Given the detached consideration I want to look in to feedArrayBuffer/setAllocator more. My gist above shows setAllocator causing some unanticipated problems but they are probably solvable, or alternately I don't think feedArrayBuffer has those problems. I will spend time on that next and put async read aside until/unless we have a good answer for detaching + getReader + async read.

tyoshino commented 9 years ago

Re: https://github.com/whatwg/streams/issues/253#issuecomment-75790533 by Domenic,

Good point. If we can get the new ArrayBuffer reference but pointing to the same underlying buffer on completion, it's ok?

let newBuffer = await rbs.readInto(ab, 0, ab.byteLength);
// here, ab is neutered.
return newBuffer;

If readInto() may return when it finishes writing partial data,

let result = await rbs.readInto(ab, 0, ab.byteLength);
// here, ab is neutered.
return result;

result.ab is a new reference. result.size is the number of bytes written.

Or, adopt ArrayBufferView based approach,

let newView = await rbs.readInto(view);
// here, view is neutered.
return newView;

newView specifies the region to which data have been written. newView and view are different references, and newView.buffer and view.buffer are also different references. view and view.buffer are both neutered on readInto() call.

tyoshino commented 9 years ago

These revised versions of readInto() correspond to WritableOperationStream.write() in my operation stream idea.

domenic commented 9 years ago

result.ab is a new reference. result.size is the number of bytes written.

This shouldn't be necessary since we can make newBuffer be of the partial size using that functionality of transfer, I think? Or maybe the result.ab/result.size design would allow concatenating into one large buffer, hmm.

But in general here are my thoughts.

let newBuffer = await rbs.readInto(ab, 0, ab.byteLength);
// here, ab is neutered.
return newBuffer;

This code is no longer nice, I think. That is, it no longer says what it means. Well, OK, first the name readInto is now wrong, so let's change that:

let newBuffer = await rbs.readBytes(ab, 0, ab.byteLength);
// here, ab is neutered.
return newBuffer;

It's still not very good however. Why are we giving ab to readBytes if it's just going to give us a new object anyway? What was the point of us creating it in the first place---why not just let readBytes create the array buffer fresh?

On the other hand, I think feedArrayBuffer is more do-what-I-mean:

rbs.feedArrayBuffer(ab); // use this for next read of underlying source.
// since we fed it to the stream, it's not ours anymore, it just gets detached.

await rbs.ready; // wait until the next read completes (using any fed array buffers)

let newBuffer = rbs.read();

However I haven't thought through feedArrayBuffer very far yet in this new detaching world. I want to write some sample code that explains how to use it for the cases in my gist. Maybe it has the same limitations as readInto and doesn't give us anything new besides being a bit more "say what I mean." I will find that out tomorrow :). I will also look into the Promise<{ ab, size }> idea since that does seem promising now that I think about it.

tyoshino commented 9 years ago

You're concerned with intuitiveness of the code, right? OK, I'll review the approaches from that point of view, too.

yutakahirano commented 9 years ago

https://github.com/whatwg/streams/issues/253#issuecomment-75904101

With detaching, we should return a buffer[view] as @tyoshino says. And I agree that it is somewhat confusing that readInto looks like (async) fread but the user cannot use the given buffer.

tyoshino commented 9 years ago

Issue: Supporting platform with only async reading I/O without losing efficiency

Started from https://github.com/whatwg/streams/issues/253#issuecomment-73618925

Plans:

Subissue: How the filled ArrayBuffer should be represented

https://github.com/whatwg/streams/issues/289#issuecomment-76686830

Plans:

Issue: Detaching ArrayBuffer

ArrayBuffers should be detached at the borders between user code and underlying platform code appropriately.

Started from https://github.com/whatwg/streams/issues/253#issuecomment-75197506

Issue: Locking

We should give clear semantics to locking infrastructure.

Summarizing discussion started from https://github.com/whatwg/streams/pull/288#issuecomment-75747164 and Yutaka's comment https://github.com/whatwg/streams/pull/288#issuecomment-75787952

For async read() plan, we have the following options:

  1. s.getReader() fails if there's any pending read() operation.
  2. s.getReader() causes any pending read() operation to stay pending until release, at which time it re-polls the queue
  3. s.getReader() doesn't affect pending read() operations.

For pull() + sync read(),

  1. TBA

Issue: Simpleness of manual reading

Copied from https://github.com/whatwg/streams/pull/288#issuecomment-76885522

pull() + sync read() (always 1:1 correspondence) pull() + sync read() (no 1:1 correspondence on object stream) async read() sync read() + OperationStatus
Handling read data No benefit compared to async read() (object stream) for-loop to repeat read() until state becomes waiting promise based. one microtask for each chunk for-loop + readableness check as long as status.state is "complete". Otherwise, need to wait using status.ready. Cumbersome. Complex.
Correspondence between pull() and read() clear Less clear clear clear
tyoshino commented 9 years ago

I now think "correspondence between pull() and read()" is less important compared to the other perspectives.

I'm leaning toward making getReader() succeed only when there's no pending read(). Maybe I'd represent that (lockable or not) using some attribute on the stream. After thinking carefully, I don't think there's any big benefit to allow acquisition of lock while there's pending read(). @yutakahirano