jackc / pgx

PostgreSQL driver and toolkit for Go
MIT License
10.82k stars 845 forks source link

Multi-Tenant Databases Using pgx #288

Closed fluffybonkers closed 5 years ago

fluffybonkers commented 7 years ago

Hello all,

I hope this is not a question with an obvious answer, but I am a Go beginner interested in using Go and pgx to design a multi-tenant application with a Go REST API.

This obviously means I need a way of segregating tenant's data - all my tables will have a tenant_id. Now let's say that I include a tenant_id in each user's JWT being passed to the REST API. This tenant_id gets passed down the stack until I get to the methods accessing the Postgres database.

However, I don't want to append WHERE tenant_id = ? for every query, as this is error prone and makes for longer queries. To ensure that data are seamlessly dealt out to the correct tenant, I want to use Postgres's Row Level Security with a session context variable as outlined here. This means I need a way to set the user on every database connection. I want to use the stdlib to get back a database/sql connection to use with my ORM (the excellent SQLBoiler).

To do this, all I could think of was using pgx's AfterConnect hook on a connection pool and passing the tenant_id contained in the JWT into it to set the session variable on the connection for that user. However, the parameters of AfterConnect won't allow that and I want to try and avoid using AfterConnect accessing some global variable to set it (with methods further up the chain setting the variable) as then all the logic is not in the method. The idea would be that the tenant_id would have to be passed to the AfterConnect method with every call to OpenFromConnPool to ensure that programmers can't miss setting the Postgres session variable. Is it worth adding an empty interface to the AfterConnect method for these sorts of use cases?

I am sorry if this is a stupid question and I am more than open to other ideas that people have about solving this problem, as I am quite stuck.

ARolek commented 7 years ago

@fluffybonkers we implemented something similar in one of our apps and that exact article was really helpful during the design phase. Basically we acquire a connection from a connection pool and set the tenant before returning a reference to our PGClient for the application to work with. We have a simple method like the following:

func NewConn(tenantID string) (*PGClient, error) {
    conn, err := pool.Acquire()
    if err != nil {
        return nil, err
    }

    client := PGClient{
        driver: conn,
        tx:     nil,
        conn:   conn,
    }

    if len(tenantID) > 0 {
        client.SetSessionTenantID(tenantID)
    }

    return &client, nil
}

Where SetSessionTenantID() looks like:

func (pg *PGClient) SetSessionTenantID(tenantID string) error {
    var err error

    sql := `SELECT set_tenant($1)`

    _, err = pg.driver.Exec(sql, tenantID)

    return err
}
jackc commented 7 years ago

@fluffybonkers Anything done with AfterConnect and OpenFromConnPool is going to permanently change those connections. This would require a separate *sql.DB / pgx.ConnPool for each tenant.

What you really want is to check out a single connection for the duration of the HTTP request and set tenant ID at the start as @ARolek suggests above.

But it's a little trickier in your case because you are using the database/sql interface and database/sql does not directly expose individual connections. Here are some options:

  1. Use pgx native interface. This lets you manually check out and release connections from the pool. But you lose the ability to use SQLBoiler.
  2. Use a single transaction for the entire request. sql.Tx are locked to a single connection. You could start the Tx and set tenant at the start of each request. But this hampers any transaction management you may want to do in your request.
  3. Use Go 1.9 beta. It adds an ability to manually check out connections from the pool https://tip.golang.org/pkg/database/sql/#DB.Conn
fluffybonkers commented 7 years ago

Thank you both for your helpful advice.

@jackc I don't really want to stop using SQLBoiler and I can't really take option 2 because I don't think I can give up transaction management. Thus, option 3 seems like the only one, though I am strongly considering switching to something like PostgraphQL (rather than a pgx, SQLBoiler, Goa stack) given the speed with which I think it would let me develop and because multi-tenant seems to have been done with Node.js in a big way before (Auth0).

I feel a bit like I am in unchartered territory with Go, both because of my lack of experience with the language and because features in the language (like manual check out from a pool) are only coming now in a beta.

Decisions, decisions...

fluffybonkers commented 7 years ago

I have been using PostgraphQL and it is awesome for CRUD, however, I still want to use more traditional REST-based micro services for certain operations my application will need.

