celery / kombu

Messaging library for Python.
http://kombu.readthedocs.org/
BSD 3-Clause "New" or "Revised" License
2.87k stars, 928 forks

Bug: Fanout exchange messages mixed across virtual databases in Redis sentinel #1987

Closed: huyenvu2101 closed this issue 3 months ago

huyenvu2101 commented 5 months ago

I'm encountering a bug where fanout exchange messages are being mixed between Celery instances even though they are configured to use separate Redis virtual databases.

Description: We are running two Django applications that utilize the same Redis Sentinel for task processing, but with different virtual databases. We observed unexpected behavior where workers from the first Celery instance were logging missed heartbeats from workers in the second Celery instance.

The heartbeat messages from both instances were being published to the same Redis channel. Every PUBLISH command processed by Redis looked like this:

1713522462.131041 [0 10.240.201.12:6379] "SELECT" "1"
1713522462.131060 [1 10.240.201.12:6379] "PUBLISH" "/{db}.celeryev/worker.heartbeat" "{\"body\": \"eyJob3N0bmFtZSI6ICJjZWxlcnlAODY0OTg0OGYxNjAzIiwgInV0Y29mZnNldCI6IDAsICJwaWQiOiA3LCAiY2xvY2siOiAxMjQ2MTYxLCAiZnJlcSI6IDIuMCwgImFjdGl2ZSI6IDAsICJwcm9jZXNzZWQiOiA4NjgsICJsb2FkYXZnIjogWzAuMTUsIDAuMjQsIDAuMjldLCAic3dfaWRlbnQiOiAicHktY2VsZXJ5IiwgInN3X3ZlciI6ICI1LjIuNyIsICJzd19zeXMiOiAiTGludXgiLCAidGltZXN0YW1wIjogMTcxMzUyMjQ2Mi4xMjkyNDU1LCAidHlwZSI6ICJ3b3JrZXItaGVhcnRiZWF0In0=\", \"content-encoding\": \"utf-8\", \"content-type\": \"application/json\", \"headers\": {\"hostname\": \"celery@8649848f1603\"}, \"properties\": {\"delivery_mode\": 1, \"delivery_info\": {\"exchange\": \"celeryev\", \"routing_key\": \"worker.heartbeat\"}, \"priority\": 0, \"body_encoding\": \"base64\", \"delivery_tag\": \"1c110d46-b084-4233-a303-820207bf7729\"}}"
1713522462.132977 [1 10.240.201.12:6379] "PUBLISH" "/{db}.celeryev/worker.heartbeat" "{\"body\": \"eyJob3N0bmFtZSI6ICJjZWxlcnlAZWIwMGY4NzExMGM0IiwgInV0Y29mZnNldCI6IDAsICJwaWQiOiA3LCAiY2xvY2siOiAxMjQ2MTYzLCAiZnJlcSI6IDIuMCwgImFjdGl2ZSI6IDAsICJwcm9jZXNzZWQiOiAxNzgyNywgImxvYWRhdmciOiBbMC4xNSwgMC4yNCwgMC4yOV0sICJzd19pZGVudCI6ICJweS1jZWxlcnkiLCAic3dfdmVyIjogIjUuMi43IiwgInN3X3N5cyI6ICJMaW51eCIsICJ0aW1lc3RhbXAiOiAxNzEzNTIyNDYyLjEzMTMzOSwgInR5cGUiOiAid29ya2VyLWhlYXJ0YmVhdCJ9\", \"content-encoding\": \"utf-8\", \"content-type\": \"application/json\", \"headers\": {\"hostname\": \"celery@eb00f87110c4\"}, \"properties\": {\"delivery_mode\": 1, \"delivery_info\": {\"exchange\": \"celeryev\", \"routing_key\": \"worker.heartbeat\"}, \"priority\": 0, \"body_encoding\": \"base64\", \"delivery_tag\": \"4f9fb170-46df-4031-a52c-c4a682c46a8e\"}}"
1713522462.133438 [1 10.240.201.12:6379] "PUBLISH" "/{db}.celeryev/worker.heartbeat" "{\"body\": \"eyJob3N0bmFtZSI6ICJjZWxlcnlAYTYzMDI4N2QyZTIxIiwgInV0Y29mZnNldCI6IDAsICJwaWQiOiA3LCAiY2xvY2siOiAxMjQ2MTYzLCAiZnJlcSI6IDIuMCwgImFjdGl2ZSI6IDAsICJwcm9jZXNzZWQiOiA1OTYsICJsb2FkYXZnIjogWzQuMiwgMy4xMSwgMi4zOF0sICJzd19pZGVudCI6ICJweS1jZWxlcnkiLCAic3dfdmVyIjogIjUuMi43IiwgInN3X3N5cyI6ICJMaW51eCIsICJ0aW1lc3RhbXAiOiAxNzEzNTIyNDYyLjEzMTU4OTQsICJ0eXBlIjogIndvcmtlci1oZWFydGJlYXQifQ==\", \"content-encoding\": \"utf-8\", \"content-type\": \"application/json\", \"headers\": {\"hostname\": \"celery@a630287d2e21\"}, \"properties\": {\"delivery_mode\": 1, \"delivery_info\": {\"exchange\": \"celeryev\", \"routing_key\": \"worker.heartbeat\"}, \"priority\": 0, \"body_encoding\": \"base64\", \"delivery_tag\": \"905f2990-57c5-4809-a04e-92f18afd3c12\"}}"
1713522462.164768 [0 10.240.201.12:29477] "PUBLISH" "__sentinel__:hello" "10.240.201.12,26379,938bc824e34f72b95061209ee579c072b34fdeb1,31,mymaster,10.240.201.12,6379,31"

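=> The channel name should carry a prefix that reflects the virtual database (for example /1.celeryev/worker.heartbeat), not the unformatted placeholder /{db}.celeryev/worker.heartbeat. Redis Pub/Sub channels are not scoped to a database, so a literal /{db}. prefix is shared by every instance on the server.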
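To make the difference concrete, here is a minimal sketch of how the channel name ends up being built. It does not import kombu: `publish_topic` and `TEMPLATE` are hypothetical names for illustration, and the joining logic is a simplified assumption based on the channel names visible in the log above.

```python
# Hypothetical stand-in for the fanout prefix template seen in the log.
TEMPLATE = "/{db}."


def publish_topic(keyprefix_fanout, exchange, routing_key=""):
    """Join prefix, exchange and routing key into a Pub/Sub channel name."""
    if routing_key:
        return "".join([keyprefix_fanout, exchange, "/", routing_key])
    return "".join([keyprefix_fanout, exchange])


# Prefix never formatted: every instance publishes to the same channel.
unformatted = publish_topic(TEMPLATE, "celeryev", "worker.heartbeat")

# Prefix formatted per database: each instance gets its own channel.
app_one = publish_topic(TEMPLATE.format(db=1), "celeryev", "worker.heartbeat")
app_two = publish_topic(TEMPLATE.format(db=2), "celeryev", "worker.heartbeat")

print(unformatted)  # /{db}.celeryev/worker.heartbeat
print(app_one)      # /1.celeryev/worker.heartbeat
print(app_two)      # /2.celeryev/worker.heartbeat
```

With the formatted prefix, the two applications would no longer see each other's heartbeats, because their channel names differ.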

Potential Cause: It appears that the _get_pool method of the SentinelChannel class does not format keyprefix_fanout with the database number the way Channel._get_pool does:

def _get_pool(self, asynchronous=False):
    params = self._connparams(asynchronous=asynchronous)
    # Substitute the configured database number into the '/{db}.' template.
    self.keyprefix_fanout = self.keyprefix_fanout.format(db=params['db'])
    return redis.ConnectionPool(**params)
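
The suspected mechanism can be reproduced without Redis or kombu. The sketch below uses stand-in classes (`BaseChannel`, `SentinelLikeChannel` and `PatchedSentinelLikeChannel` are hypothetical names, not kombu's real classes) and assumes the subclass overrides `_get_pool` without repeating the formatting step. It is an illustration of the hypothesis, not the actual fix.

```python
class BaseChannel:
    """Stand-in for the regular Redis channel (not kombu's real class)."""

    keyprefix_fanout = "/{db}."

    def __init__(self, db):
        self.db = db

    def _connparams(self):
        return {"db": self.db}

    def _get_pool(self):
        params = self._connparams()
        # Fill in the database number, as in the snippet above.
        self.keyprefix_fanout = self.keyprefix_fanout.format(db=params["db"])
        return params  # a real channel would build a connection pool here


class SentinelLikeChannel(BaseChannel):
    """Assumed bug: overrides _get_pool and skips the formatting step."""

    def _get_pool(self):
        return self._connparams()


class PatchedSentinelLikeChannel(SentinelLikeChannel):
    """One possible remedy: format the prefix in the override as well."""

    def _get_pool(self):
        params = super()._get_pool()
        self.keyprefix_fanout = self.keyprefix_fanout.format(db=params["db"])
        return params


for cls in (BaseChannel, SentinelLikeChannel, PatchedSentinelLikeChannel):
    channel = cls(db=1)
    channel._get_pool()
    print(cls.__name__, channel.keyprefix_fanout)
```

Only the middle class keeps the literal `/{db}.` prefix, which matches the channel names in the log.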

tu-pm commented 5 months ago

@Nusnus @auvipy Can you take a look at this?

auvipy commented 5 months ago

I will

Redoubts commented 1 month ago

Has this fix been released?