Source code for ddmtolab.Algorithms.STMO.MOEA_DD

"""
Multi-objective Evolutionary Algorithm Based on Decomposition and Dominance (MOEA/DD)

This module implements MOEA/DD for multi/many-objective optimization problems.

References
----------
    [1] Li, Ke, et al. "An evolutionary many-objective optimization algorithm based on dominance and decomposition." IEEE transactions on evolutionary computation 19.5 (2014): 694-716.

Notes
-----
Author: Jiangtao Shen
Email: j.shen5@exeter.ac.uk
Date: 2025.12.18
Version: 1.0
"""
from tqdm import tqdm
import time
import numpy as np
from scipy.spatial.distance import pdist, squareform
from ddmtolab.Methods.Algo_Methods.uniform_point import uniform_point
from ddmtolab.Methods.Algo_Methods.algo_utils import *


[docs] class MOEA_DD: """ Multi-objective Evolutionary Algorithm Based on Decomposition and Dominance. Attributes ---------- algorithm_information : dict Dictionary containing algorithm capabilities and requirements """ algorithm_information = { 'n_tasks': '[1, K]', 'dims': 'unequal', 'objs': 'unequal', 'n_objs': '[2, M]', 'cons': 'unequal', 'n_cons': '[0, C]', 'expensive': 'False', 'knowledge_transfer': 'False', 'n': 'unequal', 'max_nfes': 'unequal' } @classmethod def get_algorithm_information(cls, print_info=True): return get_algorithm_information(cls, print_info)
[docs] def __init__(self, problem, n=None, max_nfes=None, delta=0.9, muc=20.0, mum=15.0, save_data=True, save_path='./Data', name='MOEA-DD', disable_tqdm=True): """ Initialize MOEA/DD. Parameters ---------- problem : MTOP Multi-task optimization problem instance n : int or List[int], optional Population size per task (default: 100) max_nfes : int or List[int], optional Maximum number of function evaluations per task (default: 10000) delta : float, optional Probability of choosing parents locally (default: 0.9) muc : float, optional Distribution index for simulated binary crossover (SBX) (default: 20.0) mum : float, optional Distribution index for polynomial mutation (PM) (default: 15.0) save_data : bool, optional Whether to save optimization data (default: True) save_path : str, optional Path to save results (default: './TestData') name : str, optional Name for the experiment (default: 'MOEADD_test') disable_tqdm : bool, optional Whether to disable progress bar (default: True) """ self.problem = problem self.n = n if n is not None else 100 self.max_nfes = max_nfes if max_nfes is not None else 10000 self.delta = delta self.muc = muc self.mum = mum self.save_data = save_data self.save_path = save_path self.name = name self.disable_tqdm = disable_tqdm
[docs] def optimize(self): """ Execute the MOEA/DD algorithm. Returns ------- Results Optimization results containing decision variables, objectives, constraints, and runtime """ start_time = time.time() problem = self.problem nt = problem.n_tasks no = problem.n_objs n_per_task = par_list(self.n, nt) max_nfes_per_task = par_list(self.max_nfes, nt) # Generate uniformly distributed weight vectors for each task W = [] T = [] # Neighborhood size B = [] # Neighbor indices for i in range(nt): w_i, n = uniform_point(n_per_task[i], no[i]) W.append(w_i) n_per_task[i] = n # Set neighborhood size to 10% of population T.append(int(np.ceil(n / 10))) # Detect the neighbors of each weight vector based on Euclidean distance distances = squareform(pdist(w_i)) neighbors = np.argsort(distances, axis=1)[:, :T[i]] B.append(neighbors) # Initialize population and evaluate for each task decs = initialization(problem, n_per_task) objs, cons = evaluation(problem, decs) nfes_per_task = n_per_task.copy() all_decs, all_objs, all_cons = init_history(decs, objs, cons) # Initialize region for each task region = [] for i in range(nt): # Calculate cosine similarity: 1 - cosine distance norm_objs = objs[i] / (np.linalg.norm(objs[i], axis=1, keepdims=True) + 1e-10) norm_w = W[i] / (np.linalg.norm(W[i], axis=1, keepdims=True) + 1e-10) cosine_similarity = np.dot(norm_objs, norm_w.T) region_i = np.argmax(cosine_similarity, axis=1) region.append(region_i) # Initialize front_no for each task front_no = [] for i in range(nt): front_no_i, _ = nd_sort(objs[i], cons[i], n_per_task[i]) front_no.append(front_no_i) # Initialize ideal point Z for each task Z = [] for i in range(nt): Z.append(np.min(objs[i], axis=0)) pbar = tqdm(total=sum(max_nfes_per_task), initial=sum(n_per_task), desc=f"{self.name}", disable=self.disable_tqdm) # Main loop while sum(nfes_per_task) < sum(max_nfes_per_task): # Skip tasks that have exhausted their evaluation budget active_tasks = [i for i in range(nt) if nfes_per_task[i] < max_nfes_per_task[i]] if not active_tasks: break for task_id in active_tasks: # For each solution for i in range(n_per_task[task_id]): # Choose the parents ei = np.where(np.isin(region[task_id], B[task_id][i]))[0] if np.random.rand() < self.delta and len(ei) >= 2: # Local selection cv_ei = np.sum(np.maximum(0, cons[task_id][ei]), axis=1) if cons[ task_id] is not None else np.zeros( len(ei)) parents = tournament_selection(2, 2, cv_ei) parents = ei[parents] else: # Global selection cv = np.sum(np.maximum(0, cons[task_id]), axis=1) if cons[task_id] is not None else np.zeros( n_per_task[task_id]) parents = tournament_selection(2, 2, cv) # Generate an offspring parent_decs = decs[task_id][parents, :] off_decs = ga_generation(parent_decs, muc=self.muc, mum=self.mum) off_objs, off_cons = evaluation_single(problem, off_decs[:1], task_id) # Assign offspring to region norm_off_obj = off_objs[0] / (np.linalg.norm(off_objs[0]) + 1e-10) norm_w = W[task_id] / (np.linalg.norm(W[task_id], axis=1, keepdims=True) + 1e-10) cosine_similarity = np.dot(norm_off_obj, norm_w.T) off_region = np.argmax(cosine_similarity) # Add the offspring to the population decs[task_id] = np.vstack([decs[task_id], off_decs[0]]) objs[task_id] = np.vstack([objs[task_id], off_objs[0]]) if cons[task_id] is not None and off_cons is not None: cons[task_id] = np.vstack([cons[task_id], off_cons[0]]) region[task_id] = np.append(region[task_id], off_region) # Update front_no (add mode) front_no[task_id] = update_front(objs[task_id], front_no[task_id]) # Calculate constraint violations cv = np.sum(np.maximum(0, cons[task_id]), axis=1) if cons[task_id] is not None else np.zeros( len(objs[task_id])) # Update the ideal point Z[task_id] = np.minimum(Z[task_id], off_objs[0]) # Delete a solution from the population if np.any(cv > 0): S = np.argsort(cv)[::-1] S = S[:np.sum(cv > 0)] flag = False x = None for j in range(len(S)): if np.sum(region[task_id] == region[task_id][S[j]]) > 1: flag = True x = S[j] break if not flag: x = S[0] elif np.max(front_no[task_id]) == 1: x = locate_worst(objs[task_id], W[task_id], region[task_id], front_no[task_id], Z[task_id]) else: fl = np.where(front_no[task_id] == np.max(front_no[task_id]))[0] if len(fl) == 1: if np.sum(region[task_id] == region[task_id][fl[0]]) > 1: x = fl[0] else: x = locate_worst(objs[task_id], W[task_id], region[task_id], front_no[task_id], Z[task_id]) else: sub_region = np.unique(region[task_id][fl]) crowd = np.bincount(region[task_id][np.isin(region[task_id], sub_region)], minlength=W[task_id].shape[0]) phi = np.where(crowd == np.max(crowd))[0] pbi = cal_pbi(objs[task_id], W[task_id], region[task_id], Z[task_id], np.isin(region[task_id], phi)) pbi_sum = np.zeros(W[task_id].shape[0]) for j in range(len(pbi)): pbi_sum[region[task_id][j]] += pbi[j] phi = np.argmax(pbi_sum) phih = np.where(region[task_id] == phi)[0] if len(phih) > 1: x = phih[np.argmax(pbi[phih])] else: x = locate_worst(objs[task_id], W[task_id], region[task_id], front_no[task_id], Z[task_id]) # Update front_no before removing (delete mode) front_no[task_id] = update_front(objs[task_id], front_no[task_id], x) # Remove the worst solution decs[task_id] = np.delete(decs[task_id], x, axis=0) objs[task_id] = np.delete(objs[task_id], x, axis=0) if cons[task_id] is not None: cons[task_id] = np.delete(cons[task_id], x, axis=0) region[task_id] = np.delete(region[task_id], x) # Update evaluation count nfes_per_task[task_id] += 1 pbar.update(1) # Check if evaluation budget is exhausted if nfes_per_task[task_id] >= max_nfes_per_task[task_id]: break # Update history append_history(all_decs[task_id], decs[task_id], all_objs[task_id], objs[task_id], all_cons[task_id], cons[task_id]) pbar.close() runtime = time.time() - start_time # Save results results = build_save_results(all_decs=all_decs, all_objs=all_objs, runtime=runtime, max_nfes=nfes_per_task, all_cons=all_cons, bounds=problem.bounds, save_path=self.save_path, filename=self.name, save_data=self.save_data) return results
def update_front(pop_obj, front_no, x=None): """ Update the front number of each solution when a solution is added or deleted. Parameters ---------- pop_obj : np.ndarray Objective values of shape (N, M) front_no : np.ndarray Front numbers x : int, optional Index of solution to delete. If None, assumes a new solution is added at the end (default: None) Returns ------- FrontNo : np.ndarray Updated front numbers """ N, M = pop_obj.shape if x is None: # Add a new solution (has been stored in the last of PopObj) front_no = np.append(front_no, 0) move = np.zeros(N, dtype=bool) move[N - 1] = True current_f = 1 # Locate the front No. of the new solution while True: dominated = False for i in range(N - 1): if front_no[i] == current_f: m = 0 while m < M and pop_obj[i, m] <= pop_obj[N - 1, m]: m += 1 dominated = (m == M) if dominated: break if not dominated: break else: current_f += 1 # Move down the dominated solutions front by front while np.any(move): next_move = np.zeros(N, dtype=bool) for i in range(N): if front_no[i] == current_f: dominated = False for j in range(N): if move[j]: m = 0 while m < M and pop_obj[j, m] <= pop_obj[i, m]: m += 1 dominated = (m == M) if dominated: break next_move[i] = dominated front_no[move] = current_f current_f += 1 move = next_move else: # Delete the x-th solution x = int(x) move = np.zeros(N, dtype=bool) move[x] = True current_f = int(front_no[x]) + 1 while np.any(move): next_move = np.zeros(N, dtype=bool) for i in range(N): if front_no[i] == current_f: dominated = False for j in range(N): if move[j]: m = 0 while m < M and pop_obj[j, m] <= pop_obj[i, m]: m += 1 dominated = (m == M) if dominated: break next_move[i] = dominated for i in range(N): if next_move[i]: dominated = False for j in range(N): if front_no[j] == current_f - 1 and not move[j]: m = 0 while m < M and pop_obj[j, m] <= pop_obj[i, m]: m += 1 dominated = (m == M) if dominated: break next_move[i] = not dominated front_no[move] = current_f - 2 current_f += 1 move = next_move front_no = np.delete(front_no, x) return front_no def locate_worst(pop_obj, W, region, front_no, Z): """ Detect the worst solution in the population. Parameters ---------- pop_obj : ndarray Population objective values, shape (n_solutions, n_objectives). W : ndarray Weight vectors, shape (n_weights, n_objectives). region : ndarray Region assignment for each solution, shape (n_solutions,). front_no : ndarray Pareto front number for each solution, shape (n_solutions,). Z : ndarray Ideal point (reference point), shape (n_objectives,). Returns ------- int Index of the worst solution in the population. """ crowd = np.bincount(region, minlength=W.shape[0]) phi = np.where(crowd == np.max(crowd))[0] pbi = cal_pbi(pop_obj, W, region, Z, np.isin(region, phi)) pbi_sum = np.zeros(W.shape[0]) for j in range(len(pbi)): pbi_sum[region[j]] += pbi[j] phi = np.argmax(pbi_sum) phih = np.where(region == phi)[0] R = phih[front_no[phih] == np.max(front_no[phih])] x = R[np.argmax(pbi[R])] return int(x) def cal_pbi(pop_obj, W, region, Z, sub): """ Calculate the PBI (Penalty-based Boundary Intersection) value between each solution and its associated weight vector. Parameters ---------- pop_obj : ndarray Population objective values, shape (n_solutions, n_objectives). W : ndarray Weight vectors, shape (n_weights, n_objectives). region : ndarray Region assignment for each solution, shape (n_solutions,). Z : ndarray Ideal point (reference point), shape (n_objectives,). sub : ndarray of bool Boolean mask indicating which solutions to calculate PBI for, shape (n_solutions,). Returns ------- pbi : ndarray PBI values for all solutions, shape (n_solutions,). Only solutions where sub is True have non-zero values. """ m = W.shape[1] z_rep = np.tile(Z, (np.sum(sub), 1)) w_sub = W[region[sub], :] norm_w = np.sqrt(np.sum(w_sub ** 2, axis=1)) d1 = np.abs(np.sum((pop_obj[sub, :] - z_rep) * w_sub, axis=1)) / norm_w d1_norm_w = d1 / norm_w projection = z_rep + w_sub * d1_norm_w[:, np.newaxis] d2 = np.sqrt(np.sum((pop_obj[sub, :] - projection) ** 2, axis=1)) pbi = np.zeros(pop_obj.shape[0]) pbi[sub] = d1 + 5 * d2 return pbi