Papooch / nestjs-cls

A continuation-local storage (async context) module compatible with NestJS's dependency injection.
https://papooch.github.io/nestjs-cls/
MIT License
434 stars 28 forks source link

Cannot setup correctly to use with nestjs + bullmq #76

Closed eliotik closed 1 year ago

eliotik commented 1 year ago

Hello,

We recently added bullmq into our nestjs project. Our project has ClsModule configured in AppModule imports as:

  imports: [
    ClsModule.forRoot({
      guard: {
        generateId: true,
        mount: true,
      },
      global: true,
    }),
  ],

And we have custom ClsStore interface:

export interface DataStore extends ClsStore {
  testRunSuffix: string;
}

it is used in our service as:

@Injectable()
export class DataStoreService {
  constructor(private readonly cls: ClsService<DataStore>) {}

  public get testRunSuffix(): string {
    return this.cls.get('testRunSuffix');
  }

  public set testRunSuffix(suffix: string) {
    this.cls.set('testRunSuffix', suffix);
  }
}

We have a controller with ClsGuard as:

@UseGuards(ClsGuard)
@Controller()
export class JobsController {
//...
}

We dont use decorators, or middlewares. We inject class DataStoreService to other providers/services where we want to use store. Before introducing BullMQ our flow was next:

Our listeners have many other services injected. Each service represents some feature domain. In each domain feature service we have injected DataStoreService too. Listener can set in DataStoreService some data before executing feature services. Feature services can read and set some data to store too so other feature services can access this data from the DataStoreService. It worked well.

Problem was that with eventemmitter we were not able to control how many events we want to process concurrently. We added BullMQ(not bull), implemented queues, processors(workers). Flow now is next:

Processor has DataStoreService injected. Processor has all the feature services injected. Processor sets data to DataStoreService. Feature services try to read data from DataStoreService and it's undefined.

I tried to add @UseCls decorator to the Processor->process method and also to all feature service method which executed from Processor but it didn't help.

We have another flow where instead of Controller endpoint we have a @Cron decorator on specific method in Controller. On the schedule this method does the same as endpoint described above:

Non of the described flows work with DataStoreService. Data which we set in Processor->process method is undefined when we try to read it from feature service. Code how our processor and feature services look like:

@UseGuards(ClsGuard)
@Controller()
export class JobsController {
  constructor(
    private readonly jobService: JobsService,
  ) {}

  @Cron(CronExpression.EVERY_DAY_AT_NOON)
  async executeScheduledJobs(): Promise<void> {
    await jobService.created({
      id: job.id,
    });
  }
}

@Injectable()
export class JobService
  extends BaseQueueConsumerService
  implements JobPlugin
{
  constructor(
    @InjectQueue(JOB_QUEUE_NAME) protected queue: Queue,
  ) {
    super();
  }

  async created(data: JobCreatedRequestPayload): Promise<void> {
    await this.queue.add(
      JobQueueTopics.JOB_CREATED,
      data,
      {
        removeOnComplete: true,
        removeOnFail: true,
      },
    );
  }
}

@Processor(JOB_QUEUE_NAME, {
  concurrency: JOB_TOPIC_WORKERS_AMOUNT,
})
export class JobProcessor extends BaseJobProcessor<
  JobQueueTopic,
  JobTopicPayload
> {
  constructor(
    private readonly store: DataStoreService, // <----- store described above the code example
    private readonly test: TeamsService, // <---- feature service
  ) {
    super();
  }

  @UseCls()
  async process({
    data,
    name,
  }: Job): Promise<void> {
    this.store.jobId = data.id;

    this.logger.log(`????? ${this.store.jobId}`); // output: ????? 1234567890

    await this.test.test();
  }
}

@Injectable()
export class TeamsService {
  constructor(
    private readonly store: DataStoreService,  // <----- store described above the code example
  ) {}

  @UseCls()
  async test(): Promise<void> {
    this.logger.log(`????? ${this.store.jobId}`); // output: ????? undefined
  }
}

const providers = [DataStoreService];

@Module({
  providers,
  exports: providers,
})
export class DataStoreModule {}

@Injectable()
export class DataStoreService {
  constructor(private readonly cls: ClsService<DataStore>) {}

  public get jobId(): string {
    return this.cls.get('jobId');
  }

  public set jobId(jobId: Uuid) {
    this.cls.set('jobId', jobId);
  }
}

export interface DataStore extends ClsStore {
  jobId: Uuid;
}

const providers = [TeamsService];

@Module({
  imports: [
    DataStoreModule, // <---  Module with DataStoreService
  ],
  providers,
  exports: providers,
})
export class TeamsModule {}

const providers = [
  JobService,
  JobProcessor,
];

@Module({
  imports: [
    BullModule.registerQueue({
      name: JOB_QUEUE_NAME,
    }),
    DataStoreModule, // <---  Module with DataStoreService
    TeamsModule, // <--- Module with feature service
  ],
  providers,
  exports: [JobService],
})
export class JobModule {}

