celery / kombu

Messaging library for Python.
BSD 3-Clause "New" or "Revised" License
2.81k stars 920 forks source link

Redis transport - Redelivered messages should respect the original priority #2026

Closed zemek closed 1 week ago

zemek commented 2 weeks ago

When messages are restored on warm shutdown in celery, they get placed into the highest priority queue by default. The appropriate behavior should be to place the task back on the same priority queue it originated from.

zemek commented 2 weeks ago

@auvipy I've updated the existing test_do_restore_message_celery unit test to assert that the message was restored to the same priority queue as what is specified in the original message. Please let me know if this is sufficient. Thank you!

zemek commented 1 week ago

@auvipy I've added integration tests test_publish_requeue_consume() for the scenario where messages get requeued and a higher priority message gets enqueued.

I based this test off the existing test_publish_consume() test.

I've also added the integration test for the other transports to show that those already have the correct behavior. It's only the redis transport that needs to be corrected (e.g. pyamqp and mongodb have the correct behavior)

Let me know if this is sufficient, thanks!

zemek commented 1 week ago

is something wrong with the 3.9 unit test? i noticed it's also failing in another PR https://github.com/celery/kombu/actions/runs/9644718778/job/26613373788?pr=2036 which is just a change to the readme..? @Nusnus @auvipy

Nusnus commented 1 week ago

is something wrong with the 3.9 unit test? i noticed it's also failing in another PR https://github.com/celery/kombu/actions/runs/9644718778/job/26613373788?pr=2036 which is just a change to the readme..? @Nusnus @auvipy

Yeah it appears so. Indeed, I also saw the Readme PR failing and was surprised.

Few days ago I released v5.4.0rc1 and everything was totally fine so it's something very recent.

I'll give it a look tomorrow πŸ™

P.S Feel free to check it out too if you can β™₯️

zemek commented 1 week ago

is something wrong with the 3.9 unit test? i noticed it's also failing in another PR https://github.com/celery/kombu/actions/runs/9644718778/job/26613373788?pr=2036 which is just a change to the readme..? @Nusnus @auvipy

Yeah it appears so. Indeed, I also saw the Readme PR failing and was surprised.

Few days ago I released v5.4.0rc1 and everything was totally fine so it's something very recent.

I'll give it a look tomorrow πŸ™

P.S Feel free to check it out too if you can β™₯️

it seems like nothing is actually failing, but it's just running slow for some reason and timing out... πŸ€” or maybe running out of memory?

oh wait it's consistently failing in the exact same spot, so probably not a timeout issue

Nusnus commented 1 week ago

is something wrong with the 3.9 unit test? i noticed it's also failing in another PR https://github.com/celery/kombu/actions/runs/9644718778/job/26613373788?pr=2036 which is just a change to the readme..? @Nusnus @auvipy

Yeah it appears so. Indeed, I also saw the Readme PR failing and was surprised. Few days ago I released v5.4.0rc1 and everything was totally fine so it's something very recent. I'll give it a look tomorrow πŸ™ P.S Feel free to check it out too if you can β™₯️

it seems like nothing is actually failing, but it's just running slow for some reason and timing out... πŸ€” or maybe running out of memory?

oh wait it's consistently failing in the exact same spot, so probably not a timeout issue

Locally 3.8, 3.12 and 3.9 are passing.

Results (34.69s):
    1177 passed
     168 skipped
Restoring 2 unacknowledged message(s)
  3.8-unit: OK (66.25=setup[29.75]+cmd[36.50] seconds)
  congratulations :) (66.32 seconds)
  Results (33.83s):
    1177 passed
     168 skipped
Restoring 2 unacknowledged message(s)
  3.12-unit: OK (58.78=setup[22.93]+cmd[35.85] seconds)
  congratulations :) (58.85 seconds)
  Results (29.86s):
    1177 passed
     168 skipped
Restoring 2 unacknowledged message(s)
  3.9-unit: OK (52.85=setup[21.15]+cmd[31.70] seconds)
  congratulations :) (52.95 seconds)

Also, the last merged PR has 100% passing CI: https://github.com/celery/kombu/actions/runs/9626076136

So it’s hard to quickly judge what’s wrong.

Nusnus commented 1 week ago

@zemek just for the sake of unblocking you, you can comment out 3.9 in the CI and commit it to your branch, mark this PR as a draft (for the sake of marking it not-ready-for-merge), and then we’ll revert it before merge after the issue is resolved independently. This should at least allow you to run the CI and see if everything else passes, so you won't have to wait on this unclear issue to see if you’re all-good with your own changes.

