closeio / tasktiger

Python task queue using Redis
MIT License

Synchronous executor #320

Closed by thomasst 6 months ago

thomasst commented 6 months ago

Not sure if that's the best name for it, but it's basically an executor that runs tasks without spawning threads or processes. It should handle hard timeouts via a signal from the runner. Use -x sync or -x fork (default) to specify which executor to use.
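To illustrate the general idea of a signal-based hard timeout for code running in the current process, here is a minimal sketch. It is not the code from this PR: `HardTimeout`, `hard_timeout`, and `run_sync` are hypothetical names, and it assumes a Unix platform and that it runs in the main thread (Python only delivers signals there).

```python
import signal
import time
from contextlib import contextmanager


class HardTimeout(Exception):
    """Hypothetical exception raised when the time limit is exceeded."""


@contextmanager
def hard_timeout(seconds):
    """Raise HardTimeout in the current process after `seconds` (Unix only)."""

    def _raise(signum, frame):
        raise HardTimeout(f"timed out after {seconds}s")

    # Install our handler and arm a one-shot real-time timer.
    previous = signal.signal(signal.SIGALRM, _raise)
    signal.setitimer(signal.ITIMER_REAL, seconds)
    try:
        yield
    finally:
        # Disarm the timer and restore whatever handler was there before.
        signal.setitimer(signal.ITIMER_REAL, 0)
        signal.signal(signal.SIGALRM, previous)


def run_sync(func, timeout):
    """Run func in this process; return (ok, result_or_exception)."""
    try:
        with hard_timeout(timeout):
            return True, func()
    except HardTimeout as exc:
        return False, exc
```

For example, `run_sync(lambda: time.sleep(2), 0.1)` is interrupted by the alarm and returns a failed result, while a fast function returns normally. Because the exception is raised wherever the task happens to be executing, the process may be left in an inconsistent state afterwards.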

Can review by commit.

Quick benchmark:

# Task, Worker, SyncExecutor and simple_task are assumed to be imported
# at the top of tests/test_workers.py.
import time


class TestSyncExecutorWorkerPerformance:
    def test_sync(self, tiger, ensure_queues):
        # Start from empty queues, then enqueue 1000 trivial tasks.
        ensure_queues()

        for _ in range(1000):
            Task(tiger, simple_task).delay()

        ensure_queues(queued={"default": 1000})

        # Run every task in the worker process itself.
        worker = Worker(tiger, executor_class=SyncExecutor)

        start_time = time.time()
        worker.run(once=True)
        end_time = time.time()

        # All tasks should have been processed.
        ensure_queues()
        print("sync took", end_time - start_time)

    def test_fork(self, tiger, ensure_queues):
        # Same workload as test_sync, using the default (fork) executor.
        ensure_queues()

        for _ in range(1000):
            Task(tiger, simple_task).delay()

        ensure_queues(queued={"default": 1000})

        worker = Worker(tiger)

        start_time = time.time()
        worker.run(once=True)
        end_time = time.time()

        # All tasks should have been processed.
        ensure_queues()
        print("fork took", end_time - start_time)

gives:

tests/test_workers.py::TestSyncExecutorWorkerPerformance::test_sync sync took 0.4465901851654053
PASSED
tests/test_workers.py::TestSyncExecutorWorkerPerformance::test_fork fork took 4.435322046279907
PASSED

But the other consideration, beyond raw speed, will be checking out connections from an existing connection pool rather than having to re-establish them for each task.
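The connection pool point can be sketched with a toy example. Everything here is hypothetical (`FakeConnection`, `Pool`, `task`) and stands in for a real client library. It only shows that tasks running one after another in the same process can keep reusing a pooled connection instead of opening a new one each time.

```python
import queue


class FakeConnection:
    """Hypothetical stand-in for an expensive network connection."""

    created = 0  # counts how many connections were ever opened

    def __init__(self):
        FakeConnection.created += 1


class Pool:
    """Tiny illustrative pool: reuse idle connections, else open a new one."""

    def __init__(self, size):
        self._idle = queue.LifoQueue(maxsize=size)

    def acquire(self):
        try:
            return self._idle.get_nowait()
        except queue.Empty:
            return FakeConnection()

    def release(self, conn):
        try:
            self._idle.put_nowait(conn)
        except queue.Full:
            pass  # pool is full, so drop the surplus connection


pool = Pool(size=2)


def task():
    # Check a connection out of the shared pool and always return it.
    conn = pool.acquire()
    try:
        pass  # a real task would use conn here
    finally:
        pool.release(conn)


# Tasks run sequentially in one process, as a synchronous executor would.
for _ in range(1000):
    task()

print(FakeConnection.created)  # → 1
```

A task that runs in a freshly forked child generally cannot safely share the parent's open sockets, so it would typically have to open its own connection.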

thomasst commented 6 months ago

Do we need to restart the process if a UnixSignalDeathPenalty occurs? (i.e. exit with error)