"""
Evolutionary Multi-task with Population Distribution-based Transfer (EMT-PD)
This module implements EMT-PD for multi-task multi-objective optimization problems.
References
----------
[1] Liang, Zhengping, Weiqi Liang, Zhiqiang Wang, Xiaoliang Ma, Ling Liu, and Zexuan Zhu. "Multiobjective Evolutionary Multitasking With Two-Stage Adaptive Knowledge Transfer Based on Population Distribution." IEEE Transactions on Systems, Man, and Cybernetics: Systems (2021): 1-13.
Notes
-----
Author: Jiangtao Shen
Email: j.shen5@exeter.ac.uk
Date: 2026.01.13
Version: 1.0
"""
import time
import numpy as np
from tqdm import tqdm
from ddmtolab.Methods.Algo_Methods.algo_utils import *
class EMT_PD:
    """
    Evolutionary Multi-task with Population Distribution-based Transfer.

    Key components:

    - Two-stage adaptive knowledge transfer driven by population distribution
    - Covariance-based alignment of distributions across tasks
    - Multifactorial evolutionary framework with random mating probability (RMP)
    - Environmental selection based on NSGA-II

    Attributes
    ----------
    algorithm_information : dict
        Dictionary describing the algorithm's capabilities and requirements
    """

    # Capability/requirement descriptors consumed by the framework.
    algorithm_information = {
        'n_tasks': '[2, K]',
        'dims': 'unequal',
        'objs': 'unequal',
        'n_objs': '[2, M]',
        'cons': 'unequal',
        'n_cons': '[0, C]',
        'expensive': 'False',
        'knowledge_transfer': 'True',
        'n': 'equal',
        'max_nfes': 'equal'
    }

    @classmethod
    def get_algorithm_information(cls, print_info=True):
        # Delegates to the module-level helper of the same name.
        return get_algorithm_information(cls, print_info)