zemek commented 1 week ago

@zemek just for the sake of unblocking you, you can comment out 3.9 in the CI and commit it to your branch, mark this PR as a draft (for the sake of marking it not-ready-for-merge), and then we’ll revert it before merge after the issue is resolved independently. This should at least allow you to run the CI and see if everything else passes, so you won't have to wait on this unclear issue to see if you’re all-good with your own changes.

sounds good, will do that!

FWIW: https://github.com/celery/kombu/actions/runs/9644113951/job/26595518270 succeeded at June 24, 3:58am PDT https://github.com/celery/kombu/actions/runs/9644718778/job/26597405786 failed at June 24, 4:50am PDT

everything looks pretty much the same between the two runs though... :/

Nusnus commented 1 week ago

Just to be on the safe side, I ran 3.8-3.12 unit tests on your branch locally right now, and they passed successfully.

tox -e 3.8-unit,3.9-unit,3.10-unit,3.11-unit,3.12-unit -p -o

Results (31.24s):
    1177 passed
     168 skipped
Restoring 2 unacknowledged message(s)
3.10-unit: OK βœ” in 34.65 seconds
3.11-unit: OK βœ” in 35.06 seconds
3.9-unit: OK βœ” in 35.71 seconds
3.12-unit: OK βœ” in 35.79 seconds
  3.8-unit: OK (35.94=setup[3.06]+cmd[32.88] seconds)
  3.9-unit: OK (35.71=setup[2.89]+cmd[32.82] seconds)
  3.10-unit: OK (34.65=setup[2.70]+cmd[31.95] seconds)
  3.11-unit: OK (35.06=setup[2.86]+cmd[32.20] seconds)
  3.12-unit: OK (35.79=setup[4.12]+cmd[31.68] seconds)
  congratulations :) (36.05 seconds)

That being said, the first time I tried I got this error - funny, but I think it’s due to running it in parallel (-p arg for tox):

Results (41.23s):
    1177 passed
     168 skipped
