# Source code for ddmtolab.Algorithms.MTMO.MTEA_DCK

"""
Multi-task Evolutionary Algorithm via Diversity- and Convergence-Oriented Knowledge Transfer (MTEA-DCK)

This module implements MTEA-DCK for multi-task multi-objective optimization problems.

References
----------
    [1] Y. Li, D. Li, W. Gong, and Q. Gu, "Multiobjective Multitask Optimization via Diversity- and Convergence-Oriented Knowledge Transfer," IEEE Trans. Syst., Man, Cybern., Syst., vol. 55, no. 3, pp. 2367-2379, 2025.

Notes
-----
Author: Jiangtao Shen
Email: j.shen5@exeter.ac.uk
Date: 2026.02.22
Version: 1.0
"""
import time
import numpy as np
from tqdm import tqdm
from ddmtolab.Methods.Algo_Methods.algo_utils import *


class MTEA_DCK:
    """
    Multi-task Evolutionary Algorithm via Diversity- and Convergence-Oriented
    Knowledge Transfer.

    Key components of the algorithm:

    - Competitive Swarm Optimizer (CSO) framework with winner/loser pairing
    - DE-based generation with diversified knowledge transfer (DKT) via region mapping
    - CSO-based generation with convergent knowledge transfer (CKT) via fragment swap
    - Adaptive per-individual parameters (F, CR, TRD) with Cauchy/Normal perturbation
    - SPEA2 environmental selection

    Attributes
    ----------
    algorithm_information : dict
        Dictionary containing algorithm capabilities and requirements
    """

    # Capability matrix consumed by the framework's algorithm registry.
    algorithm_information = {
        'n_tasks': '[2, K]',
        'dims': 'unequal',
        'objs': 'unequal',
        'n_objs': '[2, M]',
        'cons': 'unequal',
        'n_cons': '[0, C]',
        'expensive': 'False',
        'knowledge_transfer': 'True',
        'n': 'unequal',
        'max_nfes': 'unequal'
    }

    @classmethod
    def get_algorithm_information(cls, print_info=True):
        # Delegates to the shared helper imported from algo_utils.
        return get_algorithm_information(cls, print_info)