[docs]
def __init__(self, problem, n=None, max_nfes=None, rmp=0.3, G=5, muc=20.0, mum=15.0,
save_data=True, save_path='./Data', name='EMT-PD', disable_tqdm=True):
"""
Initialize EMT-PD algorithm.
Parameters
----------
problem : MTOP
Multi-task optimization problem instance
n : int, optional
Population size per task (default: 100)
max_nfes : int, optional
Maximum number of function evaluations per task (default: 10000)
rmp : float, optional
Random mating probability for inter-task crossover (default: 0.3)
G : int, optional
Transfer gap - perform distribution-based transfer every G generations (default: 5)
muc : float, optional
Distribution index for simulated binary crossover (SBX) (default: 20.0)
mum : float, optional
Distribution index for polynomial mutation (PM) (default: 15.0)
save_data : bool, optional
Whether to save optimization data (default: True)
save_path : str, optional
Path to save results (default: './TestData')
name : str, optional
Name for the experiment (default: 'EMTPD_test')
disable_tqdm : bool, optional
Whether to disable progress bar (default: True)
"""
self.problem = problem
self.n = n if n is not None else 100
self.max_nfes = max_nfes if max_nfes is not None else 10000
self.rmp = rmp
self.G = G
self.muc = muc
self.mum = mum
self.save_data = save_data
self.save_path = save_path
self.name = name
self.disable_tqdm = disable_tqdm
    def optimize(self):
        """
        Execute the EMT-PD algorithm.

        Alternates between MOMFEA-style assortative mating and, every G-th
        generation, distribution-based knowledge transfer; each task's
        population is truncated back to size n via NSGA-II selection.

        Returns
        -------
        Results
            Optimization results containing decision variables, objectives,
            constraints, and runtime
        """
        start_time = time.time()
        problem = self.problem
        n = self.n
        nt = problem.n_tasks
        dims = problem.dims
        max_dim = max(dims)
        # Per-task evaluation budgets (par_list presumably expands max_nfes to
        # a list of length nt — TODO confirm against algo_utils).
        max_nfes_per_task = par_list(self.max_nfes, nt)
        # Initialize population in unified space (max dimension):
        # each task's population is padded to max_dim.
        decs = []
        for t in range(nt):
            dec_t = np.random.rand(n, max_dim)
            decs.append(dec_t)
        # Evaluate in native space (only the first dims[t] variables are used).
        objs = []
        cons = []
        for t in range(nt):
            dec_native = decs[t][:, :dims[t]]
            obj_t, con_t = evaluation_single(problem, dec_native, t)
            objs.append(obj_t)
            cons.append(con_t)
        nfes_per_task = [n] * nt
        all_decs, all_objs, all_cons = init_history(decs, objs, cons)
        # Initialize MFFactor (skill factor) for each individual.
        mf_factors = []
        for t in range(nt):
            mf_factors.append(np.full(n, t, dtype=int))
        # Initial NSGA-II sorting; populations are kept sorted by rank so the
        # position of an individual can double as its rank later on.
        ranks = []
        for t in range(nt):
            rank_t, _, _ = self._nsga2_sort(objs[t], cons[t])
            ranks.append(rank_t)
            # Sort population by rank
            sorted_indices = np.argsort(rank_t)
            decs[t] = decs[t][sorted_indices]
            objs[t] = objs[t][sorted_indices]
            cons[t] = cons[t][sorted_indices] if cons[t] is not None else None
            mf_factors[t] = mf_factors[t][sorted_indices]
        # Progress bar
        total_nfes = sum(max_nfes_per_task)
        pbar = tqdm(total=total_nfes, initial=sum(nfes_per_task), desc=f"{self.name}", disable=self.disable_tqdm)
        gen = 0
        # Main optimization loop
        while sum(nfes_per_task) < total_nfes:
            gen += 1
            if gen % self.G != 0:
                # === Standard Generation (MOMFEA-style) ===
                # Merge all populations
                all_pop_decs = np.vstack(decs)
                all_mf_factors = np.concatenate(mf_factors)
                # Tournament selection across all tasks; populations are
                # rank-sorted, so the position within each task is its rank.
                all_ranks = np.concatenate([np.arange(n) for _ in range(nt)])  # Use position as rank
                mating_pool = tournament_selection(2, n * nt, all_ranks)
                # Generate offspring
                off_decs, off_mf_factors = self._generation(
                    all_pop_decs[mating_pool], all_mf_factors[mating_pool], max_dim
                )
            else:
                # === Distribution-based Transfer (every G-th generation) ===
                off_decs, off_mf_factors = self._transfer(decs, n, nt, max_dim)
            # Evaluate offspring for each task
            for t in range(nt):
                # Get offspring belonging to task t
                task_mask = off_mf_factors == t
                off_decs_t = off_decs[task_mask]
                if len(off_decs_t) == 0:
                    continue
                # Evaluate in native space
                off_decs_native = off_decs_t[:, :dims[t]]
                off_objs_t, off_cons_t = evaluation_single(problem, off_decs_native, t)
                nfes_per_task[t] += len(off_decs_t)
                pbar.update(len(off_decs_t))
                # Merge with current population
                decs[t] = np.vstack([decs[t], off_decs_t])
                objs[t] = np.vstack([objs[t], off_objs_t])
                cons[t] = np.vstack([cons[t], off_cons_t]) if cons[t] is not None else off_cons_t
                mf_factors[t] = np.concatenate([mf_factors[t], np.full(len(off_decs_t), t, dtype=int)])
                # NSGA-II environmental selection: keep the n best-ranked
                # individuals; population stays rank-sorted afterwards.
                rank_t, _, _ = self._nsga2_sort(objs[t], cons[t])
                sorted_indices = np.argsort(rank_t)[:n]
                decs[t] = decs[t][sorted_indices]
                objs[t] = objs[t][sorted_indices]
                cons[t] = cons[t][sorted_indices] if cons[t] is not None else None
                mf_factors[t] = mf_factors[t][sorted_indices]
            # Convert back to native space for history
            decs_native = []
            for t in range(nt):
                decs_native.append(decs[t][:, :dims[t]])
            append_history(all_decs, decs_native, all_objs, objs, all_cons, cons)
        pbar.close()
        runtime = time.time() - start_time
        # Convert final decs to native space.
        # NOTE(review): final_decs is never used — results are built from the
        # recorded history below; confirm before removing.
        final_decs = []
        for t in range(nt):
            final_decs.append(decs[t][:, :dims[t]])
        # Build and save results
        results = build_save_results(all_decs=all_decs, all_objs=all_objs, runtime=runtime, max_nfes=nfes_per_task,
                                     all_cons=all_cons, bounds=problem.bounds, save_path=self.save_path,
                                     filename=self.name, save_data=self.save_data)
        return results
