# Source code for ddmtolab.Algorithms.MTSO.MTEA_PAE

"""
Multi-Task Evolutionary Algorithm with Progressive Auto-Encoding (MTEA-PAE)

This module implements MTEA-PAE for multi-task optimization using kernelized
autoencoding (NFC) for cross-task knowledge transfer with adaptive operator
and transfer type selection.

References
----------
    [1] Gu, Qiong, et al. "Progressive Auto-Encoding for Domain Adaptation
        in Evolutionary Multi-Task Optimization." Applied Soft Computing,
        113916, 2025.

Notes
-----
Author: Jiangtao Shen (DDMTOLab adaptation)
Date: 2026.02.22
Version: 1.0
"""
import time
import numpy as np
from tqdm import tqdm
from ddmtolab.Methods.Algo_Methods.algo_utils import *


def _kernelmatrix_poly(X, X2):
    """Polynomial kernel matrix K = (X^T @ X2 + 0.1)^5.

    Parameters
    ----------
    X : np.ndarray, shape (D, N1)
        Feature matrix (features x samples)
    X2 : np.ndarray, shape (D, N2)
        Feature matrix (features x samples)

    Returns
    -------
    K : np.ndarray, shape (N1, N2)
    """
    d1, d2 = X.shape[0], X2.shape[0]
    if d1 < d2:
        X = np.vstack([X, np.zeros((d2 - d1, X.shape[1]))])
    elif d2 < d1:
        X2 = np.vstack([X2, np.zeros((d1 - d2, X2.shape[1]))])
    return (X.T @ X2 + 0.1) ** 5


def _nfc(T_H, S_H, S_Best):
    """Kernelized autoencoding transfer (NFC).

    Learns a kernel ridge-regression mapping from the source history
    population onto the target history population and applies it to the
    source solutions ``S_Best``, expressing them in the target space.

    Parameters
    ----------
    T_H : np.ndarray, shape (N_t, D_t)
        Target history population
    S_H : np.ndarray, shape (N_s, D_k)
        Source history population
    S_Best : np.ndarray, shape (M, D_k)
        Source solutions to transfer

    Returns
    -------
    result : np.ndarray, shape (M, D_t)
        Transferred solutions in target space
    """
    dim_t = T_H.shape[1]
    dim_s = S_H.shape[1]

    # Zero-pad the lower-dimensional history so both share one width.
    target_hist = T_H.copy()
    source_hist = S_H.copy()
    if dim_t < dim_s:
        filler = np.zeros((target_hist.shape[0], dim_s - dim_t))
        target_hist = np.concatenate([target_hist, filler], axis=1)
    elif dim_t > dim_s:
        filler = np.zeros((source_hist.shape[0], dim_t - dim_s))
        source_hist = np.concatenate([source_hist, filler], axis=1)

    # Work in features-by-samples layout.
    src_cols = source_hist.T
    tgt_cols = target_hist.T

    # Self-kernel K(S, S)
    gram = _kernelmatrix_poly(src_cols, src_cols)

    # Kernel ridge regression: W = (T K^T) (K K^T + lambda I)^+
    rhs = gram @ gram.T + 1e-5 * np.eye(gram.shape[0])
    lhs = tgt_cols @ gram.T
    mapping = lhs @ np.linalg.pinv(rhs)

    # Map the source solutions through the learned operator.
    if dim_t <= dim_s:
        k_cross = _kernelmatrix_poly(src_cols, S_Best.T)
        # Drop the padding columns to return target-sized solutions.
        return (mapping @ k_cross).T[:, :dim_t]
    padded_best = np.concatenate(
        [S_Best, np.zeros((S_Best.shape[0], dim_t - dim_s))], axis=1)
    k_cross = _kernelmatrix_poly(src_cols, padded_best.T)
    return (mapping @ k_cross).T


def _constrained_sort(objs, cons):
    """Sort indices by constraint violation then objective (ascending).

    Parameters
    ----------
    objs : np.ndarray, shape (N, 1)
    cons : np.ndarray, shape (N, C)

    Returns
    -------
    rank : np.ndarray
        Sorted indices
    """
    if cons.shape[1] > 0:
        cv = np.sum(np.maximum(0, cons), axis=1)
    else:
        cv = np.zeros(len(objs))
    return np.lexsort((objs.flatten(), cv))


