Closed sunderved closed 10 months ago
Is there a reason to recalculate the fitness even if `keep_elitism=1`? Knowing this might help us decide on a better solution.
A simple solution is to re-implement the `cal_pop_fitness()` method as listed below. It disables the part that reuses the fitness when `keep_elitism != 0`. This means that even if `keep_elitism=1`, the fitness function will still be called. Note that you still have to set `keep_parents=0` and `save_solutions=False`.
This is a full working example that does what you are looking for. The `cal_pop_fitness()` method has a print statement so you can make sure the right method is being called.
import pygad
import numpy
import sys
from pygad import GA
import concurrent.futures
class Test(GA):
    """
    A GA subclass whose fitness pass has no elitism shortcut.
    """

    def cal_pop_fitness(self):
        """
        Calculating the fitness values of batches of solutions in the current population.

        A fitness value is reused only when the solution is found in the saved
        solutions, in the saved best solutions (serial mode only), or among the
        parents of the last generation (when `keep_parents` is -1 or positive).
        Nothing is looked up in the last generation's elitism, so those
        solutions go through the fitness function like any other solution.

        Any exception raised while calculating the fitness is logged and the
        interpreter exits with status -1.

        It returns:
            -fitness: An array of the calculated fitness values.
        """
        iterable_types = [list, tuple, numpy.ndarray]
        # Returned by `stored_fitness()` when there is nothing to reuse.
        not_found = object()

        def stored_fitness(candidate, check_best_solutions):
            # Return an already known fitness for `candidate`, or `not_found`.
            # Plain Python lists are compared because the 'in' membership operator
            # and list.index() are much faster than numpy.where()/numpy.all().
            if self.save_solutions and len(self.solutions) > 0 and list(candidate) in self.solutions:
                return self.solutions_fitness[self.solutions.index(list(candidate))]
            if check_best_solutions and self.save_best_solutions and len(self.best_solutions) > 0 and list(candidate) in self.best_solutions:
                return self.best_solutions_fitness[self.best_solutions.index(list(candidate))]
            parents_kept = (self.keep_parents == -1) or (self.keep_parents > 0)
            if parents_kept and self.last_generation_parents is not None and len(self.last_generation_parents) > 0 and list(candidate) in parents_as_lists:
                # Position within 'self.last_generation_parents', not within the population.
                position = parents_as_lists.index(list(candidate))
                # Map it to the index in the last population to fetch the fitness.
                return self.previous_generation_fitness[self.last_generation_parents_indices[position]]
            return not_found

        def checked_fitness(value, parenthesize):
            # A number means a single-objective problem; a list/tuple/numpy.ndarray
            # means a multi-objective problem. Anything else is rejected.
            if type(value) in GA.supported_int_float_types or type(value) in iterable_types:
                return value
            if parenthesize:
                raise ValueError(f"The fitness function should return a number or an iterable (list, tuple, or numpy.ndarray) but the value ({value}) of type {type(value)} found.")
            raise ValueError(f"The fitness function should return a number or an iterable (list, tuple, or numpy.ndarray) but the value {value} of type {type(value)} found.")

        def check_batch(values, indices):
            # A batch must come back as one fitness value per submitted solution.
            if type(values) not in iterable_types:
                raise TypeError(f"Expected to receive a list, tuple, or numpy.ndarray from the fitness function but the value ({values}) of type {type(values)}.")
            if len(numpy.array(values)) != len(indices):
                raise ValueError(f"There is a mismatch between the number of solutions passed to the fitness function ({len(indices)}) and the number of fitness values returned ({len(values)}). They must match.")

        try:
            if self.valid_parameters == False:
                raise Exception("ERROR calling the cal_pop_fitness() method: \nPlease check the parameters passed while creating an instance of the GA class.\n")

            # List version of 'self.last_generation_parents', used by `stored_fitness()`.
            if self.last_generation_parents is not None:
                parents_as_lists = [list(parent) for parent in self.last_generation_parents]

            # "undefined" marks a solution whose fitness is still to be calculated.
            pop_fitness = ["undefined"] * len(self.population)

            if self.parallel_processing is None:
                print('\nAAAAAAAAAAAAAAAAAAAAAAA\n')
                for position, candidate in enumerate(self.population):
                    score = stored_fitness(candidate, True)
                    if score is not_found:
                        if self.fitness_batch_size in [1, None]:
                            score = checked_fitness(self.fitness_func(self, candidate, position), False)
                        else:
                            # Batch mode: leave the entry "undefined"; it is filled below.
                            continue
                    pop_fitness[position] = score

                if self.fitness_batch_size not in [1, None]:
                    # Indices of the solutions that still need a fitness value.
                    pending = [idx for idx, fit in enumerate(pop_fitness) if type(fit) is str and fit == "undefined"]
                    batch_count = int(numpy.ceil(len(pending) / self.fitness_batch_size))
                    for batch_no in range(batch_count):
                        start = batch_no * self.fitness_batch_size
                        stop = (batch_no + 1) * self.fitness_batch_size
                        chunk_indices = pending[start:stop]
                        chunk_solutions = self.population[chunk_indices, :]
                        chunk_fitness = self.fitness_func(self, chunk_solutions, chunk_indices)
                        check_batch(chunk_fitness, chunk_indices)
                        for target, score in zip(chunk_indices, chunk_fitness):
                            pop_fitness[target] = checked_fitness(score, False)
            else:
                # Fill in whatever can be reused before handing the rest to the executor.
                for position, candidate in enumerate(self.population):
                    score = stored_fitness(candidate, False)
                    if score is not not_found:
                        pop_fitness[position] = score

                # Pick the executor based on whether the user selected "process" or "thread".
                if self.parallel_processing[0] == "process":
                    executor_class = concurrent.futures.ProcessPoolExecutor
                else:
                    executor_class = concurrent.futures.ThreadPoolExecutor

                # The with statement ensures the workers are cleaned up promptly.
                with executor_class(max_workers=self.parallel_processing[1]) as executor:
                    todo_indices = []
                    todo_solutions = []
                    for position, candidate in enumerate(self.population):
                        known = pop_fitness[position]
                        # Only the "undefined" entries still need to be calculated.
                        if type(known) is str and known == "undefined":
                            todo_solutions.append(candidate.copy())
                            todo_indices.append(position)

                    if self.fitness_batch_size in [1, None]:
                        results = executor.map(self.fitness_func, [self] * len(todo_indices), todo_solutions, todo_indices)
                        for target, score in zip(todo_indices, results):
                            pop_fitness[target] = checked_fitness(score, False)
                    else:
                        batch_count = int(numpy.ceil(len(todo_indices) / self.fitness_batch_size))
                        # One entry per batch: its solutions and their population indices.
                        batches_solutions = []
                        batches_indices = []
                        for batch_no in range(batch_count):
                            start = batch_no * self.fitness_batch_size
                            stop = (batch_no + 1) * self.fitness_batch_size
                            chunk_indices = todo_indices[start:stop]
                            batches_solutions.append(self.population[chunk_indices, :])
                            batches_indices.append(chunk_indices)

                        results = executor.map(self.fitness_func, [self] * len(todo_indices), batches_solutions, batches_indices)
                        for chunk_indices, chunk_fitness in zip(batches_indices, results):
                            check_batch(chunk_fitness, chunk_indices)
                            for target, score in zip(chunk_indices, chunk_fitness):
                                pop_fitness[target] = checked_fitness(score, True)

            pop_fitness = numpy.array(pop_fitness)
        except Exception as ex:
            self.logger.exception(ex)
            sys.exit(-1)
        return pop_fitness
