imranariffin / aiotaskq

A simple asynchronous task queue
MIT License

Enable configurable max number of tasks at one time #41

Open imranariffin opened 1 year ago

imranariffin commented 1 year ago

Right now each worker works on exactly one task at a time :cry:, so we should add a parameter --max-tasks-per-worker (which I believe is similar to Celery's --max-tasks-per-child) so that a worker can grab a maximum of N tasks at one time and execute them concurrently. In most cases 0 < N < some sensible limit should generally be better in the async world, especially if the tasks are I/O heavy -- we don't want a process waiting idly on I/O without doing anything meaningful.
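A rough sketch of what this could look like inside a worker, using an asyncio.Semaphore to cap how many tasks are in flight at once (the worker_loop function and in-memory queue here are purely illustrative, not aiotaskq's actual internals):

```python
import asyncio


async def worker_loop(task_queue: asyncio.Queue, max_tasks_per_worker: int) -> None:
    """Run at most `max_tasks_per_worker` tasks concurrently within one worker process."""
    semaphore = asyncio.Semaphore(max_tasks_per_worker)
    running: set[asyncio.Task] = set()

    def _on_done(finished: asyncio.Task) -> None:
        running.discard(finished)
        semaphore.release()

    while True:
        await semaphore.acquire()      # wait until there's a free "slot"
        coro = await task_queue.get()  # e.g. an already-bound coroutine like long_io_task(3)
        task = asyncio.create_task(coro)
        running.add(task)
        task.add_done_callback(_on_done)
```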

imranariffin commented 1 year ago

We can consider writing this kind of test to demonstrate the issue.

```python
# tests/apps/simple_app/tasks.py
import asyncio

import aiotaskq


@aiotaskq.task
async def long_io_task(n: int) -> int:
    await asyncio.sleep(n)
    return n * 42
```

```python
# tests/test_integration.py
import asyncio
import time

import pytest

# ... plus the project-local imports for simple_app and the WorkerFixture type


@pytest.mark.asyncio
async def test_sync_and_async_parity__simple_app__long_io(worker: WorkerFixture):
    # Given a simple app running as a worker
    await worker.start(app=simple_app.__name__, concurrency=1)

    class Timer:
        def __enter__(self):
            self.t0 = time.time()
            return self

        def __exit__(self, exc_type, exc_val, exc_tb):
            self.t1 = time.time()
            self.duration = self.t1 - self.t0

    # Then there should be parity between the user-process and worker-process results of the tasks
    with Timer() as timer_user_process:
        result_user_process = await asyncio.gather(
            simple_app.long_io_task(2),
            simple_app.long_io_task(2),
            simple_app.long_io_task(2),
        )
    print(f"User process took {timer_user_process.duration}")
    with Timer() as timer_worker_process:
        result_worker_process = await asyncio.gather(
            simple_app.long_io_task.apply_async(2),
            simple_app.long_io_task.apply_async(2),
            simple_app.long_io_task.apply_async(2),
        )
    print(f"Worker process took {timer_worker_process.duration}")
    assert result_user_process == result_worker_process
    assert abs(timer_user_process.duration - timer_worker_process.duration) < 0.05
```
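For intuition on the timing assertion: within a single process, asyncio.gather runs the three sleeps concurrently, so the total duration is roughly the longest sleep rather than the sum of all three. The test then asserts that going through the worker is nearly as fast, which is what currently fails. A minimal standalone example (independent of aiotaskq):

```python
import asyncio
import time


async def long_io_task(n: int) -> int:
    await asyncio.sleep(n)
    return n * 42


async def main() -> None:
    t0 = time.time()
    results = await asyncio.gather(long_io_task(2), long_io_task(2), long_io_task(2))
    # Prints [84, 84, 84] after roughly 2 seconds, not 6
    print(results, f"{time.time() - t0:.2f}s")


asyncio.run(main())
```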

With the current code, running this test gives this error:

```
RuntimeError: read() called while another coroutine is already waiting for incoming data
```

The error means that two coroutines end up reading from the same connection at once, because all pending results share a single PubSub instance. It can be quick-fixed by commenting out the singleton logic in PubSub:

```python
class PubSubSingleton:
    @classmethod
    def get(cls, url: str, poll_interval_s: float, **kwargs) -> IPubSub:
        # if cls._instance:
        #     return cls._instance
```

But then you will still get another error:

  File "/home/in-gote/workspace/aiotaskq/src/aiotaskq/worker.py", line 225, in _main_loop
    task_result = json.dumps(task_result)
  File "/usr/lib/python3.10/json/__init__.py", line 231, in dumps
    return _default_encoder.encode(obj)
  File "/usr/lib/python3.10/json/encoder.py", line 199, in encode
    chunks = self.iterencode(o, _one_shot=True)
  File "/usr/lib/python3.10/json/encoder.py", line 257, in iterencode
    return _iterencode(o, 0)
  File "/usr/lib/python3.10/json/encoder.py", line 179, in default
    raise TypeError(f'Object of type {o.__class__.__name__} '
TypeError: Object of type coroutine is not JSON serializable
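This happens because calling an async def function without awaiting it returns a coroutine object rather than its result, and json.dumps cannot serialize a coroutine. A minimal standalone illustration (not aiotaskq code):

```python
import asyncio
import json


async def long_io_task(n: int) -> int:
    await asyncio.sleep(n)
    return n * 42


coro = long_io_task(1)  # calling without `await` only creates a coroutine object
try:
    json.dumps(coro)
except TypeError as exc:
    print(exc)  # Object of type coroutine is not JSON serializable

print(asyncio.run(coro))  # awaiting it properly yields the actual int result: 42
```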

That error can be quick-fixed by changing how we execute the tasks in worker.py:

```python
                # task_result = task_func(*task_args, **task_kwargs)
                if asyncio.iscoroutinefunction(task_func._f):
                    task_result = await task_func(*task_args, **task_kwargs)
                else:
                    task_result = task_func(*task_args, **task_kwargs)
```

... You'll probably get more errors, and the last one should be the test failing on:

```python
assert abs(timer_user_process.duration - timer_worker_process.duration) < 0.05
```

This shows that we're currently not able to run I/O-bound tasks concurrently. We can fix it by ensuring that executing a task and publishing its result happen in the background.
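A rough sketch of that idea, assuming the worker's main loop currently awaits each task inline; the helper names, the pubsub.publish call, and the channel naming below are hypothetical, not aiotaskq's actual worker.py code:

```python
import asyncio
import json


async def _execute_and_publish(task_func, task_args, task_kwargs, task_id, pubsub) -> None:
    """Run one task to completion and publish its result, without blocking the main loop."""
    if asyncio.iscoroutinefunction(task_func._f):
        task_result = await task_func(*task_args, **task_kwargs)
    else:
        task_result = task_func(*task_args, **task_kwargs)
    # Hypothetical publish API and channel name -- adjust to the real IPubSub interface.
    await pubsub.publish(f"results:{task_id}", json.dumps(task_result))


background_tasks: set[asyncio.Task] = set()


def schedule_in_background(task_func, task_args, task_kwargs, task_id, pubsub) -> None:
    """Schedule execute-and-publish as a background asyncio task so the main loop
    can immediately pick up the next task instead of waiting for this one."""
    bg = asyncio.create_task(
        _execute_and_publish(task_func, task_args, task_kwargs, task_id, pubsub)
    )
    background_tasks.add(bg)
    bg.add_done_callback(background_tasks.discard)
```

Combined with a cap like the semaphore sketch in the first comment, this would also give --max-tasks-per-worker its meaning: the worker only schedules a new background task when fewer than N are in flight.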