[docs] def __init__(self, problem, n=None, max_nfes=None, Tau=0.1, TRC0=0.3, save_data=True, save_path='./Data', name='MTEA-DCK', disable_tqdm=True): """ Initialize MTEA-DCK algorithm. Parameters ---------- problem : MTOP Multi-task optimization problem instance n : int or List[int], optional Population size per task (default: 100) max_nfes : int or List[int], optional Maximum number of function evaluations per task (default: 10000) Tau : float, optional Probability of random parameter reset (default: 0.1) TRC0 : float, optional Initial transfer rate for convergent knowledge transfer (default: 0.3) save_data : bool, optional Whether to save optimization data (default: True) save_path : str, optional Path to save results (default: './Data') name : str, optional Name for the experiment (default: 'MTEA-DCK') disable_tqdm : bool, optional Whether to disable progress bar (default: True) """ self.problem = problem self.n = n if n is not None else 100 self.max_nfes = max_nfes if max_nfes is not None else 10000 self.Tau = Tau self.TRC0 = TRC0 self.save_data = save_data self.save_path = save_path self.name = name self.disable_tqdm = disable_tqdm
[docs] def optimize(self): """ Execute the MTEA-DCK algorithm. Returns ------- Results Optimization results containing decision variables, objectives, constraints, and runtime """ start_time = time.time() problem = self.problem nt = problem.n_tasks dims = problem.dims d_max = max(dims) n_per_task = par_list(self.n, nt) max_nfes_per_task = par_list(self.max_nfes, nt) # Initialize populations in unified d_max space decs = [] objs = [] cons = [] for t in range(nt): decs_t = np.random.rand(n_per_task[t], d_max) objs_t, cons_t = evaluation_single(problem, decs_t[:, :dims[t]], t) decs.append(decs_t) objs.append(objs_t) cons.append(cons_t) nfes_per_task = [n_per_task[t] for t in range(nt)] # Compute SPEA2 fitness and initialize per-individual parameters fitness = [] vel = [] F_param = [] CR_param = [] TRD_param = [] for t in range(nt): fitness.append(spea2_fitness(objs[t], cons[t])) vel.append(np.zeros((n_per_task[t], d_max))) F_param.append(np.random.rand(n_per_task[t])) CR_param.append(np.random.rand(n_per_task[t])) TRD_param.append(np.random.rand(n_per_task[t])) # Native-space history all_decs = [[d[:, :dims[t]].copy()] for t, d in enumerate(decs)] all_objs = [[o.copy()] for o in objs] all_cons = [[c.copy()] for c in cons] total_nfes = sum(max_nfes_per_task) pbar = tqdm(total=total_nfes, initial=sum(nfes_per_task), desc=f"{self.name}", disable=self.disable_tqdm) while sum(nfes_per_task) < total_nfes: # Winner/Loser tournament for each task winners = [] losers = [] uni_upper = [] uni_lower = [] for t in range(nt): N_t = n_per_task[t] half = N_t // 2 rnd_idx = np.random.permutation(N_t) lose_idx = rnd_idx[:half].copy() win_idx = rnd_idx[half:2 * half].copy() # Swap: ensure winner has better (lower) fitness swap_mask = fitness[t][win_idx] > fitness[t][lose_idx] temp = win_idx[swap_mask].copy() win_idx[swap_mask] = lose_idx[swap_mask] lose_idx[swap_mask] = temp winners.append(win_idx) losers.append(lose_idx) # Winner region bounds uni_upper.append(np.max(decs[t][win_idx], 
axis=0)) uni_lower.append(np.min(decs[t][win_idx], axis=0)) factor = sum(nfes_per_task) / total_nfes for t in range(nt): if nfes_per_task[t] >= max_nfes_per_task[t]: continue N_t = n_per_task[t] union_idx = np.concatenate([winners[t], losers[t]]) # DE generation with diversified KT (from winners) off_de = self._generation_de( decs, vel, F_param, CR_param, TRD_param, winners, losers, union_idx, t, nt, uni_upper, uni_lower) off_de_decs, off_de_V, off_de_F, off_de_CR, off_de_TRD = off_de # Evaluate DE offspring off_de_objs, off_de_cons = evaluation_single( problem, off_de_decs[:, :dims[t]], t) # CSO generation with convergent KT (from losers) off_cso = self._generation_cso( decs, vel, F_param, CR_param, TRD_param, winners, losers, t, nt, factor) off_cso_decs, off_cso_V, off_cso_F, off_cso_CR, off_cso_TRD = off_cso # Evaluate CSO offspring off_cso_objs, off_cso_cons = evaluation_single( problem, off_cso_decs[:, :dims[t]], t) n_new = len(off_de_objs) + len(off_cso_objs) nfes_per_task[t] += n_new pbar.update(n_new) # Merge all populations merged_decs = np.vstack([decs[t], off_de_decs, off_cso_decs]) merged_objs = np.vstack([objs[t], off_de_objs, off_cso_objs]) merged_cons = np.vstack([cons[t], off_de_cons, off_cso_cons]) merged_V = np.vstack([vel[t], off_de_V, off_cso_V]) merged_F = np.concatenate([F_param[t], off_de_F, off_cso_F]) merged_CR = np.concatenate([CR_param[t], off_de_CR, off_cso_CR]) merged_TRD = np.concatenate([TRD_param[t], off_de_TRD, off_cso_TRD]) # SPEA2 environmental selection idx = self._spea2_select(merged_objs, merged_cons, N_t) decs[t] = merged_decs[idx] objs[t] = merged_objs[idx] cons[t] = merged_cons[idx] vel[t] = merged_V[idx] F_param[t] = merged_F[idx] CR_param[t] = merged_CR[idx] TRD_param[t] = merged_TRD[idx] fitness[t] = spea2_fitness(objs[t], cons[t]) # Append native-space history append_history(all_decs[t], decs[t][:, :dims[t]], all_objs[t], objs[t], all_cons[t], cons[t]) pbar.close() runtime = time.time() - start_time results = 
build_save_results( all_decs=all_decs, all_objs=all_objs, runtime=runtime, max_nfes=nfes_per_task, all_cons=all_cons, bounds=problem.bounds, save_path=self.save_path, filename=self.name, save_data=self.save_data) return results
def _spea2_select(self, objs, cons, N): """ SPEA2 environmental selection. Parameters ---------- objs : np.ndarray Objective values, shape (pop_size, n_objs) cons : np.ndarray Constraint violations, shape (pop_size, n_cons) N : int Number of individuals to select Returns ------- np.ndarray Indices of selected individuals """ fit = spea2_fitness(objs, cons) next_mask = fit < 1 n_nd = np.sum(next_mask) if n_nd <= N: rank = np.argsort(fit) return rank[:N] else: nd_indices = np.where(next_mask)[0] selected = spea2_truncation(objs[next_mask], N) return nd_indices[selected] def _generation_de(self, decs, vel, F_param, CR_param, TRD_param, winners, losers, union_idx, t, nt, uni_upper, uni_lower): """ DE generation with diversified knowledge transfer. For each winner individual, generates an offspring using DE/current-to-rand/1 with optional cross-task diversified knowledge transfer via region mapping and particle reversal. Parameters ---------- decs : list of np.ndarray Decision variables per task vel : list of np.ndarray Velocities per task F_param, CR_param, TRD_param : list of np.ndarray Adaptive parameters per task winners, losers : list of np.ndarray Winner and loser indices per task union_idx : np.ndarray Union of winner and loser indices for task t t : int Current task index nt : int Number of tasks uni_upper, uni_lower : list of np.ndarray Winner region bounds per task Returns ------- tuple (off_decs, off_V, off_F, off_CR, off_TRD) """ n_win = len(winners[t]) d = decs[t].shape[1] task_pool = [k for k in range(nt) if k != t] off_decs = np.zeros((n_win, d)) off_V = np.zeros((n_win, d)) off_F = np.zeros(n_win) off_CR = np.zeros(n_win) off_TRD = np.zeros(n_win) for i in range(n_win): wi = winners[t][i] # Adaptive parameter perturbation new_F = self._cauchy_rand(F_param[t][wi], 0.1) while new_F <= 0: new_F = self._cauchy_rand(F_param[t][wi], 0.1) new_F = min(1.0, new_F) new_CR = np.clip(np.random.normal(CR_param[t][wi], 0.1), 0, 1) new_TRD = 
np.clip(np.random.normal(TRD_param[t][wi], 0.1), 0, 1) # Tau-based random reset if np.random.rand() < self.Tau: new_F = np.random.rand() if np.random.rand() < self.Tau: new_CR = np.random.rand() if np.random.rand() < self.Tau: new_TRD = np.random.rand() off_F[i] = new_F off_CR[i] = new_CR off_TRD[i] = new_TRD # Select donor indices x1 = np.random.randint(n_win) while x1 == i: x1 = np.random.randint(n_win) x2 = np.random.randint(n_win) while x2 == i or x2 == x1: x2 = np.random.randint(n_win) x3 = np.random.randint(len(union_idx)) while x3 == i or x3 == x1 or x3 == x2: x3 = np.random.randint(len(union_idx)) w_x1 = winners[t][x1] w_x2 = winners[t][x2] u_x3 = union_idx[x3] if np.random.rand() < new_TRD and len(task_pool) > 0: # Diversified Knowledge Transfer k = task_pool[np.random.randint(len(task_pool))] src_win_idx = x1 % len(winners[k]) src_i = winners[k][src_win_idx] diver_dec = decs[k][src_i].copy() diver_v = vel[k][src_i].copy() # Particle reversal (50% chance) if np.random.rand() < 0.5: diver_dec = (uni_lower[k] + uni_upper[k]) - diver_dec diver_v = -diver_v # Region mapping: source winner region -> target winner region range_k = uni_upper[k] - uni_lower[k] range_t = uni_upper[t] - uni_lower[t] safe = range_k > 1e-20 diver_dec_mapped = diver_dec.copy() diver_dec_mapped[safe] = ( (diver_dec[safe] - uni_lower[k][safe]) / range_k[safe] * range_t[safe] + uni_lower[t][safe]) diver_dec_mapped[~safe] = uni_lower[t][~safe] # Polynomial mutation on mapped dec diver_dec_mapped = mutation(diver_dec_mapped, mu=20) # Velocity mapping diver_v_mapped = diver_v.copy() diver_v_mapped[safe] = diver_v[safe] / range_k[safe] * range_t[safe] diver_v_mapped[~safe] = 0.0 # DE mutation with diversified source off_dec = (diver_dec_mapped + new_F * (decs[t][w_x1] - diver_dec_mapped) + new_F * (decs[t][w_x2] - decs[t][u_x3])) off_v = (diver_v_mapped + new_F * (vel[t][w_x1] - diver_v_mapped) + new_F * (vel[t][w_x2] - vel[t][u_x3])) else: # Standard DE/current-to-rand/1 off_dec = 
(decs[t][wi] + new_F * (decs[t][w_x1] - decs[t][wi]) + new_F * (decs[t][w_x2] - decs[t][u_x3])) off_v = (vel[t][wi] + new_F * (vel[t][w_x1] - vel[t][wi]) + new_F * (vel[t][w_x2] - vel[t][u_x3])) # DE binomial crossover on [Dec; V] together mutant = np.vstack([off_dec.reshape(1, -1), off_v.reshape(1, -1)]) parent = np.vstack([decs[t][wi].reshape(1, -1), vel[t][wi].reshape(1, -1)]) crossed = self._de_crossover(mutant, parent, new_CR) off_dec = crossed[0] off_v = crossed[1] # Boundary repair (only for Dec, using winner as parent) off_dec = self._boundary_repair(off_dec, decs[t][wi]) off_decs[i] = off_dec off_V[i] = off_v return off_decs, off_V, off_F, off_CR, off_TRD def _generation_cso(self, decs, vel, F_param, CR_param, TRD_param, winners, losers, t, nt, factor): """ CSO generation with convergent knowledge transfer. For each loser individual, generates an offspring using velocity-based position update toward the paired winner, with optional cross-task convergent knowledge transfer via fragment swap. 
Parameters ---------- decs : list of np.ndarray Decision variables per task vel : list of np.ndarray Velocities per task F_param, CR_param, TRD_param : list of np.ndarray Adaptive parameters per task winners, losers : list of np.ndarray Winner and loser indices per task t : int Current task index nt : int Number of tasks factor : float Progress ratio (sum_nfes / total_nfes) Returns ------- tuple (off_decs, off_V, off_F, off_CR, off_TRD) """ n_lose = len(losers[t]) d = decs[t].shape[1] task_pool = [k for k in range(nt) if k != t] TRC = self.TRC0 * ((1 - factor) ** 2) off_decs = np.zeros((n_lose, d)) off_V = np.zeros((n_lose, d)) off_F = np.zeros(n_lose) off_CR = np.zeros(n_lose) off_TRD = np.zeros(n_lose) for i in range(n_lose): li = losers[t][i] wi = winners[t][i] # Parameter learning from winner off_F[i] = F_param[t][wi] off_CR[i] = CR_param[t][wi] off_TRD[i] = TRD_param[t][wi] # Velocity update if np.random.rand() < TRC and len(task_pool) > 0: # Convergent Knowledge Transfer k = task_pool[np.random.randint(len(task_pool))] src_win_idx = np.random.randint(len(winners[k])) conver_dec = decs[k][winners[k][src_win_idx]].copy() # Fragment swap: replace some dims with target winner swap_mask = np.random.rand(d) < np.random.rand() conver_dec[swap_mask] = decs[t][wi][swap_mask] new_v = (np.random.rand() * vel[t][li] + np.random.rand() * (conver_dec - decs[t][li])) else: # Standard CSO velocity update toward winner new_v = (np.random.rand() * vel[t][li] + np.random.rand() * (decs[t][wi] - decs[t][li])) # Position update off_dec = decs[t][li] + new_v # Boundary repair (using loser as parent) off_dec = self._boundary_repair(off_dec, decs[t][li]) off_decs[i] = off_dec off_V[i] = new_v return off_decs, off_V, off_F, off_CR, off_TRD @staticmethod def _cauchy_rand(loc, scale): """Generate a Cauchy random number with given location and scale.""" return loc + scale * np.random.standard_cauchy() @staticmethod def _de_crossover(mutant, parent, cr): """ Binomial crossover for 2xD 
matrix (Dec and V together). Parameters ---------- mutant : np.ndarray Mutant vectors, shape (2, D) parent : np.ndarray Parent vectors, shape (2, D) cr : float Crossover rate Returns ------- np.ndarray Crossed-over vectors, shape (2, D) """ n_rows, d = mutant.shape result = parent.copy() mask = np.random.rand(n_rows, d) < cr for r in range(n_rows): j_rand = np.random.randint(d) mask[r, j_rand] = True result[mask] = mutant[mask] return result @staticmethod def _boundary_repair(dec, parent_dec): """ Random repair boundary handling. For lower boundary violations, replace with random value in [0, parent]. For upper boundary violations, replace with random value in [parent, 1]. Parameters ---------- dec : np.ndarray Decision vector to repair parent_dec : np.ndarray Parent decision vector for repair reference Returns ------- np.ndarray Repaired decision vector in [0, 1] """ d = len(dec) vio_low = dec < 0 if np.any(vio_low): rnd_lower = np.random.rand(d) * parent_dec dec[vio_low] = rnd_lower[vio_low] vio_up = dec > 1 if np.any(vio_up): rnd_upper = parent_dec + np.random.rand(d) * (1 - parent_dec) dec[vio_up] = rnd_upper[vio_up] return np.clip(dec, 0, 1)