To this end, I am eventually going to circle back to the problems I was having with multi-tenancy in Go. I have been looking at source code in both SQLBoiler and pgx and I think I have a slightly better grip on the problems, but I am hoping the wise ones here can shed some more light.

  1. Contra @jackc advice, I am not sure I can use Go 1.9 and connection pooling because the function linked to returns a *Conn and this can't substitute for a *DB in SQLBoiler - see the all important Executor interface here - it doesn't support context and that's a problem. Is this right?
  2. I can almost use pgx natively, per your 1st suggestion @jackc because it almost conforms to the interface I linked to above (if you just change to pgx.Rows etc.). The only problem is that Exec from pgx returns a CommandTag and this is where I get confused - why can't pgx just conform to the same interface as sql.DB with just pgx.Result? What is a CommandTag? What is it for and what advantage does it offer over a simple Result type?
jackc commented 7 years ago
  1. Perhaps SQLBoiler will add more support for Go 1.9 in time.
  2. Regarding CommandTag vs Result, the command tag is what PostgreSQL actually returns as the result of a query. It can have a little more information than simply the number of rows affected. Also, the Result interface includes a number of items inapplicable to PostgreSQL/pgx. pgx's RowsAffected will never return an error, and LastInsertId is not supported by PostgreSQL.
mvrhov commented 6 years ago

@jackc Can we get AfterAcquire functionality into the pool? Or some other solution, where a "dynamic" query could be run immediately after transaction start or after the connection is acquired from the pool? The solution purposed by @ARolek is a bit annoying and also the docs state that if you manually acquire you also have to release. So this is pretty error prone.

jackc commented 6 years ago

What would AfterAcquire look like?

mvrhov commented 6 years ago

Something that would get a connection in so one could run arbitrary queries. However I've been thinking a lot about this and the problem is that the parameters to that queries would be dynamic e.g. they are different from request to request. So it would be impossible to have a generic function.

I don't know yet if is is feasible but IMO in the best way forward would probably be to copy the pool functionality into your own code and adapt it properly.

The use case is really almost the same as above. Run a query for each transaction implicit or explicit setting some variable depending on the current request.

mvrhov commented 6 years ago

Hm. It seems that's not possible to have a copy of Connection pooler in package that's under application control. The problem is that the original pooler uses unxeported variables.

../../internal/postgres/conn_pool_tenant.go:75:15: undefined: minimalConnInfo
../../internal/postgres/conn_pool_tenant.go:141:4: c.poolResetCount undefined (cannot refer to unexported field or method poolResetCount)
../../internal/postgres/conn_pool_tenant.go:176:4: c.poolResetCount undefined (cannot refer to unexported field or method poolResetCount)
../../internal/postgres/conn_pool_tenant.go:202:9: conn.ctxInProgress undefined (cannot refer to unexported field or method ctxInProgress)
../../internal/postgres/conn_pool_tenant.go:206:9: conn.txStatus undefined (cannot refer to unexported field or method txStatus)
../../internal/postgres/conn_pool_tenant.go:210:13: conn.channels undefined (cannot refer to unexported field or method channels)
../../internal/postgres/conn_pool_tenant.go:212:8: conn.die undefined (cannot refer to unexported field or method pgx.(*Conn).die)
../../internal/postgres/conn_pool_tenant.go:214:7: conn.channels undefined (cannot refer to unexported field or method channels)
../../internal/postgres/conn_pool_tenant.go:216:6: conn.notifications undefined (cannot refer to unexported field or method notifications)
../../internal/postgres/conn_pool_tenant.go:220:9: conn.poolResetCount undefined (cannot refer to unexported field or method poolResetCount)
../../internal/postgres/conn_pool_tenant.go:220:9: too many errors
mvrhov commented 6 years ago

This is what I've come up with, however this doesn't work because pgx pooler uses unexported fields.

edit moved code to gist

mvrhov commented 6 years ago

Can we make connPool on Rows Exported field or at least SetConnPool function so custom poolers are possible? And A NewBatch function for batches with conn, connPool, err parameters. Or do you have a different suggestion.

jackc commented 6 years ago

I am very much in favor of decoupling ConnPool from the rest of the system such that 3rd party poolers could be used.

However, when I tried to go that direction a year or two ago I ran into some issues. ConnPoll directly manipulates some of the internals of Conn, Rows, and Tx. I suppose the simplest way is to just export all the required fields, but I'm wary of leaking all that internal state to the public interace (and thereby making it more difficult to make non breaking changes in the future).

I think the real solution is a significant refactor such that ConnPool doesn't need to be inside the pgx package. If that was done then that would ensure that a custom connection pool could do anything the built in one can ... but would be a substantial amount of work.

