tobymao / saq

Simple Async Queues
https://saq-py.readthedocs.io/en/latest/
MIT License
532 stars 37 forks source link

Testing your app that uses SAQ #86

Closed grigi closed 1 year ago

grigi commented 1 year ago

I find I'm needing to do a lot of mocking to unit test my app that uses SAQ. It would be great to have a test double of Queue that basically does NOPs.

What would be needed to make this comprehensive enough to be included in the library as test helper?

tobymao commented 1 year ago

imo this is out of scope and it's better to just add redis + saq to your unit tests

the alternative would be you'd have to basically implement redis

i'm not sure what the state of this is yet https://pypi.org/project/fakeredis/

but you could look into it

grigi commented 1 year ago

I am specifically talking of a lightweight test double, not something comprehensive. e.g:

The latter is easier as you can just call it, but generating a valid ctx is something that, if needed, can be done transparently. The prior really just needs to replace the Queue.enque() method.

It's not for testing that saq itself works right, so assumption is that retries, delayed execution all works just fine.

It could probably be done with no replacement of redis itself.

tobymao commented 1 year ago

is using unittest mock not enough?

grigi commented 1 year ago

It's hard, I need the job object back when queueing so basically I need to implement all the logic for that in the mocks.

I have a small prototype:

@patch('magneto.config.Config.queue', new=TestQueue())
class TestSaq(IsolatedAsyncioTestCase):

    async def test_queue(self):
        job = await queue.enqueue('add', val1=3, val2=5, timeout=10)

        self.assertIsInstance(job, Job)
        self.assertEqual(job.function, 'add')
        self.assertEqual(job.timeout, 10)
        self.assertEqual(job.status, Status.QUEUED)
        self.assertEqual(job.kwargs, {'val1': 3, 'val2': 5})

And a simple double:

class TestQueue(Queue):
    def __init__(self) -> None:
        super().__init__(redis=AsyncMock(spec=Redis))

    async def enqueue(self, job_or_func: str | Job, **kwargs: t.Any) -> Job:

        job_kwargs: dict[str, t.Any] = {}

        for k, v in kwargs.items():
            if k in Job.__dataclass_fields__:  # pylint: disable=no-member
                job_kwargs[k] = v
            else:
                job_kwargs.setdefault("kwargs", {})[k] = v

        if isinstance(job_or_func, str):
            job = Job(function=job_or_func, **job_kwargs)
        else:
            job = job_or_func

            for k, v in job_kwargs.items():
                setattr(job, k, v)

        job.queue = self
        job.queued = now()
        job.status = Status.QUEUED

        return job

If there is some small refactoring in the saq.queue.Queue class that TestQueue can be made significantly smaller.

If this is expanded to also make calling tasks a little easier (e.g. generating a valid ctx), and handle queue.apply()/queue.map() it would probably suit the needs of 80% of users.