"""
Multitask Differential Evolution with Adaptive Dual Knowledge Transfer (MTDE-ADKT)
This module implements MTDE-ADKT for multi-task optimization using SHADE-based
adaptive parameters, distribution-aligned knowledge transfer, and adaptive
transfer probability control.
References
----------
[1] Zhang, Tingyu, et al. "Multitask Differential Evolution with Adaptive
Dual Knowledge Transfer." Applied Soft Computing, 165: 112040, 2024.
Notes
-----
Author: Jiangtao Shen (DDMTOLab adaptation)
Date: 2026.02.22
Version: 1.0
"""
import time
import numpy as np
from tqdm import tqdm
from ddmtolab.Methods.Algo_Methods.algo_utils import *
[docs]
class MTDE_ADKT:
"""
Multitask DE with Adaptive Dual Knowledge Transfer.
Combines SHADE-based adaptive F/CR with two knowledge transfer modes:
- Type 1: Distribution-aligned transfer via covariance whitening/coloring
- Type 2: Direct transfer from source task
- Type 3: Standard DE/current-to-pbest/1 (no transfer)
Transfer probabilities (RMP1, RMP2) are adaptively adjusted based on
the relative success rates of each transfer type.
Attributes
----------
algorithm_information : dict
Dictionary containing algorithm capabilities and requirements
"""
algorithm_information = {
'n_tasks': '[2, K]',
'dims': 'unequal',
'objs': 'equal',
'n_objs': '1',
'cons': 'unequal',
'n_cons': '[0, C]',
'expensive': 'False',
'knowledge_transfer': 'True',
'n': 'equal',
'max_nfes': 'equal'
}
@classmethod
def get_algorithm_information(cls, print_info=True):
return get_algorithm_information(cls, print_info)
[docs]
def __init__(self, problem, n=None, max_nfes=None, P=0.1, H=100,
Gap=50, Alpha=0.25, RMP0=0.15, Beta=0.9, TGap=1,
save_data=True, save_path='./Data', name='MTDE-ADKT',
disable_tqdm=True):
"""
Initialize MTDE-ADKT algorithm.
Parameters
----------
problem : MTOP
Multi-task optimization problem instance
n : int, optional
Population size per task (default: 100)
max_nfes : int, optional
Maximum number of function evaluations per task (default: 10000)
P : float, optional
Top P fraction for pbest selection (default: 0.1)
H : int, optional
SHADE success history memory size (default: 100)
Gap : int, optional
RMP adaptation period in generations (default: 50)
Alpha : float, optional
Population reduction timing as fraction of budget (default: 0.25)
RMP0 : float, optional
Initial random mating probability (default: 0.15)
Beta : float, optional
EMA smoothing factor for centroid tracking (default: 0.9)
TGap : int, optional
Transfer frequency: transfer every TGap generations (default: 1)
save_data : bool, optional
Whether to save optimization data (default: True)
save_path : str, optional
Path to save results (default: './Data')
name : str, optional
Name for the experiment (default: 'MTDE-ADKT')
disable_tqdm : bool, optional
Whether to disable progress bar (default: True)
"""
self.problem = problem
self.n = n if n is not None else 100
self.max_nfes = max_nfes if max_nfes is not None else 10000
self.P = P
self.H = H
self.Gap = Gap
self.Alpha = Alpha
self.RMP0 = RMP0
self.Beta = Beta
self.TGap = TGap
self.save_data = save_data
self.save_path = save_path
self.name = name
self.disable_tqdm = disable_tqdm
[docs]
def optimize(self):
"""
Execute the MTDE-ADKT algorithm.
Returns
-------
Results
Optimization results containing decision variables, objectives, and runtime
"""
start_time = time.time()
problem = self.problem
nt = problem.n_tasks
dims = problem.dims
n = self.n
max_nfes_per_task = par_list(self.max_nfes, nt)
max_nfes = self.max_nfes * nt
# Initialize and evaluate
decs = initialization(problem, n)
objs, cons = evaluation(problem, decs)
nfes = n * nt
all_decs, all_objs, all_cons = init_history(decs, objs, cons)
# Convert to unified space
pop_decs, pop_cons = space_transfer(
problem=problem, decs=decs, cons=cons, type='uni', padding='mid')
pop_objs = objs
maxD = pop_decs[0].shape[1]
maxC = pop_cons[0].shape[1]
# SHADE memory
MF = [0.5 * np.ones(self.H) for _ in range(nt)]
MCR = [0.5 * np.ones(self.H) for _ in range(nt)]
Hidx = [0] * nt
# Archives (unified space decs)
archives = [np.empty((0, maxD)) for _ in range(nt)]
# Success rate histories (initialized with 1.0 as in MATLAB)
r_suc1 = [[1.0] for _ in range(nt)]
r_suc2 = [[1.0] for _ in range(nt)]
r_suc3 = [[1.0] for _ in range(nt)]
# Adaptive RMP
RMP1 = np.full(nt, self.RMP0)
RMP2 = np.full(nt, self.RMP0)
delta_rmp = self.Gap / 500.0
# EMA centroid
mDec = 0.5 * np.ones((nt, maxD))
for t in range(nt):
mDec[t] = np.mean(pop_decs[t], axis=0)
reduce_flag = False
gen = 1
pbar = tqdm(total=max_nfes, initial=nfes, desc=f"{self.name}",
disable=self.disable_tqdm)
while nfes < max_nfes:
for t in range(nt):
n_t = len(pop_decs[t])
# --- SHADE: generate adaptive F and CR ---
F_arr = np.zeros(n_t)
CR_arr = np.zeros(n_t)
for i in range(n_t):
idx = np.random.randint(self.H)
f = MF[t][idx] + 0.1 * np.tan(np.pi * (np.random.rand() - 0.5))
while f <= 0:
f = MF[t][idx] + 0.1 * np.tan(
np.pi * (np.random.rand() - 0.5))
F_arr[i] = min(f, 1.0)
cr = np.random.normal(MCR[t][idx], 0.1)
CR_arr[i] = np.clip(cr, 0, 1)
# Source task selection
k = np.random.randint(nt)
while k == t:
k = np.random.randint(nt)
# Union: population + archive
if len(archives[t]) > 0:
union_decs = np.vstack([pop_decs[t], archives[t]])
else:
union_decs = pop_decs[t]
# --- Partition individuals into 3 transfer types ---
par1_idx, par2_idx, par3_idx = [], [], []
if gen % self.TGap == 0:
for i in range(n_t):
if np.random.rand() < RMP1[t]:
par1_idx.append(i)
elif np.random.rand() < RMP2[t]:
par2_idx.append(i)
else:
par3_idx.append(i)
else:
par3_idx = list(range(n_t))
n1, n2, n3 = len(par1_idx), len(par2_idx), len(par3_idx)
# pbest indices (sorted by CV then obj)
cvs = np.sum(np.maximum(0, pop_cons[t]), axis=1) \
if maxC > 0 else np.zeros(n_t)
sort_order = np.lexsort((pop_objs[t][:, 0], cvs))
n_pbest = max(round(self.P * n_t), 1)
pop_pbest = sort_order[:n_pbest]
# Transfer individuals from source
n_transfer = n1 + n2
n_source = len(pop_decs[k])
if n_transfer > 0:
t_perm = np.random.permutation(n_source)
transfer_idx = t_perm[:min(n_transfer, n_source)]
if len(transfer_idx) < n_transfer:
extra = np.random.randint(
n_source, size=n_transfer - len(transfer_idx))
transfer_idx = np.concatenate([transfer_idx, extra])
else:
transfer_idx = np.array([], dtype=int)
# Offspring storage
off_decs = np.zeros_like(pop_decs[t])
off_objs = np.full_like(pop_objs[t], np.inf)
off_cons = np.zeros_like(pop_cons[t])
replace_all = np.zeros(n_t, dtype=bool)
# === Type 1: Distribution-aligned transfer ===
if n1 > 0:
idx1 = np.array(par1_idx)
t_idx1 = transfer_idx[:n1]
# D_Align: compute transformation matrix once
mus = np.mean(pop_decs[k], axis=0)
mut = np.mean(pop_decs[t], axis=0)
Cs = np.cov((pop_decs[k] - mus), rowvar=False) \
+ np.eye(maxD)
Ct = np.cov((pop_decs[t] - mut), rowvar=False) \
+ np.eye(maxD)
inv_sqrt_Cs = _matrix_inv_sqrt(Cs)
sqrt_Ct = _matrix_sqrt(Ct)
T_mat = inv_sqrt_Cs @ sqrt_Ct
# Transform source individuals
source_centered = pop_decs[k][t_idx1] - mus
transpop1 = source_centered @ T_mat + mDec[t]
off1 = _generation_transfer(
pop_decs[t][idx1], transpop1,
F_arr[idx1], CR_arr[idx1],
pop_decs[t], union_decs, pop_pbest, maxD)
o1, c1_real = evaluation_single(
problem, off1[:, :dims[t]], t)
c1 = np.zeros((n1, maxC))
if maxC > 0 and c1_real.shape[1] > 0:
c1[:, :c1_real.shape[1]] = c1_real
nfes += n1
pbar.update(n1)
off_decs[idx1] = off1
off_objs[idx1] = o1
off_cons[idx1] = c1
rep1 = _tournament(pop_objs[t][idx1], pop_cons[t][idx1],
o1, c1, maxC)
replace_all[idx1] = rep1
r_suc1[t].append(np.sum(rep1) / n1)
else:
r_suc1[t].append(0.0)
# === Type 2: Direct transfer (no D_Align) ===
if n2 > 0:
idx2 = np.array(par2_idx)
t_idx2 = transfer_idx[n1:n1 + n2]
transpop2 = pop_decs[k][t_idx2].copy()
off2 = _generation_transfer(
pop_decs[t][idx2], transpop2,
F_arr[idx2], CR_arr[idx2],
pop_decs[t], union_decs, pop_pbest, maxD)
o2, c2_real = evaluation_single(
problem, off2[:, :dims[t]], t)
c2 = np.zeros((n2, maxC))
if maxC > 0 and c2_real.shape[1] > 0:
c2[:, :c2_real.shape[1]] = c2_real
nfes += n2
pbar.update(n2)
off_decs[idx2] = off2
off_objs[idx2] = o2
off_cons[idx2] = c2
rep2 = _tournament(pop_objs[t][idx2], pop_cons[t][idx2],
o2, c2, maxC)
replace_all[idx2] = rep2
r_suc2[t].append(np.sum(rep2) / n2)
else:
r_suc2[t].append(0.0)
# === Type 3: Standard DE/current-to-pbest/1 ===
if n3 > 0:
idx3 = np.array(par3_idx)
off3 = _generation_standard(
pop_decs[t][idx3],
F_arr[idx3], CR_arr[idx3],
pop_decs[t], union_decs, pop_pbest, maxD)
o3, c3_real = evaluation_single(
problem, off3[:, :dims[t]], t)
c3 = np.zeros((n3, maxC))
if maxC > 0 and c3_real.shape[1] > 0:
c3[:, :c3_real.shape[1]] = c3_real
nfes += n3
pbar.update(n3)
off_decs[idx3] = off3
off_objs[idx3] = o3
off_cons[idx3] = c3
rep3 = _tournament(pop_objs[t][idx3], pop_cons[t][idx3],
o3, c3, maxC)
replace_all[idx3] = rep3
r_suc3[t].append(np.sum(rep3) / n3)
else:
r_suc3[t].append(0.0)
# === SHADE update ===
if np.any(replace_all):
SF = F_arr[replace_all]
SCR = CR_arr[replace_all]
p_cv = np.sum(np.maximum(0, pop_cons[t][replace_all]),
axis=1) if maxC > 0 else np.zeros(
np.sum(replace_all))
o_cv = np.sum(np.maximum(0, off_cons[replace_all]),
axis=1) if maxC > 0 else np.zeros(
np.sum(replace_all))
dif = p_cv - o_cv
dif_obj = pop_objs[t][replace_all, 0] \
- off_objs[replace_all, 0]
dif_obj = np.maximum(dif_obj, 0)
dif[dif <= 0] = dif_obj[dif <= 0]
sum_dif = np.sum(dif)
if sum_dif > 0:
dif = dif / sum_dif
MF[t][Hidx[t]] = np.sum(dif * SF ** 2) / \
np.sum(dif * SF)
MCR[t][Hidx[t]] = np.sum(dif * SCR)
else:
MF[t][Hidx[t]] = MF[t][(Hidx[t] - 1) % self.H]
MCR[t][Hidx[t]] = MCR[t][(Hidx[t] - 1) % self.H]
else:
MF[t][Hidx[t]] = MF[t][(Hidx[t] - 1) % self.H]
MCR[t][Hidx[t]] = MCR[t][(Hidx[t] - 1) % self.H]
Hidx[t] = (Hidx[t] + 1) % self.H
# === Archive update ===
if np.any(replace_all):
replaced_decs = pop_decs[t][replace_all]
if len(archives[t]) > 0:
archives[t] = np.vstack(
[archives[t], replaced_decs])
else:
archives[t] = replaced_decs.copy()
if len(archives[t]) > n:
perm = np.random.permutation(len(archives[t]))[:n]
archives[t] = archives[t][perm]
# === Selection: replace winners ===
pop_decs[t][replace_all] = off_decs[replace_all]
pop_objs[t][replace_all] = off_objs[replace_all]
pop_cons[t][replace_all] = off_cons[replace_all]
# === RMP adaptation ===
for t in range(nt):
if gen % self.Gap == 0 and gen >= self.Gap:
r1_avg = np.mean(r_suc1[t][gen - self.Gap:gen])
r3_avg = np.mean(r_suc3[t][gen - self.Gap:gen])
if r1_avg * self.TGap >= r3_avg:
RMP1[t] = min(RMP1[t] + delta_rmp, 0.45)
else:
RMP1[t] = max(RMP1[t] - delta_rmp, 0.01)
r2_avg = np.mean(r_suc2[t][gen - self.Gap:gen])
if r2_avg * self.TGap >= r3_avg:
RMP2[t] = min(RMP2[t] + delta_rmp, 0.45)
else:
RMP2[t] = max(RMP2[t] - delta_rmp, 0.02)
# === Population reduction ===
if not reduce_flag and nfes >= max_nfes * self.Alpha:
new_n = round(n / 2)
for t in range(nt):
cvs_t = np.sum(np.maximum(0, pop_cons[t]), axis=1) \
if maxC > 0 else np.zeros(len(pop_decs[t]))
rank_t = np.lexsort((pop_objs[t][:, 0], cvs_t))
# Save removed to archive
removed = rank_t[new_n:]
if len(removed) > 0:
if len(archives[t]) > 0:
archives[t] = np.vstack(
[archives[t], pop_decs[t][removed]])
else:
archives[t] = pop_decs[t][removed].copy()
if len(archives[t]) > n:
perm = np.random.permutation(
len(archives[t]))[:n]
archives[t] = archives[t][perm]
# Keep top half
keep = rank_t[:new_n]
pop_decs[t] = pop_decs[t][keep]
pop_objs[t] = pop_objs[t][keep]
pop_cons[t] = pop_cons[t][keep]
reduce_flag = True
# === EMA centroid update ===
for t in range(nt):
mDec[t] = (1 - self.Beta) * mDec[t] + \
self.Beta * np.mean(pop_decs[t], axis=0)
# Record history
real_decs, real_cons = space_transfer(
problem, decs=pop_decs, cons=pop_cons, type='real')
append_history(all_decs, real_decs, all_objs, pop_objs,
all_cons, real_cons)
gen += 1
pbar.close()
runtime = time.time() - start_time
results = build_save_results(
all_decs=all_decs, all_objs=all_objs, runtime=runtime,
max_nfes=max_nfes_per_task, all_cons=all_cons,
bounds=problem.bounds, save_path=self.save_path,
filename=self.name, save_data=self.save_data)
return results
# ============================================================
# Helper functions
# ============================================================
def _matrix_sqrt(A):
"""Compute matrix square root via eigendecomposition."""
eigvals, eigvecs = np.linalg.eigh(A)
eigvals = np.maximum(eigvals, 1e-10)
return eigvecs @ np.diag(np.sqrt(eigvals)) @ eigvecs.T
def _matrix_inv_sqrt(A):
"""Compute inverse matrix square root via eigendecomposition."""
eigvals, eigvecs = np.linalg.eigh(A)
eigvals = np.maximum(eigvals, 1e-10)
return eigvecs @ np.diag(1.0 / np.sqrt(eigvals)) @ eigvecs.T
def _tournament(parent_objs, parent_cons, off_objs, off_cons, maxC):
"""1-to-1 tournament selection: offspring replaces parent if better."""
n_ind = len(parent_objs)
replace = np.zeros(n_ind, dtype=bool)
p_cv = np.sum(np.maximum(0, parent_cons), axis=1) \
if maxC > 0 else np.zeros(n_ind)
o_cv = np.sum(np.maximum(0, off_cons), axis=1) \
if maxC > 0 else np.zeros(n_ind)
for i in range(n_ind):
if o_cv[i] < p_cv[i]:
replace[i] = True
elif o_cv[i] == p_cv[i] and off_objs[i, 0] < parent_objs[i, 0]:
replace[i] = True
return replace
def _generation_transfer(parent_decs, transpop_decs, F, CR,
pop_decs, union_decs, pbest_idx, D):
"""
DE/current-to-pbest/1 with transfer base vector.
Mutation: v = transpop_i + F * (pbest - transpop_i) + F * (x1 - x2)
Crossover: binomial with transpop_i
Boundary: midpoint repair toward transpop_i
"""
n_ind = len(parent_decs)
n_pop = len(pop_decs)
n_union = len(union_decs)
off = np.zeros((n_ind, D))
for i in range(n_ind):
pb = pbest_idx[np.random.randint(len(pbest_idx))]
x1 = np.random.randint(n_pop)
while x1 == pb:
x1 = np.random.randint(n_pop)
x2 = np.random.randint(n_union)
while x2 == x1 or x2 == pb:
x2 = np.random.randint(n_union)
v = transpop_decs[i] + \
F[i] * (pop_decs[pb] - transpop_decs[i]) + \
F[i] * (pop_decs[x1] - union_decs[x2])
# Binomial crossover with transpop (not parent)
u = transpop_decs[i].copy()
j_rand = np.random.randint(D)
mask = np.random.rand(D) < CR[i]
mask[j_rand] = True
u[mask] = v[mask]
# Midpoint boundary repair toward transpop
vio_low = u < 0
u[vio_low] = transpop_decs[i][vio_low] / 2
vio_up = u > 1
u[vio_up] = (transpop_decs[i][vio_up] + 1) / 2
off[i] = u
return off
def _generation_standard(parent_decs, F, CR, pop_decs, union_decs,
pbest_idx, D):
"""
Standard DE/current-to-pbest/1 (no transfer).
Mutation: v = parent_i + F * (pbest - parent_i) + F * (x1 - x2)
Crossover: binomial with parent_i
Boundary: midpoint repair toward parent_i
"""
n_ind = len(parent_decs)
n_pop = len(pop_decs)
n_union = len(union_decs)
off = np.zeros((n_ind, D))
for i in range(n_ind):
pb = pbest_idx[np.random.randint(len(pbest_idx))]
x1 = np.random.randint(n_pop)
while x1 == pb:
x1 = np.random.randint(n_pop)
x2 = np.random.randint(n_union)
while x2 == x1 or x2 == pb:
x2 = np.random.randint(n_union)
v = parent_decs[i] + \
F[i] * (pop_decs[pb] - parent_decs[i]) + \
F[i] * (pop_decs[x1] - union_decs[x2])
# Binomial crossover with parent
u = parent_decs[i].copy()
j_rand = np.random.randint(D)
mask = np.random.rand(D) < CR[i]
mask[j_rand] = True
u[mask] = v[mask]
# Midpoint boundary repair toward parent
vio_low = u < 0
u[vio_low] = parent_decs[i][vio_low] / 2
vio_up = u > 1
u[vio_up] = (parent_decs[i][vio_up] + 1) / 2
off[i] = u
return off