Source code for ddmtolab.Algorithms.STMO.MOEA_D_STM

"""
MOEA/D with Stable Matching (MOEA/D-STM)

This module implements MOEA/D-STM for multi-objective optimization problems.

References
----------
    [1] Li, Ke, et al. "Stable matching-based selection in evolutionary multiobjective optimization." IEEE Transactions on Evolutionary Computation 18.6 (2014): 909-923.

Notes
-----
Author: Jiangtao Shen
Email: j.shen5@exeter.ac.uk
Date: 2025.01.01
Version: 1.0
"""
from tqdm import tqdm
import time
import numpy as np
from scipy.spatial.distance import cdist
from ddmtolab.Methods.Algo_Methods.uniform_point import uniform_point
from ddmtolab.Methods.Algo_Methods.algo_utils import *


[docs] class MOEA_D_STM: """ MOEA/D with Stable Matching for multi-objective optimization. This algorithm uses a stable matching model to select solutions for subproblems, ensuring a stable assignment between solutions and weight vectors. Attributes ---------- algorithm_information : dict Dictionary containing algorithm capabilities and requirements """ algorithm_information = { 'n_tasks': '[1, K]', 'dims': 'unequal', 'objs': 'unequal', 'n_objs': '[2, M]', 'cons': 'unequal', 'n_cons': '0', 'expensive': 'False', 'knowledge_transfer': 'False', 'n': 'unequal', 'max_nfes': 'unequal' } @classmethod def get_algorithm_information(cls, print_info=True): return get_algorithm_information(cls, print_info)
[docs] def __init__(self, problem, n=None, max_nfes=None, T=None, save_data=True, save_path='./Data', name='MOEA-D-STM', disable_tqdm=True): """ Initialize MOEA/D-STM algorithm. Parameters ---------- problem : MTOP Multi-task optimization problem instance n : int or List[int], optional Population size per task (default: 100) max_nfes : int or List[int], optional Maximum number of function evaluations per task (default: 10000) T : int or List[int], optional Size of neighborhood (default: ceil(n/10)) save_data : bool, optional Whether to save optimization data (default: True) save_path : str, optional Path to save results (default: './TestData') name : str, optional Name for the experiment (default: 'MOEADSTM_test') disable_tqdm : bool, optional Whether to disable progress bar (default: True) """ self.problem = problem self.n = n if n is not None else 100 self.max_nfes = max_nfes if max_nfes is not None else 10000 self.T = T self.save_data = save_data self.save_path = save_path self.name = name self.disable_tqdm = disable_tqdm
[docs] def optimize(self): """ Execute the MOEA/D-STM algorithm. Returns ------- Results Optimization results containing decision variables, objectives, constraints, and runtime """ start_time = time.time() problem = self.problem nt = problem.n_tasks no = problem.n_objs n_per_task = par_list(self.n, nt) max_nfes_per_task = par_list(self.max_nfes, nt) # Set neighborhood size for each task if self.T is None: T_per_task = [int(np.ceil(n / 10)) for n in n_per_task] else: T_per_task = par_list(self.T, nt) # Generate uniformly distributed weight vectors for each task Weight = [] B = [] # Neighborhood for each task for i in range(nt): Weight_i, n = uniform_point(n_per_task[i], no[i]) Weight.append(Weight_i) n_per_task[i] = n # Detect the neighbors of each solution distance = cdist(Weight_i, Weight_i) B_i = np.argsort(distance, axis=1)[:, :T_per_task[i]] B.append(B_i) # Initialize population and evaluate for each task decs = initialization(problem, n_per_task) objs, _ = evaluation(problem, decs) nfes_per_task = n_per_task.copy() all_decs, all_objs = init_history(decs, objs) # Initialize algorithm-specific variables for each task z = [] # Ideal point Pi = [] # Utility for each subproblem oldObj = [] # Old Tchebycheff function value for i in range(nt): z_i = np.min(objs[i], axis=0) z.append(z_i) Pi_i = np.ones(n_per_task[i]) Pi.append(Pi_i) # Calculate old Tchebycheff values oldObj_i = np.max(np.abs((objs[i] - z_i) / Weight[i]), axis=1) oldObj.append(oldObj_i) pbar = tqdm(total=sum(max_nfes_per_task), initial=sum(nfes_per_task), desc=f"{self.name}", disable=self.disable_tqdm) while sum(nfes_per_task) < sum(max_nfes_per_task): # Skip tasks that have exhausted their evaluation budget active_tasks = [i for i in range(nt) if nfes_per_task[i] < max_nfes_per_task[i]] if not active_tasks: break for i in active_tasks: # Perform 5 sub-generations for subgen in range(5): # Choose I: boundary solutions + tournament selection # Boundary solutions are those with weight close to axes boundary = np.where(np.sum(Weight[i] < 1e-3, axis=1) == no[i] - 1)[0] n_select = int(np.floor(n_per_task[i] / 5)) - len(boundary) if n_select > 0: # Tournament selection based on negative utility tournament_indices = self._tournament_selection(10, n_select, -Pi[i]) I = np.concatenate([boundary, tournament_indices]) else: I = boundary # Generate offspring for each solution in I P = np.zeros((len(I), 3), dtype=int) for idx, sol_idx in enumerate(I): if nfes_per_task[i] >= max_nfes_per_task[i]: break # Choose the parents if np.random.rand() < 0.9: # From neighborhood P[idx, :] = B[i][sol_idx, np.random.permutation(T_per_task[i])[:3]] else: # From entire population P[idx, :] = np.random.permutation(n_per_task[i])[:3] # Generate offspring using DE if len(I) > 0 and nfes_per_task[i] < max_nfes_per_task[i]: # Prepare parent arrays parent1 = decs[i][P[:, 0]] parent2 = decs[i][P[:, 1]] parent3 = decs[i][P[:, 2]] # DE operator off_decs = self._operator_de(parent1, parent2, parent3) # Evaluate offspring off_objs, _ = evaluation_single(problem, off_decs, i) # Update ideal point z[i] = np.minimum(z[i], np.min(off_objs, axis=0)) # Combine population and offspring combined_decs = np.vstack([decs[i], off_decs]) combined_objs = np.vstack([objs[i], off_objs]) # STM selection decs[i], objs[i] = self._stm_selection( combined_objs, combined_decs, Weight[i], z[i], np.max(objs[i], axis=0) ) nfes_per_task[i] += len(off_decs) pbar.update(len(off_decs)) # Update Pi every 10 generations current_gen = int(np.ceil(nfes_per_task[i] / n_per_task[i])) if current_gen % 10 == 0: # Calculate new Tchebycheff values newObj_i = np.max(np.abs((objs[i] - z[i]) / Weight[i]), axis=1) DELTA = oldObj[i] - newObj_i # Update utility Temp = DELTA < 0.001 Pi[i][~Temp] = 1 Pi[i][Temp] = (0.95 + 0.05 * DELTA[Temp] / 0.001) * Pi[i][Temp] oldObj[i] = newObj_i # Update history append_history(all_decs[i], decs[i], all_objs[i], objs[i]) pbar.close() runtime = time.time() - start_time # Save results results = build_save_results( all_decs=all_decs, all_objs=all_objs, runtime=runtime, max_nfes=nfes_per_task, bounds=problem.bounds, save_path=self.save_path, filename=self.name, save_data=self.save_data ) return results
def _tournament_selection(self, tournament_size, n_select, fitness): """ Tournament selection based on fitness values. Parameters ---------- tournament_size : int Number of individuals in each tournament n_select : int Number of individuals to select fitness : ndarray Fitness values (higher is better after negation) Returns ------- ndarray Selected indices """ pop_size = len(fitness) selected = [] for _ in range(n_select): # Randomly select tournament_size individuals candidates = np.random.choice(pop_size, size=min(tournament_size, pop_size), replace=False) # Select the best one winner = candidates[np.argmax(fitness[candidates])] selected.append(winner) return np.array(selected) def _operator_de(self, parent1, parent2, parent3, CR=1.0, F=0.5): """ Differential Evolution operator with polynomial mutation. Parameters ---------- parent1 : ndarray First parent (N x D) parent2 : ndarray Second parent (N x D) parent3 : ndarray Third parent (N x D) CR : float, optional Crossover rate (default: 1.0) F : float, optional Scaling factor (default: 0.5) Returns ------- ndarray Offspring solutions """ N, D = parent1.shape # DE mutation: V = X1 + F * (X2 - X3) V = parent1 + F * (parent2 - parent3) # Crossover offspring = parent1.copy() mask = np.random.rand(N, D) < CR offspring[mask] = V[mask] # Boundary handling offspring = np.clip(offspring, 0, 1) # Polynomial mutation proM = 1.0 disM = 20 site = np.random.rand(N, D) < proM / D mu = np.random.rand(N, D) # Lower half temp = site & (mu <= 0.5) delta = (2 * mu + (1 - 2 * mu) * (1 - offspring) ** (disM + 1)) ** (1 / (disM + 1)) - 1 offspring[temp] = offspring[temp] + delta[temp] # Upper half temp = site & (mu > 0.5) delta = 1 - (2 * (1 - mu) + 2 * (mu - 0.5) * (1 - (1 - offspring)) ** (disM + 1)) ** (1 / (disM + 1)) offspring[temp] = offspring[temp] + delta[temp] # Final boundary handling offspring = np.clip(offspring, 0, 1) return offspring def _stm_selection(self, objs, decs, W, z, znad): """ Stable Matching based selection. This implements the stable matching model where solutions are matched to subproblems in a stable manner. Parameters ---------- objs : ndarray Objective values of all solutions (N x M) decs : ndarray Decision variables of all solutions (N x D) W : ndarray Weight vectors (NW x M) z : ndarray Ideal point (M,) znad : ndarray Nadir point (M,) Returns ------- tuple Selected decision variables and objectives (NW solutions) """ N = objs.shape[0] NW = W.shape[0] # Calculate the modified Tchebycheff value of each solution on each subproblem g = np.zeros((N, NW)) for i in range(N): # For each solution, calculate its Tchebycheff value on all subproblems g[i, :] = np.max(np.abs(objs[i] - z) / W, axis=1) # Calculate the perpendicular distance of each solution on each subproblem # Normalize objectives PopObj = (objs - z) / (znad - z + 1e-10) # Calculate cosine similarity Cosine = 1 - cdist(PopObj, W, metric='cosine') # Calculate perpendicular distance norms = np.sqrt(np.sum(PopObj ** 2, axis=1, keepdims=True)) Distance = norms * np.sqrt(1 - Cosine ** 2) # STM selection - Stable Matching Fp = np.zeros(NW, dtype=int) # Subproblem -> Solution mapping FX = np.zeros(N, dtype=int) # Solution -> Subproblem mapping Phi = np.zeros((NW, N), dtype=bool) # Proposal history # Continue until all subproblems are matched while np.any(Fp == 0): # Find unmatched subproblems RemainW = np.where(Fp == 0)[0] # Randomly select one unmatched subproblem i = RemainW[np.random.randint(len(RemainW))] # Find solutions that haven't been proposed to by this subproblem RemainX = np.where(~Phi[i, :])[0] # Select the best solution for this subproblem (minimum Tchebycheff) best_idx = np.argmin(g[RemainX, i]) j = RemainX[best_idx] # Mark this proposal Phi[i, j] = True if FX[j] == 0: # Solution j is not matched, accept immediately Fp[i] = j + 1 # +1 because 0 means unmatched FX[j] = i + 1 elif Distance[j, i] < Distance[j, FX[j] - 1]: # Solution j prefers subproblem i over its current match old_match = FX[j] - 1 Fp[i] = j + 1 Fp[old_match] = 0 # Old match becomes unmatched FX[j] = i + 1 # Extract selected solutions (Fp contains 1-indexed solution IDs) selected_indices = Fp - 1 # Convert back to 0-indexed return decs[selected_indices], objs[selected_indices]