"""
Evolutionary Multi-Task Optimization with Adaptive Intensity (EMTO-AI)
This module implements EMTO-AI for multi-task optimization with adaptive knowledge
transfer intensity based on cross-task competitiveness evaluation.
References
----------
[1] Zhou, Xinyu, et al. "Evolutionary Multi-Task Optimization With Adaptive
Intensity of Knowledge Transfer." IEEE Transactions on Emerging Topics
in Computational Intelligence, 1-13, 2024.
Notes
-----
Author: Jiangtao Shen (DDMTOLab adaptation)
Date: 2026.02.22
Version: 1.0
"""
import time
import numpy as np
from tqdm import tqdm
from ddmtolab.Methods.Algo_Methods.algo_utils import *
class EMTO_AI:
    """
    Evolutionary Multi-Task Optimization with Adaptive Intensity of Knowledge Transfer.

    Uses a DE-based multi-factorial framework with:

    - Per-task elite archives for inter-task knowledge transfer
    - DE mutation with archive base vector for transfer, DE/rand/1 for intra-task
    - Binomial crossover
    - Adaptive transfer intensity (rmp) updated periodically by cross-evaluating
      each task's subpopulation on other tasks and measuring competitiveness

    Attributes
    ----------
    algorithm_information : dict
        Dictionary containing algorithm capabilities and requirements
    """

    algorithm_information = {
        'n_tasks': '[2, K]',
        'dims': 'unequal',
        'objs': 'equal',
        'n_objs': '1',
        'cons': 'unequal',
        'n_cons': '[0, C]',
        'expensive': 'False',
        'knowledge_transfer': 'True',
        'n': 'equal',
        'max_nfes': 'equal'
    }

    @classmethod
    def get_algorithm_information(cls, print_info=True):
        """Return (and optionally print) the algorithm capability table."""
        return get_algorithm_information(cls, print_info)

    def __init__(self, problem, n=None, max_nfes=None, F=0.5, CR=0.6,
                 rate=0.05, gap_gen=35, save_data=True, save_path='./Data',
                 name='EMTO-AI', disable_tqdm=True):
        """
        Initialize EMTO-AI algorithm.

        Parameters
        ----------
        problem : MTOP
            Multi-task optimization problem instance
        n : int, optional
            Population size per task (default: 100)
        max_nfes : int, optional
            Maximum number of function evaluations per task (default: 10000)
        F : float, optional
            DE mutation scale factor (default: 0.5)
        CR : float, optional
            DE crossover rate (default: 0.6)
        rate : float, optional
            Archive size as fraction of per-task population (default: 0.05)
        gap_gen : int, optional
            Generation interval for transfer intensity update (default: 35)
        save_data : bool, optional
            Whether to save optimization data (default: True)
        save_path : str, optional
            Path to save results (default: './Data')
        name : str, optional
            Name for the experiment (default: 'EMTO-AI')
        disable_tqdm : bool, optional
            Whether to disable progress bar (default: True)
        """
        self.problem = problem
        self.n = n if n is not None else 100
        self.max_nfes = max_nfes if max_nfes is not None else 10000
        self.F = F
        self.CR = CR
        self.rate = rate
        self.gap_gen = gap_gen
        self.save_data = save_data
        self.save_path = save_path
        self.name = name
        self.disable_tqdm = disable_tqdm

    def optimize(self):
        """
        Execute the EMTO-AI algorithm.

        Returns
        -------
        Results
            Optimization results containing decision variables, objectives, and runtime
        """
        start_time = time.time()
        problem = self.problem
        nt = problem.n_tasks
        dims = problem.dims
        n = self.n
        max_nfes_per_task = par_list(self.max_nfes, nt)
        max_nfes = self.max_nfes * nt
        pop_size = n * nt

        # Initialize population and evaluate
        decs = initialization(problem, n)
        objs, cons = evaluation(problem, decs)
        nfes = n * nt
        all_decs, all_objs, all_cons = init_history(decs, objs, cons)

        # Transform to unified space
        pop_decs, pop_cons = space_transfer(problem=problem, decs=decs, cons=cons,
                                            type='uni', padding='mid')
        pop_objs = objs
        pop_sfs = [np.full((n, 1), fill_value=i) for i in range(nt)]
        # Per-individual flag: 1 = transfer offspring that survived, 0 = not
        pop_is_off = [np.zeros(n, dtype=int) for _ in range(nt)]

        # Initialize archive: top rate% individuals per task
        arc_len = max(1, int(np.ceil(n * self.rate)))
        arc_decs = []
        for t in range(nt):
            sel = selection_elit(objs=pop_objs[t], n=arc_len, cons=pop_cons[t])
            arc_decs.append(pop_decs[t][sel].copy())

        # Initial rmp = 0 for all tasks (no cross-evaluation data yet, matching
        # MATLAB behavior where getNums returns 0 when MFObj for other tasks is inf)
        rmp = np.zeros(nt)

        pbar = tqdm(total=max_nfes, initial=nfes, desc=f"{self.name}",
                    disable=self.disable_tqdm)
        gen = 1
        while nfes < max_nfes:
            # Merge populations from all tasks
            m_decs, m_objs, m_cons, m_sfs = vstack_groups(
                pop_decs, pop_objs, pop_cons, pop_sfs)

            # --- Generation ---
            off_decs = np.zeros_like(m_decs)
            off_objs = np.full_like(m_objs, np.inf)
            off_cons = np.zeros_like(m_cons)
            off_sfs = np.zeros_like(m_sfs)
            off_is_transfer = np.zeros(pop_size, dtype=int)
            maxD = m_decs.shape[1]
            for i in range(pop_size):
                sf_i = m_sfs[i].item()
                if np.random.rand() < rmp[sf_i]:
                    # --- Knowledge transfer: archive-base DE mutation ---
                    # Pick a random other task
                    other_tasks = [t for t in range(nt) if t != sf_i]
                    o = other_tasks[np.random.randint(len(other_tasks))]
                    # x1 from other task's archive
                    x1 = arc_decs[o][np.random.randint(len(arc_decs[o]))]
                    # x2, x3 from same task population
                    same_idx = np.where(m_sfs.flatten() == sf_i)[0]
                    same_idx = same_idx[same_idx != i]
                    if len(same_idx) < 2:
                        cands = np.arange(pop_size)
                        cands = cands[cands != i]
                        chosen = np.random.choice(cands, 2, replace=False)
                    else:
                        chosen = np.random.choice(same_idx, 2, replace=False)
                    # DE mutation: archive_base + F*(x2 - x3)
                    mutant = x1 + self.F * (m_decs[chosen[0]] - m_decs[chosen[1]])
                    trial = _de_binomial_crossover(mutant, m_decs[i], self.CR, maxD)
                    off_decs[i] = trial
                    off_sfs[i] = np.random.randint(nt)  # random task assignment
                    off_is_transfer[i] = 1
                else:
                    # --- Intra-task: DE/rand/1/bin ---
                    same_idx = np.where(m_sfs.flatten() == sf_i)[0]
                    same_idx = same_idx[same_idx != i]
                    if len(same_idx) < 3:
                        cands = np.arange(pop_size)
                        cands = cands[cands != i]
                        chosen = np.random.choice(cands, 3, replace=False)
                    else:
                        chosen = np.random.choice(same_idx, 3, replace=False)
                    mutant = m_decs[chosen[0]] + self.F * (m_decs[chosen[1]] - m_decs[chosen[2]])
                    trial = _de_binomial_crossover(mutant, m_decs[i], self.CR, maxD)
                    off_decs[i] = trial
                    off_sfs[i] = sf_i
                    off_is_transfer[i] = 0
                # Out-of-bounds handling: replace with random in [0,1]
                rand_dec = np.random.rand(maxD)
                oob = (off_decs[i] > 1) | (off_decs[i] < 0)
                off_decs[i][oob] = rand_dec[oob]

            # --- Evaluation ---
            for idx in range(pop_size):
                t = off_sfs[idx].item()
                off_objs[idx], off_cons[idx] = evaluation_single(
                    problem, off_decs[idx, :dims[t]], t)
            nfes += pop_size
            pbar.update(pop_size)

            # --- Selection: merge parents + offspring, keep best n per task ---
            merged_decs = np.vstack([m_decs, off_decs])
            merged_objs = np.vstack([m_objs, off_objs])
            merged_cons = np.vstack([m_cons, off_cons])
            merged_sfs = np.vstack([m_sfs, off_sfs])
            merged_is_off = np.concatenate([
                np.zeros(pop_size, dtype=int), off_is_transfer])
            pop_decs, pop_objs, pop_cons, pop_sfs = [], [], [], []
            pop_is_off = []
            for t in range(nt):
                indices = np.where(merged_sfs.flatten() == t)[0]
                t_decs, t_objs, t_cons = select_by_index(
                    indices, merged_decs, merged_objs, merged_cons)
                t_is_off = merged_is_off[indices]
                sel = selection_elit(objs=t_objs, n=n, cons=t_cons)
                pop_decs.append(t_decs[sel])
                pop_objs.append(t_objs[sel])
                pop_cons.append(t_cons[sel])
                pop_sfs.append(np.full((n, 1), t))
                pop_is_off.append(t_is_off[sel])

            # --- Update archive ---
            for t in range(nt):
                transfer_mask = pop_is_off[t] == 1
                tsf_decs = pop_decs[t][transfer_mask]
                if len(tsf_decs) >= arc_len:
                    arc_decs[t] = tsf_decs[:arc_len].copy()
                elif len(tsf_decs) == 0:
                    # No transfer survivors: refill archive with current elites
                    sel = selection_elit(objs=pop_objs[t], n=arc_len, cons=pop_cons[t])
                    arc_decs[t] = pop_decs[t][sel].copy()
                else:
                    # Remove oldest, add new transfer survivors
                    n_remove = min(len(tsf_decs), len(arc_decs[t]))
                    arc_decs[t] = np.vstack([
                        arc_decs[t][n_remove:], tsf_decs])[:arc_len]

            # Reset is_offspring flags
            pop_is_off = [np.zeros(n, dtype=int) for _ in range(nt)]

            # --- Update transfer intensity every gap_gen generations ---
            # Guard: only spend the nt*(nt-1)*n cross-evaluation budget if it fits
            if gen % self.gap_gen == 0 and nfes + n * nt * (nt - 1) <= max_nfes:
                # Cross-evaluate: evaluate each task's subpop on every other task
                # cross_objs[t][o] = task t's subpop objectives when evaluated on task o
                cross_objs = [[None] * nt for _ in range(nt)]
                for t in range(nt):
                    cross_objs[t][t] = pop_objs[t]
                    for o in range(nt):
                        if o == t:
                            continue
                        t_decs_real = pop_decs[t][:, :dims[o]]
                        c_objs, _ = evaluation_single(problem, t_decs_real, o)
                        cross_objs[t][o] = c_objs
                        nfes += n
                        pbar.update(n)
                # Compute rmp: for each target task o, measure how competitive
                # source tasks' individuals are on task o
                rmp = _compute_rmp(cross_objs, pop_objs, nt)
                rmp = np.clip(rmp, 0.0, 1.0)

            # Record history (transform back to real space)
            real_decs, real_cons = space_transfer(
                problem, decs=pop_decs, cons=pop_cons, type='real')
            append_history(all_decs, real_decs, all_objs, pop_objs, all_cons, real_cons)
            gen += 1

        pbar.close()
        runtime = time.time() - start_time
        results = build_save_results(
            all_decs=all_decs, all_objs=all_objs, runtime=runtime,
            max_nfes=max_nfes_per_task, all_cons=all_cons,
            bounds=problem.bounds, save_path=self.save_path,
            filename=self.name, save_data=self.save_data)
        return results
