"""
Scenario-based Self-Learning Transfer Differential Evolution (SSLT-DE)
This module implements SSLT-DE for multi-task optimization using a DQN-based
reinforcement learning framework to adaptively select among four knowledge
transfer scenarios.
References
----------
[1] Z. Yuan, G. Dai, L. Peng, M. Wang, Z. Song, and X. Chen, "Scenario-based
self-learning transfer framework for multi-task optimization problems,"
Knowledge-Based Systems, vol. 325, p. 113824, 2025.
Notes
-----
Author: Jiangtao Shen
Email: j.shen5@exeter.ac.uk
Date: 2026.02.22
Version: 1.0
"""
import time
import numpy as np
import torch
import torch.nn as nn
from tqdm import tqdm
from ddmtolab.Methods.Algo_Methods.algo_utils import *
# ============================================================================
# Helper Functions
# ============================================================================
def _wasserstein_1d(u_decs, v_decs):
"""Compute 1D Wasserstein distance between two flattened population arrays."""
u = np.sort(u_decs.ravel())
v = np.sort(v_decs.ravel())
all_vals = np.unique(np.concatenate([u, v]))
if len(all_vals) < 2:
return 0.0
u_cdf = np.searchsorted(u, all_vals[:-1], side='right') / len(u)
v_cdf = np.searchsorted(v, all_vals[:-1], side='right') / len(v)
return float(np.sum(np.abs(u_cdf - v_cdf) * np.diff(all_vals)))
def _dispersion_metric(decs, objs):
"""Mean pairwise squared distance among top 10% individuals."""
M = max(int(0.1 * len(objs)), 1)
rank = np.argsort(objs.flatten())
top = decs[rank[:M]]
if M <= 1:
return 0.0
total = 0.0
for i in range(M - 1):
diff = top[i + 1:] - top[i]
total += np.sum(diff ** 2)
return total / (M * (M - 1))
def _dispersion_type(decs, objs, decs_old, objs_old):
    """Classify the dispersion change between two generations.

    Returns 1 when dispersion decreased, 2 when it is unchanged, and
    3 when it increased.
    """
    current = _dispersion_metric(decs, objs)
    previous = _dispersion_metric(decs_old, objs_old)
    if current > previous:
        return 3
    return 1 if current < previous else 2
