"""
Meta-Knowledge Transfer-based Differential Evolution (MKTDE)
This module implements MKTDE for multi-task optimization using centroid-based
meta-knowledge transfer between tasks in a multi-population DE framework.
References
----------
[1] Li, Jian-Yu, et al. "A Meta-Knowledge Transfer-Based Differential
Evolution for Multitask Optimization." IEEE Transactions on
Evolutionary Computation, 26(4): 719-734, 2022.
Notes
-----
Author: Jiangtao Shen (DDMTOLab adaptation)
Date: 2026.02.22
Version: 2.0
"""
import time
import numpy as np
from tqdm import tqdm
from ddmtolab.Methods.Algo_Methods.algo_utils import *
class MKTDE:
    """
    Meta-Knowledge Transfer-based Differential Evolution.

    Uses centroid alignment to transform source task solutions into the
    target task's search distribution, creating an extended donor pool for
    DE/rand/1/bin. The base vector x1 is selected from the current task
    only, while difference vectors x2, x3 come from the combined pool.
    Additionally, an elite solution from the source task is transferred
    each generation.

    Attributes
    ----------
    algorithm_information : dict
        Dictionary containing algorithm capabilities and requirements
    """

    algorithm_information = {
        'n_tasks': '[2, K]',
        'dims': 'unequal',
        'objs': 'equal',
        'n_objs': '1',
        'cons': 'unequal',
        'n_cons': '[0, C]',
        'expensive': 'False',
        'knowledge_transfer': 'True',
        'n': 'equal',
        'max_nfes': 'equal'
    }

    @classmethod
    def get_algorithm_information(cls, print_info=True):
        """
        Return the capability description of this algorithm.

        Delegates to the module-level ``get_algorithm_information`` helper
        (brought in by the wildcard import at the top of the file).

        Parameters
        ----------
        print_info : bool, optional
            Forwarded unchanged to the module-level helper (default: True)

        Returns
        -------
        object
            Whatever the module-level helper returns
        """
        return get_algorithm_information(cls, print_info)

    def __init__(self, problem, n=None, max_nfes=None, F=0.5, CR=0.6,
                 save_data=True, save_path='./Data', name='MKTDE',
                 disable_tqdm=True):
        """
        Initialize MKTDE algorithm.

        Parameters
        ----------
        problem : MTOP
            Multi-task optimization problem instance
        n : int, optional
            Population size per task (default: 100)
        max_nfes : int, optional
            Maximum number of function evaluations per task (default: 10000)
        F : float, optional
            DE mutation scale factor (default: 0.5)
        CR : float, optional
            DE crossover rate (default: 0.6)
        save_data : bool, optional
            Whether to save optimization data (default: True)
        save_path : str, optional
            Path to save results (default: './Data')
        name : str, optional
            Name for the experiment (default: 'MKTDE')
        disable_tqdm : bool, optional
            Whether to disable progress bar (default: True)
        """
        self.problem = problem
        self.n = n if n is not None else 100
        self.max_nfes = max_nfes if max_nfes is not None else 10000
        self.F = F
        self.CR = CR
        self.save_data = save_data
        self.save_path = save_path
        self.name = name
        self.disable_tqdm = disable_tqdm

    @staticmethod
    def _draw_distinct(upper, excluded):
        """Draw a random index in ``[0, upper)`` that is not in ``excluded``."""
        idx = np.random.randint(upper)
        while idx in excluded:
            idx = np.random.randint(upper)
        return idx

    @staticmethod
    def _pick_source_tasks(nt):
        """Pick, for every task, a random source task different from itself.

        Requires ``nt >= 2``; with a single task no valid source exists.
        """
        return [MKTDE._draw_distinct(nt, (t,)) for t in range(nt)]

    @staticmethod
    def _pad_constraints(cons_real, n_rows, max_c):
        """Copy a task's constraint values into a zero matrix of width ``max_c``."""
        padded = np.zeros((n_rows, max_c))
        # Short-circuit on max_c so cons_real.shape is not touched when the
        # unified constraint space is empty.
        if max_c > 0 and cons_real.shape[1] > 0:
            padded[:, :cons_real.shape[1]] = cons_real
        return padded

    def _generate_offspring(self, pop_t, donor_pool):
        """Create one DE/rand/1/bin offspring per individual of ``pop_t``.

        Parameters
        ----------
        pop_t : np.ndarray
            Current task population, one individual per row
        donor_pool : np.ndarray
            Pool for the difference vectors; its first ``len(pop_t)`` rows
            must be ``pop_t`` itself so that pool indices and population
            indices refer to the same individuals

        Returns
        -------
        np.ndarray
            Offspring with the same shape as ``pop_t``, clipped to [0, 1]
        """
        n, max_d = pop_t.shape
        n_pool = len(donor_pool)
        off_decs = np.zeros((n, max_d))
        for i in range(n):
            # Base vector comes from the current task population only.
            x1 = self._draw_distinct(n, (i,))
            # Difference vectors come from the combined pool.
            x2 = self._draw_distinct(n_pool, (i, x1))
            x3 = self._draw_distinct(n_pool, (i, x1, x2))
            # DE/rand/1 mutation
            v = pop_t[x1] + self.F * (donor_pool[x2] - donor_pool[x3])
            # Binomial crossover; j_rand guarantees at least one mutant gene.
            u = pop_t[i].copy()
            j_rand = np.random.randint(max_d)
            mask = np.random.rand(max_d) < self.CR
            mask[j_rand] = True
            u[mask] = v[mask]
            off_decs[i] = np.clip(u, 0, 1)
        return off_decs

    def optimize(self):
        """
        Execute the MKTDE algorithm.

        Each generation spends ``n * nt + nt`` evaluations (offspring plus
        one transferred elite per task). The budget is only checked at the
        start of a generation, so the final count may exceed
        ``max_nfes * nt``.

        Returns
        -------
        Results
            Optimization results containing decision variables, objectives, and runtime

        Raises
        ------
        ValueError
            If the problem has fewer than 2 tasks or the population size is
            smaller than 2; either case would otherwise loop forever while
            searching for distinct indices
        """
        start_time = time.time()
        problem = self.problem
        nt = problem.n_tasks
        dims = problem.dims
        n = self.n

        # Fail fast, before spending any evaluations: the rejection-sampling
        # loops below never terminate without at least 2 tasks / individuals.
        if nt < 2:
            raise ValueError(
                f"MKTDE requires at least 2 tasks for knowledge transfer, "
                f"got {nt}")
        if n < 2:
            raise ValueError(
                f"MKTDE requires a population size of at least 2, got {n}")

        max_nfes_per_task = par_list(self.max_nfes, nt)
        max_nfes = self.max_nfes * nt

        # Initialize and evaluate in real space
        decs = initialization(problem, n)
        objs, cons = evaluation(problem, decs)
        nfes = n * nt
        all_decs, all_objs, all_cons = init_history(decs, objs, cons)

        # Convert to unified space for DE operations (matching MATLAB max-D space)
        pop_decs, pop_cons = space_transfer(
            problem=problem, decs=decs, cons=cons, type='uni', padding='mid')
        pop_objs = objs
        max_c = pop_cons[0].shape[1]

        # The context manager closes the bar even if an evaluation raises.
        with tqdm(total=max_nfes, initial=nfes, desc=f"{self.name}",
                  disable=self.disable_tqdm) as pbar:
            while nfes < max_nfes:
                # Centroids are computed once per generation, before any
                # task population is updated.
                centroids = [np.mean(pop_decs[t], axis=0) for t in range(nt)]
                source_tasks = self._pick_source_tasks(nt)

                # --- Generation and selection per task ---
                for t in range(nt):
                    s = source_tasks[t]
                    # Meta-knowledge transfer: align source via centroids
                    shifted_source = pop_decs[s] - centroids[s] + centroids[t]
                    # Combined donor pool: current task + transformed source
                    donor_pool = np.vstack([pop_decs[t], shifted_source])
                    off_decs = self._generate_offspring(pop_decs[t], donor_pool)

                    # Evaluate offspring (trim to task dimension)
                    off_objs_t, off_cons_real = evaluation_single(
                        problem, off_decs[:, :dims[t]], t)
                    off_cons_t = self._pad_constraints(off_cons_real, n, max_c)
                    nfes += n
                    pbar.update(n)

                    # Elitist selection (parents + offspring -> best n)
                    merged_decs = np.vstack([pop_decs[t], off_decs])
                    merged_objs = np.vstack([pop_objs[t], off_objs_t])
                    merged_cons = np.vstack([pop_cons[t], off_cons_t])
                    sel = selection_elit(
                        objs=merged_objs, n=n, cons=merged_cons)
                    pop_decs[t] = merged_decs[sel]
                    pop_objs[t] = merged_objs[sel]
                    pop_cons[t] = merged_cons[sel]

                # --- Elite solution transfer ---
                for t in range(nt):
                    s = source_tasks[t]
                    # Replace last (worst) with first (best) from source task.
                    # NOTE(review): assumes selection_elit returns indices
                    # ordered best-first - confirm against its implementation.
                    elite_dec = pop_decs[s][0].copy()
                    elite_obj, elite_con_real = evaluation_single(
                        problem, elite_dec[:dims[t]].reshape(1, -1), t)
                    nfes += 1
                    pbar.update(1)
                    pop_decs[t][-1] = elite_dec
                    pop_objs[t][-1] = elite_obj.flatten()
                    pop_cons[t][-1] = 0
                    if max_c > 0 and elite_con_real.shape[1] > 0:
                        pop_cons[t][-1, :elite_con_real.shape[1]] = \
                            elite_con_real.flatten()

                # Record history in real space
                real_decs, real_cons = space_transfer(
                    problem, decs=pop_decs, cons=pop_cons, type='real')
                append_history(all_decs, real_decs, all_objs, pop_objs,
                               all_cons, real_cons)

        runtime = time.time() - start_time
        results = build_save_results(
            all_decs=all_decs, all_objs=all_objs, runtime=runtime,
            max_nfes=max_nfes_per_task, all_cons=all_cons,
            bounds=problem.bounds, save_path=self.save_path,
            filename=self.name, save_data=self.save_data)
        return results