jackc / pgx

PostgreSQL driver and toolkit for Go
MIT License
10.51k stars 832 forks

Wrapper for asynchronous DB queries #1351

Open alexnivanov opened 1 year ago

alexnivanov commented 1 year ago

Is your feature request related to a problem? Please describe. There's a simple use case: run several DB queries in parallel instead of sequentially for better response time.

Describe the solution you'd like I've created a wrapper like this:

func withDbAsync[T any](dbPool *pgxpool.Pool, handler func(*pgxpool.Conn) (*T, error)) (chan *T, chan error) {
    // Buffered channels let the goroutine finish (and release the
    // connection) even if the caller stops reading after an error,
    // so it cannot leak blocked on a send.
    resultChan := make(chan *T, 1)
    errorChan := make(chan error, 1)

    go func() {
        conn, err := dbPool.Acquire(context.Background())
        if err != nil {
            errorChan <- err
            return
        }
        defer conn.Release()

        res, err := handler(conn)
        if err != nil {
            errorChan <- err
            return
        }
        errorChan <- nil
        resultChan <- res
    }()

    return resultChan, errorChan
}

(the handler function does the actual querying synchronously).

Then it's possible to combine two (or more) queries like this:

res1Chan, err1Chan := withDbAsync(
    dbPool,
    func(conn *pgxpool.Conn) (*int32, error) {
        var count int32

        row := conn.QueryRow(context.Background(), "SELECT COUNT(*) FROM item1")
        if err := row.Scan(&count); err != nil {
            return nil, err
        }
        return &count, nil
    },
)

res2Chan, err2Chan := withDbAsync(
    dbPool,
    func(conn *pgxpool.Conn) (*int32, error) {
        var count int32

        row := conn.QueryRow(context.Background(), "SELECT COUNT(*) FROM item2")
        if err := row.Scan(&count); err != nil {
            return nil, err
        }
        return &count, nil
    },
)

err1 := <-err1Chan
if err1 != nil {
    // return err1
}

err2 := <-err2Chan
if err2 != nil {
    // return err2
}

// no errors
res1 := <-res1Chan
res2 := <-res2Chan

The task of querying the DB asynchronously seems to be a must-have in modern times...

jackc commented 1 year ago

Technically, one connection cannot run multiple queries in parallel. I think what you are asking for is pipelining.

And you are partially in luck. Pipeline support is available in pgconn: https://pkg.go.dev/github.com/jackc/pgx/v5@v5.0.3/pgconn#Pipeline. However, that is a much lower level interface. pgx uses pipelining to implement batch support but does not expose pipelining directly. pgx does provide the helper function RowsFromResultReader to aid 3rd party integrations.

There are a number of issues involved that make a one-size-fits-all, high-level pipelined query API difficult to design. Just a few of them:

  1. Context / query cancellation becomes extremely tricky.
  2. Queries can get stuck behind long running queries.
  3. Transactions are difficult or impossible.
  4. Synchronization is challenging.
  5. If queries also need to be multiplexed across multiple connections then we need load balancing of some sort.

I wouldn't rule out adding it to core pgx, but finding a broadly usable API might be tricky. I'd probably want to see a 3rd party implementation and get some user experience reports.

alexnivanov commented 1 year ago

But the idea is to use multiple connections! Like in my code snippet, we acquire a connection for each query, and the queries can run in parallel.

I got this pattern from Slick (https://scala-slick.org), where there's actually a pool of ~20 DB connections, and all DB queries return a Future result (somewhat similar to a Go channel) to allow for parallel execution.

jackc commented 1 year ago

But the idea is to use multiple connections! Like in my code snippet we acquire connection for each query, and they can run in parallel.

So you just want to run multiple queries 1 to 1 with connections (i.e. no pipelining)? Then I don't understand how pgxpool.Pool and goroutines aren't enough.
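As a sketch of what "pool plus goroutines" looks like: the example below fans a slice of query functions out to goroutines and joins with a sync.WaitGroup. All names are hypothetical, and the functions are stand-ins for real queries (with pgx, each one would acquire a connection from a pgxpool.Pool) so the example runs on its own:

```go
package main

import (
	"fmt"
	"sync"
)

// parallelCounts runs every query function concurrently, waits for all of
// them, and returns the results along with the first non-nil error found.
// Each goroutine writes only its own slot, so no extra locking is needed.
func parallelCounts(queries []func() (int, error)) ([]int, error) {
	results := make([]int, len(queries))
	errs := make([]error, len(queries))

	var wg sync.WaitGroup
	for i, q := range queries {
		wg.Add(1)
		go func(i int, q func() (int, error)) {
			defer wg.Done()
			results[i], errs[i] = q()
		}(i, q)
	}
	wg.Wait()

	for _, err := range errs {
		if err != nil {
			return nil, err
		}
	}
	return results, nil
}

func main() {
	counts, err := parallelCounts([]func() (int, error){
		func() (int, error) { return 3, nil },  // stand-in for COUNT(*) FROM item1
		func() (int, error) { return 11, nil }, // stand-in for COUNT(*) FROM item2
	})
	fmt.Println(counts, err) // [3 11] <nil>
}
```

With a real pool, each function body would be `conn, err := dbPool.Acquire(ctx)` followed by the query and `conn.Release()`; the fan-out/join skeleton stays the same.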

alexnivanov commented 1 year ago

Well, they are enough; you just have to write a lot of boilerplate code that seems like it belongs in the framework.

jackc commented 1 year ago

🤷 AFAICT there is nothing that would need to change in core pgx to allow this in a 3rd party library.

That said, I suppose it might be convenient to have this sort of thing in core. So I guess if someone wants to try this I would be open to merging it, but I don't have any immediate plans to do it myself.

Also, even though your request is simpler than I originally thought, there are still edge cases and some interesting API design decisions to make such as:

  1. Whether and how to limit the number of connections consumed by one set of these requests.
  2. What would be the interface? Channels, promises, something like the current batch system, etc.?
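Both design questions can be prototyped outside pgx. The sketch below (all names hypothetical, with plain functions standing in for queries) uses a buffered channel as a counting semaphore to cap how many tasks run at once (question 1) and a small one-shot promise type instead of raw channels (question 2):

```go
package main

import (
	"fmt"
	"sync/atomic"
)

// Promise is a one-shot future: Await blocks until the value is ready.
type Promise[T any] struct {
	done chan struct{}
	val  T
	err  error
}

func (p *Promise[T]) Await() (T, error) {
	<-p.done
	return p.val, p.err
}

// Executor caps how many submitted tasks run at once, using a buffered
// channel as a counting semaphore (a stand-in for connections consumed).
type Executor struct {
	sem chan struct{}
}

func NewExecutor(limit int) *Executor {
	return &Executor{sem: make(chan struct{}, limit)}
}

// Submit schedules fn and returns immediately; fn runs once a slot frees up.
func (e *Executor) Submit(fn func() (int, error)) *Promise[int] {
	p := &Promise[int]{done: make(chan struct{})}
	go func() {
		e.sem <- struct{}{}        // acquire a slot
		defer func() { <-e.sem }() // release it
		p.val, p.err = fn()
		close(p.done)
	}()
	return p
}

func main() {
	exec := NewExecutor(2) // at most 2 tasks in flight

	var running int32
	task := func() (int, error) {
		n := atomic.AddInt32(&running, 1) // concurrency observed at entry
		defer atomic.AddInt32(&running, -1)
		return int(n), nil
	}

	var promises []*Promise[int]
	for i := 0; i < 5; i++ {
		promises = append(promises, exec.Submit(task))
	}
	for _, p := range promises {
		v, err := p.Await()
		fmt.Println(v <= 2, err == nil) // true true: the cap is never exceeded
	}
}
```

The same shape would work with a pool-backed task; the semaphore limit answers question 1, and Await gives a Future-like surface without exposing channels, one option for question 2.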


For a really crazy idea, I could almost see some sort of query executor / manager that could handle parallel queries like you requested here, and automatic pipelining like I originally thought you wanted, along with batching and normal queries. Might be too complicated, but the idea of something that you can throw a bunch of queries at and it figures out how best to execute them is intriguing. 🤔

alexnivanov commented 1 year ago
  1. I think there should be a configurable fixed pool size; something around 10-20 should be a good default value.
  2. I'm new to Go, but a channel return value seems to be the simplest and most idiomatic option, like a replacement for a Java/Scala Future.

As for a more complex manager with pipelining: it sounds cool, but my expertise doesn't go that far :)

alexnivanov commented 1 year ago

Btw: I googled and found this library, which is very similar to what I wanted: https://github.com/naoina/asynql. Perhaps I'll create a similar library for pgx...