speed2exe / myzql

MySQL and MariaDB driver in native Zig
MIT License
32 stars 1 forks source link

Does it support concurrency? An error occurs when querying the same table concurrently ! #16

Closed zdzDesigner closed 1 month ago

zdzDesigner commented 1 month ago

like this :

ping .......ok!
SQL:select id,user_id,stage_id,update_time,parent_id,title,data from stage
ping .......ok!
thread 1230842 panic: integer overflow
SQL:select id,user_id,stage_id,update_time,parent_id,title,data from stage
/home/zdz/Documents/Try/Zig/fork/myzql/src/protocol/packet_reader.zig:94:23: 0x1072b7d in expandBufIfNeeded (bitou)
        if (p.buf.len - p.len >= req_n) {
                      ^
/home/zdz/Documents/Try/Zig/fork/myzql/src/protocol/packet_reader.zig:66:32: 0x1073de8 in readToBufferAtLeast (bitou)
        try p.expandBufIfNeeded(at_least);
                               ^
/home/zdz/Documents/Try/Zig/fork/myzql/src/protocol/packet_reader.zig:51:42: 0x1074717 in readPacket (bitou)
                try p.readToBufferAtLeast(payload_length - n_valid_unread);
                                         ^
/home/zdz/Documents/Try/Zig/fork/myzql/src/conn.zig:269:47: 0x1085e87 in init (bitou)
        const packet = try c.reader.readPacket();
                                              ^
/home/zdz/Documents/Try/Zig/fork/myzql/src/conn.zig:140:34: 0x10868f5 in prepare (bitou)
        return PrepareResult.init(c, allocator);
                                 ^
/home/zdz/Documents/Try/Zig/zig-pro/bitou/src/db/mysql/sqler.zig:296:47: 0x1097b31 in select__anon_11792 (bitou)
            const pre_res = try client.prepare(self.allocator, SQL);
                                              ^
/home/zdz/Documents/Try/Zig/zig-pro/bitou/src/db/mysql/sqler.zig:265:39: 0x1098d44 in selectSlice__anon_11737 (bitou)
            var list = try self.select(keys);
                                      ^
/home/zdz/Documents/Try/Zig/zig-pro/bitou/src/route/stage.zig:40:79: 0x106b1c1 in list (bitou)
    const stages = sqler.in("stage_id", try sqler.toIn(stage_ids)).selectSlice(null) catch |err| {
                                                                              ^
/home/zdz/Documents/Try/Zig/zig-pro/bitou/src/route/manager.zig:122:30: 0x10e843a in f (bitou)
                return handle(&ctx) catch |err| {
                             ^
/home/zdz/Documents/Try/Zig/fork/zig-webui/src/webui.zig:82:17: 0x1092de1 in handle (bitou)
            func(Event.convertWebUIEventT2(tmp_e));
                ^
/home/zdz/.cache/zig/p/1220f00a2d3eb6ed190b21c2d3f2864a27a2bead20b8ee64c3d707c948f2a41b5fa3/src/webui.c:9275:33: 0x11d276b in _webui_ws_process (/home/zdz/.cache/zig/p/1220f00a2d3eb6ed190b21c2d3f2864a27a2bead20b8ee64c3d707c948f2a41b5fa3/src/webui.c)
                                win->cb[cb_index](&e);
                                ^
/home/zdz/.cache/zig/p/1220f00a2d3eb6ed190b21c2d3f2864a27a2bead20b8ee64c3d707c948f2a41b5fa3/src/webui.c:9591:5: 0x11d2d91 in _webui_ws_process_thread (/home/zdz/.cache/zig/p/1220f00a2d3eb6ed190b21c2d3f2864a27a2bead20b8ee64c3d707c948f2a41b5fa3/src/webui.c)
    _webui_ws_process(arg->win, arg->client, arg->connection_id, arg->ptr, arg->len, arg->recvNum, arg->event_type);
    ^
???:?:?: 0x758d9c8ab39c in ??? (libc.so.6)
Unwind information for `libc.so.6:0x758d9c8ab39c` was not available, trace may be incomplete

???:?:?: 0x758d9c93049b in ??? (libc.so.6)
thread 1230841 panic: reached unreachable code
/home/zdz/zig/0.13.0/files/lib/std/debug.zig:412:14: 0x106f65c in assert (bitou)
    if (!ok) unreachable; // assertion failure
             ^
/home/zdz/Documents/Try/Zig/fork/myzql/src/conn.zig:313:25: 0x10864ee in prepare (bitou)
        std.debug.assert(c.reader.pos == c.reader.len);
                        ^
/home/zdz/Documents/Try/Zig/zig-pro/bitou/src/db/mysql/sqler.zig:296:47: 0x1097b31 in select__anon_11792 (bitou)
            const pre_res = try client.prepare(self.allocator, SQL);
                                              ^
/home/zdz/Documents/Try/Zig/zig-pro/bitou/src/db/mysql/sqler.zig:265:39: 0x1098d44 in selectSlice__anon_11737 (bitou)
            var list = try self.select(keys);
                                      ^
/home/zdz/Documents/Try/Zig/zig-pro/bitou/src/route/stage.zig:40:79: 0x106b1c1 in list (bitou)
    const stages = sqler.in("stage_id", try sqler.toIn(stage_ids)).selectSlice(null) catch |err| {
                                                                              ^
/home/zdz/Documents/Try/Zig/zig-pro/bitou/src/route/manager.zig:122:30: 0x10e843a in f (bitou)
                return handle(&ctx) catch |err| {
                             ^
/home/zdz/Documents/Try/Zig/fork/zig-webui/src/webui.zig:82:17: 0x1092de1 in handle (bitou)
            func(Event.convertWebUIEventT2(tmp_e));
                ^
/home/zdz/.cache/zig/p/1220f00a2d3eb6ed190b21c2d3f2864a27a2bead20b8ee64c3d707c948f2a41b5fa3/src/webui.c:9275:33: 0x11d276b in _webui_ws_process (/home/zdz/.cache/zig/p/1220f00a2d3eb6ed190b21c2d3f2864a27a2bead20b8ee64c3d707c948f2a41b5fa3/src/webui.c)
                                win->cb[cb_index](&e);
                                ^
/home/zdz/.cache/zig/p/1220f00a2d3eb6ed190b21c2d3f2864a27a2bead20b8ee64c3d707c948f2a41b5fa3/src/webui.c:9591:5: 0x11d2d91 in _webui_ws_process_thread (/home/zdz/.cache/zig/p/1220f00a2d3eb6ed190b21c2d3f2864a27a2bead20b8ee64c3d707c948f2a41b5fa3/src/webui.c)
    _webui_ws_process(arg->win, arg->client, arg->connection_id, arg->ptr, arg->len, arg->recvNum, arg->event_type);
    ^
???:?:?: 0x758d9c8ab39c in ??? (libc.so.6)
Unwind information for `libc.so.6:0x758d9c8ab39c` was not available, trace may be incomplete

However, concurrent queries on different tables work fine.

ping .......ok!
ping .......SQL:select id,user_id,stage_id,update_time,parent_id,title,data from stage
ok!
SQL:select id,user_id,device_id,update_time,action_type,action_entity,action_entity_id from operation
===== send end =====
===== send end =====
speed2exe commented 1 month ago

@zdzDesigner If I am understanding correctly, you spawn 2 threads that use the same connection? If that is the case, then this is expected. The operations on a connection are not thread-safe.

zdzDesigner commented 1 month ago

I added logs for the thread IDs, but they don't seem to be from the same connection

myzql:thread_id:3511020
myzql:thread_id:3511021
ping .......ok!
thread_id:3511020
SQL:select id,user_id,stage_id,update_time,parent_id,title,data from stage
ping .......ok!
thread_id:3511021
SQL:select id,user_id,stage_id,update_time,parent_id,title,data from stage
thread 3511020 panic: integer overflow
/home/zdz/Documents/Try/Zig/fork/myzql/src/protocol/packet_reader.zig:96:23: 0x14c789d in expandBufIfNeeded (bitou)
        if (p.buf.len - p.len >= req_n) {
                      ^
/home/zdz/Documents/Try/Zig/fork/myzql/src/protocol/packet_reader.zig:66:32: 0x14c8b08 in readToBufferAtLeast (bitou)
        try p.expandBufIfNeeded(at_least);
                               ^
/home/zdz/Documents/Try/Zig/fork/myzql/src/protocol/packet_reader.zig:51:42: 0x14c9437 in readPacket (bitou)
                try p.readToBufferAtLeast(payload_length - n_valid_unread);
                                         ^
speed2exe commented 1 month ago

@zdzDesigner Couple of questions to understand this better:

  1. Does this always give you the same error every time you run?
  2. Are you spawning another zig thread or zig process that shares the same underlying Conn in src/conn.zig?
  3. Are you consuming all the results before the next execution?

If you have some sample code showing how the library is used, that would be helpful too.

zdzDesigner commented 1 month ago
  1. yes
  2. The current thread_id is printed from myzql. How can I verify that it "shares the same underlying Conn in src/conn.zig"? image
  3. If they are different threads, would consuming the allocator after use also cause similar issues?

I think a minimal test demo is needed.

zdzDesigner commented 1 month ago

You are right. I wrote a minimal test demo, and everything is working fine.

const std = @import("std");
const mem = std.mem;
const myzql = @import("myzql");
const Conn = myzql.conn.Conn;
const ResultSet = myzql.result.ResultSet;
const TextResultRow = myzql.result.TextResultRow;
const BinaryResultRow = myzql.result.BinaryResultRow;
const ResultRowIter = myzql.result.ResultRowIter;
const TextElems = myzql.result.TextElems;
const PreparedStatement = myzql.result.PreparedStatement;

/// Entry point of the demo: starts four worker threads that each run
/// `fetchData` with the same allocator handle, waits for all of them to
/// finish, then idles for three seconds before exiting.
pub fn main() !void {
    var gpa = std.heap.GeneralPurposeAllocator(.{}){};
    const allocator = gpa.allocator();

    // Spawn every worker first so they run concurrently...
    var workers: [4]std.Thread = undefined;
    for (&workers) |*worker| {
        worker.* = try std.Thread.spawn(.{}, fetchData, .{allocator});
    }

    // ...then join them in the order they were started.
    for (workers) |worker| {
        worker.join();
    }

    std.time.sleep(3 * std.time.ns_per_s);
}

/// Opens a dedicated connection via `init`, runs the `stage` query as a
/// prepared statement, reads every returned row into a list of `Stage`
/// values, and prints how many rows were read.
///
/// `allocator` backs the connection, the prepared statement, and a
/// function-local arena that owns all per-row data. Any error from
/// connecting, preparing, executing, or decoding a row is returned to the
/// caller.
fn fetchData(allocator: mem.Allocator) !void {
    var client = try init(allocator);
    // Release the connection on every exit path; previously it was never
    // released once this function returned.
    defer client.deinit();

    const pre_res = try client.prepare(allocator, "select id,user_id,stage_id,update_time,parent_id,title,data from stage");
    defer pre_res.deinit(allocator);
    const stmt = try pre_res.expect(.stmt);

    // Everything allocated while reading rows (the structs produced by
    // structCreate and the list that collects them) lives in an arena private
    // to this call, so a single deinit frees it all and nothing leaks.
    var arena = std.heap.ArenaAllocator.init(allocator);
    defer arena.deinit();
    const row_allocator = arena.allocator();

    // exec ===================
    const res = try client.executeRows(&stmt, .{});
    const rows: ResultSet(BinaryResultRow) = try res.expect(.rows);
    const rows_iter: ResultRowIter(BinaryResultRow) = rows.iter();

    var rets = std.ArrayList(Stage).init(row_allocator);
    // Drain the iterator completely before returning.
    while (try rows_iter.next()) |row| {
        const op = try row.structCreate(Stage, row_allocator);
        try rets.append(op.*); // copy the row struct ([]const u8 fields are copied as slices, not duplicated)
    }
    std.debug.print("rets.items.len:{d}\n", .{rets.items.len});
}

/// Creates a new `Conn` with the placeholder credentials below, pings the
/// server once (printing progress to stderr), and returns the connection.
///
/// Returns any error from `Conn.init` or `ping`. The caller owns the returned
/// connection.
fn init(allocator: mem.Allocator) !Conn {
    var client = try Conn.init(allocator, &.{
        .username = "xxx",
        .password = "xxx",
        .database = "xxx",
    });
    // If the ping fails the caller never receives the connection, so it must
    // be released here or it would leak.
    errdefer client.deinit();

    std.debug.print("ping .......", .{});
    try client.ping();
    std.debug.print("ok!\n", .{});
    return client;
}

/// Destination type for one row of the `stage` query issued in `fetchData`.
/// The fields carry the same names, in the same order, as that query's
/// column list.
pub const Stage = struct {
    id: u32 = 0,
    user_id: u32 = 0,
    stage_id: u32 = 0,
    update_time: u32 = 0,
    // Unlike the four fields above, the remaining fields declare no default.
    parent_id: u32,
    title: []const u8,
    data: []const u8,
};

Maybe it's an issue with the arena allocator I'm using

speed2exe commented 1 month ago
  1. Looking at the logs and code, it seems like each thread calls the init, which returns a new Conn for each thread, so this is fine.
  2. Even on a single thread, you should always consume all of the results (if the query returns rows) that come back after every query. This is because the result may not be returned to the client all at once (if it's large enough).
speed2exe commented 1 month ago

@zdzDesigner I'm not sure whether Zig's general purpose allocator is thread-safe by default on your machine. When initializing it, you can write: var gpa = std.heap.GeneralPurposeAllocator(.{ .thread_safe = true }){}; to enforce thread safety.

zdzDesigner commented 1 month ago

@speed2exe Thanks a lot. I allocated a GeneralPurposeAllocator separately when calling myzql, and everything worked fine.