"""
Multi-Task Evolutionary Algorithm with Hierarchical Knowledge Transfer Strategy (MTEA-HKTS)
This module implements MTEA-HKTS for multi-task optimization using KLD-based
variable ordering, adaptive knowledge transfer with hierarchical strategy
selection, and alternating GA/DE operators.
References
----------
[1] Zhao, Ben, et al. "A Multi-Task Evolutionary Algorithm for Solving
the Problem of Transfer Targets." Information Sciences, 681: 121214,
2024.
Notes
-----
Author: Jiangtao Shen (DDMTOLab adaptation)
Date: 2026.02.22
Version: 1.0
"""
import time
import numpy as np
from tqdm import tqdm
from ddmtolab.Methods.Algo_Methods.algo_utils import *
class MTEA_HKTS:
    """
    Multi-Task EA with Hierarchical Knowledge Transfer Strategy.

    Uses KLD-based decision variable alignment across tasks, adaptive
    transfer probability control via a task selection table, and
    alternating GA (SBX+PM) / DE (rand/1/bin) operators.

    Three operation modes per generation:

    - sign=0 (10%): Separate transferred population evaluated independently
    - sign=1 (9%): Transferred individuals replace worst, standard GA/DE
    - sign=2 (81%): Transferred individuals in temp pop, cross-population GA/DE

    Attributes
    ----------
    algorithm_information : dict
        Dictionary containing algorithm capabilities and requirements
    """

    algorithm_information = {
        'n_tasks': '[2, K]',
        'dims': 'unequal',
        'objs': 'equal',
        'n_objs': '1',
        'cons': 'unequal',
        'n_cons': '[0, C]',
        'expensive': 'False',
        'knowledge_transfer': 'True',
        'n': 'equal',
        'max_nfes': 'equal'
    }

    @classmethod
    def get_algorithm_information(cls, print_info=True):
        """Return (and optionally print) the algorithm capability dictionary."""
        return get_algorithm_information(cls, print_info)

    def __init__(self, problem, n=None, max_nfes=None, pTransfer=0.5,
                 mu=2, mum=5, F=0.5, CR=0.5, minx=0.1, Lb=0.1, Ub=0.7,
                 save_data=True, save_path='./Data', name='MTEA-HKTS',
                 disable_tqdm=True):
        """
        Initialize MTEA-HKTS algorithm.

        Parameters
        ----------
        problem : MTOP
            Multi-task optimization problem instance
        n : int, optional
            Population size per task (default: 100)
        max_nfes : int, optional
            Maximum number of function evaluations per task (default: 10000)
        pTransfer : float, optional
            Initial transfer portion (default: 0.5)
        mu : float, optional
            SBX crossover distribution index (default: 2)
        mum : float, optional
            Polynomial mutation distribution index (default: 5)
        F : float, optional
            DE mutation factor (default: 0.5)
        CR : float, optional
            DE crossover rate (default: 0.5)
        minx : float, optional
            Minimum scale boundary (default: 0.1)
        Lb : float, optional
            Lower bound for transfer probability (default: 0.1)
        Ub : float, optional
            Upper bound for transfer probability (default: 0.7)
        save_data : bool, optional
            Whether to save optimization data (default: True)
        save_path : str, optional
            Path to save results (default: './Data')
        name : str, optional
            Name for the experiment (default: 'MTEA-HKTS')
        disable_tqdm : bool, optional
            Whether to disable progress bar (default: True)
        """
        self.problem = problem
        # Fall back to standard defaults when not supplied
        self.n = n if n is not None else 100
        self.max_nfes = max_nfes if max_nfes is not None else 10000
        self.pTransfer = pTransfer
        self.mu = mu
        self.mum = mum
        self.F = F
        self.CR = CR
        self.minx = minx
        self.Lb = Lb
        self.Ub = Ub
        self.save_data = save_data
        self.save_path = save_path
        self.name = name
        self.disable_tqdm = disable_tqdm

    def optimize(self):
        """
        Execute the MTEA-HKTS algorithm.

        Returns
        -------
        Results
            Optimization results
        """
        start_time = time.time()
        problem = self.problem
        nt = problem.n_tasks
        dims = problem.dims
        n = self.n
        max_nfes_per_task = par_list(self.max_nfes, nt)
        max_nfes = self.max_nfes * nt

        # Initialize and evaluate
        decs = initialization(problem, n)
        objs, cons = evaluation(problem, decs)
        nfes = n * nt
        all_decs, all_objs, all_cons = init_history(decs, objs, cons)

        # Convert to unified search space (mid-padded to the largest dim)
        pop_decs, pop_cons = space_transfer(
            problem=problem, decs=decs, cons=cons, type='uni', padding='mid')
        pop_objs = objs
        maxC = pop_cons[0].shape[1]

        # Sort each task's population by objective (best individual first)
        for t in range(nt):
            si = np.argsort(pop_objs[t][:, 0])
            pop_decs[t] = pop_decs[t][si]
            pop_objs[t] = pop_objs[t][si]
            pop_cons[t] = pop_cons[t][si]

        # scale[t, s]: fraction of individuals transferred from task s to t
        # table[t, s]: transfer probability (diagonal 0 => never pick self
        # through the roulette wheel)
        scale = np.full((nt, nt), self.pTransfer)
        table = np.full((nt, nt), 0.5)
        np.fill_diagonal(table, 0.0)

        # Archive: 3N individuals per task; one third refreshed per generation
        arch_decs = []
        arch_objs = []
        for t in range(nt):
            idx = np.random.randint(n, size=3 * n)
            arch_decs.append(pop_decs[t][idx].copy())
            arch_objs.append(pop_objs[t][idx].copy())

        gen = 1
        pbar = tqdm(total=max_nfes, initial=nfes, desc=f"{self.name}",
                    disable=self.disable_tqdm)
        while nfes < max_nfes:
            for t in range(nt):
                n_t = len(pop_decs[t])

                # --- Operation mode: sign = 0 (10%), 1 (9%), 2 (81%) ---
                if np.random.rand() < 0.9:
                    sign = 1 if np.random.rand() < 0.1 else 2
                else:
                    sign = 0

                # --- Source task selection (roulette wheel over table row) ---
                m_pt = _select_task(table[t])

                # --- Transfer option ---
                if np.random.rand() < table[t, m_pt]:
                    option = 1  # KLD-aligned transfer from task m_pt
                else:
                    m_pt = t
                    option = 2  # self-transfer with same-index exclusion
                if np.random.rand() > 0.9:
                    option = 0  # occasionally fall back to random mapping

                # --- Variable ordering (using archives) ---
                order = _var_order(arch_decs[m_pt], arch_decs[t],
                                   dims[m_pt], dims[t], option)

                # --- Population preparation ---
                transpop_decs = None
                if sign == 1 or sign == 2:
                    nTransfer = max(round(scale[t, m_pt] * n_t), 1)
                    nTransfer = min(nTransfer, n_t)
                    temp_decs = pop_decs[t][::-1].copy()  # worst first
                    trans = _m_transfer(
                        pop_decs[m_pt], pop_decs[t],
                        dims[m_pt], dims[t], nTransfer, order, option)
                    # Transferred individuals overwrite the worst ones
                    temp_decs[:nTransfer] = trans
                else:  # sign == 0: keep transferred individuals separate
                    nTransfer = max(round(0.1 * n_t), 1)
                    temp_decs = pop_decs[t].copy()
                    transpop_decs = _m_transfer(
                        pop_decs[m_pt], pop_decs[t],
                        dims[m_pt], dims[t], nTransfer, order, option)

                # --- Offspring generation: GA on even tasks, DE on odd ---
                op = 'GA' if t % 2 == 0 else 'DE'
                if sign == 1 or sign == 0:
                    if op == 'GA':
                        off_decs = _gen_ga(temp_decs, self.mu, self.mum)
                    else:
                        off_decs = _gen_de(temp_decs, self.F, self.CR)
                else:  # sign == 2: cross-population variation
                    if op == 'GA':
                        off_decs = _gen_ga1(
                            pop_decs[t], temp_decs, self.mu, self.mum)
                    else:
                        off_decs = _gen_de1(
                            pop_decs[t], temp_decs, self.F, self.CR)

                # --- Evaluate offspring (real dims only; zero-pad cons) ---
                o_objs, o_cons_r = evaluation_single(
                    problem, off_decs[:, :dims[t]], t)
                o_cons = np.zeros((len(off_decs), maxC))
                if maxC > 0 and o_cons_r.shape[1] > 0:
                    o_cons[:, :o_cons_r.shape[1]] = o_cons_r
                nfes += len(off_decs)
                pbar.update(len(off_decs))

                # --- Merge and select ---
                if sign == 0 and transpop_decs is not None:
                    tp_objs, tp_cons_r = evaluation_single(
                        problem, transpop_decs[:, :dims[t]], t)
                    tp_cons = np.zeros((len(transpop_decs), maxC))
                    if maxC > 0 and tp_cons_r.shape[1] > 0:
                        tp_cons[:, :tp_cons_r.shape[1]] = tp_cons_r
                    nfes += len(transpop_decs)
                    pbar.update(len(transpop_decs))
                    m_decs = np.vstack([pop_decs[t], off_decs, transpop_decs])
                    m_objs = np.vstack([pop_objs[t], o_objs, tp_objs])
                    m_cons = np.vstack([pop_cons[t], o_cons, tp_cons])
                else:
                    m_decs = np.vstack([pop_decs[t], off_decs])
                    m_objs = np.vstack([pop_objs[t], o_objs])
                    m_cons = np.vstack([pop_cons[t], o_cons])

                # Elitist selection: sort by objective, keep the best n_t
                si = np.argsort(m_objs[:, 0])
                pop_decs[t] = m_decs[si[:n_t]]
                pop_objs[t] = m_objs[si[:n_t]]
                pop_cons[t] = m_cons[si[:n_t]]

                # --- Update one rotating third of the archive ---
                seg = gen % 3
                s_idx = seg * n_t
                for i in range(n_t):
                    if pop_objs[t][i, 0] < arch_objs[t][s_idx + i, 0]:
                        arch_decs[t][s_idx + i] = pop_decs[t][i]
                        arch_objs[t][s_idx + i] = pop_objs[t][i]

                # --- Transfer quality tracking ---
                # Rank-sum of injected individuals that survived selection;
                # rev_pop is worst-first, so better survivors score higher.
                rev_pop = pop_decs[t][::-1]
                quality = transpop_decs if sign == 0 else off_decs
                ia_sum = 0
                for r_idx in range(n_t):
                    for q_idx in range(len(quality)):
                        if np.array_equal(rev_pop[r_idx], quality[q_idx]):
                            ia_sum += (r_idx + 1)
                            break
                # Maximum possible rank sum: n(n+1)/2
                norm = n_t / 2.0 * (n_t + 1)
                ratio = ia_sum / norm if norm > 0 else 0
                if sign == 0:
                    scale[t, m_pt] = 0.1 + ratio * 0.4
                else:
                    scale[t, m_pt] = self.minx + ratio * (0.5 - self.minx)

                # --- Transfer probability table update ---
                # Skipped for self-transfer and random mapping so the zero
                # diagonal is never touched.
                if m_pt != t and option != 0:
                    denom = scale[t, m_pt] + scale[t, t]
                    if denom > 0:
                        temp_val = (scale[t, m_pt] - scale[t, t]) / denom
                    else:
                        temp_val = 0
                    w = 0.1 + np.random.rand() * 0.8  # random inertia weight
                    table[t, m_pt] = (
                        self.Lb + w * (table[t, m_pt] - self.Lb) +
                        (1 - w) * temp_val * (self.Ub - self.Lb))
                    # Clamp into [Lb, Ub]; NaN guards degenerate scale values
                    if np.isnan(table[t, m_pt]) or table[t, m_pt] < self.Lb:
                        table[t, m_pt] = self.Lb
                    if table[t, m_pt] > self.Ub:
                        table[t, m_pt] = self.Ub

            # Record history once per generation, in the real task spaces
            real_decs, real_cons = space_transfer(
                problem, decs=pop_decs, cons=pop_cons, type='real')
            append_history(all_decs, real_decs, all_objs, pop_objs,
                           all_cons, real_cons)
            gen += 1

        pbar.close()
        runtime = time.time() - start_time
        results = build_save_results(
            all_decs=all_decs, all_objs=all_objs, runtime=runtime,
            max_nfes=max_nfes_per_task, all_cons=all_cons,
            bounds=problem.bounds, save_path=self.save_path,
            filename=self.name, save_data=self.save_data)
        return results
