Source code for ddmtolab.Algorithms.MTSO.MFEA_AKT

"""
Multifactorial Evolutionary Algorithm with Adaptive Knowledge Transfer (MFEA-AKT)

This module implements MFEA-AKT for multi-task optimization with adaptive crossover
operator selection for inter-task knowledge transfer.

References
----------
    [1] Zhou, Lei, et al. "Toward Adaptive Knowledge Transfer in Multifactorial
        Evolutionary Computation." IEEE Transactions on Cybernetics, 51(5):
        2563-2576, 2021.

Notes
-----
Author: Jiangtao Shen (DDMTOLab adaptation)
Date: 2026.02.22
Version: 1.0
"""
import time
import numpy as np
from tqdm import tqdm
from ddmtolab.Methods.Algo_Methods.algo_utils import *


[docs] class MFEA_AKT: """ Multifactorial Evolutionary Algorithm with Adaptive Knowledge Transfer. Extends MFEA with 6 crossover operators for inter-task transfer and an adaptive mechanism to select the best operator based on improvement tracking. The 6 crossover operators are: 0: Two-point crossover 1: Uniform crossover 2: Arithmetical crossover (r=0.25) 3: Geometric crossover (r=0.2) 4: BLX-alpha crossover (a=0.3) 5: SBX crossover Attributes ---------- algorithm_information : dict Dictionary containing algorithm capabilities and requirements """ algorithm_information = { 'n_tasks': '[2, K]', 'dims': 'unequal', 'objs': 'equal', 'n_objs': '1', 'cons': 'unequal', 'n_cons': '[0, C]', 'expensive': 'False', 'knowledge_transfer': 'True', 'n': 'equal', 'max_nfes': 'equal' } @classmethod def get_algorithm_information(cls, print_info=True): return get_algorithm_information(cls, print_info)
[docs] def __init__(self, problem, n=None, max_nfes=None, rmp=0.3, gap=20, muc=2, mum=5, save_data=True, save_path='./Data', name='MFEA-AKT', disable_tqdm=True): """ Initialize MFEA-AKT algorithm. Parameters ---------- problem : MTOP Multi-task optimization problem instance n : int, optional Population size per task (default: 100) max_nfes : int, optional Maximum number of function evaluations per task (default: 10000) rmp : float, optional Random mating probability for inter-task crossover (default: 0.3) gap : int, optional History window size for operator selection fallback (default: 20) muc : float, optional Distribution index for SBX crossover (default: 2) mum : float, optional Distribution index for polynomial mutation (default: 5) save_data : bool, optional Whether to save optimization data (default: True) save_path : str, optional Path to save results (default: './Data') name : str, optional Name for the experiment (default: 'MFEA-AKT') disable_tqdm : bool, optional Whether to disable progress bar (default: True) """ self.problem = problem self.n = n if n is not None else 100 self.max_nfes = max_nfes if max_nfes is not None else 10000 self.rmp = rmp self.gap = gap self.muc = muc self.mum = mum self.save_data = save_data self.save_path = save_path self.name = name self.disable_tqdm = disable_tqdm
[docs] def optimize(self): """ Execute the MFEA-AKT algorithm. Returns ------- Results Optimization results containing decision variables, objectives, and runtime """ start_time = time.time() problem = self.problem nt = problem.n_tasks dims = problem.dims n = self.n max_nfes_per_task = par_list(self.max_nfes, nt) max_nfes = self.max_nfes * nt pop_size = n * nt # total population size # Initialize population and evaluate decs = initialization(problem, n) objs, cons = evaluation(problem, decs) nfes = n * nt all_decs, all_objs, all_cons = init_history(decs, objs, cons) # Transform to unified space pop_decs, pop_cons = space_transfer(problem=problem, decs=decs, cons=cons, type='uni', padding='mid') pop_objs = objs pop_sfs = [np.full((n, 1), fill_value=i) for i in range(nt)] # Per-individual metadata (as lists of arrays, one per task) pop_cxf = [np.random.randint(0, 6, size=(n,)) for _ in range(nt)] # Record of best CX factor per generation (for fallback selection) cfb_record = [] pbar = tqdm(total=max_nfes, initial=nfes, desc=f"{self.name}", disable=self.disable_tqdm) gen = 1 while nfes < max_nfes: # Merge populations from all tasks into single arrays m_decs, m_objs, m_cons, m_sfs = vstack_groups( pop_decs, pop_objs, pop_cons, pop_sfs) m_cxf = np.concatenate(pop_cxf) # (pop_size,) # --- Generation --- off_decs = np.zeros_like(m_decs) off_objs = np.full_like(m_objs, np.inf) off_cons = np.zeros_like(m_cons) off_sfs = np.zeros_like(m_sfs) off_cxf = np.zeros(pop_size, dtype=int) off_istran = np.zeros(pop_size, dtype=bool) off_parnum = np.full(pop_size, -1, dtype=int) # -1 = no parent tracked shuffled_index = np.random.permutation(pop_size) for i in range(0, pop_size, 2): p1 = shuffled_index[i] p2 = shuffled_index[i + pop_size // 2] if i + pop_size // 2 < pop_size else shuffled_index[(i + 1) % pop_size] sf1 = m_sfs[p1].item() sf2 = m_sfs[p2].item() if sf1 == sf2 or np.random.rand() < self.rmp: p = [p1, p2] if sf1 == sf2: # Same task: SBX crossover off_decs[i], off_decs[i + 1] = crossover( m_decs[p1], m_decs[p2], mu=self.muc) off_cxf[i] = m_cxf[p1] off_cxf[i + 1] = m_cxf[p2] off_istran[i] = False off_istran[i + 1] = False else: # Different tasks: hyberCX with adaptive operator alpha = m_cxf[p[np.random.randint(2)]] off_decs[i], off_decs[i + 1] = _hyber_crossover( m_decs[p1], m_decs[p2], alpha, self.muc) off_cxf[i] = alpha off_cxf[i + 1] = alpha off_istran[i] = True off_istran[i + 1] = True # Task imitation (random parent's task) for k in [i, i + 1]: rand_p = p[np.random.randint(2)] off_sfs[k] = m_sfs[rand_p] if off_istran[k]: off_parnum[k] = rand_p else: # No transfer: mutation off_decs[i] = mutation(m_decs[p1], mu=self.mum) off_decs[i + 1] = mutation(m_decs[p2], mu=self.mum) off_sfs[i] = sf1 off_sfs[i + 1] = sf2 off_cxf[i] = m_cxf[p1] off_cxf[i + 1] = m_cxf[p2] # Clip to [0, 1] off_decs[i] = np.clip(off_decs[i], 0, 1) off_decs[i + 1] = np.clip(off_decs[i + 1], 0, 1) # --- Evaluation --- for idx in range(pop_size): t = off_sfs[idx].item() dec_trimmed = off_decs[idx, :dims[t]] off_objs[idx], off_cons[idx] = evaluation_single( problem, dec_trimmed, t) nfes += pop_size pbar.update(pop_size) # --- Calculate best CXFactor --- imp_num = np.full(6, -np.inf) # max improvement per operator has_any_improvement = False for idx in range(pop_size): if off_parnum[idx] >= 0: cfc = off_objs[idx, 0] par_idx = off_parnum[idx] pfc = m_objs[par_idx, 0] if pfc != 0: imp = (pfc - cfc) / abs(pfc) else: imp = -cfc if cfc > 0 else 0.0 cx = off_cxf[idx] if imp > imp_num[cx]: imp_num[cx] = imp if imp > 0: has_any_improvement = True if has_any_improvement: # Best operator is the one with highest max improvement max_idx = np.argmax(imp_num) else: # Fallback: most frequent best operator in recent history if len(cfb_record) > 0: start = max(0, len(cfb_record) - self.gap) recent = cfb_record[start:] counts = np.bincount(recent, minlength=6) max_idx = np.argmax(counts) else: max_idx = np.random.randint(0, 6) cfb_record.append(max_idx) # --- Adaptive CXFactor update --- for idx in range(pop_size): if off_parnum[idx] >= 0: cfc = off_objs[idx, 0] par_idx = off_parnum[idx] pfc = m_objs[par_idx, 0] imp = (pfc - cfc) / abs(pfc) if pfc != 0 else 0.0 if imp < 0: # Offspring worsened → adopt best operator off_cxf[idx] = max_idx else: # Non-transfer offspring: 50% best, 50% random if np.random.rand() < 0.5: off_cxf[idx] = max_idx else: off_cxf[idx] = np.random.randint(0, 6) # --- Selection: merge parents + offspring, keep best n per task --- merged_decs = np.vstack([m_decs, off_decs]) merged_objs = np.vstack([m_objs, off_objs]) merged_cons = np.vstack([m_cons, off_cons]) merged_sfs = np.vstack([m_sfs, off_sfs]) merged_cxf = np.concatenate([m_cxf, off_cxf]) pop_decs, pop_objs, pop_cons, pop_sfs = [], [], [], [] pop_cxf = [] for t in range(nt): indices = np.where(merged_sfs.flatten() == t)[0] t_decs, t_objs, t_cons = select_by_index( indices, merged_decs, merged_objs, merged_cons) t_cxf = merged_cxf[indices] sel = selection_elit(objs=t_objs, n=n, cons=t_cons) pop_decs.append(t_decs[sel]) pop_objs.append(t_objs[sel]) pop_cons.append(t_cons[sel]) pop_sfs.append(np.full((n, 1), t)) pop_cxf.append(t_cxf[sel]) # Record history (transform back to real space for storage) real_decs, real_cons = space_transfer( problem, decs=pop_decs, cons=pop_cons, type='real') append_history(all_decs, real_decs, all_objs, pop_objs, all_cons, real_cons) gen += 1 pbar.close() runtime = time.time() - start_time results = build_save_results( all_decs=all_decs, all_objs=all_objs, runtime=runtime, max_nfes=max_nfes_per_task, all_cons=all_cons, bounds=problem.bounds, save_path=self.save_path, filename=self.name, save_data=self.save_data) return results
# ============================================================ # Crossover operators for knowledge transfer # ============================================================ def _hyber_crossover(par1, par2, alpha, muc=2): """ Apply one of 6 crossover operators based on alpha. Parameters ---------- par1, par2 : np.ndarray Parent decision vectors, shape (d,) alpha : int Crossover operator index (0-5) muc : float Distribution index for SBX (operator 5) Returns ------- off1, off2 : np.ndarray Offspring decision vectors, shape (d,) """ if alpha == 0: off1 = _tp_crossover(par1, par2) off2 = _tp_crossover(par2, par1) elif alpha == 1: off1 = _uf_crossover(par1, par2) off2 = _uf_crossover(par2, par1) elif alpha == 2: off1 = _ari_crossover(par1, par2) off2 = _ari_crossover(par2, par1) elif alpha == 3: off1 = _geo_crossover(par1, par2) off2 = _geo_crossover(par2, par1) elif alpha == 4: off1 = _blxa_crossover(par1, par2, a=0.3) off2 = _blxa_crossover(par2, par1, a=0.3) else: # alpha == 5 off1, off2 = crossover(par1, par2, mu=muc) return off1, off2 def _tp_crossover(par1, par2): """Two-point crossover.""" d = len(par1) i, j = sorted(np.random.randint(0, d, size=2)) off = par1.copy() off[i:j + 1] = par2[i:j + 1] return off def _uf_crossover(par1, par2): """Uniform crossover.""" mask = np.random.randint(0, 2, size=len(par1)).astype(bool) off = par1.copy() off[mask] = par2[mask] return off def _ari_crossover(par1, par2, r=0.25): """Arithmetical crossover with ratio r.""" return r * par1 + (1 - r) * par2 def _geo_crossover(par1, par2, r=0.2): """Geometric crossover with ratio r.""" p1 = np.maximum(par1, 1e-15) p2 = np.maximum(par2, 1e-15) return np.power(p1, r) * np.power(p2, 1 - r) def _blxa_crossover(par1, par2, a=0.3): """BLX-alpha crossover.""" cmin = np.minimum(par1, par2) cmax = np.maximum(par1, par2) interval = cmax - cmin low = cmin - interval * a high = cmax + interval * a return low + (high - low) * np.random.rand(len(par1))