metadb-project / metadb

Metadb extends PostgreSQL with features to support analytics, such as streaming data sources, data model transforms, and historical data.
Apache License 2.0

Support multiple partitions per topic in Kafka #82

Open nazgaret opened 3 months ago

nazgaret commented 3 months ago

Description: During the sync and resync process, we get fatal errors when reading from topics with more than one partition.

What we do: We run a sync and resync process with Kafka using parallel reads. To increase write and read throughput, we add partitions to the topics that receive data, so they can be consumed by the existing 32 consumers.

What we expect: Normal Metadb behavior.

What we obtain: Fatal errors that look like race conditions when writing to the database.

Version: Metadb main 835232d

Connector config:

```json
"tasks.max": "1",
"include.schema.changes": "true",
"truncate.handling.mode": "include",
"publication.autocreate.mode": "filtered",
"heartbeat.interval.ms": "30000",
"heartbeat.action.query": "UPDATE admin.heartbeat set last_heartbeat = now();",
"plugin.name": "pgoutput",
"max.batch.size": "2048",
"max.queue.size": "8129",
"snapshot.mode": "initial",
"auto.create.topics.enable": "false",
"topic.creation.groups": "inventory",
"topic.creation.enable": "true",
"topic.creation.inventory.include": "EXAMPLE",
"topic.creation.inventory.partitions": "32",
"topic.creation.default.replication.factor": "2",
"topic.creation.default.partitions": "2",
"topic.creation.default.cleanup.policy": "delete",
"topic.creation.default.retention.ms": "8640000000"
},
"kafkaCluster": {
  "capacity": {
    "provisionedCapacity": {
      "mcuCount": 4,
      "workerCount": 1
    }
  },
  "kafkaConnectVersion": "2.7.1",
```

Errors:

```
2024-07-31 09:01:27 UTC ERROR: executor: exec command: exec data: merge: matcher: reading matching current row: ERROR: cached plan must not change result type (SQLSTATE 0A000)
2024-07-31 09:01:27 UTC ERROR: executor: exec command: exec data: merge: matcher: reading matching current row: ERROR: cached plan must not change result type (SQLSTATE 0A000)
```

```
2024-07-30 20:47:03 UTC ERROR: executor: exec command: schema: creating table "folio_circulation.loan_policy": creating new table "folio_circulation.loan_policy": creating sync table for "folio_circulation.loan_policy": ERROR: relation "zzz_loan_policy___sync" already exists (SQLSTATE 42P07)
2024-07-30 20:48:29 UTC ERROR: executor: exec command: schema: creating table "folio_circulation.loan_policy": creating new table "folio_circulation.loan_policy": creating sync table for "folio_circulation.loan_policy": ERROR: relation "zzz_loan_policy___sync" already exists (SQLSTATE 42P07)
```

```
2024-07-30 20:16:15 UTC ERROR: executor: exec command: schema: adding partition for table "folio_agreements.custom_property_definition" year "2024": creating partition: ERROR: relation "zzz_custom_propertydefinition2024" already exists (SQLSTATE 42P07)
2024-07-30 20:16:15 UTC ERROR: executor: exec command: schema: adding partition for table "folio_agreements.custom_property_definition" year "2024": creating partition: ERROR: relation "zzz_custom_propertydefinition2024" already exists (SQLSTATE 42P07)
```

Stack:

