Closed letrthang closed 2 years ago
Hey @letrthang, thanks for stopping by! Push support is definitely on our radar. We want to have proper support for it, including support for Push servers (so your application server can be stateless), by Vaadin 23. The plans can change of course, so please follow us on the official roadmap.
I did an example of an implementation here: https://github.com/jcgueriaud1/vaadin-fusion-mobx/ Unfortunately it's not documented.
In the app, the important parts are:
As it's based on web socket, the example is not stateless.
Design based on the current hillapush feature branch prototype
https://projectreactor.io/ defines "Reactive Streams for building non-blocking applications on the JVM". This has a few basic concepts:
Subscribers can subscribe and the publisher can push data to them.
public interface Publisher<T> {
    public void subscribe(Subscriber<? super T> s);
}
A subscriber for a publisher that can receive data (onNext), a complete event if the stream ever completes (onComplete), an error message (onError) if something fails
public interface Subscriber<T> {
    public void onSubscribe(Subscription s);
    public void onNext(T t); // When data is available
    public void onError(Throwable t); // If an error occurred. No data is sent after this
    public void onComplete(); // If the stream ends. No data is sent after this
}
A subscription lets the subscriber decide how many items to request at a time and/or cancel the subscription
public interface Subscription {
    public void request(long n); // The number of data objects to request from a publisher
    public void cancel(); // Ask publisher to stop sending data
}
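To make the interplay between the three interfaces concrete, here is a minimal sketch of the same contract ported to TypeScript. It is not Reactor code: `rangePublisher` and `DemandSubscription` are hypothetical names introduced only for this illustration, and the publisher emits synchronously inside `request`, which a real implementation would avoid. It shows that data only flows after the subscriber has requested it.

```typescript
// TypeScript rendering of the Subscriber and Subscription contracts (illustration only).
interface DemandSubscription {
  request(n: number): void; // how many more items the subscriber is ready for
  cancel(): void;           // ask the publisher to stop sending
}

interface Subscriber<T> {
  onSubscribe(s: DemandSubscription): void;
  onNext(t: T): void;
  onError(e: unknown): void;
  onComplete(): void;
}

// Hypothetical publisher: emits start, start + 1, ... (count items), but never
// more than the subscriber has requested.
function rangePublisher(start: number, count: number) {
  return {
    subscribe(subscriber: Subscriber<number>): void {
      let next = start;
      const end = start + count;
      let done = false; // set after cancel, error or completion

      subscriber.onSubscribe({
        request(n: number): void {
          if (done) return;
          if (n <= 0) {
            // Non-positive demand is treated as an error in this sketch.
            done = true;
            subscriber.onError(new Error('demand must be positive'));
            return;
          }
          let remaining = n;
          while (!done && remaining > 0 && next < end) {
            remaining--;
            subscriber.onNext(next++);
          }
          if (!done && next >= end) {
            done = true;
            subscriber.onComplete();
          }
        },
        cancel(): void {
          done = true;
        },
      });
    },
  };
}
```

A subscriber that calls `request(2)` on `rangePublisher(1, 5)` receives two values and nothing more until it requests again; `onComplete` is only signalled once all five values have been delivered.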
There are two basic types of publishers: Mono and Flux.
Mono: publishes 0 or 1 data objects, ending with onComplete. If an error occurs, only onError is called.
Flux: publishes 0-N data objects. If the stream ends, it calls onComplete. Many streams are infinitely long, e.g. tracking new sensor readings. If something fails, it calls onError and stops emitting data objects.
On the Java side of endpoints, we should support Mono and Flux separately.
An example of a simple Mono endpoint that would send Hello after 1s would be:
@Endpoint
@Nonnull
public Mono<String> helloMono() {
    return Mono.defer(() -> {
        try {
            Thread.sleep(1000); // simulate a slow computation
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // restore the interrupt flag
        }
        return Mono.just("Hello");
    });
}
An example of a simple Flux endpoint that sends the numbers from 1 to 10 with a 1s interval would be:
@Nonnull
public Flux<@Nonnull Integer> helloFlux() {
    return Flux.range(1, 10).delayElements(Duration.ofMillis(1000));
}
On the TypeScript side, we should map Mono and Flux in different ways.
From the client's point of view, an endpoint returning a Mono<String> is the same as an endpoint returning a String. The value will not be available when the endpoint call is made, only at some later point. For both cases, we can use the current approach of returning a Promise<string>, which is resolved when the value becomes available, or rejected with an exception if an error occurs (onError is called on the Mono).
You would thus consume the helloMono endpoint e.g. as
const message = await SomeEndpoint.helloMono();
exactly the same as if the endpoint method was defined as
@Nonnull
public String helloMono() {
For a Flux, we map it to a combination of the Java Subscriber and Subscription interfaces, so that the generated helloFlux endpoint would return a Subscription. This TS interface would be
export interface Subscription<T> {
  cancel: () => void; // cancels the subscription
  onNext: (callback: (value: T) => void) => Subscription<T>;
  onError: (callback: () => void) => Subscription<T>;
  onComplete: (callback: () => void) => Subscription<T>;
}
This allows you to use the methods in the simplest case as:
SomeEndpoint.helloFlux().onNext(number => {
  this.numbers = [...this.numbers, number];
});
Then to handle the error or complete event you can chain on
SomeEndpoint.helloFlux().onNext(...).onError(...).onComplete(...);
To abort the subscription, you store the return value so you later can call cancel() on it:
const subscription = SomeEndpoint.helloFlux().onNext(...);
...
subscription.cancel();
Note that unlike cancel() in the Java Subscription, this will prevent any more events from being delivered, i.e. prevent onNext from being called after cancel().
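As a rough illustration of that cancel guarantee, the sketch below shows one way the client-side object could be built so that nothing is delivered once cancel() has been called. `createSubscription` and the `Emitter` hooks are hypothetical names standing in for whatever the generated client and its transport would actually use; only the `Subscription` shape comes from the proposal above.

```typescript
// The proposed client-side interface, repeated so the sketch is self-contained.
interface Subscription<T> {
  cancel: () => void;
  onNext: (callback: (value: T) => void) => Subscription<T>;
  onError: (callback: () => void) => Subscription<T>;
  onComplete: (callback: () => void) => Subscription<T>;
}

// Hypothetical hooks the transport layer would call when messages arrive.
interface Emitter<T> {
  next: (value: T) => void;
  error: () => void;
  complete: () => void;
}

// Hypothetical factory: returns the user-facing subscription and the
// transport-facing emitter that feeds it.
function createSubscription<T>(
  onCancel: () => void = () => {}
): { subscription: Subscription<T>; emitter: Emitter<T> } {
  let nextCallback: (value: T) => void = () => {};
  let errorCallback: () => void = () => {};
  let completeCallback: () => void = () => {};
  let closed = false; // true after cancel, error or complete

  const subscription: Subscription<T> = {
    cancel: () => {
      if (closed) return;
      closed = true;
      onCancel(); // e.g. tell the server to stop sending
    },
    onNext: (callback) => { nextCallback = callback; return subscription; },
    onError: (callback) => { errorCallback = callback; return subscription; },
    onComplete: (callback) => { completeCallback = callback; return subscription; },
  };

  const emitter: Emitter<T> = {
    // Messages already in flight when cancel() was called are dropped here.
    next: (value) => { if (!closed) nextCallback(value); },
    error: () => { if (!closed) { closed = true; errorCallback(); } },
    complete: () => { if (!closed) { closed = true; completeCallback(); } },
  };

  return { subscription, emitter };
}
```

The `closed` flag is checked on the client, so the guarantee holds even if the server has not yet processed the cancellation.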
A very typical case is that your subscription is tied to a UI element and should only be active while the UI element is attached. To make subscription management easier, we generate an additional method for each endpoint method:
function _helloFlux(): Subscription<number>
function _helloFlux(context: ReactiveElement): Subscription<number>
The latter variant connects the subscription to a LitElement (which extends ReactiveElement) so that when the LitElement is disconnected from the document, cancel() is called on the subscription. A typical usage pattern then becomes:
async connectedCallback() {
  super.connectedCallback();
  FluxEndpoint.helloFlux(this).onNext(value => {
    this.numbers = [...this.numbers, value];
  });
}
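One way the generated code could tie a subscription to the element lifecycle is through Lit's reactive controller mechanism. The sketch below is an assumption about how that might look, not the prototype's implementation. `cancelOnDisconnect` is a hypothetical helper, and `HostLike` and `ControllerLike` are local structural subsets of Lit's `ReactiveControllerHost` and `ReactiveController`, so the example has no dependency on Lit.

```typescript
// Local structural subsets of Lit's ReactiveController and ReactiveControllerHost.
interface ControllerLike {
  hostDisconnected?: () => void;
}

interface HostLike {
  addController(controller: ControllerLike): void;
  removeController(controller: ControllerLike): void;
}

interface Cancellable {
  cancel: () => void;
}

// Hypothetical helper: cancels the subscription when the host element is
// disconnected from the document, then unregisters itself.
function cancelOnDisconnect<S extends Cancellable>(host: HostLike, subscription: S): S {
  const controller: ControllerLike = {
    hostDisconnected: () => {
      subscription.cancel();
      host.removeController(controller);
    },
  };
  host.addController(controller);
  return subscription;
}
```

With this approach the subscription stays cancelled if the element is attached again later, which is why the usage pattern above subscribes in connectedCallback rather than in the constructor.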
The communication will be implemented over a websocket channel but this is considered an implementation detail that you as a user cannot see or affect.
One additional opportunity could be to make the client-side Subscription type also define a Symbol.asyncIterator method so that you could use for await to iterate over the items provided by a subscription. Similarly to how you in simple cases can use the regular await to wait for a Promise, you could then in simple cases use for await with a Subscription.
The overall pattern then becomes like this
try {
  for await (const value of SomeEndpoint.helloFlux()) {
    // onNext
  }
  // onComplete
} catch (error) {
  // onError
}
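A minimal sketch of how such an iterator could be layered on top of the callback-based interface is shown below. `toAsyncIterable` is a hypothetical adapter written for this illustration; in the actual proposal the Symbol.asyncIterator method would live on the Subscription type itself. Values that arrive while the loop body is still busy are buffered without any limit here, which a real implementation would probably need to cap.

```typescript
// The proposed client-side interface, repeated so the sketch is self-contained.
interface Subscription<T> {
  cancel: () => void;
  onNext: (callback: (value: T) => void) => Subscription<T>;
  onError: (callback: () => void) => Subscription<T>;
  onComplete: (callback: () => void) => Subscription<T>;
}

type Waiter<T> = {
  resolve: (result: IteratorResult<T>) => void;
  reject: (reason: Error) => void;
};

// Hypothetical adapter from the callback style to an async iterator.
function toAsyncIterable<T>(subscription: Subscription<T>): AsyncIterableIterator<T> {
  const buffered: T[] = []; // values that arrived before the loop asked for them
  let waiter: Waiter<T> | undefined; // a pending next() call, if any
  let state: 'open' | 'completed' | 'failed' = 'open';

  const takeWaiter = (): Waiter<T> | undefined => {
    const current = waiter;
    waiter = undefined;
    return current;
  };

  subscription
    .onNext((value) => {
      const current = takeWaiter();
      if (current) current.resolve({ value, done: false });
      else buffered.push(value);
    })
    .onComplete(() => {
      state = 'completed';
      takeWaiter()?.resolve({ value: undefined, done: true });
    })
    .onError(() => {
      state = 'failed';
      takeWaiter()?.reject(new Error('Subscription failed'));
    });

  const iterator: AsyncIterableIterator<T> = {
    [Symbol.asyncIterator]: () => iterator,
    next: async (): Promise<IteratorResult<T>> => {
      if (buffered.length > 0) return { value: buffered.shift() as T, done: false };
      if (state === 'completed') return { value: undefined, done: true };
      if (state === 'failed') throw new Error('Subscription failed');
      return new Promise<IteratorResult<T>>((resolve, reject) => {
        waiter = { resolve, reject };
      });
    },
    // Invoked by `for await` when the loop exits early (break, return or throw).
    return: async (): Promise<IteratorResult<T>> => {
      subscription.cancel();
      state = 'completed';
      return { value: undefined, done: true };
    },
  };
  return iterator;
}
```

Because `for await` calls `return()` when the loop is left early, a `break` inside the loop body cancels the underlying subscription in this sketch, so the loop does not have to run until the Flux completes.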
The typical usage pattern example would be like this:
for await (const number of FluxEndpoint.helloFlux()) {
  this.numbers = [...this.numbers, number];
}
I wonder if that typical usage pattern is problematic, as in many cases it is an "infinite loop". If you add any code after it, that code won't run unless the Flux actually completes or fails. So far, the fluxes I have encountered are mostly infinite.
One additional implementation consideration:
In order for reconnecting to work, which can happen a lot with a websocket channel even if it is not idle and the network is good, the server needs to be able to buffer messages up to a given limit so that the messages are delivered when the client reconnects. If the client is unable to reconnect in a timely manner, or if the server reports on reconnect that the buffer has been dropped, then onError should be called on the client.
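The buffering idea can be sketched as a bounded replay buffer keyed by sequence numbers. This is only an illustration of the logic, written in TypeScript to match the other examples even though the real buffer would live on the Java server. `ReplayBuffer`, `Sequenced` and the idea that the client sends the last id it has seen when reconnecting are assumptions, not part of the prototype.

```typescript
// A message tagged with a monotonically increasing sequence id.
interface Sequenced<T> {
  id: number;
  payload: T;
}

// Hypothetical bounded buffer: keeps at most `limit` recent messages.
class ReplayBuffer<T> {
  private readonly limit: number;
  private readonly messages: Sequenced<T>[] = [];
  private nextId = 1;

  constructor(limit: number) {
    this.limit = limit;
  }

  // Stores a message and returns it together with its sequence id.
  add(payload: T): Sequenced<T> {
    const message: Sequenced<T> = { id: this.nextId++, payload };
    this.messages.push(message);
    if (this.messages.length > this.limit) {
      this.messages.shift(); // drop the oldest message
    }
    return message;
  }

  // Returns the messages the client has missed, or undefined if some of them
  // have already been dropped. In the undefined case the client would need
  // to be told, so that onError can be called there.
  replayAfter(lastSeenId: number): Sequenced<T>[] | undefined {
    // Ids are contiguous, so the oldest retained id follows from the count.
    const oldestAvailable = this.nextId - this.messages.length;
    if (lastSeenId + 1 < oldestAvailable) {
      return undefined;
    }
    return this.messages.filter((message) => message.id > lastSeenId);
  }
}
```

A time limit for reconnecting would be handled separately, for example by discarding the whole buffer after a timeout; that part is not shown here.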
It also needs to handle the hybrid case, where Vaadin Flow and Hilla code are combined and @Push is enabled on Flow.
@letrthang: I would spontaneously assume that it would work nicely to let each framework handle server push independently of each other instead of combining them. Do you have any specific use case in mind that would require a unified approach?
I don't have a specific use case so far.
I was just wondering: can we have a way to enable server push with Vaadin Fusion (now Hilla), like what Vaadin Flow can do with @Push?
Steps maybe:
This is just some very basic and brief thinking :))