prisma / prisma

Next-generation ORM for Node.js & TypeScript | PostgreSQL, MySQL, MariaDB, SQL Server, SQLite, MongoDB and CockroachDB
https://www.prisma.io
Apache License 2.0

Prisma reaching connection pool limits error while using an external pooler #24945

Open Holbanner opened 2 months ago

Holbanner commented 2 months ago

Bug description

While using Prisma with PgBouncer via CloudNativePG, we configured our connection to use PgBouncer by passing the ?pgbouncer=true parameter to the connection URL.
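
For reference, a minimal sketch of what such a setup can look like (the host, credentials, and datasource name below are illustrative, not taken from this report):

// Sketch of a Prisma client pointed at PgBouncer (hypothetical host/credentials).
// pgbouncer=true tells the query engine it is talking to a transaction-mode pooler.
import { PrismaClient } from '@prisma/client';

const databaseUrl =
  'postgresql://app_user:app_password@pgbouncer-rw:5432/app_db?pgbouncer=true';

const prisma = new PrismaClient({
  datasources: { db: { url: databaseUrl } }, // "db" assumes the default datasource name
});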

We have jobs running periodically, one of which runs an updateMany() on multiple rows based on dates. Around the time the day changes, this job naturally sees a peak in activity, and there can be multiple instances of this job running concurrently.

When this happens, we start to see connection limit errors coming from Prisma: Timed out fetching a new connection from the connection pool. More info: http://pris.ly/d/connection-pool (Current connection pool timeout: 10, connection limit: 9), while PgBouncer itself seems to be running normally.
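
For anyone triaging the same symptom, this message corresponds to Prisma's P2024 error code, so it can be detected explicitly. A sketch under that assumption (the job wrapper and logging are illustrative):

// Sketch: surface pool-timeout errors (P2024) separately from other failures.
import { Prisma } from '@prisma/client';

async function runDailyJob(work: () => Promise<void>): Promise<void> {
  try {
    await work();
  } catch (e) {
    if (e instanceof Prisma.PrismaClientKnownRequestError && e.code === 'P2024') {
      // Pool exhausted on the client side; log the reported limits before rethrowing.
      console.error('Connection pool timeout while running the daily job', e.meta);
    }
    throw e;
  }
}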

We've also noticed Prisma throwing an error containing the following chunk of the client runtime library before the first timeout of the daily run (that's line 122 of prisma/client/library.js):

)}var Zi=({clientMethod:e,activeProvider:r})=>t=>{let n="",i;if(Array.isArray(t)){let[o,...s]=t;n=o,i={values:Pt(s||[]),__prismaRawParameters__:!0}}else switch(r){case"sqlite":case"mysql":{n=t.sql,i={values:Pt(t.values),__prismaRawParameters__:!0};break}case"cockroachdb":case"postgresql":case"postgres":{n=t.text,i={values:Pt(t.values),__prismaRawParameters__:!0};break}case"sqlserver":{n=tl(t),i={values:Pt(t.values),__prismaRawParameters__:!0};break}default:throw new Error(`The ${r} provider does not support ${e}`)}return i?.values?sl(`prisma.${e}(${n}, ${i.values})`):sl(`prisma.${e}(${n})`),{query:n,parameters:i}},al={requestArgsToMiddlewareArgs(e){return[e.strings,...e.values]},middlewareArgsToRequestArgs(e){let[r,...t]=e;return new ie(r,t)}},ll={requestArgsToMiddlewareArgs(e){return[e]},middlewareArgsToRequestArgs(e){return e[0]}};function Xi(e){return function(t){let n,i=(o=e)=>{try{return o===void 0||o?.kind==="itx"?n??(n=ul(t(o))):ul(t(o))}catch(s){return Promise.reject(s)}};return{then(o,s){return i().then(o,s)},catch(o){return i().catch(o)},finally(o){return i().finally(o)},requestTransaction(o){let s=i(o);return s.requestTransaction?s.requestTransaction(o):s},[Symbol.toStringTag]:"PrismaPromise"}}}function ul(e){return typeof e.then=="function"?e:Promise.resolve(e)}var cl={isEnabled(){return!1},getTraceParent(){return"00-10-10-00"},async createEngineSpan(){},getActiveContext(){},runInChildSpan(e,r){return r()}},eo=class{isEnabled(){return this.getGlobalTracingHelper().isEnabled()}getTraceParent(r){return this.getGlobalTracingHelper().getTraceParent(r)}createEngineSpan(r){return this.getGlobalTracingHelper().createEngineSpan(r)}getActiveContext(){return this.getGlobalTracingHelper().getActiveContext()}runInChildSpan(r,t){return this.getGlobalTracingHelper().runInChildSpan(r,t)}getGlobalTracingHelper(){return globalThis.PRISMA_INSTRUMENTATION?.helper??cl}};function pl(e){return e.includes("tracing")?new eo:cl}function ml(e,r=()=>{}){let t,n=new Promise(i=>t=i);return{then(i){return--e===0&&t(r()),i?.(n)}}}var Zm=["$connect","$disconnect","$on","$transaction","$use","$extends"],dl=Zm;function fl(e){return typeof e=="string"?e:e.reduce((r,t)=>{let n=typeof t=="string"?t:t.level;return n==="query"?r:r&&(t==="info"||r==="info")?"info":n},void 0)}var Cn=class{constructor(){this._middlewares=[]}use(r){this._middlewares.push(r)}get(r){return this._middlewares[r]}has(r){return!!this._middlewares[r]}length(){return this._middlewares.length}};var hl=k(ui());function Sn(e){return typeof e.batchRequestIdx=="number"}function Rn(e){return e===null?e:Array.isArray(e)?e.map(Rn):typeof e=="object"?Xm(e)?ed(e):gr(e,Rn):e}function Xm(e){return e!==null&&typeof e=="object"&&typeof e.$type=="string"}function ed({$type:e,value:r}){switch(e){case"BigInt":return BigInt(r);case"Bytes":return Buffer.from(r,"base64");case"DateTime":return new Date(r);case"Decimal":return new Te(r);case"Json":return JSON.parse(r);default:rr(r,"Unknown tagged value")}}function gl(e){if(e.action!=="findUnique"&&e.action!=="findUniqueOrThrow")return;let r=[];return e.modelName&&r.push(e.modelName),e.query.arguments&&r.push(ro(e.query.arguments)),r.push(ro(e.query.selection)),r.join("")}function ro(e){return`(${Object.keys(e).sort().map(t=>{let n=e[t];return typeof n=="object"&&n!==null?`(${t} ${ro(n)})`:t}).join(" ")})`}var 
rd={aggregate:!1,aggregateRaw:!1,createMany:!0,createManyAndReturn:!0,createOne:!0,deleteMany:!0,deleteOne:!0,executeRaw:!0,findFirst:!1,findFirstOrThrow:!1,findMany:!1,findRaw:!1,findUnique:!1,findUniqueOrThrow:!1,groupBy:!1,queryRaw:!1,runCommandRaw:!0,updateMany:!0,updateOne:!0,upsertOne:!0};function to(e){return rd[e]}var An=class{constructor(r){this.options=r;this.tickActive=!1;this.batches={}}request(r){let t=this.options.batchBy(r);return t?(this.batches[t]||(this.batches[t]=[],this.tickActive||(this.tickActive=!0,process.nextTick(()=>{this.dispatchBatches(),this.tickActive=!1}))),new Promise((n,i)=>{this.batches[t].push({request:r,resolve:n,reject:i})})):this.options.singleLoader(r)}dispatchBatches(){for(let r in this.batches){let t=this.batches[r];delete this.batches[r],t.length===1?this.options.singleLoader(t[0].request).then(n=>{n instanceof Error?t[0].reject(n):t[0].resolve(n)}).catch(n=>{t[0].reject(n)}):(t.sort((n,i)=>this.options.batchOrder(n.request,i.request)),this.options.batchLoader(t.map(n=>n.request)).then(n=>{if(n instanceof Error)for(let i=0;i<t.length;i++)t[i].reject(n);else for(let i=0;i<t.length;i++){let o=n[i];o instanceof Error?t[i].reject(o):t[i].resolve(o)}}).catch(n=>{for(let i=0;i<t.length;i++)t[i].reject(n)}))}}get[Symbol.toStringTag](){return"DataLoader"}};var td=L("prisma:client:request_handler"),In=class{constructor(r,t){this.logEmitter=t,this.client=r,this.dataloader=new An({batchLoader:na(async({requests:n,customDataProxyFetch:i})=>{let{transaction:o,otelParentCtx:s}=n[0],a=n.map(p=>p.protocolQuery),l=this.client._tracingHelper.getTraceParent(s),u=n.some(p=>to(p.protocolQuery.action));return(await this.client._engine.requestBatch(a,{traceparent:l,transaction:nd(o),containsWrite:u,customDataProxyFetch:i})).map((p,m)=>{if(p instanceof Error)return p;try{return this.mapQueryEngineResult(n[m],p)}catch(f){return f}})}),singleLoader:async n=>{let i=n.transaction?.kind==="itx"?yl(n.transaction):void 0,o=await this.client._engine.request(n.protocolQuery,{traceparent:this.client._tracingHelper.getTraceParent(),interactiveTransaction:i,isWrite:to(n.protocolQuery.action),customDataProxyFetch:n.customDataProxyFetch});return this.mapQueryEngineResult(n,o)},batchBy:n=>n.transaction?.id?`transaction-${n.transaction.id}`:gl(n.protocolQuery),batchOrder(n,i){return n.transaction?.kind==="batch"&&i.transaction?.kind==="batch"?n.transaction.index-i.transaction.index:0}})}async request(r){try{return await this.dataloader.request(r)}catch(t){let{clientMethod:n,callsite:i,transaction:o,args:s,modelName:a}=r;this.handleAndLogRequestError({error:t,clientMethod:n,callsite:i,transaction:o,args:s,modelName:a})}}mapQueryEngineResult({dataPath:r,unpacker:t},n){let i=n?.data,o=n?.elapsed,s=this.unpack(i,r,t);return process.env.PRISMA_CLIENT_GET_TIME?{data:s,elapsed:o}:s}handleAndLogRequestError(r){try{this.handleRequestError(r)}catch(t){throw this.logEmitter&&this.logEmitter.emit("error",{message:t.message,target:r.clientMethod,timestamp:new Date}),t}}handleRequestError({error:r,clientMethod:t,callsite:n,transaction:i,args:o,modelName:s}){if(td(r),id(r,i)||r instanceof Oe)throw r;if(r instanceof V&&od(r)){let l=El(r.meta);Tn({args:o,errors:[l],callsite:n,errorFormat:this.client._errorFormat,originalMethod:t,clientVersion:this.client._clientVersion})}let a=r.message;if(n&&(a=Ar({callsite:n,originalMethod:t,isPanic:r.isPanic,showColors:this.client._errorFormat==="pretty",message:a})),a=this.sanitizeMessage(a),r.code){let l=s?{modelName:s,...r.meta}:r.meta;throw new 
V(a,{code:r.code,clientVersion:this.client._clientVersion,meta:l,batchRequestIdx:r.batchRequestIdx})}else{if(r.isPanic)throw new le(a,this.client._clientVersion);if(r instanceof B)throw new B(a,{clientVersion:this.client._clientVersion,batchRequestIdx:r.batchRequestIdx});if(r instanceof R)throw new R(a,this.client._clientVersion);if(r instanceof le)throw new le(a,this.client._clientVersion)}throw r.clientVersion=this.client._clientVersion,r}sanitizeMessage(r){return this.client._errorFormat&&this.client._errorFormat!=="pretty"?(0,hl.default)(r):r}unpack(r,t,n){if(!r||(r.data&&(r=r.data),!r))return r;let i=Object.values(r)[0],o=t.filter(a=>a!=="select"&&a!=="include"),s=Rn(Ii(i,o));return n?n(s):s}get[Symbol.toStringTag](){return"RequestHandler"}};function nd(e){if(e){if(e.kind==="batch")return{kind:"batch",options:{isolationLevel:e.isolationLevel}};if(e.kind==="itx")return{kind:"itx",options:yl(e)};rr(e,"Unknown transaction kind")}}function yl(e){return{id:e.id,payload:e.payload}}function id(e,r){return Sn(e)&&r?.kind==="batch"&&e.batchRequestIdx!==r.index}function od(e){return e.code==="P2009"||e.code==="P2012"}function El(e){if(e.kind==="Union")return{kind:"Union",errors:e.errors.map(El)};if(Array.isArray(e.selectionPath)){let[,...r]=e.selectionPath;return{...e,selectionPath:r}}return e}var bl="5.15.1";var wl=bl;function xl(e){return e.map(r=>{let t={};for(let n of Object.keys(r))t[n]=Pl(r[n]);return t})}function Pl({prisma__type:e,prisma__value:r}){switch(e){case"bigint":return BigInt(r);case"bytes":return Buffer.from(r,"base64");case"decimal":return new Te(r);case"datetime":case"date":return new Date(r);case"time":return new Date(`1970-01-01T${r}Z`);case"array":return r.map(Pl);default:return r}}var Sl=k(Hi());var q=class extends Error{constructor(r){super(r+

How to reproduce

1. Set up Prisma to use PgBouncer.
2. Run a large number of queries in parallel (see the sketch below).
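
A minimal load sketch along those lines (the model, filter, and batch size are illustrative, not the reporter's exact job):

// Sketch: fire many concurrent updateMany calls to saturate the client-side pool.
// `prisma` is assumed to be a PrismaClient connected through PgBouncer as above.
import type { PrismaClient } from '@prisma/client';

async function hammer(prisma: PrismaClient): Promise<void> {
  const batches = Array.from({ length: 50 }, (_, i) =>
    prisma.marketsGameInstances.updateMany({
      where: { status: i % 2 },  // arbitrary filter, only there to create load
      data: { status: 0 },
    }),
  );
  // With concurrency well above connection_limit, some calls wait past pool_timeout.
  await Promise.all(batches);
}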

Expected behavior

We expect Prisma to let PgBouncer handle the connection pooling.
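
Worth noting: even behind an external pooler, the Prisma client keeps its own internal pool (the one the error above refers to), and its usage can be inspected with the metrics preview feature. A sketch, assuming "metrics" is added to the generator's previewFeatures:

// Sketch: dump the client-side pool metrics while the daily job is running.
// Requires previewFeatures = ["metrics"] on the client generator.
import type { PrismaClient } from '@prisma/client';

async function logPoolMetrics(prisma: PrismaClient): Promise<void> {
  const metrics = await prisma.$metrics.json();
  console.log(JSON.stringify(metrics, null, 2));
}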

Prisma information

model MarketsGameInstances {
  marketGameInstanceId     Int                       @id(map: "primary_key_market_game_instance_id") @default(autoincrement()) @map("market_game_instance_id")
  eventGameId              Int                       @map("event_game_id")
  marketGameId             Int                       @map("market_game_id")
  offerType                OfferType                 @map("offer_type")
  status                   Int                       @db.SmallInt
  eventGame                EventsGame                @relation(fields: [eventGameId], references: [eventId], onDelete: Cascade, onUpdate: NoAction, map: "market_game_instance_event_game_foreign_key")
  marketGame               MarketsGame               @relation(fields: [marketGameId], references: [marketGameId], onDelete: Cascade, onUpdate: NoAction, map: "market_game_instance_market_game_foreign_key")
  outcomesGameInstances    OutcomesGameInstances[]

  @@unique([eventGameId, marketGameId, offerType], map: "market_game_instance_unique_event_market_offer_type")
  @@index([eventGameId], map: "market_game_instance_index_event_game_id")
  @@index([marketGameId], map: "market_game_instance_index_market_game_id")
  @@index([offerType], map: "market_game_instance_index_offer_type")
  @@map("market_game_instances")
}

model EventsGame {
  eventId              Int                    @id(map: "event_id_primary_key") @default(autoincrement()) @map("event_id")
  targetId             Int                    @map("target_id") @db.SmallInt
  name                 String                 @db.VarChar(120)
  sportId              Int                    @map("sport_id")
  internalId           String                 @map("internal_id") @db.VarChar(120)
  eventDate            DateTime?              @map("event_date") @db.Timestamptz(6)
  createdAt            DateTime               @default(now()) @map("created_at") @db.Timestamptz(6)

  @@unique([targetId, internalId], map: "events_unique")
  @@index([targetId], map: "events_index_target_id")
  @@index([sportId], map: "events_index_sport_id")
  @@index([competitionId], map: "events_index_competition_id")
  @@index([internalId], map: "events_index_internal_id")
  @@index([eventDate], map: "events_index_event_date")
  @@index([name], map: "events_index_name")
  @@map("events")
}
  async close(data: {
    offerType: ScrapableUrlType;
    marketGameInstanceIdsToExclude: number[];
    targetId: number;
    sportIds?: number[];
    eventIds?: number[];
    eventIdsToExclude?: number[];
  }): Promise<void> {
    const where: Prisma.MarketsGameInstancesWhereInput = {
      marketGameInstanceId: { notIn: data.marketGameInstanceIdsToExclude },
      offerType: data.offerType,
      eventGame: {
        targetId: data.targetId,
        eventDate: { gte: threeDaysAgo() },
      },
    };
    if (data.sportIds?.length > 0) {
      where.eventGame.sportId = { in: data.sportIds };
    }
    if (data.eventIds?.length > 0) {
      where.eventGameId = { in: data.eventIds };
    }
    if (data.eventIdsToExclude?.length > 0) {
      where.eventGameId = { notIn: data.eventIdsToExclude };
    }

    try {
      await this.prisma.marketsGameInstances.updateMany({
        where,
        data: { status: InstanceStatus.CLOSED },
      });
    } catch (e) {
      throw e;
    }
  }

Environment & setup

Prisma Version

5.15.1
Weakky commented 2 months ago

Hey @Holbanner, would you mind sharing with us a reproduction repository with detailed steps to observe your error? That'd be extremely helpful. Thank you

hassanamin994 commented 2 months ago

Can confirm this, though we're running on v4.3.1. I thought that upgrading would help, but apparently the problem is still the same. One thing I noticed is that there are a lot of COMMIT queries that usually never get terminated.

(screenshot attached)
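
One way to check for those lingering sessions from the database side (a diagnostic sketch; pg_stat_activity is standard PostgreSQL, the one-minute threshold is arbitrary):

// Sketch: list long-lived sessions whose last statement was COMMIT.
const stuckCommits = await prisma.$queryRaw`
  SELECT pid, state, now() - query_start AS age
  FROM pg_stat_activity
  WHERE query = 'COMMIT'
    AND now() - query_start > interval '1 minute'
  ORDER BY age DESC
`;
console.log(stuckCommits);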
Holbanner commented 2 months ago

Hey @Holbanner, would you mind sharing with us a reproduction repository with detailed steps to observe your error? That'd be extremely helpful. Thank you

I would very much like to, but it's one of those issues that "works on my machine". I'm only facing this issue in my staging environment.

This problem is evolving in somewhat weird ways.

I've tried playing around with pool_timeout and connection_limit and observing the changes in the error (to the extent that it displays whatever I have changed).
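
For context, both knobs are connection-string parameters; an illustrative example of what that tuning looks like (the values and URL are placeholders):

// Illustrative only: raising the client-side pool size and wait time via URL parameters.
const tunedUrl =
  'postgresql://app_user:app_password@pgbouncer-rw:5432/app_db' +
  '?pgbouncer=true&connection_limit=20&pool_timeout=30';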

By slightly changing how the code runs, I've come to understand that the problem is not limited to this particular request: it affects most of the requests using the MarketsGameInstances model.

From what I've gathered, a first request fails, then the server closes the connection, somehow creating the deadlock (with "Server has closed the connection" errors looping).

I'm currently investigating the model, but the only thing that sets it apart is that OfferType is an enum.