# ============================================================
# Helper functions
# ============================================================
def _select_task(table_row):
"""Roulette wheel selection based on transfer probability row."""
s = np.sum(table_row)
if s <= 0:
return np.random.randint(len(table_row))
probs = table_row / s
return np.random.choice(len(table_row), p=probs)
def _var_order(prev_decs, this_decs, prev_dims, this_dims, option):
"""
KLD-based decision variable ordering between two populations.
Maps each target dimension to the source dimension with minimum KLD.
"""
if option == 0:
return np.random.randint(prev_dims, size=this_dims)
m_prev = np.mean(prev_decs[:, :prev_dims], axis=0)
m_this = np.mean(this_decs[:, :this_dims], axis=0)
prev_max = np.max(prev_decs[:, :prev_dims], axis=0)
prev_min = np.min(prev_decs[:, :prev_dims], axis=0)
this_max = np.max(this_decs[:, :this_dims], axis=0)
this_min = np.min(this_decs[:, :this_dims], axis=0)
prev_range = np.maximum(prev_max - prev_min, 1e-15)
this_range = np.maximum(this_max - this_min, 1e-15)
var_prev = np.var(prev_decs[:, :prev_dims], axis=0, ddof=1)
var_this = np.var(this_decs[:, :this_dims], axis=0, ddof=1)
var_prev = np.maximum(var_prev, 1e-15)
var_this = np.maximum(var_this, 1e-15)
order = np.zeros(this_dims, dtype=int)
for i in range(this_dims):
# Scale source variance by range ratio
scaled_var = var_prev * (this_range[i] / prev_range) ** 2
scaled_var = np.maximum(scaled_var, 1e-15)
# KLD(source_j || target_i)
KLD = (np.log2(np.sqrt(scaled_var) / np.sqrt(var_this[i])) +
(var_this[i] + (m_prev - m_this[i]) ** 2) / (2 * scaled_var)
- 0.5)
if option == 2 and i < prev_dims:
KLD[i] = np.max(KLD) + 1 # exclude same-index match
order[i] = np.argmin(KLD)
return order
def _m_transfer(prev_decs, this_decs, prev_dims, this_dims,
                n_transfer, order, option):
    """
    Transfer and transform decision variables from source to target.

    Takes the first ``n_transfer`` rows of ``prev_decs`` (the caller passes
    populations sorted best-first), maps target dimension ``i`` to source
    dimension ``order[i]``, rescales by the population range ratio, and
    shifts by the mean difference when the source/target distributions
    don't overlap sufficiently.  Results are clipped to [0, 1]
    (the unified space).

    Parameters
    ----------
    prev_decs, this_decs : ndarray
        Source / target populations in unified space (rows = individuals).
    prev_dims, this_dims : int
        Number of meaningful decision variables in each population.
    n_transfer : int
        Number of individuals to transfer (capped at ``len(prev_decs)``).
    order : ndarray of int
        Source dimension index for every target dimension.
    option : int
        0 = raw copy through the given mapping (no scaling or shifting);
        otherwise scaled and conditionally mean-shifted transfer.

    Returns
    -------
    ndarray
        ``(n_transfer, this_decs.shape[1])`` array of new individuals;
        columns beyond ``this_dims`` keep the copied target values.
    """
    prev_max = np.max(prev_decs[:, :prev_dims], axis=0).copy()
    prev_min = np.min(prev_decs[:, :prev_dims], axis=0).copy()
    this_max = np.max(this_decs[:, :this_dims], axis=0)
    this_min = np.min(this_decs[:, :this_dims], axis=0)
    # Guard against zero-width ranges before dividing
    prev_range = np.maximum(
        np.max(prev_decs[:, :prev_dims], axis=0) -
        np.min(prev_decs[:, :prev_dims], axis=0), 1e-15)
    this_range = np.maximum(this_max - this_min, 1e-15)
    m_prev = np.mean(prev_decs[:, :prev_dims], axis=0)
    m_this = np.mean(this_decs[:, :this_dims], axis=0)
    # Scale prev bounds (in-place, cumulative per MATLAB code): when a
    # source dimension j appears multiple times in `order`, its bounds are
    # rescaled repeatedly — the statement order here is intentional.
    if option != 0:
        for i in range(this_dims):
            j = order[i]
            sc = this_range[i] / prev_range[j]
            prev_max[j] = (prev_max[j] - m_prev[j]) * sc + m_prev[j]
            prev_min[j] = (prev_min[j] - m_prev[j]) * sc + m_prev[j]
    n_transfer = min(n_transfer, len(prev_decs))
    new_decs = this_decs[:n_transfer].copy()
    for nn in range(n_transfer):
        for i in range(this_dims):
            j = order[i]
            if option != 0 and prev_range[j] > 1e-15:
                # Rescale around the source mean into the target's range
                new_decs[nn, i] = ((prev_decs[nn, j] - m_prev[j]) *
                                   (this_range[i] / prev_range[j]) +
                                   m_prev[j])
            else:
                new_decs[nn, i] = prev_decs[nn, j]
            # Check overlap for mean shift: the shift is skipped when the
            # scaled source bounds straddle a target bound with the source
            # mean still on the target side of it
            need_shift = True
            if option != 0:
                if (prev_min[j] <= this_max[i] and
                    prev_max[j] >= this_max[i] and
                    m_prev[j] <= this_max[i]):
                    need_shift = False
                if (prev_max[j] >= this_min[i] and
                    prev_min[j] <= this_min[i] and
                    m_prev[j] >= this_min[i]):
                    need_shift = False
            if need_shift:
                new_decs[nn, i] += m_this[i] - m_prev[j]
            # Keep the transferred value inside the unified space
            new_decs[nn, i] = np.clip(new_decs[nn, i], 0, 1)
    return new_decs