```
2024-07-30 20:16:15 UTC DETAIL:
goroutine 50 [running]:
github.com/metadb-project/metadb/cmd/metadb/server.launchPollLoop.func1()
	/opt/metadb/cmd/metadb/server/poll.go:91 +0xf0
panic({0x11a3980?, 0xc005f27ec0?})
	/usr/local/go/src/runtime/panic.go:770 +0x132
github.com/metadb-project/metadb/cmd/metadb/server.launchPollLoop({0x145f060?, 0xc0001a0910?}, 0xc00038e8f9?, 0x5?, 0x0?)
	/opt/metadb/cmd/metadb/server/poll.go:97 +0xa9
github.com/metadb-project/metadb/cmd/metadb/server.goPollLoop({0x145f060, 0xc0001a0910}, 0xc0002801e0, 0xc0001a0410)
	/opt/metadb/cmd/metadb/server/poll.go:56 +0x3ba
created by github.com/metadb-project/metadb/cmd/metadb/server.mainServer in goroutine 1
	/opt/metadb/cmd/metadb/server/server.go:188 +0x2d8

goroutine 1 [sleep]:
time.Sleep(0x12a05f200)
	/usr/local/go/src/runtime/time.go:195 +0x115
github.com/metadb-project/metadb/cmd/metadb/server.mainServer(0xc0001a0410, 0xc0002801e0)
	/opt/metadb/cmd/metadb/server/server.go:209 +0x2e9
github.com/metadb-project/metadb/cmd/metadb/server.launchServer(...)
	/opt/metadb/cmd/metadb/server/server.go:138
github.com/metadb-project/metadb/cmd/metadb/server.runServer(0xc0001a0410, 0xc0002801e0)
	/opt/metadb/cmd/metadb/server/server.go:125 +0x11a
github.com/metadb-project/metadb/cmd/metadb/server.loggingServer(0xc0001a0410)
	/opt/metadb/cmd/metadb/server/server.go:110 +0x3f1
github.com/metadb-project/metadb/cmd/metadb/server.Start(0xc0000e3b80)
	/opt/metadb/cmd/metadb/server/server.go:80 +0xf8
main.run.func3(0xc00019e500?, {0x12fd4ab?, 0x4?, 0x12fd543?})
	/opt/metadb/cmd/metadb/main.go:133 +0xeb
github.com/spf13/cobra.(*Command).execute(0xc0001b6908, {0xc0001a0370, 0x5, 0x5})
	/home/metadb/go/pkg/mod/github.com/spf13/cobra@v1.8.0/command.go:983 +0xaca
github.com/spf13/cobra.(*Command).ExecuteC(0xc0001b7b08)
	/home/metadb/go/pkg/mod/github.com/spf13/cobra@v1.8.0/command.go:1115 +0x3ff
github.com/spf13/cobra.(*Command).Execute(...)
	/home/metadb/go/pkg/mod/github.com/spf13/cobra@v1.8.0/command.go:1039
main.run()
	/opt/metadb/cmd/metadb/main.go:266 +0xa86
main.metadbMain()
	/opt/metadb/cmd/metadb/main.go:43 +0x4f
main.main()
	/opt/metadb/cmd/metadb/main.go:35 +0x7d

goroutine 6 [select]:
github.com/jackc/pgx/v5/pgxpool.(*Pool).backgroundHealthCheck(0xc0001cac80)
	/home/metadb/go/pkg/mod/github.com/jackc/pgx/v5@v5.5.5/pgxpool/pool.go:400 +0xca
github.com/jackc/pgx/v5/pgxpool.NewWithConfig.func3()
	/home/metadb/go/pkg/mod/github.com/jackc/pgx/v5@v5.5.5/pgxpool/pool.go:264 +0x32
created by github.com/jackc/pgx/v5/pgxpool.NewWithConfig in goroutine 1
	/home/metadb/go/pkg/mod/github.com/jackc/pgx/v5@v5.5.5/pgxpool/pool.go:262 +0x2b0

goroutine 82 [sleep]:
time.Sleep(0x45d964b800)
	/usr/local/go/src/runtime/time.go:195 +0x115
github.com/metadb-project/metadb/cmd/metadb/server.goMaintenance({0x7ffc278faeb9, 0x10}, {{0xc000040901, 0x37}, {0xc0001c93f1, 0x4}, {0xc0001c9411, 0x6}, {0xc00003eac1, 0x18}, ...}, ...)
	/opt/metadb/cmd/metadb/server/server.go:258 +0x7a
created by github.com/metadb-project/metadb/cmd/metadb/server.goPollLoop in goroutine 50
	/opt/metadb/cmd/metadb/server/poll.go:53 +0x36c

goroutine 14 [chan receive]:
github.com/metadb-project/metadb/cmd/metadb/server.mainServer.func1()
	/opt/metadb/cmd/metadb/server/server.go:145 +0x1d
created by github.com/metadb-project/metadb/cmd/metadb/server.mainServer in goroutine 1
	/opt/metadb/cmd/metadb/server/server.go:144 +0xc5

goroutine 13 [syscall]:
os/signal.signal_recv()
	/usr/local/go/src/runtime/sigqueue.go:152 +0x29
os/signal.loop()
	/usr/local/go/src/os/signal/signal_unix.go:23 +0x13
created by os/signal.Notify.func1.1 in goroutine 1
	/usr/local/go/src/os/signal/signal.go:151 +0x1f

goroutine 16 [IO wait]:
internal/poll.runtime_pollWait(0x7f4290b28b60, 0x72)
	/usr/local/go/src/runtime/netpoll.go:345 +0x85
internal/poll.(*pollDesc).wait(0x9?, 0x1ceae80?, 0x0)
	/usr/local/go/src/internal/poll/fd_poll_runtime.go:84 +0x27
internal/poll.(*pollDesc).waitRead(...)
	/usr/local/go/src/internal/poll/fd_poll_runtime.go:89
internal/poll.(*FD).Accept(0xc000296100)
	/usr/local/go/src/internal/poll/fd_unix.go:611 +0x2ac
net.(*netFD).accept(0xc000296100)
	/usr/local/go/src/net/fd_unix.go:172 +0x29
net.(*TCPListener).accept(0xc0002880a0)
	/usr/local/go/src/net/tcpsock_posix.go:159 +0x1e
net.(*TCPListener).Accept(0xc0002880a0)
	/usr/local/go/src/net/tcpsock.go:327 +0x30
github.com/metadb-project/metadb/cmd/metadb/libpq.Listen({0x12fface?, 0x9?}, {0x12fd4e7?, 0x4?}, 0xc0000b8510, 0xc0001a0438)
	/opt/metadb/cmd/metadb/libpq/libpq.go:50 +0x30c
created by github.com/metadb-project/metadb/cmd/metadb/server.mainServer in goroutine 1
	/opt/metadb/cmd/metadb/server/server.go:184 +0x237
```

nassibnassar commented 3 months ago

Multiple partitions are not currently supported by Metadb. Changing this issue to a feature request.

nassibnassar commented 2 months ago

Supporting topics with more than one partition sounds to me like it would not preserve the ordering of messages. Could you explain why that is not the case?