
`SELECT FOR UPDATE SKIP LOCKED` skips unlocked rows sometimes? #121917

Open asg0451 opened 4 months ago

asg0451 commented 4 months ago

Describe the problem

I'm working on a job queue system backed by CRDB and I've run into a situation I don't understand. I'm hoping someone can shed some light on it for me.

To Reproduce

  1. Set up a CRDB Serverless cluster.
  2. Clone the repro code in this repo.
  3. Run with DATABASE_URL="<db conn str>" NUM_WORKERS=2 go run .

The program sets up a jobs table (see schema.sql), seeds it with a few jobs, then runs some workers that poll for work using SELECT FOR UPDATE SKIP LOCKED.
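Roughly, each worker loops on a transaction shaped like this (a simplified sketch of the repro's GetJob/FinishJob queries; the exact SQL lives in the repo):

BEGIN;
-- GetJob: grab at most one unclaimed job, skipping rows other workers have locked
SELECT id, name, created_at, completed_at, completed_by
FROM jobs
WHERE completed_at IS NULL
LIMIT 1
FOR UPDATE SKIP LOCKED;
-- ... "process" the job ...
-- FinishJob: mark the job done ($1 = job id, $2 = worker name)
UPDATE jobs SET completed_at = CURRENT_TIMESTAMP, completed_by = $2 WHERE id = $1;
COMMIT;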

Expected behavior

I expect each worker to grab a job, "process" it, then grab a new one, until no jobs remain. However, observing the log output, you can see that workers frequently fail to find jobs. In fact, it seems like after the first job each worker gets, it fails to see a new job while the other is working on one. Why is this?

Example run (annotated):

time=2024-04-08T00:29:46.725-04:00 level=DEBUG msg="running schema.sql"
time=2024-04-08T00:29:47.175-04:00 level=DEBUG msg="resetting jobs"
time=2024-04-08T00:29:48.629-04:00 level=INFO msg="job found" worker=1 job="{String:a Valid:true}"
time=2024-04-08T00:29:48.673-04:00 level=INFO msg="job found" worker=0 job="{String:b Valid:true}" // See here they both got jobs
time=2024-04-08T00:29:53.888-04:00 level=INFO msg="job finished" worker=0 job="{ID:{Bytes:[42 4 203 147 205 42 74 144 146 202 156 73 71 160 90 207] Valid:true} Name:{String:b Valid:true} CreatedAt:{Time:2024-04-08 04:29:47.29262 +0000 UTC InfinityModifier:finite Valid:true} CompletedAt:{Time:2024-04-08 04:29:48.443138 +0000 UTC InfinityModifier:finite Valid:true} CompletedBy:{String:cc/crdbtestjobs 0 Valid:true}}"
time=2024-04-08T00:29:53.888-04:00 level=INFO msg="job finished" worker=1 job="{ID:{Bytes:[24 96 114 86 121 83 76 129 168 179 161 17 215 75 124 18] Valid:true} Name:{String:a Valid:true} CreatedAt:{Time:2024-04-08 04:29:47.29262 +0000 UTC InfinityModifier:finite Valid:true} CompletedAt:{Time:2024-04-08 04:29:48.422372 +0000 UTC InfinityModifier:finite Valid:true} CompletedBy:{String:cc/crdbtestjobs 1 Valid:true}}"
time=2024-04-08T00:29:54.371-04:00 level=INFO msg="no job found" worker=0  // but now worker 0 can't find any while 1 is working on its
time=2024-04-08T00:29:54.371-04:00 level=INFO msg="job found" worker=1 job="{String:d Valid:true}"
time=2024-04-08T00:29:54.848-04:00 level=INFO msg="no job found" worker=0
time=2024-04-08T00:29:55.349-04:00 level=INFO msg="no job found" worker=0
time=2024-04-08T00:29:55.800-04:00 level=INFO msg="no job found" worker=0
time=2024-04-08T00:29:56.251-04:00 level=INFO msg="no job found" worker=0
time=2024-04-08T00:29:56.739-04:00 level=INFO msg="no job found" worker=0
time=2024-04-08T00:29:57.176-04:00 level=INFO msg="no job found" worker=0
time=2024-04-08T00:29:57.656-04:00 level=INFO msg="no job found" worker=0
time=2024-04-08T00:29:58.117-04:00 level=INFO msg="no job found" worker=0
time=2024-04-08T00:29:58.586-04:00 level=INFO msg="no job found" worker=0
time=2024-04-08T00:29:59.029-04:00 level=INFO msg="no job found" worker=0
time=2024-04-08T00:29:59.475-04:00 level=INFO msg="job finished" worker=1 job="{ID:{Bytes:[98 32 63 71 154 255 64 96 191 82 227 157 177 117 4 16] Valid:true} Name:{String:d Valid:true} CreatedAt:{Time:2024-04-08 04:29:47.29262 +0000 UTC InfinityModifier:finite Valid:true} CompletedAt:{Time:2024-04-08 04:29:54.199079 +0000 UTC InfinityModifier:finite Valid:true} CompletedBy:{String:cc/crdbtestjobs 1 Valid:true}}"
time=2024-04-08T00:29:59.484-04:00 level=INFO msg="no job found" worker=0 // ditto
time=2024-04-08T00:29:59.938-04:00 level=INFO msg="job found" worker=1 job="{String:c Valid:true}"
time=2024-04-08T00:29:59.939-04:00 level=INFO msg="no job found" worker=0
time=2024-04-08T00:30:00.412-04:00 level=INFO msg="no job found" worker=0
time=2024-04-08T00:30:00.875-04:00 level=INFO msg="no job found" worker=0
time=2024-04-08T00:30:01.346-04:00 level=INFO msg="no job found" worker=0
time=2024-04-08T00:30:01.799-04:00 level=INFO msg="no job found" worker=0
time=2024-04-08T00:30:02.235-04:00 level=INFO msg="no job found" worker=0
time=2024-04-08T00:30:02.745-04:00 level=INFO msg="no job found" worker=0
time=2024-04-08T00:30:03.181-04:00 level=INFO msg="no job found" worker=0
time=2024-04-08T00:30:03.655-04:00 level=INFO msg="no job found" worker=0
time=2024-04-08T00:30:04.124-04:00 level=INFO msg="no job found" worker=0
time=2024-04-08T00:30:04.538-04:00 level=INFO msg="no job found" worker=0
time=2024-04-08T00:30:04.969-04:00 level=INFO msg="no job found" worker=0
time=2024-04-08T00:30:05.029-04:00 level=INFO msg="job finished" worker=1 job="{ID:{Bytes:[143 160 240 141 130 52 77 159 151 19 223 198 14 146 153 33] Valid:true} Name:{String:c Valid:true} CreatedAt:{Time:2024-04-08 04:29:47.29262 +0000 UTC InfinityModifier:finite Valid:true} CompletedAt:{Time:2024-04-08 04:29:59.789133 +0000 UTC InfinityModifier:finite Valid:true} CompletedBy:{String:cc/crdbtestjobs 1 Valid:true}}"
time=2024-04-08T00:30:05.400-04:00 level=INFO msg="job found" worker=0 job="{String:f Valid:true}" // worker 0 wins the race this time and now worker 1 can't find a job!
time=2024-04-08T00:30:05.431-04:00 level=INFO msg="no job found" worker=1
time=2024-04-08T00:30:05.850-04:00 level=INFO msg="no job found" worker=1
time=2024-04-08T00:30:06.246-04:00 level=INFO msg="no job found" worker=1
time=2024-04-08T00:30:06.655-04:00 level=INFO msg="no job found" worker=1
(ctrl-c for brevity)

Contrast this with postgres's behaviour under the same code:

time=2024-04-08T02:42:40.602-04:00 level=DEBUG msg="running schema.sql"
time=2024-04-08T02:42:41.026-04:00 level=DEBUG msg="resetting jobs"
time=2024-04-08T02:42:42.392-04:00 level=INFO msg="job found" worker=2 job="{String:a Valid:true}"
time=2024-04-08T02:42:42.392-04:00 level=INFO msg="job found" worker=0 job="{String:c Valid:true}"
time=2024-04-08T02:42:42.392-04:00 level=INFO msg="job found" worker=1 job="{String:b Valid:true}"
time=2024-04-08T02:42:47.547-04:00 level=INFO msg="job finished" worker=2 job="{ID:{Bytes:[123 153 228 234 250 118 67 31 188 172 62 47 39 114 235 127] Valid:true} Name:{String:a Valid:true} CreatedAt:{Time:2024-04-07 23:42:41.218606 +0000 UTC InfinityModifier:finite Valid:true} CompletedAt:{Time:2024-04-07 23:42:42.258571 +0000 UTC InfinityModifier:finite Valid:true} CompletedBy:{String:cc/crdbtestjobs 2 Valid:true}}"
time=2024-04-08T02:42:47.547-04:00 level=INFO msg="job finished" worker=0 job="{ID:{Bytes:[55 177 105 201 234 84 68 211 134 237 144 62 45 182 176 121] Valid:true} Name:{String:c Valid:true} CreatedAt:{Time:2024-04-07 23:42:41.218606 +0000 UTC InfinityModifier:finite Valid:true} CompletedAt:{Time:2024-04-07 23:42:42.258597 +0000 UTC InfinityModifier:finite Valid:true} CompletedBy:{String:cc/crdbtestjobs 0 Valid:true}}"
time=2024-04-08T02:42:47.620-04:00 level=INFO msg="job finished" worker=1 job="{ID:{Bytes:[217 121 73 159 166 146 79 234 140 118 76 52 225 118 36 173] Valid:true} Name:{String:b Valid:true} CreatedAt:{Time:2024-04-07 23:42:41.218606 +0000 UTC InfinityModifier:finite Valid:true} CompletedAt:{Time:2024-04-07 23:42:42.258517 +0000 UTC InfinityModifier:finite Valid:true} CompletedBy:{String:cc/crdbtestjobs 1 Valid:true}}"
time=2024-04-08T02:42:47.982-04:00 level=INFO msg="job found" worker=0 job="{String:d Valid:true}"
time=2024-04-08T02:42:47.987-04:00 level=INFO msg="job found" worker=2 job="{String:e Valid:true}"
time=2024-04-08T02:42:48.034-04:00 level=INFO msg="job found" worker=1 job="{String:f Valid:true}"
time=2024-04-08T02:42:53.091-04:00 level=INFO msg="job finished" worker=0 job="{ID:{Bytes:[174 45 184 28 42 93 76 98 145 129 73 191 190 117 226 255] Valid:true} Name:{String:d Valid:true} CreatedAt:{Time:2024-04-07 23:42:41.218606 +0000 UTC InfinityModifier:finite Valid:true} CompletedAt:{Time:2024-04-07 23:42:47.938799 +0000 UTC InfinityModifier:finite Valid:true} CompletedBy:{String:cc/crdbtestjobs 0 Valid:true}}"
time=2024-04-08T02:42:53.091-04:00 level=INFO msg="job finished" worker=2 job="{ID:{Bytes:[76 229 28 127 149 214 68 210 174 220 73 184 30 206 11 105] Valid:true} Name:{String:e Valid:true} CreatedAt:{Time:2024-04-07 23:42:41.218606 +0000 UTC InfinityModifier:finite Valid:true} CompletedAt:{Time:2024-04-07 23:42:47.935604 +0000 UTC InfinityModifier:finite Valid:true} CompletedBy:{String:cc/crdbtestjobs 2 Valid:true}}"
time=2024-04-08T02:42:53.111-04:00 level=INFO msg="job finished" worker=1 job="{ID:{Bytes:[151 115 250 132 130 210 75 164 130 102 233 183 252 28 183 50] Valid:true} Name:{String:f Valid:true} CreatedAt:{Time:2024-04-07 23:42:41.218606 +0000 UTC InfinityModifier:finite Valid:true} CompletedAt:{Time:2024-04-07 23:42:48.009649 +0000 UTC InfinityModifier:finite Valid:true} CompletedBy:{String:cc/crdbtestjobs 1 Valid:true}}"
time=2024-04-08T02:42:53.540-04:00 level=INFO msg="no job found" worker=1
time=2024-04-08T02:42:53.540-04:00 level=INFO msg="no job found" worker=2
time=2024-04-08T02:42:53.551-04:00 level=INFO msg="no job found" worker=0
(they actually did them all)

As you can see, postgres does not have this issue at all.

Additional data / screenshots

Everything is in the linked repo.

Environment:

Jira issue: CRDB-37619

EDIT: All of a sudden this stopped happening for NUM_WORKERS=2 -- both of them reliably got jobs. Running with NUM_WORKERS=3 exhibits the issue again. Idk what changed.

asg0451 commented 4 months ago

I was able to reproduce this on a local crdb node (with start-single-node) with NUM_WORKERS=10. Still confused by this behaviour. Anyone have an idea?

michae2 commented 4 months ago

@asg0451 are you using read committed isolation? If so, this might be because of https://github.com/cockroachdb/cockroach/issues/115057 which is fixed in v24.1. It might be worth trying with v24.1.0-alpha.5 to see if this works correctly there.
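(For reference, one way to check or switch a session's isolation level; this is a generic example, not taken from the repro code, and on some versions read committed also has to be enabled cluster-wide first:)

SHOW transaction_isolation;
SET default_transaction_isolation = 'read committed';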

asg0451 commented 4 months ago

Hey @michae2, thanks for the reply. It's definitely reproducible on SERIALIZABLE, and IIRC crdb might not have let me use SKIP LOCKED on READ COMMITTED at all, though I may be misremembering that.

michae2 commented 4 months ago

@asg0451 if you're using serializable, it would be worth trying again with optimizer_use_lock_op_for_serializable and enable_durable_locking_for_serializable turned on. One way to do that for all connections using a specific role / user is:

ALTER ROLE <rolename> SET optimizer_use_lock_op_for_serializable = on;
ALTER ROLE <rolename> SET enable_durable_locking_for_serializable = on;

In v23.2 there is a new implementation of SELECT FOR UPDATE that fixes some problems, which is only enabled for read committed isolation. Turning on these settings enables it for serializable isolation as well.
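Or, to experiment in a single session, the same variables can be set with SET:

SET optimizer_use_lock_op_for_serializable = on;
SET enable_durable_locking_for_serializable = on;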

asg0451 commented 4 months ago

Hey @michae2, I can confirm that the issue is still present with the read committed isolation level.

Trying those settings results in this error when running the query: ERROR: usage of replicated locks in conjunction with skip locked wait policy is currently unsupported (SQLSTATE 0A000)

build: CCL v23.2.4 @ 2024/04/08 21:52:09 (go1.21.8 X:nocoverageredesign)

ajwerner commented 3 months ago

I met Miles (@asg0451) this morning, and I happen to have a debugging tool I've been pointing at cockroach. I noticed when I captured a snapshot that the initial scan was not using skip locked and was using a hard limit. That led me to take a statement bundle, which I'll paste here. I think it's just a straight-up bug in how cockroach is applying SKIP LOCKED in the context of index joins.

The statement is:

SELECT
  id, name, created_at, completed_at, completed_by
FROM
  jobs
WHERE
  completed_at IS NULL
LIMIT
  1
FOR UPDATE SKIP LOCKED
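(For anyone who wants to capture a similar bundle themselves, wrapping the statement in EXPLAIN ANALYZE (DEBUG) produces one; a sketch:)

EXPLAIN ANALYZE (DEBUG)
SELECT
  id, name, created_at, completed_at, completed_by
FROM
  jobs
WHERE
  completed_at IS NULL
LIMIT
  1
FOR UPDATE SKIP LOCKED;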

The schema is:

CREATE TABLE defaultdb.public.jobs (
    id UUID NOT NULL DEFAULT gen_random_uuid(),
    name STRING NULL,
    created_at TIMESTAMP NOT NULL DEFAULT current_timestamp():::TIMESTAMP,
    completed_at TIMESTAMP NULL,
    completed_by STRING NULL,
    CONSTRAINT jobs_pkey PRIMARY KEY (id ASC),
    INDEX pending_jobs_idx (id ASC, name ASC, created_at ASC, completed_at ASC) WHERE completed_at IS NULL
);

The plan ends up being:

• index join
│ nodes: n1
│ regions: local
│ actual row count: 0
│ KV time: 414µs
│ KV contention time: 0µs
│ KV rows decoded: 0
│ KV bytes read: 0 B
│ KV gRPC calls: 1
│ estimated max memory allocated: 20 KiB
│ estimated max sql temp disk usage: 0 B
│ estimated row count: 1
│ table: jobs@jobs_pkey
│ locking strength: for update
│ locking wait policy: skip locked
│ locking durability: guaranteed
│
└── • scan
      nodes: n1
      regions: local
      actual row count: 1
      KV time: 2ms
      KV contention time: 0µs
      KV rows decoded: 1
      KV bytes read: 67 B
      KV gRPC calls: 1
      estimated max memory allocated: 20 KiB
      estimated row count: 1 (0.99% of the table; stats collected 8 minutes ago; using stats forecast for 2 minutes ago)
      table: jobs@pending_jobs_idx (partial index)
      spans: LIMITED SCAN
      limit: 1

A problem with this plan is that the index read does not use SKIP LOCKED, so it'll return the first row in that index. Then, in the index join we'll scan the primary index, but we will use locking wait policy: skip locked. The end result is that the primary key lookup will return zero rows. I feel like there are two problems:

  1. The limit being pushed down to the index scan probably needs to be a soft limit -- though I can see that one being controversial in its own right, because it'll expand the contention footprint of the scan. The join reader should be the one enforcing a hard limit of 1.
  2. The read from the index should use skip locked.

michae2 commented 3 months ago

Great point @ajwerner! Looks like the plan is correct with optimizer_use_lock_op_for_serializable = off:

  • limit
  │ count: 1
  │
  └── • index join
      │ table: jobs@jobs_pkey
      │ locking strength: for update
      │ locking wait policy: skip locked
      │
      └── • scan
            missing stats
            table: jobs@pending_jobs_idx (partial index)
            spans: FULL SCAN (SOFT LIMIT)
            locking strength: for update
            locking wait policy: skip locked

and incorrect with optimizer_use_lock_op_for_serializable = on:

  • index join
  │ table: jobs@jobs_pkey
  │ locking strength: for update
  │ locking wait policy: skip locked
  │
  └── • scan
        missing stats
        table: jobs@pending_jobs_idx (partial index)
        spans: LIMITED SCAN
        limit: 1

ajwerner commented 3 months ago

One more note is that the bad plan also happens with read committed without those session variables.

ajwerner commented 3 months ago

I think the plan is also wrong even without the index join. Consider the below plan. I think the scan at the bottom needs to have a SKIP LOCKED (otherwise it could block, no?). Then having the limit before the locking lookup join means that you again can fail to find the row because it is indeed locked. I don't know enough to know whether this is the same problem.

planning time: 158µs
execution time: 1ms
distribution: local
vectorized: true
rows decoded from KV: 100 (6.5 KiB, 2 gRPC calls)
cumulative time spent in KV: 1ms
maximum memory usage: 70 KiB
network usage: 0 B (0 messages)
regions: local
isolation level: read committed
priority: normal
quality of service: regular

• lookup join (semi)
│ nodes: n1
│ regions: local
│ actual row count: 0
│ KV time: 510µs
│ KV contention time: 0µs
│ KV rows decoded: 0
│ KV bytes read: 0 B
│ KV gRPC calls: 1
│ estimated max memory allocated: 20 KiB
│ table: jobs@jobs_pkey
│ equality: (id) = (id)
│ equality cols are key
│ locking strength: for update
│ locking wait policy: skip locked
│ locking durability: guaranteed
│
└── • limit
    │ count: 1
    │
    └── • filter
        │ nodes: n1
        │ regions: local
        │ actual row count: 1
        │ filter: completed_at IS NULL
        │
        └── • scan
              nodes: n1
              regions: local
              actual row count: 100
              KV time: 633µs
              KV contention time: 0µs
              KV rows decoded: 100
              KV bytes read: 6.5 KiB
              KV gRPC calls: 1
              estimated max memory allocated: 30 KiB
              missing stats
              table: jobs@jobs_pkey
              spans: FULL SCAN (SOFT LIMIT)

michae2 commented 3 months ago

Under read committed (or with optimizer_use_lock_op_for_serializable = on) we're performing unlocked initial scans, and then only locking the PK. So I don't think this could block during the initial scan, since it's an unlocked read. But I agree that the limit there is wrong; it needs to be above the locking lookup join.

We could do a skip-locked locking read during the initial scan, to skip over rows locked in the secondary index by optimizer_use_lock_op_for_serializable = off transactions. I think we were trying to avoid locking all indexes, but it could help with contention in some cases.

ajwerner commented 3 months ago

Under read committed (or with optimizer_use_lock_op_for_serializable = on) we're performing unlocked initial scans, and then only locking the PK.

I don't think I have the mental model down for what happens when a READ COMMITTED unlocked scan arrives at an intent. I was thinking with my old serializable hat when I wrote the above comment. I guess the answer is that it pushes the lock holder's timestamp above its read time and moves on with its life. So yeah, in that case it is just the limit being below the locking read that's the bug.

I think we were trying to avoid locking all indexes, but it could help with contention in some cases.

Interesting.

asg0451 commented 3 months ago

Thanks for taking a look, guys! This was one of the first things I tried to build with CRDB when exploring it, so it was sad when it didn't work. Much of the above discussion is a bit above my head, but I'll revisit it as I learn more about the system.

mgartner commented 1 month ago

To summarize the above:

  1. There is a bug in the locking of SKIP LOCKED when using serializable isolation: https://github.com/cockroachdb/cockroach/issues/121917#issuecomment-2118042827
  2. There may be a bug where a LIMIT can be incorrectly pushed below a lookup-join: https://github.com/cockroachdb/cockroach/issues/121917#issuecomment-2118101146
    • I'm not convinced there is a bug here. I'd need to see the query used to generate the query plan to confirm.

ajwerner commented 1 month ago

I'd need to see the query used to generate the query plan to confirm.

The query and the schema are in the comment above (https://github.com/cockroachdb/cockroach/issues/121917#issuecomment-2118022756).

michae2 commented 1 month ago

We worked on this during the collab session, PR coming up!

mgartner commented 1 month ago

I'd need to see the query used to generate the query plan to confirm.

The query and the schema are in the comment above (https://github.com/cockroachdb/cockroach/issues/121917#issuecomment-2118022756).

I was referring to the query that leads to the lookup semi-join. Looks like this does it:

CREATE TABLE defaultdb.public.jobs (
  id UUID NOT NULL DEFAULT gen_random_uuid(),
  name STRING NULL,
  created_at TIMESTAMP NOT NULL DEFAULT current_timestamp():::TIMESTAMP,
  completed_at TIMESTAMP NULL,
  completed_by STRING NULL,
  CONSTRAINT jobs_pkey PRIMARY KEY (id ASC),
  INDEX pending_jobs_idx (id ASC, name ASC, created_at ASC, completed_at ASC) WHERE completed_at IS NULL
);

SET optimizer_use_lock_op_for_serializable=on;
SET enable_durable_locking_for_serializable=on;

EXPLAIN
SELECT
  id
FROM
  jobs
WHERE
  completed_at IS NULL
LIMIT
  1
FOR UPDATE SKIP LOCKED;
--                            info
-- ------------------------------------------------------
--   distribution: local
--   vectorized: true
-- 
--   • lookup join (semi)
--   │ table: jobs@jobs_pkey
--   │ equality: (id) = (id)
--   │ equality cols are key
--   │ locking strength: for update
--   │ locking wait policy: skip locked
--   │ locking durability: guaranteed
--   │
--   └── • scan
--         missing stats
--         table: jobs@pending_jobs_idx (partial index)
--         spans: LIMITED SCAN
--         limit: 1
-- (16 rows)

asg0451 commented 2 weeks ago

This still appears to be broken using my original repro script.

tested on master @ 7ea67ccd150cac6fca8f464ba417876ecce007ff

My repro script is in the original issue body up there ^, but I'd be happy to demonstrate the problem personally if anyone would like.

michae2 commented 2 weeks ago

Thanks for trying it out @asg0451! I will try your repro script this afternoon.

asg0451 commented 2 weeks ago

sounds good. I set up a local single node cluster and ran NUM_WORKERS=10 DATABASE_URL=postgresql://..... go run main.go

lmk if you have any questions

michae2 commented 1 week ago

TLDR: I think changing some cfetcher batch size logic will make a big difference.


Finally got a chance to play with this using the tip of master (37a37b00e6a4cf4f2b5bd1d416a3358c0f736eb1). For reference, the worker queries are as follows (correct me if this is wrong, @asg0451):

CREATE TABLE public.jobs (
    id UUID NOT NULL DEFAULT gen_random_uuid(),
    name STRING NULL,
    created_at TIMESTAMP NOT NULL DEFAULT current_timestamp():::TIMESTAMP,
    completed_at TIMESTAMP NULL,
    completed_by STRING NULL,
    CONSTRAINT jobs_pkey PRIMARY KEY (id ASC),
    INDEX pending_jobs_idx (id ASC, name ASC, created_at ASC, completed_at ASC) WHERE completed_at IS NULL
);

BEGIN;
-- GetJob
SELECT id, name, created_at, completed_at, completed_by FROM jobs where completed_at IS NULL limit 1 for update skip locked;
-- FinishJob
UPDATE jobs SET completed_at = CURRENT_TIMESTAMP, completed_by = $2 WHERE id = $1 RETURNING id, name, created_at, completed_at, completed_by;
COMMIT;

Behavior is a little different for the three modes this can run in (serializable, serializable with lock op, and read committed). In all three modes the script appears to make progress, but sometimes progress is very slow, depending on the plan used for GetJob.

For all three modes, the FinishJob plan is exactly the same, and locks a single row in the primary index:

  distribution: local
  vectorized: true

  • update
  │ estimated row count: 1
  │ table: jobs
  │ set: completed_at, completed_by
  │
  └── • render
      │
      └── • scan
            estimated row count: 1 (0.99% of the table; stats collected 3 minutes ago)
            table: jobs@jobs_pkey
            spans: [/'ff7da26e-5089-4a23-bdc4-dc19031c1434' - /'ff7da26e-5089-4a23-bdc4-dc19031c1434']
            locking strength: for update

Here's what I believe is happening with the GetJob plan in each of the three modes. (Much depends on the cfetcher's batching behavior when it has a soft limit, which controls the number of rows read (and locked) by a scan in a single batch request.)

SERIALIZABLE

The GetJob plan starts out using pending_jobs_idx:

  distribution: local
  vectorized: true

  • limit
  │ count: 1
  │
  └── • index join
      │ table: jobs@jobs_pkey
      │ locking strength: for update
      │ locking wait policy: skip locked
      │
      └── • scan
            missing stats
            table: jobs@pending_jobs_idx (partial index)
            spans: FULL SCAN (SOFT LIMIT)
            locking strength: for update
            locking wait policy: skip locked

and then after a minute (after a stats collection) the plan switches to using jobs_pkey with a filter:

  distribution: local
  vectorized: true

  • limit
  │ count: 1
  │
  └── • filter
      │ estimated row count: 85
      │ filter: completed_at IS NULL
      │
      └── • scan
            estimated row count: 2 - 101 (100% of the table; stats collected 7 seconds ago)
            table: jobs@jobs_pkey
            spans: FULL SCAN (SOFT LIMIT)
            locking strength: for update
            locking wait policy: skip locked

All workers make progress when using the first plan, which usually only reads and locks a single row in both indexes.

When using the second plan, throughput drops to just a single worker making progress. All other workers consistently get 0 rows for GetJob. This is because the second plan locks all 101 rows in the primary index, due to (a) locking before filtering out completed rows, which means (b) the query cannot be satisfied by the initial 1-KV batch of the scan, which means (c) we continue the scan with a second, larger batch that locks the rest of the rows in the primary index.

After a long time the script successfully finishes.

SERIALIZABLE, with optimizer_use_lock_op_for_serializable = on

Again, the plan starts out using pending_jobs_idx, but now we're only locking the primary index:

  distribution: local
  vectorized: true

  • limit
  │ count: 1
  │
  └── • index join
      │ table: jobs@jobs_pkey
      │ locking strength: for update
      │ locking wait policy: skip locked
      │
      └── • scan
            missing stats
            table: jobs@pending_jobs_idx (partial index)
            spans: FULL SCAN (SOFT LIMIT)
            locking wait policy: skip locked

Again, after stats collection, the plan switches to jobs_pkey with a filter, but now the lock is after the filter:

  distribution: local
  vectorized: true

  • limit
  │ count: 1
  │
  └── • lookup join (semi)
      │ estimated row count: 100
      │ table: jobs@jobs_pkey
      │ equality: (id) = (id)
      │ equality cols are key
      │ locking strength: for update
      │ locking wait policy: skip locked
      │
      └── • filter
          │ estimated row count: 100
          │ filter: completed_at IS NULL
          │
          └── • scan
                estimated row count: 2 - 101 (100% of the table; stats collected 5 seconds ago)
                table: jobs@jobs_pkey
                spans: FULL SCAN (SOFT LIMIT)
                locking wait policy: skip locked

Progress is uneven when using the first plan. Usually one worker can make progress, occasionally two, and the others get 0 rows for GetJob. This is because the first execution of the plan is satisfied by the initial 1-KV batch of the scan, and thus only locks one row, while the second execution cannot be satisfied by the initial 1-KV batch and ends up locking the rest of the rows due to a larger second batch.

When using the second plan all workers make progress, except for occasional brief moments of all workers except one getting 0 rows for GetJob. This is because in the second plan we can usually satisfy the entire query in the initial 1-KV batch of the lookup join, unless two workers race to lock the same row during the lookup join. The losing worker then reads another batch of rows and locks all the remaining uncompleted rows.

Eventually, about 30 seconds after one of these moments, the script fails with a transaction timeout:

time=2024-08-13T15:15:37.502-07:00 level=ERROR msg="run error" err="errgroup.Wait: crdbpgx.ExecuteTx: queries.FinishJob: ERROR: query execution canceled due to transaction timeout (SQLSTATE 57014)"

I'm not sure why this is yet.

READ COMMITTED

Again, starts out using pending_jobs_idx, but with guaranteed locking durability:

  distribution: local
  vectorized: true

  • limit
  │ count: 1
  │
  └── • index join
      │ table: jobs@jobs_pkey
      │ locking strength: for update
      │ locking wait policy: skip locked
      │ locking durability: guaranteed
      │
      └── • scan
            missing stats
            table: jobs@pending_jobs_idx (partial index)
            spans: FULL SCAN (SOFT LIMIT)
            locking wait policy: skip locked

and again, after a stats collection, it switches to jobs_pkey with a filter, with locking after the filter:

  distribution: local
  vectorized: true

  • limit
  │ count: 1
  │
  └── • lookup join (semi)
      │ estimated row count: 83
      │ table: jobs@jobs_pkey
      │ equality: (id) = (id)
      │ equality cols are key
      │ locking strength: for update
      │ locking wait policy: skip locked
      │ locking durability: guaranteed
      │
      └── • filter
          │ estimated row count: 83
          │ filter: completed_at IS NULL
          │
          └── • scan
                estimated row count: 2 - 101 (100% of the table; stats collected 3 seconds ago)
                table: jobs@jobs_pkey
                spans: FULL SCAN (SOFT LIMIT)
                locking wait policy: skip locked

Again, progress is uneven when using the first plan. Usually one or two workers can make progress, and the others get 0 rows for GetJob. This happens for the same reason explained above.

When using the second plan all workers make progress. Unlike the serializable with lock op mode, however, under this mode the script successfully finishes. I think this probably has something to do with readers not waiting for writers under RC.


So I think a couple more fixes will help:

asg0451 commented 1 week ago

Thanks @michae2, those queries look correct, as does your explanation of the symptoms. I just tried running the script with select .. from jobs@pending_jobs_idx .. and that works perfectly (under serializable), backing up what you said above.

(Unfortunately, since that's not valid Postgres syntax, I can't use this as a workaround in my program, but anyway.)
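For reference, the forced-index variant is roughly this (a sketch, assuming the same column list as GetJob; jobs@pending_jobs_idx is CockroachDB's index-hint syntax):

SELECT id, name, created_at, completed_at, completed_by
FROM jobs@pending_jobs_idx
WHERE completed_at IS NULL
LIMIT 1
FOR UPDATE SKIP LOCKED;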

Why does locking before/after the filter matter? Naively I'd think we'd want to lock after the limit so we only lock one row.

michae2 commented 1 week ago

why does locking before/after the filter matter?

Locking before the filter has two effects: (1) it causes us to lock extra rows that are rejected by the filter, and (2) because of the batching logic it causes us to not answer the query using the first 1-KV batch, requiring a second batch which then locks the rest of the rows in the table.

naively i'd think we'd want it to lock after the limit so we only lock one row.

This is what we do for normal SELECT FOR UPDATE. For SKIP LOCKED, however, we must check for skip locked before applying the limit; otherwise we might miss an unlocked row that could be returned.

We could do something like (lock (limit (skip-locked read))) but right now the locking and skip-locked read are the same operation, so it has to be (limit (lock skip-locked read)).

michae2 commented 1 week ago

I talked to @rytaft about this, and we think the proper way to improve this workload is to push the limit down into index join and lookup join, so that we use hard-limit behavior instead of soft-limit behavior.

Unfortunately, JoinReader doesn't yet support limits. But we need to do this for #128704 anyway. So after #128704 is fixed, we can come back to this and add a rule to push the limit into index join and lookup join.

asg0451 commented 1 week ago

Thanks @michae2. Can you expand on this a bit?

because of the batching logic it causes us to not answer the query using the first 1-KV batch, requiring a second batch which then locks the rest of the rows in the table

I haven't dug into kv/sql internals enough to understand this quite yet. Maybe you could point me at some resources?

michae2 commented 1 week ago

can you expand on this a bit? because of the batching logic it causes us to not answer the query using the first 1-KV batch, requiring a second batch which then locks the rest of the rows in the table. I haven't dug into kv/sql internals enough to understand this quite yet. maybe you could point me at some resources?

Ah yes, I did not explain this well, let me try again. For a query plan like this (the second plan under serializable):

  distribution: local
  vectorized: true

  • limit
  │ count: 1
  │
  └── • filter
      │ estimated row count: 85
      │ filter: completed_at IS NULL
      │
      └── • scan
            estimated row count: 2 - 101 (100% of the table; stats collected 7 seconds ago)
            table: jobs@jobs_pkey
            spans: FULL SCAN (SOFT LIMIT)
            locking strength: for update
            locking wait policy: skip locked

the execution engine has to make some choices about how many rows to process with each operator at once. It could process a single row with each operator all the way up the plan (depth-first execution), or it could process all possible rows with the bottommost operator, then all possible rows with the next operator, etc (breadth-first execution). The former ensures we process the fewest number of rows, but has high overhead. The latter has the lowest overhead but also processes the max number of rows.

The strategy currently used by the execution engine is to process a batch of rows at once in each operator, in the hopes of balancing tradeoffs. The default batch size is 10k rows I think. But when there is a "soft limit", meaning a limit somewhere up above in the plan, the execution engine uses the soft limit as the size of the first batch. So for the locking scan, the execution engine first asks for 1 KV from kvserver, and passes this all the way up the plan. If the query has not finished after this first batch, the locking scan then asks for 10k KVs from kvserver.
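(One way to observe this from the outside, as a suggestion rather than something verified above, is to run the GetJob query under EXPLAIN ANALYZE and compare the "KV rows decoded" figure between an execution that is satisfied by the first 1-KV batch and one that falls through to the larger second batch. Note that EXPLAIN ANALYZE actually executes the statement, so inside a transaction it takes the same locks.)

EXPLAIN ANALYZE
SELECT id, name, created_at, completed_at, completed_by
FROM jobs
WHERE completed_at IS NULL
LIMIT 1
FOR UPDATE SKIP LOCKED;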

asg0451 commented 1 week ago

I see, so because that first batch doesn't match the filter (completed_at is not null), the query isn't satisfied by that batch. Then instead of trying one more, it tries 10k. And it is doing the locking at the lowest level, so it locks the whole table (or up to 10k rows) as a result of the query.

thanks for the explanation!