# ============================================================
# Helper functions
# ============================================================
def _de_binomial_crossover(mutant, target, CR, d):
"""
DE binomial crossover between mutant and target vectors.
Parameters
----------
mutant : np.ndarray
Mutant vector, shape (d,)
target : np.ndarray
Target (parent) vector, shape (d,)
CR : float
Crossover rate in [0, 1]
d : int
Dimensionality
Returns
-------
trial : np.ndarray
Trial vector, shape (d,)
"""
trial = target.copy()
j_rand = np.random.randint(d)
mask = np.random.rand(d) < CR
mask[j_rand] = True
trial[mask] = mutant[mask]
return trial
def _get_nums(t_objs_on_o, o_objs_on_o):
"""
Count how many individuals from source task are competitive on target task.
Implements the getNums function from MATLAB: for each individual i in the
source subpopulation, compare against all individuals in the target
subpopulation. If the net wins (wins - losses) >= 0, the individual is
considered competitive.
Parameters
----------
t_objs_on_o : np.ndarray
Source task's objectives evaluated on target task, shape (n,)
o_objs_on_o : np.ndarray
Target task's native objectives, shape (n,)
Returns
-------
better_num : int
Number of competitive individuals from source task
"""
n_t = len(t_objs_on_o)
n_o = len(o_objs_on_o)
better_num = 0
for i in range(n_t):
wins = np.sum(t_objs_on_o[i] < o_objs_on_o)
losses = np.sum(t_objs_on_o[i] > o_objs_on_o)
if wins - losses >= 0:
better_num += 1
return better_num
def _compute_rmp(cross_objs, pop_objs, nt):
    """
    Compute rmp from cross-task evaluation results.

    For each source task t and target task o (o != t):
    Evaluate how many of task t's individuals are competitive on task o
    (via ``_get_nums``), giving the ratio ``better_num / len(subpop_t)``.
    For K > 2 tasks, rmp[o] is averaged over all source tasks t != o.

    This matches the MATLAB logic where rmp[o] controls transfer intensity
    for task o: higher means more knowledge transfer for that task.

    Parameters
    ----------
    cross_objs : list of list of np.ndarray
        cross_objs[t][o] = task t's subpop evaluated on task o, shape (n, 1)
    pop_objs : list of np.ndarray
        Native objective values per task, shape (n, 1)
    nt : int
        Number of tasks

    Returns
    -------
    rmp : np.ndarray
        Updated transfer intensity per task, shape (nt,)
    """
    rmp = np.zeros(nt)
    for o in range(nt):
        # Hoisted out of the inner loop: the target's native objectives are
        # the same for every source task t.
        o_native = pop_objs[o][:, 0]
        ratios = []
        for t in range(nt):
            if t == o:
                continue
            t_on_o = cross_objs[t][o][:, 0]
            better_num = _get_nums(t_on_o, o_native)
            ratios.append(better_num / len(t_on_o))
        if ratios:
            rmp[o] = sum(ratios) / len(ratios)
    return rmp