def _generation(self, parent_decs, parent_mf_factors, max_dim):
"""
Generate offspring using MOMFEA-style assortative mating.
Parameters
----------
parent_decs : np.ndarray
Parent decision variables of shape (pop_size, max_dim)
parent_mf_factors : np.ndarray
Parent skill factors of shape (pop_size,)
max_dim : int
Maximum dimension
Returns
-------
off_decs : np.ndarray
Offspring decision variables
off_mf_factors : np.ndarray
Offspring skill factors
"""
pop_size = len(parent_decs)
off_decs = np.zeros((pop_size, max_dim))
off_mf_factors = np.zeros(pop_size, dtype=int)
# Shuffle for random pairing
shuffled_indices = np.random.permutation(pop_size)
count = 0
for i in range(0, pop_size - 1, 2):
p1_idx = shuffled_indices[i]
p2_idx = shuffled_indices[i + 1]
p1_dec = parent_decs[p1_idx]
p2_dec = parent_decs[p2_idx]
sf1 = parent_mf_factors[p1_idx]
sf2 = parent_mf_factors[p2_idx]
if sf1 == sf2 or np.random.rand() < self.rmp:
# Crossover
off_dec1, off_dec2 = crossover(p1_dec, p2_dec, mu=self.muc)
# Mutation
off_dec1 = mutation(off_dec1, mu=self.mum)
off_dec2 = mutation(off_dec2, mu=self.mum)
# Random skill factor assignment
off_sf1 = np.random.choice([sf1, sf2])
off_sf2 = sf1 if off_sf1 == sf2 else sf2
else:
# Only mutation (no crossover)
off_dec1 = mutation(p1_dec.copy(), mu=self.mum)
off_dec2 = mutation(p2_dec.copy(), mu=self.mum)
off_sf1 = sf1
off_sf2 = sf2
# Boundary handling
off_dec1 = np.clip(off_dec1, 0, 1)
off_dec2 = np.clip(off_dec2, 0, 1)
off_decs[count] = off_dec1
off_decs[count + 1] = off_dec2
off_mf_factors[count] = off_sf1
off_mf_factors[count + 1] = off_sf2
count += 2
# Handle odd population size
if pop_size % 2 == 1:
last_idx = shuffled_indices[-1]
off_decs[count] = mutation(parent_decs[last_idx].copy(), mu=self.mum)
off_decs[count] = np.clip(off_decs[count], 0, 1)
off_mf_factors[count] = parent_mf_factors[last_idx]
return off_decs, off_mf_factors
def _transfer(self, decs, n, nt, max_dim):
"""
Perform distribution-based knowledge transfer.
This method uses covariance matrices to align population distributions
between tasks and generate transferred solutions.
Parameters
----------
decs : list of np.ndarray
Decision variables for all tasks
n : int
Population size per task
nt : int
Number of tasks
max_dim : int
Maximum dimension
Returns
-------
off_decs : np.ndarray
Offspring decision variables
off_mf_factors : np.ndarray
Offspring skill factors
"""
model_size = min(n, 40)
off_decs_list = []
off_mf_factors_list = []
for t in range(nt):
# P: top model_size solutions from task t (transposed: max_dim x model_size)
P = decs[t][:model_size].T
# Select a random source task
task_pool = [k for k in range(nt) if k != t]
k = task_pool[np.random.randint(len(task_pool))]
# Q: top model_size solutions from task k
Q = decs[k][:model_size].T
# Compute covariance matrices A_t and A_k
A_t = np.zeros((max_dim, max_dim))
A_k = np.zeros((max_dim, max_dim))
for dim1 in range(max_dim):
a = P[dim1, :]
c = Q[dim1, :]
for dim2 in range(max_dim):
b = P[dim2, :]
d = Q[dim2, :]
# Covariance between dimensions
cov_ab = np.cov(a, b)
A_t[dim1, dim2] = cov_ab[0, 1] if cov_ab.shape == (2, 2) else 0
cov_cd = np.cov(c, d)
A_k[dim1, dim2] = cov_cd[0, 1] if cov_cd.shape == (2, 2) else 0
# Compute combined distribution parameters
# A = inv(inv(A_t) + inv(A_k))
# avg_n = A * (inv(A_t) * avg_P + inv(A_k) * avg_Q)
avg_P = np.mean(P, axis=1)
avg_Q = np.mean(Q, axis=1)
# Add regularization to avoid singular matrices
reg = 1e-6 * np.eye(max_dim)
A_t_reg = A_t + reg
A_k_reg = A_k + reg
try:
inv_A_t = np.linalg.inv(A_t_reg)
inv_A_k = np.linalg.inv(A_k_reg)
A = np.linalg.inv(inv_A_t + inv_A_k)
avg_n = A @ (inv_A_t @ avg_P + inv_A_k @ avg_Q)
except np.linalg.LinAlgError:
# Fallback to simple average if matrix inversion fails
avg_n = (avg_P + avg_Q) / 2
# Normalize avg_n to [0, 1]
max_n = np.max(avg_n)
min_n = np.min(avg_n)
if max_n - min_n > 1e-10:
avg_n = (avg_n - min_n) / (max_n - min_n)
else:
avg_n = np.full(max_dim, 0.5)
# Compute weight vector
w1 = avg_P - avg_n
# Generate offspring
for i in range(n):
a_idx = np.random.randint(model_size)
b_idx = np.random.randint(model_size)
# Weighted combination
off_dec = w1 * P[:, a_idx] + (1 - w1) * Q[:, b_idx]
# Mutation
off_dec = mutation(off_dec, mu=self.mum)
# Boundary handling
off_dec = np.clip(off_dec, 0, 1)
# Handle NaN values
nan_mask = np.isnan(off_dec)
if np.any(nan_mask):
off_dec[nan_mask] = np.random.rand(np.sum(nan_mask))
off_decs_list.append(off_dec)
off_mf_factors_list.append(t)
off_decs = np.array(off_decs_list)
off_mf_factors = np.array(off_mf_factors_list, dtype=int)
return off_decs, off_mf_factors
def _nsga2_sort(self, objs, cons=None):
"""
Sort solutions based on NSGA-II criteria.
Parameters
----------
objs : np.ndarray
Objective values of shape (pop_size, n_obj)
cons : np.ndarray, optional
Constraint values of shape (pop_size, n_con)
Returns
-------
rank : np.ndarray
Ranking of each solution
front_no : np.ndarray
Front number of each solution
crowd_dis : np.ndarray
Crowding distance of each solution
"""
pop_size = objs.shape[0]
# Non-dominated sorting
if cons is not None and cons.size > 0:
front_no, _ = nd_sort(objs, cons, pop_size)
else:
front_no, _ = nd_sort(objs, pop_size)
# Crowding distance
crowd_dis = crowding_distance(objs, front_no)
# Sort by front number (ascending), then by crowding distance (descending)
sorted_indices = np.lexsort((-crowd_dis, front_no))
# Create rank array
rank = np.empty(pop_size, dtype=int)
rank[sorted_indices] = np.arange(pop_size)
return rank, front_no, crowd_dis