Source code for ddmtolab.Algorithms.MTSO.MTEA_SaO

"""
Multi-task Evolutionary Algorithm with Self-adaptive Solvers (MTEA-SaO)

This module implements MTEA-SaO for multi-task single-objective optimization problems.

References
----------
    [1] Li, Yanchi, Wenyin Gong, and Shuijia Li. "Multitasking Optimization via an Adaptive Solver Multitasking Evolutionary Framework." Information Sciences (2022). https://doi.org/10.1016/j.ins.2022.10.099

Notes
-----
Author: Jiangtao Shen
Email: j.shen5@exeter.ac.uk
Date: 2026.01.02
Version: 1.0
"""
import time
import numpy as np
from tqdm import tqdm
from ddmtolab.Methods.Algo_Methods.algo_utils import *


[docs] class MTEA_SaO: """ Multi-task Evolutionary Algorithm with Self-adaptive Solvers for single-objective optimization. This algorithm features: - Two solver strategies: GA (SBX + PM) and DE (DE/rand/1/bin) - Self-adaptive solver selection based on success/failure history - Knowledge transfer between tasks via random solution injection - Adaptive population partitioning among solvers Attributes ---------- algorithm_information : dict Dictionary containing algorithm capabilities and requirements """ algorithm_information = { 'n_tasks': '[2, K]', 'dims': 'unequal', 'objs': 'equal', 'n_objs': '1', 'cons': 'unequal', 'n_cons': '[0, C]', 'expensive': 'False', 'knowledge_transfer': 'True', 'n': 'unequal', 'max_nfes': 'unequal' } @classmethod def get_algorithm_information(cls, print_info=True): return get_algorithm_information(cls, print_info)
[docs] def __init__(self, problem, n=None, max_nfes=None, t_gap=10, t_num=10, sa_gap=70, memory=30, ga_muc=2.0, ga_mum=5.0, de_f=0.5, de_cr=0.9, save_data=True, save_path='./Data', name='MTEA-SaO', disable_tqdm=True): """ Initialize MTEA-SaO algorithm. Parameters ---------- problem : MTOP Multi-task optimization problem instance n : int or List[int], optional Population size per task (default: 100) max_nfes : int or List[int], optional Maximum number of function evaluations per task (default: 10000) t_gap : int, optional Transfer gap - perform knowledge transfer every t_gap generations (default: 10) t_num : int, optional Number of solutions to transfer (default: 10) sa_gap : int, optional Self-adaptive gap - update solver allocation every sa_gap generations (default: 70) memory : int, optional Memory length for success/failure history (default: 30) ga_muc : float, optional Distribution index for GA crossover (SBX) (default: 2.0) ga_mum : float, optional Distribution index for GA mutation (PM) (default: 5.0) de_f : float, optional DE scaling factor (default: 0.5) de_cr : float, optional DE crossover probability (default: 0.9) save_data : bool, optional Whether to save optimization data (default: True) save_path : str, optional Path to save results (default: './TestData') name : str, optional Name for the experiment (default: 'MTEASaO_test') disable_tqdm : bool, optional Whether to disable progress bar (default: True) """ self.problem = problem self.n = n if n is not None else 100 self.max_nfes = max_nfes if max_nfes is not None else 10000 self.t_gap = t_gap self.t_num = t_num self.sa_gap = sa_gap self.memory = memory self.ga_muc = ga_muc self.ga_mum = ga_mum self.de_f = de_f self.de_cr = de_cr self.save_data = save_data self.save_path = save_path self.name = name self.disable_tqdm = disable_tqdm
[docs] def optimize(self): """ Execute the MTEA-SaO algorithm. Returns ------- Results Optimization results containing decision variables, objectives, constraints, and runtime """ start_time = time.time() problem = self.problem nt = problem.n_tasks n_per_task = par_list(self.n, nt) max_nfes_per_task = par_list(self.max_nfes, nt) # Initialize population and evaluate for each task decs = initialization(problem, n_per_task) objs, cons = evaluation(problem, decs) nfes_per_task = n_per_task.copy() all_decs, all_objs, all_cons = init_history(decs, objs, cons) # Strategy settings st_num = 2 # Number of strategies (GA, DE) # Initialize strategy population sizes (equal split) # stn[t] = [size_strategy_1, size_strategy_2] stn = [] for t in range(nt): base_size = n_per_task[t] // st_num sizes = [base_size] * st_num sizes[-1] = n_per_task[t] - sum(sizes[:-1]) # Adjust last strategy size stn.append(sizes) # Success/failure history succ_history = [] # List of arrays with shape (nt, st_num) fail_history = [] # Progress bar total_nfes = sum(max_nfes_per_task) pbar = tqdm(total=total_nfes, initial=sum(nfes_per_task), desc=f"{self.name}", disable=self.disable_tqdm) gen = 0 # Main optimization loop while sum(nfes_per_task) < total_nfes: gen += 1 succ_iter = np.zeros((nt, st_num)) fail_iter = np.zeros((nt, st_num)) for t in range(nt): if nfes_per_task[t] >= max_nfes_per_task[t]: continue # Calculate median objectives and CV for comparison cvs = np.sum(np.maximum(0, cons[t]), axis=1) if cons[t] is not None and cons[t].size > 0 else np.zeros(len(objs[t])) median_obj = np.median(objs[t][:, 0]) median_cv = np.median(cvs) # Prepare parent population (with potential transfer) parent_decs = decs[t].copy() # Knowledge Transfer # Condition: t_num > 0 AND within the window before memory period AND at transfer gap if (self.t_num > 0 and (gen - 1) % self.sa_gap + 1 < (self.sa_gap - self.memory) and gen % self.t_gap == 0): # Get transfer solutions from other tasks transfer_decs = self._transfer(decs, t, problem.dims) if len(transfer_decs) > 0: # Randomly replace some parents with transferred solutions replace_indices = np.random.permutation(len(parent_decs))[:len(transfer_decs)] for i, idx in enumerate(replace_indices): parent_decs[idx] = transfer_decs[i] # Process each strategy start_idx = 0 for st in range(st_num): end_idx = start_idx + stn[t][st] if end_idx <= start_idx: start_idx = end_idx continue st_indices = list(range(start_idx, end_idx)) st_parent_decs = parent_decs[st_indices] if st == 0: # Strategy 1: GA (SBX + PM) with elitist selection off_decs = self._generation_ga(st_parent_decs) off_objs, off_cons = evaluation_single(problem, off_decs, t) nfes_per_task[t] += len(off_decs) pbar.update(len(off_decs)) # Elitist selection: compare parent vs offspring one-by-one for i, idx in enumerate(st_indices): parent_cv = cvs[idx] parent_obj = objs[t][idx, 0] off_cv = np.sum(np.maximum(0, off_cons[i])) if off_cons is not None and off_cons[i].size > 0 else 0 off_obj = off_objs[i, 0] # Constrained comparison: prefer lower CV, then lower objective if off_cv < parent_cv or (off_cv == parent_cv and off_obj < parent_obj): decs[t][idx] = off_decs[i] objs[t][idx] = off_objs[i] if cons[t] is not None: cons[t][idx] = off_cons[i] else: # Strategy 2: DE with tournament selection off_decs = self._generation_de(st_parent_decs) off_objs, off_cons = evaluation_single(problem, off_decs, t) nfes_per_task[t] += len(off_decs) pbar.update(len(off_decs)) # Tournament selection: compare parent vs offspring one-by-one for i, idx in enumerate(st_indices): parent_cv = cvs[idx] parent_obj = objs[t][idx, 0] off_cv = np.sum(np.maximum(0, off_cons[i])) if off_cons is not None and off_cons[i].size > 0 else 0 off_obj = off_objs[i, 0] # Constrained comparison: prefer lower CV, then lower objective if off_cv < parent_cv or (off_cv == parent_cv and off_obj <= parent_obj): decs[t][idx] = off_decs[i] objs[t][idx] = off_objs[i] if cons[t] is not None: cons[t][idx] = off_cons[i] # Calculate success/failure for this strategy # Recalculate CVs after potential updates current_cvs = np.sum(np.maximum(0, cons[t][st_indices]), axis=1) if cons[t] is not None and cons[t].size > 0 else np.zeros(len(st_indices)) current_objs = objs[t][st_indices, 0] # Success: CV < median_cv OR (CV == median_cv AND obj < median_obj) succ_mask = (current_cvs < median_cv) | ( (current_cvs == median_cv) & (current_objs < median_obj) ) succ_iter[t, st] = np.sum(succ_mask) # Failure: CV > median_cv OR (CV == median_cv AND obj > median_obj) fail_mask = (current_cvs > median_cv) | ( (current_cvs == median_cv) & (current_objs > median_obj) ) fail_iter[t, st] = np.sum(fail_mask) start_idx = end_idx # Append to history append_history(all_decs[t], decs[t], all_objs[t], objs[t], all_cons[t], cons[t]) # Update success/failure history succ_history.append(succ_iter) fail_history.append(fail_iter) # Trim history to memory length (per-task adjustment) max_history_len = self.memory * nt if len(succ_history) > max_history_len: succ_history = succ_history[-max_history_len:] fail_history = fail_history[-max_history_len:] # Update strategy population sizes for t in range(nt): # Aggregate history for task t (every nt-th entry belongs to all tasks) succ_t = np.zeros(st_num) fail_t = np.zeros(st_num) for i in range(len(succ_history)): succ_t += succ_history[i][t] fail_t += fail_history[i][t] # Calculate success probability succ_p = np.zeros(st_num) for st in range(st_num): total = succ_t[st] + fail_t[st] if total == 0: succ_p[st] = 0.01 else: succ_p[st] = succ_t[st] / total + 0.01 # Combine with previous allocation (momentum) succ_old = np.array(stn[t]) / sum(stn[t]) succ_p = succ_old / 2 + succ_p succ_p = succ_p / np.sum(succ_p) # Update allocation every sa_gap generations if gen % self.sa_gap == 0: new_sizes = (succ_p * n_per_task[t]).astype(int) new_sizes[-1] = n_per_task[t] - np.sum(new_sizes[:-1]) stn[t] = list(new_sizes) # Shuffle population to redistribute among strategies shuffle_indices = np.random.permutation(n_per_task[t]) decs[t] = decs[t][shuffle_indices] objs[t] = objs[t][shuffle_indices] if cons[t] is not None: cons[t] = cons[t][shuffle_indices] pbar.close() runtime = time.time() - start_time # Build and save results results = build_save_results(all_decs=all_decs, all_objs=all_objs, runtime=runtime, max_nfes=nfes_per_task, all_cons=all_cons, bounds=problem.bounds, save_path=self.save_path, filename=self.name, save_data=self.save_data) return results
def _transfer(self, decs, t, dims): """ Perform random knowledge transfer from other tasks. Parameters ---------- decs : list of np.ndarray Decision variables for all tasks t : int Current task index dims : list of int Dimensions for all tasks Returns ------- transfer_decs : np.ndarray Transferred decision variables aligned to target task dimension """ nt = len(decs) if nt <= 1: return np.array([]) transfer_decs = [] other_tasks = [k for k in range(nt) if k != t] for _ in range(self.t_num): # Random task and random individual rand_t = other_tasks[np.random.randint(len(other_tasks))] rand_p = np.random.randint(len(decs[rand_t])) dec = decs[rand_t][rand_p].copy() # Align dimensions to target task dec = align_dimensions(dec, dims[t]) transfer_decs.append(dec) return np.array(transfer_decs) def _generation_ga(self, parent_decs): """ Generate offspring using GA (SBX crossover + polynomial mutation). Parameters ---------- parent_decs : np.ndarray Parent decision variables of shape (pop_size, dim) Returns ------- off_decs : np.ndarray Offspring decision variables of shape (pop_size, dim) """ pop_size = len(parent_decs) if pop_size <= 1: # Only mutation for very small population off_decs = np.array([mutation(parent_decs[i].copy(), mu=self.ga_mum) for i in range(pop_size)]) return np.clip(off_decs, 0, 1) dim = parent_decs.shape[1] off_decs = np.zeros((pop_size, dim)) # Shuffle indices for pairing ind_order = np.random.permutation(pop_size) count = 0 for i in range(pop_size // 2): p1 = ind_order[i] p2 = ind_order[i + pop_size // 2] # Crossover off_dec1, off_dec2 = crossover(parent_decs[p1], parent_decs[p2], mu=self.ga_muc) # Mutation off_dec1 = mutation(off_dec1, mu=self.ga_mum) off_dec2 = mutation(off_dec2, mu=self.ga_mum) # Boundary handling off_decs[count] = np.clip(off_dec1, 0, 1) off_decs[count + 1] = np.clip(off_dec2, 0, 1) count += 2 # Handle odd population size if pop_size % 2 == 1: off_decs[count] = mutation(parent_decs[ind_order[-1]].copy(), mu=self.ga_mum) off_decs[count] = np.clip(off_decs[count], 0, 1) return off_decs[:pop_size] def _generation_de(self, parent_decs): """ Generate offspring using DE (DE/rand/1/bin). Parameters ---------- parent_decs : np.ndarray Parent decision variables of shape (pop_size, dim) Returns ------- off_decs : np.ndarray Offspring decision variables of shape (pop_size, dim) """ pop_size = len(parent_decs) dim = parent_decs.shape[1] if pop_size < 4: # Fallback to mutation only when population too small for DE off_decs = np.array([mutation(parent_decs[i].copy(), mu=self.ga_mum) for i in range(pop_size)]) return np.clip(off_decs, 0, 1) off_decs = np.zeros((pop_size, dim)) for i in range(pop_size): # Select 3 different individuals (different from i) candidates = list(range(pop_size)) candidates.remove(i) selected = np.random.choice(candidates, 3, replace=False) x1, x2, x3 = selected # DE/rand/1 mutation mutant = parent_decs[x1] + self.de_f * (parent_decs[x2] - parent_decs[x3]) # DE binomial crossover j_rand = np.random.randint(dim) mask = np.random.rand(dim) < self.de_cr mask[j_rand] = True # Ensure at least one dimension from mutant off_dec = np.where(mask, mutant, parent_decs[i]) # Boundary handling off_decs[i] = np.clip(off_dec, 0, 1) return off_decs