function_inputs = [4, -2, 3.5, 5, -11, -4.7]
desired_output = 44


def fitness_func(ga_instance, solution, solution_idx):
    """
    Score a solution by how close its weighted sum is to `desired_output`.

    Parameters:
        -ga_instance: The GA instance calling the function (not used here).
        -solution: The genes of the solution, multiplied element-wise by `function_inputs`.
        -solution_idx: The index of the solution in the population (not used here).

    It returns:
        -solution_fitness: The reciprocal of the absolute error. It grows as the
         weighted sum gets closer to `desired_output`.
    """
    # Without this small constant an exact solution divides by zero, which
    # yields an infinite fitness and a RuntimeWarning.
    epsilon = 0.000001
    output = numpy.sum(solution * function_inputs)
    solution_fitness = 1.0 / (numpy.abs(output - desired_output) + epsilon)
    return solution_fitness
num_generations = 2
num_parents_mating = 4
sol_per_pop = 10
num_genes = len(function_inputs)

# Instantiate the subclass directly so that the overridden cal_pop_fitness()
# works on this instance's own, evolving population. Assigning the bound
# method of a second Test instance to a plain pygad.GA object would calculate
# the fitness of that other instance's population, which never changes.
#
# keep_parents=0 and save_solutions=False are required so that no previously
# calculated fitness is reused while keep_elitism=1 keeps the best solution.
ga_instance = Test(num_generations=num_generations,
                   num_parents_mating=num_parents_mating,
                   fitness_func=fitness_func,
                   sol_per_pop=sol_per_pop,
                   num_genes=num_genes,
                   keep_elitism=1,
                   keep_parents=0,
                   save_solutions=False,
                   suppress_warnings=True)
