node-ts / bus

A typescript based enterprise service bus framework based on enterprise integration patterns
https://bus.node-ts.com/
MIT License

Workflows horizontal scaling #134

Closed valdestron closed 3 years ago

valdestron commented 3 years ago

Let's say I have a workflows service.

In it I have one workflow with some handlers.

I deployed this service in k8s, scaled to 2 pods, with RabbitMQ as the message broker and Postgres as the database.

If you start 2 or more workflows at the same time, the pods will try to upsert/retrieve the workflow data concurrently.

You will see the following errors:

Basically, it's a race condition.

Although the service does eventually finish the workflow despite all the errors and retries, it takes a long time to complete. It is also very confusing and does not look scalable.

Any ideas on how to scale horizontally?

adenhertog commented 3 years ago

It does seem something's not quite right. Could you provide a copy of what your workflow looks like? I'm curious to know how you're seeing stale update errors as there's only a couple of ways this might occur.

Also is your postgres persistence shared amongst all pods?

valdestron commented 3 years ago

Yes, both pods are identical; they connect to the same Postgres connection pool.

import { Workflow, StartedBy, Handles } from '@node-ts/bus-workflow'
import { injectable, inject } from 'inversify'
import { GitopsInstallWorkflowData } from './Data'
import { LOGGER_SYMBOLS, Logger } from '@node-ts/logger-core'
import { BUS_SYMBOLS, Bus } from '@node-ts/bus-core'
import {
  GitopsInstallWorkflowStart,
  GitopsInstallWorkflowHandleValidation,
  GitopsInstallWorkflowValidationHandled,
  GitopsInstallWorkflowHandlePreparation,
  GitopsInstallWorkflowPreparationHandled,
  GitopsInstallWorkflowMCLDUpdateHandled,
  GitopsInstallWorkflowHandleMCLDUpdate,
  GitopsInstallWorkflowHandleMCLDUpdateWatch,
  GitopsInstallWorkflowMCLDUpdateWatchHandled,
  GitopsInstallWorkflowStatus,
  GitopsWorkflowStages,
} from '@commons/events-commands'

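// Resolves after `ms` milliseconds; used here to simulate slow handlers
function sleep(ms: number): Promise<void> {
  return new Promise<void>((resolve) => setTimeout(resolve, ms))
}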

@injectable()
export class GitopsWorkflow extends Workflow<GitopsInstallWorkflowData> {
  stagesCount: number = Object.keys(GitopsWorkflowStages).length / 2
  workflowName = 'Atlantis Gitops Workflow'

  constructor(
    @inject(BUS_SYMBOLS.Bus) private readonly bus: Bus,
    @inject(LOGGER_SYMBOLS.Logger) private readonly logger: Logger,
  ) {
    super()
  }

  /**
   * Starts new Gitops Installation to AWS Account workflow
   */
  @StartedBy<GitopsInstallWorkflowStart, GitopsInstallWorkflowData, 'startGitopsWorkflow'>(GitopsInstallWorkflowStart)
  async startGitopsWorkflow(event: GitopsInstallWorkflowStart): Promise<Partial<GitopsInstallWorkflowData>> {
    const { username, awsAccount, awsAccountEmail } = event
    const description = `Atlantis Gitops Workflow started by ${username} and will be installed to AWS account ${awsAccount}`
    this.logger.info(description)

    await this.bus.send(new GitopsInstallWorkflowHandleValidation(awsAccount))
    await this.bus.publish(
      new GitopsInstallWorkflowStatus(
        this.workflowName,
        awsAccount,
        username,
        GitopsWorkflowStages.started,
        this.stagesCount - 1,
        'Initial Start',
        description,
      ),
    )

    return {
      username,
      awsAccount,
      awsAccountEmail,
      workflowValid: false,
      accountVariables: {},
      mcldGitopsPRUpdateUrl: '',
      mcldGitopsPRMerged: false,
    }
  }

  @Handles<
    GitopsInstallWorkflowValidationHandled,
    GitopsInstallWorkflowData,
    'handleGitopsInstallWorkflowValidationResponse'
  >(GitopsInstallWorkflowValidationHandled, (event) => event.awsAccount, 'awsAccount')
  async handleGitopsInstallWorkflowValidationResponse(
    eventData: GitopsInstallWorkflowValidationHandled,
    { awsAccount, username }: GitopsInstallWorkflowData,
  ): Promise<Partial<GitopsInstallWorkflowData>> {
    const description = `Atlantis Gitops Workflow received validation response: ${awsAccount} validity is: ${eventData.workflowValid}`
    this.logger.info(description)
    if (!eventData.workflowValid) {
      // TODO: send command to complete workflow if eventData.workflowValid = false
      this.logger.info(`Atlantis Gitops Workflow handling false start`)
      // this.bus.send(new SomeEventToStopWorkflow())
      return this.discard()
    }
    await sleep(10000)
    // If validation passed, send message to preparation handler and update workflow state data with validity
    await this.bus.send(new GitopsInstallWorkflowHandlePreparation(awsAccount))
    await this.bus.publish(
      new GitopsInstallWorkflowStatus(
        this.workflowName,
        awsAccount,
        username,
        GitopsWorkflowStages.validated,
        this.stagesCount - 2,
        'Account Validation',
        description,
      ),
    )
    return { workflowValid: eventData.workflowValid }
  }

  @Handles<
    GitopsInstallWorkflowPreparationHandled,
    GitopsInstallWorkflowData,
    'handleGitopsInstallWorkflowPreparationResponse'
  >(GitopsInstallWorkflowPreparationHandled, (event) => event.awsAccount, 'awsAccount')
  async handleGitopsInstallWorkflowPreparationResponse(
    eventData: GitopsInstallWorkflowPreparationHandled,
    { awsAccount, username }: GitopsInstallWorkflowData,
  ): Promise<Partial<GitopsInstallWorkflowData>> {
    const description = `Atlantis Gitops Workflow received preparation response for ${awsAccount}`
    this.logger.info(description)
    if (!eventData.accountVariables) {
      // TODO: send command to complete workflow if eventData.accountVariables = null
      this.logger.info(`Atlantis Gitops Workflow handling false start`)
      // this.bus.send(new SomeEventToStopWorkflow())
      return this.discard()
    }
    await sleep(10000)
    // If preparation passed, send message to update MCLD handler and update workflow state data with the account variables
    await this.bus.send(new GitopsInstallWorkflowHandleMCLDUpdate(awsAccount, eventData.accountVariables))
    await this.bus.publish(
      new GitopsInstallWorkflowStatus(
        this.workflowName,
        awsAccount,
        username,
        GitopsWorkflowStages.prepared,
        this.stagesCount - 3,
        'Account Preconfiguration',
        description,
      ),
    )
    return {
      accountVariables: eventData.accountVariables,
    }
  }

  @Handles<
    GitopsInstallWorkflowMCLDUpdateHandled,
    GitopsInstallWorkflowData,
    'handleGitopsInstallWorkflowMCLDUpdateResponse'
  >(GitopsInstallWorkflowMCLDUpdateHandled, (event) => event.awsAccount, 'awsAccount')
  async handleGitopsInstallWorkflowMCLDUpdateResponse(
    eventData: GitopsInstallWorkflowMCLDUpdateHandled,
    { awsAccount, username }: GitopsInstallWorkflowData,
  ): Promise<Partial<GitopsInstallWorkflowData>> {
    const description = `Atlantis Gitops Workflow received update MCLD response for ${awsAccount}`
    this.logger.info(description)

    if (!eventData.mcldGitopsPRUpdateUrl) {
      // TODO: send command to complete workflow if eventData.mcldGitopsPRUpdateUrl is empty
      this.logger.info(`Atlantis Gitops Workflow handling false start`)
      // this.bus.send(new SomeEventToStopWorkflow())
      return this.discard()
    }
    await sleep(10000)
    // If MCLD update passed, send message to update MCLD watch handler and update workflow state data with the PR URL
    await this.bus.send(new GitopsInstallWorkflowHandleMCLDUpdateWatch(awsAccount, eventData.mcldGitopsPRUpdateUrl))
    await this.bus.publish(
      new GitopsInstallWorkflowStatus(
        this.workflowName,
        awsAccount,
        username,
        GitopsWorkflowStages.iacUpdated,
        this.stagesCount - 4,
        'MCLD Infrastructure As Code',
        description,
      ),
    )

    return {
      mcldGitopsPRUpdateUrl: eventData.mcldGitopsPRUpdateUrl,
    }
  }

  @Handles<
    GitopsInstallWorkflowMCLDUpdateWatchHandled,
    GitopsInstallWorkflowData,
    'handleGitopsInstallWorkflowMCLDWatchResponse'
  >(GitopsInstallWorkflowMCLDUpdateWatchHandled, (event) => event.awsAccount, 'awsAccount')
  async handleGitopsInstallWorkflowMCLDWatchResponse(
    eventData: GitopsInstallWorkflowMCLDUpdateWatchHandled,
    { awsAccount, username }: GitopsInstallWorkflowData,
  ): Promise<Partial<GitopsInstallWorkflowData>> {
    const description = `Atlantis Gitops Workflow received MCLD watch update response for ${eventData.awsAccount}`
    this.logger.info(description)

    if (!eventData.mcldGitopsPRMerged) {
      // TODO: send command to complete workflow if eventData.mcldGitopsPRMerged = false
      this.logger.info(`Atlantis Gitops Workflow handling false start`)
      // this.bus.send(new SomeEventToStopWorkflow())
      return this.discard()
    }

    this.logger.info(`Atlantis Gitops Workflow was completed. Initiated by: ${username}, AWS Account: ${awsAccount}`)
    await sleep(10000)
    await this.bus.publish(
      new GitopsInstallWorkflowStatus(
        this.workflowName,
        awsAccount,
        username,
        GitopsWorkflowStages.completed,
        this.stagesCount - 5,
        'Infrastructure Changes landed',
        description,
      ),
    )

    return this.complete({ mcldGitopsPRMerged: eventData.mcldGitopsPRMerged })
  }
}

adenhertog commented 3 years ago

Cool, I had a look through it and have a couple of thoughts:

Would the sleep() calls inside the handlers be the cause of any of the stale data errors? Whilst a handler is sleeping, any other message that is handled by the workflow in the meantime could cause the underlying data to change and throw this error when the handler finally resumes and returns.
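
To illustrate how a sleeping handler can end up with a stale update, here is a minimal sketch of optimistic concurrency. It assumes the persistence layer stores a version number next to the workflow data and rejects any write made against an older version. That is an assumption about the general technique, not a description of how the Postgres persistence in this library is implemented. `VersionedStore`, `GitopsData` and `SaveResult` are hypothetical names made up for this example.

```typescript
// Hypothetical types for illustration only; none of this is @node-ts/bus API
interface GitopsData {
  workflowValid: boolean
  mcldGitopsPRMerged: boolean
}

interface VersionedRow {
  version: number
  data: GitopsData
}

type SaveResult = 'saved' | 'stale'

class VersionedStore {
  private rows: Record<string, VersionedRow> = {}

  insert(id: string, data: GitopsData): void {
    this.rows[id] = { version: 1, data: { ...data } }
  }

  // Returns a snapshot copy, as a database read would
  load(id: string): VersionedRow {
    const row = this.rows[id]
    if (!row) {
      throw new Error(`No workflow data stored for ${id}`)
    }
    return { version: row.version, data: { ...row.data } }
  }

  // Writes only if nobody else has saved since `expectedVersion` was read
  save(id: string, expectedVersion: number, data: GitopsData): SaveResult {
    const current = this.rows[id]
    if (!current || current.version !== expectedVersion) {
      return 'stale'
    }
    this.rows[id] = { version: expectedVersion + 1, data: { ...data } }
    return 'saved'
  }
}

const store = new VersionedStore()
store.insert('account-1', { workflowValid: false, mcldGitopsPRMerged: false })

// Two handlers (say, one per pod) read the same row before either has saved
const readByPodA = store.load('account-1')
const readByPodB = store.load('account-1')

// Pod B returns first; the stored version moves from 1 to 2
const firstSave = store.save('account-1', readByPodB.version, {
  ...readByPodB.data,
  workflowValid: true
})

// Pod A was still busy (for example inside sleep()), so its snapshot is now out of date
const secondSave = store.save('account-1', readByPodA.version, {
  ...readByPodA.data,
  mcldGitopsPRMerged: true
})
```

In this sketch the second save is rejected because its version no longer matches. That is the point at which a real handler would be expected to fail and the message to be retried.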

Another thought is how an event is mapped to a workflow:

(GitopsInstallWorkflowPreparationHandled, (event) => event.awsAccount, 'awsAccount')

For this to work without collisions, the assumption is that only one workflow instance is ever running for a single awsAccount. If this is not the case (i.e. you can have multiple concurrent gitops workflows per AWS account), I'd suggest mapping based on a workflow-specific correlation id in the message attributes.
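
The collision can be sketched without the library. The snippet below assumes a simplified lookup that, like the mapping arguments shown above, takes a function that pulls a key from the event and the name of the workflow data field to match it against. `findMatches`, `RunningWorkflow`, `StepHandled` and the `correlationId` field are hypothetical and only stand in for whatever the bus does internally.

```typescript
// Hypothetical shapes for illustration only; not @node-ts/bus types
interface RunningWorkflow {
  workflowId: string
  awsAccount: string
}

interface StepHandled {
  awsAccount: string
  // Assumed to be set when the workflow starts and copied onto every later message
  correlationId: string
}

// Returns every running workflow whose `mapsTo` field equals the key taken from the event
function findMatches(
  running: RunningWorkflow[],
  event: StepHandled,
  lookup: (event: StepHandled) => string,
  mapsTo: keyof RunningWorkflow
): RunningWorkflow[] {
  const key = lookup(event)
  return running.filter((workflow) => workflow[mapsTo] === key)
}

// Two concurrent workflows for the same AWS account
const running: RunningWorkflow[] = [
  { workflowId: 'wf-1', awsAccount: '111122223333' },
  { workflowId: 'wf-2', awsAccount: '111122223333' }
]

const event: StepHandled = { awsAccount: '111122223333', correlationId: 'wf-2' }

// Mapping on awsAccount is ambiguous: both workflows match the event
const byAccount = findMatches(running, event, (e) => e.awsAccount, 'awsAccount')

// Mapping on a per-workflow id selects exactly one instance
const byCorrelationId = findMatches(running, event, (e) => e.correlationId, 'workflowId')
```

With the account-based mapping, both instances would try to handle the same event and update their data, which is one way to run into the contention described in this issue. The per-workflow id avoids that by construction.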

Interested to know your thoughts

valdestron commented 3 years ago

Thanks a lot. Yes, you are right: the problem was with the unique id. I was testing with the same id every time, which will never be the case in real use!

Now everything works as expected.