# Source code for ddmtolab.Algorithms.MTMO.MO_MTEA_SaO

"""
Multi-objective Multi-task Evolutionary Algorithm with Self-adaptive Solvers (MO-MTEA-SaO)

This module implements MO-MTEA-SaO for multi-task multi-objective optimization problems.

References
----------
    [1] Li, Yanchi, Wenyin Gong, and Shuijia Li. "Multitasking Optimization via an Adaptive Solver Multitasking Evolutionary Framework." Information Sciences (2022).

Notes
-----
Author: Jiangtao Shen
Email: j.shen5@exeter.ac.uk
Date: 2026.01.18
Version: 1.0
"""
import time
import numpy as np
from tqdm import tqdm
from ddmtolab.Methods.Algo_Methods.algo_utils import *


class MO_MTEA_SaO:
    """
    Multi-objective Multi-task Evolutionary Algorithm with Self-adaptive Solvers.

    This algorithm features:
    - Two solver strategies: NSGA-II + GA and SPEA2 + DE
    - Self-adaptive solver selection based on success/failure history
    - Knowledge transfer between tasks
    - Adaptive population partitioning among solvers

    Attributes
    ----------
    algorithm_information : dict
        Dictionary containing algorithm capabilities and requirements
    """

    algorithm_information = {
        'n_tasks': '[2, K]',
        'dims': 'unequal',
        'objs': 'unequal',
        'n_objs': '[2, M]',
        'cons': 'unequal',
        'n_cons': '[0, C]',
        'expensive': 'False',
        'knowledge_transfer': 'True',
        'n': 'unequal',
        'max_nfes': 'unequal',
    }

    @classmethod
    def get_algorithm_information(cls, print_info=True):
        # Delegates to the module-level helper of the same name (from algo_utils);
        # the class attribute above does not shadow it inside this function body.
        return get_algorithm_information(cls, print_info)

    def __init__(self, problem, n=None, max_nfes=None, t_gap=10, t_num=10,
                 sa_gap=70, memory=30, ga_muc=20.0, ga_mum=15.0,
                 de_f=0.5, de_cr=0.9, save_data=True, save_path='./Data',
                 name='MO-MTEA-SaO', disable_tqdm=True):
        """
        Initialize MO-MTEA-SaO algorithm.

        Parameters
        ----------
        problem : MTOP
            Multi-task optimization problem instance
        n : int or List[int], optional
            Population size per task (default: 100)
        max_nfes : int or List[int], optional
            Maximum number of function evaluations per task (default: 10000)
        t_gap : int, optional
            Transfer gap - perform knowledge transfer every t_gap generations (default: 10)
        t_num : int, optional
            Number of solutions to transfer (default: 10)
        sa_gap : int, optional
            Self-adaptive gap - update solver allocation every sa_gap generations (default: 70)
        memory : int, optional
            Memory length for success/failure history (default: 30)
        ga_muc : float, optional
            Distribution index for GA crossover (SBX) (default: 20.0)
        ga_mum : float, optional
            Distribution index for GA mutation (PM) (default: 15.0)
        de_f : float, optional
            DE scaling factor (default: 0.5)
        de_cr : float, optional
            DE crossover probability (default: 0.9)
        save_data : bool, optional
            Whether to save optimization data (default: True)
        save_path : str, optional
            Path to save results (default: './Data')
        name : str, optional
            Name for the experiment (default: 'MO-MTEA-SaO')
        disable_tqdm : bool, optional
            Whether to disable progress bar (default: True)
        """
        self.problem = problem
        # Fall back to scalar defaults; par_list() later expands scalars per task.
        self.n = n if n is not None else 100
        self.max_nfes = max_nfes if max_nfes is not None else 10000
        self.t_gap = t_gap
        self.t_num = t_num
        self.sa_gap = sa_gap
        self.memory = memory
        self.ga_muc = ga_muc
        self.ga_mum = ga_mum
        self.de_f = de_f
        self.de_cr = de_cr
        self.save_data = save_data
        self.save_path = save_path
        self.name = name
        self.disable_tqdm = disable_tqdm

    def optimize(self):
        """
        Execute the MO-MTEA-SaO algorithm.

        Returns
        -------
        Results
            Optimization results containing decision variables, objectives,
            constraints, and runtime
        """
        start_time = time.time()
        problem = self.problem
        nt = problem.n_tasks
        n_per_task = par_list(self.n, nt)
        max_nfes_per_task = par_list(self.max_nfes, nt)

        # Initial population and evaluation for every task.
        decs = initialization(problem, n_per_task)
        objs, cons = evaluation(problem, decs)
        nfes_per_task = n_per_task.copy()
        all_decs, all_objs, all_cons = init_history(decs, objs, cons)

        st_num = 2  # number of solver strategies: 0 = GA+NSGA-II, 1 = DE+SPEA2

        # stn[t] = per-strategy segment sizes; start with an (almost) equal split,
        # giving the last strategy the remainder so the sizes sum to n_per_task[t].
        stn = []
        for t in range(nt):
            base_size = n_per_task[t] // st_num
            sizes = [base_size] * st_num
            sizes[-1] = n_per_task[t] - sum(sizes[:-1])
            stn.append(sizes)

        # Rolling success/failure counters, one (nt, st_num) matrix per generation.
        succ_history = []
        fail_history = []

        total_nfes = sum(max_nfes_per_task)
        pbar = tqdm(total=total_nfes, initial=sum(nfes_per_task),
                    desc=f"{self.name}", disable=self.disable_tqdm)

        gen = 0
        while sum(nfes_per_task) < total_nfes:
            gen += 1
            succ_iter = np.zeros((nt, st_num))
            fail_iter = np.zeros((nt, st_num))

            for t in range(nt):
                if nfes_per_task[t] >= max_nfes_per_task[t]:
                    continue

                # Medians of CV and objectives serve as the success/failure yardstick.
                if cons[t] is not None:
                    cvs = np.sum(np.maximum(0, cons[t]), axis=1)
                else:
                    cvs = np.zeros(len(objs[t]))
                median_obj = np.median(objs[t], axis=0)
                median_cv = np.median(cvs)

                parent_decs = decs[t].copy()

                # Knowledge transfer: only early inside each sa_gap window and
                # every t_gap generations.
                transfer_due = (self.t_num > 0
                                and (gen - 1) % self.sa_gap + 1 < (self.sa_gap - self.memory)
                                and gen % self.t_gap == 0)
                if transfer_due:
                    transfer_decs = self._transfer(decs, t, problem.dims)
                    if len(transfer_decs) > 0:
                        # Overwrite randomly chosen parents with transferred solutions.
                        replace_indices = np.random.permutation(len(parent_decs))[:len(transfer_decs)]
                        for i, idx in enumerate(replace_indices):
                            parent_decs[idx] = transfer_decs[i]

                # Run each solver strategy on its own contiguous population segment.
                start_idx = 0
                for st in range(st_num):
                    end_idx = start_idx + stn[t][st]
                    if end_idx <= start_idx:  # empty segment
                        start_idx = end_idx
                        continue
                    st_indices = list(range(start_idx, end_idx))
                    st_parent_decs = parent_decs[st_indices]

                    if st == 0:
                        # Strategy 1: GA variation + NSGA-II environmental selection.
                        off_decs = self._generation_ga(st_parent_decs)
                        off_objs, off_cons = evaluation_single(problem, off_decs, t)
                        nfes_per_task[t] += len(off_decs)
                        pbar.update(len(off_decs))

                        merged_decs = np.vstack([decs[t][st_indices], off_decs])
                        merged_objs = np.vstack([objs[t][st_indices], off_objs])
                        if cons[t] is not None:
                            merged_cons = np.vstack([cons[t][st_indices], off_cons])
                        else:
                            merged_cons = off_cons

                        rank, _, _ = self._nsga2_sort(merged_objs, merged_cons)
                        selected_indices = np.argsort(rank)[:len(st_indices)]
                        decs[t][st_indices] = merged_decs[selected_indices]
                        objs[t][st_indices] = merged_objs[selected_indices]
                        if cons[t] is not None:
                            cons[t][st_indices] = merged_cons[selected_indices]
                    else:
                        # Strategy 2: DE variation + SPEA2 environmental selection.
                        off_decs = self._generation_de(st_parent_decs)
                        off_objs, off_cons = evaluation_single(problem, off_decs, t)
                        nfes_per_task[t] += len(off_decs)
                        pbar.update(len(off_decs))

                        merged_decs = np.vstack([decs[t][st_indices], off_decs])
                        merged_objs = np.vstack([objs[t][st_indices], off_objs])
                        if cons[t] is not None:
                            merged_cons = np.vstack([cons[t][st_indices], off_cons])
                        else:
                            merged_cons = off_cons

                        selected_decs, selected_objs, selected_cons = self._selection_spea2(
                            merged_decs, merged_objs, merged_cons, len(st_indices)
                        )
                        decs[t][st_indices] = selected_decs
                        objs[t][st_indices] = selected_objs
                        if cons[t] is not None:
                            cons[t][st_indices] = selected_cons

                    # Score the segment against the pre-update medians.
                    if cons[t] is not None:
                        current_cvs = np.sum(np.maximum(0, cons[t][st_indices]), axis=1)
                    else:
                        current_cvs = np.zeros(len(st_indices))
                    current_objs = objs[t][st_indices]

                    # Success: strictly better CV, or equal CV and better in some objective.
                    succ_mask = (current_cvs < median_cv) | (
                        (current_cvs == median_cv) & np.any(current_objs < median_obj, axis=1)
                    )
                    succ_iter[t, st] = np.sum(succ_mask)
                    # Failure: strictly worse CV, or equal CV and worse in every objective.
                    fail_mask = (current_cvs > median_cv) | (
                        (current_cvs == median_cv) & np.all(current_objs > median_obj, axis=1)
                    )
                    fail_iter[t, st] = np.sum(fail_mask)

                    start_idx = end_idx

                append_history(all_decs[t], decs[t], all_objs[t], objs[t],
                               all_cons[t], cons[t])

            # Record this generation's counters and trim to the memory window.
            succ_history.append(succ_iter)
            fail_history.append(fail_iter)
            max_history_len = self.memory * nt
            if len(succ_history) > max_history_len:
                succ_history = succ_history[-max_history_len:]
                fail_history = fail_history[-max_history_len:]

            # Recompute per-task strategy shares from the accumulated history.
            for t in range(nt):
                succ_t = np.zeros(st_num)
                fail_t = np.zeros(st_num)
                for entry_s, entry_f in zip(succ_history, fail_history):
                    succ_t += entry_s[t]
                    fail_t += entry_f[t]

                succ_p = np.zeros(st_num)
                for st in range(st_num):
                    total = succ_t[st] + fail_t[st]
                    # 0.01 floor keeps every strategy a nonzero share.
                    succ_p[st] = 0.01 if total == 0 else succ_t[st] / total + 0.01

                # Blend with the current allocation, then normalise.
                succ_old = np.array(stn[t]) / sum(stn[t])
                succ_p = succ_old / 2 + succ_p
                succ_p = succ_p / np.sum(succ_p)

                if gen % self.sa_gap == 0:
                    # Re-allocate segment sizes; remainder goes to the last strategy.
                    new_sizes = (succ_p * n_per_task[t]).astype(int)
                    new_sizes[-1] = n_per_task[t] - np.sum(new_sizes[:-1])
                    stn[t] = list(new_sizes)

                    # NOTE(review): shuffle tied to re-allocation so individuals are
                    # redistributed across the new solver segments — reconstructed
                    # placement; confirm against the reference implementation.
                    shuffle_indices = np.random.permutation(n_per_task[t])
                    decs[t] = decs[t][shuffle_indices]
                    objs[t] = objs[t][shuffle_indices]
                    if cons[t] is not None:
                        cons[t] = cons[t][shuffle_indices]

        pbar.close()
        runtime = time.time() - start_time

        results = build_save_results(all_decs=all_decs, all_objs=all_objs,
                                     runtime=runtime, max_nfes=nfes_per_task,
                                     all_cons=all_cons, bounds=problem.bounds,
                                     save_path=self.save_path, filename=self.name,
                                     save_data=self.save_data)
        return results

    def _transfer(self, decs, t, dims):
        """
        Perform random knowledge transfer from other tasks.

        Parameters
        ----------
        decs : list of np.ndarray
            Decision variables for all tasks
        t : int
            Current task index
        dims : list of int
            Dimensions for all tasks

        Returns
        -------
        transfer_decs : np.ndarray
            Transferred decision variables (empty array when there is no other task)
        """
        nt = len(decs)
        if nt <= 1:
            return np.array([])

        donors = [k for k in range(nt) if k != t]
        collected = []
        for _ in range(self.t_num):
            # Pick a random individual from a random other task.
            src_task = donors[np.random.randint(len(donors))]
            src_row = np.random.randint(len(decs[src_task]))
            candidate = decs[src_task][src_row].copy()
            # Pad/trim to the receiving task's dimensionality.
            candidate = align_dimensions(candidate, dims[t])
            collected.append(candidate)
        return np.array(collected)

    def _generation_ga(self, parent_decs):
        """
        Generate offspring using GA (SBX crossover + polynomial mutation).

        Parameters
        ----------
        parent_decs : np.ndarray
            Parent decision variables of shape (pop_size, dim)

        Returns
        -------
        off_decs : np.ndarray
            Offspring decision variables, clipped to [0, 1]
        """
        pop_size = len(parent_decs)
        if pop_size <= 1:
            # Too few parents to pair up: mutation only.
            off_decs = np.array([mutation(parent_decs[i].copy(), mu=self.ga_mum)
                                 for i in range(pop_size)])
            return np.clip(off_decs, 0, 1)

        dim = parent_decs.shape[1]
        off_decs = np.zeros((pop_size, dim))
        ind_order = np.random.permutation(pop_size)  # random pairing

        count = 0
        half = pop_size // 2
        for i in range(half):
            p1 = ind_order[i]
            p2 = ind_order[i + half]
            child1, child2 = crossover(parent_decs[p1], parent_decs[p2], mu=self.ga_muc)
            child1 = mutation(child1, mu=self.ga_mum)
            child2 = mutation(child2, mu=self.ga_mum)
            off_decs[count] = np.clip(child1, 0, 1)
            off_decs[count + 1] = np.clip(child2, 0, 1)
            count += 2

        if pop_size % 2 == 1:
            # Odd population: the unpaired parent is mutated on its own.
            off_decs[count] = mutation(parent_decs[ind_order[-1]].copy(), mu=self.ga_mum)
            off_decs[count] = np.clip(off_decs[count], 0, 1)

        return off_decs[:pop_size]

    def _generation_de(self, parent_decs):
        """
        Generate offspring using DE (DE/rand/1/bin).

        Parameters
        ----------
        parent_decs : np.ndarray
            Parent decision variables of shape (pop_size, dim)

        Returns
        -------
        off_decs : np.ndarray
            Offspring decision variables, clipped to [0, 1]
        """
        pop_size = len(parent_decs)
        dim = parent_decs.shape[1]
        if pop_size < 4:
            # DE/rand/1 needs 3 distinct donors besides the target: fall back
            # to polynomial mutation only.
            off_decs = np.array([mutation(parent_decs[i].copy(), mu=self.ga_mum)
                                 for i in range(pop_size)])
            return np.clip(off_decs, 0, 1)

        off_decs = np.zeros((pop_size, dim))
        for i in range(pop_size):
            # Three distinct donors, all different from the target i.
            candidates = list(range(pop_size))
            candidates.remove(i)
            x1, x2, x3 = np.random.choice(candidates, 3, replace=False)

            # DE/rand/1 differential mutation.
            mutant = parent_decs[x1] + self.de_f * (parent_decs[x2] - parent_decs[x3])

            # Binomial crossover; j_rand guarantees at least one mutant gene.
            j_rand = np.random.randint(dim)
            mask = np.random.rand(dim) < self.de_cr
            mask[j_rand] = True
            trial = np.where(mask, mutant, parent_decs[i])

            off_decs[i] = np.clip(trial, 0, 1)
        return off_decs

    def _nsga2_sort(self, objs, cons=None):
        """
        Sort solutions based on NSGA-II criteria.

        Parameters
        ----------
        objs : np.ndarray
            Objective values of shape (pop_size, n_obj)
        cons : np.ndarray, optional
            Constraint values of shape (pop_size, n_con)

        Returns
        -------
        rank : np.ndarray
            Ranking of each solution (0 = best)
        front_no : np.ndarray
            Front number of each solution
        crowd_dis : np.ndarray
            Crowding distance of each solution
        """
        pop_size = objs.shape[0]
        if cons is not None and cons.size > 0:
            front_no, _ = nd_sort(objs, cons, pop_size)
        else:
            front_no, _ = nd_sort(objs, pop_size)
        crowd_dis = crowding_distance(objs, front_no)

        # Primary key: front number ascending; tie-break: crowding distance descending.
        sorted_indices = np.lexsort((-crowd_dis, front_no))
        rank = np.empty(pop_size, dtype=int)
        rank[sorted_indices] = np.arange(pop_size)
        return rank, front_no, crowd_dis

    def _selection_spea2(self, decs, objs, cons, n):
        """
        SPEA2 environmental selection.

        Parameters
        ----------
        decs : np.ndarray
            Decision variables of shape (pop_size, dim)
        objs : np.ndarray
            Objective values of shape (pop_size, n_obj)
        cons : np.ndarray or None
            Constraint values of shape (pop_size, n_con)
        n : int
            Target population size

        Returns
        -------
        selected_decs : np.ndarray
            Selected decision variables
        selected_objs : np.ndarray
            Selected objective values
        selected_cons : np.ndarray or None
            Selected constraint values (None when cons is None)
        """
        pop_size = len(objs)
        if pop_size == 0 or n == 0:
            return decs[:0], objs[:0], cons[:0] if cons is not None else None
        n = min(n, pop_size)

        # Aggregate constraint violation per individual.
        if cons is not None and cons.size > 0:
            cvs = np.sum(np.maximum(0, cons), axis=1)
        else:
            cvs = np.zeros(pop_size)

        fitness = self._cal_spea2_fitness(objs, cvs)

        # Non-dominated individuals have fitness < 1.
        next_mask = fitness < 1
        n_selected = np.sum(next_mask)
        if n_selected < n:
            # Too few non-dominated: fill up with the best-fitness dominated ones.
            sorted_indices = np.argsort(fitness)
            next_mask = np.zeros(pop_size, dtype=bool)
            next_mask[sorted_indices[:n]] = True
        elif n_selected > n:
            # Too many non-dominated: truncate the most crowded ones.
            selected_indices = np.where(next_mask)[0]
            del_indices = self._truncation(objs[selected_indices], n_selected - n)
            next_mask[selected_indices[del_indices]] = False

        # Return the survivors ordered by ascending fitness.
        selected_indices = np.where(next_mask)[0]
        selected_indices = selected_indices[np.argsort(fitness[selected_indices])]
        return (decs[selected_indices], objs[selected_indices],
                cons[selected_indices] if cons is not None else None)

    def _cal_spea2_fitness(self, objs, cvs):
        """
        Calculate SPEA2 fitness values (raw dominance fitness + density).

        Parameters
        ----------
        objs : np.ndarray
            Objective values of shape (pop_size, n_obj)
        cvs : np.ndarray
            Constraint violations of shape (pop_size,)

        Returns
        -------
        fitness : np.ndarray
            SPEA2 fitness values of shape (pop_size,); < 1 means non-dominated
        """
        n = len(objs)
        if n == 0:
            return np.array([])

        # Pairwise constrained-dominance matrix: dominate[i, j] = i dominates j.
        dominate = np.zeros((n, n), dtype=bool)
        for i in range(n):
            for j in range(i + 1, n):
                if cvs[i] < cvs[j]:
                    dominate[i, j] = True
                elif cvs[i] > cvs[j]:
                    dominate[j, i] = True
                else:
                    # Equal CV: fall back to Pareto dominance on objectives.
                    better_i = np.any(objs[i] < objs[j])
                    worse_i = np.any(objs[i] > objs[j])
                    if better_i and not worse_i:
                        dominate[i, j] = True
                    elif worse_i and not better_i:
                        dominate[j, i] = True

        # Strength: number of individuals each one dominates.
        s = np.sum(dominate, axis=1)

        # Raw fitness: sum of strengths of all dominators.
        r = np.zeros(n)
        for i in range(n):
            r[i] = np.sum(s[np.where(dominate[:, i])[0]])

        # Density: inverse distance to the k-th nearest neighbour in objective space.
        distance = cdist(objs, objs)
        np.fill_diagonal(distance, np.inf)
        distance_sorted = np.sort(distance, axis=1)
        k_neighbor = int(np.floor(np.sqrt(n)))
        k_neighbor = max(0, min(k_neighbor, n - 1))
        d = 1.0 / (distance_sorted[:, k_neighbor] + 2)  # d is always < 1

        return r + d

    def _truncation(self, objs, k):
        """
        SPEA2 truncation operator: iteratively delete the most crowded solution.

        Parameters
        ----------
        objs : np.ndarray
            Objective values of shape (pop_size, n_obj)
        k : int
            Number of solutions to delete

        Returns
        -------
        del_indices : np.ndarray
            Indices (into objs) of solutions to delete
        """
        n = len(objs)
        del_mask = np.zeros(n, dtype=bool)
        distance = cdist(objs, objs)
        np.fill_diagonal(distance, np.inf)

        while np.sum(del_mask) < k:
            remain_indices = np.where(~del_mask)[0]
            remain_dist = distance[remain_indices][:, remain_indices]
            sorted_dist = np.sort(remain_dist, axis=1)
            # Lexicographic compare of sorted neighbour distances: the row whose
            # nearest neighbours are closest (ties broken by farther neighbours)
            # is the most crowded and gets removed.
            sorted_rows_indices = np.lexsort(np.rot90(sorted_dist))
            del_mask[remain_indices[sorted_rows_indices[0]]] = True

        return np.where(del_mask)[0]