ga_instance.run()
Thanks for sharing this code. I will definitely try it out.
I suspect that my use case is similar to what is reported here: https://github.com/ahmedfgad/GeneticAlgorithmPython/issues/100
I am working on an AI for a card game. Each AI candidate is a solution in the population. My thought was to evaluate the fitness of the different solutions by making them play many matches against each other. With this approach, the fitness of a given solution is not always going to be the same, as it depends on the quality of the other solutions. That's why I don't want to reuse the fitness values of the solutions that are passed on to the next generation.
Thanks for the clarification. I am asking because your problem is, let's say, semi-deterministic.
If the problem is non-deterministic, then:
keep_parents=0
keep_elitism=0
save_solutions=False
If the problem is deterministic, then the first 2 parameters can have positive values and the last one can be set to `True`.
In your case, you want `keep_elitism=1` while not re-using the fitness. This is why I think the best way is to override the `cal_pop_fitness()` method to implement your desired behavior.
I can certainly work with that. Thanks @ahmedfgad.
@ahmedfgad, I think there is a bug in your code.
I added print statements in the fitness and on_generation callbacks.
My test works fine if I do not override the `cal_pop_fitness()` method with your version.
import pygad
import numpy
import sys
from pygad import GA
import concurrent.futures
class Test(GA):
    """
    A GA subclass whose fitness pass has no elitism shortcut.
    """

    def cal_pop_fitness(self):
        """
        Calculating the fitness values of batches of solutions in the current population.

        A fitness value is reused only when the solution is found in the saved
        solutions, in the saved best solutions (serial mode only), or among the
        parents of the last generation (when `keep_parents` is -1 or positive).
        Nothing is looked up in the last generation's elitism, so those
        solutions go through the fitness function like any other solution.

        Any exception raised while calculating the fitness is logged and the
        interpreter exits with status -1.

        It returns:
            -fitness: An array of the calculated fitness values.
        """
        iterable_types = [list, tuple, numpy.ndarray]
        # Returned by `stored_fitness()` when there is nothing to reuse.
        not_found = object()

        def stored_fitness(candidate, check_best_solutions):
            # Return an already known fitness for `candidate`, or `not_found`.
            # Plain Python lists are compared because the 'in' membership operator
            # and list.index() are much faster than numpy.where()/numpy.all().
            if self.save_solutions and len(self.solutions) > 0 and list(candidate) in self.solutions:
                return self.solutions_fitness[self.solutions.index(list(candidate))]
            if check_best_solutions and self.save_best_solutions and len(self.best_solutions) > 0 and list(candidate) in self.best_solutions:
                return self.best_solutions_fitness[self.best_solutions.index(list(candidate))]
            parents_kept = (self.keep_parents == -1) or (self.keep_parents > 0)
            if parents_kept and self.last_generation_parents is not None and len(self.last_generation_parents) > 0 and list(candidate) in parents_as_lists:
                # Position within 'self.last_generation_parents', not within the population.
                position = parents_as_lists.index(list(candidate))
                # Map it to the index in the last population to fetch the fitness.
                return self.previous_generation_fitness[self.last_generation_parents_indices[position]]
            return not_found

        def checked_fitness(value, parenthesize):
            # A number means a single-objective problem; a list/tuple/numpy.ndarray
            # means a multi-objective problem. Anything else is rejected.
            if type(value) in GA.supported_int_float_types or type(value) in iterable_types:
                return value
            if parenthesize:
                raise ValueError(f"The fitness function should return a number or an iterable (list, tuple, or numpy.ndarray) but the value ({value}) of type {type(value)} found.")
            raise ValueError(f"The fitness function should return a number or an iterable (list, tuple, or numpy.ndarray) but the value {value} of type {type(value)} found.")

        def check_batch(values, indices):
            # A batch must come back as one fitness value per submitted solution.
            if type(values) not in iterable_types:
                raise TypeError(f"Expected to receive a list, tuple, or numpy.ndarray from the fitness function but the value ({values}) of type {type(values)}.")
            if len(numpy.array(values)) != len(indices):
                raise ValueError(f"There is a mismatch between the number of solutions passed to the fitness function ({len(indices)}) and the number of fitness values returned ({len(values)}). They must match.")

        try:
            if self.valid_parameters == False:
                raise Exception("ERROR calling the cal_pop_fitness() method: \nPlease check the parameters passed while creating an instance of the GA class.\n")

            # List version of 'self.last_generation_parents', used by `stored_fitness()`.
            if self.last_generation_parents is not None:
                parents_as_lists = [list(parent) for parent in self.last_generation_parents]

            # "undefined" marks a solution whose fitness is still to be calculated.
            pop_fitness = ["undefined"] * len(self.population)

            if self.parallel_processing is None:
                print('\nAAAAAAAAAAAAAAAAAAAAAAA\n')
                for position, candidate in enumerate(self.population):
                    score = stored_fitness(candidate, True)
                    if score is not_found:
                        if self.fitness_batch_size in [1, None]:
                            score = checked_fitness(self.fitness_func(self, candidate, position), False)
                        else:
                            # Batch mode: leave the entry "undefined"; it is filled below.
                            continue
                    pop_fitness[position] = score

                if self.fitness_batch_size not in [1, None]:
                    # Indices of the solutions that still need a fitness value.
                    pending = [idx for idx, fit in enumerate(pop_fitness) if type(fit) is str and fit == "undefined"]
                    batch_count = int(numpy.ceil(len(pending) / self.fitness_batch_size))
                    for batch_no in range(batch_count):
                        start = batch_no * self.fitness_batch_size
                        stop = (batch_no + 1) * self.fitness_batch_size
                        chunk_indices = pending[start:stop]
                        chunk_solutions = self.population[chunk_indices, :]
                        chunk_fitness = self.fitness_func(self, chunk_solutions, chunk_indices)
                        check_batch(chunk_fitness, chunk_indices)
                        for target, score in zip(chunk_indices, chunk_fitness):
                            pop_fitness[target] = checked_fitness(score, False)
            else:
                # Fill in whatever can be reused before handing the rest to the executor.
                for position, candidate in enumerate(self.population):
                    score = stored_fitness(candidate, False)
                    if score is not not_found:
                        pop_fitness[position] = score

                # Pick the executor based on whether the user selected "process" or "thread".
                if self.parallel_processing[0] == "process":
                    executor_class = concurrent.futures.ProcessPoolExecutor
                else:
                    executor_class = concurrent.futures.ThreadPoolExecutor

                # The with statement ensures the workers are cleaned up promptly.
                with executor_class(max_workers=self.parallel_processing[1]) as executor:
                    todo_indices = []
                    todo_solutions = []
                    for position, candidate in enumerate(self.population):
                        known = pop_fitness[position]
                        # Only the "undefined" entries still need to be calculated.
                        if type(known) is str and known == "undefined":
                            todo_solutions.append(candidate.copy())
                            todo_indices.append(position)

                    if self.fitness_batch_size in [1, None]:
                        results = executor.map(self.fitness_func, [self] * len(todo_indices), todo_solutions, todo_indices)
                        for target, score in zip(todo_indices, results):
                            pop_fitness[target] = checked_fitness(score, False)
                    else:
                        batch_count = int(numpy.ceil(len(todo_indices) / self.fitness_batch_size))
                        # One entry per batch: its solutions and their population indices.
                        batches_solutions = []
                        batches_indices = []
                        for batch_no in range(batch_count):
                            start = batch_no * self.fitness_batch_size
                            stop = (batch_no + 1) * self.fitness_batch_size
                            chunk_indices = todo_indices[start:stop]
                            batches_solutions.append(self.population[chunk_indices, :])
                            batches_indices.append(chunk_indices)

                        results = executor.map(self.fitness_func, [self] * len(todo_indices), batches_solutions, batches_indices)
                        for chunk_indices, chunk_fitness in zip(batches_indices, results):
                            check_batch(chunk_fitness, chunk_indices)
                            for target, score in zip(chunk_indices, chunk_fitness):
                                pop_fitness[target] = checked_fitness(score, True)

            pop_fitness = numpy.array(pop_fitness)
        except Exception as ex:
            self.logger.exception(ex)
            sys.exit(-1)
        return pop_fitness
