stomp-js / rx-stomp

STOMP adaptor for RxJS
Apache License 2.0

ERROR Error: InvalidStateError: The connection has not been established yet #407

Closed: hadestructhor closed this issue 2 years ago

hadestructhor commented 2 years ago

I am trying to start a watch exactly once, and only after the WebSocket is connected.

This is the function I call once the user is authenticated:

  initStomp() {
    const stompConfig: InjectableRxStompConfig = Object.assign({}, RxStompConfig, {
      connectHeaders: {
          access_token: this.auth_token
      }
    });

    this.rxStompService.connected$.pipe(take(1)).subscribe(() => {
      this.isStompInitialized = true;
      this.nextIsStompInitialized();
    });

    this.rxStompService.configure(stompConfig);
    this.rxStompService.activate();
  }

I set isStompInitialized to true inside the connected$ subscription, and this is how the value is emitted to all the other services:

  isStompInitialized: boolean = false;
  isStompInitializedSource: BehaviorSubject<boolean> = new BehaviorSubject<boolean>(this.isStompInitialized);
  currentIsStompInitialized$ = this.isStompInitializedSource.asObservable();

  nextIsStompInitialized() {
    this.isStompInitializedSource.next(this.isStompInitialized);
  }

Here's an extract showing how I subscribe to currentIsStompInitialized$ in my other services and call the watch function on rxStompService:

  constructor(
    private http: HttpClient,
    private rxStompService: RxStompService,
    private userService: UserService
  ) {
    // Start watching only after the user service reports that STOMP is initialized.
    this.userSubscription = this.userService.currentIsStompInitialized$.subscribe(initialized => {
      if (initialized) this.initializePeopleAndSocket();
    });
  }

  initializePeopleAndSocket() {
    this.rxStompService.watch(environment.PEOPLE_NOTIFICATIONS).subscribe((message: Message) => {
      const parsedPeople: PersonDto[] = JSON.parse(message.body);
      // Replace each person in the local lists with the version from the notification.
      parsedPeople.forEach(persDto => {
        this.deletePersonFromListById(persDto.id);
        this.addPersonToLists(PersonDto.toModel(persDto));
      });
    });
  }
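
Since the flag can emit true more than once, the consuming service may want to make sure it starts only one watch. A minimal, dependency-free sketch of that idea follows. The names `WatchSource`, `Unsubscribable`, `PeopleFeed`, `onInitializedChange` and `dispose` are hypothetical stand-ins, and the callback-style `watch` here is a simplification, not the rx-stomp API (where `watch` returns an Observable).

```typescript
// Hypothetical stand-in for an RxJS Subscription.
interface Unsubscribable {
  unsubscribe(): void;
}

// Hypothetical, simplified stand-in for the STOMP service (assumption:
// callback-based here only to keep the sketch free of dependencies).
interface WatchSource {
  watch(destination: string, onMessage: (body: string) => void): Unsubscribable;
}

class PeopleFeed {
  private readonly source: WatchSource;
  private readonly destination: string;
  private active: Unsubscribable | undefined;

  constructor(source: WatchSource, destination: string) {
    this.source = source;
    this.destination = destination;
  }

  // Call this for every value emitted by the "initialized" flag.
  // Only the first `true` starts a watch; later emissions are ignored.
  onInitializedChange(initialized: boolean, onMessage: (body: string) => void): void {
    if (!initialized || this.active !== undefined) {
      return;
    }
    this.active = this.source.watch(this.destination, onMessage);
  }

  // Stop the watch, for example when the owning service is destroyed.
  dispose(): void {
    this.active?.unsubscribe();
    this.active = undefined;
  }
}
```

With this shape, repeated true emissions do not stack up duplicate watches, and the single watch can be torn down in one place.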

Most of the time this code works fine, but sometimes the connection has not been established yet and I get the following error, repeated multiple times:

ERROR Error: InvalidStateError: The connection has not been established yet
    at SockJS.push.62809.SockJS.send (main.js:147)
    at StompHandler._transmit (stomp.umd.js:1812)
    at StompHandler.subscribe (stomp.umd.js:1882)
    at Client.subscribe (stomp.umd.js:573)
    at SafeSubscriber._next (rx-stomp.umd.js:710)
    at SafeSubscriber.__tryOrUnsub (Subscriber.js:183)
    at SafeSubscriber.next (Subscriber.js:122)
    at Subscriber._next (Subscriber.js:72)
    at Subscriber.next (Subscriber.js:49)
    at FilterSubscriber._next (filter.js:33)

hadestructhor commented 2 years ago

The error seems to have happened because my initStomp method was being called three times. I have added a check so that it only runs on the first call, and now it works fine.
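
The check itself is not shown in the thread, so the following is only a sketch of one way to guard initStomp against repeated calls. `StompClientLike` and `StompInitializer` are hypothetical names. The interface is a minimal stand-in that assumes only the `configure` and `activate` calls used in the code above.

```typescript
// Hypothetical minimal shape of the client; the real service exposes more.
interface StompClientLike {
  configure(config: { connectHeaders: Record<string, string> }): void;
  activate(): void;
}

class StompInitializer {
  private readonly client: StompClientLike;
  private started = false;

  constructor(client: StompClientLike) {
    this.client = client;
  }

  // Returns true if this call configured and activated the client,
  // false if initialization had already been started by an earlier call.
  initStomp(accessToken: string): boolean {
    if (this.started) {
      return false;
    }
    this.started = true;
    this.client.configure({ connectHeaders: { access_token: accessToken } });
    this.client.activate();
    return true;
  }
}
```

If the user can log out and back in, the flag would need to be reset at logout. That case is not covered in the thread and is left out here.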