Source code for ddmtolab.Algorithms.MTSO.MTES_KG

"""
Multi-Task Evolution Strategy with Knowledge-Guided External Sampling (MTES-KG)

This module implements the MTES-KG algorithm for multi-task single-objective optimization.
The algorithm extends CMA-ES with two types of knowledge-guided external sampling across tasks:
DoS (Domain of Solution knowledge) and SaS (Shape of function knowledge), along with adaptive
negative transfer mitigation.

References
----------
    [1] Y. Li, W. Gong, and S. Li. "Multitask Evolution Strategy With Knowledge-Guided External
        Sampling." IEEE Transactions on Evolutionary Computation, 28(6): 1733-1745, 2024.

Notes
-----
The code is developed in accordance with the MATLAB-based MTO-platform framework.

Author: Jiangtao Shen
Email: j.shen5@exeter.ac.uk
Date: 2026.02.21
Version: 1.0
"""
import time
import numpy as np
from tqdm import tqdm
from ddmtolab.Methods.Algo_Methods.algo_utils import *


[docs] class MTES_KG: """ Multi-Task Evolution Strategy with Knowledge-Guided External Sampling. Each task maintains an independent CMA-ES instance. Knowledge transfer between tasks is achieved through external samples generated via two strategies: - DoS (Domain of Solution knowledge): Samples from an auxiliary task's distribution, projected to within the current task's neighborhood - SaS (Shape of function knowledge): Transfers the search direction from an auxiliary task's successful solutions using CMA-ES coordinate system transformation An adaptive mechanism adjusts the number of external samples (tau) to mitigate negative transfer. Attributes ---------- algorithm_information : dict Dictionary containing algorithm capabilities and requirements """ algorithm_information = { 'n_tasks': '[2, K]', 'dims': 'unequal', 'objs': 'equal', 'n_objs': '1', 'cons': 'unequal', 'n_cons': '[0, C]', 'expensive': 'False', 'knowledge_transfer': 'True', 'n': 'equal', 'max_nfes': 'equal' } @classmethod def get_algorithm_information(cls, print_info=True): return get_algorithm_information(cls, print_info)
[docs] def __init__(self, problem, n=None, max_nfes=None, tau0=2, alpha=0.5, adj_gap=50, sigma0=0.3, save_data=True, save_path='./Data', name='MTES-KG', disable_tqdm=True): """ Initialize MTES-KG algorithm. Parameters ---------- problem : MTOP Multi-task optimization problem instance n : int, optional Population size (lambda) per task (default: 100) max_nfes : int, optional Maximum number of function evaluations per task (default: 10000) tau0 : int, optional Initial external sample number per task (default: 2) alpha : float, optional Probability of using DoS vs SaS for external sampling (default: 0.5) adj_gap : int, optional Generation gap for adjusting tau (default: 50) sigma0 : float, optional Initial step size for CMA-ES (default: 0.3) save_data : bool, optional Whether to save optimization data (default: True) save_path : str, optional Path to save results (default: './Data') name : str, optional Name for the experiment (default: 'MTES-KG') disable_tqdm : bool, optional Whether to disable progress bar (default: True) """ self.problem = problem self.n = n if n is not None else 100 self.max_nfes = max_nfes if max_nfes is not None else 10000 self.tau0 = tau0 self.alpha = alpha self.adj_gap = adj_gap self.sigma0 = sigma0 self.save_data = save_data self.save_path = save_path self.name = name self.disable_tqdm = disable_tqdm
[docs] def optimize(self): """ Execute the MTES-KG algorithm. Returns ------- Results Optimization results containing decision variables, objectives, constraints, and runtime """ start_time = time.time() problem = self.problem nt = problem.n_tasks dims = problem.dims n_dim = max(dims) # CMA-ES operates in max dimension space lam = self.n # lambda: population size per task mu = lam // 2 # number of effective parents max_nfes_per_task = par_list(self.max_nfes, nt) total_max_nfes = self.max_nfes * nt # Per-task CMA-ES state initialization cma_params = [] for t in range(nt): cma_params.append(cmaes_init_params(n_dim, lam=lam, sigma0=self.sigma0)) # MTES_KG-specific per-task variables m_step = [0.0] * nt tau = [self.tau0] * nt num_ex_s = [[] for _ in range(nt)] suc_ex_s = [[] for _ in range(nt)] nfes_per_task_eig = [0] * nt # Generate initial samples and evaluate for history initialization init_decs = [] init_objs = [] init_cons = [] for t in range(nt): samples_t = cmaes_sample(cma_params[t]['m_dec'], cma_params[t]['sigma'], cma_params[t]['B'], cma_params[t]['D'], lam, clip=False) eval_decs = np.clip(samples_t[:, :dims[t]], 0, 1) objs_t, cons_t = evaluation_single(problem, eval_decs, t) init_decs.append(eval_decs) init_objs.append(objs_t) init_cons.append(cons_t) nfes = lam * nt all_decs, all_objs, all_cons = init_history(init_decs, init_objs, init_cons) # Storage for previous generation data (needed for SaS) old_samples = [None] * nt old_rank = [None] * nt gen = 0 pbar = tqdm(total=total_max_nfes, initial=nfes, desc=f"{self.name}", disable=self.disable_tqdm) while nfes < total_max_nfes: gen += 1 # === Step 1: Sample new internal solutions for each task === internal_samples = [None] * nt for t in range(nt): internal_samples[t] = cmaes_sample(cma_params[t]['m_dec'], cma_params[t]['sigma'], cma_params[t]['B'], cma_params[t]['D'], lam, clip=False) # Compute mean step size (average distance from mean) diffs = internal_samples[t] - cma_params[t]['m_dec'] m_step[t] = np.mean(np.sqrt(np.sum(diffs ** 2, axis=1))) # === Step 2: Sample external solutions (DoS/SaS) === external_samples = [None] * nt for t in range(nt): if tau[t] == 0: external_samples[t] = np.empty((0, n_dim)) continue # Select random auxiliary task other_tasks = list(range(nt)) other_tasks.remove(t) k = other_tasks[np.random.randint(len(other_tasks))] ext = np.zeros((tau[t], n_dim)) p_t = cma_params[t] p_k = cma_params[k] for i in range(tau[t]): if gen < 2: # First generation: random sample from own distribution z_i = np.random.randn(n_dim) ext[i] = p_t['m_dec'] + p_t['sigma'] * (p_t['B'] @ (p_t['D'] * z_i)) elif np.random.rand() < self.alpha: # DoS: Domain knowledge-guided external sampling z_i = np.random.randn(n_dim) sample_k = p_k['m_dec'] + p_k['sigma'] * (p_k['B'] @ (p_k['D'] * z_i)) vec = sample_k - p_t['m_dec'] vec_norm = np.linalg.norm(vec) if vec_norm < m_step[t]: ext[i] = sample_k else: uni_vec = vec / (vec_norm + 1e-30) ext[i] = p_t['m_dec'] + uni_vec * m_step[t] else: # SaS: Shape knowledge-guided external sampling idx = list(range(mu)) idx.pop(np.random.randint(len(idx))) top_decs_k = old_samples[k][old_rank[k][idx]] vec_mean = np.mean(top_decs_k, axis=0) vec = (vec_mean - p_k['m_dec']) / p_k['sigma'] # Transform from k's coordinate system to t's transformed = p_t['B'] @ (p_t['D'] * (p_k['B'].T @ (vec / p_k['D']))) ext[i] = p_t['m_dec'] + p_t['sigma'] * transformed external_samples[t] = ext # === Step 3: Evaluate, sort, update CMA-ES for each task === cur_decs = [None] * nt cur_objs = [None] * nt cur_cons = [None] * nt for t in range(nt): # Concatenate internal and external samples if external_samples[t] is not None and len(external_samples[t]) > 0: all_sample_decs = np.vstack([internal_samples[t], external_samples[t]]) else: all_sample_decs = internal_samples[t].copy() total_samples = len(all_sample_decs) # Boundary constraint violation (extended bounds [-0.05, 1.05]) clipped = np.clip(all_sample_decs, -0.05, 1.05) bound_cv = np.sum((all_sample_decs - clipped) ** 2, axis=1) # Evaluate (trim to task dimensions, clip to [0,1]) eval_decs = np.clip(all_sample_decs[:, :dims[t]], 0, 1) objs_t, cons_t = evaluation_single(problem, eval_decs, t) nfes += total_samples pbar.update(total_samples) # Compute constraint violation if cons_t is not None and cons_t.size > 0: cvs = np.sum(np.maximum(0, cons_t), axis=1) else: cvs = np.zeros(total_samples) # Add boundary violation penalty if np.any(bound_cv > 0) and np.max(cvs) > 0: bound_cv[bound_cv > 0] += np.max(cvs) total_cv = cvs + bound_cv # Sort by [total_cv, obj] lexicographically (ascending) rank_t = constrained_sort(objs_t, total_cv) # Track external sample success (how many external samples are in top mu) num_ex_s[t].append(tau[t]) suc_count = int(np.sum(rank_t[:mu] >= lam)) suc_ex_s[t].append(suc_count) # Negative transfer mitigation: adjust tau periodically if gen % self.adj_gap == 0 and gen > 0: start_idx = max(0, len(num_ex_s[t]) - self.adj_gap) num_all = sum(num_ex_s[t][start_idx:]) suc_all = sum(suc_ex_s[t][start_idx:]) if (num_all > 0 and suc_all / num_all > 0.5) or num_all == 0: tau[t] = min(self.tau0, tau[t] + 1) else: tau[t] = max(0, tau[t] - 1) # Save current samples and rank for next generation's SaS old_samples[t] = all_sample_decs.copy() old_rank[t] = rank_t.copy() # ===== CMA-ES parameter update ===== nfes_per_task_eig[t] += total_samples restarted = cmaes_update(cma_params[t], all_sample_decs[rank_t], nfes_per_task_eig[t]) if restarted: cma_params[t]['ps'] = np.zeros(n_dim) cma_params[t]['pc'] = np.zeros(n_dim) cma_params[t]['sigma'] = min(max(2 * cma_params[t]['sigma'], 0.01), 0.3) # Store population for history (top lam individuals in native space) top_lam_idx = rank_t[:lam] cur_decs[t] = np.clip(all_sample_decs[top_lam_idx, :dims[t]], 0, 1) cur_objs[t] = objs_t[top_lam_idx] if cons_t is not None and cons_t.size > 0: cur_cons[t] = cons_t[top_lam_idx] else: cur_cons[t] = np.zeros((lam, 0)) append_history(all_decs, cur_decs, all_objs, cur_objs, all_cons, cur_cons) pbar.close() runtime = time.time() - start_time results = build_save_results( all_decs=all_decs, all_objs=all_objs, runtime=runtime, max_nfes=max_nfes_per_task, all_cons=all_cons, bounds=problem.bounds, save_path=self.save_path, filename=self.name, save_data=self.save_data ) return results