# Source code for ddmtolab.Algorithms.MTMO.MTDE_MKTA

"""
Multi-objective Multi-task Differential Evolution with Multiple Knowledge Types and Transfer Adaptation (MTDE-MKTA)

This module implements MTDE-MKTA for multi-task multi-objective optimization problems.

References
----------
    [1] Li, Yanchi, and Wenyin Gong. "Multiobjective Multitask Optimization With Multiple Knowledge Types and Transfer Adaptation." IEEE Transactions on Evolutionary Computation 29.1 (2025): 205-216.

Notes
-----
Author: Jiangtao Shen
Email: j.shen5@exeter.ac.uk
Date: 2026.01.18
Version: 1.0
"""
import time
import numpy as np
from tqdm import tqdm
from ddmtolab.Methods.Algo_Methods.algo_utils import *


class MTDE_MKTA:
    """
    Multi-objective Multi-task Differential Evolution with Multiple Knowledge Types and Transfer Adaptation.

    This algorithm features:

    - Self-adaptive parameters (F, CR, TR, KP) for each individual
    - Rank-based DE parent selection
    - Two knowledge transfer types: direct transfer and distribution-based transfer

    Attributes
    ----------
    algorithm_information : dict
        Dictionary containing algorithm capabilities and requirements
    """

    algorithm_information = {
        'n_tasks': '[2, K]',
        'dims': 'unequal',
        'objs': 'unequal',
        'n_objs': '[2, M]',
        'cons': 'unequal',
        'n_cons': '[0, C]',
        'expensive': 'False',
        'knowledge_transfer': 'True',
        'n': 'unequal',
        'max_nfes': 'unequal'
    }

    # Names of the per-individual self-adaptive parameters carried with the population.
    _PARAM_KEYS = ('F', 'CR', 'TR', 'KP')

    @classmethod
    def get_algorithm_information(cls, print_info=True):
        """
        Forward to the module-level ``get_algorithm_information`` helper for this class.

        Parameters
        ----------
        print_info : bool, optional
            Passed through unchanged to the helper (default: True)

        Returns
        -------
        Whatever the module-level helper returns.
        """
        return get_algorithm_information(cls, print_info)

    def __init__(self, problem, n=None, max_nfes=None, tau1=0.2, tau2=0.1,
                 save_data=True, save_path='./Data', name='MTDE-MKTA', disable_tqdm=True):
        """
        Initialize MTDE-MKTA algorithm.

        Parameters
        ----------
        problem : MTOP
            Multi-task optimization problem instance
        n : int or List[int], optional
            Population size per task (default: 100)
        max_nfes : int or List[int], optional
            Maximum number of function evaluations per task (default: 10000)
        tau1 : float, optional
            Mutation probability for F and CR parameters (default: 0.2)
        tau2 : float, optional
            Mutation probability for TR and KP parameters (default: 0.1)
        save_data : bool, optional
            Whether to save optimization data (default: True)
        save_path : str, optional
            Path to save results (default: './Data')
        name : str, optional
            Name for the experiment (default: 'MTDE-MKTA')
        disable_tqdm : bool, optional
            Whether to disable progress bar (default: True)
        """
        self.problem = problem
        self.n = n if n is not None else 100
        self.max_nfes = max_nfes if max_nfes is not None else 10000
        self.tau1 = tau1
        self.tau2 = tau2
        self.save_data = save_data
        self.save_path = save_path
        self.name = name
        self.disable_tqdm = disable_tqdm

    def optimize(self):
        """
        Execute the MTDE-MKTA algorithm.

        Returns
        -------
        Results
            Optimization results containing decision variables, objectives,
            constraints, and runtime
        """
        start_time = time.time()
        problem = self.problem
        nt = problem.n_tasks
        n_per_task = par_list(self.n, nt)
        max_nfes_per_task = par_list(self.max_nfes, nt)

        # Initialize population and evaluate for each task
        decs = initialization(problem, n_per_task)
        objs, cons = evaluation(problem, decs)
        nfes_per_task = n_per_task.copy()
        all_decs, all_objs, all_cons = init_history(decs, objs, cons)

        # Per-individual adaptive parameters, one dict per task.
        # F is drawn from [0.2, 1.2); CR, TR and KP from [0, 1).
        params = [
            {
                'F': 0.2 + np.random.rand(n_per_task[t]),
                'CR': np.random.rand(n_per_task[t]),
                'TR': np.random.rand(n_per_task[t]),
                'KP': np.random.rand(n_per_task[t]),
            }
            for t in range(nt)
        ]

        # Initial SPEA2 selection; also yields the fitness used for ranking
        fitness = []
        for t in range(nt):
            decs[t], objs[t], cons[t], params[t], fit_t = self._selection_spea2(
                decs[t], objs[t], cons[t], params[t], n_per_task[t]
            )
            fitness.append(fit_t)

        # Per-task distribution models (the tiny offset keeps std strictly positive)
        models = [
            {
                'mean': np.mean(decs[t], axis=0),
                'std': np.std(decs[t], axis=0) + 1e-100,
            }
            for t in range(nt)
        ]

        total_nfes = sum(max_nfes_per_task)
        alpha = 0.5  # weight of the previous model in the exponential moving average

        # The context manager guarantees the bar is closed even if evaluation raises
        with tqdm(total=total_nfes, initial=sum(nfes_per_task),
                  desc=f"{self.name}", disable=self.disable_tqdm) as pbar:
            while sum(nfes_per_task) < total_nfes:
                active_tasks = [t for t in range(nt)
                                if nfes_per_task[t] < max_nfes_per_task[t]]
                if not active_tasks:
                    break

                # Rank of every individual within its task (lower fitness = better rank)
                ranks = []
                for t in range(nt):
                    order = np.argsort(fitness[t])
                    rank_t = np.empty(len(fitness[t]), dtype=int)
                    rank_t[order] = np.arange(len(fitness[t]))
                    ranks.append(rank_t)

                # Update distribution models with exponential moving average
                for t in range(nt):
                    models[t]['mean'] = (alpha * models[t]['mean']
                                         + (1 - alpha) * np.mean(decs[t], axis=0))
                    models[t]['std'] = (alpha * models[t]['std']
                                        + (1 - alpha) * (np.std(decs[t], axis=0) + 1e-100))

                # Generate offspring for every active task before any population
                # is replaced, so all tasks see the same parent generation
                off_decs_all = []
                off_params_all = []
                for t in active_tasks:
                    off_decs_t, off_params_t = self._generation(
                        decs, params, ranks, models, t, n_per_task
                    )
                    off_decs_all.append(off_decs_t)
                    off_params_all.append(off_params_t)

                # Evaluate and select for each task
                for idx, t in enumerate(active_tasks):
                    off_decs_t = off_decs_all[idx]
                    off_params_t = off_params_all[idx]

                    off_objs_t, off_cons_t = evaluation_single(problem, off_decs_t, t)
                    nfes_per_task[t] += len(off_decs_t)
                    pbar.update(len(off_decs_t))

                    # Merge parents and offspring
                    merged_decs = np.vstack([decs[t], off_decs_t])
                    merged_objs = np.vstack([objs[t], off_objs_t])
                    if cons[t] is not None:
                        merged_cons = np.vstack([cons[t], off_cons_t])
                    else:
                        merged_cons = off_cons_t
                    merged_params = {
                        key: np.concatenate([params[t][key], off_params_t[key]])
                        for key in self._PARAM_KEYS
                    }

                    # SPEA2 selection
                    decs[t], objs[t], cons[t], params[t], fitness[t] = self._selection_spea2(
                        merged_decs, merged_objs, merged_cons, merged_params, n_per_task[t]
                    )

                    # Append to history
                    append_history(all_decs[t], decs[t], all_objs[t], objs[t],
                                   all_cons[t], cons[t])

        runtime = time.time() - start_time

        # Build and save results
        results = build_save_results(all_decs=all_decs, all_objs=all_objs, runtime=runtime,
                                     max_nfes=nfes_per_task, all_cons=all_cons,
                                     bounds=problem.bounds, save_path=self.save_path,
                                     filename=self.name, save_data=self.save_data)
        return results

    def _generation(self, decs, params, ranks, models, t, n_per_task):
        """
        Generate offspring for task t using rank-based DE with knowledge transfer.

        Parameters
        ----------
        decs : list of np.ndarray
            Decision variables for all tasks
        params : list of dict
            Adaptive parameters for all tasks
        ranks : list of np.ndarray
            Ranks for all tasks (based on fitness)
        models : list of dict
            Distribution models for all tasks
        t : int
            Current task index
        n_per_task : list of int
            Population sizes for all tasks

        Returns
        -------
        off_decs : np.ndarray
            Offspring decision variables of shape (N, dim)
        off_params : dict
            Offspring adaptive parameters

        Raises
        ------
        ValueError
            If the population of task t is too small to pick distinct DE parents
        """
        Np = n_per_task[t]
        nt = len(decs)
        dim = decs[t].shape[1]

        off_decs = np.zeros((Np, dim))
        off_params = {key: np.zeros(Np) for key in self._PARAM_KEYS}

        for i in range(Np):
            # Parameter disturbance with Gaussian noise around the parent's values
            off_F = np.clip(np.random.normal(params[t]['F'][i], 0.1), 0.2, 1.2)
            off_CR = np.clip(np.random.normal(params[t]['CR'][i], 0.1), 0, 1)
            off_TR = np.clip(np.random.normal(params[t]['TR'][i], 0.1), 0, 1)
            off_KP = np.random.normal(params[t]['KP'][i], 0.1)
            # Cyclic boundary for KP: wrap around instead of clipping
            if off_KP < 0:
                off_KP = 1 + off_KP
            elif off_KP > 1:
                off_KP = off_KP - 1

            # Parameter mutation: occasionally resample a parameter from scratch
            if np.random.rand() < self.tau1:
                off_F = 0.2 + np.random.rand()
            if np.random.rand() < self.tau1:
                off_CR = np.random.rand()
            if np.random.rand() < self.tau2:
                off_TR = np.random.rand()
            if np.random.rand() < self.tau2:
                off_KP = np.random.rand()

            off_params['F'][i] = off_F
            off_params['CR'][i] = off_CR
            off_params['TR'][i] = off_TR
            off_params['KP'][i] = off_KP

            # Rank-based DE parent selection: x1 and x2 are biased towards
            # better-ranked individuals, x3 is uniformly random
            x1 = self._rank_selection(ranks[t], Np, exclude=[i])
            x2 = self._rank_selection(ranks[t], Np, exclude=[i, x1])
            x3 = self._random_selection(Np, exclude=[i, x1, x2])

            xDeci = decs[t][i]
            xDec1 = decs[t][x1]
            xDec2 = decs[t][x2]
            xDec3 = decs[t][x3]

            # Knowledge transfer. The random draw happens unconditionally so the
            # random stream does not depend on the number of tasks; with a single
            # task there is no helper to pick, so transfer is skipped (the helper
            # search below would otherwise never terminate).
            do_transfer = np.random.rand() < off_TR
            if do_transfer and nt > 1:
                # Select a helper task (different from current task)
                k = np.random.randint(nt)
                while k == t:
                    k = np.random.randint(nt)
                Np_k = n_per_task[k]

                # Randomly select a solution from the helper task
                xDeck = decs[k][np.random.randint(Np_k)]
                if off_KP > 0.5:
                    # Type 1: direct transfer, only aligned to the current dimension
                    xDeck = align_dimensions(xDeck, dim)
                else:
                    # Type 2: distribution-based transfer. Standardize with the
                    # helper task's model, align, then map into the current
                    # task's distribution.
                    normalized = (xDeck - models[k]['mean']) / models[k]['std']
                    normalized_aligned = align_dimensions(normalized, dim)
                    xDeck = models[t]['mean'] + models[t]['std'] * normalized_aligned

                # Use transferred solution as x2
                xDec2 = xDeck

            # DE/rand/1 mutation
            mutant = xDec1 + off_F * (xDec2 - xDec3)

            # DE binomial crossover; j_rand guarantees at least one mutant gene
            j_rand = np.random.randint(dim)
            mask = np.random.rand(dim) < off_CR
            mask[j_rand] = True
            offspring_dec = np.where(mask, mutant, xDeci)

            # Boundary handling: clip to the unit box
            off_decs[i] = np.clip(offspring_dec, 0, 1)

        return off_decs, off_params

    def _rank_selection(self, rank, Np, exclude=None):
        """
        Rank-based selection: select individual with probability proportional
        to (Np - rank) / Np.

        Parameters
        ----------
        rank : np.ndarray
            Ranks of individuals (0-indexed, lower is better)
        Np : int
            Population size
        exclude : list, optional
            Indices to exclude from selection

        Returns
        -------
        selected : int
            Selected individual index

        Raises
        ------
        ValueError
            If every index is excluded and the fallback has nothing to pick
        """
        if exclude is None:
            exclude = []

        # Acceptance-rejection sampling: draw uniformly, accept with
        # probability (Np - rank[x]) / Np, which favours low (good) ranks
        max_attempts = 1000
        for _ in range(max_attempts):
            x = np.random.randint(Np)
            if x not in exclude and np.random.rand() < (Np - rank[x]) / Np:
                return x

        # Fallback: uniform selection excluding specified indices
        return self._random_selection(Np, exclude)

    def _random_selection(self, Np, exclude=None):
        """
        Random selection excluding specified indices.

        Parameters
        ----------
        Np : int
            Population size
        exclude : list, optional
            Indices to exclude from selection

        Returns
        -------
        selected : int
            Selected individual index

        Raises
        ------
        ValueError
            If no index in ``range(Np)`` remains after exclusion
        """
        if exclude is None:
            exclude = []

        candidates = [i for i in range(Np) if i not in exclude]
        if not candidates:
            raise ValueError(
                f"No selectable individual: population size {Np} is too small "
                f"for the excluded indices {list(exclude)}"
            )
        return np.random.choice(candidates)

    def _selection_spea2(self, decs, objs, cons, params, n):
        """
        SPEA2 environmental selection.

        Parameters
        ----------
        decs : np.ndarray
            Decision variables of shape (pop_size, dim)
        objs : np.ndarray
            Objective values of shape (pop_size, n_obj)
        cons : np.ndarray
            Constraint values of shape (pop_size, n_con)
        params : dict
            Adaptive parameters with keys 'F', 'CR', 'TR', 'KP'
        n : int
            Target population size

        Returns
        -------
        selected_decs : np.ndarray
            Selected decision variables
        selected_objs : np.ndarray
            Selected objective values
        selected_cons : np.ndarray
            Selected constraint values
        selected_params : dict
            Selected adaptive parameters
        fitness : np.ndarray
            Fitness values of selected population
        """
        pop_size = len(objs)
        if pop_size == 0:
            return decs, objs, cons, params, np.array([])

        n = min(n, pop_size)

        # Constraint violation: sum of the positive parts of each constraint
        if cons is not None and cons.size > 0:
            cvs = np.sum(np.maximum(0, cons), axis=1)
        else:
            cvs = np.zeros(pop_size)

        fitness = self._cal_spea2_fitness(objs, cvs)

        # Environmental selection: fitness < 1 means raw fitness 0 (non-dominated)
        next_mask = fitness < 1
        n_selected = np.sum(next_mask)

        if n_selected < n:
            # Not enough: select top n by fitness
            next_mask = np.zeros(pop_size, dtype=bool)
            next_mask[np.argsort(fitness)[:n]] = True
        elif n_selected > n:
            # Too many: truncation
            kept = np.where(next_mask)[0]
            del_indices = self._truncation(objs[kept], n_selected - n)
            next_mask[kept[del_indices]] = False

        # Apply selection, with survivors ordered by ascending fitness
        selected_indices = np.where(next_mask)[0]
        selected_indices = selected_indices[np.argsort(fitness[selected_indices])]

        selected_decs = decs[selected_indices]
        selected_objs = objs[selected_indices]
        selected_cons = cons[selected_indices] if cons is not None else None
        selected_params = {key: params[key][selected_indices] for key in self._PARAM_KEYS}
        selected_fitness = fitness[selected_indices]

        return selected_decs, selected_objs, selected_cons, selected_params, selected_fitness

    def _cal_spea2_fitness(self, objs, cvs):
        """
        Calculate SPEA2 fitness values.

        Parameters
        ----------
        objs : np.ndarray
            Objective values of shape (pop_size, n_obj)
        cvs : np.ndarray
            Constraint violations of shape (pop_size,)

        Returns
        -------
        fitness : np.ndarray
            SPEA2 fitness values of shape (pop_size,)
        """
        n = len(objs)
        if n == 0:
            return np.array([])

        cvs = np.asarray(cvs)

        # Constrained dominance, evaluated for all pairs at once:
        # i dominates j if it has a strictly smaller violation, or - when the
        # violations are neither smaller nor larger - if it Pareto-dominates j
        # (minimization).
        cv_less = cvs[:, None] < cvs[None, :]
        cv_tie = ~(cv_less | cv_less.T)
        better = np.any(objs[:, None, :] < objs[None, :, :], axis=2)
        # "i worse than j in some objective" is exactly "j better than i"
        worse = better.T
        dominate = cv_less | (cv_tie & better & ~worse)

        # Strength S(i) = number of solutions dominated by i
        s = np.sum(dominate, axis=1)

        # Raw fitness R(i) = sum of S(j) for all j that dominate i
        r = (s @ dominate).astype(float)

        # Density D(i) from the sorted distances to the other solutions
        distance = cdist(objs, objs)
        np.fill_diagonal(distance, np.inf)
        distance_sorted = np.sort(distance, axis=1)
        k_neighbor = max(0, min(int(np.floor(np.sqrt(n))), n - 1))
        # NOTE(review): with the diagonal set to inf, column 0 is the nearest
        # neighbour, so column k_neighbor is the (k_neighbor + 1)-th nearest;
        # confirm whether the k-th nearest (column k_neighbor - 1) was intended.
        d = 1.0 / (distance_sorted[:, k_neighbor] + 2)

        # Total fitness = R + D
        return r + d

    def _truncation(self, objs, k):
        """
        Truncation operator for SPEA2.

        Parameters
        ----------
        objs : np.ndarray
            Objective values of shape (pop_size, n_obj)
        k : int
            Number of solutions to delete

        Returns
        -------
        del_indices : np.ndarray
            Indices of solutions to delete
        """
        n = len(objs)
        del_mask = np.zeros(n, dtype=bool)

        distance = cdist(objs, objs)
        np.fill_diagonal(distance, np.inf)

        while np.sum(del_mask) < k:
            remain_indices = np.where(~del_mask)[0]
            remain_dist = distance[remain_indices][:, remain_indices]
            sorted_dist = np.sort(remain_dist, axis=1)
            # Row-wise lexicographic sort: rot90 turns the columns into lexsort
            # keys with the first column (nearest distance) as the primary key,
            # so the first entry is the most crowded remaining solution.
            sorted_rows_indices = np.lexsort(np.rot90(sorted_dist))
            del_mask[remain_indices[sorted_rows_indices[0]]] = True

        return np.where(del_mask)[0]