"""
Multi-objective Multi-task Evolutionary Algorithm with Progressive Auto-Encoding (MO-MTEA-PAE)
This module implements MO-MTEA-PAE for multi-task multi-objective optimization problems.
References
----------
[1] Q. Gu, Y. Li, W. Gong, Z. Yuan, B. Ning, C. Hu, and J. Wu, "Progressive Auto-Encoding for Domain Adaptation in Evolutionary Multi-Task Optimization," Applied Soft Computing, vol. 175, p. 113916, 2025.
Notes
-----
Author: Jiangtao Shen
Email: j.shen5@exeter.ac.uk
Date: 2026.02.22
Version: 1.0
"""
import time
import numpy as np
from tqdm import tqdm
from ddmtolab.Methods.Algo_Methods.algo_utils import *
# (removed Sphinx "[docs]" scrape artifact)
class MO_MTEA_PAE:
    """
    Multi-objective Multi-task Evolutionary Algorithm with Progressive Auto-Encoding.

    This algorithm features:
    - Kernel-based NFC (Nonlinear Feature Coupling) for cross-task knowledge transfer
    - Two transfer strategies: segment transfer (historical distribution) and
      stochastic transfer (current distribution)
    - Adaptive selection between DE and GA offspring generation
    - Adaptive selection between transfer types based on success rates
    - SPEA2 environmental selection per task
    - Elite solution transfer across tasks

    Attributes
    ----------
    algorithm_information : dict
        Dictionary containing algorithm capabilities and requirements
    """
    # Declarative capability table describing the supported task counts,
    # dimensionalities, objective/constraint handling, and transfer support.
    algorithm_information = {
        'n_tasks': '[2, K]',
        'dims': 'unequal',
        'objs': 'unequal',
        'n_objs': '[2, M]',
        'cons': 'unequal',
        'n_cons': '[0, C]',
        'expensive': 'False',
        'knowledge_transfer': 'True',
        'n': 'unequal',
        'max_nfes': 'unequal'
    }

    @classmethod
    def get_algorithm_information(cls, print_info=True):
        # Delegates to the module-level helper (imported from algo_utils);
        # optionally prints and returns the capability table above.
        return get_algorithm_information(cls, print_info)
