Open Daxten opened 9 years ago
I agree that support for streams would be useful. Slick 3.0.0 may not be the best example, as its operations return Futures, which are already supported by autowire.
It can also return a Publisher for Akka Streams => http://slick.typesafe.com/doc/3.0.0/dbio.html#streaming
I'm wondering too whether this is supported.
The word stream only occurs once in Autowire's code base, and there it's java.io.ObjectInputStream. So I guess it's not implemented yet.
Yeah, I would like to figure out how to support a Source[T: Pickleable] in my APIs.
We would have to implement something like Protocol Buffers (gRPC) for that, I guess.
I can pickle everything to byte arrays using BooPickle. But trying to define a pickler for a Source[ByteString] is where I got hung up, I think.
Yeah, the "default" abstractions around Autowire have the following requirements:
- You can convert Ts into Vs, where T is the input type and V is a type of your choosing
- You can send Vs over the wire somehow, as bytes
- You can receive V on the other side
- and you can convert Vs into Ts again.
uPickle clearly doesn't support binary data well by default (though you could pickle binary data into base64 if you wish, at a roughly 33% data-size penalty) and BooPickle seems not to support streaming stuff by default.
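To put a number on the base64 penalty: base64 emits 4 characters for every 3 input bytes, so the encoded form is about a third larger than the raw data. A minimal sketch using the JDK's java.util.Base64 (the class and method names here are made up for illustration):

```java
import java.util.Base64;

class Base64Overhead {
    // Length, in characters, of the padded base64 encoding of `payload`.
    static int encodedLength(byte[] payload) {
        return Base64.getEncoder().encodeToString(payload).length();
    }

    public static void main(String[] args) {
        byte[] payload = new byte[300];        // 300 raw bytes
        int encoded = encodedLength(payload);  // 4 chars per 3 bytes
        System.out.println(payload.length + " bytes -> " + encoded + " base64 chars");
    }
}
```

For 300 raw bytes this gives 400 characters. Payloads whose length is not a multiple of 3 are padded up to the next group of 4 characters.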
But if you look at the specification, presumably you can't efficiently collapse a Source[T] into a "flat" T without buffering it all up in memory, but could you:
- Preserve the Source structure and include it inside some kind of V
- Send the V containing the streaming Source over the network in an efficient/streaming way
- Receive the V with a streaming Source inside, without needing to wait for the entire Source to turn up
- Convert the V-containing-streaming-Source into a Source[T] on the other end
That seems like something that should work, even without needing any changes to Autowire itself. You'd just need to choose a different data structure for V that isn't just a strict Js.Value or ByteString as uPickle or BooPickle use, but some kind of composite data structure with both strict and streaming portions, with your own T: VWriter: VReader to convert your Ts into your Vs and back again.
You could still use uPickle or BooPickle as part of your implementation of VWriter and VReader for the individual items or the strict parts, and will just need your own logic to deal with preserving streaming things and streaming them over the network.
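A rough sketch of what such a composite V could look like, written in Java to match the gRPC snippets in this thread. Everything here is hypothetical: StreamingEnvelope, NamedLines and NamedLinesCodec are made-up names, Iterator<byte[]> stands in for a real streaming type such as a Source of ByteString, and plain UTF-8 encoding stands in for uPickle or BooPickle on the strict parts. The VWriter and VReader interfaces only imitate the shape of the type classes mentioned above.

```java
import java.nio.charset.StandardCharsets;
import java.util.Iterator;
import java.util.function.Function;

// Hypothetical wire type V: a strict header plus a lazily pulled body.
final class StreamingEnvelope {
    final byte[] header;
    final Iterator<byte[]> chunks;

    StreamingEnvelope(byte[] header, Iterator<byte[]> chunks) {
        this.header = header;
        this.chunks = chunks;
    }
}

// Hypothetical Java counterparts of the VWriter / VReader type classes.
interface VWriter<T> { StreamingEnvelope write(T value); }
interface VReader<T> { T read(StreamingEnvelope envelope); }

// Example T: a strict name plus a stream of text lines.
final class NamedLines {
    final String name;
    final Iterator<String> lines;

    NamedLines(String name, Iterator<String> lines) {
        this.name = name;
        this.lines = lines;
    }
}

final class NamedLinesCodec implements VWriter<NamedLines>, VReader<NamedLines> {
    // Wraps an iterator so each element is converted only when it is pulled.
    static <A, B> Iterator<B> mapLazily(Iterator<A> source, Function<A, B> f) {
        return new Iterator<B>() {
            @Override public boolean hasNext() { return source.hasNext(); }
            @Override public B next() { return f.apply(source.next()); }
        };
    }

    @Override
    public StreamingEnvelope write(NamedLines value) {
        // Strict part is encoded eagerly; the streaming part stays lazy.
        byte[] header = value.name.getBytes(StandardCharsets.UTF_8);
        return new StreamingEnvelope(header,
            mapLazily(value.lines, line -> line.getBytes(StandardCharsets.UTF_8)));
    }

    @Override
    public NamedLines read(StreamingEnvelope envelope) {
        String name = new String(envelope.header, StandardCharsets.UTF_8);
        return new NamedLines(name,
            mapLazily(envelope.chunks, chunk -> new String(chunk, StandardCharsets.UTF_8)));
    }
}
```

The point of the sketch is that neither write nor read pulls anything from the underlying stream: elements are only converted when the consumer on the far side asks for them. Actually moving the chunks across a network is left out and would be the transport's job.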
Discussions about marshalling streams in Akka and Alpakka.
The above idea is more generic because it proposes marshalling the Source as opposed to all the stream data itself.
gRPC has some design elements that could be adopted in autowire, I think.
- It uses the stream keyword in *.proto files to define streaming APIs.
- It has the concept of a transport (https://github.com/grpc/grpc-java#transport); in autowire this is only implied.
- It (gRPC) also has a runtime lib, io.grpc, that provides StreamObserver.
Then, it compiles proto code like this
rpc RouteChat(stream RouteNote) returns (stream RouteNote) {}
to this
/**
 * <pre>
 * A Bidirectional streaming RPC.
 * Accepts a stream of RouteNotes sent while a route is being traversed,
 * while receiving other RouteNotes (e.g. from other users).
 * </pre>
 */
public io.grpc.stub.StreamObserver<io.grpc.examples.routeguide.RouteNote> routeChat(
    io.grpc.stub.StreamObserver<io.grpc.examples.routeguide.RouteNote> responseObserver) {
  return asyncUnimplementedStreamingCall(getRouteChatMethod(), responseObserver);
}
which one can then override like this
/**
 * Receives a stream of message/location pairs, and responds with a stream of all previous
 * messages at each of those locations.
 *
 * @param responseObserver an observer to receive the stream of previous messages.
 * @return an observer to handle requested message/location pairs.
 */
@Override
public StreamObserver<RouteNote> routeChat(final StreamObserver<RouteNote> responseObserver) {
  return new StreamObserver<RouteNote>() {
    @Override
    public void onNext(RouteNote note) {
      List<RouteNote> notes = getOrCreateNotes(note.getLocation());
      // Respond with all previous notes at this location.
      for (RouteNote prevNote : notes.toArray(new RouteNote[0])) {
        responseObserver.onNext(prevNote);
      }
      // Now add the new note to the list
      notes.add(note);
    }

    @Override
    public void onError(Throwable t) {
      logger.log(Level.WARNING, "routeChat cancelled");
    }

    @Override
    public void onCompleted() {
      responseObserver.onCompleted();
    }
  };
}
@lihaoyi - thoughts on how welcome a design inspired by this would be in autowire?
Wish transport and something like StreamObserver were explicit so that Ajax/HTTP and Kafka could be used as implementations.
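To make that wish concrete, here is a minimal sketch of what an explicit transport plus an observer-style interface might look like. None of this is autowire or gRPC API: FrameObserver is a local stand-in modelled on io.grpc.stub.StreamObserver, and Transport, InMemoryTransport, CollectingObserver and UpperCaseHandler are hypothetical names. The in-memory transport is only there so the sketch runs; an Ajax/HTTP or Kafka transport would implement the same interface.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.function.Function;

// Local stand-in modelled on io.grpc.stub.StreamObserver, fixed to raw frames.
interface FrameObserver {
    void onNext(byte[] frame);
    void onError(Throwable t);
    void onCompleted();
}

// Hypothetical explicit transport: opening a call returns the observer the
// caller pushes request frames into; response frames arrive on `responses`.
interface Transport {
    FrameObserver open(String path, FrameObserver responses);
}

// In-memory implementation that routes a path straight to a registered handler.
final class InMemoryTransport implements Transport {
    private final Map<String, Function<FrameObserver, FrameObserver>> handlers = new HashMap<>();

    void register(String path, Function<FrameObserver, FrameObserver> handler) {
        handlers.put(path, handler);
    }

    @Override
    public FrameObserver open(String path, FrameObserver responses) {
        Function<FrameObserver, FrameObserver> handler = handlers.get(path);
        if (handler == null) {
            // Unknown path: report the error and swallow any request frames.
            responses.onError(new IllegalArgumentException("no handler for " + path));
            return new FrameObserver() {
                @Override public void onNext(byte[] frame) { }
                @Override public void onError(Throwable t) { }
                @Override public void onCompleted() { }
            };
        }
        return handler.apply(responses);
    }
}

// Test helper that records everything it receives.
final class CollectingObserver implements FrameObserver {
    final List<String> frames = new ArrayList<>();
    boolean completed = false;
    Throwable error = null;

    @Override public void onNext(byte[] frame) { frames.add(new String(frame, StandardCharsets.UTF_8)); }
    @Override public void onError(Throwable t) { error = t; }
    @Override public void onCompleted() { completed = true; }
}

// Example bidirectional handler, same shape as routeChat: upper-cases each frame.
final class UpperCaseHandler {
    static FrameObserver handle(FrameObserver responses) {
        return new FrameObserver() {
            @Override public void onNext(byte[] frame) {
                String text = new String(frame, StandardCharsets.UTF_8);
                responses.onNext(text.toUpperCase(Locale.ROOT).getBytes(StandardCharsets.UTF_8));
            }
            @Override public void onError(Throwable t) { responses.onError(t); }
            @Override public void onCompleted() { responses.onCompleted(); }
        };
    }
}
```

Usage would be along the lines of transport.register("chat/shout", UpperCaseHandler::handle), then transport.open("chat/shout", collector) and pushing frames into the returned observer. A router generated by autowire could sit on top of Transport without knowing which implementation carries the frames.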
Also looking at https://github.com/cakesolutions/kafka-wire for inspiration.
Is there a way (and/or an example) to stream a result through the autowire router? (For example with Slick 3.0.0 or any other reactive stream.)