Applied-Duality / ReactiveRemote

Observables in the Cloud
Apache License 2.0

drafted the Glossary #2

Open rkuhn opened 10 years ago

rkuhn commented 10 years ago

In order to find out how far we are from being on the same page I drafted a first sketch of the Glossary. Please check for obvious errors as well as contentious points so that we can converge on a defined vocabulary soon. I’d have liked to post this as a pull request for better commenting facilities, but unfortunately this is not possible with the wiki repo.

viktorklang commented 10 years ago

One thing I think is extremely important to address:

"Consumer/Observer

The consumer of a stream receives elements as they are produced to the stream. The consumer may have the ability to signal to the producer when it can accept further elements. This still leaves the consumer as the weaker party in the exchange because the producer may choose to ignore that signal.

A consumer can process the received elements either synchronously (a synchronous observer) or asynchronously (an asynchronous observer). This distinction is important when it comes to flow control (see there). An asynchronous consumer will typically have to employ a queue for decoupling the reception of the next element from its processing; this queue can have capacity for multiple elements."

I think it's crucial that one knows what to expect: is it synchronous or asynchronous? See: http://blog.ometer.com/2011/07/24/callbacks-synchronous-and-asynchronous/

(this is why Scala Promises and Futures are Async across the board)
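To make the quoted distinction concrete, here is a minimal sketch in plain Java (hypothetical types, not part of any actual Rx API): a synchronous observer does its work inside the producer's onNext call, while an asynchronous one merely enqueues into a bounded queue and processes elsewhere.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

// Hypothetical sketch (not actual Rx types) of the two observer modes
// the glossary describes.
public class ObserverModes {

    // Synchronous observer: processing happens inside the producer's
    // onNext call, so the producer is implicitly slowed to the
    // consumer's pace.
    interface SyncObserver<T> {
        void onNext(T element);
    }

    // Asynchronous observer: onNext only enqueues into a bounded queue;
    // processing is decoupled and happens on some other thread.
    static class AsyncObserver<T> {
        final BlockingQueue<T> queue;

        AsyncObserver(int capacity) {
            this.queue = new ArrayBlockingQueue<>(capacity);
        }

        // Returns false when the queue is full -- the point at which an
        // explicit flow-control decision (block, drop, signal) is needed.
        boolean onNext(T element) {
            return queue.offer(element);
        }

        T poll() {
            return queue.poll();
        }
    }
}
```

The bounded queue is the crux: once it fills, someone has to decide what happens next, which is exactly where an explicit contract is needed.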

benjchristensen commented 10 years ago

I like the glossary. I'd like to clarify something about synchronous/asynchronous consumers:

This distinction is important when it comes to flow control (see there). An asynchronous consumer will typically have to employ a queue for decoupling the reception of the next element from its processing; this queue can have capacity for multiple elements.

Would it be correct, in a client/server relationship over a network connection, to understand that unless every event is acked from the consumer it is by default an async relationship, because of the queueing that happens at multiple steps along the path (network layers)?

For example, the following slow-consumer client code does everything in the client "synchronously" in the sense that it doesn't explicitly queue or add concurrency, but it definitely results in queueing somewhere before it (Netty, JVM, OS, network devices).

        RemoteSubscription s = RxNetty.createTcpClient("localhost", 8181, ProtocolHandlers.stringLineCodec())
                .onConnect({ TcpConnection<String, String> connection ->
                    return connection.getChannelObservable().map({ String msg ->
                        // simulate slow processing
                        Thread.sleep(1000)
                        return msg.trim()
                    });
                }).subscribe({ String o ->
                    println("onNext event => " + o + "\n")
                }, {Throwable e ->
                    println("error => " + e); e.printStackTrace()
                });

The server just keeps emitting regardless of what the client consumes:

    public static Observable<String> createServer(final int port) {
        return RxNetty.createTcpServer(port, ProtocolHandlers.stringLineCodec())
        .onConnect({ TcpConnection<String, String> connection ->
            return getEventStream(connection);
        });
    }

    public static Observable<Void> getEventStream(final TcpConnection<String, String> connection) {
        return Observable.interval(10, TimeUnit.MILLISECONDS)
        .flatMap({ Long interval ->
            System.out.println("Writing event: " + interval);
            // emit the interval to the output and return the notification received from it
            return connection.write("data: {\"type\":\"Command\",\"name\":\"GetAccount\",\"currentTime\":1376957348166,\"errorPercentage\":0,\"errorCount\":0,\"requestCount\":" + interval + "}\n").materialize();
        })
        .takeWhile({ Notification<Void> n ->
            // unsubscribe from interval if we receive an error
            return !n.isOnError();
        }).finallyDo({ println(" --> Closing connection and stream") })
    }

In one execution of this code the consumer had consumed only ~2632 events by the time the server had emitted 263,782, yet this happens without code on either end explicitly defining a queue.
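The effect can be reduced to a toy model in plain Java (illustrative only, standing in for the real stack): both sides look synchronous, but an unbounded queue between them, playing the role of the Netty/OS/network buffers, silently absorbs the rate mismatch.

```java
import java.util.concurrent.ConcurrentLinkedQueue;

// Toy model of the implicit-queue effect: neither side declares a queue,
// but the transport between them (an unbounded queue standing in for
// Netty/JVM/OS/network buffers) absorbs the rate mismatch.
public class ImplicitQueue {

    // Producer emits one element per step; consumer only keeps up with
    // one element out of every 'consumedPerBatch'. Returns the backlog
    // that silently accumulates in the transport.
    public static int backlogAfter(int produced, int consumedPerBatch) {
        ConcurrentLinkedQueue<Integer> transport = new ConcurrentLinkedQueue<>();
        for (int i = 0; i < produced; i++) {
            transport.add(i);                       // producer emits freely
            if (i % consumedPerBatch == 0 && !transport.isEmpty()) {
                transport.poll();                   // slow consumer drains 1-in-N
            }
        }
        return transport.size();                    // backlog nobody asked for
    }
}
```

With 1000 elements produced and a consumer that keeps up with one in a hundred, the transport ends up holding 990 elements, mirroring the 2632-vs-263,782 gap above at smaller scale.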

benjchristensen commented 10 years ago

Regarding the sync/async consumers, I'm thinking through the various higher-order functions and want to confirm my interpretation of what the glossary is saying.

Synchronous (but still non-blocking) functions would include things like map, flatMap, merge, take, filter since they do not queue anything and synchronously execute their logic before emitting the value and returning to the producer for each onNext.

Asynchronous functions include zip, window, buffer which either explicitly or implicitly queue events received from the producer and decouple delivery to the consumer.
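If that interpretation holds, the two families can be sketched in plain Java (illustrative only, not Rx internals): a map-like operator forwards within the same call, while a buffer-like operator holds elements back, i.e. queues.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Function;

// Illustrative sketch of the two operator families described above.
public class OperatorModes {

    // Synchronous family (map, filter, take, ...): no queue, the output
    // is produced within the producer's own onNext call.
    static <T, R> Consumer<T> map(Function<T, R> f, Consumer<R> downstream) {
        return t -> downstream.accept(f.apply(t));
    }

    // Queueing family (buffer, window, zip, ...): elements are held
    // until 'count' have arrived, decoupling reception from delivery.
    static <T> Consumer<T> buffer(int count, Consumer<List<T>> downstream) {
        List<T> held = new ArrayList<>();
        return t -> {
            held.add(t);
            if (held.size() == count) {
                downstream.accept(new ArrayList<>(held));
                held.clear();
            }
        };
    }
}
```

The map sketch never stores an element, so the producer gets direct feedback from each call; the buffer sketch stores elements by design, which is what makes the queueing question unavoidable.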

According to the blog referenced above, the argument is that any given function should only ever be one or the other, async or sync, so as to be clear in its behavior, and that makes sense.

Operators like zip can perhaps be overloaded with different strategies (unbounded queue and be async, block and be sync, bounded queue up to a point and then block, bounded queue up to a point and then drop events, etc.). In that case each overload may be sync or async, but it would be explicitly chosen and always be one or the other depending on the chosen behavior.
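A sketch of what such explicit strategies might look like (names hypothetical, not an actual Rx signature): one bounded queue, with the overflow behavior chosen up front by the caller.

```java
import java.util.concurrent.ArrayBlockingQueue;

// Hypothetical sketch: one bounded queue, overflow behavior chosen
// explicitly rather than left implicit in the operator.
public class OverflowStrategies {
    public enum OnFull { BLOCK, DROP }

    // Returns true if the element was enqueued, false if it was dropped
    // (or the blocking wait was interrupted).
    public static <T> boolean enqueue(ArrayBlockingQueue<T> queue, T element, OnFull strategy) {
        if (strategy == OnFull.BLOCK) {
            try {
                queue.put(element);   // sync flavor: exerts back-pressure by waiting
                return true;
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return false;
            }
        }
        return queue.offer(element);  // async flavor: sheds load when full
    }
}
```

Each strategy is unambiguously sync (BLOCK) or async (DROP), which is exactly the "always one or the other" property argued for above.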

Is this a correct interpretation?

benjchristensen commented 10 years ago

I like these sections:

An important property of back-pressure is that in order to transport it through a network every single subject must apply it. If unbounded buffering or throttling is used by a subject within the network, then a bound on the emission rate of the network will in general not place a bound on the ingestion rate of that network. The inverse is that a network which applies back-pressure on all its streams and with a bounded emission rate will also have a bounded ingestion rate (unless it contains subjects whose input to output ratio is not bounded).

Another observation is that a stream with back-pressure is driven by the producer (“push”) as long as the consumer can process faster than the producer can emit elements, and it is driven by the consumer (“pull”) as long as the producer can emit faster than the consumer can process elements. During a concrete flow of this stream the rates may change, leading to dynamic switching between “push” and “pull” flow.
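This dynamic switching can be modeled with a simple credit counter (an illustrative sketch, not part of the glossary): the consumer grants credit by requesting elements, and the producer emits only while credit remains. Abundant credit corresponds to "push", exhausted credit to "pull".

```java
// Illustrative credit model of back-pressure: the consumer requests,
// the producer emits against the outstanding credit.
public class CreditFlow {
    private long credit = 0;
    private long emitted = 0;

    // Consumer side: grant the producer permission for n more elements.
    public void request(long n) {
        credit += n;
    }

    // Producer side: emit if credit remains ("push"); otherwise the
    // producer must wait for the next request ("pull").
    public boolean tryEmit() {
        if (credit == 0) return false;
        credit--;
        emitted++;
        return true;
    }

    public long emitted() {
        return emitted;
    }
}
```

Whichever side is momentarily faster determines the mode, without either side changing its code, which is the dynamic push/pull switching described above.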

This line "if unbounded buffering or throttling is used by a subject within the network" is a particularly important and challenging one, as application developers can easily do this within a transformation/combinatorial graph without considering the impact. The use of functions like window/buffer/zip/observeOn without consciously choosing what to do about queueing immediately breaks the entire back-pressure chain. It suggests that any functions that perform queueing should require an explicit choice of behavior and bounds.

rkuhn commented 10 years ago

@viktorklang Thanks for pointing out that article! It matches my intuitive understanding quite naturally, but good that Havoc has already written it up for reference :-)

@benjchristensen We seem to be thinking along the same lines here: we should make it hard for the user to mix up synchronous and asynchronous operations unintentionally, which is best achieved by separating them within the type system (i.e. having something like SyncObserver/AsyncObserver).

One intuition I feel is important is that SyncObserver does not participate in explicit back-pressure handling because it implicitly exerts back-pressure due to its mode of execution, which means that AsyncObserver is the natural place where the need for feedback is expressed. The Observable to which such an AsyncObserver subscribes must be aware of the feedback channel because otherwise it will overwhelm the observer. Beyond this theoretical reason there is also the practical implication that the asynchronous version will have a slightly different interface—the feedback must be communicated via a method call somehow—and therefore it makes sense to require an adequate AsyncObservable.
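Roughly, the interface difference might look like this (hypothetical names, a sketch rather than a proposal): the async variant carries the feedback channel as an extra method, which is also why neither type can stand in for the other.

```java
// Hypothetical sketch of the interface split: the async variant adds a
// feedback method, so the two types cannot be subtypes of one another.
public class InterfaceSplit {

    // Back-pressure is implicit: the producer sits in this call until
    // the observer returns, so no extra signal is needed.
    interface SyncObserver<T> {
        void onNext(T element);
    }

    // Back-pressure is explicit: the producer must ask how many more
    // elements the observer is currently willing to accept.
    interface AsyncObserver<T> {
        void onNext(T element);
        long requested();
    }

    // An AsyncObservable would consult the feedback before emitting,
    // rather than calling onNext unconditionally.
    static <T> boolean mayEmit(AsyncObserver<T> observer) {
        return observer.requested() > 0;
    }
}
```

The asymmetry is visible in the types: a SyncObserver has nothing to answer when asked `requested()`, and an AsyncObserver handed to a plain Observable would never have its feedback consulted.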

Since the function of the sync and async variants depends on a matching counterpart there cannot be a subtype relationship between them, neither can pass for the other. The precise details of the rules relating to which operations mediate between which processing modes elude me at this point, Friday evening taking its toll; I’ll keep thinking about it over the next days. Another intuition I have is that what I call “plan” in the glossary might well summarize all those operations which keep sync/async nature as it is while changes between them might happen at the Subject level.

Your example of the streams connected via RxNetty shows that transmission of streams by other means (e.g. Netty/NIO/TCP/IP/Ethernet in this case) needs to respect and transport back-pressure as well, which must be actively arranged (by telling Netty not to buffer writes without bound—all other parts have bounds AFAICS). The consumer in the example is completely synchronous and not guilty of buffering; queueing is introduced by the extra non-Rx layers. If we were to express the RxNetty-Netty-NIO-TCP-IP-Ethernet-IP-TCP-NIO-Netty-RxNetty pipeline as a Subject then it would for sure need to be an asynchronous one.

Closing thought: we should create a table listing all the stream transformation vocabulary so that we gain an overview of what we need to categorize.