oven-sh / bun

Incredibly fast JavaScript runtime, bundler, test runner, and package manager – all in one
https://bun.sh
Other
71.29k stars 2.5k forks source link

Bun project that processes all Kafka messages sent by Debezium #11151

Open Dave93 opened 2 weeks ago

Dave93 commented 2 weeks ago

How can we reproduce the crash?

Debezium streams a Postgres database to Kafka, and if there are a lot of unprocessed messages, Bun processes some of them and then crashes. If you need a backup of the Postgres database, I can provide it.

JavaScript/TypeScript code that reproduces the crash?

// Reproduction script: consumes Kafka messages from every topic matching
// /arryt\.public.*/ and replays each change event against a DuckDB database
// by building INSERT / UPDATE / DELETE statements as strings.
// NOTE(review): `Kafka` is used below but its import is not shown in this
// snippet — presumably it comes from a Kafka client package such as kafkajs; confirm.
import { Database } from "duckdb-async";
// `!` is a TypeScript non-null assertion: DUCK_PATH is assumed to be set.
const db = await Database.create(process.env.DUCK_PATH!);
const kafka = new Kafka({
    clientId: "arryy-duckdb",
    brokers: ["127.0.0.1:9092"],
  });

  // Consumer group falls back to "test-group" when KAFKA_GROUP_ID is unset or empty.
  const consumer = kafka.consumer({
    groupId: process.env.KAFKA_GROUP_ID || "test-group",

  });

  await consumer.connect();
  await consumer.subscribe({
    topics: [/arryt\.public.*/]
  });

  await consumer.run({
    // Called once per message; `topic` and `partition` are received but not used here.
    eachMessage: async ({ topic, partition, message }) => {
      // A missing or empty message value is parsed as an empty object, which is
      // then skipped by the 'payload' check below.
      const object = JSON.parse(message.value?.toString() || "{}");
      // console.log("debezium event", object);
      if ('payload' in object) {
        // `source`, `op` and `ts_ms` are destructured but never read in this
        // snippet; only `before`, `after` and `source.table` drive the logic.
        const {
          payload: { before, after, source, op, ts_ms, source: { table } },
        } = object;
        console.log('table ', table)
        try {
          // No `before` image: treat the event as an upsert keyed on `after.id`.
          if (!before) {
            // NOTE(review): `after.id` is interpolated into the WHERE clause without
            // the quote-escaping applied to the other string values below.
            const existingRecord = await db.all(`SELECT id FROM ${table} WHERE id = '${after.id}'`);
            if (existingRecord.length === 0) {
              // Log the exact INSERT statement that is about to be issued.
              // String values get single quotes doubled; every other value is
              // interpolated as-is.
              console.log(`
          INSERT INTO ${table} (${Object.keys(after).join(", ")}) VALUES (${Object.values(after).map((val) => (typeof val === "string" ? `'${val.replace(/'/g, "''")}'` : val)).join(", ")})
          `)
              // NOTE(review): this call is not awaited (unlike the SELECT above), so
              // the handler returns before the statement completes and, assuming
              // db.all returns a promise, a rejection would not reach the catch below.
              db.all(`
          INSERT INTO ${table} (${Object.keys(after).join(", ")}) VALUES (${Object.values(after).map((val) => (typeof val === "string" ? `'${val.replace(/'/g, "''")}'` : val)).join(", ")})
          `)
            } else {
              // Row already exists: log and issue an UPDATE that sets every column
              // present in `after` (including `id`) for the row with that id.
              console.log(`
          UPDATE ${table} SET ${Object.keys(after).map((key) => `${key} = ${typeof after[key] === "string" ? `'${after[key].replace(/'/g, "''")}'` : after[key]}`).join(", ")} WHERE id = '${after.id}'
          `)
              // NOTE(review): not awaited — same caveat as the INSERT above.
              db.all(`
          UPDATE ${table} SET ${Object.keys(after).map((key) => `${key} = ${typeof after[key] === "string" ? `'${after[key].replace(/'/g, "''")}'` : after[key]}`).join(", ")} WHERE id = '${after.id}'
          `)
            }

          // `before` present but no `after`: delete the row by `before.id`.
          // Events carrying both `before` and `after` fall through and do nothing.
          } else if (!after) {
            console.log(`
        DELETE FROM ${table} WHERE id = '${before.id}'
        `)
            // NOTE(review): not awaited — same caveat as the INSERT above.
            db.all(`
        DELETE FROM ${table} WHERE id = '${before.id}'
        `)
          }

        } catch (e) {
          // Errors thrown synchronously or by the awaited SELECT are logged and
          // swallowed, so the consumer carries on with the next message.
          console.log('error', e)
        }
      }

    },
  });

Relevant log output

UPDATE order_actions SET id = 'd49099f9-8cf2-41fc-80db-dd3de8277d0d', order_id = 'c047ed39-094d-4ed8-80a9-b93733a38700', order_created_at = '2023-11-10T06:45:04.66400Z', duration = 3, action = 'STATUS_CHANGE', action_text = 'Статус заказа изменен на "Доставлен"', terminal_id = 'b1a884a3-7096-4a8c-8425-8f824fbbf59c', created_at = '2023-11-10T07:08:43.32600Z', created_by = '51f0c64c-dcef-486e-8a6c-2577af25e23b' WHERE id = 'd49099f9-8cf2-41fc-80db-dd3de8277d0d'

table  order_actions

          UPDATE order_actions SET id = 'b80ac290-e5ec-44f8-8955-94ee6ba8cebf', order_id = 'c047ed39-094d-4ed8-80a9-b93733a38700', order_created_at = '2023-11-10T06:45:04.66400Z', duration = 587, action = 'STATUS_CHANGE', action_text = 'Статус заказа изменен на "В пути"', terminal_id = 'b1a884a3-7096-4a8c-8425-8f824fbbf59c', created_at = '2023-11-10T07:08:33.48100Z', created_by = '51f0c64c-dcef-486e-8a6c-2577af25e23b' WHERE id = 'b80ac290-e5ec-44f8-8955-94ee6ba8cebf'

table  order_actions

          UPDATE order_actions SET id = 'abbffe4c-9707-41ef-909c-c0fd616f3952', order_id = 'c047ed39-094d-4ed8-80a9-b93733a38700', order_created_at = '2023-11-10T06:45:04.66400Z', duration = 6, action = 'STATUS_CHANGE', action_text = 'Статус заказа изменен на "Ожидает гостя"', terminal_id = 'b1a884a3-7096-4a8c-8425-8f824fbbf59c', created_at = '2023-11-10T07:08:39.61400Z', created_by = '51f0c64c-dcef-486e-8a6c-2577af25e23b' WHERE id = 'abbffe4c-9707-41ef-909c-c0fd616f3952'

table  order_actions

          UPDATE order_actions SET id = '63fe42fa-9095-4b39-b089-9fd36bf51ecc', order_id = 'c047ed39-094d-4ed8-80a9-b93733a38700', order_created_at = '2023-11-10T06:45:04.66400Z', duration = 0, action = 'STATUS_CHANGE', action_text = 'Статус заказа изменен на "В филиале"', terminal_id = 'b1a884a3-7096-4a8c-8425-8f824fbbf59c', created_at = '2023-11-10T06:58:46.27400Z', created_by = '51f0c64c-dcef-486e-8a6c-2577af25e23b' WHERE id = '63fe42fa-9095-4b39-b089-9fd36bf51ecc'

table  order_actions
============================================================
Bun v1.1.8 (89d25807) Linux x64
Args: "bun", "run", "src/index.ts"
Features: jsc dotenv http_server transpiler_cache(2) tsconfig_paths tsconfig(4) 
Builtins: "bun:main" "detect-libc" "node:assert" "node:buffer" "node:crypto" "node:events" "node:fs" "node:http" "node:net" "node:os" "node:path" "node:stream" "node:string_decoder" "node:tls" "node:url" "node:util" "node:util/types" "node:http2" 
Elapsed: 6735ms | User: 6700ms | Sys: 910ms
RSS: 1.05GB | Peak: 0.20GB | Commit: 1.05GB | Faults: 4

panic(main thread): Segmentation fault at address 0x4E6000080208
oh no: Bun has crashed. This indicates a bug in Bun, not your code.

Stack Trace (bun.report)

Bun v1.1.8 (89d2580) on linux x86_64 [RunCommand]

Segmentation fault at address 0x4E6000080208