Could you please advise what we are doing incorrectly? And if it's possible at all in this setup to have shared local storage for the Processor and its dependencies?

Papooch commented 1 year ago

Using CLS in your application is definitely possible, but I see multiple issues with your code. Fixing those should make it work like you want.

It's important to understand that the @UseCls decorator defines the entry point which sets up the context for all future calls from that method. If you call a method that is decorated with @UseCls when you're already inside an active CLS context, then it gets overridden with a new one (this is something that needs addressing in the docs).

Btw, if you use mount: true, you don't need @UseGuards(ClsGuard) anymore. Also, CRON controllers don't support enhancers, that's why you need the @UseCls decorator on top of the @Cron decorator (of using cls.run to wrap the call) and not the method that is being called from it - again, that is your entry point to the context.

eliotik commented 1 year ago

Hi @Papooch thank you for your prompt response. I tried to follow your advise and faced a new issue:

@Injectable() export class JobService extends BaseQueueConsumerService implements JobPlugin { constructor( @InjectQueue(JOB_QUEUE_NAME) protected queue: Queue, ) { super(); }

async created(data: JobCreatedRequestPayload): Promise { await this.queue.add( JobQueueTopics.JOB_CREATED, data, { removeOnComplete: true, removeOnFail: true, }, ); } }

@Processor(JOB_QUEUE_NAME, { concurrency: JOB_TOPIC_WORKERS_AMOUNT, }) export class JobProcessor extends BaseJobProcessor< JobQueueTopic, JobTopicPayload

{ constructor( private readonly store: DataStoreService, // <----- store described above the code example private readonly test: TeamsService, // <---- feature service ) { super(); }

async process({ data, name, }: Job): Promise { this.store.jobId = data.id; // <---- Code execution is stuck now, if I comment this line, process method will continue and finish code and ofcourse next lin ewill be undefined and TeamsService.test method output also will be undefined

this.logger.log(`????? ${this.store.jobId}`); // output: ????? 1234567890

await this.test.test();

} }

@Injectable() export class TeamsService { constructor( private readonly store: DataStoreService, // <----- store described above the code example ) {}

async test(): Promise { this.logger.log(????? ${this.store.jobId}); // output: ????? undefined } }

const providers = [DataStoreService];

@Module({ providers, exports: providers, }) export class DataStoreModule {}

@Injectable() export class DataStoreService { constructor(private readonly cls: ClsService) {}

public get jobId(): string { return this.cls.get('jobId'); }

public set jobId(jobId: Uuid) { this.cls.set('jobId', jobId); } }

export interface DataStore extends ClsStore { jobId: Uuid; }

const providers = [TeamsService];

@Module({ imports: [ DataStoreModule, // <--- Module with DataStoreService ], providers, exports: providers, }) export class TeamsModule {}

const providers = [ JobService, JobProcessor, ];

@Module({ imports: [ BullModule.registerQueue({ name: JOB_QUEUE_NAME, }), DataStoreModule, // <--- Module with DataStoreService TeamsModule, // <--- Module with feature service ], providers, exports: [JobService], }) export class JobModule {}



Added a comment inside JobProcessor.process method. Now when I try to assign in this setup value to the property in cls code execution halts, no error, it just hangs and that's it. If I comment this line, code runs ok but all the values in cls are undefined.
I definitely miss something. 
Should I create repository with project code example?
Thank you for your help.
eliotik commented 1 year ago

Tried in controller to add UseCls decorator to endpoint method too but it didn't help, code stuck on the assign value to property in Cls.

  @UseCls()
  @GrpcMethod(JOB_SERVICE_NAME, 'CreateJob') // <---- Added this as example of how endpoint looks like, we use grpc with protobufs
  async createJobs(job: CreateJobRequest): Promise<void> {
    await jobService.created({
      id: job.id,
    });
  }
Papooch commented 1 year ago

Could you please create a minimum reproduction and send it my way (a link to a public GH repo would be best), so I can try debugging it on my computer?

That's probably going to be much more effective than trying to make sense of code snippets.

Papooch commented 1 year ago

@eliotik Since you haven't provided the reproduction, do you require any further assistance, or can I close this issue?

Btw, controller methods decorated with GrpcMethod do support Nest's enhancers, so using an interceptor there instead of the decorator would be a better choice (UseInterceptors(ClsInterceptor)). The UseCls decorator actually replaces the method's implementation by one that is wrapped in cls.run, which might be messing with Nest's reflection - so the order of the decorators might need to be swapped for it to work properly.

eliotik commented 1 year ago

Hi, please excuse me, but I didn't have time, we had a release and temporarily replaced CLS with another solution. I will prepare a repository on the weekend.

Papooch commented 1 year ago

Ok, no worries. I'm prepared to support you on this, just wanted to know if you're still interested.

Papooch commented 1 year ago

I'm going to go ahead and close this issue. If you happen to have more time in the future, feel free to tag me here with some clarifying information and I'll reopen it, so we can continue the discussion.