tobymao / saq

Simple Async Queues
https://saq-py.readthedocs.io/en/latest/
MIT License

Upkeep error #66

Closed · sondrelg closed this issue 1 year ago

sondrelg commented 1 year ago

Hi @tobymao!

Out of nowhere, our saq process started emitting "Upkeep task failed unexpectedly" errors, and the exception logger contains the following traceback:

Traceback (most recent call last):
  File \"/opt/venv/lib/python3.11/site-packages/saq/worker.py\", line 180, in poll
    await func(arg or sleep)
  File \"/opt/venv/lib/python3.11/site-packages/saq/worker.py\", line 167, in schedule
    scheduled = await self.queue.schedule(lock)
                ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File \"/opt/venv/lib/python3.11/site-packages/saq/queue.py\", line 248, in schedule
    return await self._schedule_script(
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File \"/opt/venv/lib/python3.11/site-packages/redis/commands/core.py\", line 4983, in __call__
    return await client.evalsha(self.sha, len(keys), *args)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File \"/opt/venv/lib/python3.11/site-packages/redis/asyncio/client.py\", line 487, in execute_command
    return await conn.retry.call_with_retry(
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File \"/opt/venv/lib/python3.11/site-packages/redis/asyncio/retry.py\", line 59, in call_with_retry
    return await do()
           ^^^^^^^^^^
  File \"/opt/venv/lib/python3.11/site-packages/redis/asyncio/client.py\", line 463, in _send_command_parse_response
    return await self.parse_response(conn, command_name, **options)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File \"/opt/venv/lib/python3.11/site-packages/redis/asyncio/client.py\", line 505, in parse_response
    response = await connection.read_response()
               ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File \"/opt/venv/lib/python3.11/site-packages/redis/asyncio/connection.py\", line 963, in read_response
    raise response from None
redis.exceptions.ResponseError: user_script:12: too many results to unpack script: 15c892840ce49156b99eaf77c367913a8c069ed5, on @user_script:12.

Not sure if this is really a saq issue, but any ideas as to what could be wrong? I've flushed scripts in the redis instance, but the problem persists.

sondrelg commented 1 year ago

Looks like it's failing on this line calling RPUSH.

Flushing all records in redis solved the problem. Is it possible that RPUSH will return multiple items in some cases, or that the number of arguments it can receive is limited?

tobymao commented 1 year ago

how many jobs are you trying to schedule? perhaps you could batch them up?

tobymao commented 1 year ago

this could be a limitation in lua? https://stackoverflow.com/questions/35582647/whats-the-maximum-values-accepted-by-redis-lpush-list-operation

it seems like you can do at least 100k, can you confirm how many values you're trying to schedule?
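
For context, the error in the traceback comes from the Lua side rather than from RPUSH itself: Redis' embedded Lua caps the number of values unpack() can push onto its stack (LUAI_MAXCSTACK, 8000 by default), so passing a larger table as command arguments raises "too many results to unpack". Below is a rough reproduction sketch using redis-py against a local Redis instance; the key names and counts are made up for illustration, and the script is only shaped like saq's schedule script, not the real one.

import asyncio

from redis.asyncio import Redis

# A stripped-down script in the same shape as saq's schedule script:
# read due members from a sorted set and push them all to a list in one call.
SCRIPT = """
local jobs = redis.call('ZRANGEBYSCORE', KEYS[1], 1, ARGV[1])
-- unpack() fails once #jobs exceeds the Lua C-stack limit (~8000)
return redis.call('RPUSH', KEYS[2], unpack(jobs))
"""

async def main() -> None:
    redis = Redis()
    await redis.delete("demo:scheduled", "demo:queued")
    # Seed well past the ~8k unpack limit.
    await redis.zadd("demo:scheduled", {f"job:{i}": 1 for i in range(20_000)})
    script = redis.register_script(SCRIPT)
    try:
        await script(keys=["demo:scheduled", "demo:queued"], args=[2])
    except Exception as exc:  # expect ResponseError: too many results to unpack
        print(type(exc).__name__, exc)

asyncio.run(main())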

sondrelg commented 1 year ago

It's possible we've exceeded 100k. Seems to be non-saq related. Thanks @tobymao :+1:
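
If anyone else hits this and wants to confirm how many jobs have piled up, counting the members of the sorted set that the schedule script reads from is enough. The key name below is hypothetical; inspect your instance (for example with SCAN and TYPE) to find the zset your saq queue actually uses.

import asyncio

from redis.asyncio import Redis

async def main() -> None:
    redis = Redis()
    # ZCARD returns the number of members in a sorted set.
    print(await redis.zcard("saq:queue:scheduled"))  # hypothetical key name

asyncio.run(main())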

aleixrodriala commented 5 months ago

I'm facing this issue once I accumulate 22k scheduled tasks. Is there any workaround for this, or is it just not viable to schedule that many tasks?

tobymao commented 5 months ago

it seems like maybe we need to update the schedule script to batch this up. feel free to create a PR for this, it shouldn't be too difficult but i don't have the bandwidth for it.

aleixrodriala commented 5 months ago

playing with ChatGPT I got this:

if redis.call('EXISTS', KEYS[1]) == 0 then
    redis.call('SETEX', KEYS[1], ARGV[1], 1)
    local jobs = redis.call('ZRANGEBYSCORE', KEYS[2], 1, ARGV[2])
    local limit = 8000  -- Adjust limit based on your environment's capabilities

    local start_index = 1
    while start_index <= #jobs do
        local end_index = math.min(start_index + limit - 1, #jobs)
        local scores = {}

        for i = start_index, end_index do
            table.insert(scores, 0)
            table.insert(scores, jobs[i])
        end

        redis.call('ZADD', KEYS[2], unpack(scores))
        redis.call('RPUSH', KEYS[3], unpack(jobs, start_index, end_index))
        start_index = end_index + 1
    end

    return jobs
end

But it failed, maybe because the 8k limit is still too high; I'll keep testing. Meanwhile the one below worked, but it maybe adds too much overhead by calling ZADD and RPUSH for each item.

if redis.call('EXISTS', KEYS[1]) == 0 then
    redis.call('SETEX', KEYS[1], ARGV[1], 1)
    local jobs = redis.call('ZRANGEBYSCORE', KEYS[2], 1, ARGV[2])

    if next(jobs) then
        for _, v in ipairs(jobs) do
            redis.call('ZADD', KEYS[2], 0, v)
            redis.call('RPUSH', KEYS[3], v)
        end
    end

    return jobs
end

tobymao commented 5 months ago

why don't you make the limit 1000?

tobymao commented 5 months ago

please don't submit any chat gpt code that you don't fully understand. you can use it as a starting point, but you should deeply understand the reasoning behind every single line of code that you submit in the PR.

aleixrodriala commented 5 months ago

that's why I didn't submit the PR :). I started using the library a week ago, so until today I had no idea what this code did or how the scheduler works, etc.

aleixrodriala commented 5 months ago

why don't you make the limit 1000?

still, if I understood correctly, to make as few calls as possible the idea would be to batch them as large as the limit allows, no?

tobymao commented 5 months ago

yea, i guess find a reasonably large number.
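
For reference, a chunked version of the script above with a smaller batch size might look like the sketch below. It is an illustration only, not saq's actual schedule script: the 1000-item chunk, key layout, and ARGV meanings mirror the snippet posted earlier in the thread. Each unpack() call stays well under the ~8000-value Lua limit while the whole move remains a single atomic script call.

from redis.asyncio import Redis

CHUNKED_SCHEDULE = """
if redis.call('EXISTS', KEYS[1]) == 0 then
    redis.call('SETEX', KEYS[1], ARGV[1], 1)
    local jobs = redis.call('ZRANGEBYSCORE', KEYS[2], 1, ARGV[2])
    local limit = 1000  -- chunk size, well under Lua's unpack()/C-stack limit
    local i = 1
    while i <= #jobs do
        local j = math.min(i + limit - 1, #jobs)
        local scores = {}
        for k = i, j do
            scores[#scores + 1] = 0       -- reset score to 0 ...
            scores[#scores + 1] = jobs[k] -- ... for each job being moved
        end
        redis.call('ZADD', KEYS[2], unpack(scores))
        redis.call('RPUSH', KEYS[3], unpack(jobs, i, j))
        i = j + 1
    end
    return jobs
end
"""

async def schedule(redis: Redis, ttl: int, cutoff: float) -> list | None:
    # KEYS: lock key, scheduled zset, incoming list (names here are hypothetical).
    # ARGV: lock TTL in seconds, max score of jobs considered due.
    script = redis.register_script(CHUNKED_SCHEDULE)
    return await script(
        keys=["demo:schedule-lock", "demo:scheduled", "demo:incoming"],
        args=[ttl, cutoff],
    )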

tobymao commented 5 months ago

i decided to do the simple approach, it's only 10% slower on a million jobs, taking around 4 seconds. i don't think it's worth the complexity of the lua script since i don't know lua very well.

this is released in v0.12.5

https://github.com/tobymao/saq/commit/314314a1d624b4a08deb1b89ef8a632d0a4c6226