Source code for ddmtolab.Algorithms.STMO.SPEA2

"""
Strength Pareto Evolutionary Algorithm 2 (SPEA2)

This module implements SPEA2 for multi-objective optimization problems.

References
----------
    [1] Zitzler, E., Laumanns, M., & Thiele, L. (2001). SPEA2: Improving the Strength Pareto Evolutionary Algorithm For Multiobjective Optimization. In Evolutionary Methods for Design, Optimization and Control with Applications to Industrial Problems. Proceedings of the EUROGEN'2001.

Notes
-----
Author: Jiangtao Shen
Email: j.shen5@exeter.ac.uk
Date: 2025.12.13
Version: 1.0
"""
from tqdm import tqdm
import time
import numpy as np
from scipy.spatial.distance import cdist
from ddmtolab.Methods.Algo_Methods.algo_utils import *


[docs] class SPEA2: """ Strength Pareto Evolutionary Algorithm 2 for multi-objective optimization. Attributes ---------- algorithm_information : dict Dictionary containing algorithm capabilities and requirements """ algorithm_information = { 'n_tasks': '[1, K]', 'dims': 'unequal', 'objs': 'unequal', 'n_objs': '[2, M]', 'cons': 'unequal', 'n_cons': '[0, C]', 'expensive': 'False', 'knowledge_transfer': 'False', 'n': 'unequal', 'max_nfes': 'unequal' } @classmethod def get_algorithm_information(cls, print_info=True): return get_algorithm_information(cls, print_info)
[docs] def __init__(self, problem, n=None, max_nfes=None, muc=20.0, mum=15.0, epsilon=0, save_data=True, save_path='./Data', name='SPEA2', disable_tqdm=True): """ Initialize SPEA2 algorithm. Parameters ---------- problem : MTOP Multi-task optimization problem instance n : int or List[int], optional Population size per task (default: 100) max_nfes : int or List[int], optional Maximum number of function evaluations per task (default: 10000) muc : float, optional Distribution index for simulated binary crossover (SBX) (default: 20.0) mum : float, optional Distribution index for polynomial mutation (PM) (default: 15.0) epsilon : float, optional Constraint epsilon value for epsilon-constraint method (default: 0) save_data : bool, optional Whether to save optimization data (default: True) save_path : str, optional Path to save results (default: './TestData') name : str, optional Name for the experiment (default: 'SPEA2_test') disable_tqdm : bool, optional Whether to disable progress bar (default: True) """ self.problem = problem self.n = n if n is not None else 100 self.max_nfes = max_nfes if max_nfes is not None else 10000 self.muc = muc self.mum = mum self.epsilon = epsilon self.save_data = save_data self.save_path = save_path self.name = name self.disable_tqdm = disable_tqdm
[docs] def optimize(self): """ Execute the SPEA2 algorithm. Returns ------- Results Optimization results containing decision variables, objectives, constraints, and runtime """ start_time = time.time() problem = self.problem nt = problem.n_tasks n_per_task = par_list(self.n, nt) max_nfes_per_task = par_list(self.max_nfes, nt) # Initialize population and evaluate for each task decs = initialization(problem, n_per_task) objs, cons = evaluation(problem, decs) nfes_per_task = n_per_task.copy() all_decs, all_objs, all_cons = init_history(decs, objs, cons) # Calculate initial fitness for each task fitness = [] for i in range(nt): cv = self._calculate_constraint_violations(cons[i]) fitness_i = self._cal_fitness(objs[i], cv) fitness.append(fitness_i.copy()) pbar = tqdm(total=sum(max_nfes_per_task), initial=sum(n_per_task), desc=f"{self.name}", disable=self.disable_tqdm) while sum(nfes_per_task) < sum(max_nfes_per_task): # Skip tasks that have exhausted their evaluation budget active_tasks = [i for i in range(nt) if nfes_per_task[i] < max_nfes_per_task[i]] if not active_tasks: break for i in active_tasks: # Environmental selection to get current population selected_indices, selected_fitness = self._spea2_selection(objs[i], cons[i], n_per_task[i]) objs[i] = objs[i][selected_indices] decs[i] = decs[i][selected_indices] cons[i] = cons[i][selected_indices] if cons[i] is not None else None fitness[i] = selected_fitness # Parent selection via binary tournament based on fitness matingpool = tournament_selection(2, n_per_task[i], fitness[i]) # Generate offspring through crossover and mutation off_decs = ga_generation(decs[i][matingpool, :], muc=self.muc, mum=self.mum) off_objs, off_cons = evaluation_single(problem, off_decs, i) # Merge parent and offspring populations objs[i], decs[i], cons[i] = vstack_groups((objs[i], off_objs), (decs[i], off_decs), (cons[i], off_cons)) nfes_per_task[i] += n_per_task[i] pbar.update(n_per_task[i]) append_history(all_decs[i], decs[i], all_objs[i], objs[i], all_cons[i], cons[i]) pbar.close() runtime = time.time() - start_time # Save results results = build_save_results(all_decs=all_decs, all_objs=all_objs, runtime=runtime, max_nfes=nfes_per_task, all_cons=all_cons, bounds=problem.bounds, save_path=self.save_path, filename=self.name, save_data=self.save_data) return results
def _calculate_constraint_violations(self, cons): """ Calculate constraint violations. Parameters ---------- cons : ndarray or None Constraint values with shape (pop_size, n_cons) Returns ------- cv : ndarray Constraint violation values with shape (pop_size,) """ if cons is not None: cv = np.sum(np.maximum(0, cons), axis=1) cv[cv < self.epsilon] = 0 else: cv = np.zeros(cons.shape[0]) if cons is not None else np.array([]) return cv def _spea2_selection(self, objs, cons, N): """ Environmental selection for SPEA2 algorithm. Parameters ---------- objs : ndarray Objective values with shape (pop_size, n_objs) cons : ndarray or None Constraint values with shape (pop_size, n_cons) N : int Number of individuals to select Returns ------- selected_indices : ndarray Indices of selected individuals selected_fitness : ndarray Fitness values of selected individuals """ pop_size = objs.shape[0] # Calculate constraint violations cv = self._calculate_constraint_violations(cons) # Calculate fitness for all individuals fitness = self._cal_fitness(objs, cv) # Environmental selection next_selected = fitness < 1 if np.sum(next_selected) < N: # Need to add more individuals sorted_indices = np.argsort(fitness) next_selected[sorted_indices[:N]] = True elif np.sum(next_selected) > N: # Need to remove some individuals using truncation to_remove = self._truncation(objs[next_selected], np.sum(next_selected) - N) temp_indices = np.where(next_selected)[0] next_selected[temp_indices[to_remove]] = False # Get selected indices and fitness selected_indices = np.where(next_selected)[0] selected_fitness = fitness[selected_indices] # Sort selected individuals by fitness sort_order = np.argsort(selected_fitness) selected_indices = selected_indices[sort_order] selected_fitness = selected_fitness[sort_order] return selected_indices, selected_fitness def _cal_fitness(self, objs, cv): """ Calculate SPEA2 fitness values. Parameters ---------- objs : ndarray Objective values with shape (N, M) cv : ndarray Constraint violation values with shape (N,) Returns ------- fitness : ndarray Fitness values with shape (N,) """ N = objs.shape[0] # Detect dominance relations dominate = np.zeros((N, N), dtype=bool) for i in range(N): for j in range(N): if i == j: continue # Compare constraint violations if cv[i] < cv[j]: dominate[i, j] = True elif cv[i] > cv[j]: dominate[j, i] = True else: # Same constraint violation, compare objectives less = np.any(objs[i, :] < objs[j, :]) greater = np.any(objs[i, :] > objs[j, :]) if less and not greater: dominate[i, j] = True elif greater and not less: dominate[j, i] = True S = np.sum(dominate, axis=1) # Calculate R(i): sum of S values of solutions that dominate i R = np.zeros(N) for i in range(N): # Find solutions that dominate i dominating_i = dominate[:, i] if np.any(dominating_i): R[i] = np.sum(S[dominating_i]) # Calculate D(i): density estimation distances = cdist(objs, objs, metric='euclidean') # Set diagonal to infinity np.fill_diagonal(distances, np.inf) # Sort distances for each solution sorted_distances = np.sort(distances, axis=1) k = int(np.floor(np.sqrt(N))) if k > 0: k_idx = min(k, sorted_distances.shape[1]) - 1 # -1 for 0-based indexing D = 1.0 / (sorted_distances[:, k_idx] + 2) else: D = np.zeros(N) # Calculate fitness: F = R + D fitness = R + D return fitness def _truncation(self, objs, K): """ Truncation operator for SPEA2. Parameters ---------- objs : ndarray Objective values of candidate solutions K : int Number of solutions to remove Returns ------- to_remove : ndarray Boolean array indicating which solutions to remove """ N = objs.shape[0] to_remove = np.zeros(N, dtype=bool) if K <= 0: return to_remove # Calculate pairwise distances distances = cdist(objs, objs, metric='euclidean') # Set diagonal to infinity np.fill_diagonal(distances, np.inf) while np.sum(to_remove) < K: # Find remaining solutions remaining = np.where(~to_remove)[0] if len(remaining) <= 1: # If only one or zero remaining, break break # Get distances between remaining solutions remaining_distances = distances[np.ix_(remaining, remaining)] # Sort distances for each remaining solution sorted_distances = np.sort(remaining_distances, axis=1) # Find solution with smallest distance to its nearest neighbor min_idx = 0 for i in range(1, len(remaining)): # Compare rows element-wise for j in range(sorted_distances.shape[1]): if sorted_distances[i, j] < sorted_distances[min_idx, j]: min_idx = i break elif sorted_distances[i, j] > sorted_distances[min_idx, j]: break # If all equal, keep the smaller index else: if remaining[i] < remaining[min_idx]: min_idx = i # Remove the selected solution to_remove[remaining[min_idx]] = True return to_remove