"""
Evolutionary Multitasking via Explicit Autoencoding (EMEA)
This module implements EMEA for multi-task optimization with knowledge transfer via autoencoding.
References
----------
[1] Feng, Liang, et al. "Evolutionary multitasking via explicit autoencoding." IEEE transactions on cybernetics 49.9 (2018): 3457-3470.
Notes
-----
Author: Jiangtao Shen
Email: j.shen5@exeter.ac.uk
Date: 2025.10.25
Version: 1.0
"""
from tqdm import tqdm
import time
from ddmtolab.Methods.Algo_Methods.algo_utils import *
[docs]
class EMEA:
"""
Evolutionary Multitasking via Explicit Autoencoding for multi-task optimization.
Attributes
----------
algorithm_information : dict
Dictionary containing algorithm capabilities and requirements
"""
algorithm_information = {
'n_tasks': '[2, K]',
'dims': 'unequal',
'objs': 'equal',
'n_objs': '1',
'cons': 'unequal',
'n_cons': '[0, C]',
'expensive': 'False',
'knowledge_transfer': 'True',
'n': 'unequal',
'max_nfes': 'unequal'
}
@classmethod
def get_algorithm_information(cls, print_info=True):
return get_algorithm_information(cls, print_info)
[docs]
def __init__(self, problem, n=None, max_nfes=None, SNum=10, TGap=10, muc=2, mum=5, F=0.5, CR=0.6, save_data=True,
save_path='./Data', name='EMEA', disable_tqdm=True):
"""
Initialize EMEA algorithm.
Parameters
----------
problem : MTOP
Multi-task optimization problem instance
n : int or List[int], optional
Population size per task (default: 100)
max_nfes : int or List[int], optional
Maximum number of function evaluations per task (default: 10000)
SNum : int, optional
Number of transferred solutions (default: 10)
TGap : int, optional
Transfer interval in generations (default: 10)
muc : float, optional
Distribution index for simulated binary crossover (SBX) (default: 2.0)
mum : float, optional
Distribution index for polynomial mutation (PM) (default: 5.0)
F : float, optional
Scaling factor for DE mutation (default: 0.5)
CR : float, optional
Crossover rate for DE (default: 0.6)
save_data : bool, optional
Whether to save optimization data (default: True)
save_path : str, optional
Path to save results (default: './TestData')
name : str, optional
Name for the experiment (default: 'EMEA_test')
disable_tqdm : bool, optional
Whether to disable progress bar (default: True)
"""
self.problem = problem
self.n = n if n is not None else 100
self.max_nfes = max_nfes if max_nfes is not None else 10000
self.SNum = SNum
self.TGap = TGap
self.muc = muc
self.mum = mum
self.F = F
self.CR = CR
self.save_data = save_data
self.save_path = save_path
self.name = name
self.disable_tqdm = disable_tqdm
[docs]
def optimize(self):
"""
Execute the EMEA algorithm.
Returns
-------
Results
Optimization results containing decision variables, objectives, and runtime
"""
start_time = time.time()
problem = self.problem
nt = problem.n_tasks
dims = problem.dims
n_per_task = par_list(self.n, nt)
max_nfes_per_task = par_list(self.max_nfes, nt)
# Initialize population and evaluate for each task
decs = initialization(problem, n_per_task)
objs, cons = evaluation(problem, decs)
nfes_per_task = n_per_task.copy()
# Store initial populations for domain adaptation
initial_decs = [d.copy() for d in decs]
gen = 1
all_decs, all_objs, all_cons = init_history(decs, objs, cons)
pbar = tqdm(total=sum(max_nfes_per_task), initial=sum(n_per_task),
desc=f"{self.name}", disable=self.disable_tqdm)
while sum(nfes_per_task) < sum(max_nfes_per_task):
# Skip tasks that have exhausted their evaluation budget
active_tasks = [i for i in range(nt) if nfes_per_task[i] < max_nfes_per_task[i]]
if not active_tasks:
break
for i in active_tasks:
# Generate offspring: alternate between GA and DE operators
if i % 2 == 0:
off_decs = ga_generation(decs[i], self.muc, self.mum)
else:
off_decs = de_generation(decs[i], self.F, self.CR)
# Knowledge transfer via mDA at specified intervals
if self.SNum > 0 and gen % self.TGap == 0:
inject_num = int(round(self.SNum / (nt - 1)))
inject_decs = np.zeros((0, dims[i]))
# Collect best solutions from other tasks
for k in active_tasks:
if k == i:
continue
cvs_k = np.sum(np.maximum(0, cons[k]), axis=1)
his_rank = constrained_sort(objs[k], cvs_k)
his_decs = decs[k][his_rank, :]
his_best_decs = his_decs[:inject_num, :dims[k]].squeeze()
# Transform via marginalized denoising autoencoder
inject_decs_k = mDA(initial_decs[i], initial_decs[k], his_best_decs)
inject_decs = np.vstack([inject_decs, inject_decs_k])
# Replace random offspring with transferred solutions
replace_idx = np.random.choice(off_decs.shape[0], size=inject_decs.shape[0], replace=False)
off_decs[replace_idx, :dims[i]] = inject_decs
offobjs, offcons = evaluation_single(problem, off_decs, i)
# Merge parent and offspring populations
objs[i], decs[i], cons[i] = vstack_groups(
(objs[i], offobjs), (decs[i], off_decs), (cons[i], offcons)
)
# Elitist selection: keep best n individuals
index = selection_elit(objs[i], n_per_task[i], cons[i])
objs[i], decs[i], cons[i] = select_by_index(index, objs[i], decs[i], cons[i])
nfes_per_task[i] += n_per_task[i]
pbar.update(n_per_task[i])
append_history(all_decs[i], decs[i], all_objs[i], objs[i], all_cons[i], cons[i])
gen += 1
pbar.close()
runtime = time.time() - start_time
# Save results
results = build_save_results(all_decs=all_decs, all_objs=all_objs, runtime=runtime, max_nfes=nfes_per_task,
all_cons=all_cons, bounds=problem.bounds, save_path=self.save_path,
filename=self.name, save_data=self.save_data)
return results
def mDA(curr_decs, his_decs, his_best_decs):
"""
Marginalized Denoising Autoencoder for cross-domain knowledge transfer.
Parameters
----------
curr_decs : np.ndarray
Current task population of shape (n, d_curr)
his_decs : np.ndarray
Historical task population of shape (n, d_his)
his_best_decs : np.ndarray
Best solution(s) from historical task of shape (m, d_his)
Returns
-------
inject_decs : np.ndarray
Transformed solution(s) mapped to current task domain of shape (m, d_curr)
Notes
-----
The mDA learns a linear transformation W that maps solutions from the historical task
domain to the current task domain using ridge regression:
W = P @ (Q + λI)^(-1)
where P = xx @ noise^T, Q = noise @ noise^T, and λ is the regularization parameter.
"""
curr_len = curr_decs.shape[1]
his_len = his_decs.shape[1]
# Align dimensions by zero-padding the shorter one
max_dim = max(curr_len, his_len)
curr_decs = align_dimensions(curr_decs, max_dim, fill='zero')
his_decs = align_dimensions(his_decs, max_dim, fill='zero')
# Transpose for matrix operations
xx = curr_decs.T
noise = his_decs.T
d, n = xx.shape
# Add bias term
xxb = np.vstack([xx, np.ones((1, n))])
noise_xb = np.vstack([noise, np.ones((1, n))])
# Compute transformation matrix using ridge regression
Q = noise_xb @ noise_xb.T
P = xxb @ noise_xb.T
lambda_reg = 1e-5
reg = lambda_reg * np.eye(d + 1)
reg[-1, -1] = 0
W = P @ np.linalg.inv(Q + reg)
# Remove bias term from transformation matrix
W = W[:-1, :-1]
# Apply transformation to historical best solutions
if curr_len <= his_len:
inject_decs = (W @ his_best_decs.T).T[:, :curr_len]
else:
his_best_decs = np.column_stack([his_best_decs, np.zeros((his_best_decs.shape[0], curr_len - his_len))])
inject_decs = (W @ his_best_decs.T).T
# Clip to valid range [0, 1]
inject_decs = np.clip(inject_decs, 0., 1.)
return inject_decs