sst / ion

SST v3
https://sst.dev
MIT License
1.58k stars 201 forks source link

Queue message getting lost #992

Closed joao-vitor-felix closed 1 week ago

joao-vitor-felix commented 1 week ago

Hey! I'm going through a problem and I'm putting it out to see if someone can help me.

Before anything, the sst's version on this project is 0.1.21. I'll show you some logs down below, and I've switched sensitive data to dummy data.

So I have a cron job called NotifyComingUpBills.

const {
        AWS_ACCESS_KEY_ID,
        AWS_SECRET_ACCESS_KEY,
        AWS_DEFAULT_REGION,
        ...environment
      } = env

      new sst.aws.Cron('NotifyComingUpBills', {
        job: {
          handler: 'infra/crons/notify-coming-up-bills/cron.handler',
          environment,
          link: [queues.SendWhatsAppMessageRequested],
          timeout: `${5} minute`,
        },
        schedule: 'cron(0 13 * * ? *)',
      })

Here's a piece of the code inside this cron job, it basically fetches users with bills expiring on the determined day and sends a message to them.

  async function notifyComingUpTransactions(upToDays: number) {
    let startAt = 0
    const limit = 100
    let hasMore = true

    while (hasMore) {
      const today = dayjs()
      const paydayDate = today.add(upToDays, 'day').format('YYYY-MM-DD')

      const response = await api.get<CelcashResponse>(
        `/transactions?status=pendingBoleto&payDayFrom=${paydayDate}&payDayTo=${paydayDate}&startAt=${startAt}&limit=${limit}`,
        { headers }
      )

      if (response.data.Transactions.length > 0) {
        for (const transaction of response.data.Transactions) {
          const invalidStatus: Transaction['Subscription']['status'][] = [
            'canceled',
            'closed',
            'inactive',
          ]

          if (
            !transaction.Subscription ||
            invalidStatus.includes(transaction.Subscription.status)
          )
            continue

          console.info('Transaction:', transaction)

          await sqs.send(
            makeSendWhatAppMessageRequestedCommand({
              url: env.BOTCONVERSA_NOTIFY_ON_BILL_DUE_DATE_COMING_UP_WEBHOOK_URL,
              body: {
                'nome-completo': transaction.Subscription.Customer.name,
                telefone:
                  transaction.Subscription.Customer.phones[0].toString(),
                'boleto-vencimento': transaction.payday
                  .split('-')
                  .reverse()
                  .join('/'),
                'boleto-valor': formatCurrency(transaction.value / 100),
                'boleto-link': transaction.Boleto.pdf,
                'boleto-qrcode-pix': transaction.Pix.qrCode,
                'dias-para-vencimento': upToDays,
              },
            })
          )

          console.info(
            `Notified ${transaction.Subscription.Customer.name} through WhatsApp on phone ${transaction.Subscription.Customer.phones[0]} about pending bill expiring within ${upToDays} days.`
          )
        }

        startAt += limit
      } else {
        hasMore = false
      }
    }
  }

  await Promise.allSettled([
    notifyExpiredTransactions(3),
    notifyComingUpTransactions(1),
    notifyComingUpTransactions(5),
  ])

console.info('All pending bills coming up have been notified.')

notifyExpiredTransactions is just a slight variation of notifyComingUpTransactions. I'll show you the queue I'm calling inside the cron job. This queue just sends a message to someone through WhatsApp.

import { SendMessageCommand } from '@aws-sdk/client-sqs'
import { Resource } from 'sst'

import {
  SendWhatsAppMessageRequestedSchema,
  sendWhatsAppMessageRequestedSchema,
} from './schema'

export const makeSendWhatAppMessageRequestedCommand = (
  message: SendWhatsAppMessageRequestedSchema
) => {
  sendWhatsAppMessageRequestedSchema.parse(message)
  return new SendMessageCommand({
    QueueUrl: Resource.SendWhatsAppMessageRequested.url,
    MessageBody: JSON.stringify(message),
  })
}
import { SQSHandler } from 'aws-lambda'
import axios from 'axios'

import {
  SendWhatsAppMessageRequestedSchema,
  sendWhatsAppMessageRequestedSchema,
} from './schema'

export const handler: SQSHandler = async (event) => {
  const message: SendWhatsAppMessageRequestedSchema = JSON.parse(
    event.Records[0].body
  )

  console.info('Send WhatsApp Message Requested:', message)

  sendWhatsAppMessageRequestedSchema.parse(message)

  await axios.post(message.url, {
    ...message.body,
  })

  console.info('WhatsApp message sent to:', message.body.telefone)
}

