XanaduAI / strawberryfields

Strawberry Fields is a full-stack Python library for designing, simulating, and optimizing continuous variable (CV) quantum optical circuits.
Apache License 2.0

Strawberryfields with ProcessPoolExecutor #744

Open · DS-Liu opened this issue 8 months ago

DS-Liu commented 8 months ago

The number of shots can only be set to 1 for the 'fock' backend, as described here. Therefore, to obtain 10000 measurement samples, I need to run the circuit 10000 times. I have already defined the parameterized program and the engine as attributes of my class GBS. I then tried to use ProcessPoolExecutor to run the circuit 10000 times concurrently. However, this error occurs: TypeError: LocalEngine.__new__() missing 1 required positional argument: 'backend'.
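Some context on why the engine reaches the child process at all: `ProcessPoolExecutor.submit()` sends the callable to a worker by pickling it, and pickling a bound method such as `self.test` also pickles the instance it is bound to, along with all of its attributes. The sketch below checks this with a hypothetical `Holder` class, a stand-in for `GBS` that uses plain placeholders instead of Strawberry Fields objects.

```python
import pickle


class Holder:
    """Hypothetical stand-in for GBS: keeps its state in instance attributes."""

    def __init__(self):
        self.prog = "program"          # placeholder for sf.Program
        self.eng = {"cutoff_dim": 5}   # placeholder for sf.Engine

    def test(self, param):
        return (self.eng["cutoff_dim"], param)


holder = Holder()

# This is what happens to `self.test` when it is passed to executor.submit():
payload = pickle.dumps(holder.test)
method_copy = pickle.loads(payload)

# The unpickled method is bound to a *copy* of the instance, rebuilt from the
# pickled attributes, so every attribute (self.eng included) must round-trip.
print(method_copy.__self__ is holder)              # False
print(vars(method_copy.__self__) == vars(holder))  # True
print(method_copy(1))                              # (5, 1)
```

So any attribute that cannot be pickled and unpickled breaks `submit(self.test, ...)`, even if `test` itself never touches it.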

Here is an MWE:

from concurrent.futures import ProcessPoolExecutor

import strawberryfields as sf
from strawberryfields import ops

class GBS:
    def __init__(self):
        prog = sf.Program(2)
        a = prog.params('a')
        with prog.context as q:
            ops.Dgate(a ** 2) | q[0]  # free parameter
            ops.Sgate(1) | q[1]
            ops.MeasureFock() | q

        self.prog = prog
        self.eng = sf.Engine('fock', backend_options={'cutoff_dim': 5})

    def test(self, param):
        result = self.eng.run(self.prog, args={'a': param})
        return result

    def run(self):
        with ProcessPoolExecutor(1) as executor:
            future = executor.submit(self.test, 1)

def main():
    gbs = GBS()
    gbs.run()

if __name__=='__main__':
    main()

How can I solve this?

DS-Liu commented 8 months ago

When I instantiated the Engine in the test method rather than in the __init__ method, it worked. Why does this happen?

from concurrent.futures import ProcessPoolExecutor

import strawberryfields as sf
from strawberryfields import ops

class GBS:
    def __init__(self):
        prog = sf.Program(2)
        a = prog.params('a')
        with prog.context as q:
            ops.Dgate(a ** 2) | q[0]  # free parameter
            ops.Sgate(1) | q[1]
            ops.MeasureFock() | q

        self.prog = prog

    def test(self, param):
        eng = sf.Engine('fock', backend_options={'cutoff_dim': 5})
        result = eng.run(self.prog, args={'a': param})
        return result

    def run(self):
        with ProcessPoolExecutor(1) as executor:
            future = executor.submit(self.test, 1)

def main():
    gbs = GBS()
    gbs.run()

if __name__=='__main__':
    main()

isaacdevlugt commented 8 months ago

Hey @DS-Liu! Is there a reason why you can't call test a bunch of times like this?

def main():
    gbs = GBS()
    results = []

    for _ in range(10):
        results.append(gbs.test(1))

I'm not super familiar with ProcessPoolExecutor, so I'm not sure why your first example wasn't working 🤔

Let me know if this helps!

DS-Liu commented 8 months ago

I want to run the code concurrently instead of in a for loop, which is quite slow; that's why I use ProcessPoolExecutor. I can't figure out why an Engine instantiated in the main process doesn't work in the child process, while one instantiated in the child process does.
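One plausible explanation, based on the error message, which shows that `sf.Engine` creates a `LocalEngine` whose `__new__` requires a `backend` argument: Python's default pickling recreates an object by calling `cls.__new__(cls)` with no extra arguments unless the class supplies them through `__getnewargs__` or `__getnewargs_ex__`. Pickling then succeeds in the parent, but unpickling in the worker fails. An engine created inside the worker is never pickled, which would explain why the second version above works. The sketch reproduces this with a hypothetical `FakeEngine` class, so no Strawberry Fields install is needed.

```python
import pickle


class FakeEngine:
    """Hypothetical stand-in: like LocalEngine, __new__ requires `backend`."""

    def __new__(cls, backend, *, backend_options=None):
        return super().__new__(cls)

    def __init__(self, backend, *, backend_options=None):
        self.backend_name = backend
        self.backend_options = backend_options or {}


eng = FakeEngine("fock", backend_options={"cutoff_dim": 5})

# Pickling (what the parent process does) works...
data = pickle.dumps(eng)

# ...but unpickling (what the worker does) calls FakeEngine.__new__(FakeEngine)
# without `backend`, which raises the same kind of TypeError.
error_message = None
try:
    pickle.loads(data)
except TypeError as exc:
    error_message = str(exc)

# Prints a message ending in: missing 1 required positional argument: 'backend'
print(error_message)
```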

isaacdevlugt commented 8 months ago

I had better luck with getting multiprocessing to at least work (see docs here: https://docs.python.org/3/library/multiprocessing.html#module-multiprocessing):

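import time
from multiprocessing import Pool

import strawberryfields as sf
from strawberryfields import ops

class GBS: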
    def __init__(self):
        prog = sf.Program(2)
        a = prog.params('a')
        with prog.context as q:
            ops.Dgate(a ** 2) | q[0]  # free parameter
            ops.Sgate(1) | q[1]
            ops.MeasureFock() | q

        self.prog = prog
        self.eng = sf.Engine('fock', backend_options={'cutoff_dim': 5})

    def test(self, param):
        result = self.eng.run(self.prog, args={'a': param})
        return result

def main():

    gbs = GBS()
    results = []
    num_runs = 50_000

    parallel_time = time.process_time()

    with Pool() as pool:
        # intended to run in parallel, but the loop below calls gbs.test
        # in this process; pool is never used
        for _ in range(num_runs):
            results.append(gbs.test(1))

    parallel_time = time.process_time() - parallel_time

    serial_time = time.process_time()

    for _ in range(num_runs):
        results.append(gbs.test(1))

    serial_time = time.process_time() - serial_time

    print(parallel_time, serial_time)

if __name__=='__main__':
    main()

25.561596 26.430917

Unfortunately the speedup isn't that great when I ran it on my machine. Not sure if I'm using it properly, but maybe this will help 😄
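A side note on the timing: `time.process_time()` counts only the CPU time of the calling process, so it includes neither the work done in pool worker processes nor the time the parent spends waiting for them. Wall-clock time from `time.perf_counter()` is the usual measure of a parallel speedup. The sketch below shows the difference, using `time.sleep` as a stand-in for the parent blocking on workers; the `measure` helper is hypothetical.

```python
import time


def measure(func):
    """Return (wall-clock seconds, CPU seconds of this process) spent in func()."""
    wall_start = time.perf_counter()
    cpu_start = time.process_time()
    func()
    return time.perf_counter() - wall_start, time.process_time() - cpu_start


# Sleeping stands in for the parent waiting on pool.map(): wall-clock time
# passes, but this process uses almost no CPU.
wall, cpu = measure(lambda: time.sleep(0.3))
print(f"wall-clock: {wall:.3f} s, process CPU: {cpu:.3f} s")
```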

DS-Liu commented 8 months ago

Actually, you haven't used the Pool object in your code; it can be used by calling pool.map(). There's no speedup because you're still running everything serially.
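A sketch of what using the pool could look like: `pool.map()` combined with an `initializer`, so that each worker process builds its own engine once instead of receiving one from the parent. Because the engine is never pickled, this should sidestep the `LocalEngine.__new__()` TypeError from the original question. To keep the sketch runnable without Strawberry Fields, `make_engine` and `run_shot` are hypothetical stand-ins. In the real code, `make_engine` would build the `sf.Program` and `sf.Engine('fock', backend_options={'cutoff_dim': 5})`, and `run_shot` would call `eng.run(prog, args={'a': param})` (possibly preceded by `eng.reset()` so each shot starts from a fresh state) and return something picklable, such as `result.samples`.

```python
from multiprocessing import Pool

# Per-process state, filled in by the initializer inside each worker.
_worker_engine = None


def make_engine(cutoff_dim):
    """Hypothetical stand-in for building sf.Program + sf.Engine in the worker."""
    return {"cutoff_dim": cutoff_dim}


def init_worker(cutoff_dim):
    # Runs once in every worker process, so the engine is created there
    # and never has to be pickled and sent from the parent.
    global _worker_engine
    _worker_engine = make_engine(cutoff_dim)


def run_shot(param):
    """Hypothetical stand-in for one eng.run(...) call; returns picklable data."""
    return (param, _worker_engine["cutoff_dim"])


def main(num_runs=1_000):
    params = [1] * num_runs
    with Pool(initializer=init_worker, initargs=(5,)) as pool:
        # map() splits params into chunks and runs run_shot in the workers,
        # returning the results in input order.
        return pool.map(run_shot, params, chunksize=100)


if __name__ == "__main__":
    results = main()
    print(len(results), results[:3])  # 1000 [(1, 5), (1, 5), (1, 5)]
```

The `chunksize` argument cuts per-task overhead when there are many small jobs; 100 is an arbitrary choice here.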

isaacdevlugt commented 8 months ago

Ah! Well, clearly you know more about parallelizing code than me 😅. In any case, if you use multiprocessing (properly), does it help?
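If the engine should stay an attribute of `GBS`, another option is to leave it out of the pickled state with `__getstate__` and rebuild it in `__setstate__`, which Python calls in whichever process unpickles the instance. The sketch uses a hypothetical `FakeEngine` whose `__new__` requires `backend`, mimicking the failing case; swapping in `sf.Engine` is an assumption not tested here. Note that with `executor.submit(self.test, ...)` every task pickles `self` again, so the engine is still rebuilt once per task, much like the second version in this thread.

```python
import pickle


class FakeEngine:
    """Hypothetical stand-in for sf.Engine: __new__ requires `backend`."""

    def __new__(cls, backend, *, backend_options=None):
        return super().__new__(cls)

    def __init__(self, backend, *, backend_options=None):
        self.backend = backend
        self.backend_options = backend_options or {}


class GBS:
    def __init__(self, cutoff_dim=5):
        self.cutoff_dim = cutoff_dim
        self.eng = self._make_engine()

    def _make_engine(self):
        return FakeEngine("fock", backend_options={"cutoff_dim": self.cutoff_dim})

    def __getstate__(self):
        # Copy the instance dict without the engine, which cannot be unpickled.
        state = self.__dict__.copy()
        del state["eng"]
        return state

    def __setstate__(self, state):
        # Runs in the process that unpickles the object: restore the
        # attributes, then build a fresh engine locally.
        self.__dict__.update(state)
        self.eng = self._make_engine()


gbs = GBS()
clone = pickle.loads(pickle.dumps(gbs))  # what ProcessPoolExecutor does to `self`
print(type(clone.eng).__name__, clone.eng.backend_options)  # FakeEngine {'cutoff_dim': 5}
```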