function_inputs = [4, -2, 3.5, 5, -11, -4.7]
desired_output = 44


def fitness_func(ga_instance, solution, solution_idx):
    """
    Score a solution by how close its weighted sum is to `desired_output`.

    Each call also prints the number of completed generations, the solution
    index, the fitness, the weighted sum and the solution itself, so the
    calls made to the fitness function can be traced.

    Parameters:
        -ga_instance: The GA instance calling the function. Only its
         `generations_completed` attribute is read, for the trace line.
        -solution: The genes of the solution, multiplied element-wise by `function_inputs`.
        -solution_idx: The index of the solution in the population.

    It returns:
        -solution_fitness: The reciprocal of the absolute error. It grows as the
         weighted sum gets closer to `desired_output`.
    """
    # Without this small constant an exact solution divides by zero, which
    # yields an infinite fitness and a RuntimeWarning.
    epsilon = 0.000001
    output = numpy.sum(solution * function_inputs)
    solution_fitness = 1.0 / (numpy.abs(output - desired_output) + epsilon)
    print(' ', ga_instance.generations_completed, solution_idx, solution_fitness, output, solution)
    return solution_fitness
# Function called at the end of each generation
def callback_generation(ga_instance):
    """
    Print an end-of-generation (EOG) line describing the best solution.

    The best solution is taken from the fitness values of the last generation (`ga_instance.last_generation_fitness`) that are passed to `best_solution()`.
    """
    best = ga_instance.best_solution(pop_fitness=ga_instance.last_generation_fitness)
    best_genes, best_fitness, best_idx = best
    # Weighted sum produced by the best solution.
    best_output = numpy.sum(best_genes*function_inputs)
    print(' EOG', ga_instance.generations_completed, best_idx, best_fitness, best_output, best_genes)