glerchundi commented 6 years ago

In case someone is interested, we achieved multi-tenancy in SQLBoiler with lib/pq by using a request scoped context key and wrapping the driver. This should be easily portable to pgx.

The magic happens in ExecContext and QueryContext just before calling the real exec or query we run a USE 'tenant_db'.

https://gist.github.com/glerchundi/315be9ae9e4b72c467f4ef39d57ef004

The drawback is that per each query we do an extra roundtrip to the database just to USE the correct database in the current connection. I tried to do all in one by executing a multi-statement with USE 'tenant_db';SELECT * FROM users WHERE name = $1 buuut it seems that prepared statements are not supported in multi-statement queries pq: prepared statement had 2 statements, expected 1. I don't know if this is happening in this driver or it is a limitation of PostgreSQL itself, will try once i find a free time slot.

@fluffybonkers we're using the v3 branch of SQLBoiler and we're loving it, context support for the win.

glerchundi commented 6 years ago

Getting the same behaviour with pgx: ERROR: prepared statement had 2 statements, expected 1 (SQLSTATE 42P14). And as for this comment it seems that PostgreSQL doesn't support either.

mvrhov commented 6 years ago

@glerchundi I'm interested in achieving this with pgx only.

glerchundi commented 6 years ago

@mvrhov change "github.com/lib/pq" to "github.com/jackc/pgx/stdlib" and line 56 from c, err := pq.Open(name) to c, err := (&stdlib.Driver{}).Open(name).

jackc commented 5 years ago

The original functionality requested is now possible with v4. The connection pool has a BeforeAcquire hook. This hook receives the context from the original query so it would be possible in that hook to read the context and perform whatever connection setup you needed. There is also AfterRelease which could be used to cleanup.

johanbrandhorst commented 4 years ago

@jackc that is awesome, do you have an end to end example of this? In particular, what cleanup, if any is necessary?

jackc commented 4 years ago

I do not have an example of this type usage.

But presumably in the BeforeAcquire hook you would set whatever connection state is being used by your RLS rules. e.g. set role if you are using real PG users or set if you are using a custom variable to store your application user id.

In the AfterRelease hook you would undo whatever was done in BeforeAcquire. e.g. reset role or set my.var to default.

johanbrandhorst commented 4 years ago

