timgit / pg-boss

Queueing jobs in Postgres from Node.js like a boss
MIT License
2.04k stars 157 forks source link

How to use database operations #203

Closed Israel001 closed 3 years ago

Israel001 commented 3 years ago

Please can you show an example of how to connect to PostgreSQL in a job handler. I am using this library with NestJS and i want to connect to PostgreSQL and perform database operations inside a job handler but i can't find how to do that anywhere in the "usage" documentation.

I am using TypeORM to connect to PostgreSQL in the app, so if it supports TypeORM, a TypeORM example will be nice (:, if it doesn't, any example that works is fine.

Thanks in advance and great library by the way!

timgit commented 3 years ago

pg-boss is only meant to offer an abstraction over its small set of tables, not a generic way of querying your database. Does that answer your question? You should use TypeORM to issue arbitrary queries in your job handlers

Israel001 commented 3 years ago

@timgit Okay, i haven't really tried that so i didn't know if it will work or not. That's why i asked.

Anyway, while setting up the library in my NestJS project, i got an error that says TypeError: this.workerInstance.publish is not a function

This is my setup:

providers/queue.provider.ts

import PgBoss from 'pg-boss';
import { Provider } from '@nestjs/common';
import { queueConfig } from '../config/queue.config';

export class Queue {
  private static boss: PgBoss = new PgBoss(queueConfig);

  static async startWorker() {
    await Queue.boss.start();
  }

  static getWorkerInstance() {
    return Queue.boss;
  }
}

export const QueueProvider: Provider = {
  provide: Queue,
  useValue: Queue,
};

user/user.module.ts

import { Module } from '@nestjs/common';
import { UserService } from './user.service';
import { UserResolver } from './user.resolver';
import { TypeOrmModule } from '@nestjs/typeorm';
import { UserRepository } from './user.repository';
import { QueueProvider } from '../providers/queue.provider';
import { PgBossProvider } from '../providers/pg-boss.provider';

@Module({
  imports: [TypeOrmModule.forFeature([UserRepository])],
  providers: [PgBossProvider, QueueProvider, UserResolver, UserService],
})
export class UserModule {}

user/user.service.ts

import { Injectable } from '@nestjs/common';
import { UserRepository } from './user.repository';
import { InjectRepository } from '@nestjs/typeorm';
import { User } from './user.entity';
import PgBoss from 'pg-boss';
import { Queue } from '../providers/queue.provider';

@Injectable()
export class UserService {
  constructor(
    @InjectRepository(UserRepository)
    private userRepository: UserRepository,
    private workerInstance: PgBoss = Queue.getWorkerInstance(),
  ) {}

  async getAllNames(): Promise<User[]> {
    return await this.userRepository.find();
  }

  async saveName(name: string): Promise<User> {
    const user: User = new User();
    console.log(this.workerInstance);
    await this.workerInstance.publish({
      name: 'TEST-JOB-QUEUE',
      data: { name },
    });
    await this.workerInstance.subscribe('TEST-JOB-QUEUE', async (job: any) => {
      console.log(
        `We got into the queue with the following data: ${job.data.name}`,
      );
      user.jobName = `Job Name - ${job.data.name}`;
      await this.workerInstance.unsubscribe('TEST-JOB-QUEUE');
      job.done();
    });
    user.name = name;
    return await this.userRepository.save(user);
  }
}

console.log(this.workerInstance) prints the following into the console:

[class PgBoss extends EventEmitter] {
  states: {
    created: 'created',
    retry: 'retry',
    active: 'active',
    completed: 'completed',
    expired: 'expired',
    cancelled: 'cancelled',
    failed: 'failed'
  }
}

I already called await Queue.startworker() in the bootstrap function.

Please do you know why i am getting this error.

Israel001 commented 3 years ago

Okay, i found the issue, i had to call start() right before publish(). Everything is working fine now, and i can still use the TypeORM objects to save data into PostgreSQL.