taskforcesh / bullmq-pro-support

Support repository for the BullMQ Pro edition.
1 star · 0 forks · source link

Removing a job that has groups sometimes fails (v5.1.4) #32

Closed · dginzbourg closed this issue 1 year ago

dginzbourg commented 1 year ago

Hi,

`queue.remove(jobId)` sometimes fails with the following error:

{
  "stack": "ReplyError: ERR user_script:191: attempt to concatenate local 'groupId' (a boolean value) script: 9a062516ccf9b1d7f91b7d0867948d64fc72ce5b, on @user_script:191.\n    at parseError (/Users/dginzbourg/repos/vcp-bulk/node_modules/redis-parser/lib/parser.js:179:12)\n    at parseType (/Users/dginzbourg/repos/vcp-bulk/node_modules/redis-parser/lib/parser.js:302:14)",
  "message": "ERR user_script:191: attempt to concatenate local 'groupId' (a boolean value) script: 9a062516ccf9b1d7f91b7d0867948d64fc72ce5b, on @user_script:191.",
  "command": {
    "name": "eval",
    "args": [
      "--[[\n    Remove a job from all the queues it may be in as well as all its data.\n    In order to be able to remove a job, it cannot be active.\n    Input:\n      KEYS[1] queue prefix\n      ARGV[1] jobId\n    Events:\n      'removed'\n]]\nlocal rcall = redis.call\n-- Includes\n--[[\n  Functions to destructure job key.\n  Just a bit of warning, these functions may be a bit slow and affect performance significantly.\n]]\nlocal getJobIdFromKey = function (jobKey)\n  return string.match(jobKey, \".*:(.*)\")\nend\nlocal getJobKeyPrefix = function (jobKey, jobId)\n  return string.sub(jobKey, 0, #jobKey - #jobId)\nend\n--[[\n  Function to recursively check if there are no locks\n  on the jobs to be removed.\n  returns:\n    boolean\n]]\nlocal function isLocked( prefix, jobId)\n  local jobKey = prefix .. jobId;\n  -- Check if this job is locked\n  local lockKey = jobKey .. ':lock'\n  local lock = rcall(\"GET\", lockKey)\n  if not lock then\n    local dependencies = rcall(\"SMEMBERS\", jobKey .. \":dependencies\")\n    if (#dependencies > 0) then\n      for i, childJobKey in ipairs(dependencies) do\n        -- We need to get the jobId for this job.\n        local childJobId = getJobIdFromKey(childJobKey)\n        local childJobPrefix = getJobKeyPrefix(childJobKey, childJobId)\n        local result = isLocked( childJobPrefix, childJobId )\n        if result then\n          return true\n        end\n      end\n    end\n    return false\n  end\n  return true\nend\n--[[\n  Check if this job has a parent. 
If so we will just remove it from\n  the parent child list, but if it is the last child we should move the parent to \"wait/paused\"\n  which requires code from \"moveToFinished\"\n]]\n--[[\n  Function to check for the meta.paused key to decide if we are paused or not\n  (since an empty list and !EXISTS are not really the same).\n]]\nlocal function getTargetQueueList(queueMetaKey, waitKey, pausedKey)\n  if rcall(\"HEXISTS\", queueMetaKey, \"paused\") ~= 1 then\n    return waitKey\n  else\n    return pausedKey\n  end\nend\nlocal function moveParentToWait(parentPrefix, parentId, emitEvent)\n  local parentTarget = getTargetQueueList(parentPrefix .. \"meta\", parentPrefix .. \"wait\", parentPrefix .. \"paused\")\n  rcall(\"RPUSH\", parentTarget, parentId)\n  if emitEvent then\n    local parentEventStream = parentPrefix .. \"events\"\n    rcall(\"XADD\", parentEventStream, \"*\", \"event\", \"waiting\", \"jobId\", parentId, \"prev\", \"waiting-children\")\n  end\nend\nlocal function removeParentDependencyKey(jobKey, hard, parentKey, baseKey)\n  if parentKey then\n    local parentProcessedKey = parentKey .. \":processed\"\n    rcall(\"HDEL\", parentProcessedKey, jobKey)\n    local parentDependenciesKey = parentKey .. \":dependencies\"\n    local result = rcall(\"SREM\", parentDependenciesKey, jobKey)\n    if result > 0 then\n      local pendingDependencies = rcall(\"SCARD\", parentDependenciesKey)\n      if pendingDependencies == 0 then\n        local parentId = getJobIdFromKey(parentKey)\n        local parentPrefix = getJobKeyPrefix(parentKey, parentId)\n        local numRemovedElements = rcall(\"ZREM\", parentPrefix .. \"waiting-children\", parentId)\n        if numRemovedElements == 1 then\n          if hard then\n            if parentPrefix == baseKey then\n              removeParentDependencyKey(parentKey, hard, nil, baseKey)\n              rcall(\"DEL\", parentKey, parentKey .. ':logs',\n                parentKey .. ':dependencies', parentKey .. 
':processed')\n            else\n              moveParentToWait(parentPrefix, parentId)\n            end\n          else\n            moveParentToWait(parentPrefix, parentId, true)\n          end\n        end\n      end\n    end\n  else\n    local missedParentKey = rcall(\"HGET\", jobKey, \"parentKey\")\n    if( (type(missedParentKey) == \"string\") and missedParentKey ~= \"\" and (rcall(\"EXISTS\", missedParentKey) == 1)) then\n      local parentProcessedKey = missedParentKey .. \":processed\"\n      rcall(\"HDEL\", parentProcessedKey, jobKey)\n      local parentDependenciesKey = missedParentKey .. \":dependencies\"\n      local result = rcall(\"SREM\", parentDependenciesKey, jobKey)\n      if result > 0 then\n        local pendingDependencies = rcall(\"SCARD\", parentDependenciesKey)\n        if pendingDependencies == 0 then\n          local parentId = getJobIdFromKey(missedParentKey)\n          local parentPrefix = getJobKeyPrefix(missedParentKey, parentId)\n          local numRemovedElements = rcall(\"ZREM\", parentPrefix .. \"waiting-children\", parentId)\n          if numRemovedElements == 1 then\n            if hard then\n              if parentPrefix == baseKey then\n                removeParentDependencyKey(missedParentKey, hard, nil, baseKey)\n                rcall(\"DEL\", missedParentKey, missedParentKey .. ':logs',\n                  missedParentKey .. ':dependencies', missedParentKey .. ':processed')\n              else\n                moveParentToWait(parentPrefix, parentId)\n              end\n            else\n              moveParentToWait(parentPrefix, parentId, true)\n            end\n          end\n        end\n      end\n    end\n  end\nend\n--[[\n  Function to remove from any state.\n  returns:\n    prev state\n]]\n-- Includes\n--[[\n  Function to remove from any state.\n  returns:\n    prev state\n]]\nlocal function removeJobFromAnyState( prefix, jobId)\n  if rcall(\"LREM\", prefix .. 
\"wait\", 0, jobId) == 1 then\n    return \"wait\"\n  elseif rcall(\"LREM\", prefix .. \"paused\", 0, jobId) == 1 then\n    return \"paused\"\n  elseif rcall(\"LREM\", prefix .. \"active\", 0, jobId) == 1 then\n    return \"active\"\n  elseif rcall(\"ZREM\", prefix .. \"waiting-children\", jobId) == 1 then\n    return \"waiting-children\"\n  elseif rcall(\"ZREM\", prefix .. \"delayed\", jobId) == 1 then\n    return \"delayed\"\n  elseif rcall(\"ZREM\", prefix .. \"completed\", jobId) == 1 then\n    return \"completed\"\n  elseif rcall(\"ZREM\", prefix .. \"failed\", jobId) == 1 then\n    return \"failed\"\n  end\n  return \"unknown\"\nend\n--[[\n  Function to remove last group id if needed.\n  It will reinsert a group from groups zset\n]]\n-- Reinsert the group with the highest score so that it is moved to the last position\nlocal function reinsertGroup(groupKey, groupsKey, groupId)\n  if rcall(\"LLEN\", groupKey) > 0 then\n    local highscore = rcall(\"ZREVRANGE\", groupsKey, 0, 0, \"withscores\")[2] or 0\n    -- Note, this mechanism could keep increasing the score indefinetely.\n    -- Score can represent 2^53 integers, so approximatelly 285 years adding 1M jobs/second\n    -- before it starts misbehaving.\n    rcall(\"ZADD\", groupsKey, highscore + 1, groupId)\n  end\nend\nlocal function removeLastGroupIdIfNeeded( groupsKey, lgidKey, currentGroupId)\n  if rcall(\"GET\", lgidKey) == currentGroupId then\n    local groupIds = rcall(\"ZPOPMIN\", groupsKey)\n    if #groupIds > 0 then\n      local groupId = groupIds[1]\n      local groupKey = groupsKey .. ':' .. groupId\n      reinsertGroup(groupKey, groupsKey, groupId)\n      rcall(\"SET\", lgidKey, groupId)\n    else\n      rcall(\"DEL\", lgidKey)\n    end\n  end\nend\nlocal function removeJobFromAnyStateOrGroup( prefix, jobId, groupId)\n  local state = removeJobFromAnyState(prefix, jobId)\n  if groupId ~= nil and  state == \"unknown\" then\n    local groupKey = prefix .. \"groups:\" .. 
groupId\n    if rcall(\"LREM\", groupKey, 0, jobId) == 1 then\n      local groupLen = rcall(\"LLEN\", groupKey)\n      local groupsKey = prefix .. \"groups\"\n      if groupLen == 0 then\n        rcall(\"ZREM\", groupKey, groupId)\n      end\n      removeLastGroupIdIfNeeded(groupsKey, prefix .. \"groups-lid\", groupId)\n      if rcall(\"ZSCORE\", prefix .. \"groups:paused\", groupId) ~= false then\n        return \"paused\"\n      else\n        return \"wait\"\n      end\n    end\n  end\n  return state\nend\nlocal function removeJob( prefix, jobId, parentKey, groupId)\n    local jobKey = prefix .. jobId;\n    removeParentDependencyKey(jobKey, false, parentKey)\n    -- Check if this job has children\n    -- If so, we are going to try to remove the children recursively in deep first way because\n    -- if some job is locked we must exit with and error.\n    local processed = rcall(\"HGETALL\", jobKey .. \":processed\")\n    if (#processed > 0) then\n        for i = 1, #processed, 2 do\n            local childJobId = getJobIdFromKey(processed[i])\n            local childJobPrefix = getJobKeyPrefix(processed[i], childJobId)\n            local childGroupId = rcall(\"HGET\", processed[i], \"gid\")\n            removeJob( childJobPrefix, childJobId, jobKey, childGroupId )\n        end\n    end\n    local dependencies = rcall(\"SMEMBERS\", jobKey .. \":dependencies\")\n    if (#dependencies > 0) then\n        for i, childJobKey in ipairs(dependencies) do\n            -- We need to get the jobId for this job.\n            local childJobId = getJobIdFromKey(childJobKey)\n            local childJobPrefix = getJobKeyPrefix(childJobKey, childJobId)\n            local childGroupId = rcall(\"HGET\", childJobKey, \"gid\")\n            removeJob( childJobPrefix, childJobId, jobKey, childGroupId )\n        end\n    end\n    local prev = removeJobFromAnyStateOrGroup(prefix, jobId, groupId)\n    rcall(\"ZREM\", prefix .. \"priority\", jobId)\n    rcall(\"DEL\", jobKey, jobKey .. 
\":logs\", jobKey .. \":dependencies\", jobKey .. \":processed\")\n    rcall(\"XADD\", prefix .. \"events\", \"*\", \"event\", \"removed\", \"jobId\", jobId, \"prev\", prev);\nend\nlocal prefix = KEYS[1]\nif not isLocked(prefix, ARGV[1]) then\n    local groupId = rcall(\"HGET\", prefix .. ARGV[1], \"gid\")\n    removeJob(prefix, ARGV[1], nil, groupId)\n    return 1\nend\nreturn 0\n",
      "1",
      "bull:job-run-snapshot:",
      "ebff2b4e-58bd-48d2-827d-1e73768ecf45"
    ]
  }
}
manast commented 1 year ago

The fix is available in version 5.1.6.