num_generations = 4
num_parents_mating = 4
sol_per_pop = 10
num_genes = len(function_inputs)
stop_criteria = "reach_10"

# Create the instance from the `Test` subclass directly so that the overridden cal_pop_fitness() runs on this instance.
# Assigning `Test(...).cal_pop_fitness` to an attribute of a separate `pygad.GA` instance does not work:
# the method stays bound to the temporary `Test` object, so the fitness would be calculated for that
# object's population (which never evolves) rather than for the population of the instance being run.
ga_instance = Test(num_generations=num_generations,
                   num_parents_mating=num_parents_mating,
                   fitness_func=fitness_func,
                   sol_per_pop=sol_per_pop,
                   num_genes=num_genes,
                   suppress_warnings=False,
                   stop_criteria=stop_criteria,
                   on_generation=callback_generation)
ga_instance.run()
My fault!
This is the updated code. I just removed some commented lines.
Please check if it has any issues.
import numpy
import sys
from pygad import GA
import concurrent.futures
class NewGA(GA):
    """
    A `GA` subclass whose cal_pop_fitness() has no branch that reuses the fitness of the elitism solutions.

    A fitness value is only reused when the solution is found in:
        -the saved solutions (`save_solutions`),
        -the saved best solutions (`save_best_solutions`, only when parallel processing is not used), or
        -the parents kept from the last generation (`keep_parents`).
    Any other solution is passed to the fitness function.
    """

    @staticmethod
    def _is_supported_fitness(fitness):
        """
        Return True if the fitness value is a supported number (single-objective problem) or a list/tuple/numpy.ndarray (multi-objective problem).
        """
        # The exact type is compared on purpose (not `isinstance()`) so that the set of accepted types stays unchanged.
        return (type(fitness) in GA.supported_int_float_types) or (type(fitness) in [list, tuple, numpy.ndarray])

    @staticmethod
    def _check_batch_fitness(batch_fitness, batch_indices):
        """
        Validate the value returned by the fitness function for one batch.

        It raises:
            -TypeError: If the returned value is not a list, tuple, or numpy.ndarray.
            -ValueError: If the number of returned fitness values differs from the number of solutions in the batch.
        """
        if type(batch_fitness) not in [list, tuple, numpy.ndarray]:
            raise TypeError(f"Expected to receive a list, tuple, or numpy.ndarray from the fitness function but the value ({batch_fitness}) of type {type(batch_fitness)}.")
        elif len(numpy.array(batch_fitness)) != len(batch_indices):
            raise ValueError(f"There is a mismatch between the number of solutions passed to the fitness function ({len(batch_indices)}) and the number of fitness values returned ({len(batch_fitness)}). They must match.")

    def _split_into_batches(self, solutions_indices):
        """
        Split the solutions' indices into consecutive batches of at most `fitness_batch_size` indices each.
        """
        batch_size = self.fitness_batch_size
        num_batches = int(numpy.ceil(len(solutions_indices) / batch_size))
        return [solutions_indices[batch_idx * batch_size:(batch_idx + 1) * batch_size]
                for batch_idx in range(num_batches)]

    def _find_known_fitness(self, sol_as_list, parents_as_list, check_best_solutions):
        """
        Look for an already calculated fitness value of a solution.

        sol_as_list: The solution as a list (not a numpy.ndarray) so that the list membership operator 'in' can be used.
        parents_as_list: The list version of 'self.last_generation_parents' (empty if there are no parents).
        check_best_solutions: Whether the 'self.best_solutions' list is also searched.

        It returns:
            -(True, fitness) if a fitness value was found.
            -(False, None) if the fitness function must be called for this solution.
        """
        # The functions numpy.any()/numpy.all()/numpy.where()/numpy.equal() are very slow.
        # So, list membership operator 'in' is used to check if the solution exists in the lists.
        if (self.save_solutions) and (len(self.solutions) > 0) and (sol_as_list in self.solutions):
            solution_idx = self.solutions.index(sol_as_list)
            return True, self.solutions_fitness[solution_idx]
        if check_best_solutions and (self.save_best_solutions) and (len(self.best_solutions) > 0) and (sol_as_list in self.best_solutions):
            solution_idx = self.best_solutions.index(sol_as_list)
            return True, self.best_solutions_fitness[solution_idx]
        # If the solution is a parent kept from the previous generation, its fitness value is already calculated.
        if ((self.keep_parents == -1) or (self.keep_parents > 0)) and (self.last_generation_parents is not None) and (len(self.last_generation_parents) > 0) and (sol_as_list in parents_as_list):
            # Index of the parent in the 'self.last_generation_parents' array.
            # This is not its index within the population.
            parent_idx = parents_as_list.index(sol_as_list)
            # Use the returned parent index to return its index in the last population.
            parent_idx = self.last_generation_parents_indices[parent_idx]
            # Use the parent's index to return its pre-calculated fitness value.
            return True, self.previous_generation_fitness[parent_idx]
        return False, None

    def cal_pop_fitness(self):
        """
        Calculating the fitness values of batches of solutions in the current population.
        Fitness values are never reused because a solution is an elitism solution.
        If any exception is raised, it is logged using 'self.logger' and the program exits with the code -1.
        It returns:
            -fitness: An array of the calculated fitness values.
        """
        try:
            if not self.valid_parameters:
                raise Exception("ERROR calling the cal_pop_fitness() method: \nPlease check the parameters passed while creating an instance of the GA class.\n")

            # 'parents_as_list' is the list version of 'self.last_generation_parents'.
            # It is used to return the parent index using the 'in' membership operator of Python lists. This is much faster than using 'numpy.where()'.
            # It is always bound (empty when there are no parents) so that no later use depends on a conditionally defined name.
            parents_as_list = []
            if self.last_generation_parents is not None:
                parents_as_list = [list(gen_parent) for gen_parent in self.last_generation_parents]

            # The "undefined" value means that the fitness of this solution must be calculated.
            pop_fitness = ["undefined"] * len(self.population)

            use_parallel = self.parallel_processing is not None
            use_batches = self.fitness_batch_size not in [1, None]

            if not use_parallel:
                # Marker to make sure this overridden method is the one being called.
                print('\nAAAAAAAAAAAAAAAAAAAAAAA\n')

            # Reuse the fitness values that are already known.
            # The saved best solutions are only consulted when parallel processing is not used.
            for sol_idx, sol in enumerate(self.population):
                # The solution is converted into a list only once and reused for all lookups.
                found, fitness = self._find_known_fitness(list(sol),
                                                          parents_as_list,
                                                          check_best_solutions=not use_parallel)
                if found:
                    pop_fitness[sol_idx] = fitness

            # Indices of the solutions to calculate their fitness.
            # The type is checked first so that a multi-objective fitness (array) is never compared to a string.
            pending_indices = [idx for idx, fit in enumerate(pop_fitness) if type(fit) is str and fit == "undefined"]

            if not use_parallel:
                if not use_batches:
                    # Calculating the missing fitness values one solution at a time.
                    for sol_idx in pending_indices:
                        fitness = self.fitness_func(self, self.population[sol_idx], sol_idx)
                        if not self._is_supported_fitness(fitness):
                            raise ValueError(f"The fitness function should return a number or an iterable (list, tuple, or numpy.ndarray) but the value {fitness} of type {type(fitness)} found.")
                        pop_fitness[sol_idx] = fitness
                else:
                    # Reaching this block means that batch fitness calculation is used.
                    # For each batch, get its indices and call the fitness function.
                    for batch_indices in self._split_into_batches(pending_indices):
                        batch_solutions = self.population[batch_indices, :]
                        batch_fitness = self.fitness_func(self, batch_solutions, batch_indices)
                        self._check_batch_fitness(batch_fitness, batch_indices)

                        for index, fitness in zip(batch_indices, batch_fitness):
                            if not self._is_supported_fitness(fitness):
                                raise ValueError(f"The fitness function should return a number or an iterable (list, tuple, or numpy.ndarray) but the value {fitness} of type {type(fitness)} found.")
                            pop_fitness[index] = fitness
            else:
                # Decide which class to use based on whether the user selected "process" or "thread"
                if self.parallel_processing[0] == "process":
                    ExecutorClass = concurrent.futures.ProcessPoolExecutor
                else:
                    ExecutorClass = concurrent.futures.ThreadPoolExecutor

                # We can use a with statement to ensure threads are cleaned up promptly (https://docs.python.org/3/library/concurrent.futures.html#threadpoolexecutor-example)
                with ExecutorClass(max_workers=self.parallel_processing[1]) as executor:
                    # Check if batch processing is used. If not, then calculate the fitness value for individual solutions.
                    if not use_batches:
                        # A copy of each solution is submitted to the executor.
                        solutions_to_submit = [self.population[sol_idx].copy() for sol_idx in pending_indices]
                        results = executor.map(self.fitness_func,
                                               [self] * len(pending_indices),
                                               solutions_to_submit,
                                               pending_indices)
                        for index, fitness in zip(pending_indices, results):
                            if not self._is_supported_fitness(fitness):
                                raise ValueError(f"The fitness function should return a number or an iterable (list, tuple, or numpy.ndarray) but the value {fitness} of type {type(fitness)} found.")
                            pop_fitness[index] = fitness
                    else:
                        # Reaching this block means that batch processing is used. The fitness values are calculated in batches.
                        # Each element of the `batches_indices` list represents the solutions' indices in one batch.
                        batches_indices = self._split_into_batches(pending_indices)
                        # Each element of the `batches_solutions` list represents the solutions in one batch.
                        batches_solutions = [self.population[batch_indices, :] for batch_indices in batches_indices]

                        # One fitness function call is submitted per batch.
                        results = executor.map(self.fitness_func,
                                               [self] * len(batches_indices),
                                               batches_solutions,
                                               batches_indices)
                        for batch_indices, batch_fitness in zip(batches_indices, results):
                            self._check_batch_fitness(batch_fitness, batch_indices)

                            for index, fitness in zip(batch_indices, batch_fitness):
                                if not self._is_supported_fitness(fitness):
                                    raise ValueError(f"The fitness function should return a number or an iterable (list, tuple, or numpy.ndarray) but the value ({fitness}) of type {type(fitness)} found.")
                                pop_fitness[index] = fitness

            pop_fitness = numpy.array(pop_fitness)
        except Exception as ex:
            self.logger.exception(ex)
            sys.exit(-1)
        return pop_fitness
