taskforcesh / bullmq

BullMQ - Message Queue and Batch processing for NodeJS and Python based on Redis
https://bullmq.io
MIT License

[Bug]: Eviction policy is volatile-lru. It should be "noeviction" #2215

Closed. vicpara closed this issue 1 year ago.

vicpara commented 1 year ago

Version

4.12.1

Platform

NodeJS

What happened?

I'm trying to connect BullMQ from a Node.js AWS ECS task to Redis ElastiCache. I get the following errors right when the service starts, at a point where I am certain no data is being pushed to the queues yet, so the database is not under pressure:

{
  "command": {
    "name": "eval",
    "args": [
      "--[[\n Move stalled jobs to wait.\n Input:\n KEYS[1] 'stalled' (SET)\n KEYS[2] 'wait', (LIST)\n KEYS[3] 'active', (LIST)\n KEYS[4] 'failed', (ZSET)\n KEYS[5] 'stalled-check', (KEY)\n KEYS[6] 'meta', (KEY)\n KEYS[7] 'paused', (LIST)\n KEYS[8] 'event stream' (STREAM)\n ARGV[1] Max stalled job count\n ARGV[2] queue.toKey('')\n ARGV[3] timestamp\n ARGV[4] max check time\n Events:\n 'stalled' with stalled job id.\n]] -- Includes\n--[[\n Move stalled jobs to wait.\n Input:\n stalledKey 'stalled' (SET)\n waitKey 'wait', (LIST)\n activeKey 'active', (LIST)\n failedKey 'failed', (ZSET)\n stalledCheckKey 'stalled-check', (KEY)\n metaKey 'meta', (KEY)\n pausedKey 'paused', (LIST)\n eventStreamKey 'event stream' (STREAM)\n maxStalledJobCount Max stalled job count\n queueKeyPrefix queue.toKey('')\n timestamp timestamp\n maxCheckTime max check time\n Events:\n 'stalled' with stalled job id.\n]]\nlocal rcall = redis.call\n-- Includes\n--[[\n Function to loop in batches.\n Just a bit of warning, some commands as ZREM\n could receive a maximum of 7000 parameters per call.\n]]\nlocal function batches(n, batchSize)\n local i = 0\n return function()\n local from = i * batchSize + 1\n i = i + 1\n if (from <= n) then\n local to = math.min(from + batchSize - 1, n)\n return from, to\n end\n end\nend\n--[[\n Function to check for the meta.paused key to decide if we are paused or not\n (since an empty list and !EXISTS are not really the same).\n]]\nlocal function getTargetQueueList(queueMetaKey, waitKey, pausedKey)\n if rcall(\"HEXISTS\", queueMetaKey, \"paused\") ~= 1 then\n return waitKey, false\n else\n return pausedKey, true\n end\nend\n--[[\n Function to remove job.\n]]\n-- Includes\n--[[\n Check if this job has a parent. If so we will just remove it from\n the parent child list, but if it is the last child we should move the parent to \"wait/paused\"\n which requires code from \"moveToFinished\"\n]]\n--[[\n Functions to destructure job key.\n Just a bit of warning, these functions may be a bit slow and affect performance significantly.\n]]\nlocal getJobIdFromKey = function (jobKey)\n return string.match(jobKey, \".*:(.*)\")\nend\nlocal getJobKeyPrefix = function (jobKey, jobId)\n return string.sub(jobKey, 0, #jobKey - #jobId)\nend\nlocal function moveParentToWait(parentPrefix, parentId, emitEvent)\n local parentTarget = getTargetQueueList(parentPrefix .. \"meta\", parentPrefix .. \"wait\", parentPrefix .. \"paused\")\n rcall(\"RPUSH\", parentTarget, parentId)\n if emitEvent then\n local parentEventStream = parentPrefix .. \"events\"\n rcall(\"XADD\", parentEventStream, \"*\", \"event\", \"waiting\", \"jobId\", parentId, \"prev\", \"waiting-children\")\n end\nend\nlocal function removeParentDependencyKey(jobKey, hard, parentKey, baseKey)\n if parentKey then\n local parentDependenciesKey = parentKey .. \":dependencies\"\n local result = rcall(\"SREM\", parentDependenciesKey, jobKey)\n if result > 0 then\n local pendingDependencies = rcall(\"SCARD\", parentDependenciesKey)\n if pendingDependencies == 0 then\n local parentId = getJobIdFromKey(parentKey)\n local parentPrefix = getJobKeyPrefix(parentKey, parentId)\n local numRemovedElements = rcall(\"ZREM\", parentPrefix .. \"waiting-children\", parentId)\n if numRemovedElements == 1 then\n if hard then\n if parentPrefix == baseKey then\n removeParentDependencyKey(parentKey, hard, nil, baseKey)\n rcall(\"DEL\", parentKey, parentKey .. ':logs',\n parentKey .. ':dependencies', parentKey .. 
':processed')\n else\n moveParentToWait(parentPrefix, parentId)\n end\n else\n moveParentToWait(parentPrefix, parentId, true)\n end\n end\n end\n end\n else\n local missedParentKey = rcall(\"HGET\", jobKey, \"parentKey\")\n if( (type(missedParentKey) == \"string\") and missedParentKey ~= \"\" and (rcall(\"EXISTS\", missedParentKey) == 1)) then\n local parentDependenciesKey = missedParentKey .. \":dependencies\"\n local result = rcall(\"SREM\", parentDependenciesKey, jobKey)\n if result > 0 then\n local pendingDependencies = rcall(\"SCARD\", parentDependenciesKey)\n if pendingDependencies == 0 then\n local parentId = getJobIdFromKey(missedParentKey)\n local parentPrefix = getJobKeyPrefix(missedParentKey, parentId)\n local numRemovedElements = rcall(\"ZREM\", parentPrefix .. \"waiting-children\", parentId)\n if numRemovedElements == 1 then\n if hard then\n if parentPrefix == baseKey then\n removeParentDependencyKey(missedParentKey, hard, nil, baseKey)\n rcall(\"DEL\", missedParentKey, missedParentKey .. ':logs',\n missedParentKey .. ':dependencies', missedParentKey .. ':processed')\n else\n moveParentToWait(parentPrefix, parentId)\n end\n else\n moveParentToWait(parentPrefix, parentId, true)\n end\n end\n end\n end\n end\n end\nend\nlocal function removeJob(jobId, hard, baseKey)\n local jobKey = baseKey .. jobId\n removeParentDependencyKey(jobKey, hard, nil, baseKey)\n rcall(\"DEL\", jobKey, jobKey .. ':logs',\n jobKey .. ':dependencies', jobKey .. ':processed')\nend\n--[[\n Functions to remove jobs by max age.\n]]\n-- Includes\nlocal function removeJobsByMaxAge(timestamp, maxAge, targetSet, prefix)\n local start = timestamp - maxAge * 1000\n local jobIds = rcall(\"ZREVRANGEBYSCORE\", targetSet, start, \"-inf\")\n for i, jobId in ipairs(jobIds) do\n removeJob(jobId, false, prefix)\n end\n rcall(\"ZREMRANGEBYSCORE\", targetSet, \"-inf\", start)\nend\n--[[\n Functions to remove jobs by max count.\n]]\n-- Includes\nlocal function removeJobsByMaxCount(maxCount, targetSet, prefix)\n local start = maxCount\n local jobIds = rcall(\"ZREVRANGE\", targetSet, start, -1)\n for i, jobId in ipairs(jobIds) do\n removeJob(jobId, false, prefix)\n end\n rcall(\"ZREMRANGEBYRANK\", targetSet, 0, -(maxCount + 1))\nend\n--[[\n Function to trim events, default 10000.\n]]\nlocal function trimEvents(metaKey, eventStreamKey)\n local maxEvents = rcall(\"HGET\", metaKey, \"opts.maxLenEvents\")\n if maxEvents ~= false then\n rcall(\"XTRIM\", eventStreamKey, \"MAXLEN\", \"~\", maxEvents)\n else\n rcall(\"XTRIM\", eventStreamKey, \"MAXLEN\", \"~\", 10000)\n end\nend\n-- Check if we need to check for stalled jobs now.\nlocal function checkStalledJobs(stalledKey, waitKey, activeKey, failedKey,\n stalledCheckKey, metaKey, pausedKey,\n eventStreamKey, maxStalledJobCount,\n queueKeyPrefix, timestamp, maxCheckTime)\n if rcall(\"EXISTS\", stalledCheckKey) == 1 then return {{}, {}} end\n rcall(\"SET\", stalledCheckKey, timestamp, \"PX\", maxCheckTime)\n -- Trim events before emiting them to avoid trimming events emitted in this script\n trimEvents(metaKey, eventStreamKey)\n -- Move all stalled jobs to wait\n local stalling = rcall('SMEMBERS', stalledKey)\n local stalled = {}\n local failed = {}\n if (#stalling > 0) then\n rcall('DEL', stalledKey)\n local MAX_STALLED_JOB_COUNT = tonumber(maxStalledJobCount)\n -- Remove from active list\n for i, jobId in ipairs(stalling) do\n if string.sub(jobId, 1, 2) == \"0:\" then\n -- If the jobId is a delay marker ID we just remove it.\n rcall(\"LREM\", activeKey, 1, jobId)\n else\n local 
jobKey = queueKeyPrefix .. jobId\n -- Check that the lock is also missing, then we can handle this job as really stalled.\n if (rcall(\"EXISTS\", jobKey .. \":lock\") == 0) then\n -- Remove from the active queue.\n local removed = rcall(\"LREM\", activeKey, 1, jobId)\n if (removed > 0) then\n -- If this job has been stalled too many times, such as if it crashes the worker, then fail it.\n local stalledCount =\n rcall(\"HINCRBY\", jobKey, \"stalledCounter\", 1)\n if (stalledCount > MAX_STALLED_JOB_COUNT) then\n local rawOpts = rcall(\"HGET\", jobKey, \"opts\")\n local opts = cjson.decode(rawOpts)\n local removeOnFailType = type(opts[\"removeOnFail\"])\n rcall(\"ZADD\", failedKey, timestamp, jobId)\n local failedReason =\n \"job stalled more than allowable limit\"\n rcall(\"HMSET\", jobKey, \"failedReason\", failedReason,\n \"finishedOn\", timestamp)\n rcall(\"XADD\", eventStreamKey, \"*\", \"event\",\n \"failed\", \"jobId\", jobId, 'prev', 'active',\n 'failedReason', failedReason)\n if removeOnFailType == \"number\" then\n removeJobsByMaxCount(opts[\"removeOnFail\"],\n failedKey, queueKeyPrefix)\n elseif removeOnFailType == \"boolean\" then\n if opts[\"removeOnFail\"] then\n removeJob(jobId, false, queueKeyPrefix)\n rcall(\"ZREM\", failedKey, jobId)\n end\n elseif removeOnFailType ~= \"nil\" then\n local maxAge = opts[\"removeOnFail\"][\"age\"]\n local maxCount = opts[\"removeOnFail\"][\"count\"]\n if maxAge ~= nil then\n removeJobsByMaxAge(timestamp, maxAge,\n failedKey, queueKeyPrefix)\n end\n if maxCount ~= nil and maxCount > 0 then\n removeJobsByMaxCount(maxCount, failedKey,\n queueKeyPrefix)\n end\n end\n table.insert(failed, jobId)\n else\n local target =\n getTargetQueueList(metaKey, waitKey, pausedKey)\n -- Move the job back to the wait queue, to immediately be picked up by a waiting worker.\n rcall(\"RPUSH\", target, jobId)\n rcall(\"XADD\", eventStreamKey, \"*\", \"event\",\n \"waiting\", \"jobId\", jobId, 'prev', 'active')\n -- Emit the stalled event\n rcall(\"XADD\", eventStreamKey, \"*\", \"event\",\n \"stalled\", \"jobId\", jobId)\n table.insert(stalled, jobId)\n end\n end\n end\n end\n end\n end\n -- Mark potentially stalled jobs\n local active = rcall('LRANGE', activeKey, 0, -1)\n if (#active > 0) then\n for from, to in batches(#active, 7000) do\n rcall('SADD', stalledKey, unpack(active, from, to))\n end\n end\n return {failed, stalled}\nend\nreturn checkStalledJobs(KEYS[1], KEYS[2], KEYS[3], KEYS[4], KEYS[5], KEYS[6],\n KEYS[7], KEYS[8], ARGV[1], ARGV[2], ARGV[3], ARGV[4])\n",
      "8",
      "bull:secondary:stalled",
      "bull:secondary:wait",
      "bull:secondary:active",
      "bull:secondary:failed",
      "bull:secondary:stalled-check",
      "bull:secondary:meta",
      "bull:secondary:paused",
      "bull:secondary:events",
      "1",
      "bull:secondary:",
      "1696496758541",
      "30000"
    ]
  }
}

IMPORTANT! Eviction policy is volatile-lru. It should be "noeviction" (this warning is repeated several times in the log)


{"command":{"name":"eval","args":["--[[\n Move stalled jobs to wait.\n Input:\n KEYS[1] 'stalled' (SET)\n KEYS[2] 'wait', (LIST)\n KEYS[3] 'active', (LIST)\n KEYS[4] 'failed', (ZSET)\n KEYS[5] 'stalled-check', (KEY)\n KEYS[6] 'meta', (KEY)\n KEYS[7] 'paused', (LIST)\n KEYS[8] 'event stream' (STREAM)\n ARGV[1] Max stalled job count\n ARGV[2] queue.toKey('')\n ARGV[3] timestamp\n ARGV[4] max check time\n Events:\n 'stalled' with stalled job id.\n]] -- Includes\n--[[\n Move stalled jobs to wait.\n Input:\n stalledKey 'stalled' (SET)\n waitKey 'wait', (LIST)\n activeKey 'active', (LIST)\n failedKey 'failed', (ZSET)\n stalledCheckKey 'stalled-check', (KEY)\n metaKey 'meta', (KEY)\n pausedKey 'paused', (LIST)\n eventStreamKey 'event stream' (STREAM)\n maxStalledJobCount Max stalled job count\n queueKeyPrefix queue.toKey('')\n timestamp timestamp\n maxCheckTime max check time\n Events:\n 'stalled' with stalled job id.\n]]\nlocal rcall = redis.call\n-- Includes\n--[[\n Function to loop in batches.\n Just a bit of warning, some commands as ZREM\n could receive a maximum of 7000 parameters per call.\n]]\nlocal function batches(n, batchSize)\n local i = 0\n return function()\n local from = i * batchSize + 1\n i = i + 1\n if (from <= n) then\n local to = math.min(from + batchSize - 1, n)\n return from, to\n end\n end\nend\n--[[\n Function to check for the meta.paused key to decide if we are paused or not\n (since an empty list and !EXISTS are not really the same).\n]]\nlocal function getTargetQueueList(queueMetaKey, waitKey, pausedKey)\n if rcall(\"HEXISTS\", queueMetaKey, \"paused\") ~= 1 then\n return waitKey, false\n else\n return pausedKey, true\n end\nend\n--[[\n Function to remove job.\n]]\n-- Includes\n--[[\n Check if this job has a parent. If so we will just remove it from\n the parent child list, but if it is the last child we should move the parent to \"wait/paused\"\n which requires code from \"moveToFinished\"\n]]\n--[[\n Functions to destructure job key.\n Just a bit of warning, these functions may be a bit slow and affect performance significantly.\n]]\nlocal getJobIdFromKey = function (jobKey)\n return string.match(jobKey, \".*:(.*)\")\nend\nlocal getJobKeyPrefix = function (jobKey, jobId)\n return string.sub(jobKey, 0, #jobKey - #jobId)\nend\nlocal function moveParentToWait(parentPrefix, parentId, emitEvent)\n local parentTarget = getTargetQueueList(parentPrefix .. \"meta\", parentPrefix .. \"wait\", parentPrefix .. \"paused\")\n rcall(\"RPUSH\", parentTarget, parentId)\n if emitEvent then\n local parentEventStream = parentPrefix .. \"events\"\n rcall(\"XADD\", parentEventStream, \"*\", \"event\", \"waiting\", \"jobId\", parentId, \"prev\", \"waiting-children\")\n end\nend\nlocal function removeParentDependencyKey(jobKey, hard, parentKey, baseKey)\n if parentKey then\n local parentDependenciesKey = parentKey .. \":dependencies\"\n local result = rcall(\"SREM\", parentDependenciesKey, jobKey)\n if result > 0 then\n local pendingDependencies = rcall(\"SCARD\", parentDependenciesKey)\n if pendingDependencies == 0 then\n local parentId = getJobIdFromKey(parentKey)\n local parentPrefix = getJobKeyPrefix(parentKey, parentId)\n local numRemovedElements = rcall(\"ZREM\", parentPrefix .. \"waiting-children\", parentId)\n if numRemovedElements == 1 then\n if hard then\n if parentPrefix == baseKey then\n removeParentDependencyKey(parentKey, hard, nil, baseKey)\n rcall(\"DEL\", parentKey, parentKey .. ':logs',\n parentKey .. ':dependencies', parentKey .. 
':processed')\n else\n moveParentToWait(parentPrefix, parentId)\n end\n else\n moveParentToWait(parentPrefix, parentId, true)\n end\n end\n end\n end\n else\n local missedParentKey = rcall(\"HGET\", jobKey, \"parentKey\")\n if( (type(missedParentKey) == \"string\") and missedParentKey ~= \"\" and (rcall(\"EXISTS\", missedParentKey) == 1)) then\n local parentDependenciesKey = missedParentKey .. \":dependencies\"\n local result = rcall(\"SREM\", parentDependenciesKey, jobKey)\n if result > 0 then\n local pendingDependencies = rcall(\"SCARD\", parentDependenciesKey)\n if pendingDependencies == 0 then\n local parentId = getJobIdFromKey(missedParentKey)\n local parentPrefix = getJobKeyPrefix(missedParentKey, parentId)\n local numRemovedElements = rcall(\"ZREM\", parentPrefix .. \"waiting-children\", parentId)\n if numRemovedElements == 1 then\n if hard then\n if parentPrefix == baseKey then\n removeParentDependencyKey(missedParentKey, hard, nil, baseKey)\n rcall(\"DEL\", missedParentKey, missedParentKey .. ':logs',\n missedParentKey .. ':dependencies', missedParentKey .. ':processed')\n else\n moveParentToWait(parentPrefix, parentId)\n end\n else\n moveParentToWait(parentPrefix, parentId, true)\n end\n end\n end\n end\n end\n end\nend\nlocal function removeJob(jobId, hard, baseKey)\n local jobKey = baseKey .. jobId\n removeParentDependencyKey(jobKey, hard, nil, baseKey)\n rcall(\"DEL\", jobKey, jobKey .. ':logs',\n jobKey .. ':dependencies', jobKey .. ':processed')\nend\n--[[\n Functions to remove jobs by max age.\n]]\n-- Includes\nlocal function removeJobsByMaxAge(timestamp, maxAge, targetSet, prefix)\n local start = timestamp - maxAge * 1000\n local jobIds = rcall(\"ZREVRANGEBYSCORE\", targetSet, start, \"-inf\")\n for i, jobId in ipairs(jobIds) do\n removeJob(jobId, false, prefix)\n end\n rcall(\"ZREMRANGEBYSCORE\", targetSet, \"-inf\", start)\nend\n--[[\n Functions to remove jobs by max count.\n]]\n-- Includes\nlocal function removeJobsByMaxCount(maxCount, targetSet, prefix)\n local start = maxCount\n local jobIds = rcall(\"ZREVRANGE\", targetSet, start, -1)\n for i, jobId in ipairs(jobIds) do\n removeJob(jobId, false, prefix)\n end\n rcall(\"ZREMRANGEBYRANK\", targetSet, 0, -(maxCount + 1))\nend\n--[[\n Function to trim events, default 10000.\n]]\nlocal function trimEvents(metaKey, eventStreamKey)\n local maxEvents = rcall(\"HGET\", metaKey, \"opts.maxLenEvents\")\n if maxEvents ~= false then\n rcall(\"XTRIM\", eventStreamKey, \"MAXLEN\", \"~\", maxEvents)\n else\n rcall(\"XTRIM\", eventStreamKey, \"MAXLEN\", \"~\", 10000)\n end\nend\n-- Check if we need to check for stalled jobs now.\nlocal function checkStalledJobs(stalledKey, waitKey, activeKey, failedKey,\n stalledCheckKey, metaKey, pausedKey,\n eventStreamKey, maxStalledJobCount,\n queueKeyPrefix, timestamp, maxCheckTime)\n if rcall(\"EXISTS\", stalledCheckKey) == 1 then return {{}, {}} end\n rcall(\"SET\", stalledCheckKey, timestamp, \"PX\", maxCheckTime)\n -- Trim events before emiting them to avoid trimming events emitted in this script\n trimEvents(metaKey, eventStreamKey)\n -- Move all stalled jobs to wait\n local stalling = rcall('SMEMBERS', stalledKey)\n local stalled = {}\n local failed = {}\n if (#stalling > 0) then\n rcall('DEL', stalledKey)\n local MAX_STALLED_JOB_COUNT = tonumber(maxStalledJobCount)\n -- Remove from active list\n for i, jobId in ipairs(stalling) do\n if string.sub(jobId, 1, 2) == \"0:\" then\n -- If the jobId is a delay marker ID we just remove it.\n rcall(\"LREM\", activeKey, 1, jobId)\n else\n local 
jobKey = queueKeyPrefix .. jobId\n -- Check that the lock is also missing, then we can handle this job as really stalled.\n if (rcall(\"EXISTS\", jobKey .. \":lock\") == 0) then\n -- Remove from the active queue.\n local removed = rcall(\"LREM\", activeKey, 1, jobId)\n if (removed > 0) then\n -- If this job has been stalled too many times, such as if it crashes the worker, then fail it.\n local stalledCount =\n rcall(\"HINCRBY\", jobKey, \"stalledCounter\", 1)\n if (stalledCount > MAX_STALLED_JOB_COUNT) then\n local rawOpts = rcall(\"HGET\", jobKey, \"opts\")\n local opts = cjson.decode(rawOpts)\n local removeOnFailType = type(opts[\"removeOnFail\"])\n rcall(\"ZADD\", failedKey, timestamp, jobId)\n local failedReason =\n \"job stalled more than allowable limit\"\n rcall(\"HMSET\", jobKey, \"failedReason\", failedReason,\n \"finishedOn\", timestamp)\n rcall(\"XADD\", eventStreamKey, \"*\", \"event\",\n \"failed\", \"jobId\", jobId, 'prev', 'active',\n 'failedReason', failedReason)\n if removeOnFailType == \"number\" then\n removeJobsByMaxCount(opts[\"removeOnFail\"],\n failedKey, queueKeyPrefix)\n elseif removeOnFailType == \"boolean\" then\n if opts[\"removeOnFail\"] then\n removeJob(jobId, false, queueKeyPrefix)\n rcall(\"ZREM\", failedKey, jobId)\n end\n elseif removeOnFailType ~= \"nil\" then\n local maxAge = opts[\"removeOnFail\"][\"age\"]\n local maxCount = opts[\"removeOnFail\"][\"count\"]\n if maxAge ~= nil then\n removeJobsByMaxAge(timestamp, maxAge,\n failedKey, queueKeyPrefix)\n end\n if maxCount ~= nil and maxCount > 0 then\n removeJobsByMaxCount(maxCount, failedKey,\n queueKeyPrefix)\n end\n end\n table.insert(failed, jobId)\n else\n local target =\n getTargetQueueList(metaKey, waitKey, pausedKey)\n -- Move the job back to the wait queue, to immediately be picked up by a waiting worker.\n rcall(\"RPUSH\", target, jobId)\n rcall(\"XADD\", eventStreamKey, \"*\", \"event\",\n \"waiting\", \"jobId\", jobId, 'prev', 'active')\n -- Emit the stalled event\n rcall(\"XADD\", eventStreamKey, \"*\", \"event\",\n \"stalled\", \"jobId\", jobId)\n table.insert(stalled, jobId)\n end\n end\n end\n end\n end\n end\n -- Mark potentially stalled jobs\n local active = rcall('LRANGE', activeKey, 0, -1)\n if (#active > 0) then\n for from, to in batches(#active, 7000) do\n rcall('SADD', stalledKey, unpack(active, from, to))\n end\n end\n return {failed, stalled}\nend\nreturn checkStalledJobs(KEYS[1], KEYS[2], KEYS[3], KEYS[4], KEYS[5], KEYS[6],\n KEYS[7], KEYS[8], ARGV[1], ARGV[2], ARGV[3], ARGV[4])\n","8","bull:secondary:stalled","bull:secondary:wait","bull:secondary:active","bull:secondary:failed","bull:secondary:stalled-check","bull:secondary:meta","bull:secondary:paused","bull:secondary:events","1","bull:secondary:","1696497591264","30000"]}}

How to reproduce.

Could the errors be caused by a Redis instance that is too small? The instance I am trying to deploy to has 0.5 GB of RAM.
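For context, a minimal sketch of the setup that triggers the warning (the endpoint is a placeholder; any worker connecting to a Redis whose maxmemory-policy is not "noeviction" logs it when the connection initializes, even with empty queues):

import { Worker } from 'bullmq';

// Minimal sketch, not taken from the original report: a worker pointed at a
// Redis/ElastiCache endpoint whose maxmemory-policy is volatile-lru. The
// eviction-policy warning is printed when the connection initializes, even
// if no jobs are ever added to the queue.
const worker = new Worker(
  'secondary',
  async (job) => job.data, // trivial processor; never runs if the queue stays empty
  {
    connection: {
      host: 'my-cluster.xxxxxx.use1.cache.amazonaws.com', // placeholder endpoint
      port: 6379,
    },
  },
);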

Relevant log output

No response


vicpara commented 1 year ago

OK. It seems the Redis ElastiCache configuration is not suitable for use with BullMQ, as we may lose data due to the default and unchangeable eviction policy. It all makes sense. Closing this.

manast commented 1 year ago

You can configure the eviction policy in ElastiCache; in fact, it works quite well with BullMQ. I have used it myself for years.
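For anyone landing here: ElastiCache does not let you edit the default parameter group, but you can attach a custom one. A sketch with the AWS CLI (group and cluster names are placeholders; pick the parameter-group family matching your engine version):

# Create a custom parameter group (the default group is read-only),
# set maxmemory-policy on it, and attach it to the cluster.
aws elasticache create-cache-parameter-group \
  --cache-parameter-group-name bullmq-noeviction \
  --cache-parameter-group-family redis7 \
  --description "noeviction policy for BullMQ"

aws elasticache modify-cache-parameter-group \
  --cache-parameter-group-name bullmq-noeviction \
  --parameter-name-values "ParameterName=maxmemory-policy,ParameterValue=noeviction"

aws elasticache modify-replication-group \
  --replication-group-id my-redis \
  --cache-parameter-group-name bullmq-noeviction \
  --apply-immediately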

vicpara commented 1 year ago

I tried with the latest version and it tells me that changing the eviction policy isn't possible.

manast commented 1 year ago

I think the issue is not that it cannot be changed, but that they do not allow BullMQ to verify the current maxmemory-policy: https://docs.aws.amazon.com/AmazonElastiCache/latest/red-ug/ParameterGroups.Redis.html#ParameterGroups.Redis.4-0-10
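For illustration, roughly the check that produces the warning, done by hand with ioredis (a sketch, not BullMQ's actual code; on servers that restrict the CONFIG command the call simply fails):

import Redis from 'ioredis';

// Sketch: ask Redis for its eviction policy the way a client library would.
// CONFIG GET returns a flat [name, value] reply.
async function checkEvictionPolicy(redis: Redis): Promise<void> {
  try {
    const reply = (await redis.call('CONFIG', 'GET', 'maxmemory-policy')) as string[];
    const policy = reply[1];
    if (policy !== 'noeviction') {
      console.warn(`maxmemory-policy is "${policy}"; BullMQ expects "noeviction"`);
    }
  } catch (err) {
    // e.g. managed services that block or rename the CONFIG command
    console.warn('CONFIG GET is not permitted on this server:', err);
  }
}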

manast commented 1 year ago

TL;DR: if you know that the eviction policy is correct, you can ignore the warning.

JohnPeberdyRR commented 12 months ago

@manast, sorry for the dumb question, but is the eviction policy warning correct at all? The official Redis docs say this:

The policies volatile-lru, volatile-lfu, volatile-random, and volatile-ttl behave like noeviction if there are no keys to evict matching the prerequisites.

In other words, if no keys with an expiration set exist, nothing will be evicted. My newbie assumption is that BullMQ has control over which keys get an expiration.

manast commented 12 months ago

@JohnPeberdyRR locks on jobs have an expiration set, so they could be evicted by Redis at any moment under any of those policies.
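This is easy to verify: lock keys follow the <prefix>:<jobId>:lock pattern visible in the script above, and a positive TTL makes a key an eviction candidate under any volatile-* policy. A sketch with ioredis (the queue prefix and job id are placeholders):

import Redis from 'ioredis';

// Sketch: inspect the TTL on an active job's lock key.
async function showLockTtl(redis: Redis, jobId: string): Promise<void> {
  const lockKey = `bull:secondary:${jobId}:lock`; // key pattern from the script above
  const ttlMs = await redis.pttl(lockKey); // -2: missing, -1: no TTL, >0: expires
  if (ttlMs > 0) {
    console.log(`${lockKey} expires in ${ttlMs} ms, so volatile-lru may evict it`);
  } else {
    console.log(`${lockKey} has no TTL or does not exist (PTTL=${ttlMs})`);
  }
}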

JohnPeberdyRR commented 12 months ago

@manast, thank you for clarifying.