riverqueue / river

Fast and reliable background jobs in Go
https://riverqueue.com
Mozilla Public License 2.0

Background worker always uses public schema #531

Closed arp242 closed 2 months ago

arp242 commented 2 months ago

It seems that the background worker doesn't work when setting a different schema, and it always uses the public schema. I saw there was a fix for this recently (#505), and the migration does run correctly. After running the code below I have:

=# select table_catalog, table_schema, table_name from information_schema.tables
      where table_schema in ('public', 'myschema');
table_catalog │ table_schema │     table_name
──────────────┼──────────────┼────────────────────
rt            │ myschema     │ river_queue
rt            │ myschema     │ river_leader
rt            │ myschema     │ river_migration
rt            │ myschema     │ river_job
rt            │ myschema     │ river_client
rt            │ myschema     │ river_client_queue

Which is expected, because that's what I set the search_path to. But actually running jobs doesn't work, and errors out with:

ERROR: relation "river_queue" does not exist (SQLSTATE 42P01)

It does seem to insert the jobs correctly:

=# select state, created_at from myschema.river_job;
  state   │          created_at
──────────┼───────────────────────────────
available │ 2024-08-16 10:08:37.859183+00
available │ 2024-08-16 10:12:41.994965+00
available │ 2024-08-16 10:12:46.233646+00
available │ 2024-08-16 10:16:16.473273+00

But the background worker started by rc.Start() uses the wrong schema. If I manually create the tables in the public schema, then it doesn't error out (but it also doesn't do anything, because there are no jobs there).

All of this is with 0.11.2 and ee3e5b37.


Code to reproduce:

package main

import (
    "context"
    "database/sql"
    "fmt"
    "time"

    _ "github.com/jackc/pgx/v5/stdlib"
    "github.com/riverqueue/river"
    "github.com/riverqueue/river/riverdriver/riverdatabasesql"
    "github.com/riverqueue/river/rivermigrate"
)

func f(err error) {
    if err != nil {
        panic(err)
    }
}

func main() {
    db, err := sql.Open("pgx", "postgresql://aleph:aleph@localhost:5432/rt")
    f(err)
    f(db.Ping())

    _, err = db.Exec("create schema if not exists myschema")
    f(err)
    _, err = db.Exec("set search_path to myschema")
    f(err)

    _, err = rivermigrate.New(riverdatabasesql.New(db), nil).
        Migrate(context.Background(), rivermigrate.DirectionUp, nil)
    f(err)

    workers := river.NewWorkers()
    river.AddWorker(workers, &testWorker{})

    rc, err := river.NewClient(riverdatabasesql.New(db), &river.Config{
        Workers: workers,
        Queues:  map[string]river.QueueConfig{river.QueueDefault: {MaxWorkers: 100}},
        // Logger: slog.New(slog_align.NewAlignedHandler(os.Stdout, &slog.HandlerOptions{ })),
    })
    f(err)

    go func() {
        f(rc.Start(context.Background()))
    }()

    _, err = rc.Insert(context.Background(), testWorkerArgs{}, nil)
    f(err)

    time.Sleep(time.Second)
    f(rc.Stop(context.Background()))
}

type (
    testWorker struct {
        river.WorkerDefaults[testWorkerArgs]
    }
    testWorkerArgs struct {
        Str string
    }
)

func (testWorkerArgs) Kind() string { return "test_worker" }

func (w *testWorker) Work(ctx context.Context, job *river.Job[testWorkerArgs]) error {
    fmt.Println("TEST WORKER:", job)
    return nil
}

With go.mod:

module rt

go 1.23.0

require (
    github.com/jackc/pgx/v5 v5.6.0
    github.com/riverqueue/river v0.11.2
    github.com/riverqueue/river/riverdriver/riverdatabasesql v0.11.2
)

require (
    github.com/davecgh/go-spew v1.1.1 // indirect
    github.com/jackc/pgpassfile v1.0.0 // indirect
    github.com/jackc/pgservicefile v0.0.0-20221227161230-091c0ba34f0a // indirect
    github.com/jackc/puddle/v2 v2.2.1 // indirect
    github.com/lib/pq v1.10.9 // indirect
    github.com/pmezard/go-difflib v1.0.0 // indirect
    github.com/riverqueue/river/riverdriver v0.11.2 // indirect
    github.com/riverqueue/river/rivershared v0.11.2 // indirect
    github.com/riverqueue/river/rivertype v0.11.2 // indirect
    github.com/stretchr/testify v1.9.0 // indirect
    go.uber.org/goleak v1.3.0 // indirect
    golang.org/x/crypto v0.17.0 // indirect
    golang.org/x/sync v0.8.0 // indirect
    golang.org/x/text v0.16.0 // indirect
    gopkg.in/yaml.v3 v3.0.1 // indirect
)

brandur commented 2 months ago

@arp242 The db you get with sql.Open is a connection pool. When you do this:

_, err = db.Exec("set search_path to myschema")
f(err)

That only sets the search path on whichever single connection the pool happens to hand out for that statement, and that connection may or may not still be around later on. The setting isn't sticky: no other connection the pool opens will have it.
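To make the per-connection behavior concrete, here is a toy model of a pool. It is only a sketch: the `conn` and `pool` types and the round-robin hand-out are assumptions made for illustration, not how database/sql actually picks connections. The point it shows is that a session-level setting lands on one connection and the others keep their default.

```go
package main

import "fmt"

// conn is a toy stand-in for one PostgreSQL session.
// search_path is session state, so each connection has its own copy.
type conn struct {
	searchPath string
}

// pool is a toy stand-in for *sql.DB (hypothetical, for illustration only).
type pool struct {
	conns []*conn
	next  int
}

// newPool creates size connections, each starting with the default search path.
func newPool(size int) *pool {
	p := &pool{}
	for i := 0; i < size; i++ {
		p.conns = append(p.conns, &conn{searchPath: "public"})
	}
	return p
}

// acquire hands out connections round-robin. This policy is an assumption;
// a real pool chooses among idle connections in its own way.
func (p *pool) acquire() *conn {
	c := p.conns[p.next%len(p.conns)]
	p.next++
	return c
}

// setSearchPath models db.Exec("set search_path to ..."):
// it changes the setting on exactly one connection.
func (p *pool) setSearchPath(schema string) {
	p.acquire().searchPath = schema
}

// currentSearchPath models a later query, which may run on a different connection.
func (p *pool) currentSearchPath() string {
	return p.acquire().searchPath
}

func main() {
	p := newPool(3)
	p.setSearchPath("myschema")

	// Later statements see whatever the connection they land on was set to.
	for i := 0; i < 3; i++ {
		fmt.Println(p.currentSearchPath())
	}
}
```

In this model only one of the three connections ever reports myschema, which matches the symptom in the report: some statements (the migration, the insert) can land on the configured connection while the background worker's queries land on others.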

What you want to do is set search_path as a connection parameter. Here's a working version of your repro code:

diff --git a/main.go b/main.go
index 75f48a8..646536a 100644
--- a/main.go
+++ b/main.go
@@ -19,14 +19,14 @@ func f(err error) {
 }

 func main() {
-   db, err := sql.Open("pgx", "postgresql:///river-schema-test")
+   db, err := sql.Open("pgx", "postgresql:///river-schema-test?search_path=myschema")
    f(err)
    f(db.Ping())

-   _, err = db.Exec("create schema if not exists myschema")
-   f(err)
-   _, err = db.Exec("set search_path to myschema")
-   f(err)
+   // _, err = db.Exec("create schema if not exists myschema")
+   // f(err)
+   // _, err = db.Exec("set search_path to myschema")
+   // f(err)

    _, err = rivermigrate.New(riverdatabasesql.New(db), nil).
        Migrate(context.Background(), rivermigrate.DirectionUp, nil)

Output:

$ go run main.go
TEST WORKER: &{0x14000126b40 {}}
TEST WORKER: &{0x14000126c60 {}}
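If the schema name isn't known until runtime, the same connection parameter can be added to the URL programmatically. The following is a minimal sketch using only the standard library; `withSearchPath` is a hypothetical helper name, not part of River or pgx, and it assumes a URL-style connection string like the one in the repro.

```go
package main

import (
	"fmt"
	"net/url"
)

// withSearchPath returns dsn with search_path set as a URL query parameter,
// so every connection opened from that string gets the same setting.
// Hypothetical helper for illustration; it only handles URL-style DSNs.
func withSearchPath(dsn, schema string) (string, error) {
	u, err := url.Parse(dsn)
	if err != nil {
		return "", fmt.Errorf("parsing DSN: %w", err)
	}

	// Preserve any existing parameters and add (or overwrite) search_path.
	q := u.Query()
	q.Set("search_path", schema)
	u.RawQuery = q.Encode()

	return u.String(), nil
}

func main() {
	dsn, err := withSearchPath("postgresql://aleph:aleph@localhost:5432/rt", "myschema")
	if err != nil {
		panic(err)
	}
	// Pass dsn to sql.Open("pgx", dsn) as in the diff above.
	fmt.Println(dsn)
}
```

Note that the parameter only selects the schema; the schema itself still has to exist before the migrations run.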