def _convergence_dist(decs_old, decs_new):
"""Euclidean distance between old and new population centers."""
c_old = np.mean(decs_old, axis=0)
c_new = np.mean(decs_new, axis=0)
return float(np.sqrt(np.sum((c_old - c_new) ** 2)))
def _smooth(decs, objs):
"""Keep the best individual from each consecutive triple."""
keep = []
n = len(decs)
for i in range(0, n - 2, 3):
triple_objs = objs[i:i + 3].flatten()
best = np.argmin(triple_objs)
keep.append(i + best)
if len(keep) == 0:
keep = [np.argmin(objs.flatten())]
return decs[keep], objs[keep]
def _de_crossover_single(trial, target, CR):
"""Binomial crossover for a single individual pair."""
d = len(trial)
mask = np.random.rand(d) < CR
j_rand = np.random.randint(d)
mask[j_rand] = True
offspring = target.copy()
offspring[mask] = trial[mask]
return offspring
def _normalize(X):
"""Min-max normalize columns to [-1, 1], matching MATLAB mapminmax."""
mins = X.min(axis=0)
maxs = X.max(axis=0)
rng = maxs - mins
rng[rng == 0] = 1.0
return 2.0 * (X - mins) / rng - 1.0, mins, maxs
def _normalize_apply(x, mins, maxs):
"""Apply saved min-max normalization."""
rng = maxs - mins
rng[rng == 0] = 1.0
return 2.0 * (x - mins) / rng - 1.0
# ============================================================================
# Q-Network for DQN
# ============================================================================
class _QNet(nn.Module):
"""Simple MLP for Q-value prediction."""
def __init__(self, input_dim=7, hidden_dim=32):
super().__init__()
self.net = nn.Sequential(
nn.Linear(input_dim, hidden_dim),
nn.Tanh(),
nn.Linear(hidden_dim, hidden_dim),
nn.Tanh(),
nn.Linear(hidden_dim, 1)
)
def forward(self, x):
return self.net(x).squeeze(-1)
def _train_qnet(model, X, y, epochs=200, lr=0.005):
"""Train Q-network on normalized data."""
optimizer = torch.optim.Adam(model.parameters(), lr=lr)
X_t = torch.tensor(X, dtype=torch.float32)
y_t = torch.tensor(y, dtype=torch.float32)
model.train()
for _ in range(epochs):
pred = model(X_t)
loss = nn.functional.mse_loss(pred, y_t)
optimizer.zero_grad()
loss.backward()
optimizer.step()
# ============================================================================
# SSLT-DE Algorithm
# ============================================================================
class SSLT_DE:
    """
    Scenario-based Self-Learning Transfer Differential Evolution.

    Uses a DQN-based reinforcement learning framework to adaptively select
    among four knowledge transfer scenarios:

    1. No transfer (standard DE/rand/1/bin)
    2. Shape transfer (shift smoothed source toward target center)
    3. Bi-directional transfer (DE on merged populations)
    4. Domain transfer (direction-guided from best source-target difference)

    Attributes
    ----------
    algorithm_information : dict
        Dictionary containing algorithm capabilities and requirements
    """

    algorithm_information = {
        'n_tasks': '[2, K]',
        'dims': 'unequal',
        'objs': 'equal',
        'n_objs': '1',
        'cons': 'unequal',
        'n_cons': '[0, C]',
        'expensive': 'False',
        'knowledge_transfer': 'True',
        'n': 'equal',
        'max_nfes': 'equal'
    }

    @classmethod
    def get_algorithm_information(cls, print_info=True):
        """Return (and optionally print) the algorithm capability table."""
        return get_algorithm_information(cls, print_info)

    def __init__(self, problem, n=None, max_nfes=None,
                 threshold=150, gap=50, gamma=0.9, epsilon=0.8,
                 F=0.5, CR=0.9,
                 save_data=True, save_path='./Data', name='SSLT-DE',
                 disable_tqdm=True):
        """
        Initialize SSLT-DE algorithm.

        Parameters
        ----------
        problem : MTOP
            Multi-task optimization problem instance
        n : int, optional
            Population size per task (default: 100)
        max_nfes : int, optional
            Maximum number of function evaluations per task (default: 10000)
        threshold : int, optional
            Number of generations before building DQN (default: 150)
        gap : int, optional
            DQN update interval in generations (default: 50)
        gamma : float, optional
            Discount factor for Q-learning (default: 0.9)
        epsilon : float, optional
            Probability of exploiting the learned Q-model; exploration
            (a random action) happens with probability 1 - epsilon
            (default: 0.8)
        F : float, optional
            DE mutation scale factor (default: 0.5)
        CR : float, optional
            DE crossover rate (default: 0.9)
        save_data : bool, optional
            Whether to save optimization data (default: True)
        save_path : str, optional
            Path to save results (default: './Data')
        name : str, optional
            Name for the experiment (default: 'SSLT-DE')
        disable_tqdm : bool, optional
            Whether to disable progress bar (default: True)
        """
        self.problem = problem
        self.n = n if n is not None else 100
        self.max_nfes = max_nfes if max_nfes is not None else 10000
        self.threshold = threshold
        self.gap = gap
        self.gamma = gamma
        self.epsilon = epsilon
        self.F = F
        self.CR = CR
        self.save_data = save_data
        self.save_path = save_path
        self.name = name
        self.disable_tqdm = disable_tqdm

    def optimize(self):
        """
        Execute the SSLT-DE algorithm.

        Returns
        -------
        Results
            Optimization results containing decision variables, objectives,
            and runtime
        """
        start_time = time.time()
        problem = self.problem
        nt = problem.n_tasks
        dims = problem.dims
        n = self.n
        max_nfes_per_task = par_list(self.max_nfes, nt)
        max_nfes = self.max_nfes * nt
        eps = 1e-30  # guards divisions by (near-)zero objective values

        # Initialize and evaluate
        decs = initialization(problem, n)
        objs, cons = evaluation(problem, decs)
        nfes = n * nt
        all_decs, all_objs, all_cons = init_history(decs, objs, cons)

        # Convert to unified space for cross-task operations
        pop_decs, pop_cons = space_transfer(
            problem=problem, decs=decs, cons=cons, type='uni', padding='mid')
        pop_objs = objs
        maxD = pop_decs[0].shape[1]
        # NOTE(review): pop_cons is never refreshed after offspring
        # evaluation below; constraint values of evolved individuals are
        # discarded. Verify this is acceptable for constrained tasks.

        # Per-task DQN state
        data_task = [[] for _ in range(nt)]  # Experience replay buffer
        model_built = [False] * nt           # Whether DQN is built
        count_task = [0] * nt                # Update counter
        q_model = [None] * nt                # Q-network per task
        norm_params = [None] * nt            # Normalization parameters

        # Store previous generation populations
        pop_decs_old = [d.copy() for d in pop_decs]
        pop_objs_old = [o.copy() for o in pop_objs]
        gen = 0
        pbar = tqdm(total=max_nfes, initial=nfes, desc=f"{self.name}",
                    disable=self.disable_tqdm)
        while nfes < max_nfes:
            for t in range(nt):
                if nfes >= max_nfes:
                    break
                # Random source task (distinct from the target when nt > 1)
                s = np.random.randint(nt)
                while s == t and nt > 1:
                    s = np.random.randint(nt)

                # ============================================================
                # Compute state features
                # ============================================================
                min_old_t = np.min(pop_objs_old[t]) + eps
                min_cur_t = np.min(pop_objs[t]) + eps
                min_old_s = np.min(pop_objs_old[s]) + eps
                min_cur_s = np.min(pop_objs[s]) + eps
                # Relative best-objective improvement of target and source
                conv_target = (min_old_t - min_cur_t) / abs(min_old_t)
                conv_source = (min_old_s - min_cur_s) / abs(min_old_s)
                # Distribution distance between the two populations
                wsd = _wasserstein_1d(pop_decs[t], pop_decs[s])
                ls_target = _dispersion_type(pop_decs[t], pop_objs[t],
                                             pop_decs_old[t], pop_objs_old[t])
                ls_source = _dispersion_type(pop_decs[s], pop_objs[s],
                                             pop_decs_old[s], pop_objs_old[s])
                pha = nfes / max_nfes  # search-phase indicator in [0, 1]
                state = np.array([conv_source, conv_target, wsd,
                                  ls_target, ls_source, pha])

                # ============================================================
                # Action selection (1..4 map to the four KT scenarios)
                # ============================================================
                if gen <= self.threshold:
                    # Warm-up: pure random exploration to fill the buffer
                    action = np.random.randint(1, 5)
                elif not model_built[t]:
                    # Build the initial DQN from the collected experiences
                    exp = np.array(data_task[t])
                    X_raw = exp[:, :7]   # state (6 features) + action
                    y_raw = exp[:, 7]    # immediate reward
                    X_norm, x_min, x_max = _normalize(X_raw)
                    y_norm, y_min, y_max = _normalize(y_raw.reshape(-1, 1))
                    y_norm = y_norm.flatten()
                    q_model[t] = _QNet(input_dim=7, hidden_dim=32)
                    _train_qnet(q_model[t], X_norm, y_norm)
                    norm_params[t] = (x_min, x_max, y_min, y_max)
                    model_built[t] = True
                    action = np.random.randint(1, 5)
                else:
                    # Epsilon-greedy: exploit with probability epsilon
                    if np.random.rand() > self.epsilon:
                        action = np.random.randint(1, 5)
                    else:
                        x_min, x_max, y_min, y_max = norm_params[t]
                        q_vals = []
                        q_model[t].eval()
                        with torch.no_grad():
                            for a in range(1, 5):
                                x_raw = np.append(state, a).reshape(1, -1)
                                x_n = _normalize_apply(x_raw, x_min, x_max)
                                x_t = torch.tensor(x_n, dtype=torch.float32)
                                q_vals.append(q_model[t](x_t).item())
                        action = np.argmax(q_vals) + 1

                # ============================================================
                # Execute action
                # ============================================================
                pop_decs_old[t] = pop_decs[t].copy()
                pop_objs_old[t] = pop_objs[t].copy()
                if action == 1:
                    # No KT: standard DE/rand/1/bin
                    off_decs = de_generation(pop_decs[t], F=self.F, CR=self.CR)
                    off_objs, off_cons_real = evaluation_single(
                        problem, off_decs[:, :dims[t]], t)
                    nfes += n
                    pbar.update(n)
                    # 1-to-1 DE selection (ties replace the parent)
                    better = off_objs.flatten() <= pop_objs[t].flatten()
                    pop_decs[t][better] = off_decs[better]
                    pop_objs[t][better] = off_objs[better]
                elif action == 2:
                    # Shape KT: shift smoothed source toward target center
                    sm_s_decs, sm_s_objs = _smooth(pop_decs[s], pop_objs[s])
                    sm_t_decs, _ = _smooth(pop_decs[t], pop_objs[t])
                    center_t = np.mean(sm_t_decs, axis=0)
                    center_s = np.mean(sm_s_decs, axis=0)
                    shifted = sm_s_decs + (center_t - center_s)
                    shifted = np.clip(shifted, 0, 1)  # stay in unified space
                    n_shifted = len(shifted)
                    sh_objs, sh_cons_real = evaluation_single(
                        problem, shifted[:, :dims[t]], t)
                    nfes += n_shifted
                    pbar.update(n_shifted)
                    # Elite selection (target ∪ shifted)
                    merged_decs = np.vstack([pop_decs[t], shifted])
                    merged_objs = np.vstack([pop_objs[t], sh_objs])
                    sel = selection_elit(objs=merged_objs, n=n)
                    pop_decs[t] = merged_decs[sel]
                    pop_objs[t] = merged_objs[sel]
                elif action == 3:
                    # Bi-KT: DE on merged populations
                    merged = np.vstack([pop_decs[t], pop_decs[s]])
                    n_merged = len(merged)
                    off_decs = np.zeros_like(merged)
                    for i in range(n_merged):
                        # DE/current-to-rand/1
                        idxs = list(range(n_merged))
                        idxs.remove(i)
                        r1, r2, r3 = np.random.choice(idxs, 3, replace=False)
                        v = merged[i] + self.F * (merged[r1] - merged[i]) \
                            + 0.5 * (merged[r2] - merged[r3])
                        off_decs[i] = _de_crossover_single(v, merged[i], self.CR)
                    off_decs = np.clip(off_decs, 0, 1)
                    off_objs, off_cons_real = evaluation_single(
                        problem, off_decs[:, :dims[t]], t)
                    nfes += n_merged
                    pbar.update(n_merged)
                    # Elite selection (target ∪ offspring)
                    merged_sel = np.vstack([pop_decs[t], off_decs])
                    merged_sel_objs = np.vstack([pop_objs[t], off_objs])
                    sel = selection_elit(objs=merged_sel_objs, n=n)
                    pop_decs[t] = merged_sel[sel]
                    pop_objs[t] = merged_sel_objs[sel]
                elif action == 4:
                    # Domain KT: direction-guided transfer from the
                    # best-source-minus-best-target difference vector
                    best_s = np.argmin(pop_objs[s].flatten())
                    best_t = np.argmin(pop_objs[t].flatten())
                    direction = pop_decs[s][best_s] - pop_decs[t][best_t]
                    # More transfer attempts later in the search
                    num = max(1, round(pha * 10))
                    perm = np.random.permutation(n)
                    off_decs = np.zeros((num, maxD))
                    for i in range(num):
                        idx = perm[i % n]
                        off_decs[i] = _de_crossover_single(
                            pop_decs[t][idx], direction, self.CR)
                    off_decs = np.clip(off_decs, 0, 1)
                    off_objs, off_cons_real = evaluation_single(
                        problem, off_decs[:, :dims[t]], t)
                    nfes += num
                    pbar.update(num)
                    # Elite selection (target ∪ offspring)
                    merged_decs = np.vstack([pop_decs[t], off_decs])
                    merged_objs = np.vstack([pop_objs[t], off_objs])
                    sel = selection_elit(objs=merged_objs, n=n)
                    pop_decs[t] = merged_decs[sel]
                    pop_objs[t] = merged_objs[sel]

                # ============================================================
                # Compute reward and store experience
                # ============================================================
                fold = np.min(pop_objs_old[t])
                f = np.min(pop_objs[t])
                fold_mean = np.mean(pop_objs_old[t])
                f_mean = np.mean(pop_objs[t])
                imp_rate = (fold - f) / (abs(fold) + eps)
                pop_rate = (fold_mean - f_mean) / (abs(fold_mean) + eps)
                move_dis = _convergence_dist(pop_decs_old[t], pop_decs[t])
                # Min-max normalize the three reward components jointly
                vals = np.array([imp_rate, pop_rate, move_dis])
                max_val, min_val = vals.max(), vals.min()
                rng = max_val - min_val
                if rng > eps:
                    imp_rate_n = (imp_rate - min_val) / rng
                    pop_rate_n = (pop_rate - min_val) / rng
                    move_dis_n = (move_dis - min_val) / rng
                else:
                    imp_rate_n = pop_rate_n = move_dis_n = 0.0
                pha_new = nfes / max_nfes
                # Later-phase improvements are weighted more heavily
                reward = (imp_rate_n + pop_rate_n + move_dis_n) * pha_new

                # New (post-action) state features, mirroring the layout above
                min_new_t = np.min(pop_objs[t]) + eps
                min_new_s = np.min(pop_objs[s]) + eps
                conv_new_target = (np.min(pop_objs_old[t]) - min_new_t) / abs(np.min(pop_objs_old[t]) + eps)
                conv_new_source = (np.min(pop_objs_old[s]) - min_new_s) / abs(np.min(pop_objs_old[s]) + eps)
                wsd_new = _wasserstein_1d(pop_decs[s], pop_decs[t])
                ls_new_target = _dispersion_type(pop_decs[t], pop_objs[t],
                                                 pop_decs_old[t], pop_objs_old[t])
                ls_new_source = _dispersion_type(pop_decs[s], pop_objs[s],
                                                 pop_decs_old[s], pop_objs_old[s])
                # Record layout: [state (6), action, reward, next state (6)]
                record = np.array([
                    conv_source, conv_target, wsd, ls_target, ls_source, pha, action,
                    reward, conv_new_source, conv_new_target, wsd_new,
                    ls_new_target, ls_new_source, pha_new
                ])
                data_task[t].append(record)
                # Bound the replay buffer (FIFO eviction)
                if len(data_task[t]) > 500:
                    data_task[t].pop(0)

                # ============================================================
                # Update DQN periodically
                # ============================================================
                if model_built[t]:
                    count_task[t] += 1
                    if count_task[t] > self.gap:
                        exp = np.array(data_task[t])
                        X_raw = exp[:, :7]
                        rewards_raw = exp[:, 7]
                        X_norm, x_min, x_max = _normalize(X_raw)
                        # Compute max Q across all experiences
                        q_model[t].eval()
                        with torch.no_grad():
                            X_t = torch.tensor(X_norm, dtype=torch.float32)
                            q_preds = q_model[t](X_t).numpy()
                        max_q = np.max(q_preds)
                        # Q-learning target: R + gamma * max(Q)
                        target_q = rewards_raw + self.gamma * max_q
                        y_norm, y_min, y_max = _normalize(target_q.reshape(-1, 1))
                        y_norm = y_norm.flatten()
                        norm_params[t] = (x_min, x_max, y_min, y_max)
                        # Retrain a fresh network on the updated targets
                        q_model[t] = _QNet(input_dim=7, hidden_dim=32)
                        _train_qnet(q_model[t], X_norm, y_norm)
                        count_task[t] = 0

            # Record history in real space
            real_decs, real_cons = space_transfer(
                problem, decs=pop_decs, cons=pop_cons, type='real')
            append_history(all_decs, real_decs, all_objs, pop_objs,
                           all_cons, real_cons)
            gen += 1
        pbar.close()
        runtime = time.time() - start_time
        results = build_save_results(
            all_decs=all_decs, all_objs=all_objs, runtime=runtime,
            max_nfes=max_nfes_per_task, all_cons=all_cons,
            bounds=problem.bounds, save_path=self.save_path,
            filename=self.name, save_data=self.save_data)
        return results