moonlibs / xqueue

Imbue tarantool space with power of queue

xQueue


Add power of the Queue into Tarantool Space

Latest Release: 5.0.0

Backward compatibility rockspec: rockspecs/xqueue-scm-5.rockspec

Always latest rockspec: rockspecs/xqueue-dev-1.rockspec

Status

Interface

Creator methods

Imbue space with power of queue

M.upgrade(space, {
    format = {
        -- space format. applied to space.format() if passed
    },
    fields = {
        -- id is always taken from pk
        status   = 'status_field_name'    | status_field_no,
        runat    = 'runat_field_name'     | runat_field_no,
        priority = 'priority_field_name'  | priority_field_no,
    },
    features = {
        id = 'auto_increment' | 'time64' | 'uuid' | 'required' | function() ... return task_id end,
        -- auto_increment - if pk is number, then use it for auto_increment
        -- uuid - if pk is string, then use uuid for id
        -- required - primary key MUST be present in tuple during put
        -- function - function will be called to acquire id for task
        retval = 'table' | 'tuple',
        -- table requires space format. default if applicable. a bit slower

        buried = true,           -- if true, support bury/kick
        delayed = true,          -- if true, support delayed tasks, requires `runat`

        keep = true,             -- if true, keep ack'ed tasks in [D]one state, instead of deleting
        -- mutually exclusive with zombie

        zombie = true|number,    -- requires `runat` field
        -- if number, then with default zombie delay, otherwise only if delay is set during ack
        -- mutually exclusive with keep

        ttl    = true|number,    -- requires `runat` field
        -- Time To Live. Task is expired unless taken within time
        -- if number, then with default ttl, otherwise only if set during put/release
        ttr    = true|number,    -- requires `runat` field
        -- Time To Release. Task is returned into [R]eady unless processed (turned to ack|release from taken) within time
        -- if number, then with default ttr, otherwise only if set during take
    },
    -- Set tubes for which statistics collector will be enabled
    tube_stats = { 'tube-1', 'tube-2' },
})
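
The constraints stated in the comments above (`keep` and `zombie` are mutually exclusive; `delayed`, `zombie`, `ttl` and `ttr` require a `runat` field) can be checked before calling `M.upgrade`. The following is a minimal sketch in plain Lua. `check_features` is a hypothetical helper written for illustration, not part of xqueue, and it only covers the rules listed in the comments above.

```lua
-- Hypothetical pre-flight check for the options table passed to M.upgrade.
-- Not part of xqueue; it only encodes the rules stated in the option comments.
local function check_features(opts)
    local fields = opts.fields or {}
    local features = opts.features or {}
    local errors = {}

    -- keep and zombie are documented as mutually exclusive
    if features.keep and features.zombie then
        table.insert(errors, "keep and zombie are mutually exclusive")
    end

    -- these features are documented as requiring the `runat` field
    for _, name in ipairs({ "delayed", "zombie", "ttl", "ttr" }) do
        if features[name] and fields.runat == nil then
            table.insert(errors, name .. " requires fields.runat")
        end
    end

    return #errors == 0, errors
end

-- Usage: a configuration that enables delayed tasks without a runat field
local ok, errors = check_features({
    fields = { status = 'status' },
    features = { delayed = true },
})
if not ok then
    print(table.concat(errors, "; "))
end
```

Collecting all violations instead of failing on the first one makes it easier to fix a configuration in a single pass.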

Producer methods

box.space.myqueue:put{ name="xxx"; data="yyy"; }
box.space.myqueue:put{ "xxx","yyy" }
box.space.myqueue:put(box.tuple.new{ 1,"xxx","yyy" })

box.space.myqueue:put({ name="xxx"; data="yyy"; }, { delay = 1.5 })
box.space.myqueue:put({ name="xxx"; data="yyy"; }, { delay = 1.5; ttl = 100 })

Consumer methods

Admin methods

Explanation of Statuses in :stats()

| Status | Description |
|--------|-------------|
| X | Special status, means task went to void, or appeared from void (makes sense in transitions) |
| R (Ready) | Task is Ready to be Taken |
| W (Waiting) | Task processing was delayed by producer or consumer |
| T (Taken) | Task has been taken by consumer |
| B (Buried) | Fatal error happened during task processing (and consumer buried it) |
| Z (Zombie) | Task was processed successfully and left in the Queue for zombie_delay |
| D (Done) | Task was processed successfully and left in the Queue |
| S | Extra status for tube transition |

Status Transitions

| Transition | Description |
|------------|-------------|
| X -> R | Task was put into the Queue into Ready state (method :put) |
| X -> W | Task was put into the Queue into Waiting state (method :put with specified delay) |
| W -> R | Task was scheduled by runat mechanism of the Queue and became ready to be Taken by consumer |
| R -> T | Task was taken by consumer |
| T -> W | Consumer returned task into the Queue and delayed its execution |
| T -> R | Consumer returned task into the Queue for immediate execution |
| T -> X | Consumer acknowledged successful execution of the Task, and task was removed from the Queue |
| T -> Z | Consumer acknowledged successful execution of the Task, and task was moved to Zombie state for zombie_delay timeout |
| T -> D | Consumer acknowledged successful execution of the Task, and task was moved to Done status and will be left in the Queue forever |
| T -> S | Task was moved into another tube |
| S -> R | Task was moved from another tube to be Ready |
| S -> W | Task was moved from another tube to be Waiting |
| S -> D | Task was moved from another tube to be Done |
| S -> B | Task was moved from another tube to be Buried |
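
When reading `:stats()` output, it can help to have the transitions listed above as a lookup table. The sketch below is plain Lua and encodes only the transitions in that list. `transitions` and `can_transition` are illustrative names, not part of xqueue, and the queue itself may perform transitions that the list does not mention.

```lua
-- Transitions exactly as listed in the "Status Transitions" section.
-- Illustrative only; this is not xqueue's internal state machine.
local transitions = {
    X = { R = true, W = true },
    W = { R = true },
    R = { T = true },
    T = { W = true, R = true, X = true, Z = true, D = true, S = true },
    S = { R = true, W = true, D = true, B = true },
}

-- Returns true if the `from -> to` transition appears in the list above.
local function can_transition(from, to)
    local targets = transitions[from]
    return targets ~= nil and targets[to] == true
end

-- Usage: a put with a delay lands in Waiting, then becomes Ready, then Taken
for _, step in ipairs({ { 'X', 'W' }, { 'W', 'R' }, { 'R', 'T' } }) do
    assert(can_transition(step[1], step[2]))
end
```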

Benchmarks

❯ .rocks/bin/luabench -d 1000000x
Tarantool version: Tarantool 2.10.7-0-g60f7e18
Tarantool build: Darwin-arm64-RelWithDebInfo (static)
Tarantool build flags:  -Wno-unknown-pragmas -fexceptions -funwind-tables -fno-common -Wformat -Wformat-security -Werror=format-security -fstack-protector-strong -fPIC -fmacro-prefix-map=/var/folders/8x/1m5v3n6d4mn62g9w_65vvt_r0000gn/T/tarantool_install1565297302=. -std=c11 -Wall -Wextra -Wno-strict-aliasing -Wno-char-subscripts -Wno-gnu-alignof-expression -Wno-cast-function-type
CPU: Apple M1 @ 8
JIT: Enabled
JIT: fold cse dce fwd dse narrow loop abc sink fuse
Duration: 1000000 iters

--- BENCH: 001_put_take_bench::bench_producer:producer-1
 1000000             44046 ns/op             22703 op/s     4137 B/op   +3946.04MB

--- BENCH: 001_put_take_bench::bench_producer:producer-2
 1000000             27519 ns/op             36339 op/s     4282 B/op   +4084.35MB

--- BENCH: 001_put_take_bench::bench_producer:producer-4
 1000000             21510 ns/op             46491 op/s     4500 B/op   +4291.69MB

--- BENCH: 001_put_take_bench::bench_producer:producer-8
 1000000             20804 ns/op             48068 op/s     4379 B/op   +4176.26MB

--- BENCH: 001_put_take_bench::bench_producer:producer-12
 1000000             20383 ns/op             49062 op/s     4372 B/op   +4169.59MB

--- BENCH: 001_put_take_bench::bench_producer:producer-16
 1000000             21495 ns/op             46523 op/s     4215 B/op   +4019.98MB

--- BENCH: 001_put_take_bench::bench_producer:producer-20
 1000000             23676 ns/op             42238 op/s     4258 B/op   +4061.49MB

--- BENCH: 001_put_take_bench::bench_producer:producer-24
 1000000             24456 ns/op             40891 op/s     4274 B/op   +4076.65MB

TODO