"""
Multi-task Evolutionary Algorithm with Adaptive Knowledge Transfer via Anomaly Detection (MTEA-AD)
This module implements MTEA-AD for multi-task optimization with adaptive knowledge transfer
using anomaly detection to identify beneficial solutions from other tasks.
References
----------
[1] Wang, Chao, et al. "Solving multitask optimization problems with adaptive knowledge transfer via anomaly detection." IEEE Transactions on Evolutionary Computation 26.2 (2021): 304-318.
Notes
-----
Author: Jiangtao Shen
Email: j.shen5@exeter.ac.uk
Date: 2025.01.12
Version: 1.0
"""
import time
from tqdm import tqdm
import numpy as np
from scipy.stats import multivariate_normal
from ddmtolab.Methods.Algo_Methods.algo_utils import *
class MTEA_AD:
    """
    Multi-task Evolutionary Algorithm with Adaptive Knowledge Transfer via Anomaly Detection.

    Uses a Gaussian-based anomaly detection model to adaptively identify and transfer
    beneficial solutions from other tasks during optimization.

    Attributes
    ----------
    algorithm_information : dict
        Dictionary containing algorithm capabilities and requirements
    """
    algorithm_information = {
        'n_tasks': '[2, K]',
        'dims': 'unequal',
        'objs': 'equal',
        'n_objs': '1',
        'cons': 'unequal',
        'n_cons': '[0, C]',
        'expensive': 'False',
        'knowledge_transfer': 'True',
        'n': 'unequal',
        'max_nfes': 'unequal'
    }

    @classmethod
    def get_algorithm_information(cls, print_info=True):
        # Delegates to the shared algo_utils helper of the same name.
        return get_algorithm_information(cls, print_info)

    def __init__(self, problem, n=None, max_nfes=None, TRP=0.1, muc=2.0, mum=5.0,
                 save_data=True, save_path='./Data', name='MTEA-AD', disable_tqdm=True):
        """
        Initialize MTEA-AD algorithm.

        Parameters
        ----------
        problem : MTOP
            Multi-task optimization problem instance
        n : int or List[int], optional
            Population size per task (default: 100)
        max_nfes : int or List[int], optional
            Maximum number of function evaluations per task (default: 10000)
        TRP : float, optional
            Transfer probability - probability of knowledge transfer in each generation (default: 0.1)
        muc : float, optional
            Distribution index for simulated binary crossover (SBX) (default: 2.0)
        mum : float, optional
            Distribution index for polynomial mutation (PM) (default: 5.0)
        save_data : bool, optional
            Whether to save optimization data (default: True)
        save_path : str, optional
            Path to save results (default: './Data')
        name : str, optional
            Name for the experiment (default: 'MTEA-AD')
        disable_tqdm : bool, optional
            Whether to disable progress bar (default: True)
        """
        self.problem = problem
        self.n = n if n is not None else 100
        self.max_nfes = max_nfes if max_nfes is not None else 10000
        self.TRP = TRP
        self.muc = muc
        self.mum = mum
        self.save_data = save_data
        self.save_path = save_path
        self.name = name
        self.disable_tqdm = disable_tqdm

    def optimize(self):
        """
        Execute the MTEA-AD algorithm.

        Returns
        -------
        Results
            Optimization results containing decision variables, objectives, and runtime
        """
        start_time = time.time()
        problem = self.problem
        nt = problem.n_tasks
        dims = problem.dims
        n_per_task = par_list(self.n, nt)
        max_nfes_per_task = par_list(self.max_nfes, nt)
        # Initialize population and evaluate for each task
        decs = initialization(problem, n_per_task)
        objs, cons = evaluation(problem, decs)
        nfes_per_task = n_per_task.copy()
        # epsilon[t] is the survival rate of the last batch of transferred
        # solutions for task t; it adapts the anomaly-detection threshold NL.
        epsilon = np.zeros(nt)
        gen = 1
        all_decs, all_objs, all_cons = init_history(decs, objs, cons)
        pbar = tqdm(total=sum(max_nfes_per_task), initial=sum(n_per_task),
                    desc=f"{self.name}", disable=self.disable_tqdm)
        while sum(nfes_per_task) < sum(max_nfes_per_task):
            # Skip tasks that have exhausted their evaluation budget
            active_tasks = [i for i in range(nt) if nfes_per_task[i] < max_nfes_per_task[i]]
            if not active_tasks:
                break
            for t in active_tasks:
                # Generate offspring through crossover and mutation
                off_decs = self._generation(decs[t])
                transferred = False
                # Knowledge transfer with probability TRP
                if np.random.rand() < self.TRP:
                    # NL controls how permissive the anomaly detector is:
                    # fully permissive in the first generation, adaptive afterwards.
                    NL = 1.0 if gen == 1 else epsilon[t]
                    # Collect populations from other tasks (limit to 10 for many-task problems)
                    kpool = [k for k in active_tasks if k != t]
                    if len(kpool) > 10:
                        kpool = np.random.choice(kpool, size=10, replace=False).tolist()
                    # Gather historical population from other tasks, truncated to this
                    # task's dimensionality. NOTE(review): if a source task has fewer
                    # than dims[t] columns the slice returns a narrower array and
                    # vstack would fail; this assumes decisions are stored in a
                    # common unified space — TODO confirm against `initialization`.
                    his_pop_dec = np.vstack([decs[k][:, :dims[t]] for k in kpool]) if kpool else np.empty((0, dims[t]))
                    if his_pop_dec.shape[0] > 0:
                        # Learn anomaly detection model and get transfer candidates
                        tfsol = self._learn_anomaly_detection(off_decs, his_pop_dec, NL)
                        # Clip transferred solutions to the unified search space [0, 1]
                        tfsol = np.clip(tfsol, 0.0, 1.0)
                        if tfsol.shape[0] > 0:
                            transferred = True
                            # Evaluate offspring and transferred solutions
                            off_objs, off_cons = evaluation_single(problem, off_decs, t)
                            tf_objs, tf_cons = evaluation_single(problem, tfsol, t)
                            # Merge parent, offspring, and transferred populations
                            merged_objs, merged_decs, merged_cons = vstack_groups(
                                (objs[t], off_objs, tf_objs),
                                (decs[t], off_decs, tfsol),
                                (cons[t], off_cons, tf_cons)
                            )
                            # Elitist selection over the merged population
                            index = selection_elit(merged_objs, n_per_task[t], merged_cons)
                            objs[t], decs[t], cons[t] = select_by_index(index, merged_objs, merged_decs, merged_cons)
                            # Parameter adaptation: epsilon becomes the fraction of
                            # transferred solutions that survived selection. Indices
                            # >= parent+offspring belong to the transferred batch.
                            parent_off_size = n_per_task[t] + n_per_task[t]
                            succ_num = np.sum(index >= parent_off_size)
                            epsilon[t] = succ_num / tfsol.shape[0]
                            nfes_per_task[t] += n_per_task[t] + tfsol.shape[0]
                            pbar.update(n_per_task[t] + tfsol.shape[0])
                if not transferred:
                    # No transfer this generation (probability miss, no donor tasks,
                    # or empty candidate set): plain elitist generational step.
                    self._survive_without_transfer(problem, t, decs, objs, cons,
                                                   off_decs, n_per_task[t])
                    nfes_per_task[t] += n_per_task[t]
                    pbar.update(n_per_task[t])
                append_history(all_decs[t], decs[t], all_objs[t], objs[t], all_cons[t], cons[t])
            gen += 1
        pbar.close()
        runtime = time.time() - start_time
        # Save results
        results = build_save_results(all_decs=all_decs, all_objs=all_objs, runtime=runtime, max_nfes=nfes_per_task,
                                     all_cons=all_cons, bounds=problem.bounds, save_path=self.save_path,
                                     filename=self.name, save_data=self.save_data)
        return results

    def _survive_without_transfer(self, problem, t, decs, objs, cons, off_decs, n_keep):
        # Evaluate offspring, merge with parents, and keep the n_keep elite.
        # Mutates decs/objs/cons in place for task t (shared helper for the
        # three previously duplicated no-transfer branches in optimize()).
        off_objs, off_cons = evaluation_single(problem, off_decs, t)
        merged_objs, merged_decs, merged_cons = vstack_groups(
            (objs[t], off_objs), (decs[t], off_decs), (cons[t], off_cons)
        )
        index = selection_elit(merged_objs, n_keep, merged_cons)
        objs[t], decs[t], cons[t] = select_by_index(index, merged_objs, merged_decs, merged_cons)

    def _generation(self, population):
        # Generate offspring through SBX crossover, polynomial mutation and
        # uniform gene swapping (mirrors the reference MATLAB implementation).
        n_pop = population.shape[0]
        d = population.shape[1]
        offspring = np.zeros((n_pop, d))
        # Random pairing
        ind_order = np.random.permutation(n_pop)
        count = 0
        for i in range(n_pop // 2):
            p1 = ind_order[i]
            p2 = ind_order[i + n_pop // 2]
            # Crossover
            off1, off2 = crossover(population[p1, :], population[p2, :], mu=self.muc)
            # Mutation
            off1 = mutation(off1, mu=self.mum)
            off2 = mutation(off2, mu=self.mum)
            # Gene swapping (as in MATLAB); fancy-indexed RHS reads are copies,
            # so the tuple swap is safe.
            swap_indicator = np.random.rand(d) < 0.5
            off1[swap_indicator], off2[swap_indicator] = (
                off2[swap_indicator], off1[swap_indicator]
            )
            # Clip to the unified search space [0, 1]
            offspring[count, :] = np.clip(off1, 0.0, 1.0)
            offspring[count + 1, :] = np.clip(off2, 0.0, 1.0)
            count += 2
        # Bug fix: with an odd population size the original left the last row
        # as all zeros. Mutate the unpaired individual instead so every parent
        # contributes exactly one offspring slot.
        if count < n_pop:
            leftover = mutation(population[ind_order[-1], :].copy(), mu=self.mum)
            offspring[count, :] = np.clip(leftover, 0.0, 1.0)
        return offspring

    def _learn_anomaly_detection(self, curr_pop, his_pop, NL):
        """
        Learn anomaly detection model to identify candidate transferred solutions.

        Uses a multivariate Gaussian distribution fitted on the current population
        to score solutions from historical populations. Solutions with high scores
        (low anomaly) are selected for transfer.

        Parameters
        ----------
        curr_pop : np.ndarray
            Current task population of shape (n, d)
        his_pop : np.ndarray
            Historical population from other tasks of shape (m, d), m >= 1
        NL : float
            Anomaly detection threshold parameter in [0, 1].
            Controls the proportion of solutions selected for transfer.
            NL=1 selects all solutions with score >= minimum
            NL=0 selects only solutions with score >= maximum

        Returns
        -------
        tfsol : np.ndarray
            Candidate transferred solutions of shape (k, d)

        Notes
        -----
        The algorithm fits a Gaussian model on the current population (with added noise
        to ensure positive definiteness) and evaluates historical solutions against this
        model. Solutions that appear "normal" (high PDF value) under the current task's
        distribution are selected for transfer.
        """
        d = curr_pop.shape[1]
        # Add random samples to ensure covariance matrix is positive definite
        n_samples = max(1, int(0.01 * curr_pop.shape[0]))
        rand_mat = np.random.rand(n_samples, d)
        curr_pop_augmented = np.vstack([curr_pop, rand_mat])
        # Fit multivariate Gaussian model
        mean = np.mean(curr_pop_augmented, axis=0)
        cov = np.cov(curr_pop_augmented, rowvar=False)
        # Ensure covariance matrix is positive definite by adding small diagonal
        cov = cov + 1e-6 * np.eye(d)
        # Get unique historical solutions
        his_pop_unique = np.unique(his_pop, axis=0)
        # Ensure dimension compatibility
        his_pop_unique = align_dimensions(his_pop_unique, d)
        # Calculate anomaly scores (PDF values)
        try:
            scores = multivariate_normal.pdf(his_pop_unique, mean=mean, cov=cov)
        except (np.linalg.LinAlgError, ValueError):
            # Bug fix: scipy raises ValueError ("the input matrix must be
            # positive semidefinite") rather than LinAlgError for a non-PSD
            # covariance, so the fallback must catch both. Fall back to an
            # independent (diagonal-covariance) Gaussian.
            var = np.var(curr_pop_augmented, axis=0) + 1e-6
            scores = np.prod(
                np.exp(-0.5 * ((his_pop_unique - mean) ** 2) / var) / np.sqrt(2 * np.pi * var),
                axis=1
            )
        # Sort scores in descending order
        sorted_indices = np.argsort(scores)[::-1]
        sorted_scores = scores[sorted_indices]
        # Determine threshold based on NL
        if NL == 0:
            threshold = sorted_scores[0]  # Only select maximum
        else:
            idx = min(int(np.ceil(len(sorted_scores) * NL)), len(sorted_scores) - 1)
            threshold = sorted_scores[idx]
        # Select solutions with score >= threshold
        selected_mask = scores >= threshold
        tfsol = his_pop_unique[selected_mask]
        return tfsol