reactive-streams / reactive-streams-io

Reactive Streams Network Protocol
reactive-streams.io

Goals and Motivation #1

Open benjchristensen opened 9 years ago

benjchristensen commented 9 years ago

Due to the successful collaborations on Reactive Streams for the JVM and community involvement in Reactive Extensions (RxJava and friends) I want to pursue greater polyglot support of the Reactive Stream semantics. This includes both language specific interfaces and over-the-network protocols. I propose that we collaborate as a community to achieve the use cases I list below, along with any others I'm missing that we derive together.

In full disclosure, personally I am building systems that need polyglot, stream-oriented network interaction, primarily between Java and JavaScript in the near future. I prefer to collaborate and design the solution openly rather than reinvent yet another competing solution. I am unsatisfied with current solutions or unaware of better ones. Teams at Netflix are creating custom one-off solutions based on ReactiveX/Reactive-Stream semantics and I'd prefer we not do this in isolation. Selfishly I want the input of people far better at this domain than I am since I am out of my league in defining network protocols and interfaces in non-Java languages. I also want to avoid NIH (not-invented-here) and solve these problems across organizations and companies since systems I'm building will most likely outlive my involvement in them and community support and involvement in core, foundational networking and messaging layers is far better than home grown solutions in the long run. I expect this to slow me down in the near term, but greatly accelerate and improve the medium and long-term accomplishments, and significantly improve the final outcome.

The timelines I'm hoping for would be functioning prototypes and protocol drafts in a few months, with release candidates in 6-9 months (Q3/Q4-2015) and a GA release in early 2016. I and the team I work with at Netflix intend on building our systems along these timelines to be proving out what we design here.

Additionally, I hope for collaboration across expertise domains to allow for debate, critiques, ideas and solutions that would not occur while staying in our individual silos.

Use Cases

The intent is to enable Reactive Stream semantics for async, stream-oriented IO supporting backpressure and cancelation.

On top of protocols such as TCP, WebSockets, and possibly HTTP/2, it would allow bi-directional, multiplexed communication. Usage patterns would include:

Scalar Request, Scalar Response

This would behave similarly to RPC/IPC calls.

For example:
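The original snippet here appears to have been lost; as a hypothetical stand-in (the `Client` interface and `stubClient` below are invented for this sketch), a scalar request with a scalar response is essentially an async RPC call: one request goes out, exactly one response comes back, and the interaction completes.

```java
import java.util.concurrent.CompletableFuture;

// Hypothetical sketch: scalar request/response behaves like an async RPC call.
// `Client` and `stubClient` are invented names; a real client would go over the wire.
public class ScalarRpc {
    interface Client {
        CompletableFuture<String> requestResponse(String request);
    }

    // In-memory stand-in for a remote endpoint.
    static Client stubClient() {
        return request -> CompletableFuture.completedFuture("echo:" + request);
    }

    public static void main(String[] args) {
        Client client = stubClient();
        // One request, one response, then the interaction completes.
        String reply = client.requestResponse("getUser(42)").join();
        System.out.println(reply); // echo:getUser(42)
    }
}
```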

Scalar Request, Stream Response

This would behave similarly to HTTP Server-Sent Events.

For example:
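The snippet for this case is also missing; a hedged sketch (the `StreamClient` API is invented for illustration) of one request producing a finite stream of events followed by a completion signal:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical sketch of "scalar request, stream response": one request yields
// a finite stream of events followed by a completion signal, like HTTP SSE.
public class StreamResponse {
    interface StreamClient {
        void requestStream(String request, Consumer<String> onNext, Runnable onComplete);
    }

    // In-memory stand-in that emits three events and then completes.
    static StreamClient stubClient() {
        return (request, onNext, onComplete) -> {
            for (int i = 1; i <= 3; i++) onNext.accept(request + "-event-" + i);
            onComplete.run();
        };
    }

    public static void main(String[] args) {
        List<String> received = new ArrayList<>();
        stubClient().requestStream("prices", received::add, () -> received.add("<complete>"));
        System.out.println(received);
    }
}
```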

Or with request(n) and unsubscribe on an infinite stream:
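A sketch, again with invented names, of how request(n) demand and cancellation might bound consumption of an infinite stream: the subscriber asks for batches and the subscription stops emitting once cancelled.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of request(n) flow control over an infinite stream:
// the subscriber asks for batches and cancels once it has seen enough.
public class BoundedDemand {
    interface Subscription {
        void request(long n);
        void cancel();
    }

    // Infinite counter source that only emits while there is outstanding demand,
    // cancelling itself after `stopAfter` total elements for this demo.
    static Subscription subscribe(List<Long> sink, long stopAfter) {
        return new Subscription() {
            long next = 0;
            boolean cancelled = false;
            public void request(long n) {
                for (long i = 0; i < n && !cancelled; i++) {
                    sink.add(next++);
                    if (next >= stopAfter) cancel();
                }
            }
            public void cancel() { cancelled = true; }
        };
    }

    public static void main(String[] args) {
        List<Long> seen = new ArrayList<>();
        Subscription s = subscribe(seen, 5);
        s.request(3);   // first batch of demand
        s.request(10);  // cancellation kicks in after 5 total elements
        System.out.println(seen); // [0, 1, 2, 3, 4]
    }
}
```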

Stream Request, Stream Response (Channel)

This would behave more like raw TCP or WebSockets.

The following example is contrived, but it is representative of the desire for messaging UP with event propagation DOWN across multiple subscriptions.
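A rough, invented sketch of that shape: messages travel UP over the channel while events propagate DOWN to multiple independent subscriptions on one logical session.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical sketch of a bi-directional channel: messages flow UP from the
// client while events are pushed DOWN, independently, over one logical session.
public class BidiChannel {
    private final List<Consumer<String>> downSubscribers = new ArrayList<>();
    final List<String> upLog = new ArrayList<>();  // messages sent UP

    void sendUp(String msg) { upLog.add(msg); }
    void subscribeDown(Consumer<String> onEvent) { downSubscribers.add(onEvent); }
    void emitDown(String event) { downSubscribers.forEach(s -> s.accept(event)); }

    public static void main(String[] args) {
        BidiChannel channel = new BidiChannel();
        List<String> a = new ArrayList<>();
        List<String> b = new ArrayList<>();
        channel.subscribeDown(a::add);       // two independent subscriptions DOWN
        channel.subscribeDown(b::add);
        channel.sendUp("subscribe:ticker");  // messaging UP
        channel.emitDown("tick:101.5");      // event propagated DOWN to both
        System.out.println(a + " " + b + " " + channel.upLog);
    }
}
```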

Intended outcomes of this pursuit are:

1) Discover there is already a solution for this, and we can shut this down and use it.
2) Decide we can't agree, and we go off and build our own custom things.
3) Determine this is a useful and newish thing, collaborate, and build the above.

Artifacts

Following are artifacts envisioned from this collaboration during this first phase.

Network Protocol

This is expected as purely a network protocol. Due to my ignorance I can't specify more, but I expect variations for:

- binary and text (for example, into JavaScript apps it may be valuable to support text/JSON whereas interprocess Java/C/Go/etc would benefit from binary)

Ultimately the desire is for protocols to be defined that can work on top of TCP, HTTP/1, HTTP/2, WebSockets and possibly others like UDP.

Java Interfaces and Reference Implementation

Java interfaces for exposing the various use cases using Reactive Streams interfaces would be very powerful to allow a standard interop for Reactive Stream IO.

It is not expected to have anything more than interfaces defined, but a reference implementation with unit tests to prove functionality should be included.
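As a hedged illustration only (every name below is invented except the Publisher/Subscriber/Subscription shapes, which follow the Reactive Streams specification), such interfaces might look like the following, with a trivial single-element publisher standing in for a reference implementation:

```java
// Sketch of Java interfaces exposing the use cases via Reactive Streams.
// `RsIoConnection` and `just` are invented names for illustration only.
public class RsIoInterfaces {
    interface Publisher<T> { void subscribe(Subscriber<? super T> s); }
    interface Subscriber<T> {
        void onSubscribe(Subscription s);
        void onNext(T t);
        void onError(Throwable t);
        void onComplete();
    }
    interface Subscription { void request(long n); void cancel(); }

    // Hypothetical connection covering the interaction patterns from the issue.
    interface RsIoConnection {
        Publisher<byte[]> requestResponse(byte[] request);     // scalar in, scalar out
        Publisher<byte[]> requestStream(byte[] request);       // scalar in, stream out
        Publisher<byte[]> channel(Publisher<byte[]> upstream); // stream in, stream out
    }

    // Single-element publisher: emits the payload once demand arrives.
    static Publisher<byte[]> just(byte[] payload) {
        return sub -> sub.onSubscribe(new Subscription() {
            boolean done = false;
            public void request(long n) {
                if (done || n <= 0) return;
                done = true;
                sub.onNext(payload);
                sub.onComplete();
            }
            public void cancel() { done = true; }
        });
    }

    public static void main(String[] args) {
        StringBuilder log = new StringBuilder();
        just("hi".getBytes()).subscribe(new Subscriber<byte[]>() {
            public void onSubscribe(Subscription s) { s.request(1); }
            public void onNext(byte[] b) { log.append(new String(b)); }
            public void onError(Throwable t) { log.append("error"); }
            public void onComplete() { log.append("|done"); }
        });
        System.out.println(log); // hi|done
    }
}
```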

JavaScript Interfaces and Reference Implementation

Similar desire as for Java above.

Network TCK

Along with the network protocol I would expect a test suite to validate implementations.

Moving Forward

As a first step I'd like to determine if there is sufficient interest and that this is not insane, completely naive and wrong, or reinventing something that already exists.

If we get through that part, I'll work with you all to create more concrete Github issues to start designing and making this happen.

benjchristensen commented 9 years ago

cc @reactive-streams/contributors

benjchristensen commented 9 years ago

cc @blesh as this relates to the discussion about involving JS in Reactive Streams https://github.com/reactive-streams/reactive-streams-jvm/issues/45#issuecomment-75654936

benjchristensen commented 9 years ago

Also related to this is the newly created repo for collaborating on Reactive Streams for Javascript: https://github.com/reactive-streams/reactive-streams-js

tmontgomery commented 9 years ago

I am in. I'll help!

jbrisbin commented 9 years ago

:+1:

maniksurtani commented 9 years ago

Interesting. Count me in.

benlesh commented 9 years ago

In full disclosure, personally I am building systems that need polyglot, stream-oriented network interaction, primarily between Java and JavaScript in the near future. I prefer to collaborate and design the solution openly rather than reinvent yet another competing solution.

Ditto... but completely different app.

We have a primitive implementation in our project at Netflix (unrelated to @benjchristensen's project), but it's very primitive and I don't think it's directly usable in a generic way in its current form.

Count me in.

JakeWharton commented 9 years ago

binary and text (for example, into JavaScript apps it may be valuable to support text/JSON whereas interprocess Java/C/Go/etc would benefit from binary)

Be very careful with this distinction, and, in fact, I think you want a regular content type mechanism instead. One of the things WebSockets got horribly wrong is this arbitrary separation of text and binary frames with no additional metadata. I would stick to just saying data and allowing the request to negotiate the format of that data.

jbrisbin commented 9 years ago

I agree with @JakeWharton about treating content as simply data and figuring out what to do with it based on the negotiated Content-Type. There's no benefit IMO to designating some payloads String and others SomethingElse[]. The need to do charset decoding in many situations means one must rely on the full power of the Content-Type.

I don't just mean the simple type and subtype, either. I mean the full capability, with x-custom+json and quality factors, etc.
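A minimal sketch of the frame shape this discussion suggests (names invented here): payloads stay opaque bytes, and a full MIME content type, negotiated per stream rather than flagged per frame, drives decoding.

```java
import java.nio.charset.StandardCharsets;

// Hypothetical frame layout following the thread's suggestion: every payload is
// opaque bytes, and a negotiated content type (a full MIME string, e.g.
// "application/x-custom+json") tells the receiver how to decode it. There is
// deliberately no WebSocket-style text-vs-binary frame distinction.
public class DataFrame {
    final String contentType;  // negotiated per stream, not per frame kind
    final byte[] payload;      // always raw bytes; never a separate "text frame"

    DataFrame(String contentType, byte[] payload) {
        this.contentType = contentType;
        this.payload = payload;
    }

    // Decoding is a receiver-side choice based on the negotiated content type.
    String decodeAsText() {
        if (!contentType.startsWith("text/") && !contentType.contains("json"))
            throw new IllegalStateException("not a textual content type: " + contentType);
        return new String(payload, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        DataFrame json = new DataFrame("application/json",
                "{\"x\":1}".getBytes(StandardCharsets.UTF_8));
        System.out.println(json.decodeAsText());
    }
}
```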

benjchristensen commented 9 years ago

I would stick to just saying data and allowing the request to negotiate the format of that data.

Great feedback @JakeWharton, and thanks for getting involved.

danarmak commented 9 years ago

I think to begin with, the scope of the project should be better defined. At one extreme, it could be a very barebones protocol that doesn't do anything beyond RS semantics, message framing and maybe multiplexing. At the other extreme, it could know about Content-Types, bidi-ness negotiation, publisher discovery and creation (e.g. 'give me a publisher that reads this file'), pushing events without subscriptions, optional features that depend on the underlying protocol...

benjchristensen commented 9 years ago

@tmontgomery Todd, how do you recommend we start defining this? I think you're the right one to start defining the network protocol. I opened https://github.com/reactive-streams/reactive-streams-io/issues/2 for that discussion to begin and would appreciate you taking a lead role in that.

benjchristensen commented 9 years ago

@danarmak What do you recommend the scope be defined as to achieve the types of use cases I mentioned?

danarmak commented 9 years ago

@benjchristensen will reply in #3

tmontgomery commented 9 years ago

For a protocol, there is a nice tool that can be used to specify behavior and test against implementations: K3PO.

I.e. if you want a TCK-like test kit, specify your k3po scripts and any implementation can test against it.

MarkusJais commented 9 years ago

I like the idea of including TCP as a base protocol. Not everything has to be built on top of HTTP :-)

experquisite commented 9 years ago

I'm psyched, I'll be following this closely, if not helping out. I have been thinking about my need to do this in Scala; I was planning on using scodec to serialize my messages and then persist/replicate my reactive streams using Chronicle or perhaps Aeron. However, it would be much more convenient if subscription and back-pressure were all integrated.

moredip commented 9 years ago

Would it make sense to build Rx stream semantics on top of a well-established base protocol, rather than starting from first principles?

ZeroMQ springs to mind. It's well established and seems to have good language support. An issue is that there doesn't seem to be any support for ZeroMQ over HTTP, and I'm not sure how baked the ZeroMQ over WebSocket spec is.

jbrisbin commented 9 years ago

@moredip I agree with you. When you look at the basic interaction patterns of TCP communication, they all fall into one of the categories implemented in ZMQ. IMO having PUSH/PULL and REQ/REP embedded at a level higher than the underlying transport library would make using those patterns much easier in composition libraries.

Someone will inevitably want to use REQ/REP in TCP using the Netty transport layer, so why not encode that interaction in a reusable way that is Reactive?

moredip commented 9 years ago

at a minimum, perhaps building a reference protocol implementation on top of ZeroMQ would be a cheap way to flush out some ideas and/or get something concrete where some language binding APIs could be played with.

jbrisbin commented 9 years ago

I created an issue at reactive-ipc-jvm [1] to suggest ZeroMQ as a transport implementation in the Reactive IPC kernel. I think working backwards from the implementation there would suggest the appropriate ways of exposing those patterns in a more consumable way via RS.IO.

[1] - https://github.com/reactive-ipc/reactive-ipc-jvm/issues/12

danarmak commented 9 years ago

ZMQ has some wonderful features: disconnected sockets (i.e. transparent recovery from underlying transport errors), framing (atomic message delivery), and transport independence. The first one would be a lot of work to specify and implement ourselves, and the second makes our protocol simpler.

However, ZMQ also leaves a lot of freedom for specifying e.g. the reliability semantics (queueing and blocking). I tend to think the RS.io protocol should standardize these as far as possible (and I hope that's very far), otherwise the spec will become either very complex or fragmented.

@jbrisbin how do you see PUSH/PULL or REQ/REP being used for an RS abstraction? Naively, it seems to me that only an async bidi message stream like DEALER/DEALER would work. With PUSH/PULL you'd need a separate socket for each direction. But I don't have a lot of experience with zmq patterns, so maybe I'm missing something.

pidster commented 9 years ago

Per @benjchristensen's original post, which explored the intended behaviour, would you consider the logical functions detailed in 'The Scalability Protocols' on the Nanomsg homepage to be of interest?

The communication patterns, also called "scalability protocols", are basic blocks for building distributed systems. By combining them you can create a vast array of distributed applications. The following scalability protocols are currently available:

- PAIR - simple one-to-one communication
- BUS - simple many-to-many communication
- REQREP - allows to build clusters of stateless services to process user requests
- PUBSUB - distributes messages to large sets of interested subscribers
- PIPELINE - aggregates messages from multiple sources and load balances them among many destinations
- SURVEY - allows to query state of multiple applications in a single go

Scalability protocols are layered on top of the transport layer in the network stack. At the moment, the nanomsg library supports the following transports mechanisms:

- INPROC - transport within a process (between threads, modules etc.)
- IPC - transport between processes on a single machine
- TCP - network transport via TCP

See also: Nanomsg vs ZeroMQ.

Offering a Reactive-based implementation of these would be interesting, because it would enable higher level functionality to be assembled on top, e.g. HTTP on top of REQREP, or membership functions over SURVEY.

In this case the reactive processing mechanism is orthogonal to the choice of network protocol (and probably transport).

tmontgomery commented 9 years ago

I believe that it is important to leave as little dependency on underlying transport as possible. ZeroMQ or Nanomsg, while being tremendously awesome, would be adding some dependencies that, while possibly convenient, limit the use cases slightly. Especially for running over top of some useful protocols like CoAP, MQTT, etc. later. Excluding them by adding dependencies up front would seem to be unnecessary and paint us into a corner.

Dependency on the underlying transport protocol should be minimal for a well designed protocol at this layer... not sure if that is Layer 5 or 7, actually.

Due to the nature of the use cases outlined, I think the transport can only be assumed to provide simplex, best effort delivery.

This means that the only assumption would be:

- the transport will attempt to recover lost data within a session invocation

Why not bi-directional? Some "protocols" like ZeroMQ, JMS, and most streaming messaging systems don't have easy-to-define bi-directional semantics. I believe it is possible to accommodate a lot of these systems by specifying a separation of "UP" and "DOWN", as @benjchristensen mentions, that can be mapped to 2 transport sessions or to the same one (i.e. TCP, for example, by utilizing bi-directionality).
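A sketch of that separation, with invented names: the protocol layer addresses an UP channel and a DOWN channel, which may be backed by one duplex session or by two simplex sessions, without the layer above knowing which.

```java
import java.util.ArrayDeque;
import java.util.Queue;

// Hypothetical sketch: the protocol speaks only in terms of an UP channel and a
// DOWN channel. A duplex transport (e.g. TCP) can back both with one session;
// a simplex transport backs each with its own session. The layer above never
// needs to know which mapping is in use.
public class DirectionalSessions {
    interface SimplexSession {
        void send(String frame);
        Queue<String> delivered();
    }

    // In-memory stand-in for a one-directional transport session.
    static SimplexSession inMemorySession() {
        Queue<String> q = new ArrayDeque<>();
        return new SimplexSession() {
            public void send(String frame) { q.add(frame); }
            public Queue<String> delivered() { return q; }
        };
    }

    // Two simplex sessions composed into the UP/DOWN pair the protocol needs.
    static final class Duplexed {
        final SimplexSession up = inMemorySession();
        final SimplexSession down = inMemorySession();
    }

    public static void main(String[] args) {
        Duplexed d = new Duplexed();
        d.up.send("request(5)");      // demand flows UP
        d.down.send("onNext(data)");  // elements flow DOWN
        System.out.println(d.up.delivered() + " " + d.down.delivered());
    }
}
```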

The fewer dependencies required, the better the protocol we will have. A clear separation of concerns is going to make for a much more robust and simpler solution.

rlankenau commented 9 years ago

I agree with @tmontgomery regarding the separation of concerns and only relying on simplex communication, but the abstractions that @pidster posted are great.

I don't know that they all have to be presented as first-class actions in the protocol, but they provide a very nice way of considering systems that will use the protocol.

danarmak commented 9 years ago

@tmontgomery Using two sessions, one in either direction, tends to create network routing problems. Often A can connect to B, but not B to A, due to NAT or firewalling. Even if both A and B can connect to the other, when A connects to B, the messages B receives won't necessarily be seen to come from the public address it should use to connect back to A. A node may not even know its own public IP address or whether or not it has one (e.g. in an AWS VPC).

RS semantics require duplex communication: elements in one direction, demand the other way. How can you do RS over a single simplex channel? By requiring the underlying transport to handle backpressure, like TCP does? Or do you mean that it would always require two channels, but they might be separate?

Incidentally, glancing at the MQTT docs, it seems it's duplex?

I think the transport can only be assumed to provide simplex, best effort delivery. The transport will attempt to recover lost data within a session invocation.

How can you track RS demand on a best-effort transport, without notifications of whether delivery actually occurred? The session might get stuck with the publisher waiting for more demand, and the subscriber waiting for more items, because the earlier items were lost in transmission.
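A toy simulation of that failure mode (purely illustrative, not part of any proposed protocol): a lost onNext leaves the publisher believing it has satisfied all demand while the subscriber is still waiting, so neither side progresses.

```java
// Hypothetical simulation of the deadlock described above: on a best-effort
// transport, one lost onNext leaves both sides waiting on each other forever.
public class LostDemandDeadlock {
    // Returns true when publisher and subscriber each end up waiting.
    static boolean deadlocks(long requested, int lostIndex) {
        long sentByPublisher = 0, receivedBySub = 0;
        for (int i = 0; i < requested; i++) {
            sentByPublisher++;                    // publisher emits an element
            if (i != lostIndex) receivedBySub++;  // element may be lost in transit
        }
        boolean publisherWaiting = sentByPublisher == requested; // waits for more demand
        boolean subscriberWaiting = receivedBySub < requested;   // waits for more items
        return publisherWaiting && subscriberWaiting;
    }

    public static void main(String[] args) {
        System.out.println("lossless: " + deadlocks(2, -1)); // lossless: false
        System.out.println("one loss: " + deadlocks(2, 1));  // one loss: true
    }
}
```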

rkuhn commented 9 years ago

@danarmak It would of course be trivial to implement RS semantics on a lossless duplex transport, but as far as I understand this is not the only goal of this effort. It is meaningful to layer RS semantics on top of a transport that is by itself not yet reliable or back-pressured—I’d venture to say that this is the more interesting goal.

danarmak commented 9 years ago

@rkuhn could you please present a motivating use case of a transport that is not lossless and/or not duplex and why it's needed to use it?

The one obvious example from my personal experience is HTTP/1, which is not fully duplex (and apparently CoAP which has similar semantics). I listed some reasons to use HTTP at the start of #6: it can be used from inside browsers, the server components can be hosted in existing HTTP frameworks alongside other software, and it's already integrated with SSL, authentication etc.

Are there other prominent non-duplex use cases?

rkuhn commented 9 years ago

@danarmak Yes, the prototypical example is UDP: not having to maintain a lot of connection-based state in network endpoints as well as intermediate components can be imperative, there are limits of scale implied by using TCP as the basis for everything you do in a large system.

tmontgomery commented 9 years ago

@danarmak was thinking of the semantics on top of MQTT... i.e. basic messaging simplex operation. I forgot it actually has request/response. My fault. I blame my cold. :)

I wasn't thinking of the direction of connectivity being an issue; i.e., connectivity can happen in either direction as needed by the network, and data flow can happen in whatever direction is needed. In this way, NAT traversal would not be an issue: both TCP sessions, for example, would connect in the same direction, but flow would go in each respective direction. TCP is a bad example since it wouldn't be necessary to have 2 connections; they can use bi-directionality.

@rkuhn I would be OK with not back-pressured. Non-reliable is problematic.

HTTP/1 and HTTP/2 (depending on how it is mapped) are ones that are problematic to handle. HTTP/1 will require 2 TCP connections, as you point out in #6. So might HTTP/2, depending on implementation limitations. Any messaging system using JMS would have to map onto topics. By extension, CoAP would be another one, although it does have request/response as well. But it might be simpler to map it as two publish/subscribes instead.

danarmak commented 9 years ago

@tmontgomery in simplex protocols, which side initiates the connection is usually tied to the directionality, i.e. which side can send messages. In a req/rep model like HTTP/1, the client would have to constantly poll for server messages. This is of course inefficient. So for real duplex transports we would still need to specify a different protocol.

@rkuhn with UDP, how do you propose to handle the routing issue?

The most general option is for the initiator to tell the other side, in its first message, what address to send replies to. If it doesn't know, it defaults to saying 'null', in which case the second party sends to the address it sees the messages as coming from.

But it's hard, sometimes impossible, to figure out dynamically what address to specify. And with a transport like UDP where there are no delivery acks, if the initiator isn't getting replies, it can't even tell if it's a routing problem in the forward direction, or in the other direction, or if the other party is just down.

Or we could punt it to the user and not worry about it, if they want to use routed UDP for bidi communications, it's on their own head :-)
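A sketch of the reply-address rule described above (the encoding is invented here): the initiator advertises a reply-to address in its first message, or null meaning "use the source address you observe".

```java
// Hypothetical encoding of the reply-address proposal: the initiator's first
// message carries a reply-to address, or null meaning "send replies to the
// source address you see the messages coming from".
public class ReplyTo {
    static String resolveReplyAddress(String advertised, String observedSource) {
        return advertised != null ? advertised : observedSource;
    }

    public static void main(String[] args) {
        // Initiator doesn't know its public address: fall back to observed source.
        System.out.println(resolveReplyAddress(null, "203.0.113.7:4000"));
        // Initiator advertises an explicit address: use it.
        System.out.println(resolveReplyAddress("198.51.100.2:9000", "203.0.113.7:4000"));
    }
}
```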

tmontgomery commented 9 years ago

@danarmak I'm not a fan of long polling. But just playing devil's advocate, wouldn't HTTP streaming be a way to do the backchannel? Inefficient, yes, but better than some options.

For UDP, the receiver can send back to the sender's IP and port, as DNS and other protocols do. This is how DNS traverses firewalls.

danarmak commented 9 years ago

We've been discussing this from two directions at once: on the one hand, how easy different transports are to support; on the other, the use cases that require specific transports. Should one consideration trump the other?

What are the protocols that you all think we must support even at very high cost, and not just if it's easy and neat? (For myself, lossless duplex and HTTP/1 would be enough.)

Conversely, at what point does the scope or difficulty become too great, so the project would likely stall or last too long, and so we should not try to support such scenarios? Would we know it without trying out a protocol draft / implementation? (For myself, I don't have a good idea.)

Finally, should we try to release and implement a restricted, focused first version quickly, and plan to support other use cases later? Assuming the protocol would be specified a bit differently for different transports in any case.

tmontgomery commented 9 years ago

I would be OK with a lossless duplex first version (suitable for TCP, WebSocket, and possibly HTTP/2). But would like to see us extend to HTTP/1 and other not-fully-duplex transports (like Aeron) afterward. My motivation is that I want to run this over Aeron. But realistically, I know that is a minor use case for many.

danarmak commented 9 years ago

@tmontgomery with HTTP request+response streaming, I feel you'd give up some of HTTP's semantics. For instance, if there was a network error (as there eventually will be for a very long-lived connection), you wouldn't know which messages were received. So you'd have to implement explicit recovery/negotiation semantics, instead of just mapping RS messages to HTTP ones. This is tricky if the client sends a lot of demand messages but doesn't actually calculate what the total current demand should be.

(Actually, my own #6 has the same problem in miniature, because it allows for message batching. I'll have to add a note that the server must buffer the whole batch and resend it if it's not delivered safely the first time.)

tmontgomery commented 9 years ago

Doesn't TCP have the same problem, though? A TCP connection can be cut in the middle of a frame. It would seem to be the same issue. Maybe there is a subtlety I am missing...

danarmak commented 9 years ago

I thought that with UDP, the only reason services like DNS work is that the firewall (and all routers along the way) allow them specifically and track request/reply state. Or, more commonly, there's a DNS server inside the local network. Or DNS over TCP is used.

If a host doesn't even have a public IP address, how could a reply UDP packet possibly reach it, no matter what address it's sent to?

tmontgomery commented 9 years ago

Usually, and this is not the case with all firewalls or configs, the outbound UDP packet creates forwarding state in the router that has a lifetime. If it doesn't see a UDP message back, or more UDP packets sent out to the same destination, then it deletes the forwarding state. However, if it sees a UDP packet back, it forwards it on to the original source. Notice the firewall can just use its own IP and manage its own UDP port, just as it would with a TCP connection.

Lots of UDP-based games would not work if this didn't work...

A local DNS cache (like one inside the firewall) still normally uses UDP for outbound queries. TCP is normally only used (my experience may be dated, though) for zone transfers and other bulk requests.

UDP is treated exactly like TCP wrt IP address and port usage.

NB: this assumes UDP is allowed through the firewall.

danarmak commented 9 years ago

@tmontgomery It's true, TCP has a similar problem. Which is why #4 doesn't specify recovery in case the connection is broken, because there's no way for each side to know for sure what data the other side received before the error. To overcome this we'd need to add something like explicit numbering of the frames, so after reconnecting each side could tell the other which frames it received. (Tempting to just use ZMQ which already implements frame re-delivery...)

Also, each side needs to buffer the messages it sent until the other side acknowledges them, in case it needs to resend. These explicit acks should be included in other messages and also in a new message type in case there's no other message going that way.

I didn't include this in #4 because it adds a lot of complexity. Plenty of TCP-based protocols use long-lived sessions without recovery support, like SSH for instance, and I thought we could get away with it too, but maybe there are use cases where it's important to have. What do you think?
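A sketch of that recovery scheme, with invented names: sequence-numbered frames are buffered by the sender until acknowledged, so after a reconnect the sender knows exactly which frames to resend.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Hypothetical sketch of the recovery scheme discussed: each frame carries a
// sequence number, the sender buffers frames until acknowledged, and after a
// reconnect it resends everything past the peer's last acknowledged frame.
public class ResendBuffer {
    static final class Frame {
        final long seq;
        final String data;
        Frame(long seq, String data) { this.seq = seq; this.data = data; }
    }

    private long nextSeq = 0;
    private final Deque<Frame> unacked = new ArrayDeque<>();

    Frame send(String data) {
        Frame f = new Frame(nextSeq++, data);
        unacked.addLast(f);  // keep until acknowledged, in case a resend is needed
        return f;
    }

    // Peer confirms receipt of all frames with seq <= upToSeq; drop them.
    void ack(long upToSeq) {
        while (!unacked.isEmpty() && unacked.peekFirst().seq <= upToSeq)
            unacked.removeFirst();
    }

    int pendingResend() { return unacked.size(); }

    public static void main(String[] args) {
        ResendBuffer sender = new ResendBuffer();
        sender.send("a");
        sender.send("b");
        sender.send("c");
        sender.ack(1);  // peer saw frames 0 and 1; only "c" would be resent
        System.out.println(sender.pendingResend()); // 1
    }
}
```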

My point about HTTP was that one of the reasons to use it instead of TCP was to reuse its semantics, like framing messages and only acting on completely delivered ones. HTTP streaming is a step back in that sense. But many other HTTP features remain, so it might well still be better than using websockets.

danarmak commented 9 years ago

@tmontgomery If we can just assume UDP (and any other transport under consideration) is routed correctly both ways, and not address routing at all in the protocol, that would be great. Presumably this depends on the usecases involved.

@rkuhn using UDP instead of TCP for performance implies allowing some messages to be lost and not retransmitted. But then how do you calculate demand correctly, assuming the goal is to layer any valid RS traffic over RS.io? If a stream element (onNext) is lost, the session would deadlock, with the publisher waiting for more demand and the subscriber waiting for more items.

tmontgomery commented 9 years ago

I think we should consider whether recovery across transport sessions is in scope... my opinion is that it is a higher-level concern, i.e. it shouldn't be handled in RS.io. It's best handled closer to the application, where semantics can be integrated more cleanly. Doing it generically for all applications is a nightmare.

danarmak commented 9 years ago

I agree. I just have a nagging suspicion that long-lived HTTP streaming is likely to induce errors due to HTTP proxies or HTTP-aware firewalls applying default timeout rules and such, if both sides don't transmit anything for a while.

rkuhn commented 9 years ago

As you guys have mentioned already, there are very different transport mechanisms that we might consider, and I concur that starting out with a relatively simple one is probably a good strategy. There is a cost to using a fairly complex protocol like TCP or even HTTP, though, in that these address some concerns that overlap with what we want to achieve. It might boil down to the good old difference between ease and simplicity. To answer your point about UDP's lossy nature, TCP's head-of-line blocking and the "TCP incast" problem illustrate why building upon a simpler foundation with different trade-offs than TCP might be worthwhile. Another consideration is that streaming use cases that do not pass network boundaries (firewalls, NAT, "the internet") are numerous enough to warrant a closer look.

tmontgomery commented 9 years ago

You are right. They will. Hostile intermediaries will terminate HTTP sessions they "think" are hung. WebSocket deployments have shaken a lot of that out. HTTP/2 will shake out more of them. But they exist. But in that case, I think it should be treated as a transport session being cut unexpectedly and handled by the application.

At the end of the day, the application is the one that is dealing with its own semantics. What does the end of data look like? Have I processed this before? etc.

pidster commented 9 years ago

So I was trying to ask a question, I will try to be more specific.

I read the initial post to include some kind of pubsub pattern.

Is the intent to offer reactive implementations of certain patterns, or various functions over HTTP? Or is the intent to offer functionality over a given transport?


danarmak commented 9 years ago

@rkuhn I'm afraid I don't understand what you're proposing. Is it to build our own ack mechanism on top of UDP, but more lightweight than TCP's? If so, wouldn't one of the existing UDP-based protocols be a better starting point?

danarmak commented 9 years ago

@tmontgomery some RS use cases don't make it easy to tell the publisher where to restart the stream, or at least that requires explicit support in the publisher implementation (and also getting the needed arguments to the publisher factory). If the underlying connection is often broken, RS.io over HTTP won't work as a generic RS transport in some scenarios.

If the suggested solution in those cases is to use something else like websockets, then I ask myself if people in practice would always use websockets as a matter of habit once they notice HTTP is sometimes unreliable.

experquisite commented 9 years ago

@tmontgomery just FYI, I too am interested in Rx over Aeron, or some other chronicled reliable UDP.

rkuhn commented 9 years ago

@danarmak You are understanding correctly, although I am not proposing any specific solution. I am just raising possible use-cases in order to determine the scope of this effort.

viktorklang commented 9 years ago

Sorry for being late to the party, but I brought beer....

How about we take a little step back and start by defining goals and prioritizing them, then see where the scope ends up; then we can start looking at solutions to address it.