Closed Megasware128 closed 6 years ago
We actually discussed this Friday. We're not going to take a dependency on Rx. We're going to make sure the API shape matches but nothing more.
So how will the integration work in the case of TypeScript? Because right now I have to do a cast to mix RxJs and SignalR client code.
@Megasware128 Do you have sample code (both the casting version and the full RxJs integration version) you could share?
Hi @jwooley I got the "casting" sample over here.
This is the cast I was talking about. I'm trying to merge a local message stream with a SignalR message stream through the Rx merge operator. I use a cast so TypeScript will accept it, but at runtime it is still incompatible.
I also have a workaround using Subjects from Rx. This works at both compile time and runtime, but I believe it should be possible to use the SignalR streams directly. There is also no easy way to use RxJs operators on SignalR streams directly, at least from TypeScript.
@anurse @BrennanConroy we need to figure this out without a dependency. Maybe another package?
I'm using angular/typescript with signalr at the moment and would personally prefer just one rx type of observable to deal with rather than having to mix two types. I guess if the API shape matches, typescript can just treat it as an rxjs observable type?
[@davidfowl] Maybe another package?
From my understanding, that's our only solution.
[@coffeymatt] I guess if the API shape matches, typescript can just treat it as an rxjs observable type?
The problem is that JavaScript doesn't really have a clean way to do that and get all the useful Rx operators. In C#, you can just implement the IObservable interface and then Rx.Net adds a ton of useful extension methods, but JavaScript doesn't really work that way. In JavaScript, even if we provide a type with .subscribe that matches what RxJs has, we don't get the operators for free, someone needs to attach those methods to the object. We can't really do it because we don't have a reference to Rx if it's brought in via a module (which is common when using Angular, for example). I think our best solution will be to create something that can be quickly and easily adapted. We could have a wrapper library for Rx that wraps HubConnection (perhaps ObservableHubConnection) and changes .stream to return RxJs-style observables.
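One way to read the adapter idea, as a minimal sketch and not a planned API: the adapter takes the Rx factory as an argument, so the SignalR side never has to import Rx. The names `StreamLike`, `Observer`, `Create`, and `adaptStream` are hypothetical, and the interfaces are simplified stand-ins for the real typings.

```typescript
// Simplified structural types; assumptions for illustration, not the actual
// SignalR or RxJS typings.
interface Observer<T> {
  next(value: T): void;
  error(err: unknown): void;
  complete(): void;
}

interface StreamLike<T> {
  subscribe(observer: Observer<T>): unknown;
}

// `create` stands in for whatever builds an Rx-style observable. The caller
// supplies it, so this module carries no reference to Rx.
type Create<R, T> = (onSubscribe: (observer: Observer<T>) => void) => R;

// Hypothetical helper: forwards every event of `stream` into the observable
// that `create` builds.
function adaptStream<R, T>(stream: StreamLike<T>, create: Create<R, T>): R {
  return create(observer => {
    stream.subscribe(observer);
  });
}
```

With RxJS in scope, the caller would presumably pass something like `subscribe => new Observable(subscribe)` as `create`. That pairing is an assumption and has not been checked against a specific RxJS version.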
I'm going through the source code and wondering if I should even use it at all or write my own RxJS wrapper around HubConnection.
Are there any docs or examples using .stream? I don't understand the intended use case or purpose.
https://github.com/aspnet/SignalR/blob/dev/samples/SocketsSample/wwwroot/streaming.html shows the client side
https://github.com/aspnet/SignalR/blob/dev/samples/SocketsSample/Hubs/Streaming.cs shows the server side
The scenario is mostly just another approach to server notifications. Instead of the server "invoking" methods on the client, the client can "subscribe" to a channel of messages. The underlying logic is the same as having a Hub method where the implementation just sits in a loop sending invocations to the client. It's somewhat of an experiment (we're always looking for feedback) to see if this kind of use case is something customers are interested in.
It's very common for users to use SignalR purely for this kind of "server-to-client" notifications channel, and this would allow them to use APIs like System.Threading.Channels (sorry, there's no docs for that yet) to manage those notifications across their apps. You could have a DI service manage one of these Channels and provide access to the Write end of the channel in one place of your app, and return the Read end of the channel from a Hub method and boom, you have a notification channel with no coupling to SignalR concepts (like HubContext<T>, which is how you do this today).
As for RxJs integration, we're not likely to be able to achieve something built-in for 2.1 (hence moving to the backlog), but I think an RxJS wrapper around HubConnection is a great idea, and what we might end up doing "officially" later via a separate package (I'm partial to the name @aspnet/signalrx, but it's probably too similar to @aspnet/signalr ;))
@anurse, thanks for that info. Can you explain why the server might return IObservable vs ChannelReader? Seems like the client treats them in the same way.
It's just two different ways to do the streaming (like how IEnumerable can be a List, Array, etc.). Use whichever fits into your app best. I tend to prefer ChannelReader because it has a clear Writer/Reader semantic. But Observables can be adapted into ChannelReaders and vice-versa (in fact, we do exactly that) so it's up to you.
The long term goal is to support IAsyncEnumerable<T> as the common interface here, but since that doesn't exist, for now we accept IObservable<T> and ChannelReader<T>.
@anurse, good to know. I was also reading this post about the old SignalR library to wrap my mind around streaming. The author compares SignalR invocations to remote procedure calls and compares streaming to event handling.
Is it recommended to not mix and match the two styles? In the client and server examples you posted above, how would the client send data to the server? The client only has access to either an observable or a channel reader. Should the client invoke a normal (i.e. non-streaming) method on the server to send data, or is there some concept of two-way communication within a stream?
I was also reading this post about the old SignalR library to wrap my mind around streaming
It's important to note that streaming in SignalR for ASP.NET Core works differently from how it worked in SignalR for ASP.NET. In ASP.NET it was just a different way of handling the server-to-client invocations (basically a replacement for HubConnection.On). In ASP.NET Core it's a different way of returning a value from a client-to-server invocation, so it works a little differently.
Is it recommended to not mix and match the two styles? .., Should the client invoke a normal (i.e. non-streaming) method on the server to send data, or is there some concept of two-way communication within a stream?
You can absolutely mix and match the styles! Streams are one way (from server to client) so yes, you'd use non-streaming methods to send data (which may trigger responses to be emitted on the streaming method). You could implement chat this way, for example, by having the client .stream a "ReceiveMessages" method and update the UI when new items appear, then use .invoke on a "SendMessage" method to send messages on its own.
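A minimal sketch of that chat pattern on the client, assuming a connection object that exposes `.stream` and `.invoke` as described in this thread. `ChatConnection`, `StreamObserver`, and `startChat` are hypothetical names, and the interface is a simplified view of the client rather than its real typings.

```typescript
// Simplified structural view of the client; assumed for illustration only.
interface StreamObserver<T> {
  next(value: T): void;
  error(err: unknown): void;
  complete(): void;
}

interface ChatConnection {
  stream(methodName: string): {
    subscribe(observer: StreamObserver<string>): unknown;
  };
  invoke(methodName: string, ...args: unknown[]): Promise<unknown>;
}

// Wires both directions: a server-to-client stream for incoming messages and
// a plain invocation for outgoing ones. Returns the function used to send.
function startChat(
  connection: ChatConnection,
  onMessage: (message: string) => void,
): (text: string) => Promise<unknown> {
  connection.stream("ReceiveMessages").subscribe({
    next: onMessage,
    error: () => {
      // A real app would surface the failure to the user here.
    },
    complete: () => {
      // The server ended the stream; nothing to clean up in this sketch.
    },
  });
  return text => connection.invoke("SendMessage", text);
}
```

The two hub method names are the ones used in the comment above; the server side that relays a sent message back onto the stream is not shown.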
@anurse In terms of feedback, I'd love to provide you with whatever I can. The signalrx concept sounds good to me. Since it doesn't exist, I'll have to write something myself.
I don't think I really understand the concept behind .stream() vs .on(). I think, by using some examples, I can explain my point. I'm going to convert an RxJS-like observable into an RxJS Observable. I'd like to know if I'm on the ball here or completely misunderstanding the use case.
With WebSockets, I can do something as simple as:
import { dispatchStateUpdate } from './dispatchStateUpdate'
import { webSocket } from 'rxjs/observable/dom/webSocket'

const webSocket$ = webSocket('ws://example.com')

webSocket$
  .pipe(
    // Transform methods
  )
  .subscribe(dispatchStateUpdate)
In this next example, I'll use hubConnection.on() with a single message listener:
import { dispatchStateUpdate } from './dispatchStateUpdate'
import { HubConnection } from '@aspnet/signalr/dist/esm/HubConnection'
import { Observable } from 'rxjs/Observable'

const signalRx = path => (
  Observable
    .create(observer => {
      const connection = new HubConnection(path)
      connection
        .on('message', message => observer.next(message))
      connection.start()
    })
)

const signalR$ = signalRx('/testHub')

signalR$
  .pipe(
    // Transform methods
  )
  .subscribe(dispatchStateUpdate)
From what I'm seeing, .stream() is just a limited observable with no transform methods:
import { dispatchStateUpdate } from './dispatchStateUpdate'
import { HubConnection } from '@aspnet/signalr/dist/esm/HubConnection'

const connection = new HubConnection('/testHub')
connection.start()

connection
  .stream('message')
  // No way to use transform methods
  .subscribe(dispatchStateUpdate)
I would instead want to write it like this:
import { dispatchStateUpdate } from './dispatchStateUpdate'
import { HubConnection } from '@aspnet/signalr/dist/esm/HubConnection'
import { Observable } from 'rxjs/Observable'

const signalRx = path => (
  Observable
    .create(observer => {
      const connection = new HubConnection(path)
      connection.start()
      connection
        .stream('message')
        .subscribe(observer)
    })
)

const signalR$ = signalRx('/testHub')

signalR$
  .pipe(
    // Transform methods
  )
  .subscribe(dispatchStateUpdate)
But then I don't really get any benefits from using .stream() right? This is where I'm lost.
Going deeper into these examples, I should be using a Subject because at some point, I need some way of invoking server-side functions just like the official RxJS WebSocket implementation.
With WebSockets, I can send data like so:
import { dispatchStateUpdate } from './dispatchStateUpdate'
import { webSocket } from 'rxjs/observable/dom/webSocket'
const webSocket$ = webSocket('ws://example.com')
webSocket$.next('someDataForServer')
So from that perspective, would a signalRx be something like this?
import { dispatchStateUpdate } from './dispatchStateUpdate'
import { HubConnection } from '@aspnet/signalr/dist/esm/HubConnection'
import { Observable } from 'rxjs/Observable'
import { Subject } from 'rxjs/Subject'

const signalRx = path => {
  const subject = new Subject()
  const signalR$ = Observable
    .create(observer => {
      const connection = new HubConnection(path)
      connection.start()
      connection
        .stream('message')
        .subscribe(observer)
      subject
        .subscribe(({ methodName, data }) => {
          connection.invoke(methodName, data)
        })
    })
  // I should be using `AnonymousSubject` and a bunch of extra code to avoid this mutation.
  // Attach `next` to the returned instance (not the Observable class) and keep it bound to the subject.
  signalR$.next = value => subject.next(value)
  return signalR$
}

const signalR$ = signalRx('/testHub')

signalR$.next({
  methodName: 'doSomething',
  data: 'someDataForServer'
})
Hopefully these examples helped show where I'm missing something major!
From what I'm seeing, .stream() is just a limited observable with no transform methods:
This is purely because we don't want to depend directly upon the RxJS components. You could write an adaptor that converts it to an Rx observable and all the transforms would come over for free. Unlike C# where we implement an interface and you get extension methods, JavaScript requires merging the transform methods into the object itself.
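To make "merging the transform methods into the object itself" concrete, here is a minimal sketch that attaches two hand-written operators to a subscribe-only stream. `withOperators`, `Piped`, `StreamLike`, and `Observer` are hypothetical names. The two operators are simplified versions written for this example, not the RxJS ones, which a real adaptor would reuse.

```typescript
// Simplified structural types; assumptions for illustration only.
interface Observer<T> {
  next(value: T): void;
  error(err: unknown): void;
  complete(): void;
}

interface StreamLike<T> {
  subscribe(observer: Observer<T>): unknown;
}

// A subscribe-only stream plus two operators attached by hand.
interface Piped<T> extends StreamLike<T> {
  map<U>(project: (value: T) => U): Piped<U>;
  filter(keep: (value: T) => boolean): Piped<T>;
}

function withOperators<T>(source: StreamLike<T>): Piped<T> {
  return {
    subscribe(observer) {
      return source.subscribe(observer);
    },
    // Each operator wraps the source in a new stream and re-attaches the
    // operators, so calls can be chained.
    map<U>(project: (value: T) => U): Piped<U> {
      return withOperators<U>({
        subscribe(observer) {
          return source.subscribe({
            next(value) { observer.next(project(value)); },
            error(err) { observer.error(err); },
            complete() { observer.complete(); },
          });
        },
      });
    },
    filter(keep) {
      return withOperators<T>({
        subscribe(observer) {
          return source.subscribe({
            next(value) { if (keep(value)) observer.next(value); },
            error(err) { observer.error(err); },
            complete() { observer.complete(); },
          });
        },
      });
    },
  };
}
```

Wrapping a stream this way allows chaining such as `withOperators(stream).filter(...).map(...).subscribe(...)`, at the cost of someone owning and maintaining the operator code.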
But then I don't really get any benefits from using .stream() right?
I'm unclear what you're comparing to here? Your WebSocket example? Over that you get the benefit of support for Server-Sent Events and Long Polling transports on downlevel browsers/networks. The other difference is how the server side is implemented (you return the read end of a Channel rather than invoking Client hub methods)
I need some way of invoking server-side functions just like the official RxJS WebSocket implementation.
The HubConnection provides you that way. If you'd like to wrap it in a Subject, that's certainly your prerogative :).
Remember that .stream is not an attempt to build a "RxJS"-y way to use SignalR, it's a very specific use case: Allowing the server to emit multiple results from an invocation over time. We use the Observer model for that because JavaScript requires callbacks. Going further is a matter for other libraries to build upon.
would a signalRx be something like this?
It looks pretty close. I don't have a lot of experience with RxJS to know the idiomatic way to multiplex multiple different Observables/Subjects.
I think I get what you mean where you've created connection.stream(methodName) to be something generic that, if we wanted to pass in an RxJS Subject or observer, it would work right out of the box. It isn't meant to have operators such as filter and map piped into it or any kind of transforms. You also called it .subscribe for semantic meaning, not because it's supposed to resemble RxJS's .subscribe method.
Is that correct?
I was using new WebSocket just to show how RxJS handles wrapping something that can both receive and send data. It's strange because when you call webSocket$.next, it actually does a connection.send in the background whereas normally calling .next on a Subject actually sends data into the .subscribe function.
@anurse
You can absolutely mix and match the styles! Streams are one way (from server to client) so yes, you'd use non-streaming methods to send data (which may trigger responses to be emitted on the streaming method).
So my streaming method is returning a ChannelReader<IEnumerable<MyType>> and it's implemented as shown here. Everything is working so far with the client.
Now, I've created the channel instance when the hub is instantiated (i.e. it's now a member field on the hub) and I've defined the following non-streaming method.
public async Task UpdateMyType(MyType myType)
{
    await m_channel.Writer.WriteAsync(new[] { new MyType() });
}
I've confirmed that the client still receives data when calling the streaming method (so the channel is still working). I've also confirmed that the non-streaming method executes when called by the client, but the client is not receiving the new object on the same channel.
Any ideas?
Where is the channel instantiated?
The channel is instantiated inline with the m_channel field declaration within my hub class. I'm then sharing this private field between my streaming method and my non-streaming method.
Hubs are transient, they are constructed separately for each invocation and disposed of after each invocation. You should use a singleton DI service to host the Channel itself.
@anurse you beat me to it. That just occurred to me after my last post and I was about to try that. I'll post here once I've confirmed.
Making the channel lifetime independent of the hub lifetime solved my issue.
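The lifetime problem can be modelled in a few lines. This is a TypeScript toy model, not the C# server code from this thread: `Channel`, `NotificationService`, and `Hub` are hypothetical stand-ins, and creating a new `Hub` per call imitates the transient hub behaviour described above.

```typescript
// Toy stand-in for a channel: writes are delivered to registered readers.
class Channel<T> {
  private listeners: Array<(item: T) => void> = [];

  write(item: T): void {
    this.listeners.forEach(listener => listener(item));
  }

  read(listener: (item: T) => void): void {
    this.listeners.push(listener);
  }
}

// Hypothetical singleton service that owns the channel for the whole app.
class NotificationService {
  readonly channel = new Channel<string>();
}

// Models a transient hub: a fresh instance is built for every invocation.
class Hub {
  // A channel stored on the hub is recreated with every instance.
  private readonly ownChannel = new Channel<string>();
  private readonly service: NotificationService;

  constructor(service: NotificationService) {
    this.service = service;
  }

  streamFromField(listener: (item: string) => void): void {
    this.ownChannel.read(listener);
  }

  updateViaField(item: string): void {
    this.ownChannel.write(item);
  }

  streamFromService(listener: (item: string) => void): void {
    this.service.channel.read(listener);
  }

  updateViaService(item: string): void {
    this.service.channel.write(item);
  }
}
```

In this model, a write made through `updateViaField` on one hub instance never reaches a reader registered through `streamFromField` on another instance, while the service-backed pair shares one channel across instances.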
We settled on a design that's decoupled from the Rx library but should be easy to adapt.
@anurse, @bug84, Do you have a sample of doing this? I'm in a situation where I need to stream items from a queue. Given that hubs are transient, is streaming the best option for something like this or should I use hub context and use a client method?
In the Observable.ts file there is a TODO: Seamless RxJs integration.
Should the TS client just have a dependency on rxjs, or is the rxjs library too big of a dependency for SignalR?
On a fork I already tried replacing the Observable.ts file with the Subject, Observable and Observer from rxjs and it does not create any issues in my situations. It also solves the unsubscribe issue.
So should this be the way to go? Or is rxjs too big, and should the streaming part of the TS SignalR client be split off into a standalone package to reduce size?
What are the decisions for rxjs integration?