class MTEA_PAE:
    """
    Multi-Task Evolutionary Algorithm with Progressive Auto-Encoding.

    Uses kernelized autoencoding (NFC) for cross-task knowledge transfer
    with two transfer strategies: segment transfer (using current
    distribution) and stochastic replacement transfer (using archive).
    Adaptive selection between DE/rand/1/bin and GA (SBX+PM) for offspring
    generation.

    Attributes
    ----------
    algorithm_information : dict
        Dictionary containing algorithm capabilities and requirements
    """

    algorithm_information = {
        'n_tasks': '[2, K]',
        'dims': 'unequal',
        'objs': 'equal',
        'n_objs': '1',
        'cons': 'unequal',
        'n_cons': '[0, C]',
        'expensive': 'False',
        'knowledge_transfer': 'True',
        'n': 'equal',
        'max_nfes': 'equal'
    }

    @classmethod
    def get_algorithm_information(cls, print_info=True):
        """Return this algorithm's capability dict; optionally print it.

        Delegates to the shared ``get_algorithm_information`` helper from
        ``algo_utils``.
        """
        return get_algorithm_information(cls, print_info)
[docs] def __init__(self, problem, n=None, max_nfes=None, Seg=10, TNum=20, TGap=5, F=0.5, CR=0.9, MuC=2, MuM=5, save_data=True, save_path='./Data', name='MTEA-PAE', disable_tqdm=True): """ Initialize MTEA-PAE algorithm. Parameters ---------- problem : MTOP Multi-task optimization problem instance n : int, optional Population size per task (default: 100) max_nfes : int, optional Maximum number of function evaluations per task (default: 10000) Seg : int, optional Number of segments for distribution snapshots (default: 10) TNum : int, optional Number of solutions to transfer (default: 20) TGap : int, optional Generation gap between transfers (default: 5) F : float, optional DE mutation scale factor (default: 0.5) CR : float, optional DE crossover rate (default: 0.9) MuC : float, optional SBX distribution index (default: 2) MuM : float, optional PM distribution index (default: 5) save_data : bool, optional Whether to save optimization data (default: True) save_path : str, optional Path to save results (default: './Data') name : str, optional Name for the experiment (default: 'MTEA-PAE') disable_tqdm : bool, optional Whether to disable progress bar (default: True) """ self.problem = problem self.n = n if n is not None else 100 self.max_nfes = max_nfes if max_nfes is not None else 10000 self.Seg = Seg self.TNum = TNum self.TGap = TGap self.F = F self.CR = CR self.MuC = MuC self.MuM = MuM self.save_data = save_data self.save_path = save_path self.name = name self.disable_tqdm = disable_tqdm
[docs] def optimize(self): """ Execute the MTEA-PAE algorithm. Returns ------- Results Optimization results containing decision variables, objectives, and runtime """ start_time = time.time() problem = self.problem nt = problem.n_tasks dims = problem.dims n = self.n max_nfes_per_task = par_list(self.max_nfes, nt) max_nfes = self.max_nfes * nt maxD = max(dims) TNum = min(self.TNum, n // 2) # Generation gap between distribution snapshots SegGap = max(1, self.max_nfes // (n * self.Seg)) # Initialize populations in [0,1]^D_t per task decs = initialization(problem, n) objs, cons = evaluation(problem, decs) nfes = n * nt all_decs, all_objs, all_cons = init_history(decs, objs, cons) # Pad to maxD space (extra dims filled with random) pop_decs = [] for t in range(nt): if dims[t] < maxD: pad = np.random.rand(n, maxD - dims[t]) pop_decs.append(np.hstack([decs[t], pad])) else: pop_decs.append(decs[t].copy()) pop_objs = [o.copy() for o in objs] pop_cons = [c.copy() for c in cons] # Sort each population by [CVs, Objs] for t in range(nt): rank = _constrained_sort(pop_objs[t], pop_cons[t]) pop_decs[t] = pop_decs[t][rank] pop_objs[t] = pop_objs[t][rank] pop_cons[t] = pop_cons[t][rank] # Archive and distribution reference (copies of initial sorted pop) arc_decs = [p.copy() for p in pop_decs] arc_objs = [o.copy() for o in pop_objs] arc_cons = [c.copy() for c in pop_cons] dis_decs = [p.copy() for p in pop_decs] dis_objs = [o.copy() for o in pop_objs] dis_cons = [c.copy() for c in pop_cons] # Adaptive success tracking SuccT = TNum * np.ones((nt, 2)) SumT = SuccT.copy() SuccG = n * np.ones((nt, 2)) SumG = SuccG.copy() gen = 0 pbar = tqdm(total=max_nfes, initial=nfes, desc=self.name, disable=self.disable_tqdm) while nfes < max_nfes: gen += 1 # --- Generate offspring per task --- off_decs = [] off_kt = [] off_op = [] for t in range(nt): pG = SuccG[t] / SumG[t] od, oop = self._generation(pop_decs[t], pG, n, maxD) off_decs.append(od) off_kt.append(np.zeros(n, dtype=int)) off_op.append(oop) 
# --- Knowledge transfer every TGap generations --- if TNum > 0 and gen % self.TGap == 0: # Backup archive before temporary modifications arc_bk_d = [a.copy() for a in arc_decs] arc_bk_o = [a.copy() for a in arc_objs] arc_bk_c = [a.copy() for a in arc_cons] # Sort DisPop and merge Arc with Pop for t in range(nt): rank = _constrained_sort(dis_objs[t], dis_cons[t]) dis_decs[t] = dis_decs[t][rank] dis_objs[t] = dis_objs[t][rank] dis_cons[t] = dis_cons[t][rank] arc_decs[t] = np.vstack([arc_decs[t], pop_decs[t]]) arc_objs[t] = np.vstack([arc_objs[t], pop_objs[t]]) arc_cons[t] = np.vstack([arc_cons[t], pop_cons[t]]) rank = _constrained_sort(arc_objs[t], arc_cons[t]) arc_decs[t] = arc_decs[t][rank] arc_objs[t] = arc_objs[t][rank] arc_cons[t] = arc_cons[t][rank] # Perform NFC transfer for each task for t in range(nt): k = np.random.randint(nt) while k == t: k = np.random.randint(nt) # Segment Transfer using DisPop sBest_seg = pop_decs[k][:TNum, :dims[k]] tDis = dis_decs[t][:, :dims[t]] kDis = dis_decs[k][:, :dims[k]] SegDec = _nfc(tDis, kDis, sBest_seg) if dims[t] < maxD: SegDec = np.hstack([SegDec, np.random.rand(TNum, maxD - dims[t])]) # Stochastic Transfer using Archive sBest_sto = arc_decs[k][:TNum, :dims[k]] tArc = arc_decs[t][:, :dims[t]] kArc = arc_decs[k][:, :dims[k]] StoDec = _nfc(tArc, kArc, sBest_sto) if dims[t] < maxD: StoDec = np.hstack([StoDec, np.random.rand(TNum, maxD - dims[t])]) # Probabilistic selection between transfer types pT = SuccT[t] / SumT[t] p_seg = pT[0] / (pT[0] + pT[1]) replace_idx = np.random.choice(n, TNum, replace=False) for i, idx in enumerate(replace_idx): if np.random.rand() < p_seg: off_decs[t][idx] = np.clip(SegDec[i], 0, 1) off_kt[t][idx] = 1 else: off_decs[t][idx] = np.clip(StoDec[i], 0, 1) off_kt[t][idx] = 2 off_op[t][idx] = 0 # Restore archive from backup arc_decs = arc_bk_d arc_objs = arc_bk_o arc_cons = arc_bk_c # --- Environmental selection --- for t in range(nt): # Elite solution transfer from random source task k = 
np.random.randint(nt) while k == t: k = np.random.randint(nt) rnd_idx = np.random.randint(n) off_decs[t][rnd_idx] = pop_decs[k][0].copy() off_kt[t][rnd_idx] = 3 off_op[t][rnd_idx] = 0 # Count submissions SumT[t, 0] += np.sum(off_kt[t] == 1) SumT[t, 1] += np.sum(off_kt[t] == 2) SumG[t, 0] += np.sum(off_op[t] == 1) SumG[t, 1] += np.sum(off_op[t] == 2) # Evaluate offspring off_objs_t, off_cons_t = evaluation_single( problem, off_decs[t][:, :dims[t]], t) nfes += n pbar.update(n) # Combine parent + offspring merged_decs = np.vstack([pop_decs[t], off_decs[t]]) merged_objs = np.vstack([pop_objs[t], off_objs_t]) merged_cons = np.vstack([pop_cons[t], off_cons_t]) merged_kt = np.concatenate( [np.zeros(n, dtype=int), off_kt[t]]) merged_op = np.concatenate( [np.zeros(n, dtype=int), off_op[t]]) # Elitist selection sel = selection_elit(objs=merged_objs, n=n, cons=merged_cons) # Sort selected population (needed for rank-biased DE) sub_rank = _constrained_sort(merged_objs[sel], merged_cons[sel]) sel_sorted = sel[sub_rank] # Count successes from surviving offspring SuccT[t, 0] += np.sum(merged_kt[sel_sorted] == 1) SuccT[t, 1] += np.sum(merged_kt[sel_sorted] == 2) SuccG[t, 0] += np.sum(merged_op[sel_sorted] == 1) SuccG[t, 1] += np.sum(merged_op[sel_sorted] == 2) # Failed individuals (not selected) all_idx = set(range(2 * n)) failed_idx = np.array(sorted(all_idx - set(sel_sorted))) # Update archive with failed individuals arc_decs[t] = np.vstack([arc_decs[t], merged_decs[failed_idx]]) arc_objs[t] = np.vstack([arc_objs[t], merged_objs[failed_idx]]) arc_cons[t] = np.vstack([arc_cons[t], merged_cons[failed_idx]]) # Random sample archive to N perm = np.random.permutation(len(arc_decs[t]))[:n] arc_decs[t] = arc_decs[t][perm] arc_objs[t] = arc_objs[t][perm] arc_cons[t] = arc_cons[t][perm] # Update population (sorted) pop_decs[t] = merged_decs[sel_sorted] pop_objs[t] = merged_objs[sel_sorted] pop_cons[t] = merged_cons[sel_sorted] # Update distribution reference every SegGap generations if 
gen % SegGap == 0: dis_decs = [p.copy() for p in pop_decs] dis_objs = [o.copy() for o in pop_objs] dis_cons = [c.copy() for c in pop_cons] # Record history in task-specific dimensions real_decs = [pop_decs[t][:, :dims[t]] for t in range(nt)] append_history(all_decs, real_decs, all_objs, pop_objs, all_cons, pop_cons) pbar.close() runtime = time.time() - start_time results = build_save_results( all_decs=all_decs, all_objs=all_objs, runtime=runtime, max_nfes=max_nfes_per_task, all_cons=all_cons, bounds=problem.bounds, save_path=self.save_path, filename=self.name, save_data=self.save_data) return results
def _generation(self, pop_decs, pG, n, maxD): """Generate offspring with adaptive DE/GA selection. Parameters ---------- pop_decs : np.ndarray, shape (n, maxD) Current sorted population pG : np.ndarray, shape (2,) Success ratio for [DE, GA] n : int Population size maxD : int Maximum dimensionality Returns ------- off_decs : np.ndarray, shape (n, maxD) op : np.ndarray, shape (n,) Operator type (1=DE, 2=GA) """ off_decs = pop_decs.copy() op = np.zeros(n, dtype=int) # Pre-compute tournament selection for GA parents indorder = np.zeros(2 * n, dtype=int) for j in range(2 * n): a, b = np.random.randint(n, size=2) indorder[j] = min(a, b) # lower index = better (sorted pop) p_de = pG[0] / (pG[0] + pG[1]) for i in range(n): if np.random.rand() < p_de: # DE/rand/1/bin with rank-biased donor selection x1 = np.random.randint(n) itr = 0 while ((np.random.rand() > (n - 1 - x1) / n or x1 == i) and itr < 1000): x1 = np.random.randint(n) itr += 1 x2 = np.random.randint(n) itr = 0 while ((np.random.rand() > (n - 1 - x2) / n or x2 == i or x2 == x1) and itr < 1000): x2 = np.random.randint(n) itr += 1 x3 = np.random.randint(n) while x3 == i or x3 == x1 or x3 == x2: x3 = np.random.randint(n) # DE/rand/1 mutation + binomial crossover v = pop_decs[x1] + self.F * (pop_decs[x2] - pop_decs[x3]) u = pop_decs[i].copy() j_rand = np.random.randint(maxD) mask = np.random.rand(maxD) < self.CR mask[j_rand] = True u[mask] = v[mask] off_decs[i] = u op[i] = 1 else: # GA: SBX crossover + polynomial mutation p1 = indorder[i] p2 = indorder[i + n // 2] par1 = pop_decs[p1].reshape(1, -1) par2 = pop_decs[p2].reshape(1, -1) child, _ = crossover(par1, par2, self.MuC) child = mutation(child, self.MuM) off_decs[i] = child[0] op[i] = 2 off_decs[i] = np.clip(off_decs[i], 0, 1) return off_decs, op