Open benjchristensen opened 9 years ago
tldr; A user should be able to use only Reactive IPC to run a server or client.
My perspective is that one should be able to use Reactive IPC directly to create and run a client or server. Reactive IPC should not just be an SPI that is useless by itself. In other words, without further abstractions on top I should be able to use Reactive IPC APIs, transports and Reactive Streams callbacks.
For example, I should be able to do the following:
Publisher/Subscriber
On the client side I would similarly expect to be able to:
Publisher/Subscriber
Layers on top are there to expose different stream composition approaches (such as RxJava), usability opinions, etc.
For example, I want to be able to do something similar to what RxNetty offers (ignoring the API design, just the functionality):
ReactiveIPC.createHttpServer(8999, (req, resp) -> resp.writeStringAndFlush("Hello")).start();
Further opinions would all layer on top of these basic capabilities. For example, Netflix Ribbon and Karyon would hide all of this from users, as would Spring Boot and Spring Web, all of which would have different opinions of how to start servers or execute requests.
I think writeAndFlush should map on the Reactive Stream contract directly:
On the input side (registering a Subscriber to a request Publisher):
Close -> Subscription.cancel()
Request N IO Read -> Subscription.request(long n), Long.MAX_VALUE -> unbounded read
Input Ready -> Subscriber.onSubscribe
Input Read -> Subscriber.onNext
Input Error -> Subscriber.onError
Input Closed (reset by peer) -> Subscriber.onComplete
On the output side:
Output Close -> might be cancelling upstream and so close connection
Output ready to write N Data -> Subscription.request(long n), Long.MAX_VALUE -> unbounded
Output Start (possibly Headers) -> Subscriber.onSubscribe
Output Write -> Subscriber.onNext
Output Flush -> Subscriber.onComplete (use of windows for batch flush), cancel upstream
Output Error -> Subscriber.onError
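To make the input-side mapping concrete, here is a rough sketch using the JDK's java.util.concurrent.Flow interfaces (which mirror Reactive Streams) rather than any actual Reactive IPC type. InMemoryInput and CollectingSubscriber are hypothetical names invented for illustration; the "connection" is just a list of strings, and everything runs synchronously on the caller's thread.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Flow;

// Hypothetical in-memory "connection": publishes a fixed list of chunks and
// honours request(n) and cancel() the way the input-side mapping describes.
final class InMemoryInput implements Flow.Publisher<String> {
    private final List<String> chunks;
    boolean closed = false; // set when the subscriber cancels (Close)

    InMemoryInput(List<String> chunks) { this.chunks = chunks; }

    @Override
    public void subscribe(Flow.Subscriber<? super String> subscriber) {
        subscriber.onSubscribe(new Flow.Subscription() { // Input Ready
            private int index = 0;
            private long demand = 0;
            private boolean draining = false;
            private boolean done = false;

            @Override
            public void request(long n) { // Request N IO Read
                if (done) return;
                if (n <= 0) {
                    done = true;
                    subscriber.onError(new IllegalArgumentException("n must be > 0"));
                    return;
                }
                demand = (demand + n < 0) ? Long.MAX_VALUE : demand + n; // cap on overflow
                if (draining) return; // re-entrant call from onNext; the loop below picks it up
                draining = true;
                while (!done && demand > 0 && index < chunks.size()) {
                    demand--;
                    subscriber.onNext(chunks.get(index++)); // Input Read
                }
                draining = false;
                if (!done && index == chunks.size()) {
                    done = true;
                    subscriber.onComplete(); // Input Closed
                }
            }

            @Override
            public void cancel() { // Close
                done = true;
                closed = true;
            }
        });
    }
}

// Hypothetical subscriber that reads one chunk at a time and records each signal.
final class CollectingSubscriber implements Flow.Subscriber<String> {
    final List<String> events = new ArrayList<>();
    private final int stopAfter; // cancel after this many chunks; 0 means never
    private Flow.Subscription subscription;
    private int seen = 0;

    CollectingSubscriber(int stopAfter) { this.stopAfter = stopAfter; }

    @Override public void onSubscribe(Flow.Subscription s) {
        subscription = s;
        events.add("ready");
        s.request(1);
    }

    @Override public void onNext(String chunk) {
        events.add("read:" + chunk);
        seen++;
        if (stopAfter > 0 && seen == stopAfter) subscription.cancel();
        else subscription.request(1);
    }

    @Override public void onError(Throwable t) { events.add("error:" + t.getMessage()); }

    @Override public void onComplete() { events.add("closed"); }
}
```

Subscribing a CollectingSubscriber with stopAfter set to 2 shows the Close path: the publisher stops emitting and no onComplete is delivered after the cancel.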
To get the discussion going, the initial commit I pushed adds just two abstractions to represent TCP communication: Connection and ConnectionPublisher. Since each protocol will have abstractions that represent its basic constructs I went with the TCP naming of "connection" rather than Netty's more general "channel" abstraction. But honestly I'm fine with either one and I don't really have a preference.
I did not implement the connection-level read other than to simply use the pending value as an indication of whether to handle data or not. The initial commit is just a means to an end and the implementation code is really just to try and figure out what commonalities are going to exist that need to be in core.
I also added a Buffer abstraction in core. I put it in the javadoc but it should be noted that a Buffer in ripc terms does not mean just a variation on byte[] or ByteBuffer. The only (partial) implementation that exists right now is one based on Netty's ByteBuf, but the Buffer abstraction itself doesn't care what the underlying data actually is. It could be a Buffer<Frame> or a Buffer<Pojo>. This will provide simple read-and-convert functionality for common things like length-field-based encoding that uses Snappy or Zip compression, etc... that's going to be nearly identical for all use cases.
Thanks for submitting code ... will review and then discuss further.
I think writeAndFlush should map on the Reactive Stream contract directly:
Are you suggesting that onComplete flushes the writes?
Output Flush -> Subscriber.onComplete (use of windows for batch flush), cancel upstream
If yes, then this doesn't address all the use cases. It would mean that all writes are buffered in the channel outbound buffer until the stream completes. For infinite streams that is never; for slow streams it causes buffer bloat.
We would at least need a semantic like:
write(Observable<W> msgs, Func1<W, Boolean> flushSelector);
which gets feedback from a flushSelector about when to flush (invoked after every element emission). A simplistic version would be flushOnEach, where flushSelector always returns true.
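A rough sketch of that flushSelector idea, written against the JDK's java.util.concurrent.Flow and java.util.function.Predicate instead of Observable and Func1 so that it runs without extra dependencies. RecordingChannel, FlushingWriter and ListPublisher are hypothetical names made up for this example; the "channel" only records what would have been flushed together, and the publisher skips the error signalling a compliant implementation would need for non-positive requests.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Flow;
import java.util.function.Predicate;

// Hypothetical stand-in for a channel: writes are buffered until flush().
final class RecordingChannel {
    final List<String> pending = new ArrayList<>();
    final List<List<String>> flushed = new ArrayList<>();

    void write(String msg) { pending.add(msg); }

    void flush() {
        if (pending.isEmpty()) return;
        flushed.add(new ArrayList<>(pending));
        pending.clear();
    }
}

// Writes every message and asks flushSelector after each one whether to flush.
final class FlushingWriter implements Flow.Subscriber<String> {
    private final RecordingChannel channel;
    private final Predicate<String> flushSelector;
    private Flow.Subscription subscription;

    FlushingWriter(RecordingChannel channel, Predicate<String> flushSelector) {
        this.channel = channel;
        this.flushSelector = flushSelector;
    }

    @Override public void onSubscribe(Flow.Subscription s) { subscription = s; s.request(1); }

    @Override public void onNext(String msg) {
        channel.write(msg);
        if (flushSelector.test(msg)) channel.flush();
        subscription.request(1);
    }

    @Override public void onError(Throwable t) { /* a real transport would likely close here */ }

    @Override public void onComplete() { channel.flush(); } // flush whatever is left
}

// Minimal synchronous publisher used only to drive the sketch.
final class ListPublisher implements Flow.Publisher<String> {
    private final List<String> items;

    ListPublisher(List<String> items) { this.items = items; }

    @Override public void subscribe(Flow.Subscriber<? super String> s) {
        s.onSubscribe(new Flow.Subscription() {
            private int index = 0;
            private long demand = 0;
            private boolean draining = false;
            private boolean done = false;

            @Override public void request(long n) {
                if (done || n <= 0) return; // simplified: ignores invalid requests
                demand += n;
                if (draining) return; // re-entrant call from onNext
                draining = true;
                while (!done && demand > 0 && index < items.size()) {
                    demand--;
                    s.onNext(items.get(index++));
                }
                draining = false;
                if (!done && index == items.size()) { done = true; s.onComplete(); }
            }

            @Override public void cancel() { done = true; }
        });
    }
}
```

With a selector such as `m -> m.endsWith("!")`, messages are batched up to each marker and the remainder is flushed on completion; `m -> true` gives the flushOnEach behaviour.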
@jbrisbin I have questions about the implementation; let me create separate issues so we can have a focused discussion on each of those points.
Discussion to establish scope of client/server/protocol functionality for Reactive IPC.