kagkarlsson / db-scheduler

Persistent cluster-friendly scheduler for Java
Apache License 2.0

feat: Adding the ability to prioritize tasks #519

Open simeq opened 1 month ago

simeq commented 1 month ago

This change introduces an opt-in task prioritization mechanism. It is disabled by default and can be enabled using the enablePrioritization() method.

cc @kagkarlsson open to any suggestions and comments regarding tests that would be worth adding

kagkarlsson commented 1 month ago

Thanks for updating the PR 👍. Will try and find some time to have a look at it soon!

kagkarlsson commented 1 month ago

Did a quick look through. I can see most lines come from the opt-in toggle 😅. Good job realigning the PR with master. I need to go through it more thoroughly, but a couple of reactions:

Another thing: it would be great to see numbers for how polling performs, specifically for postgres, i.e. how many buffers are read to satisfy the query, with and without priority ordering.

A test like:

Run the due-query in postgres with explain (analyze, buffers). Possibly also with index variations, (priority, execution_time) or (execution_time, priority).
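
For example, a rough sketch of such a test (table and index names, and the exact query shape, are assumptions based on the postgres schema and this PR):

-- Create one candidate index per run (drop/recreate so only one exists when measuring):
CREATE INDEX priority_execution_time_idx ON scheduled_tasks (priority DESC, execution_time ASC);
-- CREATE INDEX execution_time_priority_idx ON scheduled_tasks (execution_time ASC, priority DESC);

-- Run the due-query with buffer statistics:
EXPLAIN (ANALYZE, BUFFERS)
SELECT task_name, task_instance
FROM scheduled_tasks
WHERE picked = false AND execution_time <= now()
ORDER BY priority DESC, execution_time ASC
LIMIT 100
FOR UPDATE SKIP LOCKED;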

Not sure if this is something you or someone else is up for, otherwise I will need to run it myself.

simeq commented 1 month ago

Thanks for looking into it!

In the coming days I will update the PR according to your suggestions, but I have a question regarding the not null part.
Nowhere in the schema is priority set to not null, and I added:

Upgrading to 15.x

  • Add the priority column and the priority_execution_time_idx index to the database schema. See the table definitions for postgresql, oracle or mysql. Note that when enablePrioritization() is used, null priority values are ordered differently depending on the database used.

So the schema allows null values, but maybe I should make it clearer that this upgrade note is talking about existing rows?
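
For reference, a minimal sketch of what the postgres part of that upgrade could look like (the column type is an assumption here; the table definitions in the repo are authoritative):

-- Sketch only: priority is added as a nullable column, matching the note above
-- that the schema does not declare it NOT NULL.
ALTER TABLE scheduled_tasks ADD COLUMN priority SMALLINT;
CREATE INDEX priority_execution_time_idx ON scheduled_tasks (priority DESC, execution_time ASC);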

As for testing the changes, I haven't done that before, but I'm happy to try. If I run into any problems, I will let you know.

Overall, diving into this code is a good exercise; it recently helped me understand exactly how to improve performance 😄

kagkarlsson commented 1 month ago

In the coming days I will update the PR according to your suggestions, but I have a question regarding the not null part. Nowhere in the schema is priority set to not null, and I added:

Ah, I didn't check all the schemas, I just assumed after seeing one, I think. I checked now, and it looks like mssql has not null, but as you say, none of the others do.

GeorgEchterling commented 3 weeks ago

I think I read a discussion about the index usage with prioritization somewhere on this repo, but I can't find it. In case it's still relevant:

Have you considered splitting the "due task detection" from the picking step? I.e. something like this:

UPDATE scheduled_tasks
SET due = TRUE
WHERE NOT due
AND NOT picked
AND execution_time < NOW();

SELECT * FROM scheduled_tasks
WHERE due
AND NOT picked
ORDER BY priority, execution_time;

Both queries could be optimized (even for arbitrary priority cardinality) using indices over (due, picked, execution_time) and (due, picked, priority, execution_time).
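
As a rough sketch, those indexes could be declared like this (the due column and the index names are hypothetical, following the proposal above):

-- Hypothetical: assumes the proposed boolean "due" flag is added to scheduled_tasks.
CREATE INDEX due_picked_execution_time_idx ON scheduled_tasks (due, picked, execution_time);
CREATE INDEX due_picked_priority_execution_time_idx ON scheduled_tasks (due, picked, priority, execution_time);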

Also, this PR uses descending priorities. Older versions of MySQL/MariaDB don't support direction on index columns, which would prevent them from using the index when sorting by priority DESC, execution_time ASC. I'm not sure if that affects any other DBs.
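
To illustrate, the index backing that sort order would need a truly descending key part; a sketch, with the caveat that MySQL before 8.0 (and older MariaDB versions) parse DESC in index definitions but ignore it:

-- On MySQL 8.0+ (and recent MariaDB) the DESC key part is honored and this index
-- can serve ORDER BY priority DESC, execution_time ASC. On older versions DESC is
-- parsed but ignored, so the sort cannot use the index directly.
CREATE INDEX priority_execution_time_idx ON scheduled_tasks (priority DESC, execution_time ASC);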

simeq commented 3 weeks ago

I ran some tests @kagkarlsson for the 10M due executions. I wasn't certain what you meant by "not due" executions, so happy to add them later 😄

_TL;DR: priority desc, execution_time asc is the correct index; enabling prioritization reduces performance by about 15 percent._

I conducted the tests with four concurrent scheduler instances (the vm rows below), each configured with lock-and-fetch.

I scheduled 10M executions that were due, with random priority, and tested two types of indexes: (priority desc, execution_time asc) and (execution_time asc, priority desc).
Results

Results for scheduler without prioritization:

|       | count    | mean    | 1m rate | 5m rate | 15m rate |
|-------|----------|---------|---------|---------|----------|
| vm1   | 2521123  | 2194.22 | 2341.86 | 2267.74 | 2153.32  |
| vm2   | 2483663  | 2161.62 | 2289.32 | 2228.68 | 2101.48  |
| vm3   | 2500572  | 2176.34 | 2316.60 | 2246.62 | 2121.06  |
| vm2   | 2494642  | 2171.17 | 2307.60 | 2242.47 | 2128.78  |
| total | 10000000 | 8703.35 | 9255.38 | 8985.51 | 8504.64  |

Results for prioritization with index priority desc, execution_time asc:

|       | count    | mean    | 1m rate | 5m rate | 15m rate |
|-------|----------|---------|---------|---------|----------|
| vm1   | 2516239  | 1890.51 | 1972.18 | 1950.99 | 1923.13  |
| vm2   | 2481189  | 1864.18 | 1951.05 | 1924.37 | 1898.35  |
| vm3   | 2495400  | 1874.86 | 1951.23 | 1934.60 | 1896.35  |
| vm2   | 2507172  | 1883.70 | 1968.52 | 1946.73 | 1921.82  |
| total | 10000000 | 7513.25 | 7842.98 | 7756.69 | 7639.65  |

Results for prioritization with index execution_time asc, priority desc (I just gave up, it was too slow...):

|       | count | mean  | 1m rate | 5m rate | 15m rate |
|-------|-------|-------|---------|---------|----------|
| total | 1600  | 19.76 | 13.23   | 4.62    | 1.68     |

Query plans - explain (analyze, buffers)

EXPLAIN (ANALYZE, BUFFERS)
SELECT task_name, task_instance
FROM scheduled_tasks WHERE picked = false and execution_time <= now()
ORDER BY priority desc, execution_time ASC FOR UPDATE SKIP LOCKED
LIMIT 100

Query plan without index on priority

Limit  (cost=645821.29..645822.54 rows=100 width=39) (actual time=11979.371..11979.485 rows=100 loops=1)
  Buffers: shared hit=113737, temp read=138357 written=206674
  ->  LockRows  (cost=645821.29..770819.29 rows=9999840 width=39) (actual time=11979.370..11979.476 rows=100 loops=1)
        Buffers: shared hit=113737, temp read=138357 written=206674
        ->  Sort  (cost=645821.29..670820.89 rows=9999840 width=39) (actual time=11979.345..11979.372 rows=100 loops=1)
              Sort Key: priority DESC, execution_time
              Sort Method: external merge  Disk: 547248kB
              Buffers: shared hit=113637, temp read=138357 written=206674
              ->  Seq Scan on scheduled_tasks  (cost=0.00..263634.60 rows=9999840 width=39) (actual time=0.014..2431.470 rows=10000000 loops=1)
                    Filter: ((NOT picked) AND (execution_time <= now()))
                    Buffers: shared hit=113637
Planning Time: 0.107 ms
Execution Time: 12091.458 ms

Query plan with index priority desc, execution_time asc

Limit  (cost=0.56..9.29 rows=100 width=39) (actual time=0.028..0.144 rows=100 loops=1)
  Buffers: shared hit=117 dirtied=9
  ->  LockRows  (cost=0.56..872617.26 rows=9999840 width=39) (actual time=0.027..0.135 rows=100 loops=1)
        Buffers: shared hit=117 dirtied=9
        ->  Index Scan using priority_execution_time_idx on scheduled_tasks  (cost=0.56..772618.86 rows=9999840 width=39) (actual time=0.021..0.084 rows=100 loops=1)
              Index Cond: (execution_time <= now())
              Filter: (NOT picked)
              Buffers: shared hit=17 dirtied=9
Planning Time: 0.283 ms
Execution Time: 0.220 ms

Query plan with index execution_time asc, priority desc

Limit  (cost=469587.40..469588.65 rows=100 width=40) (actual time=14596.903..14597.121 rows=100 loops=1)
  Buffers: shared hit=113737 dirtied=103971, temp read=138357 written=206674
  ->  LockRows  (cost=469587.40..553112.54 rows=6682011 width=40) (actual time=14596.902..14597.111 rows=100 loops=1)
        Buffers: shared hit=113737 dirtied=103971, temp read=138357 written=206674
        ->  Sort  (cost=469587.40..486292.43 rows=6682011 width=40) (actual time=14596.873..14596.898 rows=100 loops=1)
              Sort Key: priority DESC, execution_time
              Sort Method: external merge  Disk: 547248kB
              Buffers: shared hit=113637 dirtied=103971, temp read=138357 written=206674
              ->  Seq Scan on scheduled_tasks  (cost=0.00..214205.74 rows=6682011 width=40) (actual time=0.026..5012.498 rows=10000000 loops=1)
                    Filter: ((NOT picked) AND (execution_time <= now()))
                    Buffers: shared hit=113637 dirtied=103971
Planning Time: 0.178 ms
Execution Time: 14794.232 ms

Query plan when prioritization is disabled (ORDER BY execution_time ASC)

Limit  (cost=0.44..5.85 rows=100 width=35) (actual time=0.024..0.101 rows=100 loops=1)
  Buffers: shared hit=105 dirtied=2
  ->  LockRows  (cost=0.44..541357.98 rows=10000056 width=35) (actual time=0.024..0.092 rows=100 loops=1)
        Buffers: shared hit=105 dirtied=2
        ->  Index Scan using execution_time_idx on scheduled_tasks  (cost=0.44..441357.42 rows=10000056 width=35) (actual time=0.015..0.038 rows=100 loops=1)
              Index Cond: (execution_time <= now())
              Filter: (NOT picked)
              Buffers: shared hit=5
Planning Time: 0.258 ms
Execution Time: 0.125 ms

kagkarlsson commented 2 weeks ago

Sorry I haven't followed up earlier. Good job on the testing! Excellent to see a full test using concurrent schedulers and detailed statistics 👏.

wasn't certain what you meant by "not due" executions, so happy to add them later

To make the testing more realistic, we have to assume that there is a large number of executions that are not due (including high-priority executions that are not due yet).

Index (priority, execution_time) will be better when most executions are due, i.e. their execution time has passed. Index (execution_time, priority) will be better when most executions are not due, i.e. their execution time has not passed yet (future executions).

My assumption is that the realistic scenario has more future executions than due ones. If there are throughput problems, however, there will eventually be a significant number of due executions, which is the scenario where priority is useful.

So for the testing, I think we should add at least the same number of future executions to the table as there are due ones (maybe even a factor higher).
Due: 1M, future: 10M might be a better distribution? 🤔

kagkarlsson commented 2 weeks ago

Have you considered splitting the "due task detection" from the picking step?

@GeorgEchterling not really. That would require an additional update and roundtrip to the database 🤔

(on the other hand, the performance will likely be more predictable)

simeq commented 2 weeks ago

Thanks for your explanation @kagkarlsson.

I ran the tests again, started the same instances and filled the scheduler with 1M due tasks (random -60 minutes) and 10M in the future (random +60 minutes), with random priorities from 1 to 10.
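
For reference, a rough sketch of how such a distribution could be seeded directly in postgres (the column list is trimmed to the essentials and is an assumption; the real table has more columns):

-- Sketch only: adjust the column list to the actual scheduled_tasks definition.
-- 1M due executions, spread over the last 60 minutes, priority 1-10:
INSERT INTO scheduled_tasks (task_name, task_instance, execution_time, picked, priority, version)
SELECT 'load-test', 'due-' || i,
       now() - random() * interval '60 minutes',
       false, 1 + floor(random() * 10)::int, 1
FROM generate_series(1, 1000000) AS i;

-- 10M future executions, spread over the next 60 minutes:
INSERT INTO scheduled_tasks (task_name, task_instance, execution_time, picked, priority, version)
SELECT 'load-test', 'future-' || i,
       now() + random() * interval '60 minutes',
       false, 1 + floor(random() * 10)::int, 1
FROM generate_series(1, 10000000) AS i;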

priority desc, execution_time asc is the correct index, because PostgreSQL still does a seq scan for execution_time asc, priority desc even though most tasks are not due. But there is a significant drop in the execution rate when prioritization is enabled.

Results

Results for scheduler without prioritization:

|       | count   | mean    | 1m rate | 5m rate | 15m rate |
|-------|---------|---------|---------|---------|----------|
| vm1   | 252366  | 2274.00 | 2033.30 | 2186.72 | 2174.81  |
| vm2   | 247850  | 2233.27 | 1989.09 | 2122.85 | 2104.44  |
| vm3   | 251391  | 2265.19 | 2026.89 | 2173.77 | 2160.37  |
| vm2   | 248393  | 2238.11 | 2004.79 | 2170.64 | 2164.12  |
| total | 1000000 | 9010.57 | 8054.07 | 8653.98 | 8603.74  |

Results for prioritization with index priority desc, execution_time asc:

|       | count   | mean   | 1m rate | 5m rate | 15m rate |
|-------|---------|--------|---------|---------|----------|
| vm1   | 250200  | 297.86 | 275.29  | 287.74  | 293.46   |
| vm2   | 249400  | 296.91 | 274.74  | 286.37  | 288.67   |
| vm3   | 250000  | 297.63 | 277.64  | 287.47  | 288.88   |
| vm2   | 250400  | 298.10 | 278.52  | 288.00  | 289.24   |
| total | 1000000 | 1190.5 | 1106.19 | 1149.58 | 1160.25  |

Results for prioritization with index execution_time asc, priority desc:

|       | count   | mean   | 1m rate | 5m rate | 15m rate |
|-------|---------|--------|---------|---------|----------|
| vm1   | 252324  | 58.16  | 55.20   | 44.27   | 41.51    |
| vm2   | 247834  | 58.18  | 55.19   | 44.27   | 41.51    |
| vm3   | 252391  | 56.19  | 55.42   | 44.28   | 41.51    |
| vm2   | 247451  | 58.18  | 55.20   | 44.27   | 41.51    |
| total | 1000000 | 230.71 | 221.01  | 177.09  | 166.04   |

Query plans

Query plan with index priority desc, execution_time asc was:

Limit  (cost=0.56..15.45 rows=100 width=38) (actual time=148.539..148.643 rows=100 loops=1)
  Buffers: shared hit=31277
  ->  LockRows  (cost=0.56..813440.53 rows=5464003 width=38) (actual time=148.538..148.633 rows=100 loops=1)
        Buffers: shared hit=31277
        ->  Index Scan using priority_execution_time_idx on scheduled_tasks  (cost=0.56..758800.50 rows=5464003 width=38) (actual time=148.519..148.562 rows=100 loops=1)
              Index Cond: (execution_time <= now())
              Filter: (NOT picked)
              Buffers: shared hit=31177
Planning Time: 0.113 ms
Execution Time: 148.676 ms

Query plan with index execution_time asc, priority desc was:

Limit  (cost=336782.19..336783.44 rows=100 width=38) (actual time=7200.015..7200.163 rows=100 loops=1)
  Buffers: shared hit=125100 dirtied=110046, temp read=3466 written=9883
  ->  LockRows  (cost=336782.19..386525.02 rows=3979426 width=38) (actual time=7200.014..7200.153 rows=100 loops=1)
        Buffers: shared hit=125100 dirtied=110046, temp read=3466 written=9883
        ->  Sort  (cost=336782.19..346730.76 rows=3979426 width=38) (actual time=7199.988..7200.013 rows=100 loops=1)
              Sort Key: priority DESC, execution_time
              Sort Method: external merge  Disk: 54112kB
              Buffers: shared hit=125000 dirtied=110023, temp read=3466 written=9883
              ->  Seq Scan on scheduled_tasks  (cost=0.00..184691.39 rows=3979426 width=38) (actual time=0.015..6648.807 rows=1000000 loops=1)
                    Filter: ((NOT picked) AND (execution_time <= now()))
                    Rows Removed by Filter: 10000001
                    Buffers: shared hit=125000 dirtied=110023
Planning Time: 0.369 ms
Execution Time: 7212.177 ms

simeq commented 2 weeks ago

Basically, I would say that the usefulness of prioritization depends on the usage scenario of the scheduler.

I have a scheduler instance with millions of recurring tasks on a persistent schedule and a few million one-time tasks added with execution time now() once a day. For this case, I'm guessing that separate schedulers would be a better fit than prioritization.

But for instances that operate only on one-time tasks, always added with execution time now(), this type of prioritization would be a suitable solution.