dss-extensions / DSS-Python

Native, "direct" Python bindings (interface) and misc tools for a custom implementation of OpenDSS (EPRI Distribution System Simulator). Based on CFFI, DSS C-API, aiming for full COM API-level compatibility on Windows, Linux and MacOS, while providing various extensions.
https://dss-extensions.org/DSS-Python/
BSD 3-Clause "New" or "Revised" License

Parallelization of power flow runs with Python multiprocessing #34

Closed Xiren-Hitachi closed 3 years ago

Xiren-Hitachi commented 3 years ago

I'm using multiprocessing to parallelize power flow runs in Python. However, it seems that the DSS Python bindings do not support launching multiple OpenDSS engines in the backend. I really hope you can help me work this out.

The following is the detailed code:

import os
from multiprocessing import dummy, Pool
import numpy as np
from dss import DSS

def run(load_setting, oltc_setting, cap_setting):
    """
    This function is to run snapshot power flow on IEEE13 and return the results in a dictionary
    :param load_setting: the multiplication coefficient of load of shape (30, ) since we have 15 loads in IEEE13. float between 0.5 and 1.0.
    :param oltc_setting: setting of oltc devices of shape (3, ). float between 0.95 and 1.05.
    :param cap_setting: setting of all capacitors of shape (2, ). int of 0 or 1.
    :return: dict. power flow results
    """
    dss_engine = DSS
    dssText = dss_engine.Text
    dssCircuit = dss_engine.ActiveCircuit

    dss_engine.Start(0)
    dssText.Command = "compile [" + dss_file + "]"
    dssCircuit.SetActiveClass('Load')
    LDNames = dssCircuit.ActiveClass.AllNames

    # apply OLTC and Capacitor tap settings
    dssCircuit.SetActiveClass('Transformer')
    xfm_names = dssCircuit.ActiveClass.AllNames
    for i in range(len(xfm_names)):
        xfm = xfm_names[i]
        dssCircuit.SetActiveElement(xfm)
        dssCircuit.ActiveElement.Properties('tap').Val = str(oltc_setting[i])

    dssCircuit.SetActiveClass('Capacitor')
    cap_names = dssCircuit.ActiveClass.AllNames
    for i in range(len(cap_names)):
        cap = cap_names[i]
        dssCircuit.SetActiveElement(cap)
        dssCircuit.ActiveElement.Properties('states').Val = '[ ' + str(cap_setting[i]) + ']'

    # extract line names
    dssCircuit.SetActiveClass('Line')
    line_names = dssCircuit.ActiveClass.AllNames

    # apply the coefficients to load
    NLD = len(LDNames)

    dssCircuit.SetActiveClass('Load')
    for i in range(len(LDNames)):
        dssCircuit.SetActiveElement(LDNames[i])
        tmp = dssCircuit.ActiveElement.Properties('kW').Val
        dssCircuit.ActiveElement.Properties('kW').Val = str(load_setting[i] * float(tmp))  # real power P
        tmp = dssCircuit.ActiveElement.Properties('kvar').Val
        dssCircuit.ActiveElement.Properties('kvar').Val = str(load_setting[NLD + i] * float(tmp))  # reactive power Q

    # solve for power flow
    dssText.Command = 'Set controlmode = off'
    dssText.Command = 'Solve'

    # extract power flow results
    node = dssCircuit.AllNodeNames  # node name
    volt = np.zeros(len(node))  # Voltage
    ld_pwr = np.zeros((len(LDNames), 2))  # load power
    line_pwr = np.zeros((len(line_names), 2))  # power flow of each line
    xfm_tap = np.zeros(len(xfm_names))  # tap position of transformers
    cap_tap = np.zeros(len(cap_names))  # capacitor switch on/off of capacitors

    tmp = dssCircuit.AllBusVmagPu  # voltage as p.u.
    for i in range(len(tmp)):
        volt[i] = float(tmp[i])

    dssCircuit.SetActiveClass('Load')
    for i in range(len(LDNames)):
        dssCircuit.SetActiveElement(LDNames[i])
        ph = int(dssCircuit.ActiveElement.Properties('phases').Val)  # number of phases of the load
        ind = np.arange(0, 2 * ph - 1, 2)
        tmp = dssCircuit.ActiveElement.Powers  # [P_ph1 Q_ph1 P_ph2 Q_ph2 P_ph3 Q_ph3]

        for j in range(ph):
            ld_pwr[i, 0] += tmp[int(ind[j])]

        ind = np.arange(1, 2 * ph, 2)
        # sum over different phases, reactive power
        for j in range(ph):
            ld_pwr[i, 1] += tmp[int(ind[j])]

    dssCircuit.SetActiveClass('Line')
    for i in range(len(line_names)):
        line = line_names[i]
        dssCircuit.SetActiveElement(line)
        ph = int(dssCircuit.ActiveElement.Properties('phases').Val)  # number of phases of the line
        tmp = dssCircuit.ActiveElement.Powers  # [P_ph1 Q_ph1 P_ph2 Q_ph2 P_ph3 Q_ph3]*2; two terminals of each line

        ind = np.arange(0, 2 * ph - 1, 2)  # 0, 2, 4 for 3 phase line
        for j in range(ph):
            line_pwr[i, 0] += tmp[int(ind[j])]  # sum over different phases, active power

        ind = np.arange(1, 2 * ph, 2)  # 1, 3, 5 for 3 phase line
        # sum over different phases, reactive power
        for j in range(ph):
            line_pwr[i, 1] += tmp[int(ind[j])]

    dssCircuit.SetActiveClass('Transformer')
    for i in range(len(xfm_names)):
        xfm = xfm_names[i]
        dssCircuit.SetActiveElement(xfm)
        xfm_tap[i] = float(dssCircuit.ActiveElement.Properties('tap').Val)

    dssCircuit.SetActiveClass('Capacitor')
    for i in range(len(cap_names)):
        cap = cap_names[i]
        dssCircuit.SetActiveElement(cap)
        tmp = dssCircuit.ActiveElement.Properties('states').Val
        cap_tap[i] = int(tmp[2])

    Loss = dssCircuit.Losses / 1000  # kW, kvar

    # pack all the results
    result = {'NodeName': node,
              'voltage': volt,
              'LDpwr': ld_pwr,
              'Line_pwr': line_pwr,
              'Loss': Loss,
              'xfm_tap': xfm_tap,
              'cap_tap': cap_tap,
              }
    return result

if __name__ == '__main__':
    dss_file = 'circuit/IEEE13/IEEE13Nodeckt.dss'
    dss_file = os.path.abspath(dss_file)

    num_parallel_runs = 4
    load_settings = np.random.uniform(low=0.5, high=1.0, size=(num_parallel_runs, 30))
    oltc_settings = np.random.uniform(low=0.95, high=1.05, size=(num_parallel_runs, 3))
    cap_settings = np.random.randint(2, size=(num_parallel_runs, 2))

    pool = dummy.Pool(num_parallel_runs)  # or do: Pool(num_parallel_runs)
    results = pool.map(lambda power_flow_setting: run(*power_flow_setting), zip(load_settings, oltc_settings, cap_settings))

Running the above code almost always raises an error. Sometimes I get dss._cffi_api_util.DSSException: (8888, 'There is no active circuit! Create a circuit and retry.'), as in the example below.

Traceback (most recent call last):
  File "C:\Users\zhouxi\Anaconda3\lib\site-packages\IPython\core\interactiveshell.py", line 3418, in run_code
    exec(code_obj, self.user_global_ns, self.user_ns)
  File "<ipython-input-2-047aea9c6b45>", line 1, in <module>
    runfile('C:/Users/zhouxi/PycharmProjects/VVC/powerflow/opendss/parallel_test.py', wdir='C:/Users/zhouxi/PycharmProjects/VVC/powerflow/opendss')
  File "C:\Users\zhouxi\AppData\Local\JetBrains\PyCharm Community Edition 2020.3.3\plugins\python-ce\helpers\pydev\_pydev_bundle\pydev_umd.py", line 197, in runfile
    pydev_imports.execfile(filename, global_vars, local_vars)  # execute the script
  File "C:\Users\zhouxi\AppData\Local\JetBrains\PyCharm Community Edition 2020.3.3\plugins\python-ce\helpers\pydev\_pydev_imps\_pydev_execfile.py", line 18, in execfile
    exec(compile(contents+"\n", file, 'exec'), glob, loc)
  File "C:/Users/zhouxi/PycharmProjects/VVC/powerflow/opendss/parallel_test.py", line 149, in <module>
    results = pool.map(lambda power_flow_setting: run(*power_flow_setting), zip(load_settings, oltc_settings, cap_settings))
  File "C:\Users\zhouxi\Anaconda3\lib\multiprocessing\pool.py", line 364, in map
    return self._map_async(func, iterable, mapstar, chunksize).get()
  File "C:\Users\zhouxi\Anaconda3\lib\multiprocessing\pool.py", line 771, in get
    raise self._value
  File "C:\Users\zhouxi\Anaconda3\lib\multiprocessing\pool.py", line 125, in worker
    result = (True, func(*args, **kwds))
  File "C:\Users\zhouxi\Anaconda3\lib\multiprocessing\pool.py", line 48, in mapstar
    return list(map(*args))
  File "C:/Users/zhouxi/PycharmProjects/VVC/powerflow/opendss/parallel_test.py", line 149, in <lambda>
    results = pool.map(lambda power_flow_setting: run(*power_flow_setting), zip(load_settings, oltc_settings, cap_settings))
  File "C:/Users/zhouxi/PycharmProjects/VVC/powerflow/opendss/parallel_test.py", line 33, in run
    LDNames = dssCircuit.ActiveClass.AllNames
  File "C:\Users\zhouxi\Anaconda3\lib\site-packages\dss\dss_capi_ir\IActiveClass.py", line 27, in AllNames
    return self.CheckForError(self._get_string_array(self._lib.ActiveClass_Get_AllNames))
  File "C:\Users\zhouxi\Anaconda3\lib\site-packages\dss\_cffi_api_util.py", line 81, in CheckForError
    raise DSSException(error_num, self._get_string(self._lib.Error_Get_Description()))
dss._cffi_api_util.DSSException: (8888, 'There is no active circuit! Create a circuit and retry.')

Sometimes I get other types of errors, for example:

Traceback (most recent call last):
...
  File "C:/Users/zhouxi/PycharmProjects/VVC/powerflow/opendss/parallel_test.py", line 149, in <module>
    results = pool.map(lambda power_flow_setting: run(*power_flow_setting), zip(load_settings, oltc_settings, cap_settings))
  File "C:\Users\zhouxi\Anaconda3\lib\multiprocessing\pool.py", line 364, in map
    return self._map_async(func, iterable, mapstar, chunksize).get()
  File "C:\Users\zhouxi\Anaconda3\lib\multiprocessing\pool.py", line 771, in get
    raise self._value
  File "C:\Users\zhouxi\Anaconda3\lib\multiprocessing\pool.py", line 125, in worker
    result = (True, func(*args, **kwds))
  File "C:\Users\zhouxi\Anaconda3\lib\multiprocessing\pool.py", line 48, in mapstar
    return list(map(*args))
  File "C:/Users/zhouxi/PycharmProjects/VVC/powerflow/opendss/parallel_test.py", line 149, in <lambda>
    results = pool.map(lambda power_flow_setting: run(*power_flow_setting), zip(load_settings, oltc_settings, cap_settings))
  File "C:/Users/zhouxi/PycharmProjects/VVC/powerflow/opendss/parallel_test.py", line 31, in run
    dssText.Command = "compile [" + dss_file + "]"
  File "C:\Users\zhouxi\Anaconda3\lib\site-packages\dss\dss_capi_ir\IText.py", line 22, in Command
    self.CheckForError(self._lib.Text_Set_Command(Value))
  File "C:\Users\zhouxi\Anaconda3\lib\site-packages\dss\_cffi_api_util.py", line 81, in CheckForError
    raise DSSException(error_num, self._get_string(self._lib.Error_Get_Description()))
dss._cffi_api_util.DSSException: (303, 'Error 303 Reported From OpenDSS Intrinsic Function: \r\nProcessCommand\r\nException Raised While Processing DSS Command:\r\nNew object=vsource.source Bus1=SourceBus   \r\n\r\nError Description: \r\nAccess violation\r\n\r\nProbable Cause: \r\nError in command string or circuit data.')
Traceback (most recent call last):
 ...
    results = pool.map(lambda power_flow_setting: run(*power_flow_setting), zip(load_settings, oltc_settings, cap_settings))
  File "C:\Users\zhouxi\Anaconda3\lib\multiprocessing\pool.py", line 364, in map
    return self._map_async(func, iterable, mapstar, chunksize).get()
  File "C:\Users\zhouxi\Anaconda3\lib\multiprocessing\pool.py", line 771, in get
    raise self._value
  File "C:\Users\zhouxi\Anaconda3\lib\multiprocessing\pool.py", line 125, in worker
    result = (True, func(*args, **kwds))
  File "C:\Users\zhouxi\Anaconda3\lib\multiprocessing\pool.py", line 48, in mapstar
    return list(map(*args))
  File "C:/Users/zhouxi/PycharmProjects/VVC/powerflow/opendss/parallel_test.py", line 149, in <lambda>
    results = pool.map(lambda power_flow_setting: run(*power_flow_setting), zip(load_settings, oltc_settings, cap_settings))
  File "C:/Users/zhouxi/PycharmProjects/VVC/powerflow/opendss/parallel_test.py", line 31, in run
    dssText.Command = "compile [" + dss_file + "]"
  File "C:\Users\zhouxi\Anaconda3\lib\site-packages\dss\dss_capi_ir\IText.py", line 22, in Command
    self.CheckForError(self._lib.Text_Set_Command(Value))
  File "C:\Users\zhouxi\Anaconda3\lib\site-packages\dss\_cffi_api_util.py", line 81, in CheckForError
    raise DSSException(error_num, self._get_string(self._lib.Error_Get_Description()))
dss._cffi_api_util.DSSException: (320, 'Unknown parameter "us1" for Object "VSource.source"')
Traceback (most recent call last):
 ...
  File "C:/Users/zhouxi/PycharmProjects/VVC/powerflow/opendss/parallel_test.py", line 149, in <module>
    results = pool.map(lambda power_flow_setting: run(*power_flow_setting), zip(load_settings, oltc_settings, cap_settings))
  File "C:\Users\zhouxi\Anaconda3\lib\multiprocessing\pool.py", line 364, in map
    return self._map_async(func, iterable, mapstar, chunksize).get()
  File "C:\Users\zhouxi\Anaconda3\lib\multiprocessing\pool.py", line 771, in get
    raise self._value
  File "C:\Users\zhouxi\Anaconda3\lib\multiprocessing\pool.py", line 125, in worker
    result = (True, func(*args, **kwds))
  File "C:\Users\zhouxi\Anaconda3\lib\multiprocessing\pool.py", line 48, in mapstar
    return list(map(*args))
  File "C:/Users/zhouxi/PycharmProjects/VVC/powerflow/opendss/parallel_test.py", line 149, in <lambda>
    results = pool.map(lambda power_flow_setting: run(*power_flow_setting), zip(load_settings, oltc_settings, cap_settings))
  File "C:/Users/zhouxi/PycharmProjects/VVC/powerflow/opendss/parallel_test.py", line 31, in run
    dssText.Command = "compile [" + dss_file + "]"
  File "C:\Users\zhouxi\Anaconda3\lib\site-packages\dss\dss_capi_ir\IText.py", line 22, in Command
    self.CheckForError(self._lib.Text_Set_Command(Value))
  File "C:\Users\zhouxi\Anaconda3\lib\site-packages\dss\_cffi_api_util.py", line 81, in CheckForError
    raise DSSException(error_num, self._get_string(self._lib.Error_Get_Description()))
dss._cffi_api_util.DSSException: (302, 'Unknown Command: "new reg" \r\nredirect IEEELineCodes.dss ')
Traceback (most recent call last):
 ...
  File "C:/Users/zhouxi/PycharmProjects/VVC/powerflow/opendss/parallel_test.py", line 149, in <module>
    results = pool.map(lambda power_flow_setting: run(*power_flow_setting), zip(load_settings, oltc_settings, cap_settings))
  File "C:\Users\zhouxi\Anaconda3\lib\multiprocessing\pool.py", line 364, in map
    return self._map_async(func, iterable, mapstar, chunksize).get()
  File "C:\Users\zhouxi\Anaconda3\lib\multiprocessing\pool.py", line 771, in get
    raise self._value
  File "C:\Users\zhouxi\Anaconda3\lib\multiprocessing\pool.py", line 125, in worker
    result = (True, func(*args, **kwds))
  File "C:\Users\zhouxi\Anaconda3\lib\multiprocessing\pool.py", line 48, in mapstar
    return list(map(*args))
  File "C:/Users/zhouxi/PycharmProjects/VVC/powerflow/opendss/parallel_test.py", line 149, in <lambda>
    results = pool.map(lambda power_flow_setting: run(*power_flow_setting), zip(load_settings, oltc_settings, cap_settings))
  File "C:/Users/zhouxi/PycharmProjects/VVC/powerflow/opendss/parallel_test.py", line 41, in run
    dssCircuit.ActiveElement.Properties('tap').Val = str(oltc_setting[i])
  File "C:\Users\zhouxi\Anaconda3\lib\site-packages\dss\dss_capi_ir\IDSSProperty.py", line 51, in __call__
    return self.__getitem__(propname_index)
  File "C:\Users\zhouxi\Anaconda3\lib\site-packages\dss\dss_capi_ir\IDSSProperty.py", line 46, in __getitem__
    self.CheckForError(self._lib.DSSProperty_Set_Name(propname_index))
  File "C:\Users\zhouxi\Anaconda3\lib\site-packages\dss\_cffi_api_util.py", line 81, in CheckForError
    raise DSSException(error_num, self._get_string(self._lib.Error_Get_Description()))
dss._cffi_api_util.DSSException: (33003, 'Invalid property name "tap" for "LineCode.1"')

and so on. Only occasionally does the program execute without error.

Apparently parallel simulation runs interfere with one another.

Then I noticed that dss.DSS is basically an instance of class dss.dss_capi_gr.IDSS.IDSS. So I modified my code to instantiate multiple DSS engines (as the following code snippet) to see if they could work independently.

def run(load_setting, oltc_setting, cap_setting):
    from dss import dss_capi_gr, dss_capi_ir, CffiApiUtil, ffi, lib
    api_util = CffiApiUtil(ffi, lib)
    dss_engine = dss_capi_ir.IDSS(api_util)
    ...

Unfortunately, this made no difference at all. It seems that dss.DSS does not work in a conventional object-oriented way: within one Python program, there is only one underlying engine running in the backend, even if we instantiate multiple DSS objects.

I also tried pool = Pool(num_parallel_runs) instead of pool = dummy.Pool(num_parallel_runs), and I tried creating multiple processes/threads explicitly, calling start() and join(), instead of using a pool. Neither made a difference, so the issue does not seem to depend on how I use multiprocessing.

Any solution/workaround?

PMeira commented 3 years ago

Can you list your Python and DSS Python versions? In your example, is that an unmodified IEEE13 test case? If so, I could try to reproduce the issue.

> Then I noticed that dss.DSS is basically an instance of class dss.dss_capi_gr.IDSS.IDSS. So I modified my code to instantiate multiple DSS engines (as the following code snippet) to see if they could work independently.

The current version is a per-process singleton on the Pascal side, so dummy.Pool (which is thread-based, meaning every worker shares that one engine) won't work at all. Changes that allow creating multiple instances should be merged in the coming month, but I'm working on other features right now.
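
To see why a thread pool cannot help with a per-process singleton, note that `multiprocessing.dummy.Pool` runs all of its workers as threads inside one process. The following is a minimal sketch using only the standard library; `FAKE_ENGINE` is a hypothetical stand-in for the per-process engine state and is not a DSS Python object.

```python
import os
import threading
from multiprocessing.dummy import Pool as ThreadPool

# Hypothetical stand-in for the per-process engine state (not a DSS Python object).
FAKE_ENGINE = {"compiles": 0}
_lock = threading.Lock()


def worker(_):
    """Record which process and which engine object this task sees."""
    with _lock:
        FAKE_ENGINE["compiles"] += 1
    return os.getpid(), id(FAKE_ENGINE)


def run_in_threads(n_tasks=4):
    """Run the tasks in a thread pool and return the (pid, engine id) pairs."""
    with ThreadPool(n_tasks) as pool:
        return pool.map(worker, range(n_tasks))


seen = run_in_threads()
# Count the distinct processes and distinct engine objects the tasks touched.
print(len({pid for pid, _ in seen}), len({eid for _, eid in seen}))
```

Every task reports the same process ID and the same object, which is the situation in which concurrent compile and solve calls can interfere. With a process-based `multiprocessing.Pool`, each worker process would instead hold its own copy of the module-level state.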

On Linux, you may need to change the start method with multiprocessing.set_start_method('spawn'), but that depends on your code.
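
As an alternative to changing the global default, the start method can be selected through a context object. This is only a sketch of that standard-library option; `make_pool` is a hypothetical helper name, and whether 'spawn' is needed at all depends on the code, as noted above.

```python
import multiprocessing as mp


def make_pool(n_workers, method="spawn"):
    """Create a process pool that uses the given start method."""
    # get_context returns an object with the same API as the
    # multiprocessing module, bound to a single start method.
    ctx = mp.get_context(method)
    return ctx.Pool(n_workers)


# Inspect the context without starting any worker processes.
print(mp.get_context("spawn").get_start_method())
```

When a pool is actually created with 'spawn', the calling script should keep its entry point under `if __name__ == '__main__':`, because worker processes re-import the main module.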

We use the built-in multiprocessing, Dask Distributed, and other tools all the time. Unless you need some raw pointers, you don't need to mess with the API internals at all.

See also (ODD.py uses the engine from DSS Python):

PMeira commented 3 years ago

> In your example, is that an unmodified IEEE13 test case? If so, I could try to reproduce the issue.

Well, I gave it a try anyway. I guess you modified the system, since the code seems to expect only 3 transformers. The more obvious problems are not related to DSS Python.

Something like this should work:

dss_file = 'circuit/IEEE13/IEEE13Nodeckt.dss'
dss_file = os.path.abspath(dss_file)

if __name__ == '__main__':
    num_parallel_runs = 4
    load_settings = np.random.uniform(low=0.5, high=1.0, size=(num_parallel_runs, 30))
    oltc_settings = np.random.uniform(low=0.95, high=1.05, size=(num_parallel_runs, 3))
    cap_settings = np.random.randint(2, size=(num_parallel_runs, 2))

    pool = Pool(num_parallel_runs)
    results = pool.starmap(run, zip(load_settings, oltc_settings, cap_settings))
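
Two details of this version matter independently of DSS Python and can be checked with the standard library alone. A process-based Pool sends the callable to its workers by pickling it, and a lambda cannot be pickled, while a module-level function can. `starmap` unpacks each tuple into positional arguments, so the lambda is no longer needed. Defining `dss_file` at module level also keeps it visible in worker processes that re-import the script, as happens with the 'spawn' start method (the default on Windows). In the sketch below, `fake_run` and `is_picklable` are hypothetical names, and a thread pool is used only to keep the example self-contained.

```python
import pickle
from multiprocessing.dummy import Pool as ThreadPool


def fake_run(load_setting, oltc_setting, cap_setting):
    """Hypothetical stand-in for run(): just report the argument sizes."""
    return len(load_setting), len(oltc_setting), len(cap_setting)


def is_picklable(obj):
    """Return True if obj can be pickled, as needed to send it to a worker process."""
    try:
        pickle.dumps(obj)
        return True
    except (pickle.PicklingError, AttributeError, TypeError):
        return False


# Four made-up parameter sets with the same shapes as in the thread.
settings = [([0.5] * 30, [1.0] * 3, [0, 1]) for _ in range(4)]

with ThreadPool(2) as pool:
    # starmap unpacks each tuple into fake_run(load, oltc, cap).
    results = pool.starmap(fake_run, settings)

print(is_picklable(fake_run), is_picklable(lambda s: fake_run(*s)))
print(results)
```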

As a general note, this is slow and convoluted:

    dssCircuit.SetActiveClass('Transformer')
    xfm_names = dssCircuit.ActiveClass.AllNames
    for i in range(len(xfm_names)):
        xfm = xfm_names[i]
        dssCircuit.SetActiveElement(xfm)
        dssCircuit.ActiveElement.Properties('tap').Val = str(oltc_setting[i])

That style of code is required only when you don't have access to the properties of the class.

You could use this instead:

    xfm_names = dssCircuit.Transformers.AllNames
    for i, xfm in enumerate(xfm_names):
        dssCircuit.Transformers.Name = xfm
        dssCircuit.Transformers.Tap = oltc_setting[i]

In fact, even simpler would be:

    for i, xfm in enumerate(dssCircuit.Transformers):
        xfm.Tap = oltc_setting[i]

(the usual OpenDSS limitations still apply -- i.e. only one transformer or cktelement can be activated for API usage at a time)

In other scenarios, for performance, you could also use dssCircuit.Transformers.idx to select the transformer by index. The official OpenDSS has that for some components but we extended it for all of them.
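
The "one active element at a time" limitation can be pictured as a cursor: the interface exposes the properties of whichever element is currently selected. The sketch below walks a collection with the classic First/Next iteration of the COM-style API. `FakeTransformers` and `set_taps` are hypothetical; the class only mimics the cursor semantics so that the snippet runs without OpenDSS. With DSS Python, the assumption is that `dssCircuit.Transformers` would be passed in its place.

```python
class FakeTransformers:
    """Hypothetical stand-in mimicking a cursor-style interface (First/Next/Tap)."""

    def __init__(self, names):
        self._names = list(names)
        self._taps = [1.0] * len(self._names)
        self._active = 0  # 1-based index of the active element, 0 means none

    @property
    def First(self):
        # Activate the first element; return 0 if the collection is empty.
        self._active = 1 if self._names else 0
        return self._active

    @property
    def Next(self):
        # Activate the next element; return 0 when there is none left.
        if 0 < self._active < len(self._names):
            self._active += 1
            return self._active
        return 0

    @property
    def Name(self):
        return self._names[self._active - 1]

    @property
    def Tap(self):
        return self._taps[self._active - 1]

    @Tap.setter
    def Tap(self, value):
        self._taps[self._active - 1] = float(value)


def set_taps(transformers, taps):
    """Walk the collection with First/Next and set one tap per element."""
    applied = {}
    idx = transformers.First
    while idx > 0:
        transformers.Tap = taps[idx - 1]  # acts on the active element only
        applied[transformers.Name] = transformers.Tap
        idx = transformers.Next
    return applied


xfmrs = FakeTransformers(["sub", "reg1", "xfm1"])
print(set_taps(xfmrs, [0.98, 1.02, 1.0]))
```

Because every read and write goes through the single active element, this pattern is safe only when nothing else changes the selection in between, which is one more reason to keep each engine in its own process.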

Xiren-Hitachi commented 3 years ago

> Something like this should work:
>
> dss_file = 'circuit/IEEE13/IEEE13Nodeckt.dss'
> dss_file = os.path.abspath(dss_file)
>
> if __name__ == '__main__':
>     num_parallel_runs = 4
>     load_settings = np.random.uniform(low=0.5, high=1.0, size=(num_parallel_runs, 30))
>     oltc_settings = np.random.uniform(low=0.95, high=1.05, size=(num_parallel_runs, 3))
>     cap_settings = np.random.randint(2, size=(num_parallel_runs, 2))
>
>     pool = Pool(num_parallel_runs)
>     results = pool.starmap(run, zip(load_settings, oltc_settings, cap_settings))

Thank you @PMeira! I tried starmap without dummy.Pool or the lambda function. It solves the issue perfectly! Also, thank you so much for pointing out the slow and convoluted implementation. I'll definitely modify my code.

PS: just so you know...

> Can you list your Python and DSS Python versions?

Python version: 3.8.5; DSS Python version: 0.10.7.

> In your example, is that an unmodified IEEE13 test case? If so, I could try to reproduce the issue.

Actually, this was written by my teammates. Anyway, the code simply runs power flow simulations with different settings for loads, OLTCs, and capacitors. So I think this is easily reproducible based on any power flow circuit file and on any feeder model.

PMeira commented 3 years ago

> It solves the issue perfectly!

That's great! I'll close the issue then.

> So I think this is easily reproducible based on any power flow circuit file and on any feeder model.

Thanks for sharing it. It certainly helped to identify the issue.