# Source code for ddmtolab.Algorithms.MTSO.MFEA_II

"""
Multifactorial Evolutionary Algorithm With Online Transfer Parameter Estimation (MFEA-II)

This module implements MFEA-II for multi-task optimization with knowledge transfer across tasks.

References
----------
    [1] Bali, Kavitesh Kumar, et al. "Multifactorial evolutionary algorithm with online transfer parameter estimation: MFEA-II." IEEE Transactions on Evolutionary Computation 24.1 (2019): 69-83.

Notes
-----
Author: Jiangtao Shen
Email: j.shen5@exeter.ac.uk
Date: 2025.12.16
Version: 1.0
"""
import time
from tqdm import tqdm
from scipy.stats import norm
from scipy.optimize import minimize_scalar
from ddmtolab.Methods.Algo_Methods.algo_utils import *
from ddmtolab.Algorithms.MTSO.MFEA import mfea_selection


class MFEA_II:
    """
    Multifactorial Evolutionary Algorithm With Online Transfer Parameter Estimation.

    Attributes
    ----------
    algorithm_information : dict
        Dictionary containing algorithm capabilities and requirements
    """

    algorithm_information = {
        'n_tasks': '[2, K]',
        'dims': 'unequal',
        'objs': 'equal',
        'n_objs': '1',
        'cons': 'unequal',
        'n_cons': '[0, C]',
        'expensive': 'False',
        'knowledge_transfer': 'True',
        'n': 'equal',
        'max_nfes': 'equal'
    }

    @classmethod
    def get_algorithm_information(cls, print_info=True):
        """Return (and optionally print) the algorithm capability table via the shared helper."""
        return get_algorithm_information(cls, print_info)

    def __init__(self, problem, n=None, max_nfes=None, save_data=True,
                 save_path='./Data', name='MFEA-II', disable_tqdm=True):
        """
        Initialize MFEA-II.

        Parameters
        ----------
        problem : MTOP
            Multi-task optimization problem instance
        n : int, optional
            Population size per task (default: 100)
        max_nfes : int, optional
            Maximum number of function evaluations per task (default: 10000)
        save_data : bool, optional
            Whether to save optimization data (default: True)
        save_path : str, optional
            Path to save results (default: './Data')
        name : str, optional
            Name for the experiment (default: 'MFEA-II')
        disable_tqdm : bool, optional
            Whether to disable progress bar (default: True)
        """
        self.problem = problem
        self.n = n if n is not None else 100
        self.max_nfes = max_nfes if max_nfes is not None else 10000
        self.save_data = save_data
        self.save_path = save_path
        self.name = name
        self.disable_tqdm = disable_tqdm

    def optimize(self):
        """
        Execute the Multifactorial Evolutionary Algorithm with online RMP learning.

        Returns
        -------
        Results
            Optimization results containing decision variables, objectives, and runtime
        """
        start_time = time.time()
        problem = self.problem
        nt = problem.n_tasks
        dims = problem.dims
        n = self.n
        max_nfes_per_task = par_list(self.max_nfes, nt)
        max_nfes = self.max_nfes * nt

        # Initialize population and evaluate for each task
        decs = initialization(problem, n)
        objs, cons = evaluation(problem, decs)
        nfes = n * nt
        all_decs, all_objs, all_cons = init_history(decs, objs, cons)

        # Transform populations to unified search space for knowledge transfer
        pop_decs, pop_cons = space_transfer(problem=problem, decs=decs, cons=cons, type='uni')
        pop_objs = objs

        # Skill factor indicates which task each individual belongs to
        pop_sfs = [np.full((n, 1), fill_value=i) for i in range(nt)]

        pbar = tqdm(total=max_nfes, initial=nfes, desc=f"{self.name}", disable=self.disable_tqdm)
        while nfes < max_nfes:
            # Re-learn the RMP matrix online from the current per-task populations
            rmpMatrix = learnRMP(pop_decs, dims)

            # Merge populations from all tasks into single arrays
            pop_decs, pop_objs, pop_cons, pop_sfs = vstack_groups(pop_decs, pop_objs, pop_cons, pop_sfs)
            off_decs = np.zeros_like(pop_decs)
            off_objs = np.zeros_like(pop_objs)
            off_cons = np.zeros_like(pop_cons)
            off_sfs = np.zeros_like(pop_sfs)

            # Randomly pair individuals for assortative mating.
            # NOTE(review): assumes the merged population size (n * nt) is even,
            # otherwise `shuffled_index[i + 1]` raises on the last pair — confirm upstream.
            shuffled_index = np.random.permutation(pop_decs.shape[0])
            for i in range(0, len(shuffled_index), 2):
                p1 = shuffled_index[i]
                p2 = shuffled_index[i + 1]
                sf1 = pop_sfs[p1].item()
                sf2 = pop_sfs[p2].item()
                rmp_value = rmpMatrix[sf1, sf2]

                # Crossover directly if the parents share a task, or with learned
                # probability rmp_value when they come from different tasks.
                if sf1 == sf2 or np.random.rand() < rmp_value:
                    # NOTE(review): this branch applies SBX only, while the no-transfer
                    # branch also mutates — verify against the reference MFEA-II before changing.
                    off_dec1, off_dec2 = crossover(pop_decs[p1, :], pop_decs[p2, :], mu=2)
                    off_decs[i, :] = off_dec1
                    off_decs[i + 1, :] = off_dec2
                    # Offspring inherit a skill factor from either parent (one each)
                    off_sfs[i] = np.random.choice([sf1, sf2])
                    off_sfs[i + 1] = sf1 if off_sfs[i] == sf2 else sf2
                else:
                    # No transfer: each parent mates with a random individual of its own task
                    for x, p in enumerate([p1, p2]):
                        sf = pop_sfs[p].item()
                        # Candidates: all individuals with the same skill factor, excluding p itself
                        same_sf_indices = np.where(pop_sfs.flatten() == sf)[0]
                        same_sf_indices = same_sf_indices[same_sf_indices != p]
                        idx = np.random.choice(same_sf_indices)
                        off_dec_curr, _ = crossover(pop_decs[p, :], pop_decs[idx, :], mu=2)
                        off_dec_curr = mutation(off_dec_curr, mu=5)
                        off_decs[i + x, :] = off_dec_curr
                        # Inherit skill factor from parent
                        off_sfs[i + x] = sf

                # Trim unified-space offspring to their task's dimensionality and evaluate
                task_idx1 = off_sfs[i].item()
                task_idx2 = off_sfs[i + 1].item()
                off_dec1_trimmed = off_decs[i, :dims[task_idx1]]
                off_dec2_trimmed = off_decs[i + 1, :dims[task_idx2]]
                off_objs[i, :], off_cons[i, :] = evaluation_single(problem, off_dec1_trimmed, task_idx1)
                off_objs[i + 1, :], off_cons[i + 1, :] = evaluation_single(problem, off_dec2_trimmed, task_idx2)

            # Merge parents and offspring populations
            pop_decs, pop_objs, pop_cons, pop_sfs = vstack_groups(
                (pop_decs, off_decs), (pop_objs, off_objs), (pop_cons, off_cons), (pop_sfs, off_sfs)
            )

            # Environmental selection: keep best n individuals per task
            pop_decs, pop_objs, pop_cons, pop_sfs = mfea_selection(pop_decs, pop_objs, pop_cons,
                                                                   pop_sfs, n, nt)

            # Transform back to native search space for history bookkeeping
            decs, cons = space_transfer(problem=problem, decs=pop_decs, cons=pop_cons, type='real')
            nfes += n * nt
            pbar.update(n * nt)
            append_history(all_decs, decs, all_objs, pop_objs, all_cons, cons)

        pbar.close()
        runtime = time.time() - start_time

        # Save and return results
        results = build_save_results(all_decs=all_decs, all_objs=all_objs, runtime=runtime,
                                     max_nfes=max_nfes_per_task, all_cons=all_cons,
                                     bounds=problem.bounds, save_path=self.save_path,
                                     filename=self.name, save_data=self.save_data)
        return results
