taskiq-python / taskiq-redis

Broker and result backend for taskiq
MIT License

How to run a redis-server without using the default port 6379 #66

Closed wendellswa06 closed 1 month ago

wendellswa06 commented 1 month ago

```python
redis_async_result = RedisAsyncResultBackend(
    redis_url="redis://localhost:6379",
)

broker = ListQueueBroker(
    url="redis://localhost:6379",
    result_backend=redis_async_result,
)
```

I'm going to run a redis-server on one machine, and workers on other machines should listen for tasks from that redis-server. I tried several methods but failed. I used `service redis-server start` to run the redis-server. Ty.

s3rius commented 1 month ago

So you can replace these URLs with the correct ones.

wendellswa06 commented 1 month ago

I used `service redis-server start` to run the redis service. After that, running `redis-cli` shows `127.0.0.1:6379`, which indicates that 6379 is the port for accessing the redis db.

wendellswa06 commented 1 month ago

@s3rius still waiting for your response. Please help me out at your earliest convenience.

s3rius commented 1 month ago

I don't understand what the problem is, actually. Can you please explain what you are trying to achieve?

wendellswa06 commented 1 month ago

I have 2 servers (A, B), each containing 4 GPUs. I want to distribute tasks across all 8 GPUs on the 2 servers using taskiq_redis. To do this, I set up a broker on A and one worker per GPU on A and B, i.e. 8 workers in total. I changed 'localhost:6379' to A's dedicated IP address and port and started the workers, but the workers on both A and B were not working properly. Not sure whether my explanation is clear or not.

s3rius commented 1 month ago

Here's the architecture of how I would approach this problem.

![taskiq-example](https://github.com/user-attachments/assets/89eeb794-618e-4601-8e87-9a03238bf4db)

This example uses a network with subnet 172.20.0.0/16 for simplicity. We have your two instances with dedicated IPs 172.20.0.2 and 172.20.0.3; they both connect to redis at 172.20.0.1 to receive tasks. The actual application that sends tasks also connects to this redis instance. In that case your tkq.py file would look something like this:

```python
from taskiq_redis import ListQueueBroker, RedisAsyncResultBackend

broker = ListQueueBroker(
    url="redis://172.20.0.1:6379/0",
).with_result_backend(RedisAsyncResultBackend("redis://172.20.0.1:6379/0"))
```

Or, if you don't want to deploy it on a dedicated server but want to reuse one of your worker servers, you might want to take the redis URL from an environment variable.

```python
import os

from taskiq_redis import ListQueueBroker, RedisAsyncResultBackend

REDIS_URL = os.environ.get("REDIS_URL", "127.0.0.1:6379")

broker = ListQueueBroker(
    url=f"redis://{REDIS_URL}/0",
).with_result_backend(RedisAsyncResultBackend(f"redis://{REDIS_URL}/0"))
```

In that case you can deploy redis on one of the machines that handle workloads and connect to it by passing an environment variable with the correct URL. You could still use the network IP everywhere, but it will be much faster to connect via loopback (127.0.0.1) on the machine that hosts the redis instance. For example, let's say redis is deployed on instance A with IP 172.20.0.2.

In that case, on instance A we run the worker against the local redis (the default loopback URL applies):

```shell
taskiq worker my_app.tkq:broker
```

On instance B we point the worker at A's IP:

```shell
REDIS_URL="172.20.0.2:6379" taskiq worker my_app.tkq:broker
```

And the application that sends tasks is started the same way:

```shell
REDIS_URL="172.20.0.2:6379" python your_app_entrypoint.py
```

The idea is simple: if you want to send tasks to different servers, create one point from which they all can get tasks.

I hope my explanation has helped you. If it hasn't, please write here.

wendellswa06 commented 1 month ago

Really appreciate your great and kind explanation. I'll try based on your explanation.

wendellswa06 commented 1 month ago

If I want to host a Redis server on A, how do I run redis-server on a dedicated IP and a port other than 6379? There should be changes in the config, I think.

wendellswa06 commented 1 month ago

I hosted redis on server A (213.173.109.197) on port 17790 using `redis-server --port 17790`, and then on server B set it up like below:

```python
redis_async_result = RedisAsyncResultBackend(
    redis_url="redis://213.173.109.197:17790/0",
)

# Or you can use PubSubBroker if you need broadcasting
broker = ListQueueBroker(
    url="redis://213.173.109.197:17790/0",
    result_backend=redis_async_result,
)
```

It raised the following error:

```
[2024-07-19 19:28:21,487][taskiq.receiver.receiver][INFO ][worker-0] Listening started.
Process worker-0:
Traceback (most recent call last):
  File "/usr/lib/python3.10/multiprocessing/process.py", line 314, in _bootstrap
    self.run()
  File "/usr/lib/python3.10/multiprocessing/process.py", line 108, in run
    self._target(*self._args, **self._kwargs)
  File "/usr/local/lib/python3.10/dist-packages/taskiq/cli/worker/run.py", line 148, in start_listen
    loop.run_until_complete(receiver.listen())
  File "/usr/local/lib/python3.10/dist-packages/nest_asyncio.py", line 98, in run_until_complete
    return f.result()
  File "/usr/lib/python3.10/asyncio/futures.py", line 201, in result
    raise self._exception.with_traceback(self._exception_tb)
  File "/usr/lib/python3.10/asyncio/tasks.py", line 232, in __step
    result = coro.send(None)
  File "/usr/local/lib/python3.10/dist-packages/taskiq/receiver/receiver.py", line 333, in listen
    async with anyio.create_task_group() as gr:
  File "/usr/local/lib/python3.10/dist-packages/anyio/_backends/_asyncio.py", line 597, in __aexit__
    raise exceptions[0]
  File "/usr/lib/python3.10/asyncio/tasks.py", line 232, in __step
    result = coro.send(None)
  File "/usr/local/lib/python3.10/dist-packages/taskiq/receiver/receiver.py", line 361, in prefetcher
    message = await iterator.__anext__()
  File "/usr/local/lib/python3.10/dist-packages/taskiq_redis/redis_broker.py", line 128, in listen
    yield (await redis_conn.brpop(self.queue_name))[
  File "/usr/local/lib/python3.10/dist-packages/redis/asyncio/client.py", line 607, in execute_command
    conn = self.connection or await pool.get_connection(command_name, **options)
  File "/usr/local/lib/python3.10/dist-packages/redis/asyncio/connection.py", line 1232, in get_connection
    await self.ensure_connection(connection)
  File "/usr/local/lib/python3.10/dist-packages/redis/asyncio/connection.py", line 1116, in ensure_connection
    await connection.connect()
  File "/usr/local/lib/python3.10/dist-packages/redis/asyncio/connection.py", line 289, in connect
    await self.on_connect()
  File "/usr/local/lib/python3.10/dist-packages/redis/asyncio/connection.py", line 401, in on_connect
    await self.read_response()
  File "/usr/local/lib/python3.10/dist-packages/redis/asyncio/connection.py", line 541, in read_response
    response = await self._parser.read_response(
  File "/usr/local/lib/python3.10/dist-packages/redis/_parsers/resp2.py", line 82, in read_response
    response = await self._read_response(disable_decoding=disable_decoding)
  File "/usr/local/lib/python3.10/dist-packages/redis/_parsers/resp2.py", line 90, in _read_response
    raw = await self._readline()
  File "/usr/local/lib/python3.10/dist-packages/redis/_parsers/base.py", line 221, in _readline
    raise ConnectionError(SERVER_CLOSED_CONNECTION_ERROR)
redis.exceptions.ConnectionError: Connection closed by server.
```

Could you please check it?

wendellswa06 commented 1 month ago

@s3rius

sminnee commented 1 month ago

On the face of it, this looks like an issue with your redis server - it's closing the connection prematurely for some reason.

- Is there a firewall preventing you from accessing it?
- Can you connect to it from the server running your taskiq worker with the command-line tool: `redis-cli -h 213.173.109.197 -p 17790`?
- Is it expecting a username & password, perhaps?
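On that last point: one possible culprit for a `Connection closed by server` error during connect is redis protected mode, which by default rejects non-loopback connections when no password and no explicit `bind` address are configured (as is the case when starting with just `redis-server --port 17790`). This is only a guess about this particular setup, but if it applies, the relevant configuration sketch would be:

```
# redis.conf (or the equivalent --flags): make redis reachable from server B.
# Only disable protected mode on a trusted private network; otherwise
# keep it on and set "requirepass <password>" instead.
port 17790
bind 0.0.0.0
protected-mode no
```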

wendellswa06 commented 1 month ago

Let me try again, thanks y'all.