Odonno / ngrx-signalr-core

A library to handle realtime SignalR (.NET Core) events using @angular, rxjs and the @ngrx library
https://www.npmjs.com/package/ngrx-signalr-core
MIT License

"Uncaught (in promise): Error: Cannot send data if the connection is not in the 'Connected' State" Error #12

Closed patch321 closed 4 years ago

patch321 commented 4 years ago

I'm getting this error when trying to perform SignalR functions. Everything seems to work properly, so I'm wondering if there's a race condition somewhere.

contract.effect.ts

@Injectable()
export class ContractEffects {
    contractHub = {
        hubName: 'contractHub',
        url: `${environment.apiUrl}/contractHub`,
        options: { accessTokenFactory: () => this.authService.getToken() },
    };

    loadContractHistory$ = createEffect(() =>
        this.actions$.pipe(
            ofType(contractActions.loadChangeHistory),
            switchMap((action) =>
                this.contractService.getContractHistory(action.contractId).pipe(
                    map((response) => contractActions.loadChangeHistoryComplete({ changeList: response })),
                    catchError((error) => of(toastActions.showToast({ message: error.message, error: true })))
                )
            )
        )
    );

    attachRealTime$ = createEffect(() => this.actions$.pipe(ofType(signalrHubUnstarted), map(startSignalRHub)));

    mapSignalREvents$ = createEffect(() =>
        this.actions$.pipe(
            ofType(signalrHubUnstarted),
            mergeMapHubToAction(({ hub }) => {
                const whenContractInitiallyOpened$ = hub
                    .on('ContractOpenedBy')
                    .pipe(map((x: ActiveSignalRUser[]) => contractActions.updateContractInitiallyOpened({ users: x })));
                const whenContractOpened$ = hub
                    .on('ContractOpened')
                    .pipe(map((x: ActiveSignalRUser) => contractActions.updateContractOpened({ user: x })));
                const whenContractClosed$ = hub
                    .on('ContractClosed')
                    .pipe(map((x: ActiveSignalRUser) => contractActions.updateContractClosed({ user: x })));
                return merge(whenContractInitiallyOpened$, whenContractOpened$, whenContractClosed$);
            })
        )
    );

    contractOpened$ = createEffect(
        () =>
            this.actions$.pipe(
                ofType(contractActions.openContract),
                delay(3000),
                map((params) => {
                    const foundHub = findHub(this.contractHub);
                    if (!foundHub) {
                        return of(hubNotFound(this.contractHub));
                    }
                    foundHub.send('OpenContract', params.contractId);
                })
            ),
        { dispatch: false }
    );

    contractClosed$ = createEffect(
        () =>
            this.actions$.pipe(
                ofType(contractActions.closeContract),
                map((params) => {
                    const foundHub = findHub(this.contractHub);
                    if (!foundHub) {
                        return of(hubNotFound(this.contractHub));
                    }
                    foundHub.send('CloseContract', params.contractId);
                })
            ),
        { dispatch: false }
    );

    constructor(private actions$: Actions, private contractService: ContractDetailsService, private authService: AuthService) {}
}
Odonno commented 4 years ago

Hi @patch321

I don't see anything wrong in your code. The attachRealTime$ effect is not what I recommend but I do not see why it could not work.

In order to debug this, I would be curious to see what your Redux DevTools show. If you have not done so yet, I strongly recommend adding them to your app (at least in dev mode). Here is the tutorial: https://ngrx.io/guide/store-devtools You will need to add some code to your AppModule and install the Redux DevTools extension, which is available for Chrome, Edge Chromium and Firefox, I think.

Also, I can suggest some refactoring of your code:

  1. The mergeMapHubToAction operator also returns the previous action, not only the hub. So:
contractOpened$ = createEffect(
    () =>
        this.actions$.pipe(
            ofType(contractActions.openContract),
            delay(3000),
            mergeMapHubToAction(({ hub, action }) => {
                // return the observable so the operator has a stream to merge
                return hub.send('OpenContract', action.contractId);
            })
        ),
    { dispatch: false }
);
  2. The hub.on function can be typed, so you can simplify your code a little:
mapSignalREvents$ = createEffect(() =>
    this.actions$.pipe(
        ofType(signalrHubUnstarted),
        mergeMapHubToAction(({ hub }) => {
            const whenContractInitiallyOpened$ = hub
                .on<ActiveSignalRUser[]>('ContractOpenedBy')
                .pipe(map(users => contractActions.updateContractInitiallyOpened({ users })));
            const whenContractOpened$ = hub
                .on<ActiveSignalRUser>('ContractOpened')
                .pipe(map(user => contractActions.updateContractOpened({ user })));
            const whenContractClosed$ = hub
                .on<ActiveSignalRUser>('ContractClosed')
                .pipe(map(user => contractActions.updateContractClosed({ user })));
            return merge(whenContractInitiallyOpened$, whenContractOpened$, whenContractClosed$);
        })
    )
);
patch321 commented 4 years ago

Here are the Redux DevTools on initial load of the app. image

And here they are after selecting a contract (I'm using SignalR to keep track of multiple users accessing the same contract, to avoid them stepping on each other's toes). image

Also might be worth mentioning that the following code is executed on contract load, rather than the launch of the app.

const hub = {
    hubName: 'contractHub',
    url: `${environment.apiUrl}/contractHub`,
    options: { accessTokenFactory: () => this.tokenService.getToken() },
};
this.store.dispatch(createSignalRHub(hub));

Odonno commented 4 years ago

Ah, yes. I recommend you create your hubs somewhere else. I also had a bug where my hub object was undefined when I expected it to be defined. You can create another service and instantiate your hubs in the constructor of this new service. And then inject your new service in the Effect service.

Also, inspect all your actions to see that they send the correct data (Action tab of your Redux DevTools).

Tell me if it changes anything.

patch321 commented 4 years ago

I have tried to create and start the hub in the ngOnInit of my app.component.ts, to no avail, but I can try setting up a service and see if it fares any better. Thank you.

patch321 commented 4 years ago

OK. I tried instantiating the hub in a service and injecting that service into the effects. I'm still getting the same error.

signal-r.service.ts

constructor(private authService: AuthService) {
        this.hub = {
            hubName: 'contractHub',
            url: `${environment.apiUrl}/contractHub`,
            options: { accessTokenFactory: () => this.authService.getToken() },
        };
    }
}

contract.effect.ts

@Injectable()
export class ContractEffects {
    contractHub = this.signalRService.hub;

    loadContractHistory$ = createEffect(() =>
        this.actions$.pipe(
            ofType(contractActions.loadChangeHistory),
            switchMap((action) =>
                this.contractService.getContractHistory(action.contractId).pipe(
                    map((response) => contractActions.loadChangeHistoryComplete({ changeList: response })),
                    catchError((error) => of(toastActions.showToast({ message: error.message, error: true })))
                )
            )
        )
    );

    attachRealTime$ = createEffect(() => this.actions$.pipe(ofType(signalrHubUnstarted), map(startSignalRHub)));

    mapSignalREvents$ = createEffect(() =>
        this.actions$.pipe(
            ofType(signalrHubUnstarted),
            mergeMapHubToAction(({ hub }) => {
                const whenContractInitiallyOpened$ = hub
                    .on('ContractOpenedBy')
                    .pipe(map((x: ActiveSignalRUser[]) => contractActions.updateContractInitiallyOpened({ users: x })));
                const whenContractOpened$ = hub
                    .on('ContractOpened')
                    .pipe(map((x: ActiveSignalRUser) => contractActions.updateContractOpened({ user: x })));
                const whenContractClosed$ = hub
                    .on('ContractClosed')
                    .pipe(map((x: ActiveSignalRUser) => contractActions.updateContractClosed({ user: x })));
                return merge(whenContractInitiallyOpened$, whenContractOpened$, whenContractClosed$);
            })
        )
    );

    contractOpened$ = createEffect(
        () =>
            this.actions$.pipe(
                ofType(contractActions.openContract),
                delay(1000),
                map((params) => {
                    const foundHub = findHub(this.contractHub);
                    if (!foundHub) {
                        return of(hubNotFound(this.contractHub));
                    }
                    foundHub.send('OpenContract', params.contractId);
                })
            ),
        { dispatch: false }
    );

    contractClosed$ = createEffect(
        () =>
            this.actions$.pipe(
                ofType(contractActions.closeContract),
                map((params) => {
                    const foundHub = findHub(this.contractHub);
                    if (!foundHub) {
                        return of(hubNotFound(this.contractHub));
                    }
                    foundHub.send('CloseContract', params.contractId);
                })
            ),
        { dispatch: false }
    );

    constructor(
        private actions$: Actions,
        private contractService: ContractDetailsService,
        private authService: AuthService,
        private signalRService: SignalRService
    ) {}
}

I'm also adding some of the .NET files to provide better context.

SignalRMiddleware.cs

namespace Contract.API.Hubs
{
    public class SignalRMiddleware
    {
        private readonly RequestDelegate _next;

        public SignalRMiddleware(RequestDelegate next)
        {
            _next = next;
        }

        public async Task Invoke(HttpContext httpContext)
        {
            var request = httpContext.Request;

            // web sockets cannot pass headers so we must take the access token from query param and
            // add it to the header before authentication middleware runs
            if (request.Path.StartsWithSegments("/contractHub", StringComparison.OrdinalIgnoreCase) &&
                request.Query.TryGetValue("access_token", out var accessToken))
            {
                request.Headers.Add("Authorization", $"Bearer {accessToken}");
            }

            await _next(httpContext);
        }
    }
}

ContractHub.cs

namespace Contract.API.Hubs
{
    [Authorize]
    public class ContractHub : Hub
    {
        private readonly ContractContext _context;

        private static ConcurrentDictionary<string, List<ActiveSignalRUser>> connectedUsers = new ConcurrentDictionary<string, List<ActiveSignalRUser>>();

        public ContractHub(ContractContext context)
        {
            _context = context;
        }

        public async Task OpenContract(int contractId)
        {

            var contractGroup = GetContractGroup(contractId);

            // All users can watch the contracts to get refresh reminders
            await Groups.AddToGroupAsync(Context.ConnectionId, contractGroup);

            // Check for authentication and write access to send watched by requests
            var userInfo = this.GetUserObjectFromContext();
            if(userInfo?.CanWrite == true)
            {
                var signalRUser = new ActiveSignalRUser()
                {
                    ConnectionId = Context.ConnectionId,
                    UserName = userInfo.DisplayName,
                    LastHeartbeat = DateTime.Now
                };
                if (!connectedUsers.ContainsKey(contractGroup))
                {
                    // Add to dictionary to maintain list throughout lifecycle
                    connectedUsers.TryAdd(contractGroup, new List<ActiveSignalRUser>());
                }

                await Clients.Caller.SendAsync("ContractOpenedBy", connectedUsers[contractGroup].Where(u => u.UserName != userInfo.DisplayName));
                await Clients.GroupExcept(contractGroup, signalRUser.ConnectionId).SendAsync("ContractOpened", signalRUser);

                if (!connectedUsers[contractGroup].Contains(signalRUser))
                {
                    connectedUsers[contractGroup].Add(signalRUser);
                }

            }
        }

        public async Task CloseContract(int contractId)
        {
            var contractGroup = GetContractGroup(contractId);
            await Groups.RemoveFromGroupAsync(Context.ConnectionId, contractGroup);

            // Check for authentication and write access to send watched by requests
            var userInfo = this.GetUserObjectFromContext();
            if (userInfo?.CanWrite == true)
            {
                // remove from dictionary
                var userRecord = connectedUsers[contractGroup].Where(c => c.ConnectionId == Context.ConnectionId).FirstOrDefault();
                if(userRecord != null)
                {
                    connectedUsers[contractGroup] = connectedUsers[contractGroup].Where(c => c.ConnectionId != Context.ConnectionId).ToList();
                }
                await Clients.Group(contractGroup).SendAsync("ContractClosed", userRecord);
            }
        }

        //public async Task OpenContractHeartbeat(int contractId)
        //{
        //    var contractGroup = GetContractGroup(contractId);
        //    var user = connectedUsers[contractGroup].Where(c => c.ConnectionId == Context.ConnectionId).FirstOrDefault();
        //    if(user != null)
        //    {
        //        user.LastHeartbeat = DateTime.Now;
        //    }
        //    foreach(var item in connectedUsers[contractGroup].Where(c => c.LastHeartbeat < DateTime.Now.AddSeconds(-20)))
        //    {

        //    }
        //}

        public async Task ContractUpdated(int contractId)
        {
            await Clients.Group(GetContractGroup(contractId)).SendAsync("ContractUpdated");
        }

        public override async Task OnDisconnectedAsync(Exception exception)
        {
            foreach(var key in connectedUsers.Keys)
            {
                var connection = connectedUsers[key].Where(u => u.ConnectionId == Context.ConnectionId).FirstOrDefault();
                if(connection != null)
                {
                    connectedUsers[key].Remove(connection);
                    // await the send instead of blocking the thread with .Wait()
                    await Clients.Group(key).SendAsync("ContractClosed", connection);
                }

            }
            // a Task-returning override must not return null
            await base.OnDisconnectedAsync(exception);
        }

        private UserInfo GetUserObjectFromContext()
        {

            var identity = Context.User.Identity.Name;
            var appUser = _context.Users
                                  .Where(a => a.UserName.Equals(identity, StringComparison.CurrentCultureIgnoreCase))
                                  .Include(a => a.Roles)
                                  .Include(a => a.FirewallWhiteList)
                                  .Include(a => a.AssignedStatus).FirstOrDefault();
            if(appUser == null)
            {
                return null;
            }
            return new UserInfo(appUser.UserName, appUser.DisplayName, appUser);
        }

        private string GetContractGroup(int contractId)
        {
            return $"ContractOpen-{contractId}";
        }
    }
}

And finally, the errors I've been experiencing:

image

Odonno commented 4 years ago

Well, thanks for sharing the detailed error.

I can now see that you have 2 errors:

  1. you can't successfully connect to the SignalR server (due to a CORS issue, I suppose)
  2. you invoke/send data to the SignalR hub while it is not in the connected state (this must be a mistake in your app, where you don't check/wait until the hub is connected)

So, I cannot do anything about the second issue, but for the first one, if it is really due to CORS, please check this file:

https://github.com/Odonno/ngrx-signalr-core/blob/master/samples/realtimeFeed/RealtimeFeed.Api/Startup.cs#L35-L42

If that does not help, there is a skipNegotiation option on the client side of SignalR Core.

Can you try?

patch321 commented 4 years ago

Here's my Startup.cs

public void Configure(IApplicationBuilder app, IHostingEnvironment env, ILoggerFactory loggerFactory, Db.ContractContext context)
        {
            loggerFactory.AddConsole(Configuration.GetSection("Logging"));
            loggerFactory.AddDebug();

            context.Database.Migrate();

            if (env.IsDevelopment())
            {
                app.UseDeveloperExceptionPage();
                app.UseCors(builder =>
                            {
                                builder
                                    .WithOrigins("http://localhost:57704", "http://localhost:4200")
                                    .SetIsOriginAllowed(_ => true)
                                    .AllowAnyOrigin()
                                    .AllowAnyHeader()
                                    .AllowAnyMethod()
                                    .AllowCredentials();
                            });

            }
            else
            {
                app.UseExceptionHandler("/Home/Error");
            }

            app.ConfigureSwaggerApp();

            app.UseDefaultFiles();
            app.UseStaticFiles();
            app.UseMiddleware<SignalRMiddleware>();
            app.UseAuthentication();
            app.UseSignalR(routes =>
            {
                routes.MapHub<ContractHub>("/contractHub");
            });
            app.UseMvc();
            app.UseWebSockets();
        }

and my SignalRMiddleware.cs

public class SignalRMiddleware
    {
        private readonly RequestDelegate _next;

        public SignalRMiddleware(RequestDelegate next)
        {
            _next = next;
        }

        public async Task Invoke(HttpContext httpContext)
        {
            var request = httpContext.Request;

            // web sockets cannot pass headers so we must take the access token from query param and
            // add it to the header before authentication middleware runs
            if (request.Path.StartsWithSegments("/contractHub", StringComparison.OrdinalIgnoreCase) &&
                request.Query.TryGetValue("access_token", out var accessToken))
            {
                request.Headers.Add("Authorization", $"Bearer {accessToken}");
            }

            await _next(httpContext);
        }
    }

Also, here's my network tab. It seems like the only endpoint getting blocked by CORS is my hub. It also seems to be the only endpoint that isn't routing through my frontend (localhost:4200). I'm not sure how relevant that is.

I'll look into how to add the 'skipNegotiation' option and report back. image

patch321 commented 4 years ago

Where would I add the 'skipNegotiation' option while still using ngrx-signalr-core?

Odonno commented 4 years ago

Good question. For this, you should use the createSignalRHub action. It has an options property you can use to pass SignalR Core options. Within this property, you can set the options you would normally pass to a SignalR Core hub connection (https://github.com/dotnet/aspnetcore/blob/3fe9012821e54797ac123bc8704106d24d1a1c1a/src/SignalR/clients/ts/signalr/src/IHttpConnectionOptions.ts).

Here are the details of the createSignalRHub action: https://github.com/Odonno/ngrx-signalr-core/blob/90938e6aa590b1e7c4762a57eb4f14f3a9ff85e8/src/projects/ngrx-signalr-core/src/lib/actions.ts#L9

So, based on your example:

const hub = {
  hubName: 'contractHub',
  url: `${environment.apiUrl}/contractHub`,
  options: {
    skipNegotiation: true,
    accessTokenFactory: () => this.tokenService.getToken()
  },
};

this.store.dispatch(createSignalRHub(hub));
patch321 commented 4 years ago

Here's the error I'm getting when skipping the negotiation...

[2020-06-10T15:03:37.739Z] Error: Failed to start the connection: Error: Negotiation can only be skipped when using the WebSocket transport directly.

I'm also still getting this error (image), but no more CORS errors!

Odonno commented 4 years ago

Ah, you also have to set the transport property in the options.

Odonno commented 4 years ago

It should be something like this:

transport: HttpTransportType.WebSockets

Odonno commented 4 years ago

So, if you still have the error, can you share the Redux DevTools output again? I believe that you dispatch the action openContract too soon.

patch321 commented 4 years ago

A number of the errors have gone away, but there still seems to be a race condition between establishing the connection and executing other SignalR actions. Is there any way to set up an observable that is updated on a successful connection?

something like this.store.dispatch(createSignalRHub(this.signalRService.hub)).then(result => //update connection status);

Odonno commented 4 years ago

@patch321 I do not know how you architected your application. The first thing you need to know is that you need to handle every event with rxjs, as in any @ngrx app.

So, one solution would be to skip actions until the hub is connected:

contractOpened$ = createEffect(
    () =>
        this.actions$.pipe(
            skipUntil(
                this.store.pipe(
                    select(selectAreAllHubsConnected),
                    filter(areAllHubsConnected => areAllHubsConnected)
                )
            ), // we skip every action until the hubs are in 'connected' state
            ofType(contractActions.openContract),
            mergeMapHubToAction(({ hub, action }) => {
                // return the observable so the operator has a stream to merge
                return hub.send('OpenContract', action.contractId);
            })
        ),
    { dispatch: false }
);

For this, you will have to inject the store in your Effect.

But again, this is one of many solutions. It depends on what you want to do.
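
As a sketch of one such alternative, and not something ngrx-signalr-core provides: skipUntil drops any openContract action dispatched before the hubs are connected, so if those early actions matter they can be held and replayed once the connection is ready. The example below shows the idea without rxjs or ngrx so that it stands alone. ConnectedGate, run, markConnected and markDisconnected are hypothetical names, and the sent array is only a stand-in for hub.send.

```typescript
// Hypothetical helper, not part of ngrx-signalr-core: holds work until the
// hub is connected, then runs it in the order it was requested.
class ConnectedGate {
    private connected = false;
    private pending: Array<() => void> = [];

    // Run the task now if connected, otherwise keep it for later.
    run(task: () => void): void {
        if (this.connected) {
            task();
        } else {
            this.pending.push(task);
        }
    }

    // Call this when the hub reports the 'Connected' state.
    markConnected(): void {
        this.connected = true;
        const tasks = this.pending;
        this.pending = [];
        tasks.forEach((task) => task());
    }

    // Call this when the connection drops so later tasks are held again.
    markDisconnected(): void {
        this.connected = false;
    }
}

// Usage with a stand-in for hub.send that only records what was sent.
const sent: string[] = [];
const gate = new ConnectedGate();

gate.run(() => sent.push('OpenContract 42')); // requested too early: held
const sentBeforeConnect = sent.length;

gate.markConnected(); // flushes the held call
gate.run(() => sent.push('CloseContract 42')); // connected: runs immediately
```

In an effect, markConnected would be called when the hub reaches the connected state, and each hub.send call would be wrapped in run. The rxjs equivalent is to delay each action until a "connected" observable has emitted rather than skipping it.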

patch321 commented 4 years ago

So that took care of all the problems except the race condition, which I solved by creating an observable that fires on the 'signalRConnected' event. Everything looks to be working perfectly locally. However, on our test environment, after deploying to the server, I'm getting this error:

"Error: Failed to start the connection: Error: There was an error with the transport."

Odonno commented 4 years ago

Well, since you switched to WebSockets only, you may experience connection issues in some circumstances. I suggest using WebSockets only locally and enabling all transports in every other environment. You can make that conditional with the help of the environment.ts files.

Anyway, I suppose your test environment may not allow websockets. Check your server configuration if you absolutely want websockets to work.
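
A minimal sketch of that per-environment switch, assuming a hypothetical signalrWebSocketsOnly flag added to each environment.ts file. The AppEnvironment interface, the buildContractHub function and the example URLs are illustrative, not part of ngrx-signalr-core. The local TransportType enum only mirrors the numeric values of HttpTransportType from @microsoft/signalr so that the snippet has no dependencies; a real app would import HttpTransportType instead.

```typescript
// Numeric values copied from HttpTransportType in @microsoft/signalr.
// In an application, import the real enum instead of redeclaring it.
enum TransportType {
    WebSockets = 1,
    ServerSentEvents = 2,
    LongPolling = 4,
}

interface HubConnectionOptions {
    skipNegotiation?: boolean;
    transport?: TransportType;
    accessTokenFactory?: () => string | Promise<string>;
}

// Hypothetical shape of an environment.ts entry.
interface AppEnvironment {
    apiUrl: string;
    signalrWebSocketsOnly: boolean;
}

function buildContractHub(env: AppEnvironment, getToken: () => string) {
    const options: HubConnectionOptions = { accessTokenFactory: getToken };
    if (env.signalrWebSocketsOnly) {
        // skipNegotiation is only valid together with the WebSockets transport.
        options.skipNegotiation = true;
        options.transport = TransportType.WebSockets;
    }
    // Otherwise leave both unset so the client negotiates a transport.
    return { hubName: 'contractHub', url: `${env.apiUrl}/contractHub`, options };
}

const localHub = buildContractHub(
    { apiUrl: 'http://localhost:57704', signalrWebSocketsOnly: true },
    () => 'token'
);
const testHub = buildContractHub(
    { apiUrl: 'https://test.example.com', signalrWebSocketsOnly: false },
    () => 'token'
);
```

The resulting object has the same shape as the hub definition passed to createSignalRHub earlier in this thread, so only the options differ between environments.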

patch321 commented 4 years ago

And with that, everything seems to be working properly. I can't tell you how much I appreciate your help. I understand that a lot of this probably fell outside the scope of this npm package, but you stuck with it and you're all the more awesome for it.

Odonno commented 4 years ago

Yes, as you said, there is a lot of cross-cutting knowledge involved, which means we need a strong understanding of rxjs, ngrx and SignalR Core. Each one does a really fantastic job: handling asynchronous streams, managing state and handling realtime events, respectively.

I am glad you made it!