Odonno / ngrx-signalr-core

A library to handle realtime SignalR (.NET Core) events using @angular, rxjs and the @ngrx library
https://www.npmjs.com/package/ngrx-signalr-core
MIT License

How to use signalr stream in Effects #1

Closed zyzhu closed 5 years ago

zyzhu commented 5 years ago

Great library! I am wondering how to use stream in the Effects module. https://docs.microsoft.com/en-us/aspnet/core/signalr/streaming?view=aspnetcore-3.0

stream only returns an IStreamResult, which you subscribe to with three callback functions. I have used it before in a plain Angular app, but I'm struggling to get it working in ngrx Effects. Any ideas would be greatly appreciated.
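
For readers unfamiliar with the shape described here, the following is a minimal sketch of the three-callback contract. The types are local stand-ins modelled on SignalR's IStreamResult and IStreamSubscriber, and fakeCounterStream is a hypothetical in-memory replacement for a real call such as connection.stream("Counter", 10, 500), so the snippet runs without a hub.

```typescript
// Local stand-ins modelled on SignalR's IStreamSubscriber / ISubscription / IStreamResult.
interface StreamSubscriber<T> {
  next(value: T): void;
  error(err: unknown): void;
  complete(): void;
}
interface StreamSubscription {
  dispose(): void;
}
interface StreamResult<T> {
  subscribe(subscriber: StreamSubscriber<T>): StreamSubscription;
}

// Hypothetical in-memory stream: emits 0..count-1 synchronously, then completes.
function fakeCounterStream(count: number): StreamResult<number> {
  return {
    subscribe(subscriber) {
      for (let i = 0; i < count; i++) {
        subscriber.next(i);
      }
      subscriber.complete();
      // Nothing is left to cancel once the stream has completed.
      return { dispose() {} };
    },
  };
}

const received: number[] = [];
let completed = false;

// The three callbacks: one per item, one for failure, one for completion.
fakeCounterStream(3).subscribe({
  next: (value) => { received.push(value); },
  error: (err) => { console.error(err); },
  complete: () => { completed = true; },
});
```

Because this is a callback interface rather than an rxjs Observable, it cannot be piped directly inside an effect, which is the difficulty raised in this question.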

Odonno commented 5 years ago

Hi @zyzhu, thank you for your kind words.

About SignalR streaming: I have never dealt with it, but the documentation is well written, and I will try to answer your question precisely even though I do not know your use case. It seems that the JS client has two streaming strategies: client-to-server and server-to-client streaming. I will show you how to deal with both.

Client-to-server streaming

https://docs.microsoft.com/en-us/aspnet/core/signalr/streaming?view=aspnetcore-3.0#client-to-server-streaming-2

Based on the documentation, you can stream events to the server using a Subject. It seems that the send function can do this. I am not sure whether we should use the send or the stream function, so this has to be tested. But here is how I would do it:

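sendStream$ = createEffect(() =>
    this.actions$.pipe(
        ofType(SIGNALR_HUB_CONNECTED), // when the hub is connected
        switchMapHubToAction(({ hub }) => {
            const subject = new signalR.Subject();

            // Push ten values, one every 500 ms, then complete the stream
            let iteration = 0;
            const intervalHandle = setInterval(() => {
                iteration++;
                subject.next(iteration.toString());
                if (iteration === 10) {
                    clearInterval(intervalHandle);
                    subject.complete();
                }
            }, 500);

            // `yield` is not valid in an arrow function; return the result of send instead
            // (assuming hub.send returns an Observable, as it does elsewhere in this library)
            return hub.send("UploadStream", subject);
        })
    ),
    { dispatch: false }
);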

This is close to what the documentation shows. I believe there must be a better way to write this, but I do not have one for now.

Server-to-client streaming

https://docs.microsoft.com/en-us/aspnet/core/signalr/streaming?view=aspnetcore-3.0#server-to-client-streaming-2

Based on the documentation, you can receive events using the stream function. So, if you want to dispatch actions from the stream, here is how I'd do it:

receiveStream$ = createEffect(() =>
    this.actions$.pipe(
        ofType(SIGNALR_HUB_CONNECTED), // when the hub is connected
        switchMapHubToAction(({ hub }) => {
            return hub.stream("Counter", 10, 500).pipe(
                map(item => itemReceived({ item })), // = next
                // catchError must return an Observable, so wrap the action with `of` from rxjs
                catchError(error => of(streamError({ error }))), // = error
                endWith(streamCompleted()) // = complete
            );
        })
    )
);

I hope it works for you. Keep me posted.

zyzhu commented 5 years ago

Thanks for your detailed explanation. The only issue is that stream does not return an Observable but an IStreamResult.

I googled and found a snippet that converts an IStreamResult to an Observable. With it, everything works as expected. https://twitter.com/cecilphillip/status/998038138428805120?lang=en

import { IStreamResult } from '@aspnet/signalr'
import { Subject, Observable } from 'rxjs'

export function adapt<T = any>(stream: IStreamResult<T>): Observable<T> {
  const subject = new Subject<T>()
  stream.subscribe(subject)
  return subject.asObservable()
}
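
One caveat with the Subject-based adapter above: it subscribes to the stream as soon as adapt is called, and unsubscribing from the returned Observable does not dispose the underlying stream subscription. A lazy, cancelable variant could look like the sketch below. The types are local stand-ins modelled on SignalR's interfaces, and toSubscribeFn and fakeStream are hypothetical names used only for illustration; this is not the library's fromStream implementation.

```typescript
// Local stand-ins modelled on SignalR's IStreamSubscriber / ISubscription / IStreamResult.
interface StreamSubscriber<T> {
  next(value: T): void;
  error(err: unknown): void;
  complete(): void;
}
interface StreamSubscription {
  dispose(): void;
}
interface StreamResult<T> {
  subscribe(subscriber: StreamSubscriber<T>): StreamSubscription;
}

type Teardown = () => void;

// Returns a subscribe function: the stream is not touched until it is called,
// and calling the returned teardown disposes the stream subscription.
function toSubscribeFn<T>(
  stream: StreamResult<T>
): (observer: StreamSubscriber<T>) => Teardown {
  return (observer) => {
    const subscription = stream.subscribe(observer);
    return () => subscription.dispose();
  };
}

// Hypothetical fake stream that records when it is subscribed to and disposed.
const log: string[] = [];
const fakeStream: StreamResult<string> = {
  subscribe(subscriber) {
    log.push("subscribed");
    subscriber.next("a");
    return { dispose: () => { log.push("disposed"); } };
  },
};

const subscribeFn = toSubscribeFn(fakeStream); // nothing is subscribed yet
const values: string[] = [];
const teardown = subscribeFn({
  next: (value) => { values.push(value); },
  error: () => {},
  complete: () => {},
});
teardown(); // cancels the stream subscription
```

With rxjs, the returned function should be usable as the argument to the Observable constructor, for example new Observable<T>(toSubscribeFn(stream)), so that each subscriber opens its own stream and unsubscribing disposes it.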

Odonno commented 5 years ago

Oh, I see. Sorry for the misunderstanding. What do you think about adding a fromStream function to the package?

Odonno commented 5 years ago

You can now use fromStream as an rxjs operator in the latest version of the library: https://github.com/Odonno/ngrx-signalr-core/blob/620e517f17af2a0895196f2ee2cd71392408f984/src/operators.ts#L51