Papooch / nestjs-cls

A continuation-local storage (async context) module compatible with NestJS's dependency injection.
https://papooch.github.io/nestjs-cls/
MIT License
437 stars 28 forks source link

Prisma transactional plugin connection usage #181

Closed xtrinch closed 1 day ago

xtrinch commented 3 days ago

I'm using the prisma transactional plugin, and spawning 100+ transactions in an async context (with bull jobs & processors). Not using the @Transactional() decorator, everything is fine and dandy, but as soon as I add it I start getting this error:

    Timed out fetching a new connection from the connection pool. More info: http://pris.ly/d/connection-pool (Current connection pool timeout: 10, connection limit: 13)

I am not sure how to go about diagnosing this issue. My async jobs are not database heavy, one select and one small update statement. My database is also pretty empty (max 100 entities per table) since I can reproduce this in e2e tests even.

My jobs would be somewhat like this:

@Transactional<TransactionalAdapterPrisma>({}) // <---- If I remove this everything is OK
  async processPerks(data: { user: IUser }): Promise<void> {
    const user = await this.userService.findById(data.user.id); // <---- This is where it throws the connection pool error

    await this.emailService.sendEmail(...);

    await this.userService.update(data.user.id, { sent: true });
  }

I do use an extended PrismaClient:

@Injectable()
export class PrismaService
  extends PrismaClient<Prisma.PrismaClientOptions, 'info' | 'query' | 'warn' | 'error'>
  implements OnModuleInit
{
  constructor(
    private readonly prismaConfig: PrismaConfig,
    private readonly logger: LoggerService,
  ) {
    super({
      log: prismaConfig.log?.map((level) => ({ emit: 'event', level })),
      transactionOptions: {
        maxWait: 20000, // default: 2000
        timeout: 60000, // default: 5000
        isolationLevel: Prisma.TransactionIsolationLevel.ReadCommitted,
      },
    });
  }

  async onModuleInit() {
    if (this.prismaConfig.autoConnect) {
      await this.$connect();
    }
  }

  async disconnect() {
    await this.$disconnect();
  }
}

PrismaModule:

@Module({})
@Global()
export class PrismaModule {
  static forRootAsync(): DynamicModule {
    const prismaServiceProvider: Provider = {
      provide: 'PRISMA_SERVICE',
      useFactory: async (appConfig: AppConfig, prismaConfig: PrismaConfig, loggerService: LoggerService) => {
        const prismaService = new PrismaService(prismaConfig, loggerService);

        return prismaService;
      },
      inject: [AppConfig, PrismaConfig, LoggerService],
    };

    return {
      module: PrismaModule,
      global: true,
      providers: [prismaServiceProvider, getConfigFactory(PrismaConfig), getConfigFactory(AppConfig)],
      exports: [prismaServiceProvider],
    };
  }
}

and ClsModule:

@Module({
  imports: [
    ClsModule.forRoot({
      plugins: [
        new ClsPluginTransactional({
          adapter: new TransactionalAdapterPrisma<PrismaService>({
            prismaInjectionToken: 'PRISMA_SERVICE',
          }),
        }),
      ],
    }),
  ],
  providers: [],
})
export class AppClsModule {}

What's happening under the hood here? I've used transactions at scale with typeorm many times and never had any sort of connection pool issues.

Papooch commented 3 days ago

Hi, the only magic that happens under the hood is the Transactional plugin wrapping the method in prismaClinent.$transaction((tx) => yourMethod ) and storing the tx in AsyncLocalStorage to be retrieved by downstream components. There's no manual management of the transaction by the plugin, so the transaction client should be released back to the pool after the transaction callback returns.

The options:

      transactionOptions: {
        maxWait: 20000, // default: 2000
        timeout: 60000, // default: 5000
      },

only apply to row/table locks created within the transaction and not to obtaining the pool client in the first place.

The error you're getting (Timed out fetching a new connection from the connection pool) seems to be caused simply by not having big enough connection pool, or too small timeout.

To rule out any influence from the plugin, you can try manually wrapping the body of the function with prisma.$transaction to simulate holding a transaction open for the entire duration of the method, without actually caring if it is propagated or not.

xtrinch commented 1 day ago

Yeah same thing seems to happen if I wrap in a transaction manually so doesn't look to be an issue of this plugin. I can reproduce this down to 20 async jobs total. I don't presume there's settings for obtaining the pool client in the first place? This is really quite strange, it's unrealistic to expect to have enough pool clients to satisfy 1000s of async jobs..

The behaviour is quite strange too, when I surpass the number of clients only one of them successfully obtains the client while all the rest throw this error. When I'm within range, all of them pass.

Also if I remove let's say sendEmail, which brings our async job down to one select + one really simple update, I can also reproduce the issue. Without transactions I can get 1000+ jobs through instantaneously, but with transactions, apparently only below 20. This just doesn't sound right to me, because I was able to do the same with typeorm without an issue without modifying any default settings.

Papooch commented 1 day ago

As per Prisma's docs

When using Prisma ORM, the database connections are handled on an engine-level. This means they're not exposed to the developer and it's not possible to manually access them.

And also

The default number of connections (pool size) is calculated with the following formula: num_physical_cpus * 2 + 1

If you need to run thousands of simultaneous transactions, then you might need to bump it up, or increase the pool timeout significantly.

Anyway, since this is obviously not an issue with this library, I'll go ahead and close the issue and advise you to bring it to the Prisma team's attention.