Source code for desdeo_emo.EAs.BaseEA

from typing import Dict, Literal, Tuple, Type, Union

import pandas as pd
import numpy as np
from desdeo_emo.population.Population import Population
from desdeo_emo.selection.SelectionBase import SelectionBase

from desdeo_problem import MOProblem, DataProblem, EvaluationResults
from desdeo_emo.problem import IOPISProblem
from desdeo_tools.interaction import SimplePlotRequest
from desdeo_tools.interaction.request import BaseRequest
from scipy.special import comb
from desdeo_tools.utilities import non_dominated
from desdeo_emo.utilities.non_dominated import check_domination


[docs]class eaError(Exception): """Raised when an error related to EA occurs"""
[docs]class BaseEA: """This class provides the basic structure for Evolutionary algorithms.""" def __init__( self, a_priori: bool = False, interact: bool = False, selection_operator: Type[ SelectionBase ] = None, # TODO: No algorithm uses this option. Remove? n_iterations: int = 10, n_gen_per_iter: int = 100, total_function_evaluations: int = 0, use_surrogates: bool = False, keep_archive: bool = False, save_non_dominated: bool = False, ): """Initialize EA here. Set up parameters, create EA specific objects.""" self.a_priori: bool = a_priori self.interact: bool = interact self.n_iterations: int = n_iterations self.n_gen_per_iter: int = n_gen_per_iter self.total_gen_count: int = n_gen_per_iter * n_iterations self.total_function_evaluations = total_function_evaluations self.selection_operator = selection_operator self.use_surrogates: bool = use_surrogates # Internal counters and state trackers self._iteration_counter: int = 0 self._gen_count_in_curr_iteration: int = 0 self._current_gen_count: int = 0 self._function_evaluation_count: int = 0 self._interaction_location: Union[ None, Literal["Population", "Selection", "EA", "Problem"] ] = None self.interaction_type_set_bool: bool = False self.allowable_interaction_types: Union[None, Dict] = None self.population: Union[None, Population] = None self.keep_archive = keep_archive self.archive: dict = {} self.save_non_dominated = save_non_dominated """@property def interaction_location( self ) -> Union[None, Literal["Population", "Selection", "EA", "Problem"]]: ""Return the location of interaction handling "" return self._interaction_location""" @property def allowable_interaction_types(self) -> dict: if self._allowable_interaction_types is not None: return self._allowable_interaction_types else: self.allowable_interaction_types = self.set_interaction_type(None) return self._allowable_interaction_types @allowable_interaction_types.setter def allowable_interaction_types(self, value: dict): self._allowable_interaction_types = value
[docs] def start(self): """Mimics the structure of the mcdm methods. Returns the request objects from self.retuests().""" if self.population is None: raise eaError("Population not initialized.") if self.selection_operator is None: raise eaError("Selection operator not initialized.") if self.interact and not self.interaction_type_set_bool: raise eaError( "Interaction type not set. Use the set_interaction_type() method." ) if self.save_non_dominated: self.non_dominated = {} non_dom_indices = non_dominated(self.population.fitness) self.non_dominated["individuals"] = self.population.individuals[ non_dom_indices ] self.non_dominated["objectives"] = self.population.objectives[ non_dom_indices ] self.non_dominated["fitness"] = self.population.fitness[non_dom_indices] # TODO: add uncertainity, constraints, and generation numbers return self.requests()
[docs] def end(self): """To be run at the end of the evolution process.""" if self.keep_archive: objective_names = self.population.problem.objective_names if isinstance(objective_names[0], list): objective_names = [ item for sublist in objective_names for item in sublist ] fitness_names = self.population.problem.fitness_names if isinstance(fitness_names[0], list): objective_names = [ item for sublist in fitness_names for item in sublist ] self.archive[self._current_gen_count] = { "iteration": self._iteration_counter, "decision variables": pd.DataFrame( self.population.individuals, columns=self.population.problem.variable_names, ), "objectives": pd.DataFrame( self.population.objectives, columns=objective_names, ), "fitness": pd.DataFrame( self.population.fitness, columns=fitness_names, ), } return self.archive
[docs] def _next_gen(self): """Run one generation of an EA. Change nothing about the parameters."""
[docs] def iterate(self, preference=None) -> Tuple: """Run one iteration of EA. One iteration consists of a constant or variable number of generations. This method leaves EA.params unchanged, except the current iteration count and gen count. """ self.manage_preferences(preference) self.pre_iteration() self._gen_count_in_curr_iteration = 0 while self.continue_iteration(): self._next_gen() self._iteration_counter += 1 self.post_iteration() return self.requests()
[docs] def pre_iteration(self): """Run this code before every iteration.""" if self.keep_archive: objective_names = self.population.problem.objective_names if isinstance(objective_names[0], list): objective_names = [ item for sublist in objective_names for item in sublist ] fitness_names = self.population.problem.fitness_names if isinstance(fitness_names[0], list): objective_names = [ item for sublist in fitness_names for item in sublist ] self.archive[self._current_gen_count] = { "iteration": self._iteration_counter, "decision variables": pd.DataFrame( self.population.individuals, columns=self.population.problem.variable_names, ), "objectives": pd.DataFrame( self.population.objectives, columns=objective_names, ), "fitness": pd.DataFrame( self.population.fitness, columns=fitness_names, ), }
[docs] def post_iteration(self): """Run this code after every iteration.""" return
[docs] def continue_iteration(self): """Checks whether the current iteration should be continued or not.""" return ( self._gen_count_in_curr_iteration < self.n_gen_per_iter and self.check_FE_count() )
[docs] def continue_evolution(self) -> bool: """Checks whether the current iteration should be continued or not.""" return self._iteration_counter < self.n_iterations and self.check_FE_count()
[docs] def check_FE_count(self) -> bool: """Checks whether termination criteria via function evaluation count has been met or not. Returns: bool: True is function evaluation count limit NOT met. """ if self.total_function_evaluations == 0: return True elif self._function_evaluation_count <= self.total_function_evaluations: return True return False
[docs] def set_interaction_type( self, interaction_type: Union[None, str] ) -> Union[None, str]: if self._interaction_location == "Selection": try: self.interaction_type_set_bool = True return self.selection_operator.set_interaction_type(interaction_type) except NotImplementedError as e: self.interaction_type_set_bool = False raise eaError( "Interaction not implemented in the selection operator." ) from e if self._interaction_location == "Problem": try: self.interaction_type_set_bool = True return except NotImplementedError as e: self.interaction_type_set_bool = False raise eaError( "Interaction not implemented in the problem class." ) from e if self._interaction_location == "EA": self.interaction_type_set_bool = False raise NotImplementedError("No interaction type implemented yet.") if self._interaction_location == "Population": try: self.interaction_type_set_bool = True return self.selection_operator.set_interaction_type(interaction_type) except NotImplementedError as e: self.interaction_type_set_bool = False raise eaError( "Interaction not implemented in the population class." ) from e
[docs] def manage_preferences(self, preference=None): """Forward the preference to the correct preference handling method. Args: preference (_type_, optional): _description_. Defaults to None. Raises: eaError: Preference handling not implemented. """ if self._interaction_location is None: return if self._interaction_location == "Population": try: self.population.manage_preferences(preference) return except NotImplementedError as e: raise eaError( "Interaction handling not implemented in population." ) from e if self._interaction_location == "Selection": try: self.selection_operator.manage_preferences(self.population, preference) return except NotImplementedError as e: raise eaError( "Interaction handling not implemented in selection." ) from e if self._interaction_location == "EA": raise eaError("Override this method in child class.") if self._interaction_location == "Problem": try: self.population.problem.manage_preferences(self.population, preference) self.population.reevaluate_fitness() return except NotImplementedError as e: raise eaError( "Interaction handling not implemented in problem class." ) from e
[docs] def requests(self) -> Tuple: pass
[docs] def non_dominated_archive( self, individuals: np.ndarray, results: EvaluationResults ) -> None: assert self.save_non_dominated new_fitness = results.fitness new_objectives = results.objectives if new_fitness.ndim == 1: new_fitness = np.atleast_2d(new_fitness) new_objectives = np.atleast_2d(results.objectives) if individuals.ndim == 1: individuals = np.atleast_2d(individuals) new_indices, old_indices = check_domination( new_fitness, self.non_dominated["fitness"] ) self.non_dominated["individuals"] = np.vstack( (self.non_dominated["individuals"][old_indices], individuals[new_indices]) ) self.non_dominated["objectives"] = np.vstack( ( self.non_dominated["objectives"][old_indices], new_objectives[new_indices], ) ) self.non_dominated["fitness"] = np.vstack( (self.non_dominated["fitness"][old_indices], new_fitness[new_indices]) )
[docs]class BaseDecompositionEA(BaseEA): """The Base class for decomposition based EAs. This class contains most of the code to set up the parameters and operators. It also contains the logic of a simple decomposition EA. Parameters ---------- problem : MOProblem The problem class object specifying the details of the problem. selection_operator : Type[SelectionBase], optional The selection operator to be used by the EA, by default None. population_size : int, optional The desired population size, by default None, which sets up a default value of population size depending upon the dimensionaly of the problem. population_params : Dict, optional The parameters for the population class, by default None. See desdeo_emo.population.Population for more details. initial_population : Population, optional An initial population class, by default None. Use this if you want to set up a specific starting population, such as when the output of one EA is to be used as the input of another. lattice_resolution : int, optional The number of divisions along individual axes in the objective space to be used while creating the reference vector lattice by the simplex lattice design. By default None a_priori : bool, optional A bool variable defining whether a priori preference is to be used or not. By default False interact : bool, optional A bool variable defining whether interactive preference is to be used or not. By default False n_iterations : int, optional The total number of iterations to be run, by default 10. This is not a hard limit and is only used for an internal counter. n_gen_per_iter : int, optional The total number of generations in an iteration to be run, by default 100. This is not a hard limit and is only used for an internal counter. total_function_evaluations :int, optional Set an upper limit to the total number of function evaluations. When set to zero, this argument is ignored and other termination criteria are used. """ def __init__( self, problem: MOProblem = None, initial_population: Population = None, population_size: int = None, population_params: Dict = None, lattice_resolution: int = None, interact: bool = False, n_iterations: int = 10, n_gen_per_iter: int = 100, use_surrogates: bool = False, total_function_evaluations: int = 0, keep_archive: bool = False, save_non_dominated: bool = False, ): super().__init__( interact=interact, n_iterations=n_iterations, n_gen_per_iter=n_gen_per_iter, total_function_evaluations=total_function_evaluations, use_surrogates=use_surrogates, keep_archive=keep_archive, save_non_dominated=save_non_dominated, ) if interact: if isinstance(problem, (MOProblem, DataProblem)): self._interaction_location = "Selection" if isinstance(problem, (IOPISProblem)): self._interaction_location = "Problem" self.set_interaction_type(None) if initial_population is not None: if problem is not None: raise eaError("Provide only one of initial_population or problem.") # Population should be compatible. self.population = initial_population # TODO put checks here. num_fitnesses = self.population.problem.n_of_fitnesses if population_size is None and lattice_resolution is None: population_size = self.population.pop_size else: if problem is None: raise eaError("Provide one of initial_population or problem.") num_fitnesses = problem.n_of_fitnesses if population_size is None: if lattice_resolution is None: lattice_res_options = [49, 13, 7, 5, 4, 3, 3, 3, 3] if num_fitnesses < 11: lattice_resolution = lattice_res_options[num_fitnesses - 2] else: lattice_resolution = 3 population_size = comb( lattice_resolution + num_fitnesses - 1, num_fitnesses - 1, exact=True, ) self.population = Population( problem, population_size, population_params, use_surrogates ) self._function_evaluation_count += population_size # self.reference_vectors = ReferenceVectors(lattice_resolution, num_fitnesses) # print("Using BaseDecompositionEA init")
[docs] def _next_gen(self): """Run one generation of decomposition based EA. Intended to be used by next_iteration. """ offspring = self.population.mate() # (params=self.params) if self.save_non_dominated: results = self.population.add(offspring, self.use_surrogates) self.non_dominated_archive(offspring, results) else: self.population.add(offspring, self.use_surrogates) selected = self._select() self.population.keep(selected) self._current_gen_count += 1 self._gen_count_in_curr_iteration += 1 if not self.use_surrogates: self._function_evaluation_count += offspring.shape[0]
[docs] def pre_iteration(self): super().pre_iteration() if not self.interact: self.selection_operator.adapt_RVs(self.population.fitness) if self._interaction_location != "Selection": self.selection_operator.adapt_RVs(self.population.fitness)
[docs] def _select(self) -> list: """Describe a selection mechanism. Return indices of selected individuals. Returns ------- list List of indices of individuals to be selected. """ return self.selection_operator.do(self.population)
[docs] def request_plot(self) -> SimplePlotRequest: dimensions_data = pd.DataFrame( index=["minimize", "ideal", "nadir"], columns=self.population.problem.get_objective_names(), ) dimensions_data.loc["minimize"] = self.population.problem._max_multiplier dimensions_data.loc["ideal"] = self.population.ideal_objective_vector dimensions_data.loc["nadir"] = self.population.nadir_objective_vector data = pd.DataFrame( self.population.objectives, columns=self.population.problem.objective_names ) return SimplePlotRequest( data=data, dimensions_data=dimensions_data, message="Objective Values" )
[docs] def request_preferences(self) -> Type[BaseRequest]: if self.interact is False: return if self._interaction_location == "Problem": return self.population.problem.request_preferences(self.population) if self._interaction_location == "Selection": return self.selection_operator.request_preferences(self.population)
[docs] def requests(self) -> Tuple: return (self.request_preferences(), self.request_plot())
[docs] def end(self): """Conducts non-dominated sorting at the end of the evolution process Returns: tuple: The first element is a 2-D array of the decision vectors of the non-dominated solutions. The second element is a 2-D array of the corresponding objective values. """ base_return = super().end() non_dom = self.population.non_dominated_objectives() return ( self.population.individuals[non_dom, :], self.population.objectives[non_dom, :], base_return, )