thenativeweb / node-cqrs-saga

Node-cqrs-saga is a node.js module that helps implement sagas in cqrs. It can be very useful as a domain component if you work with (d)ddd, cqrs, eventdenormalizer, host, etc.
http://cqrs.js.org/pages/saga.html
MIT License

removeTimeout stops after getTimeoutCommands and getOlderSagas #42

Closed: edro closed this issue 6 years ago

edro commented 6 years ago

When I call saga.removeTimeout() and then commit(cb), the commit stops with the message "Use commit only to remove a saga or to addCommandToSend!" after a getTimeoutCommands() call, and with the message "Use commit only to remove a saga!" after a getOlderSagas() call.

adrai commented 6 years ago

Can you provide the full snippet?

edro commented 6 years ago

Here is the part of my project's TypeScript class that shows this issue:

class SagaHandler {

...

  public handleEvent(evt: any) {
    this.pm.handle(evt, err => {
      if (err) {
        this._logger.debug(err);
      }
    });
  }

  public addSchedule(at: Date): void {
    this._schedule.queue(new Date(at.getTime() + 100));
  }

  private constructor(public readonly pm: ProcessManagement,
                      private readonly _watchInterval: number,
                      private readonly _sagaExpirationSeconds: number,
                      private readonly _commandHandler: (Command) => void,
                      private readonly _logger: ILogger) {
    this._watchId = setInterval(async () => await this._nextTick(), WATCH_INTERVAL);
  }

  private async _nextTick() {
    let now_ms = Date.now();
    if (this._schedule.length === 0) {
      let watch = new Date(now_ms + this._watchInterval);
      this._schedule.queue(watch);
    } else {
      let watch = this._schedule.peek();
      if (watch <= new Date(now_ms)) {
        this._schedule.dequeue();
        await this._executeTimeoutSagas(false);
        await this._removeOlderSagas(new Date(now_ms - this._sagaExpirationSeconds * 1000));
      }
    }
  }

  private _removeOlderSagas(date: Date): Promise<void> {
    return this._getOlderSagas(date)
      .then(sagas => sagas.forEach(saga => {
        this._logger.debug("...process expired saga: %s", saga.id);
        this._runTimeoutCommands(saga);
        this.pm.removeSaga(saga, _dummy);
      }));
  }

  private _executeTimeoutSagas(removeAfterTimeout: boolean): Promise<void> {
    return this._getTimeoutedSagas()
      .then(sagas => sagas.forEach(saga => {
        this._logger.debug("...process timeout saga: %s", saga.id);
        this._runTimeoutCommands(saga);
        saga.removeTimeout();
        if (removeAfterTimeout) {
          saga.destroy();
        }
        saga.commit(_dummy);
      }));
  }

  private _getTimeoutedSagas(): Promise<SagaModel[]> {
    return new Promise<SagaModel[]>((resolve, reject) => this.pm.getTimeoutedSagas((err, sagas) => {
      return err ? reject(err) : resolve(sagas);
    }));
  }

  private _getOlderSagas(date: Date): Promise<SagaModel[]> {
    return new Promise<SagaModel[]>((resolve, reject) => this.pm.getOlderSagas(date, (err, sagas) => {
      return err ? reject(err) : resolve(sagas);
    }));
  }

  private _runTimeoutCommands(saga): void {
    let commands = saga.getTimeoutCommands() || [];
    this._logger.debug("timeout saga commands: %s", commands.length);
    this._dispatchCommands(saga, commands);
  }

  private _dispatchCommands(saga, commands: any[]) {
    for (let cmd of commands) {
      // send command to domain because sagas target is this internal domain
      let cmdName = cmd.command;
      this._logger.debug("saga - push command %s to domain", cmdName);
      this._commandHandler(cmd);
      this.pm.setCommandToDispatched(cmd.id, saga.id, _dummy);
    }
  }
}

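The snippet above does not show what `_dummy` does with the error passed to `commit`, and the `forEach` loops return before any commit has called back. As a minimal sketch (not the library's API: `CommittableSaga`, `commitAsync`, `commitAll` and the two stand-in sagas are hypothetical names introduced here), the callback-style `commit` could be wrapped in a Promise so that messages like the ones quoted above reach the caller, and so the returned promise settles only after every commit has called back:

```typescript
// Hypothetical minimal shape of a saga for this sketch; only `id` and a
// callback-style `commit` are assumed, matching how the snippet above uses them.
interface CommittableSaga {
  id: string;
  commit(callback: (err?: Error | null) => void): void;
}

// Wrap the callback-style commit in a Promise so a failure rejects
// instead of being handed to a callback whose behaviour is unknown.
function commitAsync(saga: CommittableSaga): Promise<void> {
  return new Promise<void>((resolve, reject) => {
    saga.commit(err => (err ? reject(err) : resolve()));
  });
}

// Commit every saga and record the outcome per saga id
// (null on success, the Error on failure).
// Resolves only after all commits have called back.
async function commitAll(sagas: CommittableSaga[]): Promise<Map<string, Error | null>> {
  const results = new Map<string, Error | null>();
  await Promise.all(sagas.map(async saga => {
    try {
      await commitAsync(saga);
      results.set(saga.id, null);
    } catch (err) {
      results.set(saga.id, err instanceof Error ? err : new Error(String(err)));
    }
  }));
  return results;
}

// Stand-in sagas for demonstration only (not the library's objects):
// one commits cleanly, one fails with a message quoted in this issue.
const okSaga: CommittableSaga = { id: "a", commit: cb => cb(null) };
const failingSaga: CommittableSaga = {
  id: "b",
  commit: cb => cb(new Error("Use commit only to remove a saga!")),
};
```

With something like this, `_executeTimeoutSagas` could return `commitAll(sagas)` after calling `removeTimeout()` on each saga, which would make it visible which saga ids failed to commit and why.
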
adrai commented 6 years ago

v1.9.1