kysely-org / kysely

A type-safe TypeScript SQL query builder
https://kysely.dev
MIT License

How to ensure queries finish running before executing others #543

Closed · mandaputtra closed 1 year ago

mandaputtra commented 1 year ago

I have a complex transaction. It looks like this:

export async function getInverterYieldAndUpdate(
  db: Kysely<Database>,
  invertersStr: string,
  dateUnix: string
) {
  const date = new Date(+dateUnix);
  // We get 3 days of data because the server we use is on UTC;
  // we can only get the full-day data for our regional plants
  // if we query 3 days straight
  const startDate = startOfDay(subDays(date, 1)).getTime();
  const endDate = endOfDay(addDays(date, 1)).getTime();
  const devices: {
    device_id: string; // Id to be requested to Huawei
    id: string;
    plant_id: string;
    timezone: Timezone;
    device_type: string;
  }[] = JSON.parse(invertersStr);

  const device_code = getDeviceCode('inverter');

  if (device_code === -1) return;

  const inverters = await getDeviceHistory(
    devices.map((d) => d.device_id).join(','),
    String(device_code),
    startDate,
    endDate
  );

  if (inverters.length < 1) return;

  await db.transaction().execute(async (trx) => {
    const PLTEReports = await trx
      .selectFrom('plant_lifetime_energy_managements')
      .selectAll()
      .where('year', '=', date.getFullYear())
      .where('month', '=', date.getMonth() + 1)
      .where('day', '=', date.getDate())
      .where(
        'plant_id',
        'in',
        devices.map((d) => d.plant_id)
      )
      .orderBy('collect_time', 'asc')
      .execute();

    const groupHistoryByDeviceId = A.groupBy(inverters, (i) => i.device_id);
    const groupPLTEByPlantId = A.groupBy(PLTEReports, (p) => p.plant_id);
    const deletedGroup: Array<any> = [];
    const updateInsertedPLTE: PlantLifeTimeEnergies['insert'][] = [];

    A.forEach(devices, (device) => {
      const PLTEByPlant = groupPLTEByPlantId[device.plant_id];
      const historyByDevice = groupHistoryByDeviceId[device.device_id];

      const timezone = device.timezone;
      const localDate = utcToZonedTime(+dateUnix, timezone);

      const localStartDate = getTime(
        zonedTimeToUtc(startOfDay(localDate), timezone)
      );
      const localEndDate = getTime(
        zonedTimeToUtc(endOfDay(localDate), timezone)
      );

      const day = localDate.getDate();
      const month = localDate.getMonth() + 1;
      const year = localDate.getFullYear();

      if (!PLTEByPlant || !historyByDevice) {
        return;
      }

      deletedGroup.push({
        day,
        month,
        year,
        plant_id: device.plant_id,
      });

      const sHistoryByDevice = pipe(
        historyByDevice,
        A.filter(
          (h) =>
            h.collect_time >= localStartDate && h.collect_time <= localEndDate
        ),
        A.sort((a, b) => a.collect_time - b.collect_time)
      );

      // map the inverter yield value to 24 hour format
      const mapHistoryData = pipe(
        sHistoryByDevice,
        A.mapWithIndex((index, history) => {
          const { hour, minutes } = utcConvert(
            device.timezone,
            history.collect_time
          );

          if (minutes === 55 && hour === 0) {
            return history.device_detail['day_cap'] ?? 0;
          } else if (minutes === 55 && hour === 23) {
            const leapIndex = index - 11;
            const firstHistory = checkValidHistoryData(
              sHistoryByDevice,
              leapIndex
            );

            if (!firstHistory) {
              return undefined;
            }

            const inverterYield =
              (history.device_detail['day_cap'] ?? 0) -
              (firstHistory.device_detail['day_cap'] ?? 0);
            return inverterYield;
          } else if (minutes === 0 && hour !== 0 && hour !== 23) {
            const leapIndex = index + 12;
            const leapHistory = checkValidHistoryData(
              sHistoryByDevice,
              leapIndex
            );

            if (!leapHistory) {
              return undefined;
            }

            const inverterYield =
              (leapHistory.device_detail['day_cap'] ?? 0) -
              (history.device_detail['day_cap'] ?? 0);
            return inverterYield;
          }
        }),
        A.filter((a) => a !== undefined)
      );

      // Merge the array
      const mapInsertData = pipe(
        PLTEByPlant,
        A.mapWithIndex((index, plte) => {
          // eslint-disable-next-line
          const { id: _, ...restData } = plte;
          return {
            ...restData,
            inverter_yield_kwh_hour: mapHistoryData[index] ?? 0,
          };
        })
      );

      updateInsertedPLTE.push(...mapInsertData);
    });

    // Delete related records from plants
    await Promise.all(
      deletedGroup.map(async (d) => {
        return trx
          .deleteFrom('plant_lifetime_energy_managements')
          .where('year', '=', d.year)
          .where('month', '=', d.month)
          .where('day', '=', d.day)
          .where('plant_id', '=', d.plant_id)
          .execute();
      })
    );

    await trx
      .insertInto('plant_lifetime_energy_managements')
      .values(updateInsertedPLTE)
      .execute();
  });
}

At the bottom of the code there is an await Promise.all. The problem is that the deletes inside Promise.all do not seem to finish before the insert runs, so the insert errors out with a duplicate unique key error. How do I ensure that the queries inside Promise.all have finished running first? Is there some technique for this, for example using a Task or a Batch, or is it simply not possible to wait for the delete statements to finish before running the insert?
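
For illustration, this is roughly what I mean by waiting for the deletes before the insert. A rough, untested sketch that just replaces the Promise.all with a sequential loop inside the same transaction callback:

    // Sketch: await each delete one at a time instead of Promise.all,
    // and only run the insert after every delete has resolved.
    for (const d of deletedGroup) {
      await trx
        .deleteFrom('plant_lifetime_energy_managements')
        .where('year', '=', d.year)
        .where('month', '=', d.month)
        .where('day', '=', d.day)
        .where('plant_id', '=', d.plant_id)
        .execute();
    }

    await trx
      .insertInto('plant_lifetime_energy_managements')
      .values(updateInsertedPLTE)
      .execute();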

igalklebanov commented 1 year ago

Hey 👋

Have you tried setting an isolation level on your transaction? https://kysely-org.github.io/kysely/classes/TransactionBuilder.html#setIsolationLevel

The insert query might not be seeing the uncommitted changes, so it acts on the database state from before the deletions.
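
Something along these lines (an untested sketch; "read committed" is just one example level):

    await db
      .transaction()
      // Run the whole transaction under an explicit isolation level.
      .setIsolationLevel('read committed')
      .execute(async (trx) => {
        // ... the same deletes and insert as before
      });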

mandaputtra commented 1 year ago

Hi, silly me, you're correct 💯 Setting it to "read committed" produced the correct result. I found a better solution: since it's a key conflict, I can use onConflict:

      await trx
        .insertInto('plant_lifetime_energy_managements')
        .values(updateInsertedPLTE)
        .onConflict((oc) => {
          return oc.columns(['plant_id', 'collect_time']).doUpdateSet({
            inverter_yield_kwh_hour: (eb) =>
              eb.ref('excluded.inverter_yield_kwh_hour'),
          });
        })
        .execute();

Thanks!