ReactiveX / RxPY

ReactiveX for Python
https://rxpy.rtfd.io
MIT License

Concurrent Execution Not Working as Expected with RxPY ThreadPoolScheduler #713

Open vjcspy opened 8 months ago

vjcspy commented 8 months ago

Hello,

I'm trying to run several tasks concurrently with RxPY's ThreadPoolScheduler, but they execute sequentially instead. I expect "Start Alpha", "Start Beta", and "Start Gamma" to be logged almost simultaneously, yet each one starts 2 seconds after the previous one. Below is the relevant part of my code:

import threading
import time

import reactivex
from reactivex import operators as ops
from reactivex.scheduler import ThreadPoolScheduler

from metastock.modules.core.logging.logger import Logger

pool_scheduler = ThreadPoolScheduler(5)

def intense_calculation(value):
    Logger().info(f"Start {value}")
    time.sleep(2)  # Simulate an intensive calculation
    return f"Result {value}"

# Create an Observable
source = reactivex.from_(["Alpha", "Beta", "Gamma"])
source.pipe(
    ops.observe_on(ThreadPoolScheduler(5)),
    ops.map(intense_calculation),
).subscribe(
    on_next=lambda s: Logger().info(
        f"Processed {s} on {threading.current_thread().name}"
    ),
    on_error=lambda e: Logger().info(e),
    on_completed=lambda: Logger().info("Process complete!"),
)

# Wait until all tasks are completed
input("Press any key to exit\n")

Logs are showing that the tasks are starting and finishing sequentially, each 2 seconds apart:

Press any key to exit
[03/24/24 12:05:31] INFO     Start Alpha                           rx_test.py:16
[03/24/24 12:05:33] INFO     Processed Result Alpha on             rx_test.py:27
                             ThreadPoolExecutor-1_0                             
                    INFO     Start Beta                            rx_test.py:16
[03/24/24 12:05:35] INFO     Processed Result Beta on              rx_test.py:27
                             ThreadPoolExecutor-1_1                             
                    INFO     Start Gamma                           rx_test.py:16
[03/24/24 12:05:37] INFO     Processed Result Gamma on             rx_test.py:27
                             ThreadPoolExecutor-1_1                             
                    INFO     Process complete!                     rx_test.py:31

I was under the impression that a ThreadPoolScheduler with a pool size of 5 would let these tasks run in parallel, but they do not. I expected "Start Alpha", "Start Beta", and "Start Gamma" to be logged at the same time, with the calculations running concurrently on different threads.

Could anyone help identify what might be causing this sequential execution instead of concurrent execution, and how can I adjust my code to achieve the expected parallelism?

Thank you in advance for any insights or suggestions!

matiboy commented 8 months ago

Hi @vjcspy

For extra information, you can refer to @dbrattli's reply on an older issue.

Since that issue dates from the v3 days, here is updated sample code that should give you the expected parallelism.

def main():
    import threading
    import time

    import reactivex
    from reactivex import operators as ops
    from reactivex.scheduler import ThreadPoolScheduler

    start = time.time()

    pool_scheduler = ThreadPoolScheduler(5)
    messages = []
    # One lock shared by every call; creating a new Lock() per call would guard nothing
    lock = threading.Lock()

    def log(message):
        with lock:
            messages.append(f"{round(time.time() - start, 1)}: {message}")

    def intense_calculation(value):
        time.sleep(2)
        return f"Computed for {value}"

    # Create an Observable
    source = reactivex.from_(["Alpha", "Beta", "Gamma"])

    source.pipe(
        ops.flat_map(lambda s: reactivex.from_future(pool_scheduler.executor.submit(intense_calculation, s))),
    ).subscribe(
        on_next=lambda s: log(
            f"Processed {s} on {threading.current_thread().name}"
        ),
        on_error=lambda e: log(f"ERROR {e}"),
        on_completed=lambda: log("Process complete!"),
    )

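    # Give the worker threads time to finish (each calculation sleeps for 2 seconds)
    time.sleep(3)
    log("End")
    print(messages)


if __name__ == "__main__":
    main()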
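
As I understand it, the original pipeline is sequential because observe_on delivers items to the downstream operators one at a time, so map only ever has one calculation in flight and the other pool threads stay idle. The flat_map version above instead submits every item to the executor up front. Here is a minimal sketch of that difference using only the standard library, with no RxPY involved. The helper names run_serialized and run_submitted are made up for illustration, and the 0.2-second sleep stands in for the 2-second calculation:

```python
import time
from concurrent.futures import ThreadPoolExecutor


def intense_calculation(value, delay=0.2):
    """Stand-in for the slow calculation (shortened from 2 s to 0.2 s)."""
    time.sleep(delay)
    return f"Result {value}"


def run_serialized(items, executor):
    """Wait for each result before submitting the next task.

    Only one task is ever in flight, roughly like map behind observe_on.
    """
    start = time.perf_counter()
    results = [executor.submit(intense_calculation, v).result() for v in items]
    return results, time.perf_counter() - start


def run_submitted(items, executor):
    """Submit every task first, then collect the results.

    All tasks are in flight at once, roughly like flat_map over futures.
    """
    start = time.perf_counter()
    futures = [executor.submit(intense_calculation, v) for v in items]
    results = [f.result() for f in futures]
    return results, time.perf_counter() - start


items = ["Alpha", "Beta", "Gamma"]
with ThreadPoolExecutor(max_workers=5) as executor:
    serial_results, serial_elapsed = run_serialized(items, executor)
    parallel_results, parallel_elapsed = run_submitted(items, executor)

print(f"one at a time: {serial_elapsed:.2f}s")
print(f"submitted up front: {parallel_elapsed:.2f}s")
```

Both helpers use the same pool of 5 workers, so the pool size is not what decides whether the work overlaps. What matters is how many tasks are handed to the pool before waiting on a result.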

Another issue that discusses this: https://github.com/ReactiveX/RxPY/issues/67

Hope this helps