Here are the logs from the cron job execution on 09/04.

image

So you can see that it logs the notified info, which means it sends to the WhatsApp queue, here's the logs from WhatsApp queue between 09/03 and 09/05 (Notice that there are only 2 executions from 09/04).

image

Here are both SendWhatsAppMessageRequested executions from 09/04

image

I called it SendWhatsAppMessageRequested 5 times inside my cron job on 09/04, but only 2 messages have arrived on the actual queue, and there are no error logs or anything, it has just disappeared.

To finish it, I'll let you know how the queue is being created (I've omitted all queues but SendWhatsAppMessageRequested just to demonstrate it).

export type QueueName =
  | 'AdmissionFormSendingRequested'
  | 'CreditAnalysisRequested'
  | 'CreditAnalysisSent'
  | 'AdmissionJourneyEventCreated'
  | 'CreditAnalysisResultReceived'
  | 'GuarantorCreditAnalysisRequested'
  | 'GuarantorCreditAnalysisSent'
  | 'UpsertPatientOnCelcoinRequested'
  | 'UpsertPatientOnCelcashRequested'
  | 'UpsertGuarantorOnCelcashRequested'
  | 'UpsertGuarantorOnCelcoinRequested'
  | 'SendWhatsAppMessageRequested'
  | 'UploadPaymentReceipt'

export type Queues = {
  [key in QueueName]: sst.aws.Queue
}

const createQueue = ({
  queueName,
  queueArgs,
}: {
  queueName: QueueName
  queueArgs?: QueueArgs
}): sst.aws.Queue => {
  const dlq = new sst.aws.Queue(`${queueName}Dlq`)
  const queue = new sst.aws.Queue(queueName, {
    transform: {
      queue: {
        name: queueName,
        redrivePolicy: pulumi.jsonStringify({
          deadLetterTargetArn: dlq.arn,
          maxReceiveCount: 5,
        }),
        ...queueArgs,
      },
    },
  })
  new aws.sqs.RedriveAllowPolicy(`${queueName}RedriveAllowPolicy`, {
    queueUrl: dlq.url,
    redriveAllowPolicy: pulumi.jsonStringify({
      redrivePermission: 'byQueue',
      sourceQueueArns: [queue.arn],
    }),
  })
  return queue
}

export const createQueues = (): Queues => {
  const visibilityTimeoutSeconds = 60

  const queueArgs: QueueArgs = {
    visibilityTimeoutSeconds,
  }

  return {
    SendWhatsAppMessageRequested: createQueue({
      queueName: 'SendWhatsAppMessageRequested',
      queueArgs,
    }),
  }
}

If you need further information, let me know! This issue is happening in production and I would appreciate your help!

thdxr commented 1 week ago

if i'm understanding right it looks like you're doing Records[0] - the messages are sometimes batched you have to iterate over all of them

joao-vitor-felix commented 1 week ago

if i'm understanding right it looks like you're doing Records[0] - the messages are sometimes batched you have to iterate over all of them

Thank you for your answer (and fast answer, hahaha)! It actually makes sense, I'll try it and I'll come back here to let you know if it works

joao-vitor-felix commented 1 week ago

@thdxr Hey, it seems like that's what was happening to me, I'll leave a pic down below.

I don't know why, but apparently, when I send many messages, sometimes it puts them all together into one single array instead of one message per time, some of them were send into a single array, though.

I didn't know that, is it a default AWS behavior? I mean batch them into one single array when it comes to many messages being sent? Do you know where I can read more about it?

Anyway, it seems like it'll solve my problem, I'll let you know if something goes wrong, but I don't think it will.

Once again, thank you!

image

Here's my final code for the queue:

import { SQSHandler } from 'aws-lambda'
import axios from 'axios'

import {
  SendWhatsAppMessageRequestedSchema,
  sendWhatsAppMessageRequestedSchema,
} from './schema'

export const handler: SQSHandler = async (event) => {
  const messages: SendWhatsAppMessageRequestedSchema[] = event.Records.map(
    (record) => JSON.parse(record.body)
  )

  console.info('Send WhatsApp Message Requested:', messages)

  for (const message of messages) {
    try {
      sendWhatsAppMessageRequestedSchema.parse(message)
      await axios.post(message.url, {
        ...message.body,
      })
      console.info('WhatsApp message sent to:', message.body.telefone)
    } catch (error) {
      console.error('Error processing message:', message, error)
    }
  }
}

EDIT: Found an explanation about batching behavior here, thank you, Dax!