"""
Multi-task Evolutionary Algorithm Based on Decomposition with Transfer of Search Directions (MTEA-D-TSD)
This module implements MTEA-D-TSD for multi-task multi-objective optimization problems.
References
----------
[1] Y. Li, W. Gong, and Q. Gu, "Transfer Search Directions Among Decomposed Subtasks for Evolutionary Multitasking in Multiobjective Optimization," in Proceedings of the Genetic and Evolutionary Computation Conference (GECCO '24), 2024, pp. 557-565.
Notes
-----
Author: Jiangtao Shen
Email: j.shen5@exeter.ac.uk
Date: 2026.02.22
Version: 1.0
"""
import time
import numpy as np
from tqdm import tqdm
from scipy.spatial.distance import pdist, squareform
from ddmtolab.Methods.Algo_Methods.uniform_point import uniform_point
from ddmtolab.Methods.Algo_Methods.algo_utils import *
[docs]
class MTEA_D_TSD:
"""
Multi-task Evolutionary Algorithm Based on Decomposition with Transfer of Search Directions.
This algorithm features:
- MOEA/D framework with Tchebycheff decomposition
- Search direction (SD) tracking for each individual
- Cross-task transfer of search directions based on cosine similarity
- Adaptive per-individual transfer rate
- DE/rand/1 mutation with polynomial mutation
Attributes
----------
algorithm_information : dict
Dictionary containing algorithm capabilities and requirements
"""
algorithm_information = {
'n_tasks': '[2, K]',
'dims': 'unequal',
'objs': 'unequal',
'n_objs': '[2, M]',
'cons': 'unequal',
'n_cons': '[0, C]',
'expensive': 'False',
'knowledge_transfer': 'True',
'n': 'unequal',
'max_nfes': 'unequal'
}
@classmethod
def get_algorithm_information(cls, print_info=True):
return get_algorithm_information(cls, print_info)
[docs]
def __init__(self, problem, n=None, max_nfes=None,
TR0=0.2, CF=0.4, SNum=10, Delta=0.9, NR=2,
F=0.5, CR=0.9, MuM=15,
save_data=True, save_path='./Data',
name='MTEA-D-TSD', disable_tqdm=True):
"""
Initialize MTEA-D-TSD algorithm.
Parameters
----------
problem : MTOP
Multi-task optimization problem instance
n : int or List[int], optional
Population size per task (default: 100)
max_nfes : int or List[int], optional
Maximum number of function evaluations per task (default: 10000)
TR0 : float, optional
Initial transfer rate (default: 0.2)
CF : float, optional
Cumulative factor for search direction update (default: 0.4)
SNum : int, optional
Number of random samples for source selection (default: 10)
Delta : float, optional
Probability of choosing parents from local neighborhood (default: 0.9)
NR : int, optional
Maximum number of solutions replaced per offspring (default: 2)
F : float, optional
DE mutation factor (default: 0.5)
CR : float, optional
DE crossover rate (default: 0.9)
MuM : float, optional
PM mutation distribution index (default: 15)
save_data : bool, optional
Whether to save optimization data (default: True)
save_path : str, optional
Path to save results (default: './Data')
name : str, optional
Name for the experiment (default: 'MTEA-D-TSD')
disable_tqdm : bool, optional
Whether to disable progress bar (default: True)
"""
self.problem = problem
self.n = n if n is not None else 100
self.max_nfes = max_nfes if max_nfes is not None else 10000
self.TR0 = TR0
self.CF = CF
self.SNum = SNum
self.Delta = Delta
self.NR = NR
self.F = F
self.CR = CR
self.MuM = MuM
self.save_data = save_data
self.save_path = save_path
self.name = name
self.disable_tqdm = disable_tqdm
[docs]
def optimize(self):
"""
Execute the MTEA-D-TSD algorithm.
Returns
-------
Results
Optimization results containing decision variables, objectives, constraints, and runtime
"""
start_time = time.time()
problem = self.problem
nt = problem.n_tasks
no = problem.n_objs
dims = problem.dims
d_max = max(dims)
n_per_task = par_list(self.n, nt)
max_nfes_per_task = par_list(self.max_nfes, nt)
# Generate weight vectors and neighborhoods
W = []
Nt = []
DT = []
B = []
for t in range(nt):
w_t, n_t = uniform_point(n_per_task[t], no[t])
W.append(w_t)
Nt.append(n_t)
n_per_task[t] = n_t
dt = int(np.ceil(n_t / 10))
DT.append(dt)
distances = squareform(pdist(w_t))
neighbors = np.argsort(distances, axis=1)[:, :dt]
B.append(neighbors)
# Initialize populations in unified d_max space and evaluate
decs = []
objs = []
cons = []
for t in range(nt):
decs_t = np.random.rand(Nt[t], d_max)
objs_t, cons_t = evaluation_single(problem, decs_t[:, :dims[t]], t)
decs.append(decs_t)
objs.append(objs_t)
cons.append(cons_t)
nfes_per_task = Nt.copy()
# Native-space history
all_decs = [[d[:, :dims[t]].copy()] for t, d in enumerate(decs)]
all_objs = [[o.copy()] for o in objs]
all_cons = [[c.copy()] for c in cons]
# Ideal points
Z = [np.min(objs[t], axis=0) for t in range(nt)]
# Search directions (d_max) and transfer rates per individual
SD = [np.zeros((Nt[t], d_max)) for t in range(nt)]
TR = [np.full(Nt[t], self.TR0) for t in range(nt)]
# Transfer flag and SD neighborhoods
trans_flag = False
SD_B = None
RD_B = None
total_nfes = sum(max_nfes_per_task)
pbar = tqdm(total=total_nfes, initial=sum(nfes_per_task),
desc=f"{self.name}", disable=self.disable_tqdm)
while sum(nfes_per_task) < total_nfes:
# Start transfer after 10% budget consumed
if not trans_flag and sum(nfes_per_task) > 0.1 * total_nfes and self.TR0 > 0:
SD_B = []
RD_B = []
for t in range(nt):
sd_b_t = []
rd_b_t = []
for i in range(Nt[t]):
sd_b_i = np.zeros((DT[t], 2), dtype=int)
rd_b_i = np.ones(DT[t])
for j in range(DT[t]):
k, jj = self._source_select(
SD[t][i], SD, nt, Nt)
sd_b_i[j] = [k, jj]
sd_b_t.append(sd_b_i)
rd_b_t.append(rd_b_i)
SD_B.append(sd_b_t)
RD_B.append(rd_b_t)
trans_flag = True
# Snapshot old population
old_decs = [d.copy() for d in decs]
for t in np.random.permutation(nt):
if nfes_per_task[t] >= max_nfes_per_task[t]:
continue
for i in np.random.permutation(Nt[t]):
if nfes_per_task[t] >= max_nfes_per_task[t]:
break
# Select neighborhood
PL = B[t][i][np.random.permutation(DT[t])]
PG = np.random.permutation(Nt[t])
P = PL if np.random.rand() < self.Delta else PG
flag = False
if trans_flag and np.random.rand() < TR[t][i]:
flag = True
# Search-direction transfer
sd_idx = np.random.randint(DT[t])
k, j = SD_B[t][i][sd_idx]
# Scale source SD to match target SD magnitude
src_sd = SD[k][j]
src_norm = np.linalg.norm(src_sd)
tgt_norm = np.linalg.norm(SD[t][i])
if src_norm > 1e-20:
sd = src_sd / src_norm * tgt_norm
else:
sd = SD[t][i].copy()
off_dec = decs[t][i] + 2 * np.random.rand() * sd
off_dec = np.clip(off_dec, 0, 1)
else:
# DE/rand/1 + DE crossover + PM mutation
if np.random.rand() < 0.5:
x1, x2, x3 = decs[t][i], decs[t][P[0]], decs[t][P[1]]
else:
x1, x2, x3 = decs[t][i], decs[t][P[0]], old_decs[t][P[1]]
# DE mutation
off_dec = x1 + self.F * (x2 - x3)
# DE crossover (binomial) in d_max space
j_rand = np.random.randint(d_max)
mask = np.random.rand(d_max) < self.CR
mask[j_rand] = True
off_dec = np.where(mask, off_dec, decs[t][i])
# PM mutation
off_dec = mutation(off_dec, mu=self.MuM)
off_dec = np.clip(off_dec, 0, 1)
# Evaluate (trim to task dim)
off_obj, off_con = evaluation_single(
problem, off_dec[:dims[t]].reshape(1, -1), t)
nfes_per_task[t] += 1
pbar.update(1)
# Update ideal point
Z[t] = np.minimum(Z[t], off_obj[0])
# Tchebycheff selection
g_old = np.max(np.abs(objs[t][P] - Z[t]) * W[t][P], axis=1)
g_new_val = np.max(np.abs(off_obj[0] - Z[t]) * W[t][P], axis=1)
# Constraint violation
CV_off = np.sum(np.maximum(0, off_con[0]))
CV_pop = np.sum(np.maximum(0, cons[t][P]), axis=1)
# Replace where better: (same CV & better Tchebycheff) or less CV
replace_mask = ((g_old >= g_new_val) & (CV_pop == CV_off)) | (CV_pop > CV_off)
replace_idx = P[replace_mask][:self.NR]
for r_idx in replace_idx:
decs[t][r_idx] = off_dec
objs[t][r_idx] = off_obj[0]
cons[t][r_idx] = off_con[0]
# Update TR and SD neighborhoods
if flag and len(replace_idx) > 0:
TR[t][i] = min(0.5, TR[t][i] * 1.1)
RD_B[t][i][sd_idx] += 1
elif flag and len(replace_idx) == 0:
TR[t][i] = TR[t][i] * 0.9 + self.TR0 / 2 * 0.1
RD_B[t][i][sd_idx] -= 1
if RD_B[t][i][sd_idx] <= 0:
k_new, jj_new = self._source_select(
SD[t][i], SD, nt, Nt)
SD_B[t][i][sd_idx] = [k_new, jj_new]
RD_B[t][i][sd_idx] = 1
# Append native-space history
append_history(all_decs[t], decs[t][:, :dims[t]],
all_objs[t], objs[t],
all_cons[t], cons[t])
# Update search directions: SD = CF*SD + (1-CF)*variation
for t in range(nt):
for i in range(Nt[t]):
variation = decs[t][i] - old_decs[t][i]
if not np.allclose(variation, 0):
SD[t][i] = self.CF * SD[t][i] + (1 - self.CF) * variation
pbar.close()
runtime = time.time() - start_time
results = build_save_results(
all_decs=all_decs, all_objs=all_objs, runtime=runtime,
max_nfes=nfes_per_task, all_cons=all_cons,
bounds=problem.bounds, save_path=self.save_path,
filename=self.name, save_data=self.save_data)
return results
def _source_select(self, self_sd, SD, nt, Nt):
"""
Select source search-direction neighbor based on cosine similarity.
Randomly samples SNum candidates from all tasks and selects the one
with the highest cosine similarity to self_sd.
Parameters
----------
self_sd : np.ndarray
Search direction of the target individual
SD : list of np.ndarray
Search directions for all tasks
nt : int
Number of tasks
Nt : list of int
Population sizes per task
Returns
-------
k : int
Selected source task index
jj : int
Selected source individual index
"""
best_sim = -2.0
best_k, best_j = 0, 0
self_norm = np.linalg.norm(self_sd)
for _ in range(self.SNum):
k = np.random.randint(nt)
j = np.random.randint(Nt[k])
src_sd = SD[k][j]
src_norm = np.linalg.norm(src_sd)
if self_norm > 1e-20 and src_norm > 1e-20:
sim = np.dot(self_sd, src_sd) / (self_norm * src_norm)
else:
sim = -1.0
# Exclude exact match (cosine_sim == 1 -> set to -1)
if abs(sim - 1.0) < 1e-10:
sim = -1.0
if sim > best_sim:
best_sim = sim
best_k, best_j = k, j
return best_k, best_j