tsedio / tsed

:triangular_ruler: Ts.ED is a Node.js and TypeScript framework on top of Express to write your application with TypeScript (or ES6). It provides a lot of decorators and guideline to make your code more readable and less error-prone. ⭐️ Star to support our work!
https://tsed.io/
MIT License

BullMQ - Flow #2600

Open jonathanroze opened 8 months ago

jonathanroze commented 8 months ago

Describe the solution you'd like

Hi,

I think it would be a good idea to support the new Flow system provided by BullMQ in Ts.ED.

Here is the documentation: https://docs.bullmq.io/guide/flows

I'm not comfortable enough with Ts.ED to implement it myself for now, so if someone is interested, it's a pretty cool feature :)

Romakita commented 8 months ago

Hello @jonathanroze, it's a good idea. I already tried to do that, but after a quick attempt I realized that the flows feature cannot easily be expressed with decorators. Currently, you can implement a flow programmatically using the $onInit hook:

import {Module} from "@tsed/di";
import {FlowProducer} from "bullmq";

@Module()
export class FlowProducerModule {
  // $onInit must be async because flowProducer.add() returns a promise
  async $onInit() {
    const flowProducer = new FlowProducer();

    // The parent job "renovate-interior" is processed once all its children have completed
    const flow = await flowProducer.add({
      name: "renovate-interior",
      queueName: "renovate",
      children: [
        {name: "paint", data: {place: "ceiling"}, queueName: "steps"},
        {name: "paint", data: {place: "walls"}, queueName: "steps"},
        {name: "fix", data: {place: "floor"}, queueName: "steps"}
      ]
    });
  }
}

Or maybe by using a custom async factory?

I think the only missing thing is the documentation.
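Since the flow passed to add() is plain data, it can be built and checked separately from the producer. Here is a minimal sketch of that idea. The FlowNode interface is a simplified local stand-in that only mirrors the fields used in the example above (it is not BullMQ's full FlowJob type), and buildRenovateFlow is a hypothetical helper name:

```typescript
// Simplified local stand-in mirroring only the fields used in this thread.
// Assumption: this is NOT BullMQ's full FlowJob type.
interface FlowNode {
  name: string;
  queueName: string;
  data?: Record<string, unknown>;
  children?: FlowNode[];
}

// Hypothetical helper: builds the parent job with one child job per step.
function buildRenovateFlow(steps: Array<{name: string; place: string}>): FlowNode {
  return {
    name: "renovate-interior",
    queueName: "renovate",
    children: steps.map((step) => ({
      name: step.name,
      data: {place: step.place},
      queueName: "steps"
    }))
  };
}

// Same tree as the literal used in the $onInit example.
const flow = buildRenovateFlow([
  {name: "paint", place: "ceiling"},
  {name: "paint", place: "walls"},
  {name: "fix", place: "floor"}
]);
```

The resulting object has the same shape as the literal passed to flowProducer.add(), so it could be handed to a real FlowProducer unchanged.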

Maybe you have something better in mind?

See you

jonathanroze commented 8 months ago

Hi @Romakita !

I don't know how to integrate it properly into Ts.ED directly, so I used a service to handle it.

import { Injectable } from "@tsed/di";
import { FlowJob, FlowProducer } from "bullmq";
import { config } from "../config";

@Injectable()
export class BullMqService {
  private flowProducer: FlowProducer;

  constructor() {
    this.flowProducer = new FlowProducer({
      connection: {
        host: config.redis.host,
        port: config.redis.port,
      },
    });
  }

  async addFlow(flow: FlowJob) {
    try {
      return await this.flowProducer.add(flow);
    } catch (error) {
      console.error("Error adding flow", error);
    }
  }
}

Probably not the best way but it's working great!
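One caveat with the service above: the catch block logs the error and then returns undefined, so callers cannot tell that adding the flow failed. A possible variant, sketched under assumptions, rethrows with context and takes the producer as a constructor argument so it can be replaced by a fake in tests. FlowInput, FlowProducerLike, FlowService and FakeProducer are hypothetical names and simplified stand-ins, not BullMQ or Ts.ED types:

```typescript
// Simplified stand-ins for illustration only; not BullMQ's real types.
interface FlowInput {
  name: string;
  queueName: string;
  data?: Record<string, unknown>;
  children?: FlowInput[];
}

// Assumption: the only producer capability the service needs is add().
interface FlowProducerLike {
  add(flow: FlowInput): Promise<unknown>;
}

class FlowService {
  private readonly producer: FlowProducerLike;

  constructor(producer: FlowProducerLike) {
    this.producer = producer;
  }

  async addFlow(flow: FlowInput): Promise<unknown> {
    try {
      return await this.producer.add(flow);
    } catch (error) {
      // Rethrow with context instead of silently returning undefined.
      throw new Error(`Error adding flow "${flow.name}": ${String(error)}`);
    }
  }
}

// In-memory fake that records flows, so no Redis is needed in tests.
class FakeProducer implements FlowProducerLike {
  readonly added: FlowInput[] = [];

  async add(flow: FlowInput): Promise<unknown> {
    this.added.push(flow);
    return {jobName: flow.name};
  }
}
```

In a real application the producer argument would be the BullMQ FlowProducer instance; the fake only exists to exercise the service without a Redis connection.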

Romakita commented 6 months ago

Hello @jonathanroze

Sorry for the late answer.

Instead of using an injectable class to wrap your flow, you can use a custom factory:

import {Configuration, registerProvider} from "@tsed/di";
import {FlowProducer} from "bullmq";

// Register FlowProducer itself as the injection token,
// so other services can inject it directly.
registerProvider({
  provide: FlowProducer,
  deps: [Configuration],
  useFactory: (config: Configuration) => {
    // Read the Redis settings from the Ts.ED configuration
    return new FlowProducer({
      connection: {
        host: config.get("redis.host"),
        port: config.get("redis.port")
      }
    });
  }
});

Note: Ts.ED provides a @tsed/ioredis package that you can use.

Also, Ts.ED provides a Configuration decorator and injectable service. It lets you pass the configuration to the server and retrieve it through the injectable service. This avoids importing the config in multiple places and allows a better separation of concerns (and makes tests easier to run).
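To illustrate the testing benefit, here is a rough sketch of reading the Redis settings through an injected configuration object rather than a direct import. ConfigLike is a minimal local stand-in with a get(key) lookup like the one used in the factory above, not the real Ts.ED Configuration type. redisConnectionFrom and fakeConfig are hypothetical helpers, and the fallback values ("localhost" and Redis' default port 6379) are illustrative assumptions:

```typescript
// Assumption: minimal stand-in for a configuration service with key lookup.
// This is NOT the real Ts.ED Configuration type.
interface ConfigLike {
  get<T>(key: string): T | undefined;
}

interface RedisConnection {
  host: string;
  port: number;
}

// Hypothetical helper: resolves the connection settings from the config,
// falling back to illustrative defaults when a key is missing.
function redisConnectionFrom(config: ConfigLike): RedisConnection {
  return {
    host: config.get<string>("redis.host") ?? "localhost",
    port: config.get<number>("redis.port") ?? 6379
  };
}

// Test double backed by a plain object, keyed by the same strings.
function fakeConfig(values: Record<string, unknown>): ConfigLike {
  return {
    get<T>(key: string): T | undefined {
      return values[key] as T | undefined;
    }
  };
}

const connection = redisConnectionFrom(
  fakeConfig({"redis.host": "redis.internal", "redis.port": 6380})
);
```

Because the function only depends on the get() lookup, a test can swap in any object with that method and never touch the real application config.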

We can add this example to the docs if the solution is OK for you ;) (PR welcome)

See you