Open gerazenobi opened 1 year ago
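I was able to inject fakeredis this way:

    import pytest
    from arq.connections import ArqRedis
    from fakeredis import FakeServer
    from fakeredis.aioredis import FakeConnection
    from redis.asyncio.connection import ConnectionPool


    @pytest.fixture  # async fixtures need pytest-asyncio (auto mode) or an equivalent plugin
    async def arq_redis():
        # Back ArqRedis with an in-memory fakeredis server instead of a real Redis.
        arq_redis = ArqRedis(
            connection_pool=ConnectionPool(
                server=FakeServer(),
                connection_class=FakeConnection,
            )
        )
        yield arq_redis
        # Teardown: close the client and its connection pool.
        await arq_redis.close(close_connection_pool=True)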
It works, but requires removing the `log_redis_info` call here: https://github.com/samuelcolvin/arq/blob/9109c2e59d2b13fa59d246da03d19d7844a6fa19/arq/worker.py#L348
It uses the `INFO` command, which is not implemented in fakeredis.
`log_redis_info()` can be patched:

    from unittest.mock import patch

    # Fragment from a fixture body; `worker` is created beforehand.
    with patch("arq.worker.log_redis_info"):  # `fakeredis` does not support `INFO`
        try:
            yield worker
        finally:
            await worker.close()
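
To illustrate why the patch helps, here is a minimal, standard-library-only sketch of the same idea. Everything in it is a hypothetical stand-in, not arq's or fakeredis's real code: `FakeBackend` plays the role of a fake Redis without `INFO`, `StandInWorker.log_info` plays the role of `log_redis_info`, and `patch.object` is used instead of a dotted-path `patch(...)` only so the example stays self-contained.

```python
import asyncio
from unittest.mock import AsyncMock, patch


class FakeBackend:
    """Hypothetical stand-in for a fake Redis client that lacks INFO."""

    async def info(self):
        raise NotImplementedError("INFO is not supported by this fake backend")


class StandInWorker:
    """Hypothetical stand-in for a worker whose startup logs server info."""

    def __init__(self, backend):
        self.backend = backend

    async def log_info(self):
        # Plays the role of the logging helper that issues INFO.
        await self.backend.info()

    async def main(self):
        await self.log_info()
        return "started"


def run_unpatched():
    # Without the patch, startup fails on the unsupported command.
    try:
        return asyncio.run(StandInWorker(FakeBackend()).main())
    except NotImplementedError:
        return "failed"


def run_patched():
    # Replace the logging helper with an awaitable no-op for the test's duration.
    with patch.object(StandInWorker, "log_info", new_callable=AsyncMock) as mocked:
        result = asyncio.run(StandInWorker(FakeBackend()).main())
        mocked.assert_awaited_once()
    return result
```

The patch is scoped to the `with` block, so the original helper is restored once the block exits; this is why wrapping the `yield` of a fixture in the patch keeps it active for exactly the lifetime of the test.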
Hi 👋
I was unit testing my functions that interact with Arq, but the mocking is becoming rather heavy, and I think integration-style tests would add more value. However, I don't want to depend on a real Redis and would like to use fakeredis, a common library for mocking Redis, instead.
I was previously using RQ, and injecting `FakeStrictRedis` in its setup was trivial. However, I haven't found a way to do so with Arq and was wondering whether it is possible at all.

RQ example:
```python
from redis import Redis
from rq import Queue
from rq import SimpleWorker

from app.config import (
    REDIS_DB,
    REDIS_HOST,
    REDIS_PASSWORD,
    REDIS_PORT,
    REDIS_SSL_ENABLED,
)


def start_worker():
    redis_connection = Redis(
        host=REDIS_HOST,
        port=REDIS_PORT,
        db=REDIS_DB,
        password=REDIS_PASSWORD,
        ssl=REDIS_SSL_ENABLED,
    )
    queue = Queue(connection=redis_connection)
    worker = SimpleWorker(
        queues=[queue],
        connection=redis_connection,
    )
    worker.work()
```

In the above, we would replace the connection and queue with:

```python
from fakeredis import FakeStrictRedis

mock_redis_connection = FakeStrictRedis()
mock_queue = Queue("mock_queue", connection=mock_redis_connection)
```

I imagine the Arq counterpart would be something along these lines:
Thanks in advance.