"""
Evolutionary Multi-task with Effective Transfer (EMT-ET)
This module implements EMT-ET for multi-task multi-objective optimization problems.
References
----------
[1] Lin, Jiabin, Hai-Lin Liu, Kay Chen Tan, and Fangqing Gu. "An Effective Knowledge Transfer Approach for Multiobjective Multitasking Optimization." IEEE Transactions on Cybernetics 51.6 (2021): 3238-3248.
Notes
-----
Author: Jiangtao Shen
Email: j.shen5@exeter.ac.uk
Date: 2026.01.16
Version: 1.0
"""
import time
import numpy as np
from tqdm import tqdm
from ddmtolab.Methods.Algo_Methods.algo_utils import *
[docs]
class EMT_ET:
"""
Evolutionary Multi-task with Effective Transfer.
This algorithm features:
- Adaptive knowledge transfer based on successful transferred solutions
- Transfer solutions selected from Pareto front of source tasks
- Distribution-based perturbation for transferred solutions
- NSGA-II based environmental selection
Attributes
----------
algorithm_information : dict
Dictionary containing algorithm capabilities and requirements
"""
algorithm_information = {
'n_tasks': '[2, K]',
'dims': 'unequal',
'objs': 'unequal',
'n_objs': '[2, M]',
'cons': 'unequal',
'n_cons': '[0, C]',
'expensive': 'False',
'knowledge_transfer': 'True',
'n': 'unequal',
'max_nfes': 'unequal'
}
@classmethod
def get_algorithm_information(cls, print_info=True):
return get_algorithm_information(cls, print_info)
[docs]
def __init__(self, problem, n=None, max_nfes=None, G=8, P=0.5, muc=20.0, mum=15.0,
save_data=True, save_path='./Data', name='EMT-ET', disable_tqdm=True):
"""
Initialize EMT-ET algorithm.
Parameters
----------
problem : MTOP
Multi-task optimization problem instance
n : int or List[int], optional
Population size per task (default: 100)
max_nfes : int or List[int], optional
Maximum number of function evaluations per task (default: 10000)
G : int, optional
Number of transfer solutions per generation (default: 8)
P : float, optional
Probability of distribution-based perturbation (default: 0.5)
muc : float, optional
Distribution index for simulated binary crossover (SBX) (default: 20.0)
mum : float, optional
Distribution index for polynomial mutation (PM) (default: 15.0)
save_data : bool, optional
Whether to save optimization data (default: True)
save_path : str, optional
Path to save results (default: './TestData')
name : str, optional
Name for the experiment (default: 'EMTET_test')
disable_tqdm : bool, optional
Whether to disable progress bar (default: True)
"""
self.problem = problem
self.n = n if n is not None else 100
self.max_nfes = max_nfes if max_nfes is not None else 10000
self.G = G
self.P = P
self.muc = muc
self.mum = mum
self.save_data = save_data
self.save_path = save_path
self.name = name
self.disable_tqdm = disable_tqdm
[docs]
def optimize(self):
"""
Execute the EMT-ET algorithm.
Returns
-------
Results
Optimization results containing decision variables, objectives, constraints, and runtime
"""
start_time = time.time()
problem = self.problem
nt = problem.n_tasks
n_per_task = par_list(self.n, nt)
max_nfes_per_task = par_list(self.max_nfes, nt)
# Initialize population and evaluate for each task
decs = initialization(problem, n_per_task)
objs, cons = evaluation(problem, decs)
nfes_per_task = n_per_task.copy()
all_decs, all_objs, all_cons = init_history(decs, objs, cons)
# Initialize additional attributes for transfer tracking
# is_trans[t]: boolean array indicating if solution came from transfer
# origin_task[t]: source task index for transferred solutions
# front_no[t]: front number from NSGA-II sorting
is_trans = []
origin_task = []
front_no = []
rank = []
for t in range(nt):
is_trans.append(np.zeros(n_per_task[t], dtype=bool))
origin_task.append(np.full(n_per_task[t], -1, dtype=int))
# Initial NSGA-II sorting
rank_t, front_no_t, _ = self._nsga2_sort(objs[t], cons[t])
front_no.append(front_no_t)
rank.append(rank_t)
# Sort population by rank
sorted_indices = np.argsort(rank_t)
decs[t] = decs[t][sorted_indices]
objs[t] = objs[t][sorted_indices]
cons[t] = cons[t][sorted_indices] if cons[t] is not None else None
is_trans[t] = is_trans[t][sorted_indices]
origin_task[t] = origin_task[t][sorted_indices]
front_no[t] = front_no[t][sorted_indices]
# Progress bar
total_nfes = sum(max_nfes_per_task)
pbar = tqdm(total=total_nfes, initial=sum(nfes_per_task),
desc=f"{self.name}", disable=self.disable_tqdm)
# Main optimization loop
while sum(nfes_per_task) < total_nfes:
active_tasks = [t for t in range(nt) if nfes_per_task[t] < max_nfes_per_task[t]]
if not active_tasks:
break
for t in active_tasks:
# === Step 1: Transfer ===
transfer_decs, transfer_is_trans, transfer_origin = self._transfer(
decs, objs, is_trans, origin_task, front_no, t, n_per_task
)
# Align dimensions for transferred solutions
target_dim = problem.dims[t]
transfer_decs_aligned = np.zeros((len(transfer_decs), target_dim))
for i in range(len(transfer_decs)):
transfer_decs_aligned[i] = align_dimensions(transfer_decs[i], target_dim)
# Evaluate transferred solutions
transfer_objs, transfer_cons = evaluation_single(problem, transfer_decs_aligned, t)
nfes_per_task[t] += len(transfer_decs_aligned)
pbar.update(len(transfer_decs_aligned))
# Reset is_trans for current population (they are no longer "newly transferred")
is_trans[t] = np.zeros(len(decs[t]), dtype=bool)
# Merge current population with transferred solutions
decs[t] = np.vstack([decs[t], transfer_decs_aligned])
objs[t] = np.vstack([objs[t], transfer_objs])
cons[t] = np.vstack([cons[t], transfer_cons]) if cons[t] is not None else transfer_cons
is_trans[t] = np.concatenate([is_trans[t], transfer_is_trans])
origin_task[t] = np.concatenate([origin_task[t], transfer_origin])
# Update front numbers for merged population
rank_t, front_no_t, _ = self._nsga2_sort(objs[t], cons[t])
front_no[t] = front_no_t
# === Step 2: Generation ===
# Create mating pool: first N individuals + G transferred (with rank 1 for transferred)
current_pop_size = n_per_task[t]
n_offspring = n_per_task[t] - self.G
# Tournament selection for mating
# Use indices [0, N-1] for original population, with transferred solutions having advantage
mating_pool_size = n_offspring
mating_ranks = np.concatenate([
np.arange(1, current_pop_size + 1),
np.ones(self.G)
])
mating_pool = tournament_selection(2, mating_pool_size, mating_ranks)
# Generate offspring
parent_decs = decs[t][mating_pool]
off_decs = ga_generation(parent_decs, muc=self.muc, mum=self.mum)
# Evaluate offspring
off_objs, off_cons = evaluation_single(problem, off_decs, t)
nfes_per_task[t] += len(off_decs)
pbar.update(len(off_decs))
# === Step 3: Selection ===
# Merge all: current (with transferred) + offspring
decs[t] = np.vstack([decs[t], off_decs])
objs[t] = np.vstack([objs[t], off_objs])
cons[t] = np.vstack([cons[t], off_cons]) if cons[t] is not None else off_cons
# Extend is_trans and origin_task for offspring (not transferred)
is_trans[t] = np.concatenate([is_trans[t], np.zeros(len(off_decs), dtype=bool)])
origin_task[t] = np.concatenate([origin_task[t], np.full(len(off_decs), -1, dtype=int)])
# NSGA-II sorting and selection
rank_t, front_no_t, _ = self._nsga2_sort(objs[t], cons[t])
# Select top N individuals
sorted_indices = np.argsort(rank_t)[:n_per_task[t]]
decs[t] = decs[t][sorted_indices]
objs[t] = objs[t][sorted_indices]
cons[t] = cons[t][sorted_indices] if cons[t] is not None else None
is_trans[t] = is_trans[t][sorted_indices]
origin_task[t] = origin_task[t][sorted_indices]
front_no[t] = front_no_t[sorted_indices]
rank[t] = rank_t[sorted_indices]
# Append to history
append_history(all_decs[t], decs[t], all_objs[t], objs[t], all_cons[t], cons[t])
pbar.close()
runtime = time.time() - start_time
# Build and save results
results = build_save_results(all_decs=all_decs, all_objs=all_objs, runtime=runtime, max_nfes=nfes_per_task,
all_cons=all_cons, bounds=problem.bounds, save_path=self.save_path,
filename=self.name, save_data=self.save_data)
return results
def _transfer(self, decs, objs, is_trans, origin_task, front_no, t, n_per_task):
"""
Perform knowledge transfer to task t.
The transfer strategy:
1. If there are successful transferred solutions (is_trans=True and FrontNo<2),
find nearest neighbors from their origin tasks
2. Otherwise, randomly select solutions from other tasks
Parameters
----------
decs : list of np.ndarray
Decision variables for all tasks
objs : list of np.ndarray
Objective values for all tasks
is_trans : list of np.ndarray
Transfer flags for all tasks
origin_task : list of np.ndarray
Origin task indices for all tasks
front_no : list of np.ndarray
Front numbers for all tasks
t : int
Target task index
n_per_task : list of int
Population sizes for all tasks
Returns
-------
transfer_decs : np.ndarray
Decision variables of transfer solutions
transfer_is_trans : np.ndarray
Transfer flags (all True)
transfer_origin : np.ndarray
Origin task indices
"""
nt = len(decs)
# Find successful transferred solutions (is_trans=True and in first front)
successful_indices = np.where((is_trans[t] == True) & (front_no[t] < 2))[0]
transfer_decs_list = []
transfer_origin_list = []
if len(successful_indices) > 0:
# Strategy 1: Find nearest neighbors from origin tasks
G_temp = int(np.ceil(self.G / len(successful_indices)))
for s_idx in successful_indices:
ot = origin_task[t][s_idx] # Origin task
if ot < 0 or ot >= nt:
continue
# Calculate distances to all solutions in origin task
# Need to align dimensions first
s_dec = decs[t][s_idx]
ot_decs = decs[ot]
# Align dimensions for distance calculation
min_dim = min(len(s_dec), ot_decs.shape[1])
s_dec_aligned = s_dec[:min_dim]
ot_decs_aligned = ot_decs[:, :min_dim]
distances = np.sqrt(np.sum((ot_decs_aligned - s_dec_aligned) ** 2, axis=1))
nearest_indices = np.argsort(distances)
# Select G_temp nearest neighbors
for j in range(min(G_temp, len(nearest_indices))):
if len(transfer_decs_list) >= self.G:
break
idx = nearest_indices[j]
transfer_decs_list.append(decs[ot][idx].copy())
transfer_origin_list.append(ot)
if len(transfer_decs_list) >= self.G:
break
# If not enough, use random selection (Strategy 2)
if len(transfer_decs_list) < self.G:
task_pool = [k for k in range(nt) if k != t]
while len(transfer_decs_list) < self.G:
ot = task_pool[np.random.randint(len(task_pool))]
idx = np.random.randint(n_per_task[ot])
transfer_decs_list.append(decs[ot][idx].copy())
transfer_origin_list.append(ot)
# Trim to exactly G solutions
transfer_decs_list = transfer_decs_list[:self.G]
transfer_origin_list = transfer_origin_list[:self.G]
# Apply distribution-based perturbation
for i in range(len(transfer_decs_list)):
if np.random.rand() < self.P:
# Perturb: dec = 2 * rand() * dec, then clip to [0, 1]
transfer_decs_list[i] = 2 * np.random.rand() * transfer_decs_list[i]
transfer_decs_list[i] = np.clip(transfer_decs_list[i], 0, 1)
# Convert to arrays
# Note: transfer_decs may have different dimensions, handle in caller
transfer_is_trans = np.ones(len(transfer_decs_list), dtype=bool)
transfer_origin = np.array(transfer_origin_list, dtype=int)
return transfer_decs_list, transfer_is_trans, transfer_origin
def _nsga2_sort(self, objs, cons=None):
"""
Sort solutions based on NSGA-II criteria.
Parameters
----------
objs : np.ndarray
Objective values of shape (pop_size, n_obj)
cons : np.ndarray, optional
Constraint values of shape (pop_size, n_con)
Returns
-------
rank : np.ndarray
Ranking of each solution
front_no : np.ndarray
Front number of each solution
crowd_dis : np.ndarray
Crowding distance of each solution
"""
pop_size = objs.shape[0]
# Non-dominated sorting
if cons is not None and cons.size > 0:
front_no, _ = nd_sort(objs, cons, pop_size)
else:
front_no, _ = nd_sort(objs, pop_size)
# Crowding distance
crowd_dis = crowding_distance(objs, front_no)
# Sort by front number (ascending), then by crowding distance (descending)
sorted_indices = np.lexsort((-crowd_dis, front_no))
# Create rank array
rank = np.empty(pop_size, dtype=int)
rank[sorted_indices] = np.arange(pop_size)
return rank, front_no, crowd_dis