dragonflydb / dragonfly

A modern replacement for Redis and Memcached
https://www.dragonflydb.io/

v1.25.0 crashes when running some BullMQ tests #4113

Closed · manast closed this 2 weeks ago

manast commented 2 weeks ago

Describe the bug

Since v1.25.0, released today, BullMQ's test suite produces a hard crash in Dragonfly's server.

To Reproduce

Steps to reproduce the behavior: run the BullMQ test suite. One test that reproduces the issue is the first test in test_concurrency.ts: https://github.com/taskforcesh/bullmq/blob/master/tests/test_concurrency.ts#L33
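
For context, a hypothetical minimal sketch of the kind of flow these tests exercise, run against a local Dragonfly on port 6379. The queue name, job payloads, and the '{b}' prefix (which mirrors the hashtag prefix visible in the keys of the crash log below) are illustrative, not the actual test code:

import { Queue, Worker } from 'bullmq';

async function main() {
  const connection = { host: 'localhost', port: 6379 };

  // Illustrative names; the '{b}' prefix matches the hashtag seen in the crash log below.
  const queue = new Queue('crash-repro', { connection, prefix: '{b}' });
  const worker = new Worker(
    'crash-repro',
    async (job) => job.data, // trivial processor
    { connection, prefix: '{b}', concurrency: 4 },
  );

  // BullMQ performs its job management through Lua scripts (EVAL/EVALSHA);
  // the crash log below shows one such EVAL aborting the server.
  await Promise.all(
    Array.from({ length: 50 }, (_, i) => queue.add('job', { foo: `bar${i}` })),
  );

  await worker.close();
  await queue.close();
}

main().catch(console.error);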

Expected behavior

The server should not crash and the test should pass.

Additional context

This is the output of the server when it crashes:

 docker run -p 6379:6379 --ulimit memlock=-1 docker.dragonflydb.io/dragonflydb/dragonfly --force_epoll --cluster_mode=emulated --lock_on_hashtags --proactor_threads=4
I20241111 12:02:18.918148     1 init.cc:78] dragonfly running in opt mode.
I20241111 12:02:18.920707     1 dfly_main.cc:693] Starting dragonfly df-v1.25.0-a50679576230724c9e4f528c382870666dee48a9
* Logs will be written to the first available of the following paths:
/tmp/dragonfly.*
./dragonfly.*
* For the available flags type dragonfly [--help | --helpfull]
* Documentation can be found at: https://www.dragonflydb.io/docs
W20241111 12:02:18.923547     1 dfly_main.cc:732] SWAP is enabled. Consider disabling it when running Dragonfly.
I20241111 12:02:18.923569     1 dfly_main.cc:737] maxmemory has not been specified. Deciding myself....
I20241111 12:02:18.923571     1 dfly_main.cc:746] Found 5.86GiB available memory. Setting maxmemory to 4.69GiB
I20241111 12:02:18.926821     1 proactor_pool.cc:147] Running 4 io threads
I20241111 12:02:18.934221     1 server_family.cc:835] Host OS: Linux 6.6.32-linuxkit aarch64 with 4 threads
I20241111 12:02:18.941165     1 snapshot_storage.cc:181] Load snapshot: Searching for snapshot in directory: "/data"
W20241111 12:02:18.941254     1 server_family.cc:949] Load snapshot: No snapshot found
I20241111 12:02:18.947126    11 listener_interface.cc:101] sock[11] AcceptServer - listening on port 6379
W20241111 12:02:22.767335    13 main_service.cc:1409]  COMMAND DOCS failed with reason: syntax error
F20241111 12:14:50.497730    10 common.cc:461] Check failed: !state->squashing_info
*** Check failure stack trace: ***
    @     0xaaaac56dbd6c  google::LogMessage::SendToLog()
    @     0xaaaac56d4e50  google::LogMessage::Flush()
    @     0xaaaac56d678c  google::LogMessageFatal::~LogMessageFatal()
    @     0xaaaac50650dc  dfly::BorrowedInterpreter::BorrowedInterpreter()
    @     0xaaaac4db5798  dfly::Service::Eval()
    @     0xaaaac505809c  dfly::CommandId::Invoke()
    @     0xaaaac4dac150  dfly::Service::InvokeCmd()
    @     0xaaaac4ec2e0c  dfly::MultiCommandSquasher::SquashedHopCb()
    @     0xaaaac4ec3a54  _ZNSt17_Function_handlerIFvvEZN4dfly20MultiCommandSquasher15ExecuteSquashedEPN6facade17RedisReplyBuilderEEUlvE1_E9_M_invokeERKSt9_Any_data
    @     0xaaaac55266a8  util::fb2::FiberQueue::Run()
    @     0xaaaac50b6dac  _ZN5boost7context6detail11fiber_entryINS1_12fiber_recordINS0_5fiberEN4util3fb219FixedStackAllocatorEZNS6_6detail15WorkerFiberImplIZN4dfly9TaskQueue5StartESt17basic_string_viewIcSt11char_traitsIcEEEUlvE_JEEC4IS7_EESF_RKNS0_12preallocatedEOT_OSG_EUlOS4_E_EEEEvNS1_10transfer_tE
    @     0xaaaac55434d4  make_fcontext
*** SIGABRT received at time=1731327290 on cpu 0 ***
PC: @     0xffff93f0f200  (unknown)  (unknown)
    @     0xaaaac572f924        480  absl::lts_20240116::AbslFailureSignalHandler()
    @     0xffff941927a0       4960  (unknown)
    @     0xffff93eca67c        208  gsignal
    @     0xffff93eb7130         32  abort
    @     0xaaaac56e162c        336  google::DumpStackTraceAndExit()
    @     0xaaaac56d544c        192  google::LogMessage::Fail()
    @     0xaaaac56dbd6c         16  google::LogMessage::SendToLog()
    @     0xaaaac56d4e50        208  google::LogMessage::Flush()
    @     0xaaaac56d678c         80  google::LogMessageFatal::~LogMessageFatal()
    @     0xaaaac50650dc         16  dfly::BorrowedInterpreter::BorrowedInterpreter()
    @     0xaaaac4db5798        160  dfly::Service::Eval()
    @     0xaaaac505809c        208  dfly::CommandId::Invoke()
    @     0xaaaac4dac150        112  dfly::Service::InvokeCmd()
    @     0xaaaac4ec2e0c        672  dfly::MultiCommandSquasher::SquashedHopCb()
    @     0xaaaac4ec3a54       1072  std::_Function_handler<>::_M_invoke()
    @     0xaaaac55266a8         32  util::fb2::FiberQueue::Run()
    @     0xaaaac50b6dac        288  boost::context::detail::fiber_entry<>()
[failure_signal_handler.cc : 345] RAW: Signal 5 raised at PC=0xffff93eb71ec while already in AbslFailureSignalHandler()
*** SIGTRAP received at time=1731327290 on cpu 0 ***
PC: @     0xffff93eb71ec  (unknown)  abort
    @     0xaaaac572f924        480  absl::lts_20240116::AbslFailureSignalHandler()
    @     0xffff941927a0       4960  (unknown)
    @     0xaaaac56e162c        336  google::DumpStackTraceAndExit()
    @     0xaaaac56d544c        192  google::LogMessage::Fail()
    @     0xaaaac56dbd6c         16  google::LogMessage::SendToLog()
    @     0xaaaac56d4e50        208  google::LogMessage::Flush()
    @     0xaaaac56d678c         80  google::LogMessageFatal::~LogMessageFatal()
    @     0xaaaac50650dc         16  dfly::BorrowedInterpreter::BorrowedInterpreter()
    @     0xaaaac4db5798        160  dfly::Service::Eval()
    @     0xaaaac505809c        208  dfly::CommandId::Invoke()
    @     0xaaaac4dac150        112  dfly::Service::InvokeCmd()
    @     0xaaaac4ec2e0c        672  dfly::MultiCommandSquasher::SquashedHopCb()
    @     0xaaaac4ec3a54       1072  std::_Function_handler<>::_M_invoke()
    @     0xaaaac55266a8         32  util::fb2::FiberQueue::Run()
    @     0xaaaac50b6dac        288  boost::context::detail::fiber_entry<>()

This is the last command sent to Dragonfly, and it is the one that triggers the crash:

1731325576.1345656 [0 192.168.65.1:32473] "EVAL" "--[[\n  Adds a job to the queue by doing the following:\n    - Increases the job counter if needed.\n    - Creates a new job key with the job data.\n    - if delayed:\n      - computes timestamp.\n      - adds to delayed zset.\n      - Emits a global event 'delayed' if the job is delayed.\n    - if not delayed\n      - Adds the jobId to the wait/paused list in one of three ways:\n         - LIFO\n         - FIFO\n         - prioritized.\n      - Adds the job to the \"added\" list so that workers gets notified.\n    Input:\n      KEYS[1] 'wait',\n      KEYS[2] 'paused'\n      KEYS[3] 'meta'\n      KEYS[4] 'id'\n      KEYS[5] 'completed'\n      KEYS[6] 'active'\n      KEYS[7] events stream key\n      KEYS[8] marker key\n      ARGV[1] msgpacked arguments array\n            [1]  key prefix,\n            [2]  custom id (will not generate one automatically)\n            [3]  name\n            [4]  timestamp\n            [5]  parentKey?\n            [6]  waitChildrenKey key.\n            [7]  parent dependencies key.\n            [8]  parent? {id, queueKey}\n            [9]  repeat job key\n            [10] deduplication key\n      ARGV[2] Json stringified job data\n      ARGV[3] msgpacked options\n      Output:\n        jobId  - OK\n        -5     - Missing parent key\n]]\nlocal eventsKey = KEYS[7]\nlocal jobId\nlocal jobIdKey\nlocal rcall = redis.call\nlocal args = cmsgpack.unpack(ARGV[1])\nlocal data = ARGV[2]\nlocal opts = cmsgpack.unpack(ARGV[3])\nlocal parentKey = args[5]\nlocal parent = args[8]\nlocal repeatJobKey = args[9]\nlocal deduplicationKey = args[10]\nlocal parentData\n-- Includes\n--[[\n  Function to add job in target list and add marker if needed.\n]]\n-- Includes\n--[[\n  Add marker if needed when a job is available.\n]]\nlocal function addBaseMarkerIfNeeded(markerKey, isPausedOrMaxed)\n  if not isPausedOrMaxed then\n    rcall(\"ZADD\", markerKey, 0, \"0\")\n  end  \nend\nlocal function addJobInTargetList(targetKey, markerKey, pushCmd, isPausedOrMaxed, jobId)\n  rcall(pushCmd, targetKey, jobId)\n  addBaseMarkerIfNeeded(markerKey, isPausedOrMaxed)\nend\n--[[\n  Function to debounce a job.\n]]\nlocal function deduplicateJob(prefixKey, deduplicationOpts, jobId, deduplicationKey, eventsKey, maxEvents)\n  local deduplicationId = deduplicationOpts and deduplicationOpts['id']\n  if deduplicationId then\n    local ttl = deduplicationOpts['ttl']\n    local deduplicationKeyExists\n    if ttl then\n      deduplicationKeyExists = not rcall('SET', deduplicationKey, jobId, 'PX', ttl, 'NX')\n    else\n      deduplicationKeyExists = not rcall('SET', deduplicationKey, jobId, 'NX')\n    end\n    if deduplicationKeyExists then\n      local currentDebounceJobId = rcall('GET', deduplicationKey)\n      rcall(\"XADD\", eventsKey, \"MAXLEN\", \"~\", maxEvents, \"*\", \"event\",\n        \"debounced\", \"jobId\", currentDebounceJobId, \"debounceId\", deduplicationId)\n      rcall(\"XADD\", eventsKey, \"MAXLEN\", \"~\", maxEvents, \"*\", \"event\",\n        \"deduplicated\", \"jobId\", currentDebounceJobId, \"deduplicationId\", deduplicationId)\n      return currentDebounceJobId\n    end\n  end\nend\n--[[\n  Function to get max events value or set by default 10000.\n]]\nlocal function getOrSetMaxEvents(metaKey)\n    local maxEvents = rcall(\"HGET\", metaKey, \"opts.maxLenEvents\")\n    if not maxEvents then\n        maxEvents = 10000\n        rcall(\"HSET\", metaKey, \"opts.maxLenEvents\", maxEvents)\n    end\n    return maxEvents\nend\n--[[\n  Function 
to check for the meta.paused key to decide if we are paused or not\n  (since an empty list and !EXISTS are not really the same).\n]]\nlocal function getTargetQueueList(queueMetaKey, activeKey, waitKey, pausedKey)\n  local queueAttributes = rcall(\"HMGET\", queueMetaKey, \"paused\", \"concurrency\")\n  if queueAttributes[1] then\n    return pausedKey, true\n  else\n    if queueAttributes[2] then\n      local activeCount = rcall(\"LLEN\", activeKey)\n      if activeCount >= tonumber(queueAttributes[2]) then\n        return waitKey, true\n      else\n        return waitKey, false\n      end\n    end\n  end\n  return waitKey, false\nend\n--[[\n  Function to handle the case when job is duplicated.\n]]\n-- Includes\n--[[\n    This function is used to update the parent's dependencies if the job\n    is already completed and about to be ignored. The parent must get its\n    dependencies updated to avoid the parent job being stuck forever in \n    the waiting-children state.\n]]\n-- Includes\n--[[\n  Validate and move or add dependencies to parent.\n]]\n-- Includes\n--[[\n  Validate and move parent to active if needed.\n]]\n-- Includes\n--[[\n  Add delay marker if needed.\n]]\n-- Includes\n--[[\n  Function to return the next delayed job timestamp.\n]]\nlocal function getNextDelayedTimestamp(delayedKey)\n  local result = rcall(\"ZRANGE\", delayedKey, 0, 0, \"WITHSCORES\")\n  if #result then\n    local nextTimestamp = tonumber(result[2])\n    if nextTimestamp ~= nil then \n      return nextTimestamp / 0x1000\n    end\n  end\nend\nlocal function addDelayMarkerIfNeeded(markerKey, delayedKey)\n  local nextTimestamp = getNextDelayedTimestamp(delayedKey)\n  if nextTimestamp ~= nil then\n    -- Replace the score of the marker with the newest known\n    -- next timestamp.\n    rcall(\"ZADD\", markerKey, nextTimestamp, \"1\")\n  end\nend\n--[[\n  Function to add job considering priority.\n]]\n-- Includes\nlocal function addJobWithPriority(markerKey, prioritizedKey, priority, jobId, priorityCounterKey,\n  isPausedOrMaxed)\n  local prioCounter = rcall(\"INCR\", priorityCounterKey)\n  local score = priority * 0x100000000 + prioCounter % 0x100000000\n  rcall(\"ZADD\", prioritizedKey, score, jobId)\n  addBaseMarkerIfNeeded(markerKey, isPausedOrMaxed)\nend\n--[[\n  Function to check if queue is paused or maxed\n  (since an empty list and !EXISTS are not really the same).\n]]\nlocal function isQueuePausedOrMaxed(queueMetaKey, activeKey)\n  local queueAttributes = rcall(\"HMGET\", queueMetaKey, \"paused\", \"concurrency\")\n  if queueAttributes[1] then\n    return true\n  else\n    if queueAttributes[2] then\n      local activeCount = rcall(\"LLEN\", activeKey)\n      return activeCount >= tonumber(queueAttributes[2])\n    end\n  end\n  return false\nend\nlocal function moveParentToWaitIfNeeded(parentQueueKey, parentDependenciesKey,\n                                        parentKey, parentId, timestamp)\n    local isParentActive = rcall(\"ZSCORE\",\n                                 parentQueueKey .. \":waiting-children\", parentId)\n    if rcall(\"SCARD\", parentDependenciesKey) == 0 and isParentActive then\n        rcall(\"ZREM\", parentQueueKey .. \":waiting-children\", parentId)\n        local parentWaitKey = parentQueueKey .. \":wait\"\n        local parentPausedKey = parentQueueKey .. \":paused\"\n        local parentActiveKey = parentQueueKey .. \":active\"\n        local parentMetaKey = parentQueueKey .. \":meta\"\n        local parentMarkerKey = parentQueueKey .. 
\":marker\"\n        local jobAttributes = rcall(\"HMGET\", parentKey, \"priority\", \"delay\")\n        local priority = tonumber(jobAttributes[1]) or 0\n        local delay = tonumber(jobAttributes[2]) or 0\n        if delay > 0 then\n            local delayedTimestamp = tonumber(timestamp) + delay\n            local score = delayedTimestamp * 0x1000\n            local parentDelayedKey = parentQueueKey .. \":delayed\"\n            rcall(\"ZADD\", parentDelayedKey, score, parentId)\n            rcall(\"XADD\", parentQueueKey .. \":events\", \"*\", \"event\", \"delayed\",\n                  \"jobId\", parentId, \"delay\", delayedTimestamp)\n            addDelayMarkerIfNeeded(parentMarkerKey, parentDelayedKey)\n        else\n            if priority == 0 then\n                local parentTarget, isParentPausedOrMaxed =\n                    getTargetQueueList(parentMetaKey, parentActiveKey, parentWaitKey,\n                                       parentPausedKey)\n                addJobInTargetList(parentTarget, parentMarkerKey, \"RPUSH\", isParentPausedOrMaxed,\n                    parentId)\n            else\n                local isPausedOrMaxed = isQueuePausedOrMaxed(parentMetaKey, parentActiveKey)\n                addJobWithPriority(parentMarkerKey,\n                                   parentQueueKey .. \":prioritized\", priority,\n                                   parentId, parentQueueKey .. \":pc\", isPausedOrMaxed)\n            end\n            rcall(\"XADD\", parentQueueKey .. \":events\", \"*\", \"event\", \"waiting\",\n                  \"jobId\", parentId, \"prev\", \"waiting-children\")\n        end\n    end\nend\nlocal function updateParentDepsIfNeeded(parentKey, parentQueueKey, parentDependenciesKey,\n  parentId, jobIdKey, returnvalue, timestamp )\n  local processedSet = parentKey .. 
\":processed\"\n  rcall(\"HSET\", processedSet, jobIdKey, returnvalue)\n  moveParentToWaitIfNeeded(parentQueueKey, parentDependenciesKey, parentKey, parentId, timestamp)\nend\nlocal function updateExistingJobsParent(parentKey, parent, parentData,\n                                        parentDependenciesKey, completedKey,\n                                        jobIdKey, jobId, timestamp)\n    if parentKey ~= nil then\n        if rcall(\"ZSCORE\", completedKey, jobId) ~= false then\n            local returnvalue = rcall(\"HGET\", jobIdKey, \"returnvalue\")\n            updateParentDepsIfNeeded(parentKey, parent['queueKey'],\n                                     parentDependenciesKey, parent['id'],\n                                     jobIdKey, returnvalue, timestamp)\n        else\n            if parentDependenciesKey ~= nil then\n                rcall(\"SADD\", parentDependenciesKey, jobIdKey)\n            end\n        end\n        rcall(\"HMSET\", jobIdKey, \"parentKey\", parentKey, \"parent\", parentData)\n    end\nend\nlocal function handleDuplicatedJob(jobKey, jobId, currentParentKey, currentParent,\n  parentData, parentDependenciesKey, completedKey, eventsKey, maxEvents, timestamp)\n  local existedParentKey = rcall(\"HGET\", jobKey, \"parentKey\")\n  if not existedParentKey or existedParentKey == currentParentKey then\n    updateExistingJobsParent(currentParentKey, currentParent, parentData,\n      parentDependenciesKey, completedKey, jobKey,\n      jobId, timestamp)\n  else\n    if currentParentKey ~= nil and currentParentKey ~= existedParentKey\n      and (rcall(\"EXISTS\", existedParentKey) == 1) then\n      return -7\n    end\n  end\n  rcall(\"XADD\", eventsKey, \"MAXLEN\", \"~\", maxEvents, \"*\", \"event\",\n    \"duplicated\", \"jobId\", jobId)\n  return jobId .. \"\" -- convert to string\nend\n--[[\n  Function to store a job\n]]\nlocal function storeJob(eventsKey, jobIdKey, jobId, name, data, opts, timestamp,\n                        parentKey, parentData, repeatJobKey)\n    local jsonOpts = cjson.encode(opts)\n    local delay = opts['delay'] or 0\n    local priority = opts['priority'] or 0\n    local debounceId = opts['de'] and opts['de']['id']\n    local optionalValues = {}\n    if parentKey ~= nil then\n        table.insert(optionalValues, \"parentKey\")\n        table.insert(optionalValues, parentKey)\n        table.insert(optionalValues, \"parent\")\n        table.insert(optionalValues, parentData)\n    end\n    if repeatJobKey ~= nil then\n        table.insert(optionalValues, \"rjk\")\n        table.insert(optionalValues, repeatJobKey)\n    end\n    if debounceId then\n        table.insert(optionalValues, \"deid\")\n        table.insert(optionalValues, debounceId)\n    end\n    rcall(\"HMSET\", jobIdKey, \"name\", name, \"data\", data, \"opts\", jsonOpts,\n          \"timestamp\", timestamp, \"delay\", delay, \"priority\", priority,\n          unpack(optionalValues))\n    rcall(\"XADD\", eventsKey, \"*\", \"event\", \"added\", \"jobId\", jobId, \"name\", name)\n    return delay, priority\nend\nif parentKey ~= nil then\n    if rcall(\"EXISTS\", parentKey) ~= 1 then return -5 end\n    parentData = cjson.encode(parent)\nend\nlocal jobCounter = rcall(\"INCR\", KEYS[4])\nlocal metaKey = KEYS[3]\nlocal maxEvents = getOrSetMaxEvents(metaKey)\nlocal parentDependenciesKey = args[7]\nlocal timestamp = args[4]\nif args[2] == \"\" then\n    jobId = jobCounter\n    jobIdKey = args[1] .. jobId\nelse\n    jobId = args[2]\n    jobIdKey = args[1] .. 
jobId\n    if rcall(\"EXISTS\", jobIdKey) == 1 then\n        return handleDuplicatedJob(jobIdKey, jobId, parentKey, parent,\n            parentData, parentDependenciesKey, KEYS[5], eventsKey,\n            maxEvents, timestamp)\n    end\nend\nlocal deduplicationJobId = deduplicateJob(args[1], opts['de'],\n  jobId, deduplicationKey, eventsKey, maxEvents)\nif deduplicationJobId then\n  return deduplicationJobId\nend\n-- Store the job.\nstoreJob(eventsKey, jobIdKey, jobId, args[3], ARGV[2], opts, timestamp,\n         parentKey, parentData, repeatJobKey)\nlocal target, isPausedOrMaxed = getTargetQueueList(metaKey, KEYS[6], KEYS[1], KEYS[2])\n-- LIFO or FIFO\nlocal pushCmd = opts['lifo'] and 'RPUSH' or 'LPUSH'\naddJobInTargetList(target, KEYS[8], pushCmd, isPausedOrMaxed, jobId)\n-- Emit waiting event\nrcall(\"XADD\", eventsKey, \"MAXLEN\", \"~\", maxEvents, \"*\", \"event\", \"waiting\",\n      \"jobId\", jobId)\n-- Check if this job is a child of another job, if so add it to the parents dependencies\nif parentDependenciesKey ~= nil then\n    rcall(\"SADD\", parentDependenciesKey, jobIdKey)\nend\nreturn jobId .. \"\" -- convert to string\n" "8" "{b}:test-2ec5c4ed-f994-4d0b-9516-08c4f423d244:wait" "{b}:test-2ec5c4ed-f994-4d0b-9516-08c4f423d244:paused" "{b}:test-2ec5c4ed-f994-4d0b-9516-08c4f423d244:meta" "{b}:test-2ec5c4ed-f994-4d0b-9516-08c4f423d244:id" "{b}:test-2ec5c4ed-f994-4d0b-9516-08c4f423d244:completed" "{b}:test-2ec5c4ed-f994-4d0b-9516-08c4f423d244:active" "{b}:test-2ec5c4ed-f994-4d0b-9516-08c4f423d244:events" "{b}:test-2ec5c4ed-f994-4d0b-9516-08c4f423d244:marker" "\x9a\xd9.{b}:test-2ec5c4ed-f994-4d0b-9516-08c4f423d244:\xa0\xa4test\xcbBy1\xb0\xbc<@\x00\xc0\xc0\xc0\xc0\xc0\xc0" "{\"foo\":\"bar0\"}" "\xde\x00\x04\xa8attempts\x00\xa5jobId\xc0\xa2tm\xc0\xa7backoff\xc0"
Error: Server closed the connection
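
For what it is worth, the failed check (!state->squashing_info) fires in dfly::BorrowedInterpreter while EVAL is invoked from dfly::MultiCommandSquasher, per the stack trace above, so the trigger looks like an EVAL executed inside Dragonfly's multi-command squashing path. A purely speculative, narrower reproduction attempt outside of BullMQ might look like the following (ioredis, illustrative only and not verified):

import Redis from 'ioredis';

async function main() {
  const redis = new Redis(6379, 'localhost');

  // Trivial script; the real crash involves BullMQ's addJob script shown above.
  const script = "return redis.call('SET', KEYS[1], ARGV[1])";

  // Send many EVALs in one pipeline so the server may squash them together.
  const pipeline = redis.pipeline();
  for (let i = 0; i < 100; i++) {
    pipeline.eval(script, 1, `{b}:key:${i}`, `value-${i}`);
  }
  await pipeline.exec();

  redis.disconnect();
}

main().catch(console.error);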

Let me know if you need more info.

romange commented 2 weeks ago

@manast thanks for notifying us. Can you please remind me how to run the test suite?

romange commented 2 weeks ago

Never mind, I figured it out from your GitHub Actions:

yarn install --frozen-lockfile --non-interactive
yarn build
BULLMQ_TEST_PREFIX={b} yarn test

with a local Dragonfly: ./dragonfly --dbfilename= --noversion_check --maxmemory=8G --cluster_mode=emulated --lock_on_hashtags --logtostderr
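
For reference, the BULLMQ_TEST_PREFIX={b} setting appears to correspond to BullMQ's queue prefix option, which keeps all queue keys in a single hash slot when Dragonfly runs with --cluster_mode=emulated --lock_on_hashtags; roughly (illustrative sketch, not the test harness code):

import { Queue } from 'bullmq';

// Illustrative only: the '{b}' hashtag prefix places every queue key in the
// same hash slot, matching the --lock_on_hashtags setup above.
const queue = new Queue('test', {
  prefix: '{b}',
  connection: { host: 'localhost', port: 6379 },
});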

mogery commented 2 weeks ago

Hi there. This same issue broke our production :D We are waiting for a fix. Reverting to ghcr.io/dragonflydb/dragonfly:v1.24.0 works. It would probably be good to publish a revert as 1.25.1 so that deployments pulling the latest version are fixed.

romange commented 2 weeks ago

Good idea, I pushed the latest tag to point to v1.24.0 for now.