Using pg.zig with multiple threads or workers with Zap #15

richard-powers closed 7 months ago

richard-powers commented 7 months ago

(using zig 0.11.0)

I've been working on a zap project that utilizes pg.zig to pull data from a database. Everything seems to work fine, unless I try using multiple threads or workers.

I assume there is a way to do this, but I'm likely not taking the correct approach.

I'm using a Pool with size = 10, and hitting the server with 9 concurrent queries. This works perfectly fine when using only 1 thread and 1 worker via zap. If I use more than 1 thread or worker, I start seeing messages like:

Segmentation fault at address 0x78e8fee44000
error: PG bind message has 4 result formats but query has 3 columns

Error GET /products - error.PG - - [Tue, 26 Mar 2024 21:28:56 GMT] "GET /configurations/products HTTP/1.1" 500 46b 97331us
Segmentation fault at address 0x78e8fee44000
/home/user/zig/0.11.0/files/lib/compiler_rt/memcpy.zig:19:21: 0x68c650 in memcpy (compiler_rt)
            d[0] = s[0];

The segfault only happens when using multiple workers, but I still get the error: PG bind message has 4 result formats but query has 3 columns message when I only raise the "thread" count above 1.

Source code of one of the locations where this error occurs (it can happen for any endpoint...):

pub fn queryProducts(alloc: std.mem.Allocator, companyId: usize) ![]Product {
    const conn = try pool.acquire();
    defer conn.release();

    const sql =
        \\ SELECT * FROM public."Product" p
        \\ WHERE p."companyId"=$1
        \\ ORDER BY p."name" ASC;
    ;

    var result = conn.query(sql, .{companyId}) catch |err| {
        if (err == error.PG) {
            if (conn.err) |pge| {
                std.log.err("PG {s}\n", .{pge.message});
            }
        }
        return err;
    };
    defer result.deinit();

    var arr = std.ArrayList(Product).init(alloc);
    while (try result.next()) |row| {
        const description = row.get(?[]const u8, 3);
        try arr.append(.{
            .id = row.get(i32, 0),
            .companyId = row.get(i32, 1),
            // Copy the strings so they outlive the result.
            .name = try alloc.dupe(u8, row.get([]const u8, 2)),
            .description = if (description) |desc| try alloc.dupe(u8, desc) else null,
        });
    }
    return arr.toOwnedSlice();
}

Again, there's no error when only using one zap "thread" and one "worker".

Any thoughts?

karlseguin commented 7 months ago

May or may not be 2 separate issues. The project isn't available for me to look at by any chance?

I'm curious how pool is initialized and stored.

I assume Product looks like:

id integer,
companyId integer,
name text,
description text null

I can tell by your code that it has 4 columns, so pg.zig was correct in providing 4 format types, and it's weird that PG is complaining that there are only 3. What version of PG?

It might help to see the network trace: run sudo tcpdump -A -i localhost port 5432 -s 65535 -w trace.pcap and then provide the generated trace.pcap. That assumes PG is on port 5432, and note that the trace would also capture any usernames/passwords.

richard-powers commented 7 months ago

It's private but I can share a good bit of the code.

pub const Product = struct {
    id: i32,
    companyId: i32,
    name: []const u8,
    description: ?[]const u8,
};

Using port 6543 for postgres (it's using supabase's pooler). Postgres version:

I have the pool in its own file, so it can be referenced by all of our endpoints. Maybe this is an issue?

// database.zig

const std = @import("std");
const pg = @import("pg");
const Pool = pg.Pool;
const Result = pg.Result;
const Conn = pg.Conn;

pub const Self = @This();

var pool: *Pool = undefined;

pub const DbOptions = struct {
    host: []const u8,
    port: u16,
    database: []const u8,
    username: []const u8,
    password: []const u8,
    timeout: u32 = 10_000,
};

pub fn init(allocator: std.mem.Allocator, opts: DbOptions) !void {
    pool = try Pool.init(allocator, .{
        .size = 10,
        .connect = .{
            .host = opts.host,
            .port = opts.port,
        },
        .auth = .{
            .username = opts.username,
            .database = opts.database,
            .password = opts.password,
            .timeout = opts.timeout,
        },
    });
}

pub fn deinit() void {
    pool.deinit();
}

pub fn aquireConnection() !*Conn {
    return pool.acquire();
}

This is how the pool is initialized:

// main.zig

fn initDatabase(alloc: std.mem.Allocator) !void {
    var envMap = try std.process.getEnvMap(alloc);
    defer envMap.deinit();

    const dbOpts = DbOptions{
        .host = envMap.get("DB_HOST").?,
        .port = try std.fmt.parseInt(u16, envMap.get("DB_PORT").?, 10),
        .database = envMap.get("DB_DATABASE").?,
        .username = envMap.get("DB_USERNAME").?,
        .password = envMap.get("DB_PASSWORD").?,
    };

    try database.init(alloc, dbOpts);
}

pub fn main() !void {
    var gpa = std.heap.GeneralPurposeAllocator(.{
        .thread_safe = true,
        .stack_trace_frames = 100,
    }){};
    const alloc = gpa.allocator();

    // Scope everything that can allocate within this block for leak detection
    {
        // Database initialization
        try initDatabase(alloc);
        // ...
    }

    // ...
}

If nothing here looks obviously wrong, I'll collect a pcap and share it here.

karlseguin commented 7 months ago

This looks ok.

I'm going to play with it a bit and see if I can reproduce the issue. Just so I'm clear, you have other functions being called, right, not just queryProducts and at least one of those functions is doing a query that returns 3, not 4, columns? I'd be pretty shocked if that wasn't the case.

richard-powers commented 7 months ago

Most other endpoints actually return 3 columns, Product is the only one that returns 4.

This seems to only happen when I'm hitting multiple different endpoints; if I only hit the products endpoint, there are no issues.

I can try creating a min-repro tomorrow if that would help.

karlseguin commented 7 months ago

This is what I wrote to try to simulate the issue, but it hasn't failed yet:

const std = @import("std");
const pg = @import("pg");

const ITERATIONS = 10_000;

pub fn main() !void {
    var gpa = std.heap.GeneralPurposeAllocator(.{}){};
    const allocator = gpa.allocator();

    var pool = try pg.Pool.init(allocator, .{
        .size = 5,
        .auth = .{
            .database = "postgres",
            .username = "postgres",
        },
    });
    defer pool.deinit();

    {
        var conn = try pool.acquire();
        defer conn.release();
        _ = try conn.exec("drop table if exists thread_test", .{});
        _ = try conn.exec("create table thread_test (a integer primary key, b integer, c integer, d integer)", .{});
        _ = try conn.exec("insert into thread_test values (1, 2, 3, 4), (10, 11, 12, 13), (20, 21, 22, 23)", .{});
    }

    const t1 = try std.Thread.spawn(.{}, thread1, .{pool});
    const t2 = try std.Thread.spawn(.{}, thread2, .{pool});
    const t3 = try std.Thread.spawn(.{}, thread3, .{pool});
    const t4 = try std.Thread.spawn(.{}, thread4, .{pool});

    // Wait for every worker before the deferred pool.deinit() runs.
    t1.join();
    t2.join();
    t3.join();
    t4.join();
}


fn thread1(pool: *pg.Pool) !void {
    for (0..ITERATIONS) |_| {
        var conn = try pool.acquire();
        defer conn.release();

        var rows = conn.query("select a from thread_test order by a", .{}) catch |err| {
            std.debug.print("T1 {s}", .{conn.err.?.message});
            return err;
        };

        defer rows.deinit();
        {
            var row = (try rows.next()).?;
            std.debug.assert(row.get(i32, 0) == 1);
        }
        {
            var row = (try rows.next()).?;
            std.debug.assert(row.get(i32, 0) == 10);
        }
        {
            var row = (try rows.next()).?;
            std.debug.assert(row.get(i32, 0) == 20);
        }
    }
}

fn thread2(pool: *pg.Pool) !void {
    for (0..ITERATIONS) |_| {
        var conn = try pool.acquire();
        defer conn.release();

        var rows = conn.query("select a, b from thread_test order by a", .{}) catch |err| {
            std.debug.print("T2 {s}", .{conn.err.?.message});
            return err;
        };

        defer rows.deinit();
        {
            var row = (try rows.next()).?;
            std.debug.assert(row.get(i32, 0) == 1);
            std.debug.assert(row.get(i32, 1) == 2);
        }
        {
            var row = (try rows.next()).?;
            std.debug.assert(row.get(i32, 0) == 10);
            std.debug.assert(row.get(i32, 1) == 11);
        }
        {
            var row = (try rows.next()).?;
            std.debug.assert(row.get(i32, 0) == 20);
            std.debug.assert(row.get(i32, 1) == 21);
        }
    }
}

fn thread3(pool: *pg.Pool) !void {
    for (0..ITERATIONS) |_| {
        var conn = try pool.acquire();
        defer conn.release();

        var rows = conn.query("select a, b, c from thread_test order by a", .{}) catch |err| {
            std.debug.print("T3 {s}", .{conn.err.?.message});
            return err;
        };

        defer rows.deinit();
        {
            var row = (try rows.next()).?;
            std.debug.assert(row.get(i32, 0) == 1);
            std.debug.assert(row.get(i32, 1) == 2);
            std.debug.assert(row.get(i32, 2) == 3);
        }
        {
            var row = (try rows.next()).?;
            std.debug.assert(row.get(i32, 0) == 10);
            std.debug.assert(row.get(i32, 1) == 11);
            std.debug.assert(row.get(i32, 2) == 12);
        }
        {
            var row = (try rows.next()).?;
            std.debug.assert(row.get(i32, 0) == 20);
            std.debug.assert(row.get(i32, 1) == 21);
            std.debug.assert(row.get(i32, 2) == 22);
        }
    }
}

fn thread4(pool: *pg.Pool) !void {
    for (0..ITERATIONS) |_| {
        var conn = try pool.acquire();
        defer conn.release();

        var rows = conn.query("select a, b, c, d from thread_test order by a", .{}) catch |err| {
            std.debug.print("T4 {s}", .{conn.err.?.message});
            return err;
        };

        defer rows.deinit();
        {
            var row = (try rows.next()).?;
            std.debug.assert(row.get(i32, 0) == 1);
            std.debug.assert(row.get(i32, 1) == 2);
            std.debug.assert(row.get(i32, 2) == 3);
            std.debug.assert(row.get(i32, 3) == 4);
        }
        {
            var row = (try rows.next()).?;
            std.debug.assert(row.get(i32, 0) == 10);
            std.debug.assert(row.get(i32, 1) == 11);
            std.debug.assert(row.get(i32, 2) == 12);
            std.debug.assert(row.get(i32, 3) == 13);
        }
        {
            var row = (try rows.next()).?;
            std.debug.assert(row.get(i32, 0) == 20);
            std.debug.assert(row.get(i32, 1) == 21);
            std.debug.assert(row.get(i32, 2) == 22);
            std.debug.assert(row.get(i32, 3) == 23);
        }
    }
}

richard-powers commented 7 months ago

I've recreated a small example: https://github.com/richard-powers/pg.zig-error-repro

I spin up the server with zig build run -freference-trace and then run the queries against it with node benchmark.js. It may work a couple times, but eventually you'll see error: PG bind message has 4 result formats but query has 3 columns and sometimes it'll segfault.

I'm wondering if it has more to do with using Zap, since your example works. It's using facil.io under the hood, which spins up new workers and threads...

karlseguin commented 7 months ago

I don't think it's related to Zap. Each execution seems self-contained, except for that shared pool (which I think you're using correctly).

I can't reproduce it with your example. I have it looping 100000 times, and from how you describe the issue, I feel like it should be happening pretty often. I had a thought that maybe latency was part of the issue, but even testing with a remote database, I can't get it to fail.

I created a debug branch. It prints data to stderr, capturing a subset of what tcpdump would: specifically the describe message that's sent, the reply to that message, and the bind message, along with the thread id and the address of the conn. Any chance you can run using that branch and capture stderr?

richard-powers commented 7 months ago

Error occurred on the 7th attempt:

Listening on
karlseguin commented 7 months ago

Solved on discord.

Turns out this is an issue with Zap: it can be configured to spawn multiple "workers" (aka processes), and that doesn't play very nicely with a global pool, of course.
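
A minimal sketch of why forked workers and a pre-initialized global pool can clash, assuming the pool is created before the worker processes are forked. This is an illustration only, not Zap or pg.zig code: conn_fd, in_use and acquire are hypothetical stand-ins for a one-connection pool, and a pipe stands in for the PostgreSQL socket. It is written against Zig 0.11's std.os (POSIX only); newer Zig releases expose the same calls under std.posix.

```zig
const std = @import("std");

// Hypothetical one-connection "pool": a descriptor plus local bookkeeping.
var conn_fd: std.os.fd_t = undefined;
var in_use: bool = false;

fn acquire() !std.os.fd_t {
    if (in_use) return error.PoolExhausted;
    in_use = true;
    return conn_fd;
}

pub fn main() !void {
    // The pipe's write end stands in for a PostgreSQL socket (assumption).
    const fds = try std.os.pipe();
    conn_fd = fds[1];

    // Roughly what a multi-worker server does after the pool already exists.
    const pid = try std.os.fork();
    if (pid == 0) {
        // Child: its private copy of `in_use` is still false.
        const child_fd = try acquire();
        _ = try std.os.write(child_fd, "child ");
        std.process.exit(0);
    }

    // Parent: acquire succeeds too; nothing tells it the child used the fd.
    const parent_fd = try acquire();
    _ = try std.os.write(parent_fd, "parent ");
    _ = std.os.waitpid(pid, 0);
    std.os.close(fds[1]);

    // Both messages arrived on the single shared "connection".
    var buf: [64]u8 = undefined;
    const n = try std.os.read(fds[0], &buf);
    std.debug.print("one connection carried: {s}\n", .{buf[0..n]});
}
```

After the fork, each process has its own copy of in_use but the same underlying descriptor, so both acquire calls succeed and both messages land on one connection (in either order). With a real PostgreSQL connection, that kind of interleaving could plausibly pair one query's bind message with another query's column description, which would be consistent with the error reported above. Keeping a single worker process, or creating the pool inside each worker after the fork, would avoid two processes sharing a connection.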

IbrahimOuhamou commented 2 weeks ago

@karlseguin and what is the solution? I use jetzig

I have in db.zig

var pool: *pg.Pool = undefined;
pub fn init() !void {
    pool = try pg.Pool.init(...);
}

pub fn acquire() !*pg.Conn {
    return try pool.acquire();
}

it segfaults at request

karlseguin commented 2 weeks ago

The issue was specific to Zap which will launch multiple processes and thus doesn't work with globals (which is well documented in Zap also).

There should be no such issue with JetZig. Are you sure init is being called? Can you share some code that I can try to reproduce it? Are you able to debug the code using gdb/lldb?

You haven't provided much information here, so it's hard to know what's going on.

IbrahimOuhamou commented 2 weeks ago


in src/main.zig

try db.init(allocator2, 4);
defer db.deinit();

in src/app/lib/db.zig

pub var pool: *pg.Pool = undefined;

pub fn init(allocator: std.mem.Allocator, size: u8) !void {
    pool = try pg.Pool.init(allocator, .{ .size = size, .connect = .{
        .port = 5432,
        .host = "",
    }, .auth = .{
        .username = "postgres",
        .database = "bismi_allah_db",
        .password = "bismi_allah",
        .timeout = 10_000,
    } });

    // to test if it works I did this
    var conn = try pool.acquire();
}

pub fn deinit() void {
    pool.deinit();
}

/// you need to call `conn.deinit()`
/// on the result
pub fn acquire() !*pg.Conn {
    std.debug.print("alhamdo li Allah called aquire\n", .{});
    return try pool.acquire();
}

in a view function

    // here it segfaults while accessing
    // `pool._conns`
    var conn = try db.acquire();
    defer conn.deinit();

it segfaults on acquire

karlseguin commented 2 weeks ago

I don't see anything wrong with the above. If you can't debug it with a debugger, can you put a debug statement atop pool.deinit to make sure it isn't being called prematurely?

IbrahimOuhamou commented 2 weeks ago

@karlseguin how do I add a debug statement in zig? if you mean std.debug.print() then I did and it prints nothing

karlseguin commented 2 weeks ago

Yes, that's what I meant. Are you able to produce a full reproducible example or share the full program?

IbrahimOuhamou commented 2 weeks ago

The code from before was pretty much the whole db code. It crashed at acquire().

Just run jetzig init, copy in the src/app/lib/db.zig above, add the two lines to main.zig, and then call it from src/app/views/root.zig or any other view.

karlseguin commented 2 weeks ago

@IbrahimOuhamou I've talked to @bobf (creator of JetZig) about this. He's pushed a change to JetZig which fixes this, and he's created a demo branch which shows how to use it:

  1. Define pub const Global = pg.Pool; in src/main.zig.
  2. Create a pool and pass to app: app.start(routes, .{ .global = pool });
  3. Use request.global.acquire (request.global is the *anyopaque cast/aligned to Global)
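
Step 3 relies on recovering a typed pointer from an untyped one. A minimal, self-contained sketch of that pattern follows; FakePool, Request and global_ptr are hypothetical names used only for illustration and are not JetZig's or pg.zig's actual API.

```zig
const std = @import("std");

// Hypothetical stand-in for pg.Pool so the sketch compiles without pg.zig.
const FakePool = struct {
    acquired: usize = 0,

    fn acquire(self: *FakePool) usize {
        self.acquired += 1;
        return self.acquired;
    }
};

// The application names the concrete type once...
const Global = FakePool;

// ...while the framework side only carries an untyped pointer around.
const Request = struct {
    global_ptr: *anyopaque,

    // Recover the typed pointer; this is only valid if a *Global was stored.
    fn global(self: Request) *Global {
        return @ptrCast(@alignCast(self.global_ptr));
    }
};

pub fn main() void {
    var pool = FakePool{};
    const request = Request{ .global_ptr = &pool };

    _ = request.global().acquire();
    _ = request.global().acquire();

    // Both calls reached the same pool instance.
    std.debug.assert(pool.acquired == 2);
    std.debug.print("acquired {d} times\n", .{pool.acquired});
}
```

The point of the design is that the pool is created once by the application and handed to the framework explicitly, instead of living in a module-level global that each request handler has to trust was initialized.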

If you're still having issues, it might be best to take it up in the JetZig discord, or to open an issue with JetZig.

IbrahimOuhamou commented 2 weeks ago

@karlseguin thank you, you helped me a lot