tarantool / queue

Create task queues, add and take jobs, monitor failed tasks
Other
234 stars 52 forks source link

queue stuck in INIT state on tarantool 3 #226

Closed vanyarock01 closed 3 months ago

vanyarock01 commented 4 months ago

Problem

I tried to run a cluster on Tarantool 3 with a queue on each shard. Everything was fine until I added replicas to the queue replicasets.

image (3)

Sometimes, after the cluster starts, I get strange behavior: the queue is stuck in the INIT state on the RW instance.

Reproduce

Versions:

config.yaml

# Cluster configuration used to reproduce the issue: one router replicaset
# and two storage replicasets, each with instances *_01 and *_02.
app: {}
credentials:
  users:
    # NOTE(review): plaintext passwords; acceptable only for a local reproducer.
    client:
      password: super-client
      roles:
        - super
    replicator:
      password: usage-storage-replicator-secret
      roles:
        - replication
    storage:
      password: usage-storage-storage-secret
      roles:
        - super
        - sharding
groups:
  routers:
    replicasets:
      router_001:
        bootstrap_leader: router_001
        instances:
          router_001:
            iproto:
              listen:
                - uri: 127.0.0.1:6100
        leader: router_001
    # NOTE(review): `roles:` is left empty (null) for the router group;
    # confirm this is intended rather than an omitted list.
    roles:
    sharding:
      roles:
        - router
  storages:
    app:
      module: app.storage
    replicasets:
      # Each storage replicaset: `leader`/`bootstrap_leader` is *_01,
      # *_02 is the additional instance whose addition triggered the issue.
      storage_001:
        bootstrap_leader: storage_001_01
        database:
          replicaset_uuid: 45a6e000-0000-0001-0000-57012a6e0000
        instances:
          storage_001_01:
            database:
              instance_uuid: 45a6e000-0000-0001-0001-57012a6e0000
            iproto:
              listen:
                - uri: 127.0.0.1:6011
          storage_001_02:
            database:
              instance_uuid: 45a6e000-0000-0001-0002-57012a6e0000
            iproto:
              listen:
                - uri: 127.0.0.1:6012
        leader: storage_001_01
      storage_002:
        bootstrap_leader: storage_002_01
        database:
          replicaset_uuid: 45a6e000-0000-0002-0000-57012a6e0000
        instances:
          storage_002_01:
            database:
              instance_uuid: 45a6e000-0000-0002-0001-57012a6e0000
            iproto:
              listen:
                - uri: 127.0.0.1:6021
          storage_002_02:
            database:
              instance_uuid: 45a6e000-0000-0002-0002-57012a6e0000
            iproto:
              listen:
                - uri: 127.0.0.1:6022
        leader: storage_002_01
    roles:
      # NOTE(review): the role source in this report is labelled
      # app/roles/billing.lua; confirm it is the module loaded as
      # app.roles.queue.
      - app.roles.queue

    sharding:
      roles:
        - storage
iproto:
  advertise:
    peer:
      login: replicator
    sharding:
      login: storage
log:
  level: info
  modules:
    # NOTE(review): the role creates its logger as 'roles.billing-queue',
    # so this per-module level may not apply to it; confirm the name.
    roles.queue: verbose
replication:
  bootstrap_strategy: config
  connect_timeout: 3
  # Presumably, with manual failover the `leader:` keys above decide which
  # instance is RW; confirm against Tarantool 3 config docs.
  failover: manual
roles_cfg:
  # Options handed to the app.roles.queue role; its validate() checks ttr/ttl.
  app.roles.queue:
    take_timeout: 0
    ttl: 86400
    ttr: 3

app/roles/billing.lua

local log = require 'log'.new('roles.billing-queue')
local fiber = require 'fiber'

local queue = require 'queue'
-- Configure the queue module for replicaset mode, then publish it as the
-- global `queue` (handy from the console). rawset writes straight into _G,
-- bypassing any metatable (e.g. a strict-mode guard) installed on it.
queue.cfg{ in_replicaset = true }
rawset(_G, 'queue', queue)

-- Role descriptor returned by this module.
--   role_name - the chunk's first vararg, i.e. the name passed to require().
--   tube_name - name of the queue tube this role manages.
--   defaults  - fallback option values (not read anywhere in this file).
local M = {
    tube_name = 'billing',
    role_name = ...,
    defaults = {
        take_timeout = 1,
        ttr = 2,
        ttl = 30 * 24 * 60 * 60, -- 30 days, in seconds
        netbox_timeout = 1.5,
    },
}

-- Assert that cfg[key], if present, is a number strictly greater than zero
-- (or greater than or equal to zero when allow_zero is true).
local function check_number(cfg, key, allow_zero)
    local value = cfg[key]
    -- Compare with nil instead of testing truthiness so an explicit `false`
    -- is rejected rather than silently skipped.
    if value == nil then
        return
    end
    assert(type(value) == 'number', key .. ' must be a number')
    if allow_zero then
        assert(value >= 0, key .. ' must be a non-negative number')
    else
        assert(value > 0, key .. ' must be a positive number')
    end
end

--- Validate this role's `roles_cfg` section.
-- Every option that appears in `M.defaults` is type/range-checked when
-- present; absent options are accepted.
-- @tparam[opt] table cfg role options (nil is treated as an empty table)
-- @raise error (via assert) describing the first invalid option
function M.validate(cfg)
    cfg = cfg or {}
    -- A non-table value (e.g. a string) would otherwise pass: indexing a
    -- string falls through to the string library and yields nil.
    assert(type(cfg) == 'table', 'role config must be a table')

    check_number(cfg, 'ttr')
    check_number(cfg, 'ttl')
    -- Zero is allowed: the cluster config sets take_timeout: 0.
    check_number(cfg, 'take_timeout', true)
    check_number(cfg, 'netbox_timeout')
end

--- Apply the role: ensure the queue tube `M.tube_name` (driver 'fifottl')
-- exists, creating it if necessary.
-- NOTE(review): `cfg` (ttl/ttr/take_timeout from roles_cfg) and `M.defaults`
-- are not read here, so configured values do not reach the tube.
-- @tparam[opt] table cfg role options (currently unused)
function M.apply(cfg)
    log.info('[queue_debug] apply role')

    -- Workaround kept for the bug report (disabled): wait until the instance
    -- is writable, call box.cfg {} to re-trigger queue's box.cfg wrapper,
    -- and only then create the tube.
    -- fiber.create(function ()
    --     log.info('[queue_debug] wait master')
    --     box.ctl.wait_rw()
    --     -- trigger box.cfg wrapper
    --     log.info('[queue_debug] box.cfg {}')
    --     box.cfg {}
    --     -- create queue
    --     log.info('[queue_debug] ro=%s', box.info.ro)
    --     if not queue.create_tube(M.tube_name, 'fifottl', { if_not_exists = true }) then
    --         log.info('[queue_debug] failed tube creation')
    --     end
    -- end)

    -- Active path: create the tube directly, without the workaround.
    local tube_opts = { if_not_exists = true }
    local tube = queue.create_tube(M.tube_name, 'fifottl', tube_opts)
    if tube then
        return
    end
    log.info('[queue_debug] failed tube creation')
end

-- Role shutdown hook: intentionally a no-op, nothing is released here.
M.stop = function() end

return M