# (removed Sphinx "[docs]" scrape artifact)
def __init__(self, problem, n=None, max_nfes=None,
Seg=10, TNum=20, TGap=5,
F=0.5, CR=0.9, MuC=20, MuM=15,
save_data=True, save_path='./Data',
name='MO-MTEA-PAE', disable_tqdm=True):
"""
Initialize MO-MTEA-PAE algorithm.
Parameters
----------
problem : MTOP
Multi-task optimization problem instance
n : int or List[int], optional
Population size per task (default: 100)
max_nfes : int or List[int], optional
Maximum number of function evaluations per task (default: 10000)
Seg : int, optional
Number of segments for DisPop update schedule (default: 10)
TNum : int, optional
Number of transfer solutions per transfer event (default: 20)
TGap : int, optional
Transfer gap in generations (default: 5)
F : float, optional
DE mutation factor (default: 0.5)
CR : float, optional
DE crossover rate (default: 0.9)
MuC : float, optional
SBX crossover distribution index (default: 20)
MuM : float, optional
PM mutation distribution index (default: 15)
save_data : bool, optional
Whether to save optimization data (default: True)
save_path : str, optional
Path to save results (default: './Data')
name : str, optional
Name for the experiment (default: 'MO-MTEA-PAE')
disable_tqdm : bool, optional
Whether to disable progress bar (default: True)
"""
self.problem = problem
self.n = n if n is not None else 100
self.max_nfes = max_nfes if max_nfes is not None else 10000
self.Seg = Seg
self.TNum = TNum
self.TGap = TGap
self.F = F
self.CR = CR
self.MuC = MuC
self.MuM = MuM
self.save_data = save_data
self.save_path = save_path
self.name = name
self.disable_tqdm = disable_tqdm
# (removed Sphinx "[docs]" scrape artifact)
    def optimize(self):
        """
        Execute the MO-MTEA-PAE algorithm.

        Generation loop:
        1. Each task generates offspring with adaptive DE/GA (`_generation`).
        2. Every `TGap` generations, `TNum` offspring per task are replaced by
           solutions mapped from a random source task via kernel NFC (`_nfc`):
           segment transfer learns from the historical DisPop snapshot,
           stochastic transfer from the current sorted populations; the
           variant is chosen per solution from cumulative success rates.
        3. One elite (best) solution from another task replaces a random
           offspring of each task.
        4. Parents + offspring undergo SPEA2 environmental selection; the
           discarded ("failed") solutions feed a bounded per-task archive,
           and operator/transfer success counters are updated.

        Returns
        -------
        Results
            Optimization results containing decision variables, objectives,
            constraints, and runtime
        """
        start_time = time.time()
        problem = self.problem
        nt = problem.n_tasks
        # Broadcast possibly-scalar settings to one value per task.
        n_per_task = par_list(self.n, nt)
        max_nfes_per_task = par_list(self.max_nfes, nt)
        dims = problem.dims
        d_max = max(dims)  # unified search-space dimensionality
        # Clamp TNum so a transfer event never replaces more than half of
        # the smallest population.
        TNum = min(self.TNum, min(n_per_task) // 2)
        # SegGap: how often to update DisPop (in generations)
        # total_gen ≈ max_nfes / N, SegGap = total_gen / Seg
        # NOTE(review): derived from task 0's budget and size only — assumes
        # roughly uniform settings across tasks; confirm for heterogeneous runs.
        SegGap = max(1, max_nfes_per_task[0] // (n_per_task[0] * self.Seg))
        # Initialize populations in unified space (d_max); each task is
        # evaluated on its native leading dims[t] variables only.
        decs = []
        objs = []
        cons = []
        for t in range(nt):
            decs_t = np.random.rand(n_per_task[t], d_max)
            objs_t, cons_t = evaluation_single(problem, decs_t[:, :dims[t]], t)
            decs.append(decs_t)
            objs.append(objs_t)
            cons.append(cons_t)
        nfes_per_task = n_per_task.copy()
        # SPEA2 selection for initial population; _spea2_select also orders
        # the survivors best-first, so index 0 of each population is its best.
        fit = []
        for t in range(nt):
            sel_idx, fit_t = self._spea2_select(objs[t], cons[t], n_per_task[t])
            decs[t] = decs[t][sel_idx]
            objs[t] = objs[t][sel_idx]
            cons[t] = cons[t][sel_idx]
            fit.append(fit_t[sel_idx])
        # Save native-space history (unified padding columns stripped)
        all_decs = []
        all_objs = []
        all_cons = []
        for t in range(nt):
            all_decs.append([decs[t][:, :dims[t]].copy()])
            all_objs.append([objs[t].copy()])
            all_cons.append([cons[t].copy()])
        # Archive (receives discarded solutions) and DisPop (historical
        # distribution snapshot used by segment transfer) — deep copies
        arc_decs = [d.copy() for d in decs]
        arc_objs = [o.copy() for o in objs]
        arc_cons = [c.copy() for c in cons]
        dis_decs = [d.copy() for d in decs]
        dis_objs = [o.copy() for o in objs]
        dis_cons = [c.copy() for c in cons]
        # KT/OP flags for current population (reset each gen):
        # kt: 0=none, 1=segment, 2=stochastic, 3=elite; op: 0=none, 1=DE, 2=GA
        kt_flags = [np.zeros(n_per_task[t], dtype=int) for t in range(nt)]
        op_flags = [np.zeros(n_per_task[t], dtype=int) for t in range(nt)]
        # Success tracking (cumulative); initialized positive so the
        # success-rate ratios below never divide by zero.
        succ_t = np.full((nt, 2), float(TNum))  # successful transfer count [seg, sto]
        sum_t = succ_t.copy()  # total transfer count [seg, sto]
        succ_g = np.array([[float(n_per_task[t])] * 2 for t in range(nt)])  # successful gen [DE, GA]
        sum_g = succ_g.copy()  # total gen [DE, GA]
        # Progress bar
        total_nfes = sum(max_nfes_per_task)
        pbar = tqdm(total=total_nfes, initial=sum(nfes_per_task),
                    desc=f"{self.name}", disable=self.disable_tqdm)
        gen_count = 0
        while sum(nfes_per_task) < total_nfes:
            gen_count += 1
            # === Step 1: Generate Offspring ===
            off_decs_list = []
            # NOTE(review): off_objs_list / off_cons_list are declared but
            # never used below — apparently vestigial.
            off_objs_list = []
            off_cons_list = []
            off_kt_list = []
            off_op_list = []
            for t in range(nt):
                Nt = n_per_task[t]
                pG = succ_g[t] / sum_g[t]  # cumulative DE/GA success rates
                off_decs_t, off_op_t = self._generation(
                    decs[t], Nt, pG, d_max)
                off_kt_t = np.zeros(Nt, dtype=int)
                off_decs_list.append(off_decs_t)
                off_kt_list.append(off_kt_t)
                off_op_list.append(off_op_t)
            # === Step 2: Knowledge Transfer ===
            if TNum > 0 and gen_count % self.TGap == 0:
                # Backup archive
                # NOTE(review): the archive is backed up here and restored
                # unchanged below without being modified in between — this
                # round-trip looks vestigial; confirm against the reference
                # implementation.
                arc_back_decs = [d.copy() for d in arc_decs]
                arc_back_objs = [o.copy() for o in arc_objs]
                arc_back_cons = [c.copy() for c in arc_cons]
                # Re-sort DisPop and prepare sorted current pop (temporarily in arc)
                sorted_pop_decs = []
                sorted_pop_objs = []
                sorted_pop_cons = []
                for t in range(nt):
                    # SPEA2 re-sort DisPop
                    sel_idx, _ = self._spea2_select(dis_objs[t], dis_cons[t], n_per_task[t])
                    dis_decs[t] = dis_decs[t][sel_idx]
                    dis_objs[t] = dis_objs[t][sel_idx]
                    dis_cons[t] = dis_cons[t][sel_idx]
                    # Sorted current pop (used as "Arc" for stochastic transfer)
                    sel_idx2, _ = self._spea2_select(objs[t], cons[t], n_per_task[t])
                    sorted_pop_decs.append(decs[t][sel_idx2])
                    sorted_pop_objs.append(objs[t][sel_idx2])
                    sorted_pop_cons.append(cons[t][sel_idx2])
                for t in range(nt):
                    # Pick random source task (any task other than t)
                    k = np.random.randint(nt)
                    while k == t:
                        k = np.random.randint(nt)
                    Nt = n_per_task[t]
                    Nk = n_per_task[k]
                    # --- Segment Transfer ---
                    # Select TNum solutions from the non-dominated set of
                    # Pop{k} (SPEA2 fitness < 1); fall back to the first
                    # TNum (best-first ordering) when too few exist.
                    nd_idx = np.where(fit[k] < 1)[0]
                    if len(nd_idx) < TNum:
                        s_best_idx = np.arange(min(TNum, Nk))
                    else:
                        s_best_idx = nd_idx[np.random.permutation(len(nd_idx))[:TNum]]
                    s_best_decs = decs[k][s_best_idx, :dims[k]]
                    # NFC learned from the DisPop (historical) distributions
                    t_dis = dis_decs[t][:, :dims[t]]
                    k_dis = dis_decs[k][:, :dims[k]]
                    seg_dec = self._nfc(t_dis, k_dis, s_best_decs)
                    # Pad back to d_max with random values for the unused tail
                    if dims[t] < d_max:
                        seg_dec = np.hstack([seg_dec,
                                             np.random.rand(seg_dec.shape[0], d_max - dims[t])])
                    # --- Stochastic Replacement Transfer ---
                    # Same NFC mapping, but learned from the current sorted
                    # populations and applied to task k's TNum best.
                    s_best_decs2 = sorted_pop_decs[k][:TNum, :dims[k]]
                    t_dis2 = sorted_pop_decs[t][:, :dims[t]]
                    k_dis2 = sorted_pop_decs[k][:, :dims[k]]
                    sto_dec = self._nfc(t_dis2, k_dis2, s_best_decs2)
                    if dims[t] < d_max:
                        sto_dec = np.hstack([sto_dec,
                                             np.random.rand(sto_dec.shape[0], d_max - dims[t])])
                    # Select between segment and stochastic per solution,
                    # proportionally to their cumulative success rates
                    pT = succ_t[t] / sum_t[t]
                    tr_decs = np.zeros((TNum, d_max))
                    tr_kt = np.zeros(TNum, dtype=int)
                    for i in range(TNum):
                        if np.random.rand() < pT[0] / (pT[0] + pT[1]):
                            tr_decs[i] = seg_dec[i]
                            tr_kt[i] = 1  # segment
                        else:
                            tr_decs[i] = sto_dec[i]
                            tr_kt[i] = 2  # stochastic
                    tr_decs = np.clip(tr_decs, 0, 1)
                    # Replace random offspring with transferred solutions
                    replace_idx = np.random.permutation(len(off_decs_list[t]))[:TNum]
                    off_decs_list[t][replace_idx] = tr_decs
                    off_kt_list[t][replace_idx] = tr_kt
                    off_op_list[t][replace_idx] = 0
                # Restore archive (see NOTE above)
                arc_decs = arc_back_decs
                arc_objs = arc_back_objs
                arc_cons = arc_back_cons
            # Update DisPop (historical snapshot) every SegGap generations
            if gen_count % SegGap == 0:
                dis_decs = [d.copy() for d in decs]
                dis_objs = [o.copy() for o in objs]
                dis_cons = [c.copy() for c in cons]
            # === Step 3: Environmental Selection ===
            for t in range(nt):
                if nfes_per_task[t] >= max_nfes_per_task[t]:
                    continue  # task budget exhausted: skip evaluation/selection
                Nt = n_per_task[t]
                # Reset parent flags: only this generation's offspring count
                # towards the success statistics below
                kt_flags[t] = np.zeros(Nt, dtype=int)
                op_flags[t] = np.zeros(Nt, dtype=int)
                # Elite solution transfer: replace one random offspring with best from another task
                k = np.random.randint(nt)
                while k == t:
                    k = np.random.randint(nt)
                rnd_idx = np.random.randint(Nt)
                off_decs_list[t][rnd_idx] = decs[k][0].copy()  # best from task k (index 0)
                off_kt_list[t][rnd_idx] = 3  # elite transfer
                off_op_list[t][rnd_idx] = 0
                # Track total counts (before evaluation)
                sum_t[t, 0] += np.sum(off_kt_list[t] == 1)
                sum_t[t, 1] += np.sum(off_kt_list[t] == 2)
                sum_g[t, 0] += np.sum(off_op_list[t] == 1)
                sum_g[t, 1] += np.sum(off_op_list[t] == 2)
                # Evaluate offspring on the task's native variables
                off_objs_t, off_cons_t = evaluation_single(
                    problem, off_decs_list[t][:, :dims[t]], t)
                nfes_per_task[t] += len(off_decs_list[t])
                pbar.update(len(off_decs_list[t]))
                # Merge parent + offspring
                merged_decs = np.vstack([decs[t], off_decs_list[t]])
                merged_objs = np.vstack([objs[t], off_objs_t])
                merged_cons = np.vstack([cons[t], off_cons_t])
                merged_kt = np.concatenate([kt_flags[t], off_kt_list[t]])
                merged_op = np.concatenate([op_flags[t], off_op_list[t]])
                # SPEA2 selection
                sel_idx, fit_all = self._spea2_select(
                    merged_objs, merged_cons, Nt)
                # Identify failed (discarded) solutions
                all_idx_set = set(range(len(merged_decs)))
                sel_idx_set = set(sel_idx.tolist())
                failed_idx = np.array(sorted(all_idx_set - sel_idx_set))
                # Update success counts: a tagged solution "succeeds" when it
                # survives environmental selection
                succ_t[t, 0] += np.sum(merged_kt[sel_idx] == 1)
                succ_t[t, 1] += np.sum(merged_kt[sel_idx] == 2)
                succ_g[t, 0] += np.sum(merged_op[sel_idx] == 1)
                succ_g[t, 1] += np.sum(merged_op[sel_idx] == 2)
                # Update population (kept best-first by _spea2_select)
                decs[t] = merged_decs[sel_idx]
                objs[t] = merged_objs[sel_idx]
                cons[t] = merged_cons[sel_idx]
                fit[t] = fit_all[sel_idx]
                kt_flags[t] = merged_kt[sel_idx]
                op_flags[t] = merged_op[sel_idx]
                # Update archive with failed solutions
                if len(failed_idx) > 0:
                    failed_decs = merged_decs[failed_idx]
                    failed_objs = merged_objs[failed_idx]
                    failed_cons = merged_cons[failed_idx]
                    arc_decs[t] = np.vstack([arc_decs[t], failed_decs])
                    arc_objs[t] = np.vstack([arc_objs[t], failed_objs])
                    arc_cons[t] = np.vstack([arc_cons[t], failed_cons])
                    # Trim archive to N by random selection
                    if len(arc_decs[t]) > Nt:
                        perm = np.random.permutation(len(arc_decs[t]))[:Nt]
                        arc_decs[t] = arc_decs[t][perm]
                        arc_objs[t] = arc_objs[t][perm]
                        arc_cons[t] = arc_cons[t][perm]
                # Append native-space history
                append_history(all_decs[t], decs[t][:, :dims[t]],
                               all_objs[t], objs[t],
                               all_cons[t], cons[t])
        pbar.close()
        runtime = time.time() - start_time
        results = build_save_results(
            all_decs=all_decs, all_objs=all_objs, runtime=runtime,
            max_nfes=nfes_per_task, all_cons=all_cons,
            bounds=problem.bounds, save_path=self.save_path,
            filename=self.name, save_data=self.save_data)
        return results
def _generation(self, pop_decs, N, pG, d_max):
"""
Generate offspring using adaptive DE or GA.
Parameters
----------
pop_decs : np.ndarray, shape (N, d_max)
Parent population (sorted by SPEA2 fitness, index 0 = best)
N : int
Population size
pG : np.ndarray, shape (2,)
Success rates for [DE, GA]
d_max : int
Unified dimension
Returns
-------
off_decs : np.ndarray, shape (N, d_max)
op_flags : np.ndarray, shape (N,)
1=DE, 2=GA
"""
off_decs = np.zeros((N, d_max))
op_flags = np.zeros(N, dtype=int)
# Tournament selection indices for GA
rank_vals = np.arange(1, N + 1)
ind_order = tournament_selection(2, 2 * N, rank_vals)
for i in range(N):
p_de = pG[0] / (pG[0] + pG[1]) if (pG[0] + pG[1]) > 0 else 0.5
if np.random.rand() < p_de:
# DE/rand/1 with rank-based selection
x1 = self._rank_selection(N, exclude=[i])
x2 = self._rank_selection(N, exclude=[i, x1])
x3 = self._random_selection(N, exclude=[i, x1, x2])
mutant = pop_decs[x1] + self.F * (pop_decs[x2] - pop_decs[x3])
# DE binomial crossover
j_rand = np.random.randint(d_max)
mask = np.random.rand(d_max) < self.CR
mask[j_rand] = True
off_decs[i] = np.where(mask, mutant, pop_decs[i])
op_flags[i] = 1
else:
# GA: SBX crossover + PM mutation
p1 = ind_order[i]
p2 = ind_order[i + N // 2] if (i + N // 2) < len(ind_order) else ind_order[-1]
off1, _ = crossover(pop_decs[p1], pop_decs[p2], mu=self.MuC)
off1 = mutation(off1, mu=self.MuM)
off_decs[i] = off1
op_flags[i] = 2
off_decs[i] = np.clip(off_decs[i], 0, 1)
return off_decs, op_flags
@staticmethod
def _rank_selection(N, exclude=None):
"""Rank-based DE parent selection. Lower index = better rank."""
if exclude is None:
exclude = []
for _ in range(1000):
x = np.random.randint(N)
# rank = x+1 (1-indexed), acceptance prob = (N - rank) / N
if x not in exclude and np.random.rand() < (N - (x + 1)) / N:
return x
candidates = [i for i in range(N) if i not in exclude]
return np.random.choice(candidates) if candidates else 0
@staticmethod
def _random_selection(N, exclude=None):
"""Random selection excluding specified indices."""
if exclude is None:
exclude = []
candidates = [i for i in range(N) if i not in exclude]
return np.random.choice(candidates) if candidates else 0
@staticmethod
def _spea2_select(objs, cons, N):
"""
SPEA2 environmental selection.
Returns
-------
sel_idx : np.ndarray
Indices of selected solutions (sorted by fitness)
fitness : np.ndarray
SPEA2 fitness of all input solutions
"""
pop_size = objs.shape[0]
N = min(N, pop_size)
fitness = spea2_fitness(objs, cons)
next_mask = fitness < 1
n_selected = np.sum(next_mask)
if n_selected < N:
sorted_idx = np.argsort(fitness)
next_mask = np.zeros(pop_size, dtype=bool)
next_mask[sorted_idx[:N]] = True
elif n_selected > N:
selected_idx = np.where(next_mask)[0]
keep_idx = spea2_truncation(objs[selected_idx], N)
next_mask = np.zeros(pop_size, dtype=bool)
next_mask[selected_idx[keep_idx]] = True
# Sort selected by fitness
sel_idx = np.where(next_mask)[0]
sorted_by_fit = np.argsort(fitness[sel_idx])
sel_idx = sel_idx[sorted_by_fit]
return sel_idx, fitness
@staticmethod
def _nfc(target_pop, source_pop, source_best, kernel='poly'):
"""
Nonlinear Feature Coupling (NFC) for cross-task knowledge transfer.
Maps source_best solutions from source task to target task space
using kernel autoencoding.
Parameters
----------
target_pop : np.ndarray, shape (N, D_target)
Target task population distribution
source_pop : np.ndarray, shape (N, D_source)
Source task population distribution
source_best : np.ndarray, shape (TNum, D_source)
Solutions to transfer from source task
Returns
-------
mapped : np.ndarray, shape (TNum, D_target)
"""
D_target = target_pop.shape[1]
D_source = source_pop.shape[1]
# Pad to same dimension
T_H = target_pop.copy()
S_H = source_pop.copy()
if D_target < D_source:
T_H = np.hstack([T_H, np.zeros((T_H.shape[0], D_source - D_target))])
elif D_target > D_source:
S_H = np.hstack([S_H, np.zeros((S_H.shape[0], D_target - D_source))])
# Transpose to (dim, N) - columns are samples
S_H_T = S_H.T.astype(np.float64)
T_H_T = T_H.T.astype(np.float64)
# Kernel matrix
kk = MO_MTEA_PAE._kernelmatrix(kernel, S_H_T, S_H_T)
d = kk.shape[0]
Q0 = kk @ kk.T
P = T_H_T @ kk.T
reg = 1e-5 * np.eye(d)
W = P @ np.linalg.pinv(Q0 + reg)
# Map source_best
S_Best = source_best.copy().astype(np.float64)
if D_target <= D_source:
K_map = MO_MTEA_PAE._kernelmatrix(kernel, S_H_T, S_Best.T)
mapped = (W @ K_map).T
mapped = mapped[:, :D_target]
else:
S_Best = np.hstack([S_Best, np.zeros((S_Best.shape[0], D_target - D_source))])
K_map = MO_MTEA_PAE._kernelmatrix(kernel, S_H_T, S_Best.T)
mapped = (W @ K_map).T
return mapped
@staticmethod
def _kernelmatrix(kernel, X, X2):
"""
Compute kernel matrix between column-format data.
Parameters
----------
kernel : str
Kernel type ('poly', 'lin', 'rbf')
X : np.ndarray, shape (dim, N1)
X2 : np.ndarray, shape (dim, N2)
Returns
-------
K : np.ndarray, shape (N1, N2)
"""
# Dimension padding
d1 = X.shape[0]
d2 = X2.shape[0]
if d1 < d2:
X = np.vstack([X, np.zeros((d2 - d1, X.shape[1]))])
elif d1 > d2:
X2 = np.vstack([X2, np.zeros((d1 - d2, X2.shape[1]))])
if kernel == 'poly':
b, d = 0.1, 5
return (X.T @ X2 + b) ** d
elif kernel == 'lin':
return X.T @ X2
elif kernel == 'rbf':
n1sq = np.sum(X ** 2, axis=0)
n2sq = np.sum(X2 ** 2, axis=0)
D = n1sq[:, None] + n2sq[None, :] - 2 * X.T @ X2
return np.exp(-D / (2 * 0.1 ** 2))
else:
raise ValueError(f'Unsupported kernel: {kernel}')