# Source code for ddmtolab.Algorithms.MTMO.MTEA_D_DN

"""
Multi-task Multi-objective Evolutionary Algorithm Based on Decomposition with Dynamic Neighborhood (MTEA-D-DN)

This module implements MTEA-D-DN for multi-task multi-objective optimization problems.

References
----------
    [1] Wang, Xianpeng, Zhiming Dong, Lixin Tang, and Qingfu Zhang. "Multiobjective Multitask Optimization - Neighborhood as a Bridge for Knowledge Transfer." IEEE Transactions on Evolutionary Computation 27.1 (2023): 155-169.

Notes
-----
Author: Jiangtao Shen
Email: j.shen5@exeter.ac.uk
Date: 2026.01.18
Version: 1.0
"""
import time
import numpy as np
from tqdm import tqdm
from scipy.spatial.distance import pdist, squareform
from ddmtolab.Methods.Algo_Methods.uniform_point import uniform_point
from ddmtolab.Methods.Algo_Methods.algo_utils import *


class MTEA_D_DN:
    """
    Multi-task Multi-objective Evolutionary Algorithm Based on Decomposition with Dynamic Neighborhood.

    This algorithm uses neighborhood structure as a bridge for knowledge transfer between tasks.
    It maintains two types of neighborhoods:

    - B: Primary neighborhood within the same task (based on weight vector distance)
    - B2: Secondary neighborhood from other tasks (for knowledge transfer)

    Attributes
    ----------
    algorithm_information : dict
        Dictionary containing algorithm capabilities and requirements
    """

    algorithm_information = {
        'n_tasks': '[2, K]',
        'dims': 'unequal',
        'objs': 'unequal',
        'n_objs': '[2, M]',
        'cons': 'unequal',
        'n_cons': '[0, C]',
        'expensive': 'False',
        'knowledge_transfer': 'True',
        'n': 'unequal',
        'max_nfes': 'unequal'
    }

    @classmethod
    def get_algorithm_information(cls, print_info=True):
        """
        Forward to the module-level ``get_algorithm_information`` helper.

        Parameters
        ----------
        print_info : bool, optional
            Passed through unchanged to the helper (default: True)

        Returns
        -------
        Whatever the module-level helper returns for this class.
        """
        # Inside the method body the bare name resolves to the module-level
        # function, not to this classmethod, so this is not recursive.
        return get_algorithm_information(cls, print_info)

    def __init__(self, problem, n=None, max_nfes=None, beta=0.2, F=0.5, CR=0.9, mum=20.0,
                 save_data=True, save_path='./Data', name='MTEA-D-DN', disable_tqdm=True):
        """
        Initialize MTEA-D-DN algorithm.

        Parameters
        ----------
        problem : MTOP
            Multi-task optimization problem instance
        n : int or List[int], optional
            Population size per task (default: 100)
        max_nfes : int or List[int], optional
            Maximum number of function evaluations per task (default: 10000)
        beta : float, optional
            Probability of choosing parents locally (from neighborhood) (default: 0.2)
        F : float, optional
            Scaling factor for DE mutation (default: 0.5)
        CR : float, optional
            Crossover rate for DE (default: 0.9)
        mum : float, optional
            Distribution index for polynomial mutation (default: 20.0)
        save_data : bool, optional
            Whether to save optimization data (default: True)
        save_path : str, optional
            Path to save results (default: './Data')
        name : str, optional
            Name for the experiment (default: 'MTEA-D-DN')
        disable_tqdm : bool, optional
            Whether to disable progress bar (default: True)
        """
        self.problem = problem
        self.n = n if n is not None else 100
        self.max_nfes = max_nfes if max_nfes is not None else 10000
        self.beta = beta
        self.F = F
        self.CR = CR
        self.mum = mum
        self.save_data = save_data
        self.save_path = save_path
        self.name = name
        self.disable_tqdm = disable_tqdm

    def optimize(self):
        """
        Execute the MTEA-D-DN algorithm.

        Returns
        -------
        Results
            The object produced by ``build_save_results`` from the recorded
            history, the runtime and the evaluations actually spent per task.

        Raises
        ------
        ValueError
            If the problem has fewer than two tasks; the secondary
            neighborhood always points at a *different* task.

        Notes
        -----
        The per-task budget is checked once per sweep over the subproblems, so
        a task may spend more than its ``max_nfes`` inside a sweep. Evaluations
        made during knowledge transfer are charged to the target task.
        """
        start_time = time.time()
        problem = self.problem
        nt = problem.n_tasks
        if nt < 2:
            # Fail before any evaluation is spent; otherwise the first
            # secondary-neighborhood draw fails on an empty task pool.
            raise ValueError(
                f"MTEA-D-DN requires at least 2 tasks for knowledge transfer, got {nt}")
        no = problem.n_objs
        n_per_task = par_list(self.n, nt)
        max_nfes_per_task = par_list(self.max_nfes, nt)

        # Generate uniformly distributed weight vectors for each task
        W = []
        N = []   # Actual population sizes (may differ from n_per_task due to uniform point generation)
        DT = []  # Neighborhood sizes
        B = []   # Primary neighbor indices (within same task)
        for t in range(nt):
            w_t, n_t = uniform_point(n_per_task[t], no[t])
            W.append(w_t)
            N.append(n_t)
            n_per_task[t] = n_t

            # Neighborhood size: ceil(N/20)
            dt = int(np.ceil(n_t / 20))
            DT.append(dt)

            # Detect neighbors based on Euclidean distance of weight vectors
            distances = squareform(pdist(w_t))
            B.append(np.argsort(distances, axis=1)[:, :dt])

        # Initialize population and evaluate for each task
        decs = initialization(problem, N)
        objs, cons = evaluation(problem, decs)
        nfes_per_task = N.copy()
        all_decs, all_objs, all_cons = init_history(decs, objs, cons)

        # Initialize ideal points for each task
        Z = [np.min(objs[t], axis=0) for t in range(nt)]

        # Initialize secondary neighborhoods (B2) for knowledge transfer
        B2k = []  # Target task indices for secondary neighborhood
        B2 = []   # Secondary neighbor indices (from other tasks)
        for t in range(nt):
            b2k_t = []
            b2_t = []
            for _ in range(N[t]):
                target_task, neighbors = self._sample_secondary_neighborhood(t, nt, N, DT[t])
                b2k_t.append(target_task)
                b2_t.append(neighbors)
            B2k.append(b2k_t)
            B2.append(b2_t)

        # Progress bar
        total_nfes = sum(max_nfes_per_task)
        pbar = tqdm(total=total_nfes, initial=sum(nfes_per_task), desc=f"{self.name}",
                    disable=self.disable_tqdm)

        # Main optimization loop
        while sum(nfes_per_task) < total_nfes:
            active_tasks = [t for t in range(nt) if nfes_per_task[t] < max_nfes_per_task[t]]
            if not active_tasks:
                break

            for t in active_tasks:
                for i in range(N[t]):
                    if rand() < self.beta:
                        # Local mating: choose parents from neighborhoods (same + other task)
                        P1 = B[t][i].copy()    # Primary neighborhood indices
                        P2 = B2[t][i].copy()   # Secondary neighborhood indices
                        target_task = B2k[t][i]

                        # Create combined parent pool with task labels
                        tasks = np.concatenate([
                            np.full(len(P1), t, dtype=int),
                            np.full(len(P2), target_task, dtype=int)
                        ])
                        P = np.concatenate([P1, P2])

                        # Shuffle the combined pool (labels and indices together)
                        perm = np.random.permutation(len(tasks))
                        tasks = tasks[perm]
                        P = P[perm]

                        # Generate offspring using DE mutation/crossover + polynomial mutation
                        parent_decs = [decs[t][i], decs[tasks[0]][P[0]], decs[tasks[1]][P[1]]]
                        offspring_dec = self._generation(parent_decs, problem.dims[t])

                        if rand() < 0.5:
                            # Knowledge transfer: evaluate on target task and update
                            k = target_task
                            offspring_dec_padded = align_dimensions(offspring_dec, problem.dims[k])
                            off_obj, off_con = evaluation_single(
                                problem, offspring_dec_padded.reshape(1, -1), k)
                            nfes_per_task[k] += 1
                            pbar.update(1)

                            # Update ideal point
                            Z[k] = np.minimum(Z[k], off_obj[0])

                            update_mask = self._update_subproblems(
                                P2, offspring_dec_padded, off_obj, off_con,
                                decs[k], objs[k], cons[k], Z[k], W[k])

                            if np.any(update_mask):
                                # Keep only successful neighbors
                                B2[t][i] = P2[update_mask]
                            else:
                                # No improvement: re-initialize the secondary
                                # neighborhood for this subproblem
                                B2k[t][i], B2[t][i] = self._sample_secondary_neighborhood(
                                    t, nt, N, DT[t])
                        else:
                            # Evaluate on current task
                            off_obj, off_con = evaluation_single(
                                problem, offspring_dec.reshape(1, -1), t)
                            nfes_per_task[t] += 1
                            pbar.update(1)

                            # Update ideal point
                            Z[t] = np.minimum(Z[t], off_obj[0])

                            self._update_subproblems(
                                P1, offspring_dec, off_obj, off_con,
                                decs[t], objs[t], cons[t], Z[t], W[t])
                    else:
                        # Global mating: choose parents from entire population
                        P = np.random.permutation(N[t])

                        # Generate offspring
                        parent_decs = [decs[t][i], decs[t][P[0]], decs[t][P[1]]]
                        offspring_dec = self._generation(parent_decs, problem.dims[t])

                        # Evaluate offspring
                        off_obj, off_con = evaluation_single(
                            problem, offspring_dec.reshape(1, -1), t)
                        nfes_per_task[t] += 1
                        pbar.update(1)

                        # Update ideal point
                        Z[t] = np.minimum(Z[t], off_obj[0])

                        # Every subproblem of the task is a replacement candidate here
                        self._update_subproblems(
                            P, offspring_dec, off_obj, off_con,
                            decs[t], objs[t], cons[t], Z[t], W[t])

                # Append to history (one snapshot per task after its sweep)
                append_history(all_decs[t], decs[t], all_objs[t], objs[t], all_cons[t], cons[t])

        pbar.close()
        runtime = time.time() - start_time

        # Build and save results
        results = build_save_results(all_decs=all_decs, all_objs=all_objs, runtime=runtime,
                                     max_nfes=nfes_per_task, all_cons=all_cons,
                                     bounds=problem.bounds, save_path=self.save_path,
                                     filename=self.name, save_data=self.save_data)

        return results

    def _sample_secondary_neighborhood(self, t, nt, N, size):
        """
        Draw a random secondary neighborhood for a subproblem of task ``t``.

        Parameters
        ----------
        t : int
            Index of the task owning the subproblem; never chosen as target
        nt : int
            Number of tasks
        N : list of int
            Population size of every task
        size : int
            Maximum number of neighbor indices to draw

        Returns
        -------
        target_task : int
            Uniformly chosen task index different from ``t``
        neighbors : np.ndarray
            Up to ``size`` distinct indices into the target task's population
        """
        tar_pool = [k for k in range(nt) if k != t]
        target_task = tar_pool[np.random.randint(len(tar_pool))]
        neighbors = np.random.permutation(N[target_task])[:size]
        return target_task, neighbors

    def _update_subproblems(self, indices, off_dec, off_obj, off_con,
                            decs_t, objs_t, cons_t, z, weights):
        """
        Replace the candidate subproblems whose Tchebycheff value the offspring matches or beats.

        The arrays ``decs_t``, ``objs_t`` and ``cons_t`` are modified in place.

        Parameters
        ----------
        indices : np.ndarray
            Indices of the candidate subproblems in the task population
        off_dec : np.ndarray
            Offspring decision vector, already in the task's dimension
        off_obj : np.ndarray
            Offspring objectives; row 0 is written into ``objs_t``
        off_con : np.ndarray
            Offspring constraints; row 0 is written into ``cons_t`` when it is not None
        decs_t, objs_t : np.ndarray
            Decision variables and objectives of the task population
        cons_t : np.ndarray or None
            Constraint values of the task population, or None
        z : np.ndarray
            Ideal point of the task (already updated with the offspring)
        weights : np.ndarray
            Weight vectors of the task

        Returns
        -------
        update_mask : np.ndarray
            Boolean mask over ``indices`` marking the replaced subproblems
        """
        g_old = self._tchebycheff(objs_t[indices], z, weights[indices])
        g_new = self._tchebycheff(np.tile(off_obj, (len(indices), 1)), z, weights[indices])

        # Ties count as an improvement, so equal scalarized values are replaced too
        update_mask = g_old >= g_new
        for p_idx in indices[update_mask]:
            decs_t[p_idx] = off_dec
            objs_t[p_idx] = off_obj[0]
            if cons_t is not None:
                cons_t[p_idx] = off_con[0]
        return update_mask

    def _generation(self, parent_decs, target_dim):
        """
        Generate offspring using DE mutation, binomial crossover and polynomial mutation.

        Parameters
        ----------
        parent_decs : list of np.ndarray
            Parent decision variables (3 parents with potentially different dimensions)
            parent_decs[0]: base/target vector
            parent_decs[1], parent_decs[2]: difference vectors
        target_dim : int
            Target dimension for offspring

        Returns
        -------
        offspring_dec : np.ndarray
            Offspring decision variable aligned to ``target_dim`` and clipped to [0, 1]
        """
        # Align all parents to target dimension
        aligned_parents = [align_dimensions(p, target_dim) for p in parent_decs]

        # DE mutation: v = x1 + F * (x2 - x3)
        mutant = aligned_parents[0] + self.F * (aligned_parents[1] - aligned_parents[2])

        # DE crossover (binomial); j_rand guarantees at least one mutant gene
        dim = target_dim
        j_rand = np.random.randint(dim)
        mask = np.random.rand(dim) < self.CR
        mask[j_rand] = True
        offspring_dec = np.where(mask, mutant, aligned_parents[0])

        # Polynomial mutation
        offspring_dec = mutation(offspring_dec, mu=self.mum)

        # Boundary handling
        offspring_dec = np.clip(offspring_dec, 0, 1)

        return offspring_dec

    def _tchebycheff(self, objs, z, weights):
        """
        Calculate Tchebycheff scalarization values.

        Parameters
        ----------
        objs : np.ndarray
            Objective values of shape (N, M)
        z : np.ndarray
            Ideal point of shape (M,)
        weights : np.ndarray
            Weight vectors of shape (N, M)

        Returns
        -------
        g : np.ndarray
            Tchebycheff values of shape (N,)
        """
        # Floor the weights at a tiny positive value so that an exactly-zero
        # weight does not remove its objective from the maximum entirely.
        weights_safe = np.maximum(weights, 1e-10)
        return np.max(np.abs(objs - z) * weights_safe, axis=1)
def rand():
    """Generate a random number in [0, 1) from NumPy's global random state."""
    return np.random.rand()