Source code for ddmtolab.Algorithms.MTSO.DTSKT

"""
Distribution Direction-Assisted Two-Stage Knowledge Transfer (DTSKT)

This module implements DTSKT for many-task single-objective optimization with
distribution-based two-stage knowledge transfer.

References
----------
    [1] Zhang, Tingyu, et al. "Distribution Direction-Assisted Two-Stage Knowledge
        Transfer for Many-Task Optimization." IEEE Transactions on Systems, Man,
        and Cybernetics: Systems (2025): 1-15.

Notes
-----
Author: Jiangtao Shen (DDMTOLab adaptation)
Date: 2026.02.22
Version: 1.0
"""
import time
import numpy as np
from tqdm import tqdm
from ddmtolab.Methods.Algo_Methods.algo_utils import *


[docs] class DTSKT: """ Distribution Direction-Assisted Two-Stage Knowledge Transfer for Many-Task Optimization. Uses Gaussian EDA with two-stage knowledge transfer: - Exploring stage: shifts sampling mean along the best source task's search path - Exploiting stage: uses combined source-target distribution for refined search Attributes ---------- algorithm_information : dict Dictionary containing algorithm capabilities and requirements """ algorithm_information = { 'n_tasks': '[2, K]', 'dims': 'equal', 'objs': 'equal', 'n_objs': '1', 'cons': 'unequal', 'n_cons': '[0, C]', 'expensive': 'False', 'knowledge_transfer': 'True', 'n': 'equal', 'max_nfes': 'equal' } @classmethod def get_algorithm_information(cls, print_info=True): return get_algorithm_information(cls, print_info)
[docs] def __init__(self, problem, n=None, max_nfes=None, A=0.35, beta=0.6, rmp=0.5, topn=2, save_data=True, save_path='./Data', name='DTSKT', disable_tqdm=True): """ Initialize DTSKT algorithm. Parameters ---------- problem : MTOP Multi-task optimization problem instance (all tasks must have equal dimensions) n : int, optional Population size per task (default: 200) max_nfes : int, optional Maximum number of function evaluations per task (default: 10000) A : float, optional Elite ratio for weighted mean computation (default: 0.35) beta : float, optional Stage transition ratio - fraction of budget for exploring stage (default: 0.6) rmp : float, optional Probability of knowledge transfer in offspring generation (default: 0.5) topn : int, optional Number of top source tasks for multi-source transfer (default: 2) save_data : bool, optional Whether to save optimization data (default: True) save_path : str, optional Path to save results (default: './Data') name : str, optional Name for the experiment (default: 'DTSKT') disable_tqdm : bool, optional Whether to disable progress bar (default: True) """ self.problem = problem self.n = n if n is not None else 200 self.max_nfes = max_nfes if max_nfes is not None else 10000 self.A = A self.beta = beta self.rmp = rmp self.topn = topn self.save_data = save_data self.save_path = save_path self.name = name self.disable_tqdm = disable_tqdm
[docs] def optimize(self): """ Execute the DTSKT algorithm. Returns ------- Results Optimization results containing decision variables, objectives, and runtime """ start_time = time.time() problem = self.problem nt = problem.n_tasks dims = problem.dims d = dims[0] n = self.n max_nfes_per_task = par_list(self.max_nfes, nt) total_max_nfes = sum(max_nfes_per_task) elit_n = max(1, round(self.A * n)) # Initialize population and evaluate decs = initialization(problem, n) objs, cons = evaluation(problem, decs) nfes_per_task = [n] * nt total_nfes = n * nt all_decs, all_objs, all_cons = init_history(decs, objs, cons) # Log-weights for weighted mean computation log_weights = np.log(elit_n + 1) - np.log(np.arange(1, elit_n + 1)) # Initialize distribution parameters for each task M = [None] * nt # weighted mean of elite M_old = [None] * nt # population mean S = [None] * nt # element-wise std from elite path = [None] * nt # search direction rank = [None] * nt # sort order for t in range(nt): rank[t] = _constrained_rank(objs[t], cons[t]) elite_decs = decs[t][rank[t][:elit_n]] M[t] = np.average(elite_decs, axis=0, weights=log_weights) M_old[t] = np.mean(decs[t], axis=0) QQ = elite_decs - M[t] S[t] = np.sqrt(np.mean(QQ ** 2, axis=0)) path[t] = decs[t][rank[t][0]] - M_old[t] pbar = tqdm(total=total_max_nfes, initial=total_nfes, desc=f"{self.name}", disable=self.disable_tqdm) while total_nfes < total_max_nfes: # Compute cosine similarity matrix CO CO = np.zeros((nt, nt)) for t in range(nt - 1): for k in range(t + 1, nt): norm_t = np.linalg.norm(path[t]) norm_k = np.linalg.norm(path[k]) if norm_t > 0 and norm_k > 0: cos_val = np.dot(path[t], path[k]) / (norm_t * norm_k) else: cos_val = 0.0 CO[t, k] = np.exp(cos_val) CO[k, t] = CO[t, k] # Diagonal stays 0 (matches MATLAB's 1/inf initialization) for t in range(nt): if total_nfes >= total_max_nfes: break # --- Evaluate M{t} --- M_dec = np.clip(M[t].reshape(1, -1), 0, 1) OO_obj, OO_con = evaluation_single(problem, M_dec, t) nfes_per_task[t] += 1 total_nfes += 1 pbar.update(1) flag = 0 idx_set = set() mDec_t = np.mean(decs[t], axis=0) # Find most similar task co_row = CO[t, :] k = np.argmax(co_row) max_co = co_row[k] co_sum = np.sum(co_row) msr = max_co / co_sum if co_sum > 0 else 0.0 # --- Determine transfer strategy --- if msr > 1.5 / nt: # Single dominant source task PATH = path[k].copy() elite_k = decs[k][rank[k][:elit_n]] shifted_k = elite_k - M_old[k] + mDec_t elite_t = decs[t][rank[t][:elit_n]] POPM = np.vstack([shifted_k, elite_t]) - M[t] S1 = np.sqrt(np.mean(POPM ** 2, axis=0)) else: # Multi-source transfer PATH = np.zeros(d) CO2 = np.argsort(CO[k, :])[::-1] TOP_idx = list(CO2[:self.topn]) if t in TOP_idx: TOP_idx.remove(t) if k not in TOP_idx: TOP_idx.append(k) # Compute weights from CO(t, :) w = np.array([CO[t, ti] for ti in TOP_idx]) w_sum = np.sum(w) w = w / w_sum if w_sum > 0 else np.ones(len(TOP_idx)) / len(TOP_idx) # Weighted path for i, ti in enumerate(TOP_idx): PATH += w[i] * path[ti] # Weighted combined std DDD = np.zeros(d) for i, kk in enumerate(TOP_idx): elite_kk = decs[kk][rank[kk][:elit_n]] shifted_kk = elite_kk - M_old[kk] + mDec_t elite_t = decs[t][rank[t][:elit_n]] POPM = np.vstack([shifted_kk, elite_t]) - M[t] WW = np.sqrt(np.mean(POPM ** 2, axis=0)) DDD += w[i] * WW S1 = DDD # --- Generate offspring --- offspring_decs = np.zeros((n, d)) offspring_evaluated = np.full(n, False) offspring_objs = np.full((n, objs[t].shape[1]), np.inf) n_cons_t = cons[t].shape[1] offspring_cons = np.zeros((n, n_cons_t)) gen_count = 0 for i in range(n - 2): if total_nfes >= total_max_nfes: gen_count = i break if flag == 0: # Generate from distribution if np.random.rand() < self.rmp: if total_nfes < self.beta * total_max_nfes: # Exploring stage offspring_decs[i] = np.random.normal( M[t] + np.random.rand() * PATH, S[t]) else: # Exploiting stage offspring_decs[i] = np.random.normal(M[t], S1) else: offspring_decs[i] = np.random.normal(M[t], S[t]) offspring_decs[i] = np.clip(offspring_decs[i], 0, 1) # Evaluate off_obj, off_con = evaluation_single( problem, offspring_decs[i:i + 1], t) offspring_objs[i] = off_obj[0] offspring_cons[i] = off_con[0] offspring_evaluated[i] = True nfes_per_task[t] += 1 total_nfes += 1 pbar.update(1) # Check if worse than OO if offspring_objs[i, 0] > OO_obj[0, 0]: flag = 1 idx_set.add(i) else: # Mirror strategy offspring_decs[i] = 2 * M[t] - offspring_decs[i - 1] flag = 0 offspring_decs[i] = np.clip(offspring_decs[i], 0, 1) gen_count = i + 1 # Add best individual and OO as last two if gen_count <= n - 2: gen_count = n - 2 offspring_decs[n - 2] = decs[t][rank[t][0]] # best from current pop offspring_decs[n - 1] = M_dec[0] # OO point # Evaluate non-evaluated offspring (mirrors + best + OO) non_eval_indices = [i for i in range(n) if not offspring_evaluated[i]] if non_eval_indices and total_nfes < total_max_nfes: ne_decs = offspring_decs[non_eval_indices] ne_objs, ne_cons = evaluation_single(problem, ne_decs, t) for j, ni in enumerate(non_eval_indices): offspring_objs[ni] = ne_objs[j] offspring_cons[ni] = ne_cons[j] offspring_evaluated[ni] = True nfes_per_task[t] += len(non_eval_indices) total_nfes += len(non_eval_indices) pbar.update(len(non_eval_indices)) # Assemble new population (idx first, then rest - order doesn't matter after sort) decs[t] = offspring_decs.copy() objs[t] = offspring_objs.copy() cons[t] = offspring_cons.copy() # --- Selection (sort) --- rank[t] = _constrained_rank(objs[t], cons[t]) # --- Update M with 4-point correction --- M_O = M[t].copy() elite_decs = decs[t][rank[t][:elit_n]] M[t] = np.average(elite_decs, axis=0, weights=log_weights) D_M = M[t] - M_O if total_nfes + 4 <= total_max_nfes: O_decs = np.clip(np.array([ M[t] + 2 * D_M, # O1: overshoot M[t], # O2: current M_O, # O3: old M[t] - 0.5 * D_M # O4: between ]), 0, 1) O_objs, _ = evaluation_single(problem, O_decs, t) nfes_per_task[t] += 4 total_nfes += 4 pbar.update(4) # Adjust M based on objective trend if O_objs[0, 0] < O_objs[1, 0] and O_objs[1, 0] < O_objs[2, 0]: # Improving trend: use overshoot M[t] = O_decs[0] elif O_objs[1, 0] > max(O_objs[2, 0], O_objs[3, 0]): # M went wrong: fallback M[t] = O_decs[3] # else: keep M[t] as is # --- Update S, path, M_old --- elite_decs = decs[t][rank[t][:elit_n]] QQ = elite_decs - M[t] S[t] = np.sqrt(np.mean(QQ ** 2, axis=0)) # Path from random top-3 individual to old mean top3_idx = np.random.randint(0, min(3, n)) path[t] = decs[t][rank[t][top3_idx]] - M_old[t] M_old[t] = np.mean(decs[t], axis=0) # Append history for all tasks append_history(all_decs, decs, all_objs, objs, all_cons, cons) pbar.close() runtime = time.time() - start_time # Trim excess evaluations all_decs, all_objs, nfes_per_task, all_cons = trim_excess_evaluations( all_decs, all_objs, nt, max_nfes_per_task, nfes_per_task, all_cons) # Save results results = build_save_results( all_decs=all_decs, all_objs=all_objs, runtime=runtime, max_nfes=max_nfes_per_task, all_cons=all_cons, bounds=problem.bounds, save_path=self.save_path, filename=self.name, save_data=self.save_data) return results
def _constrained_rank(objs, cons): """ Sort individuals by constraint violation then objective value (ascending). Parameters ---------- objs : np.ndarray Objective values, shape (n, 1) cons : np.ndarray Constraint values, shape (n, c) Returns ------- rank : np.ndarray Sort indices, shape (n,) """ n = objs.shape[0] if cons is not None and cons.shape[1] > 0: cv = np.sum(np.maximum(0, cons), axis=1) else: cv = np.zeros(n) return np.lexsort((objs[:, 0], cv))