Getting DEAP working with Spark #268

ryanpeach opened 6 years ago

ryanpeach commented 6 years ago


I'm trying to get DEAP to parallelize across a Spark cluster. I have seen other users mention this, since Spark integrates easily with existing server architecture via YARN. I have followed several online tutorials, cited in the references. Below is working DEAP code, followed by the code I attempted to convert to Spark. The error that usually occurs is 'Can't get attribute Individual on module deap.creator'.

Working Code

    import numpy as np
    import random

    from deap import base
    from deap import creator
    from deap import tools
    from deap import algorithms

    creator.create("FitnessMax", base.Fitness, weights=(1.0,))
    creator.create("Individual", list, fitness=creator.FitnessMax)

    def evalOneMax(individual):
        return sum(individual),

    toolbox = base.Toolbox()
    toolbox.register("attr_bool", random.randint, 0, 1)
    toolbox.register("individual", tools.initRepeat, creator.Individual,
        toolbox.attr_bool, 100)
    toolbox.register("population", tools.initRepeat, list, toolbox.individual)
    toolbox.register("evaluate", evalOneMax)
    toolbox.register("mate", tools.cxTwoPoint)
    toolbox.register("mutate", tools.mutFlipBit, indpb=0.05)
    toolbox.register("select", tools.selTournament, tournsize=3)

    # Define parallelism outside main
    if __name__=="__main__":

        pop = toolbox.population(n=300)
        hof = tools.HallOfFame(1)
        stats = tools.Statistics(lambda ind: ind.fitness.values)
        stats.register("avg", np.mean)
        stats.register("std", np.std)
        stats.register("min", np.min)
        stats.register("max", np.max)

        pop, log = algorithms.eaSimple(pop, toolbox, cxpb=0.5, mutpb=0.2, ngen=40,
                                       stats=stats, halloffame=hof, verbose=True)




Not Working Code

    from pyspark import SparkContext

    import numpy as np
    import random

    from deap import base
    from deap import creator
    from deap import tools
    from deap import algorithms

    creator.create("FitnessMax", base.Fitness, weights=(1.0,))
    creator.create("Individual", list, fitness=creator.FitnessMax)

    def evalOneMax(individual):
        return sum(individual),

    toolbox = base.Toolbox()
    toolbox.register("attr_bool", random.randint, 0, 1)
    toolbox.register("individual", tools.initRepeat, creator.Individual,
        toolbox.attr_bool, 100)
    toolbox.register("population", tools.initRepeat, list, toolbox.individual)
    toolbox.register("evaluate", evalOneMax)
    toolbox.register("mate", tools.cxTwoPoint)
    toolbox.register("mutate", tools.mutFlipBit, indpb=0.05)
    toolbox.register("select", tools.selTournament, tournsize=3)

    # Define parallelism outside main
    if __name__=="__main__":
        sc = SparkContext(appName="DEAP")

        def sparkMap(algorithm, population):
            return sc.parallelize(population).map(algorithm).collect()

        toolbox.register("map", sparkMap)

        pop = toolbox.population(n=300)
        hof = tools.HallOfFame(1)
        stats = tools.Statistics(lambda ind: ind.fitness.values)
        stats.register("avg", np.mean)
        stats.register("std", np.std)
        stats.register("min", np.min)
        stats.register("max", np.max)

        pop, log = algorithms.eaSimple(pop, toolbox, cxpb=0.5, mutpb=0.2, ngen=40,
                                       stats=stats, halloffame=hof, verbose=True)


    spark-submit --master local


Full Text (pastebin)


    Traceback (most recent call last):
      File "/Users/ryapeach/Documents/Workspace/relay-death/", line 45, in <module>
        stats=stats, halloffame=hof, verbose=True)
      File "/usr/local/lib/python3.6/site-packages/deap-1.2.2-py3.6-macosx-10.13-x86_64.egg/deap/", line 150, in eaSimple
        fitnesses =, invalid_ind)
      File "/Users/ryapeach/Documents/Workspace/relay-death/", line 32, in sparkMap
        return sc.parallelize(population).map(algorithm).collect()
    AttributeError: Can't get attribute 'Individual' on <module 'deap.creator' from '/usr/local/lib/python3.6/site-packages/deap-1.2.2-py3.6-macosx-10.13-x86_64.egg/deap/'>
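What the traceback means can be sketched with the standard library alone. Assumptions: `fake_creator` is a hypothetical stand-in for `deap.creator`, and `make_individual_class` stands in for `creator.create`. Pickle stores an instance as a reference to its class (module name plus attribute name), not as the class definition, so unpickling fails in any process where that attribute was never created. Presumably that is the situation in Spark's Python workers, which import `deap.creator` but never run the script's module-level `creator.create(...)` calls.

```python
import pickle
import sys
import types

# Hypothetical stand-in for deap.creator: an importable module whose
# attributes are filled in at runtime.
fake_creator = types.ModuleType("fake_creator")
sys.modules["fake_creator"] = fake_creator


def make_individual_class():
    """Create a list subclass at runtime and publish it on the module,
    roughly what creator.create("Individual", list, ...) does."""
    cls = type("Individual", (list,), {"__module__": "fake_creator"})
    fake_creator.Individual = cls
    return cls


# "Driver" side: the class exists, so pickling works. Pickle records the
# class only as the reference fake_creator.Individual.
Individual = make_individual_class()
payload = pickle.dumps(Individual([1, 0, 1]))

# "Worker" side: simulate a fresh process where the class was never created.
del fake_creator.Individual
try:
    pickle.loads(payload)
    error = None
except AttributeError as exc:
    error = exc
print(error)

# Re-running the creation code before unpickling makes it work again.
make_individual_class()
restored = pickle.loads(payload)
print(type(restored).__name__, list(restored))  # Individual [1, 0, 1]
```

The last step mirrors what any worker-side fix has to achieve: the class must exist in the worker before the payload is unpickled.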


cmd-ntrf commented 6 years ago

Hi Ryan,

First, thanks for the detailed issue; it was a pleasant read. Second, we do have a solution for this. It has been sitting in the pull-request list for almost three years, and I am the one to blame. I took too long a hiatus from DEAP, during which I was teaching Spark, among other things.

Here is the PR:

There are some conflicts since the patch is a bit old, but you should be able to merge without too much effort and then test it with Spark. It should solve your issue. Let us know the outcome. The patch was originally meant for DEAP 2.0; this could give us the motivation we need to assemble a new release.

Cheers, Felix

ryanpeach commented 6 years ago

Great, thanks @cmd-ntrf.

I’ve been working with deap parallelization for a while now. Unfortunately I’m unable to publish some of my company tutorials on how to do it xD

Needless to say, I’ve become somewhat acquainted with the source code and the like.

I think it would be a good idea to expand the documentation on parallelization options! My one hint: I’ve had luck with DEAP on AWS CfnCluster using SCOOP. I would like to automate it with boto, but unfortunately I couldn’t publish that either without corporate approval :(

Just saying, it was damn hard! If anyone not bound by an agreement has examples, they should publish them (and PM me for advice on bug fixes and directions lol).

cmd-ntrf commented 6 years ago

Do you have specific examples of parallelization that would be of value to you? We thought we had it covered, but clearly there are some issues if you found it "damn hard" ;).

Also, I am curious, who do you work for?

ryanpeach commented 6 years ago

Well, I think a specific parallelization example with AWS (as opposed to something very general like "Use SCOOP") would help some people. The problem isn't so much DEAP itself as the combination of three sets of documentation: DEAP, SCOOP, and whatever cloud platform you are working with.

I work at Keysight Technologies, and I use DEAP mostly for hyperparameter optimization. I'll see whether we can publish some of my tutorials eventually.

jbrant commented 6 years ago

I'm having the same issue. I looked at the pull request referenced above, but it appears to address pickling problems when creator.create is called outside the global scope. In my case (and, it appears, in the OP's example as well), I am calling that function in the global scope and still getting a similar exception:

  File "C:\Users\brantj\AppData\Local\spark-2.3.0-bin-hadoop2.7\python\lib\\pyspark\", line 229, in main
  File "C:\Users\brantj\AppData\Local\spark-2.3.0-bin-hadoop2.7\python\lib\\pyspark\", line 224, in process
  File "C:\Users\brantj\AppData\Local\spark-2.3.0-bin-hadoop2.7\python\lib\\pyspark\", line 372, in dump_stream
    vs = list(itertools.islice(iterator, batch))
  File "C:\Users\brantj\AppData\Local\Continuum\anaconda3\envs\utility\lib\site-packages\pyspark\", line 1354, in takeUpToNumLeft
    yield next(iterator)
  File "C:\Users\brantj\AppData\Local\spark-2.3.0-bin-hadoop2.7\python\lib\\pyspark\", line 145, in load_stream
    yield self._read_with_length(stream)
  File "C:\Users\brantj\AppData\Local\spark-2.3.0-bin-hadoop2.7\python\lib\\pyspark\", line 170, in _read_with_length
    return self.loads(obj)
  File "C:\Users\brantj\AppData\Local\spark-2.3.0-bin-hadoop2.7\python\lib\\pyspark\", line 559, in loads
    return pickle.loads(obj, encoding=encoding)
AttributeError: Can't get attribute 'Individual' on <module 'deap.creator' from 'C:\\Users\\brantj\\AppData\\Local\\Continuum\\anaconda3\\lib\\site-packages\\deap\\'>

That being said, I'll try merging the pull request and test.

karan10111 commented 6 years ago

I just tried merging the PR. The code that wasn't working for @ryanpeach worked. I didn't really understand what @jbrant was trying to say. All the tests seem to pass.

karan10111 commented 6 years ago

There was a minor issue, though: the pull request uses dct.iteritems() in the MetaCreator class, which I had to change to dct.items() for Python 3.6.4.
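For reference, `dict.iteritems()` exists only in Python 2; Python 3 removed it, and `dict.items()` works on both. Below is a minimal sketch of a metaclass that walks its class namespace the Python 3 way; `AttrListingMeta` and `Example` are hypothetical names, and this is not the `MetaCreator` from the pull request.

```python
class AttrListingMeta(type):
    """Hypothetical metaclass; NOT the MetaCreator from the pull request."""

    def __new__(mcs, name, bases, dct):
        # Python 2 code could call dct.iteritems(); Python 3 dicts have no
        # such method, and dct.items() works on both.
        public = {key: value for key, value in dct.items()
                  if not key.startswith("__")}
        cls = super().__new__(mcs, name, bases, dct)
        # Record the names of the ordinary (non-dunder) class attributes.
        cls.public_attrs = sorted(public)
        return cls


class Example(list, metaclass=AttrListingMeta):
    weights = (1.0,)
    label = "demo"


print(Example.public_attrs)      # ['label', 'weights']
print(hasattr({}, "iteritems"))  # False
```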

jbrant commented 6 years ago

I just meant that the OP's example called "creator.create" in the global scope, but the PR fix was to allow that call within a local scope, which originally didn't work because pickled objects were not visible to worker nodes during distributed execution.

That being said, I was incorrect in assuming that it wouldn't fix the problem - merging PR #76 also got it working for me. I've created PR #280 which incorporates the fix into the latest baseline and it seems to be passing CI checks.
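The general idea behind a fix of this kind can be sketched with the standard library alone. This is an assumption about the technique, not the code from PR #76: register a reducer for a metaclass with `copyreg.pickle`, so that a dynamically created class is pickled as a recipe (a call to a module-level factory plus its name, base, and attributes) instead of a bare module reference. A worker that unpickles an instance then rebuilds the class itself. `RecipeMeta`, `create`, `_reduce_class`, and `fake_creator` are all hypothetical names.

```python
import copyreg
import pickle
import sys
import types

# Hypothetical stand-in for deap.creator.
fake_creator = types.ModuleType("fake_creator")
sys.modules["fake_creator"] = fake_creator


class RecipeMeta(type):
    """Metaclass marking classes that should be pickled by recipe."""


def create(name, base, attrs):
    """Build (or rebuild) a class and publish it on fake_creator."""
    namespace = dict(attrs, __module__="fake_creator")
    cls = RecipeMeta(name, (base,), namespace)
    setattr(fake_creator, name, cls)
    return cls


def _reduce_class(cls):
    # Keep only ordinary (non-dunder) class attributes; create() and
    # type() recreate the rest.
    attrs = {key: value for key, value in cls.__dict__.items()
             if not (key.startswith("__") and key.endswith("__"))}
    return create, (cls.__name__, cls.__bases__[0], attrs)


# Pickle consults copyreg's dispatch table by type(obj) before falling back
# to pickling classes by reference, so this hook applies to RecipeMeta classes.
copyreg.pickle(RecipeMeta, _reduce_class)

# Driver side: create the class and pickle an instance.
Individual = create("Individual", list, {"weights": (1.0,)})
payload = pickle.dumps(Individual([1, 0, 1]))

# Worker side (simulated): the class is missing, but the payload carries
# the recipe, so unpickling rebuilds it through create().
del fake_creator.Individual
restored = pickle.loads(payload)
print(type(restored).__name__, list(restored), type(restored).weights)
# Individual [1, 0, 1] (1.0,)
```

In a real cluster the factory must live in a module the workers can import (as `deap.creator.create` does), since pickle stores the factory itself by reference. Note also that each separately unpickled payload rebuilds its own class object.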

omarcr commented 5 years ago

Please see

I am having this issue as well.

Thanks, Omar

jfrfonseca commented 4 years ago

I created a workaround so that DEAP can work with loky, a framework for reusable process pools.

The example below rewrites DEAP's example

    import array
    import random
    import sys

    if sys.version_info < (2, 7):
        print("mpga_onemax example requires Python >= 2.7.")

    import numpy

    from deap import algorithms
    from deap import base
    from deap import creator
    from deap import tools

    from loky import get_reusable_executor

    # We define the "creator preparation" procedure to be reused by each process
    def prepare_creator():
        creator.create("FitnessMax", base.Fitness, weights=(1.0,))
        creator.create("Individual", array.array, typecode='b', fitness=creator.FitnessMax)

    # We RUN the creator preparation procedure so the root process can be prepared
    prepare_creator()

    # The following code is the original preparation as for DEAP's example
    toolbox = base.Toolbox()

    # Attribute generator
    toolbox.register("attr_bool", random.randint, 0, 1)

    # Structure initializers
    toolbox.register("individual", tools.initRepeat, creator.Individual, toolbox.attr_bool, 100)
    toolbox.register("population", tools.initRepeat, list, toolbox.individual)

    def evalOneMax(individual):
        return sum(individual),

    toolbox.register("evaluate", evalOneMax)
    toolbox.register("mate", tools.cxTwoPoint)
    toolbox.register("mutate", tools.mutFlipBit, indpb=0.05)
    toolbox.register("select", tools.selTournament, tournsize=3)

    if __name__ == "__main__":

        # The following code was altered to use loky's reusable processes
        # Process pool of 4 workers
        pool = get_reusable_executor(4, reuse=True)

        # The following code is the mentioned workaround.
        # After initiating the pool, we define a "supermap" function that guarantees
        # that every process will execute the creator preparation procedure and then
        # run the mapped function
        def supermap(*args, **kwargs):
            return*args, **kwargs)

        # We then register our "supermap" function as the parallel directive for DEAP
        toolbox.register("map", supermap)

        # The following code is the same as DEAP's example
        pop = toolbox.population(n=100)
        hof = tools.HallOfFame(1)

        stats = tools.Statistics(lambda ind: ind.fitness.values)
        stats.register("avg", numpy.mean)
        stats.register("std", numpy.std)
        stats.register("min", numpy.min)
        stats.register("max", numpy.max)

        algorithms.eaSimple(pop, toolbox, cxpb=0.5, mutpb=0.2, ngen=40, stats=stats, halloffame=hof)


I believe this approach could also work with PySpark, thanks to its "reusable execution environment" architecture.

It seems the problem arises from DEAP's unusual design regarding object scope and memory space: it relies on several global objects (for example, the "creator") that are hard to share among different processes.
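A different workaround for the shared-global problem, not one proposed in this thread: keep the creator classes on the driver and ship only plain lists to the workers, since an evaluation function like `evalOneMax` only needs the genome. This is a minimal sketch under that assumption; `plain_map` and its `backend_map` parameter are hypothetical names, and `backend_map` stands for any map-like callable (for example, a small function wrapping `sc.parallelize(...).map(...).collect()`).

```python
def plain_map(func, individuals, backend_map=map):
    """Hypothetical DEAP-compatible map: strip each individual down to a
    plain list before handing it to a (possibly remote) backend map.

    Workers then never unpickle creator.Individual, so they do not need
    the creator classes. Only suitable when func reads the genes alone,
    not attributes such as ind.fitness.
    """
    plain = [list(ind) for ind in individuals]
    return list(backend_map(func, plain))


def evalOneMax(individual):
    return sum(individual),


class Individual(list):
    """Local stand-in for creator.Individual (hypothetical)."""


population = [Individual([1, 0, 1]), Individual([1, 1, 1])]
print(plain_map(evalOneMax, population))  # [(2,), (3,)]
```

With DEAP, one would register it as `toolbox.register("map", plain_map, backend_map=some_backend)`, where `some_backend` is whatever map-like function you choose (hypothetical here). The trade-off is that the evaluation function loses access to anything stored on the individual besides its genes, such as `fitness`.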

berndtlindner commented 4 years ago

Any further feedback on resolving this issue? Perhaps fixing the problems that are keeping PR #76 from being merged?