Open chrisprobst opened 10 years ago
Currently, ChunkedInput.nextChunk()
must be a non-blocking operation. When there's no next chunk available, the ChankedInput
is expected to call ChunkedWriteHandler.resumeTransfer()
to resume the transfer.
However, I like the idea of making nextChunk()
asynchronous (i.e. making it return a Future<CHUNK>
), as well as adopting the pipe abstraction.
If you are interested in contributing it to Netty, please go ahead. :-)
Alright then, I will have a look at this during semester break, will be a nice project =).
Nice! Looking forward to it :)
Am 17.10.2014 um 08:17 schrieb Christopher Probst notifications@github.com:
Alright then, I will have a look at this during semester break, will be a nice project =).
— Reply to this email directly or view it on GitHub.
Ok, I finally found some time. I think, I'll start with making the ChunkedWriteHandler async, since writing files asynchronously seems to be more useful for now.
We should discuss the best way of implementing it. I see two options:
Create an abstract class implementing ChunkedInput (lets call it ChunkedAsyncInput), which wraps the mechanics of using a future. An abstract implementation might look like this:
public abstract class ChunkedAsyncInput<B> implements ChunkedInput<B> {
protected abstract Future<B> readChunkAsync(ChannelHandlerContext channelHandlerContext);
private Future<B> pendingChunk;
@Override
public B readChunk(final ChannelHandlerContext ctx) throws Exception {
if (isEndOfInput() || (pendingChunk != null && !pendingChunk.isDone())) {
return null;
}
if (pendingChunk != null) {
Future<B> tmp = pendingChunk;
pendingChunk = null;
if(tmp.isSuccess()) {
return tmp.get();
} else if(tmp.isCancelled()) {
throw new CancellationException();
} else {
throw new IOException(tmp.cause());
}
} else {
pendingChunk = readChunkAsync(ctx);
pendingChunk.addListener(new GenericFutureListener<Future<? super B>>() {
@Override
public void operationComplete(Future<? super B> future) throws Exception {
((ChunkedWriteHandler) ctx.handler()).resumeTransfer();
}
});
return null;
}
}
}
This way, the ChunkedWriteHandler could be left unmodified, which is good, since a lot of testing already went into this class, I guess.
Create an async sibling of ChunkedInput, which operates completely asynchronously:
public interface ChunkedAsyncInput<B> {
Future<Boolean> isEndOfInput();
Future<B> readChunk(ChannelHandlerContext ctx);
// Could be left unmodified, since these methods cannot be made async using stock jdk
void close() throws Exception;
long length();
long progress();
}
In this case, we must extend the ChunkedWriteHandler to support the ChunkedAsyncInput interface. So some kind of:
if (pendingMessage instanceof ChunkedInput) { ... }
else if (pendingMessage instanceof ChunkedAsyncInput) { ... }
Not as trivial as the first option, but probably cleaner.
As some final note: The method isEndOfInput() kind of causes some trouble, since it might read or not, in order to verify the end of a stream.
Any thoughts about this issue ?
Btw: I've setup netty already as a project + all your contribution rules. So making a pull request would be not much work for me ;)
EDIT: As a final note: If we use option 2, we could reimplement the synchronous solution by using something line return Future.of(byteBufInstance);. In other words: Doing synchronous stuff, which looks asynchronous, is easy. The other way is much harder =P.
@chrisprobst, did you ever get this implemented? This would be useful for a project I'm working on. How to properly use .resumeTransfer() is not at all clear to me, and it would be a lot nicer if this all worked asynchronously with no intervention.
PS I see a LazyChunkedInput in Play 1.x that might be helpful: https://github.com/playframework/play1/blob/master/framework/src/play/server/PlayHandler.java#L1024
I did never get a response for my proposal. Im not wokring with netty anymore so i have no plans to implement this currently. Sry.
Hello.
In a POC I tried to wite an IMAP server implementing "Streaming" for the IMAP responses.
Having such an abstraction would be ideal to me.
Hi,
the ChunkedWriteHandler lets one transfer huge data sets without getting an OutOfMemoryException or something like that.
But the ChunkedInput has a blocking interface for getting the next chunk. If the ChunkedInput has a slow source, this blocks all channels served by the same event-loop.
Are there any plans for implementing some sort of unifying Pipe-abstraction, like node.js's pipe() ? This would also relate to piping channels to other channels, while respecting flow-control. I know that vert.x does provide something similar, but I think, that this is something that really belongs to the core of netty.
A couple of weeks/months ago I already asked for AsynchronousChannel support in Netty 4+5. If this feature would be re-integrated, it would be very easy to kill two birds with one shot. Because then file access could be implemented as a netty channel and the Pipe-abstraction could be implemented using a ChannelHandler which fetches input from one channel and pushes it into the other one.
What do you think about those kinds of changes ?