# Inputs of the linear equation whose weights (the genes) are being optimized.
function_inputs = [4,-2,3.5,5,-11,-4.7]
# Value the weighted sum of the inputs should reach.
desired_output = 44


def fitness_func(ga_instance, solution, solution_idx):
    """
    Fitness of a single solution: the inverse of the distance to `desired_output`.

    ga_instance: The GA instance calling the function. Only its `generations_completed` attribute is read (for the trace line).
    solution: The genes of the solution, multiplied element-wise by `function_inputs`.
    solution_idx: Index of the solution in the population. Only used in the trace line.

    Returns the fitness value (higher means closer to `desired_output`).
    """
    output = numpy.sum(solution*function_inputs)
    # The small constant keeps the division defined when the output matches the desired output exactly.
    # Without it, the fitness would be `inf` and NumPy would emit a divide-by-zero warning.
    solution_fitness = 1.0 / (numpy.abs(output - desired_output) + 0.000001)
    # Trace line to see every call made to the fitness function.
    print(' ', ga_instance.generations_completed, solution_idx, solution_fitness, output, solution)
    return solution_fitness
# Function called at the end of each generation
def callback_generation(ga_instance):
    """
    Print an end-of-generation (EOG) line describing the best solution.

    The best solution is taken from the fitness values of the last generation (`ga_instance.last_generation_fitness`) that are passed to `best_solution()`.
    """
    best = ga_instance.best_solution(pop_fitness=ga_instance.last_generation_fitness)
    best_genes, best_fitness, best_idx = best
    # Weighted sum produced by the best solution.
    best_output = numpy.sum(best_genes*function_inputs)
    print('\n EOG', ga_instance.generations_completed, best_idx, best_fitness, best_output, best_genes)
# Settings of the run.
num_generations = 4
num_parents_mating = 4
sol_per_pop = 10
# One gene per input of the equation.
num_genes = len(function_inputs)
stop_criteria = "reach_10"

# The instance is created from the subclass itself, so its own cal_pop_fitness() is the one used by run().
ga_instance = NewGA(
    fitness_func=fitness_func,
    on_generation=callback_generation,
    num_generations=num_generations,
    num_parents_mating=num_parents_mating,
    sol_per_pop=sol_per_pop,
    num_genes=num_genes,
    stop_criteria=stop_criteria,
    suppress_warnings=True,
)
ga_instance.run()
That fixes the issues. Thanks a lot @ahmedfgad. I really appreciate your support and I do find Pygad really convenient to use.
I understand that fitness values will not be reused if the following settings are used:
But is there a way to disable reuse and force recalculation of fitness values for each new generation if any of these options is set to a different value?
Specifically, I want to have keep_elitism=1, but recalculate the fitness value of the best solution in each new generation.