Restoring 2 unacknowledged message(s)
3.12-unit: OK βœ” in 1 minute 6.47 seconds
3.8-unit: OK βœ” in 1 minute 6.64 seconds
  3.8-unit: OK (66.64=setup[23.87]+cmd[42.77] seconds)
  3.9-unit: FAIL code 3 (65.10=setup[23.68]+cmd[41.42] seconds)
  3.10-unit: OK (65.26=setup[23.51]+cmd[41.74] seconds)
  3.11-unit: OK (67.00=setup[23.57]+cmd[43.43] seconds)
  3.12-unit: OK (66.47=setup[25.13]+cmd[41.34] seconds)
  evaluation failed :( (67.42 seconds)

Because the error was with code coverage (I/O, probably due to parallel runs):

 t/unit/utils/test_scheduling.py::test_round_robin_cycle βœ“                                                                                                                           97% β–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–Š
 t/unit/utils/test_scheduling.py::test_priority_cycle βœ“                                                                                                                              97% β–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–Š
 t/unit/utils/test_scheduling.py::test_sorted_cycle βœ“                                                                                                                                97% β–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–Š
/Users/nusnus/dev/GitHub/kombu/.tox/3.9-unit/lib/python3.9/site-packages/_pytest/main.py:339: PluggyTeardownRaisedWarning: A plugin raised an exception during an old-style hookwrapper teardown.
Plugin: _cov, Hook: pytest_runtestloop
DataError: Couldn't end data file '/Users/nusnus/dev/GitHub/kombu/.coverage.Tomers-MacBook-Pro.local.90690.XapqHrBx': disk I/O error
For more information see https://pluggy.readthedocs.io/en/stable/api_reference.html#pluggy.PluggyTeardownRaisedWarning
 t/unit/utils/test_time.py::test_maybe_s_to_ms[3-3000] βœ“                                                                                                                             97% β–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–Š
 t/unit/utils/test_time.py::test_maybe_s_to_ms[3.0-3000] βœ“                                                                                                                           98% β–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–Š
 t/unit/utils/test_uuid.py::test_UUID.test_uuid βœ“                                                                                                                                   100% β–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆINTERNALERROR> Traceback (most recent call last):
INTERNALERROR>   File "/Users/nusnus/dev/GitHub/kombu/.tox/3.9-unit/lib/python3.9/site-packages/coverage/sqlitedb.py", line 99, in __exit__
INTERNALERROR>     self.con.__exit__(exc_type, exc_value, traceback)
INTERNALERROR> sqlite3.OperationalError: disk I/O error
INTERNALERROR> The above exception was the direct cause of the following exception:
INTERNALERROR> Traceback (most recent call last):
INTERNALERROR>   File "/Users/nusnus/dev/GitHub/kombu/.tox/3.9-unit/lib/python3.9/site-packages/_pytest/main.py", line 285, in wrap_session
INTERNALERROR>     session.exitstatus = doit(config, session) or 0
INTERNALERROR>   File "/Users/nusnus/dev/GitHub/kombu/.tox/3.9-unit/lib/python3.9/site-packages/_pytest/main.py", line 339, in _main
INTERNALERROR>     config.hook.pytest_runtestloop(session=session)
 t/unit/utils/test_time.py::test_maybe_s_to_ms[303-303000] βœ“                                                                                                                         98% β–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–Š
INTERNALERROR>   File "/Users/nusnus/dev/GitHub/kombu/.tox/3.9-unit/lib/python3.9/site-packages/pluggy/_hooks.py", line 513, in __call__
INTERNALERROR>     return self._hookexec(self.name, self._hookimpls.copy(), kwargs, firstresult)
INTERNALERROR>   File "/Users/nusnus/dev/GitHub/kombu/.tox/3.9-unit/lib/python3.9/site-packages/pluggy/_manager.py", line 120, in _hookexec
INTERNALERROR>     return self._inner_hookexec(hook_name, methods, kwargs, firstresult)
INTERNALERROR>   File "/Users/nusnus/dev/GitHub/kombu/.tox/3.9-unit/lib/python3.9/site-packages/pluggy/_callers.py", line 156, in _multicall
INTERNALERROR>     teardown[0].send(outcome)
INTERNALERROR>   File "/Users/nusnus/dev/GitHub/kombu/.tox/3.9-unit/lib/python3.9/site-packages/pytest_cov/plugin.py", line 339, in pytest_runtestloop
INTERNALERROR>     self.cov_controller.finish()
INTERNALERROR>   File "/Users/nusnus/dev/GitHub/kombu/.tox/3.9-unit/lib/python3.9/site-packages/pytest_cov/engine.py", line 46, in ensure_topdir_wrapper
INTERNALERROR>     return meth(self, *args, **kwargs)
INTERNALERROR>   File "/Users/nusnus/dev/GitHub/kombu/.tox/3.9-unit/lib/python3.9/site-packages/pytest_cov/engine.py", line 255, in finish
INTERNALERROR>     self.cov.stop()
INTERNALERROR>   File "/Users/nusnus/dev/GitHub/kombu/.tox/3.9-unit/lib/python3.9/site-packages/coverage/control.py", line 785, in save
INTERNALERROR>     data = self.get_data()
INTERNALERROR>   File "/Users/nusnus/dev/GitHub/kombu/.tox/3.9-unit/lib/python3.9/site-packages/coverage/control.py", line 865, in get_data
INTERNALERROR>     if self._collector.flush_data():
INTERNALERROR>   File "/Users/nusnus/dev/GitHub/kombu/.tox/3.9-unit/lib/python3.9/site-packages/coverage/collector.py", line 548, in flush_data
INTERNALERROR>     self.covdata.add_arcs(self.mapped_file_dict(arc_data))
INTERNALERROR>   File "/Users/nusnus/dev/GitHub/kombu/.tox/3.9-unit/lib/python3.9/site-packages/coverage/sqldata.py", line 123, in _wrapped
INTERNALERROR>     return method(self, *args, **kwargs)
INTERNALERROR>   File "/Users/nusnus/dev/GitHub/kombu/.tox/3.9-unit/lib/python3.9/site-packages/coverage/sqldata.py", line 542, in add_arcs
INTERNALERROR>     con.executemany_void(
INTERNALERROR>   File "/Users/nusnus/dev/GitHub/kombu/.tox/3.9-unit/lib/python3.9/site-packages/coverage/sqlitedb.py", line 104, in __exit__
INTERNALERROR>     raise DataError(f"Couldn't end data file {self.filename!r}: {exc}") from exc
INTERNALERROR> coverage.exceptions.DataError: Couldn't end data file '/Users/nusnus/dev/GitHub/kombu/.coverage.Tomers-MacBook-Pro.local.90690.XapqHrBx': disk I/O error
 t/unit/utils/test_time.py::test_maybe_s_to_ms[303.33-303330] βœ“                                                                                                                      98% β–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–Š
 t/unit/utils/test_time.py::test_maybe_s_to_ms[303.333-303333] βœ“                                                                                                                     98% β–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–Š
Nusnus commented 1 week ago

Hmm… just for fun I tried running it again. Same tox -e 3.8-unit,3.9-unit,3.10-unit,3.11-unit,3.12-unit -p -o Again, only 3.9 failed πŸ€”- still on code cov though.. but it’s by chance I’m sure.

 t/unit/utils/test_uuid.py::test_UUID.test_uuid4 βœ“                                                                                                                                   99% β–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆ
/Users/nusnus/dev/GitHub/kombu/.tox/3.12-unit/lib/python3.12/site-packages/coverage/data.py:180: CoverageWarning: Data file '/Users/nusnus/dev/GitHub/kombu/.coverage.Tomers-MacBook-Pro.local.97662.XwPjIjZx' doesn't seem to be a coverage data file:
/Users/nusnus/dev/GitHub/kombu/.tox/3.9-unit/lib/python3.9/site-packages/_pytest/main.py:339: PluggyTeardownRaisedWarning: A plugin raised an exception during an old-style hookwrapper teardown.
Plugin: _cov, Hook: pytest_runtestloop
FileNotFoundError: [Errno 2] No such file or directory: '/Users/nusnus/dev/GitHub/kombu/.coverage.Tomers-MacBook-Pro.local.97684.XSRXGqSx'
For more information see https://pluggy.readthedocs.io/en/stable/api_reference.html#pluggy.PluggyTeardownRaisedWarning
 t/unit/utils/test_uuid.py::test_UUID.test_uuid βœ“                                                                                                                                   100% β–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆINTERNALERROR> Traceback (most recent call last):
INTERNALERROR>   File "/Users/nusnus/dev/GitHub/kombu/.tox/3.9-unit/lib/python3.9/site-packages/_pytest/main.py", line 285, in wrap_session
INTERNALERROR>     session.exitstatus = doit(config, session) or 0
INTERNALERROR>   File "/Users/nusnus/dev/GitHub/kombu/.tox/3.9-unit/lib/python3.9/site-packages/_pytest/main.py", line 339, in _main
INTERNALERROR>     config.hook.pytest_runtestloop(session=session)
INTERNALERROR>   File "/Users/nusnus/dev/GitHub/kombu/.tox/3.9-unit/lib/python3.9/site-packages/pluggy/_hooks.py", line 513, in __call__
INTERNALERROR>     return self._hookexec(self.name, self._hookimpls.copy(), kwargs, firstresult)
INTERNALERROR>   File "/Users/nusnus/dev/GitHub/kombu/.tox/3.9-unit/lib/python3.9/site-packages/pluggy/_manager.py", line 120, in _hookexec
INTERNALERROR>     return self._inner_hookexec(hook_name, methods, kwargs, firstresult)
INTERNALERROR>   File "/Users/nusnus/dev/GitHub/kombu/.tox/3.9-unit/lib/python3.9/site-packages/pluggy/_callers.py", line 156, in _multicall
INTERNALERROR>     teardown[0].send(outcome)
INTERNALERROR>   File "/Users/nusnus/dev/GitHub/kombu/.tox/3.9-unit/lib/python3.9/site-packages/pytest_cov/plugin.py", line 339, in pytest_runtestloop
INTERNALERROR>     self.cov_controller.finish()
INTERNALERROR>   File "/Users/nusnus/dev/GitHub/kombu/.tox/3.9-unit/lib/python3.9/site-packages/pytest_cov/engine.py", line 46, in ensure_topdir_wrapper
INTERNALERROR>     return meth(self, *args, **kwargs)
INTERNALERROR>   File "/Users/nusnus/dev/GitHub/kombu/.tox/3.9-unit/lib/python3.9/site-packages/pytest_cov/engine.py", line 255, in finish
INTERNALERROR>     self.cov.stop()
INTERNALERROR>   File "/Users/nusnus/dev/GitHub/kombu/.tox/3.9-unit/lib/python3.9/site-packages/coverage/control.py", line 837, in combine
INTERNALERROR>     combine_parallel_data(
INTERNALERROR>   File "/Users/nusnus/dev/GitHub/kombu/.tox/3.9-unit/lib/python3.9/site-packages/coverage/data.py", line 162, in combine_parallel_data
INTERNALERROR>     with open(f, "rb") as fobj:
INTERNALERROR> FileNotFoundError: [Errno 2] No such file or directory: '/Users/nusnus/dev/GitHub/kombu/.coverage.Tomers-MacBook-Pro.local.97684.XSRXGqSx’
3.11-unit: OK βœ” in 35.37 seconds
3.12-unit: OK βœ” in 36.16 seconds
  3.8-unit: OK (36.26=setup[2.94]+cmd[33.32] seconds)
  3.9-unit: FAIL code 3 (34.91=setup[2.76]+cmd[32.15] seconds)
  3.10-unit: OK (34.90=setup[2.64]+cmd[32.26] seconds)
  3.11-unit: OK (35.37=setup[2.71]+cmd[32.66] seconds)
  3.12-unit: OK (36.16=setup[4.08]+cmd[32.08] seconds)
  evaluation failed :( (36.37 seconds)
zemek commented 1 week ago

@Nusnus looks like all the integration tests passed πŸ˜„

should i add back py3.9 to CI or leave it out for now?

Nusnus commented 1 week ago

@Nusnus looks like all the integration tests passed πŸ˜„

should i add back py3.9 to CI or leave it out for now?

Haha nice! Leave it until it’s either resolved (I’ll update you) or @auvipy approves this PR - changes-wise. If he says it’s all good, you can revert the commit and we’ll mark it as approved.

Merge will probably be done after it is solved so we’ll get 100% passing CI without skipping 3.9, but let’s clear you out first so we’ll know the work on the PR is at least done & approved.

zemek commented 1 week ago

@auvipy sorry if i'm not supposed to re-request review, just noticed that the requested changes were on the ci.yaml which you already updated

auvipy commented 1 week ago

Yes i committed them on your behalf

auvipy commented 1 week ago

got this failure after merge on 3.10 integration test:

-- Docs: https://docs.pytest.org/en/stable/how-to/capture-warnings.html =========================== short test summary info ============================ FAILED t/integration/test_py_amqp.py::test_PyAMQPPriority::test_publish_requeue_consume - AssertionError !!!!!!!!!!!!!!!!!!!!!!!!!! stopping after 1 failures !!!!!!!!!!!!!!!!!!!!!!!!!!!

Nusnus commented 1 week ago

@auvipy sorry if i'm not supposed to re-request review, just noticed that the requested changes were on the ci.yaml which you already updated

I suspected the 3.9 issue is something that will be β€œself-resolved”. These things tend to come and go by themselves sometimes. @auvipy probably noticed it was fine now and undid the CI patch we discussed about yesterday.

This is normal, don’t worry @zemek.

Nusnus commented 1 week ago

got this failure after merge on 3.10 integration test:

-- Docs: https://docs.pytest.org/en/stable/how-to/capture-warnings.html =========================== short test summary info ============================ FAILED t/integration/test_py_amqp.py::test_PyAMQPPriority::test_publish_requeue_consume - AssertionError !!!!!!!!!!!!!!!!!!!!!!!!!! stopping after 1 failures !!!!!!!!!!!!!!!!!!!!!!!!!!!

The rest of the integration tests are passing - could it be just a flaky issue possibly? I think we should just rerun and see if everything is fine @auvipy - I’m not sure there’s much else to do IMHO πŸ™ The integration tests passed 100% yesterday too, twice. Once after the β€œignore unit-3.9” patch to the CI and once after you undid the changes (before the merge). So I’m pretty positive it’s all-good and it’s just a hiccup. CleanShot 2024-06-25 at 22 04 01@2x

zemek commented 1 week ago

@Nusnus @auvipy hmm I wonder if a sleep() is needed after publishing a new message for pyamqp. I noticed that there is a sleep(0.5) in the original test_publish_consume() test which I did copy over to test_publish_requeue_consume() https://github.com/celery/kombu/blob/d620132ecee40fc021f7a78750dfe01331e8a8c0/t/integration/common.py#L331-L332

but I didn't include this sleep after requeuing and publishing the fourth message. Could that potentially be making it flaky?

Nusnus commented 6 days ago

@Nusnus @auvipy hmm I wonder if a sleep() is needed after publishing a new message for pyamqp. I noticed that there is a sleep(0.5) in the original test_publish_consume() test which I did copy over to test_publish_requeue_consume()


but I didn't include this sleep after requeuing and publishing the fourth message. Could that potentially be making it flaky?

Hmm… TBH I’m not sure - worth a try.

That being said, it looks like 3.9 is back to making troubles: https://github.com/celery/kombu/pull/2043/checks

auvipy commented 6 days ago

It might be a flaky one but lets try...

Nusnus commented 1 day ago

@Nusnus @auvipy hmm I wonder if a sleep() is needed after publishing a new message for pyamqp. I noticed that there is a sleep(0.5) in the original test_publish_consume() test which I did copy over to test_publish_requeue_consume()


but I didn't include this sleep after requeuing and publishing the fourth message. Could that potentially be making it flaky?

Yeah that was the reason :) Fixed in https://github.com/celery/kombu/pull/2048