Open fzhinkin opened 1 year ago
Another option based on combining ktor + okio:
We found the key to okio working well in practice was the use of single read/write methods taking a concrete buffer type (which is what allows the implementation to change owners rather than copy). The channel above is similar to ktor byte channels but goes through a concrete buffer type. Effectively it's one extra copy/move between buffers allowing a single producer and consumer to sync. The downside is it's a new type.
While good old blocking IO remains the winner when it comes to throughput, there are scenarios where one can't simply launch a new coroutine performing blocking IO. When dealing with thousands of connections, having a thread per connection (or a coroutine backed by a thread) may incur significant overhead. At the same time, there is already a plethora of non-blocking/asynchronous IO APIs users may need to integrate their apps with, so it seems reasonable to provide an abstraction expressing the asynchronous nature of such APIs.
Below are some thoughts and observations that directed the design of the proposed async API.
There are several approaches to the asynchronous IO API that I would split into two major categories:
The first category seems to provide a very convenient API by making all familiar synchronous IO operations asynchronous, but there is a price for this convenience.
From a user perspective, such APIs encourage writing code having many asynchronous calls to read or write small portions of data. In Kotlin, each suspendable function has a small overhead compared to regular synchronous functions, and when there are lots of suspendable calls, it might lead to worse performance. Here is a good discussion of that: https://github.com/square/okio/issues/814#issuecomment-735286404
From a library developer perspective, having an asynchronous API mirroring the synchronous API is a burden, as each of the synchronous functions probably requires an asynchronous counterpart and each of them has to be implemented, documented, tested, and supported.
In contrast, asynchronous APIs providing only bulk operations reduce the scope of asynchronous code and encourage fetching as much data as needed asynchronously and then processing it synchronously. From a user perspective, this may lead to better performance. From a developer perspective, it reduces the amount of code that needs to be written and maintained, as a bulk API is usually much smaller and simpler.
Yet another observation concerns the data formats actively used for data exchange: the size of the whole message/packet/unit of data transfer is usually either known in advance (for protocols having fixed-size frames), or explicitly encoded in a binary message (like in the BSON format), or the message consists of multiple chunks whose size is encoded in a header (like in the PNG image format, gRPC over HTTP2, or Thrift RPC framing transport), or the message is terminated by a special value (like null-terminated strings in a GZip header). So it's relatively easy to use a bulk IO API with such formats: we need to fetch a header containing the payload size and then read the whole payload. Large messages may not fit this pattern directly, but for them we still know the exact number of bytes to read and can load the data in batches.
Described above was the main reasoning behind the proposed `kotlinx-io` Async API.
As a foundation for the asynchronous IO, two interfaces mirroring their blocking counterparts in terms of naming and functionality are proposed:

    public interface AsyncRawSink {
        public suspend fun write(buffer: Buffer, bytesCount: Long)
        public suspend fun flush()
        public suspend fun close()
    }

    public interface AsyncRawSource : AutoCloseable {
        public suspend fun readAtMostTo(buffer: Buffer, bytesCount: Long): Long
        override fun close()
    }

These interfaces aim to implement asynchronous sources and sinks with the same semantics as the synchronous RawSink and RawSource.
The API specifies neither when `readAtMostTo`/`write`/any other suspending method may or should suspend, nor what coroutine context it should use. It's up to implementors to decide how their sinks/sources will behave. The only recommendations are to use the `Dispatchers.IO` context when executing suspending operations and to make implementations cancellation-friendly.
For the buffered sinks and sources, there is no one-to-one mapping between the synchronous and asynchronous API:

    public class AsyncSource(private val source: AsyncRawSource) : AsyncRawSource {
        public val buffer: Buffer

        /**
         * Throws an exception when the source is exhausted before fulfilling the predicate.
         */
        public suspend fun await(until: AwaitPredicate): Unit

        /**
         * Returns `true` if the predicate was fulfilled, `false` otherwise.
         */
        public suspend fun tryAwait(until: AwaitPredicate): Boolean

        override suspend fun readAtMostTo(buffer: Buffer, bytesCount: Long): Long
        override fun close()
    }

    public class AsyncSink(private val sink: AsyncRawSink) : AsyncRawSink {
        public val buffer: Buffer = Buffer()

        override suspend fun write(buffer: Buffer, bytesCount: Long)
        override suspend fun flush()
        override suspend fun close()
    }

These classes don't provide the same feature-rich interface as `Sink` or `Source`. Instead, they only encapsulate a buffer and provide functions to asynchronously fill it with data until a condition expressed using `AwaitPredicate` is met, or flush the data to the underlying sink.
It is assumed that the existing `Buffer` API, as well as all the existing extensions, can be used to parse the data once it is fetched and to serialize a message that will then be sent to a sink.
The `AwaitPredicate` interface is intended to inspect the already received data and, if more data is needed according to a criterion, fetch it and check again:

    public interface AwaitPredicate {
        public suspend fun apply(buffer: Buffer, fetchMore: suspend () -> Boolean): Boolean
    }

There will be some predefined predicates checking for the minimum number of bytes available, underlying source's exhaustion, and the presence of particular values in the fetched data.
In the simplest form, a predicate might look like this:

    public class MinNumberOfBytesAvailable(private val bytesCount: Long) : AwaitPredicate {
        override suspend fun apply(buffer: Buffer, fetchMore: suspend () -> Boolean): Boolean {
            // Keep fetching until enough bytes are buffered or the source is exhausted.
            while (buffer.size < bytesCount && fetchMore()) { /* do nothing */ }
            return buffer.size >= bytesCount
        }
    }

The `buffer` is exactly the same buffer encapsulated by the `AsyncSource`.
If the buffer already contains data fulfilling a predicate when `await` or `tryAwait` is called, then it is assumed that no more data will be fetched from the underlying source (but that is, of course, up to the predicate's implementation).
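To illustrate how a custom predicate interacts with `fetchMore`, here is a minimal sketch of a predicate that waits for a delimiter byte. It is not part of the proposal: `Buffer` below is a hypothetical stand-in for `kotlinx.io.Buffer` with just enough surface for the example, `ContainsByte`, `fetchesUntilDelimiter` and `runNonSuspending` are illustrative names, and the runner only works because nothing in this in-memory demo actually suspends.

```kotlin
import kotlin.coroutines.Continuation
import kotlin.coroutines.EmptyCoroutineContext
import kotlin.coroutines.startCoroutine

// Hypothetical stand-in for kotlinx.io.Buffer (assumption, not the real class).
class Buffer {
    private val bytes = ArrayList<Byte>()
    val size: Long get() = bytes.size.toLong()
    fun write(data: ByteArray) { for (b in data) bytes.add(b) }
    fun indexOf(value: Byte): Long = bytes.indexOf(value).toLong() // -1 if absent
}

// Same shape as the interface proposed in this thread.
interface AwaitPredicate {
    suspend fun apply(buffer: Buffer, fetchMore: suspend () -> Boolean): Boolean
}

// Fulfilled once `delimiter` is buffered; gives up when the source is exhausted.
class ContainsByte(private val delimiter: Byte) : AwaitPredicate {
    override suspend fun apply(buffer: Buffer, fetchMore: suspend () -> Boolean): Boolean {
        while (buffer.indexOf(delimiter) < 0) {
            if (!fetchMore()) return false // nothing more to fetch
        }
        return true
    }
}

// Demo-only runner: valid solely because the block below never really suspends.
fun <T> runNonSuspending(block: suspend () -> T): T {
    var outcome: Result<T>? = null
    block.startCoroutine(Continuation(EmptyCoroutineContext) { outcome = it })
    return checkNotNull(outcome) { "block suspended" }.getOrThrow()
}

// Feeds one chunk per fetch and reports (fulfilled, number of fetches performed).
fun fetchesUntilDelimiter(chunks: List<String>, delimiter: Char): Pair<Boolean, Int> {
    val buffer = Buffer()
    var fetched = 0
    val fulfilled = runNonSuspending {
        ContainsByte(delimiter.code.toByte()).apply(buffer) {
            if (fetched < chunks.size) {
                buffer.write(chunks[fetched++].encodeToByteArray())
                true
            } else {
                false
            }
        }
    }
    return fulfilled to fetched
}

fun main() {
    println(fetchesUntilDelimiter(listOf("GET /", " HTTP/1.1\r", "\n"), '\n')) // (true, 3)
    println(fetchesUntilDelimiter(listOf("no newline"), '\n'))                  // (false, 1)
}
```

The predicate itself stays free of any IO details: it only inspects the buffer and asks for more data, which is what keeps the suspending surface of the API small.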
`AsyncSource`/`AsyncSink` encourage writing code that uses suspending calls for batch operations, like reading a large chunk of data into the buffer or writing the whole message into the buffer before flushing it downstream. Such code should have better performance characteristics compared to code performing multiple suspending calls to read/write data field-by-field or byte-by-byte. Nevertheless, it's still possible to perform multiple short reads if needed (for example, to parse a header with dynamic size to figure out how many bytes to read after that).
Here's how the BSON reading/writing example from the `kotlinx-io-core` module will look with the async API:

    suspend fun Message.toBson(sink: AsyncSink) {
        val buffer = Buffer()
        with (buffer) {
            writeByte(0x9)                          // UTC-timestamp field
            writeString("timestamp")                // field name
            writeByte(0)
            writeLongLe(timestamp)                  // field value
            writeByte(0x2)                          // string field
            writeString("text")                     // field name
            writeByte(0)
            writeIntLe(text.utf8Size().toInt() + 1) // field value: length followed by the string
            writeString(text)
            writeByte(0)
            writeByte(0)                            // end of BSON document
        }

        // Write document length and then its body
        sink.buffer.writeIntLe(buffer.size.toInt() + 4)
        buffer.transferTo(sink.buffer)
        sink.flush()
    }


    suspend fun Message.Companion.fromBson(source: AsyncSource): Message {
        source.await(AwaitPredicate.dataAvailable(4)) // check if the source contains length
        val buffer = source.buffer
        val length = buffer.readIntLe() - 4L
        source.await(AwaitPredicate.dataAvailable(length)) // check if the source contains the whole message

        fun readFieldName(source: Buffer): String {
            val delimiterOffset = source.indexOf(0) // find offset of the 0-byte terminating the name
            check(delimiterOffset >= 0) // indexOf returns -1 if the value is not found
            val fieldName = source.readString(delimiterOffset) // read the string until terminator
            source.skip(1) // skip the terminator
            return fieldName
        }

        // for simplicity, let's assume that the order of fields matches serialization order
        var tag = buffer.readByte().toInt() // read the field type
        check(tag == 0x9 && readFieldName(buffer) == "timestamp")
        val timestamp = buffer.readLongLe() // read long value
        tag = buffer.readByte().toInt()
        check(tag == 0x2 && readFieldName(buffer) == "text")
        val textLen = buffer.readIntLe() - 1L // read string length (it includes the terminator)
        val text = buffer.readString(textLen) // read value
        buffer.skip(1) // skip terminator
        buffer.skip(1) // skip end of the document
        return Message(timestamp, text)
    }

The only suspending calls are bulk reading and flushing, while almost all the parsing and all the marshaling are done synchronously.
This is brilliant. Great design, easy to understand, and comprehensive examples. Yay! I can’t yet say how it’ll work in practice, but I’m optimistic.
One small nitpick! I think the Source.close function ought to suspend.
In OkHttp’s cache we have a Source implementation that writes its content to a file. When this source is closed, it closes both the upstream source and the file sink that now contains the cached data. That file close may need to flush, and that operation is suspending.
In Moshi + Retrofit we’ve found it’s efficient to start decoding a JSON response before the last byte is received.
I’m very interested in learning the fastest way to decode JSON with this API. You could put suspends everywhere, or you could put ‘em on the beginnings of objects, arrays, and strings, or you could defer decoding until the entire document is returned.
At Google, our thinking has largely been as follows: if asynchrony is necessary at all for I/O, then it is usually caused by RPCs and the like whose overhead exceeds the cost of memory management for buffers etc. As a result, we've adopted a thin wrapper around `Flow<ByteString>` as our current primary abstraction for async I/O, with natural adapters for `Flow<String>` for text. It has worked very smoothly so far, with https://github.com/Kotlin/kotlinx.coroutines/issues/3274 helping with more stateful transformation.
To be clear, this is not necessarily in opposition to the above API -- but I do think it's worth considering what sorts of scenarios we might be using async I/O in, and whether performance sacrifices in the API design might produce clearer code without significant cost relative to the overhead already happening.
Update on async IO API.
After processing the feedback, the API was slightly reworked. The main changes are:
- the `close` method is now suspendable for both `AsyncRawSink` and `AsyncRawSource`;
- a `closeAbruptly` method was added to both these interfaces, intended for quick and possibly ungraceful termination;
- the `AsyncSink` class was removed as it does not add much value; a new extension function that writes into a newly created buffer and then sends it to an `AsyncRawSink` was added instead;
- `AsyncSource.await` was renamed to `awaitOrThrow`, and `AsyncSource.tryAwait` was renamed to `await` and now returns `Result<Boolean>` instead of `Boolean`;
- `AsyncSource`'s constructor now accepts a hint for how many bytes should be fetched when a predicate requests more data.

    public interface AsyncRawSink {
        public suspend fun write(source: Buffer, byteCount: Long)
        public suspend fun flush()
        public suspend fun close()
        public fun closeAbruptly()
    }

    public interface AsyncRawSource {
        public suspend fun readAtMostTo(sink: Buffer, byteCount: Long): Long
        public suspend fun close()
        public fun closeAbruptly()
    }

    public class AsyncSource(private val source: AsyncRawSource, private val fetchHint: Long = 8192L) : AsyncRawSource {
        public val buffer: Buffer

        public suspend fun awaitOrThrow(until: AwaitPredicate)
        public suspend fun await(until: AwaitPredicate): Result<Boolean>

        override suspend fun readAtMostTo(sink: Buffer, byteCount: Long): Long
        override fun closeAbruptly()
        override suspend fun close()
    }

    public suspend fun AsyncRawSink.writeWithBuffer(block: Buffer.() -> Unit)

    public interface AwaitPredicate {
        public suspend fun apply(buffer: Buffer, fetchMore: suspend () -> Boolean): Boolean

        public companion object {
            public fun exhausted(): AwaitPredicate
            public fun available(bytes: Long): AwaitPredicate
            public fun contains(expectedValue: Byte, maxLookAhead: Long = Long.MAX_VALUE): AwaitPredicate
            public fun contains(expectedValue: ByteString, maxLookAhead: Long = Long.MAX_VALUE): AwaitPredicate
            public fun newLine(maxLookAhead: Long = Long.MAX_VALUE): AwaitPredicate
        }
    }

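The thread only gives the signature of `writeWithBuffer`, so the following is a guess at one possible implementation rather than the actual one: fill a fresh buffer synchronously, then hand it to the sink in a single suspending `write`. Whether the real function also flushes is not stated here, so this sketch does not. `Buffer` is a hypothetical stand-in for `kotlinx.io.Buffer`, and `CollectingSink`, `frameViaSink` and `runNonSuspending` are illustrative helpers.

```kotlin
import kotlin.coroutines.Continuation
import kotlin.coroutines.EmptyCoroutineContext
import kotlin.coroutines.startCoroutine

// Hypothetical stand-in for kotlinx.io.Buffer (assumption, not the real class).
class Buffer {
    private val bytes = ArrayList<Byte>()
    val size: Long get() = bytes.size.toLong()
    fun writeByte(b: Byte) { bytes.add(b) }
    fun writeIntLe(v: Int) { for (shift in 0..24 step 8) bytes.add((v ushr shift).toByte()) }
    // Removes and returns the first `byteCount` bytes.
    fun readByteArray(byteCount: Int): ByteArray {
        val head = bytes.subList(0, byteCount)
        val out = head.toByteArray()
        head.clear()
        return out
    }
}

// Same shape as the reworked interface above.
interface AsyncRawSink {
    suspend fun write(source: Buffer, byteCount: Long)
    suspend fun flush()
    suspend fun close()
    fun closeAbruptly()
}

// Assumed implementation: synchronous marshaling, one suspending call at the end.
suspend fun AsyncRawSink.writeWithBuffer(block: Buffer.() -> Unit) {
    val buffer = Buffer()
    buffer.block()
    write(buffer, buffer.size)
}

// In-memory sink that records what it receives and how many writes happened.
class CollectingSink : AsyncRawSink {
    val received = ArrayList<Byte>()
    var writeCalls = 0
    override suspend fun write(source: Buffer, byteCount: Long) {
        writeCalls++
        received.addAll(source.readByteArray(byteCount.toInt()).asList())
    }
    override suspend fun flush() {}
    override suspend fun close() {}
    override fun closeAbruptly() {}
}

// Demo-only runner: valid solely because CollectingSink never really suspends.
fun <T> runNonSuspending(block: suspend () -> T): T {
    var outcome: Result<T>? = null
    block.startCoroutine(Continuation(EmptyCoroutineContext) { outcome = it })
    return checkNotNull(outcome) { "block suspended" }.getOrThrow()
}

// Writes a length-prefixed frame through writeWithBuffer.
fun frameViaSink(payload: ByteArray): CollectingSink {
    val sink = CollectingSink()
    runNonSuspending {
        sink.writeWithBuffer {
            writeIntLe(payload.size)
            payload.forEach { writeByte(it) }
        }
    }
    return sink
}

fun main() {
    val sink = frameViaSink(byteArrayOf(1, 2, 3))
    println(sink.received)   // [3, 0, 0, 0, 1, 2, 3]
    println(sink.writeCalls) // 1
}
```

However many fields the block writes, the sink sees a single `write`, which is the batching behaviour the removed `AsyncSink` class was meant to encourage.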
Some questions arise:
- can you describe a contract of `await` and `awaitOrThrow` regarding when they may/should throw or return different results?
- Also for me naming similar to coroutine channels with `receive`/`receiveCatching` sounds better here (like `await`/`awaitCatching`) - so by default, shortest function is safer for end user of a library (as it will throw in exceptional situation), and if you need to handle error differently you will use function with longer name - but I think it's more of a preference
- as far as I understand, there will be no out of the box `use` functions for `Async*` parts or some kind of base `AsyncCloseable` interface?

@whyoleg,
> can you describe a contract of await and awaitOrThrow regarding when they may/should throw or return different results?

`await` doesn't throw exceptions itself but returns a `Result` that may wrap one. If the predicate was fulfilled, the result contains `true`; if it could not be fulfilled, the result contains `false`. If an error occurs while evaluating the predicate (`IllegalStateException` on an attempt to read from a closed underlying source, `CancellationException` if the coroutine was cancelled while evaluating the predicate, `IOException` if some other IO-related error happened while reading data), it will be wrapped into the result.
`awaitOrThrow` throws `IOException` if a predicate cannot be fulfilled and all aforementioned exceptions in other cases.
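The contract described above can be sketched as follows. This is an illustration of the described behaviour only, not the actual `AsyncSource` code: `AwaitSketch` is a hypothetical class that takes the predicate evaluation as a lambda, `IOException` is a local stand-in for `kotlinx.io.IOException`, and `runNonSuspending` is a demo-only runner that works because nothing here really suspends.

```kotlin
import kotlin.coroutines.Continuation
import kotlin.coroutines.EmptyCoroutineContext
import kotlin.coroutines.startCoroutine

// Local stand-in for kotlinx.io.IOException (assumption).
class IOException(message: String) : Exception(message)

// Hypothetical reduction of AsyncSource to the two await flavours.
class AwaitSketch(private val evaluate: suspend () -> Boolean) {
    // Never throws: true/false for fulfilled/unfulfilled, failures are wrapped.
    suspend fun await(): Result<Boolean> = runCatching { evaluate() }

    // Rethrows wrapped failures and turns "unfulfilled" into an IOException.
    suspend fun awaitOrThrow() {
        if (!await().getOrThrow()) throw IOException("Predicate could not be fulfilled")
    }
}

// Demo-only runner: valid solely because the blocks below never really suspend.
fun <T> runNonSuspending(block: suspend () -> T): T {
    var outcome: Result<T>? = null
    block.startCoroutine(Continuation(EmptyCoroutineContext) { outcome = it })
    return checkNotNull(outcome) { "block suspended" }.getOrThrow()
}

// Shows what each flavour reports for a given predicate outcome.
fun describe(evaluate: suspend () -> Boolean): String = runNonSuspending {
    val source = AwaitSketch(evaluate)
    val viaAwait = source.await().fold(
        { "await=$it" },
        { "await=failure(${it::class.simpleName})" }
    )
    val viaThrow = try {
        source.awaitOrThrow()
        "awaitOrThrow=ok"
    } catch (e: Exception) {
        "awaitOrThrow threw ${e::class.simpleName}"
    }
    "$viaAwait, $viaThrow"
}

fun main() {
    println(describe { true })  // await=true, awaitOrThrow=ok
    println(describe { false }) // await=false, awaitOrThrow threw IOException
    println(describe { throw IllegalStateException("closed") })
    // await=failure(IllegalStateException), awaitOrThrow threw IllegalStateException
}
```

Note that, as described in the reply, `await` wraps `CancellationException` too, so callers of `await` would have to rethrow it themselves to keep cancellation cooperative.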
> Also for me naming similar to coroutine channels with receive/receiveCatching sounds better here (like await/awaitCatching) - so by default, shortest function is safer for end user of a library (as it will throw in exceptional situation), and if you need to handle error differently you will use function with longer name - but I think it's more of a preference

While `receiveCatching` is effectively a try-catch around `receive`, `awaitCatching` would return `false` for an unfulfilled predicate whereas `await` would throw an exception. That's why I didn't adopt the naming from coroutines.
> as far as I understand, there will be no out of the box use functions for Async* parts or some kind of base AsyncCloseable interface?

There will be two extension functions, `AsyncRawSource.use` and `AsyncRawSink.use`. However, a single base interface could be introduced, as you suggested.
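The thread does not say how these `use` functions would behave, so the following is only one possible shape under stated assumptions: a hypothetical `AsyncCloseable` base interface, a graceful (suspending) `close()` when the block succeeds, and `closeAbruptly()` when it fails. `RecordingResource`, `eventsFor` and `runNonSuspending` are illustrative helpers.

```kotlin
import kotlin.coroutines.Continuation
import kotlin.coroutines.EmptyCoroutineContext
import kotlin.coroutines.startCoroutine

// Hypothetical base interface discussed in the thread (not an existing type).
interface AsyncCloseable {
    suspend fun close()
    fun closeAbruptly()
}

// Assumed semantics: graceful close on success, abrupt close on failure.
suspend fun <T : AsyncCloseable, R> T.use(block: suspend (T) -> R): R {
    val result = try {
        block(this)
    } catch (e: Throwable) {
        closeAbruptly() // failure path: do not risk suspending in close()
        throw e
    }
    close() // success path: graceful, possibly suspending
    return result
}

// Test double that records which operations were invoked.
class RecordingResource : AsyncCloseable {
    val events = ArrayList<String>()
    override suspend fun close() { events.add("close") }
    override fun closeAbruptly() { events.add("closeAbruptly") }
}

// Demo-only runner: valid solely because RecordingResource never really suspends.
fun <T> runNonSuspending(block: suspend () -> T): T {
    var outcome: Result<T>? = null
    block.startCoroutine(Continuation(EmptyCoroutineContext) { outcome = it })
    return checkNotNull(outcome) { "block suspended" }.getOrThrow()
}

// Runs a block that optionally fails and returns the recorded events.
fun eventsFor(fail: Boolean): List<String> {
    val resource = RecordingResource()
    runCatching {
        runNonSuspending {
            resource.use { r ->
                r.events.add("work")
                if (fail) error("boom")
                "done"
            }
        }
    }
    return resource.events
}

fun main() {
    println(eventsFor(fail = false)) // [work, close]
    println(eventsFor(fail = true))  // [work, closeAbruptly]
}
```

A variant that always calls the suspending `close()` in a `finally` block, mirroring the standard library's `use`, would be equally plausible; the choice depends on how quickly resources must be released after an error.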
For `suspend fun close()` (and `fun closeAbruptly()`), should there be an `AsyncCloseable` interface similar to `Closeable` (or `AutoCloseable`)?
And less related but still somewhat relevant, is there a plan to introduce an (async) type similar to `SeekableByteChannel` (or `AsynchronousFileChannel`), like what `okio` has done with `Source` for `InputStream` and `Sink` for `OutputStream`? Or is the plan not to introduce that, and to keep the design in `okio` where people should create a dedicated concrete class like `FileHandle` without an interface for that?
(Asking because I have an implementation of Java 8 NIO File API in my file manager Material Files (including local, archive and some network FS), and I'm also looking to create a more generic Kotlin file system API from that.)
> For suspend fun close() (and fun closeAbruptly()), should there be an AsyncCloseable interface similar to Closeable (or AutoCloseable)?

Such an interface, most likely, will be introduced.

> And less related but still somewhat relevant, is there a plan to introduce an (async) type similar to SeekableByteChannel (or AsynchronousFileChannel), like what okio has done by Source for InputStream and Sink for OutputStream?

That's a good question, thanks!
It was not previously discussed, but given that introducing seekable Sources/Sinks does not work well (there's a nice design overview in the corresponding Okio issue: https://github.com/square/okio/issues/889), most likely there will be an extra class, like `AsyncFileHandle`.
Thanks for the link to the discussion! I also found that the `FileHandle` approach is better because for network file systems there usually isn't a concept of a tracked file offset in the protocols, and simply exposing the random access methods is much simpler, more powerful and less error-prone.
Another question - why do we need the `closeAbruptly()` function? In the synchronous world we may have a `close()` method that flushes any caches in a blocking manner (and `close()` can throw `IOException`), but there usually isn't a `closeAbruptly()` for that either - why do we need to add it when we convert the blocking method to a suspend function?
> Another question - why do we need the closeAbruptly() function?

The `close` function (on both `AsyncRawSink` and `AsyncRawSource`) assumes graceful termination, with flushing in the case of `Sink`.
Sometimes, IO operations should be terminated immediately. For example, it might be the case when there is a coroutine/thread controlling the IO-session’s state, an error condition was detected and now all in-flight IO operations need to be terminated immediately. `close` won’t work here, especially in the case of sinks, as it performs graceful termination (and may suspend).
So `closeAbruptly` is aimed to immediately cancel/terminate all pending/suspended operations without flushing, and close the sink/source.
> The close function (on both AsyncRawSink and AsyncRawSource) assumes graceful termination, with flushing in case of Sink.

I understand this part. But my question was, if a class implementing the original blocking `Closeable.close()` was flushing upon close, it wasn't offering an alternative `closeAbruptly()` that closes without flushing either - so I was wondering why we should provide such a capability when we define the async interface.
Are the use cases for it worth requiring everyone to put effort into implementing both methods when they implement this `AsyncCloseable`, or should we just keep the status quo of `Closeable` only offering one `close()`, and maybe introduce another optional interface for `closeAbruptly()`?
In the context of this proposal, the fundamental difference between synchronous and asynchronous sinks/sources I see is that operations on the latter may require some non-trivial interactions between multiple agents, some of which may be non-local. It applies not only to read/write operations, but also to `close` (thus, it's suspendable too).
That, in general, may not be the case if we're talking about asynchronous file I/O, but it is the case for asynchronous sinks/sources over logical HTTP/2.0 or WebSocket channels, where a graceful close may imply notifying a reactor thread, which then has to mux the logical stream into a "physical" one, send it, and finally notify the `close` caller that the operation is finished. In scenarios where such a sink/source needs to be closed quickly or within some predictable amount of time (for instance, because an error state was detected and we want to terminate processing), calling a suspendable `close` is not an option as it may remain suspended indefinitely.
> if a class implementing the original blocking Closeable.close() was flushing upon close, it wasn't offering an alternative closeAbruptly()

Nobody asked for it so far, thus it's not a part of the interface. :)
There was some thought on adding it to the existing blocking interfaces, but for regular blocking sockets and files, there's usually only one way to close them, by calling `close(fd)`, so `close` and `closeAbruptly` on such a `RawSink`/`RawSource` would be identical.
> Is the use cases for it worth requiring everyone to put effort into implementing both methods when they implement this AsyncCloseable, or should we just keep the status quo of Closeable only offering one close(), and maybe introduce another optional interface for closeAbruptly()?

We can have a default `closeAbruptly` implementation calling `runBlocking { close() }` to ease the development of sources/sinks where these operations effectively do the same, but it's unlikely that an extra interface will be introduced to decouple `suspend close()` and `closeAbruptly()`.
Ktor migrated from okio to kotlinx-io, but after the migration it only uses Sink/Source internally and exposes a non-blocking API (`ByteReadChannel`) for byte streams on its interface (see also https://youtrack.jetbrains.com/issue/KTOR-7224/Unclear-kotlinx-io-based-interface-v3.0.0-beta2 ).
If something like that was provided in the common io library (i.e. kotlinx-io), it would allow parser libraries such as kotlinx-serialization-json to offer an API for consuming async streams (use case e.g. parsing a really large JSON while the bytes are being received over the wire).
`kotlinx-io` provides only a synchronous API at the moment. In some scenarios, an async API could be more convenient and useful, so it is worth supporting it. There is no particular plan right now; this issue rather states the intent and provides a place to log different possible approaches to providing async support.
Some issues/discussions and libraries showing how the problem could be approached (or how and why it shouldn't):