Sorry to bump this issue but I dug into this a bit more and it appears to only be supported by pgxpool (https://pkg.go.dev/github.com/jackc/pgx/v4/pgxpool?tab=doc) at this point. Would it be possible to add the BeforeAcquire and AfterRelease hooks to pgx.ConnConfig or something similar, so that they can be used with stdlib? I can raise a separate issue if necessary.

jackc commented 4 years ago

I don't think it is possible. To my knowledge the database/sql pool does not provide any hooks for before acquire. The SessionResetter interface might be able to be used for an after release hook. But when I was experimenting with it recently even an empty implementation cost 4000-5000ns a query -- I don't think I want to make everyone pay that cost.

smiletrl commented 3 years ago

@jackc awesome job! This new pgxpool works indeed!

I have something like this to work:

  1. Enable row level security for table products, this table has a column tenant_id.
ALTER TABLE products ENABLE ROW LEVEL SECURITY;

CREATE POLICY product_isolation_policy ON products
USING (tenant_id = current_setting('app.current_tenant'));
  1. Create a function to set dynamic configuration like:
CREATE OR REPLACE FUNCTION set_tenant(tenant_id text) RETURNS void AS $$
BEGIN
    PERFORM set_config('app.current_tenant', tenant_id, false);
END;
$$ LANGUAGE plpgsql;
  1. Initialize the pgxpool like below when app starts. Pool only needs to be initialized once.
func NewPool(cfg config.PostgresqlConfig) (pool *pgxpool.Pool, err error) {
    name := cfg.Name
    user := cfg.TenantUser
    pass := cfg.TenantPassword
    host := cfg.Host
    port := cfg.Port

    dsn := fmt.Sprintf("user=%s password=%s dbname=%s host=%s port=%s sslmode=disable",
        user, pass, name, host, port)
    config, err := pgxpool.ParseConfig(dsn)
    if err != nil {
        return pool, err
    }
    config.BeforeAcquire = func(ctx context.Context, conn *pgx.Conn) bool {
        // set the tenant id into this connection's setting
        tenantID := ctx.Value("tenant_id").(string)
        _, err := conn.Exec(ctx, "select set_tenant($1)", tenantID)
        if err != nil {
            panic(err) // or better to log the error, and then `return false` to destroy this connection instead of leaving it open.
        }
        return true
    }

    config.AfterRelease = func(conn *pgx.Conn) bool {
        // set the setting to be empty before this connection is released to pool
        _, err := conn.Exec(context.Background(), "select set_tenant($1)", "")
        if err != nil {
            panic(err) // or better to log the error, and then`return false` to destroy this connection instead of leaving it open.
        }
        return true
    }
    config.MaxConns = int32(20)
    config.MaxConnLifetime = time.Minute
    config.MaxConnIdleTime = time.Minute
    pool, err = pgxpool.ConnectConfig(context.Background(), config)
    return pool, err
}
  1. set tenant_id into the request context, and pass the request context.Context to pgxpool request. This code sample here uses "github.com/georgysavva/scany/pgxscan", and it tries to return the products.
        // this step usually is set in a http request middleware
        ctx = context.WithValue(ctx, "tenant_id", tenantID)

       s.pool = pool // pool is set up in step 3. 

    var (
        total    int64
        products []entity.Product
    )
        // use the pool 
    err = pgxscan.Get(ctx, s.pool, &total, `select count(*) from products`)
    if err != nil {
        return list, err
    }
    if err = pgxscan.Select(ctx, s.pool, &products, `select * from products`); err != nil {
        return list, err
    }

This works to get each tenant's products by default. No need to use transaction anymore. Transaction works too, but it needs to set up transaction every time a query runs.

There seems to be one bug though https://github.com/jackc/pgx/pull/1070.

Overall, I think this solution works better than https://tip.golang.org/pkg/database/sql/#DB.Conn too.

Both single transaction or single connection in a http request life cycle (DB.Conn) like what https://aws.amazon.com/blogs/database/multi-tenant-data-isolation-with-postgresql-row-level-security/ suggests is making the connection life time much longer than what it really needs.

For example,

endpointRequest() {
  // a. start a new single connection (or a new transaction)
  // b. defer return/release this connection back to pool (or rollback this transaction)

  // c. do something unrelated with db, like sending grpc request
  // d. do something with the db

  // e. do something unrelated with db, like connecting to third party service

  // f. do something with the db

  // g. return something
}

If Step c or Step e takes pretty long time like 1 second, then this request is holding one db connection useless for 1 second, which is pretty bad ~

johanbrandhorst commented 3 years ago

Really nice writeup @smiletrl!

trentmurray commented 1 year ago

@jackc awesome job! This new pgxpool works indeed!

I have something like this to work:

  1. Enable row level security for table products, this table has a column tenant_id.
ALTER TABLE products ENABLE ROW LEVEL SECURITY;

CREATE POLICY product_isolation_policy ON products
USING (tenant_id = current_setting('app.current_tenant'));
  1. Create a function to set dynamic configuration like:
CREATE OR REPLACE FUNCTION set_tenant(tenant_id text) RETURNS void AS $$
BEGIN
    PERFORM set_config('app.current_tenant', tenant_id, false);
END;
$$ LANGUAGE plpgsql;
  1. Initialize the pgxpool like below when app starts. Pool only needs to be initialized once.
func NewPool(cfg config.PostgresqlConfig) (pool *pgxpool.Pool, err error) {
  name := cfg.Name
  user := cfg.TenantUser
  pass := cfg.TenantPassword
  host := cfg.Host
  port := cfg.Port

  dsn := fmt.Sprintf("user=%s password=%s dbname=%s host=%s port=%s sslmode=disable",
      user, pass, name, host, port)
  config, err := pgxpool.ParseConfig(dsn)
  if err != nil {
      return pool, err
  }
  config.BeforeAcquire = func(ctx context.Context, conn *pgx.Conn) bool {
      // set the tenant id into this connection's setting
      tenantID := ctx.Value("tenant_id").(string)
      _, err := conn.Exec(ctx, "select set_tenant($1)", tenantID)
      if err != nil {
          panic(err) // or better to log the error, and then `return false` to destroy this connection instead of leaving it open.
      }
      return true
  }

  config.AfterRelease = func(conn *pgx.Conn) bool {
      // set the setting to be empty before this connection is released to pool
      _, err := conn.Exec(context.Background(), "select set_tenant($1)", "")
      if err != nil {
          panic(err) // or better to log the error, and then`return false` to destroy this connection instead of leaving it open.
      }
      return true
  }
  config.MaxConns = int32(20)
  config.MaxConnLifetime = time.Minute
  config.MaxConnIdleTime = time.Minute
  pool, err = pgxpool.ConnectConfig(context.Background(), config)
  return pool, err
}
  1. set tenant_id into the request context, and pass the request context.Context to pgxpool request. This code sample here uses "github.com/georgysavva/scany/pgxscan", and it tries to return the products.
        // this step usually is set in a http request middleware
        ctx = context.WithValue(ctx, "tenant_id", tenantID)

       s.pool = pool // pool is set up in step 3. 

  var (
      total    int64
      products []entity.Product
  )
        // use the pool 
  err = pgxscan.Get(ctx, s.pool, &total, `select count(*) from products`)
  if err != nil {
      return list, err
  }
  if err = pgxscan.Select(ctx, s.pool, &products, `select * from products`); err != nil {
      return list, err
  }

This works to get each tenant's products by default. No need to use transaction anymore. Transaction works too, but it needs to set up transaction every time a query runs.

There seems to be one bug though #1070.

Overall, I think this solution works better than https://tip.golang.org/pkg/database/sql/#DB.Conn too.

Both single transaction or single connection in a http request life cycle (DB.Conn) like what https://aws.amazon.com/blogs/database/multi-tenant-data-isolation-with-postgresql-row-level-security/ suggests is making the connection life time much longer than what it really needs.

For example,

endpointRequest() {
  // a. start a new single connection (or a new transaction)
  // b. defer return/release this connection back to pool (or rollback this transaction)

  // c. do something unrelated with db, like sending grpc request
  // d. do something with the db

  // e. do something unrelated with db, like connecting to third party service

  // f. do something with the db

  // g. return something
}

If Step c or Step e takes pretty long time like 1 second, then this request is holding one db connection useless for 1 second, which is pretty bad ~

I know this is quite old, but I've tried to get this working but for some reason, the BeforeAcquire and AfterRelease run immediately after each other, and then my query runs? Any ideas?

package db

import (
    "context"
    "fmt"
    "github.com/jackc/pgx/v5"
    "github.com/jackc/pgx/v5/pgxpool"
    "log"
    "os"
    "time"
)

func NewPool() (pool *pgxpool.Pool, err error) {
    dsn := os.Getenv("POSTGRESQL_URL")
    config, err := pgxpool.ParseConfig(dsn)

    if err != nil {
        return pool, err
    }

    config.BeforeAcquire = func(ctx context.Context, conn *pgx.Conn) bool {
        // set the member id into this connection's setting
        memberId := ctx.Value("member_id").(string)
        _, err := conn.Exec(ctx, "select sp_set_member($1)", memberId)

        if err != nil {
            log.Fatal(err)
            return false
        } else {
            fmt.Println("Set session to memberId: " + memberId)
        }

        return true
    }

    config.AfterRelease = func(conn *pgx.Conn) bool {
        // set the setting to be empty before this connection is released to pool
        _, err := conn.Exec(context.Background(), "select sp_set_member($1)", "")

        if err != nil {
            log.Fatal(err)
            return false
        } else {
            fmt.Println("Cleared the member id")
        }

        return true
    }

    config.MaxConns = int32(20)
    config.MaxConnLifetime = time.Minute
    config.MaxConnIdleTime = time.Minute

    pool, err = pgxpool.NewWithConfig(context.Background(), config)
    return pool, err
}
r.Get("/test-sql", func(w http.ResponseWriter, r *http.Request) {

            pool, err := db.NewPool()

            if err != nil {
                fmt.Println(err)
                panic(err)
            }

            rows, err := pool.Query(r.Context(), "SELECT uuid, name, owner_uuid FROM businesses")
            if err != nil {
                log.Fatal(err)
            }

            defer rows.Close()

            var rowSlice []Row
            for rows.Next() {
                var r Row

                err := rows.Scan(&r.UUID, &r.Name, &r.OwnerUUID)
                if err != nil {
                    log.Fatal(err)
                }
                rowSlice = append(rowSlice, r)
            }
            if err := rows.Err(); err != nil {
                log.Fatal(err)
            }

            fmt.Println(rowSlice)
            render.JSON(w, r, "ok sql")
        })

I use go-chi - not that it matters too much... I really can't figure this out.

EDIT Figured it out... The pool needed to be created before the request context of the go-chi router.