crallen / nestjs-google-pubsub

Subscribe to Google Cloud PubSub topics using NestJS's EventMessage handlers.
MIT License
13 stars 5 forks source link

@EventPattern(topicName) after a while not receving any messages #8

Open iluchian opened 2 years ago

iluchian commented 2 years ago

Hi, i am using version nestjs-google-pubsub=0.1.2, its working as expected, but after a few days the subscription is not getting any messages, looks like is freezing, but publishing messages in same topic works, after a while getting like 1 or 2 messages and again freezing and can not understand what is going wrong.

Could you help to find out the problem.

here is the microservice creation

async function bootstrap() {
  const app = await NestFactory.create(AppModule)
  const {ENV_NAME, LISTEN_PORT, GOOGLE_APPLICATION_CREDENTIALS} = app.get(ConfigService).env

  const rawGoogleCredentials = Buffer.from(GOOGLE_APPLICATION_CREDENTIALS, 'base64')
  const googleCredentials: PubSubGoogleCredentials = JSON.parse(rawGoogleCredentials.toString('utf-8'))
  const subscriptionId = buildSubscription(ENV_NAME, GOOGLE_PUB_SUB_SUBSCRIPTIONS.formAnswerModeratedThemis)
  const topicId = buildTopic(ENV_NAME, GOOGLE_PUB_SUB_TOPICS.formAnswerModerated)

  app.connectMicroservice({
    strategy: new PubSubServer({
      projectId: googleCredentials.project_id,
      credentials: {
        client_email: googleCredentials.client_email,
        private_key: googleCredentials.private_key
      },
      autoRetry: true,
      topics: {
        [topicId]: {subscriptionId}
      },
    })
  });

  app.useGlobalPipes(
      new ValidationPipe({
        transformOptions: {
          enableImplicitConversion: true
        },
      }),
  );
  app.use(bodyParser.json({limit: '50mb'}))
  app.use(bodyParser.urlencoded({limit: '50mb', extended: true}))

  await app.startAllMicroservices();
  await app.listen(LISTEN_PORT)
}

bootstrap()

here is my function for listening the subscription

@EventPattern(topicName)
  async handleEvent(rawEventData: Message) {

  const eventDataString = Buffer.from(rawEventData.data).toString('utf-8')

  Logger.log({
    status: 'ok',
    description: `Received data from topic ${topicName}`,
  })

  // custom logic goes here

  rawEventData.ack()
}

Could u point me what is wrong with my code?

as well here are the list of my packages for my app

"dependencies": {
    "@hapi/joi": "16.1.7",
    "@nestjs/common": "8.1.1",
    "@nestjs/core": "8.1.1",
    "@nestjs/jwt": "6.1.1",
    "@nestjs/microservices": "8.1.1",
    "@nestjs/passport": "6.1.1",
    "@nestjs/platform-express": "6.7.2",
    "@nestjs/swagger": "4.8.0",
    "@nestjs/typeorm": "7.1.5",
    "@nestjsx/crud": "5.0.0-alpha.3",
    "@nestjsx/crud-typeorm": "5.0.0-alpha.3",
    "@tj-core/prepare-commit-msg": "4.1.0",
    "axios": "0.24.0",
    "class-transformer": "0.4.0",
    "class-validator": "0.13.1",
    "date-fns": "2.9.0",
    "dotenv": "8.2.0",
    "env-cmd": "10.1.0",
    "fastify-swagger": "4.7.0",
    "nestjs-command": "1.3.0",
    "nestjs-google-pubsub": "0.1.2",
    "passport": "0.4.0",
    "passport-jwt": "4.0.0",
    "passport-local": "1.0.0",
    "pg": "7.14.0",
    "reflect-metadata": "0.1.13",
    "rimraf": "3.0.0",
    "rxjs": "7.4.0",
    "swagger-ui-express": "4.1.6",
    "transliteration": "2.2.0",
    "tsc-watch": "4.0.0",
    "typeorm": "0.2.37",
    "typescript": "4.4.4"
  }

as well i need to publish messages in same topic as my subscriptio from above, for this i did a separate code for it, just pasting here because may be from second code is causing issue to listenng for messages

pub.sub.service.ts

import {Logger} from '@nestjs/common'
import { PubSub } from '@google-cloud/pubsub';
import {PubSubGoogleCredentials} from '../common/interfaces';
import {ConfigService} from '../config/config.service';
import {buildTopic} from '../common/helpers/tests/build-topic';

const {ENV_NAME, GOOGLE_APPLICATION_CREDENTIALS} = ConfigService.createFromDotEnvOrProcessEnv().env
const rawGoogleCredentials = Buffer.from(GOOGLE_APPLICATION_CREDENTIALS, 'base64')
const googleCredentials: PubSubGoogleCredentials = JSON.parse(rawGoogleCredentials.toString('utf-8'))

export class PubSubService {
  constructor(
    private readonly pubSubClient = new PubSub({
      projectId: googleCredentials.project_id,
      credentials: {
          client_email: googleCredentials.client_email,
          private_key: googleCredentials.private_key
        }
    }),
  ) {}

  public async publishMessage(data: object, topic: string): Promise<void> {
    const dataBuffer = Buffer.from(JSON.stringify(data));
    const topicName = buildTopic(ENV_NAME, topic)
    const date = new Date()
    try {
      const messageId = await this.pubSubClient.topic(topicName).publish(dataBuffer);
      Logger.log({
        status: 'ok',
        description: 'Message published',
        messageId,
        data,
        date
      });
    } catch (error) {
      Logger.log(`Received error while publishing: ${error.message}`);
      Logger.log({
        status: 'error',
        error: error.message,
        data,
        date
      });
      process.exitCode = 1;
    }
  }

}

pub.sub.module.ts

import {Module} from '@nestjs/common'
import {ConfigModule} from '../config/config.module'
import {PubSubService} from './pub.sub.service';

@Module({
  imports: [
    ConfigModule,
  ],
  controllers: [],
  providers: [PubSubService],
  exports: [PubSubService],
})
export class PubSubModule {}

app.module.ts

@Module({
  imports: [
    TypeOrmModule.forRootAsync({
      imports: [ConfigModule],
      useFactory: async (configService: ConfigService) => typeormCreateConfig(configService),
      inject: [ConfigService],
    }),
    ConfigModule,
    PubSubModule
  ]
})
export class AppModule {}**

some more information when my subscription stops receiving messages, if i try to publish a message is working pub sub, only listening messages after a day something like this stops receiving messages, messages are in the subscription but not getting any messages.

Thanks a lot in advance for any help