def _gen_ga(pop_decs, mu, mum):
    """Produce offspring via SBX crossover followed by polynomial mutation.

    Parents are paired through a random permutation; each pair yields two
    children, so an odd-sized population contributes one parent less.
    """
    n_pop, D = pop_decs.shape
    perm = np.random.permutation(n_pop)
    half = n_pop // 2
    children = np.zeros((2 * half, D))
    for k in range(half):
        pa = perm[k]
        pb = perm[k + half]
        c1, c2 = crossover(pop_decs[pa], pop_decs[pb], mu=mu)
        children[2 * k] = np.clip(mutation(c1, mu=mum), 0, 1)
        children[2 * k + 1] = np.clip(mutation(c2, mu=mum), 0, 1)
    return children
def _gen_de(pop_decs, F, CR):
"""Standard DE/rand/1/bin with random boundary repair."""
n_pop, D = pop_decs.shape
off = np.zeros_like(pop_decs)
for i in range(n_pop):
indices = np.arange(n_pop)
indices = indices[indices != i]
a, b, c = np.random.choice(indices, 3, replace=False)
v = pop_decs[a] + F * (pop_decs[b] - pop_decs[c])
# Binomial crossover
u = pop_decs[i].copy()
j_rand = np.random.randint(D)
mask = np.random.rand(D) < CR
mask[j_rand] = True
u[mask] = v[mask]
# Random boundary repair
rand_dec = np.random.rand(D)
u[u > 1] = rand_dec[u > 1]
u[u < 0] = rand_dec[u < 0]
off[i] = u
return off
def _gen_ga1(pop_decs, temp_decs, mu, mum):
    """Cross-population GA generation.

    Each temp individual is crossed (SBX) with a randomly permuted member
    of the original population; only the first child is kept and then
    polynomially mutated.
    """
    n_pop, D = pop_decs.shape
    mates = np.random.permutation(n_pop)
    children = np.zeros((n_pop, D))
    for i in range(n_pop):
        child, _ = crossover(temp_decs[i], pop_decs[mates[i]], mu=mu)
        children[i] = np.clip(mutation(child, mu=mum), 0, 1)
    return children
def _gen_de1(pop_decs, temp_decs, F, CR):
"""Cross-population DE: mutation from original, crossover with temp."""
n_pop, D = pop_decs.shape
off = np.zeros_like(pop_decs)
for i in range(n_pop):
indices = np.arange(n_pop)
indices = indices[indices != i]
a, b, c = np.random.choice(indices, 3, replace=False)
v = pop_decs[a] + F * (pop_decs[b] - pop_decs[c])
# Binomial crossover with temp (not original)
u = temp_decs[i].copy()
j_rand = np.random.randint(D)
mask = np.random.rand(D) < CR
mask[j_rand] = True
u[mask] = v[mask]
rand_dec = np.random.rand(D)
u[u > 1] = rand_dec[u > 1]
u[u < 0] = rand_dec[u < 0]
off[i] = u
return off