symbench / symbench-athens-client

Python client for Symbench's UAV workflow runs.
Apache License 2.0
0 stars 2 forks source link

Add support for parallelizing FDM Module :zap:. Closes #103 #104

Closed umesh-timalsina closed 3 years ago

umesh-timalsina commented 3 years ago

Pinning a test script here:


import csv
import time
from multiprocessing import Pool

from symbench_athens_client.fdm_experiments import (
    ExperimentOnTurnigyGraphene6000MAHQuadCopter as experiment,
)

def execute_parallel(fdm_experiment, times=2, pool_size=10):
    """Run the FDM experiment ``times`` times in parallel and dump results to CSV.

    Parameters
    ----------
    fdm_experiment : object
        Experiment exposing ``run_for`` (picklable) and ``results_dir``.
    times : int, optional
        Number of runs to dispatch (default 2).
    pool_size : int, optional
        Number of worker processes (default 10, matching the original
        hard-coded pool size).
    """
    # Context manager guarantees workers are torn down even if a run raises,
    # which the original bare Pool(10) did not.
    with Pool(pool_size) as parallel_pool:
        async_results = [
            parallel_pool.apply_async(
                fdm_experiment.run_for,
                kwds={"parameters": {}, "requirements": {}, "change_dir": True},
            )
            for _ in range(times)
        ]
        # Stop accepting new work and wait for all dispatched runs to finish.
        parallel_pool.close()
        parallel_pool.join()

    write_csv(
        dict_results=[result.get() for result in async_results],
        expr=fdm_experiment,
    )

def execute_serial(fdm_experiment, times=2):
    """Run the FDM experiment ``times`` times back-to-back and dump a CSV."""
    run_outputs = [
        fdm_experiment.run_for(parameters={}, requirements={}, change_dir=True)
        for _ in range(times)
    ]
    write_csv(dict_results=run_outputs, expr=fdm_experiment)

def write_csv(dict_results, expr):
    """Write a list of result dicts to ``output.csv`` in the experiment's results dir.

    Parameters
    ----------
    dict_results : list[dict]
        One dict per run; the keys of the first dict become the CSV header,
        so all dicts are expected to share the same keys.
    expr : object
        Experiment providing ``results_dir`` (a ``pathlib.Path``-like object).
    """
    # Guard: the original indexed dict_results[0] unconditionally and raised
    # IndexError when no results were produced.  Treat empty input as a no-op.
    if not dict_results:
        return
    with open(
        expr.results_dir / "output.csv",
        "w",
        newline="",
    ) as results_csv:
        csv_writer = csv.DictWriter(results_csv, fieldnames=dict_results[0].keys())
        csv_writer.writeheader()
        csv_writer.writerows(dict_results)

if __name__ == "__main__":
    from argparse import ArgumentParser

    parser = ArgumentParser(
        description="Test parallel vs serial execution for the FDM Execution"
    )
    # Fix: the parser was previously constructed but never parsed or used,
    # leaving the run count hard-coded.  Expose it as --times (default 30
    # preserves the original behavior).
    parser.add_argument(
        "--times",
        type=int,
        default=30,
        help="number of experiment runs per mode (default: 30)",
    )
    args = parser.parse_args()

    # --- Parallel timing run -------------------------------------------------
    experiment.start_new_session()
    start = time.time()
    execute_parallel(experiment, times=args.times)
    end = time.time()
    print(
        "***************************** Total Execution Time (Parallel) **********************************"
    )
    print(f"Total Execution Time: {end - start}")
    print(
        {
            "ResultsDir": experiment.results_dir,
            "ExecutionTime": end - start,
            "Parallel": True,
        }
    )
    print(
        "***************************** Total Execution Time **********************************"
    )

    # --- Serial timing run ---------------------------------------------------
    experiment.start_new_session()
    start = time.time()
    execute_serial(experiment, times=args.times)
    end = time.time()
    print(
        "***************************** Total Execution Time (Serial) **********************************"
    )
    print(f"Total Execution Time: {end - start}")
    print(
        {
            "ResultsDir": experiment.results_dir,
            "ExecutionTime": end - start,
            "Parallel": False,
        }
    )
    print(
        "***************************** Total Execution Time **********************************"
    )

Results

Total Execution Time: 91.31903791427612
{'ResultsDir': WindowsPath('C:/Users/timalsu/symbench-athens-client/results/TurnigyGraphene6000MAHQuadCopter/e-2021-08-25T23-34-24.290303'), 'ExecutionTime': 91.31903791427612, 'Parallel': True}
Total Execution Time: 494.46925950050354
{'ResultsDir': WindowsPath('C:/Users/timalsu/symbench-athens-client/results/TurnigyGraphene6000MAHQuadCopter/e-2021-08-25T23-35-55.638422'), 'ExecutionTime': 494.46925950050354, 'Parallel': False}

With a pool of 10 worker processes and 30 experiments, we got an improvement of around 5-fold. This speedup should grow further as the pool size increases.

Note:

With this PR, a corresponding change to the docker volume (box id) is needed; that will be done in subsequent PRs.