def learnRMP(subpops, vars):
    """
    Learn the relationship matrix (RMP) between multiple tasks.

    Parameters
    ----------
    subpops : list
        List of subpopulations, either as numpy arrays or dicts with 'data' key.
        Each subpopulation contains solution variables for one task.
    vars : list or array-like
        Dimensionality (number of variables) for each task.
        (Name shadows the `vars` builtin; kept for backward compatibility.)

    Returns
    -------
    rmpMatrix : np.ndarray
        Symmetric relationship matrix of shape (numtasks, numtasks).
        rmpMatrix[i,j] indicates the similarity between task i and task j.
        Diagonal elements are 1.0, off-diagonal values are in [0, 1].

    Notes
    -----
    The RMP quantifies inter-task similarities by computing probabilistic
    overlap between per-task independent-Gaussian models. Higher RMP values
    indicate stronger task relationships, enabling better knowledge transfer.
    """
    # Convert to dict format if needed
    if isinstance(subpops, list) and isinstance(subpops[0], np.ndarray):
        subpops = [{'data': pop} for pop in subpops]

    numtasks = len(subpops)
    maxDim = max(vars)
    rmpMatrix = np.eye(numtasks)

    # Build one independent-Gaussian model per task; a small batch of uniform
    # random samples is mixed in to regularize the mean/stdev estimates.
    probmodel = []
    for i in range(numtasks):
        data = subpops[i]['data']
        nsamples = data.shape[0]
        nrandsamples = int(np.floor(0.1 * nsamples))
        randMat = np.random.rand(nrandsamples, maxDim)
        # Zero-pad task data to maxDim columns (matches the MATLAB reference);
        # the original code documented this but never performed it, which made
        # np.vstack fail for tasks with fewer than maxDim variables.
        if data.shape[1] < maxDim:
            data = np.hstack([data, np.zeros((nsamples, maxDim - data.shape[1]))])
        combined_data = np.vstack([data, randMat])
        probmodel.append({
            'nsamples': nsamples,
            'mean': np.mean(combined_data, axis=0),
            'stdev': np.std(combined_data, axis=0, ddof=1),
        })

    def _prob_products(data, Dim, mi, mj):
        """Per-sample product of Gaussian pdfs under models mi and mj (vectorized)."""
        own = np.prod(norm.pdf(data, mi['mean'][:Dim], mi['stdev'][:Dim]), axis=1)
        other = np.prod(norm.pdf(data, mj['mean'][:Dim], mj['stdev'][:Dim]), axis=1)
        return np.column_stack([own, other])

    # Compute pairwise similarities
    for i in range(numtasks):
        for j in range(i + 1, numtasks):
            # Compare only over the dimensions both tasks share
            Dim = min(vars[i], vars[j])
            popdata = [
                {'probmatrix': _prob_products(subpops[i]['data'][:, :Dim], Dim,
                                              probmodel[i], probmodel[j])},
                {'probmatrix': _prob_products(subpops[j]['data'][:, :Dim], Dim,
                                              probmodel[i], probmodel[j])},
            ]

            # Maximize the mixture likelihood over rmp in [0, 1]
            result = minimize_scalar(
                lambda x: loglik(x, popdata, numtasks),
                bounds=(0, 1), method='bounded'
            )
            # Small Gaussian perturbation, then clamp back into [0, 1]
            rmp_value = max(0, result.x + np.random.normal(0, 0.01))
            rmp_value = min(rmp_value, 1)
            rmpMatrix[i, j] = rmp_value
            rmpMatrix[j, i] = rmp_value

    return rmpMatrix


def loglik(rmp, popdata, ntasks):
    """
    Compute the negative log-likelihood for a given RMP value.

    Parameters
    ----------
    rmp : float
        Relationship matrix parameter value in [0, 1] to evaluate.
        Represents the strength of inter-task relationship.
    popdata : list
        List of two dicts, each containing 'probmatrix' of shape (nsamples, 2).
        For popdata[k], probmatrix[:, k] are probabilities under the task's own
        model and probmatrix[:, 1-k] under the other task's model.
    ntasks : int
        Total number of tasks in the multi-task problem.

    Returns
    -------
    f : float
        Negative log-likelihood. Lower values indicate a better fit of rmp
        to the observed probability distributions.

    Notes
    -----
    Used as the objective in `learnRMP` to find the rmp that maximizes the
    likelihood of the populations under a two-component mixture model.
    The input probmatrices are not modified.
    """
    # Mixture weights: cross-task mass grows with rmp, own-task mass shrinks
    w_cross = 0.5 * (ntasks - 1) * rmp / ntasks
    w_self = 1.0 - w_cross

    f = 0.0
    for k in range(2):
        pm = popdata[k]['probmatrix']
        # Column k is the population's own model, column 1-k the other task's
        mixture = w_self * pm[:, k] + w_cross * pm[:, 1 - k]
        f += np.sum(-np.log(mixture))
    return f