# Source code for ddmtolab.Algorithms.MTSO.MFEA_VC

"""
Multifactorial Evolutionary Algorithm with Variational Crossover (MFEA-VC)

This module implements MFEA-VC for multi-task optimization using a contrastive
Variational Auto-Encoder (VAE) to guide knowledge transfer in early generations.

References
----------
    [1] Wang, Ruilin, et al. "Contrastive Variational Auto-Encoder Driven
        Convergence Guidance in Evolutionary Multitasking." Applied Soft
        Computing, 163: 111883, 2024.

Notes
-----
Author: Jiangtao Shen (DDMTOLab adaptation)
Date: 2026.02.22
Version: 1.0
"""
import time
import numpy as np
import torch
import torch.nn as nn
from tqdm import tqdm
from ddmtolab.Methods.Algo_Methods.algo_utils import *


class MFEA_VC:
    """
    Multifactorial Evolutionary Algorithm with Variational Crossover.

    Uses a VAE (with random weights, no training) to generate cross-task
    individuals for the first `vae_gens` generations. The VAE encodes both
    tasks' population data into a shared latent space and decodes to produce
    mixed-task offspring used as SBX crossover partners. After `vae_gens`
    generations, reverts to standard MFEA behavior with SBX crossover and
    polynomial mutation.

    Attributes
    ----------
    algorithm_information : dict
        Dictionary containing algorithm capabilities and requirements
    """

    # Capability descriptor consumed by the framework; note the VAE transfer
    # path in optimize() only activates when the problem has exactly 2 tasks.
    algorithm_information = {
        'n_tasks': '[2, K]',
        'dims': 'unequal',
        'objs': 'equal',
        'n_objs': '1',
        'cons': 'unequal',
        'n_cons': '[0, C]',
        'expensive': 'False',
        'knowledge_transfer': 'True',
        'n': 'equal',
        'max_nfes': 'equal'
    }

    @classmethod
    def get_algorithm_information(cls, print_info=True):
        # Delegates to the module-level helper of the same name
        # (star-imported from algo_utils).
        return get_algorithm_information(cls, print_info)

    def __init__(self, problem, n=None, max_nfes=None, rmp=0.3, muc=2, mum=5,
                 vae_gens=25, lam=0.8, save_data=True, save_path='./Data',
                 name='MFEA-VC', disable_tqdm=True):
        """
        Initialize MFEA-VC algorithm.

        Parameters
        ----------
        problem : MTOP
            Multi-task optimization problem instance
        n : int, optional
            Population size per task (default: 100)
        max_nfes : int, optional
            Maximum number of function evaluations per task (default: 10000)
        rmp : float, optional
            Random mating probability (default: 0.3)
        muc : float, optional
            Distribution index for SBX crossover (default: 2)
        mum : float, optional
            Distribution index for polynomial mutation (default: 5)
        vae_gens : int, optional
            Number of generations to use VAE-guided crossover (default: 25)
        lam : float, optional
            Lambda scaling factor for VAE latent space (default: 0.8)
        save_data : bool, optional
            Whether to save optimization data (default: True)
        save_path : str, optional
            Path to save results (default: './Data')
        name : str, optional
            Name for the experiment (default: 'MFEA-VC')
        disable_tqdm : bool, optional
            Whether to disable progress bar (default: True)
        """
        self.problem = problem
        # Fall back to documented defaults when n / max_nfes are omitted.
        self.n = n if n is not None else 100
        self.max_nfes = max_nfes if max_nfes is not None else 10000
        self.rmp = rmp
        self.muc = muc
        self.mum = mum
        self.vae_gens = vae_gens
        self.lam = lam
        self.save_data = save_data
        self.save_path = save_path
        self.name = name
        self.disable_tqdm = disable_tqdm

    def optimize(self):
        """
        Execute the MFEA-VC algorithm.

        Returns
        -------
        Results
            Optimization results containing decision variables, objectives,
            and runtime
        """
        start_time = time.time()
        problem = self.problem
        nt = problem.n_tasks
        dims = problem.dims
        n = self.n
        # self.max_nfes is per-task; the loop budget below is the total across
        # tasks (par_list presumably expands the scalar to a per-task list —
        # confirm in algo_utils).
        max_nfes_per_task = par_list(self.max_nfes, nt)
        max_nfes = self.max_nfes * nt
        pop_size = n * nt

        # Initialize population and evaluate
        decs = initialization(problem, n)
        objs, cons = evaluation(problem, decs)
        nfes = n * nt
        all_decs, all_objs, all_cons = init_history(decs, objs, cons)

        # Transform to unified space (padding='mid' — presumably pads shorter
        # tasks' vectors with mid-range values; verify against algo_utils).
        pop_decs, pop_cons = space_transfer(problem=problem, decs=decs, cons=cons, type='uni', padding='mid')
        pop_objs = objs
        # Skill factor of every individual: one (n, 1) column per task.
        pop_sfs = [np.full((n, 1), fill_value=i) for i in range(nt)]

        pbar = tqdm(total=max_nfes, initial=nfes, desc=f"{self.name}", disable=self.disable_tqdm)

        gen = 1
        while nfes < max_nfes:
            # --- VAE generation (first vae_gens generations, 2-task only) ---
            # vae_decs is recomputed every generation; None disables the
            # VAE-guided crossover branch below.
            vae_decs = None
            if gen <= self.vae_gens and nt == 2:
                vae_decs = _generate_vae_individuals(
                    pop_decs, pop_objs, pop_sfs, nt, self.lam)

            # Merge per-task populations into flat arrays.
            m_decs, m_objs, m_cons, m_sfs = vstack_groups(
                pop_decs, pop_objs, pop_cons, pop_sfs)
            maxD = m_decs.shape[1]

            # --- Generation ---
            off_decs = np.zeros_like(m_decs)
            off_objs = np.full_like(m_objs, np.inf)
            off_cons = np.zeros_like(m_cons)
            off_sfs = np.zeros_like(m_sfs)
            # Random pairing: individual i mates with i + pop_size // 2.
            shuffled = np.random.permutation(pop_size)

            for pair_idx in range(pop_size // 2):
                p1 = shuffled[pair_idx]
                p2 = shuffled[pair_idx + pop_size // 2]
                sf1 = m_sfs[p1].item()
                sf2 = m_sfs[p2].item()
                idx1 = pair_idx * 2
                idx2 = pair_idx * 2 + 1

                # Same task always crosses over; cross-task mating happens
                # with probability rmp (standard MFEA assortative mating).
                if sf1 == sf2 or np.random.rand() < self.rmp:
                    # --- Transfer: crossover ---
                    if vae_decs is not None and len(vae_decs) > 0:
                        # VAE-guided crossover: both children use the same
                        # randomly chosen VAE individual as partner.
                        vi = np.random.randint(len(vae_decs))
                        vae_dec = vae_decs[vi]
                        off_decs[idx1], _ = crossover(
                            m_decs[p1], vae_dec, mu=self.muc)
                        off_decs[idx2], _ = crossover(
                            m_decs[p2], vae_dec, mu=self.muc)
                        # Trim to proper length (in unified space, already maxD)
                        off_decs[idx1] = off_decs[idx1][:maxD]
                        off_decs[idx2] = off_decs[idx2][:maxD]
                    else:
                        # Standard SBX crossover
                        off_decs[idx1], off_decs[idx2] = crossover(
                            m_decs[p1], m_decs[p2], mu=self.muc)
                    # Task imitation: each child inherits a random parent's
                    # skill factor (MFEA's vertical cultural transmission).
                    off_sfs[idx1] = np.random.choice([sf1, sf2])
                    off_sfs[idx2] = np.random.choice([sf1, sf2])
                else:
                    # --- No transfer: polynomial mutation ---
                    off_decs[idx1] = mutation(m_decs[p1], mu=self.mum)
                    off_decs[idx2] = mutation(m_decs[p2], mu=self.mum)
                    off_sfs[idx1] = sf1
                    off_sfs[idx2] = sf2

                # Clip to [0, 1] (unified-space bounds).
                off_decs[idx1] = np.clip(off_decs[idx1], 0, 1)
                off_decs[idx2] = np.clip(off_decs[idx2], 0, 1)

            # --- Evaluation ---
            # Each offspring is evaluated only on its skill-factor task,
            # using the first dims[t] unified variables.
            for idx in range(pop_size):
                t = off_sfs[idx].item()
                off_objs[idx], off_cons[idx] = evaluation_single(
                    problem, off_decs[idx, :dims[t]], t)
            nfes += pop_size
            pbar.update(pop_size)

            # --- Selection ---
            merged_decs = np.vstack([m_decs, off_decs])
            merged_objs = np.vstack([m_objs, off_objs])
            merged_cons = np.vstack([m_cons, off_cons])
            merged_sfs = np.vstack([m_sfs, off_sfs])

            # Elitist per-task survival: keep the best n individuals with
            # each skill factor from the parent+offspring pool.
            pop_decs, pop_objs, pop_cons, pop_sfs = [], [], [], []
            for t in range(nt):
                indices = np.where(merged_sfs.flatten() == t)[0]
                t_decs, t_objs, t_cons = select_by_index(
                    indices, merged_decs, merged_objs, merged_cons)
                sel = selection_elit(objs=t_objs, n=n, cons=t_cons)
                pop_decs.append(t_decs[sel])
                pop_objs.append(t_objs[sel])
                pop_cons.append(t_cons[sel])
                pop_sfs.append(np.full((n, 1), t))

            # Record history in the real (task-native) decision space.
            real_decs, real_cons = space_transfer(
                problem, decs=pop_decs, cons=pop_cons, type='real')
            append_history(all_decs, real_decs, all_objs, pop_objs,
                           all_cons, real_cons)

            gen += 1

        pbar.close()
        runtime = time.time() - start_time
        results = build_save_results(
            all_decs=all_decs, all_objs=all_objs, runtime=runtime,
            max_nfes=max_nfes_per_task, all_cons=all_cons,
            bounds=problem.bounds, save_path=self.save_path,
            filename=self.name, save_data=self.save_data)
        return results
# ============================================================ # VAE model and generation helpers # ============================================================ class _SimpleVAE(nn.Module): """ VAE matching the MATLAB implementation architecture. Encoder: input → FC(H) → ReLU → FC(H) → ReLU → FC(H) → ReLU → FC(H) → Sigmoid → FC_mean(L) → FC_logvar(L) Decoder: FC(H) → ReLU → FC(H) → ReLU → FC(H) → ReLU → FC(H) → Sigmoid → FC(input_size) Note: fc_logvar takes fc_mean's output as input (sequential in MATLAB). """ def __init__(self, input_size, hidden_size=256, latent_size=200): super().__init__() self.encoder_body = nn.Sequential( nn.Linear(input_size, hidden_size), nn.ReLU(), nn.Linear(hidden_size, hidden_size), nn.ReLU(), nn.Linear(hidden_size, hidden_size), nn.ReLU(), nn.Linear(hidden_size, hidden_size), nn.Sigmoid(), ) # Sequential: sigmoid → fc_mean → fc_logvar (logvar takes mean as input) self.fc_mean = nn.Linear(hidden_size, latent_size) self.fc_logvar = nn.Linear(latent_size, latent_size) self.decoder = nn.Sequential( nn.Linear(latent_size, hidden_size), nn.ReLU(), nn.Linear(hidden_size, hidden_size), nn.ReLU(), nn.Linear(hidden_size, hidden_size), nn.ReLU(), nn.Linear(hidden_size, hidden_size), nn.Sigmoid(), nn.Linear(hidden_size, input_size), ) @torch.no_grad() def generate(self, x, lam=0.8): """ Encode, reparameterize, and decode. Parameters ---------- x : torch.Tensor Combined data from both tasks, shape (n1+n2, input_size). First n1 rows = task 1, last n2 rows = task 2. lam : float Lambda scaling factor for latent space. 
Returns ------- output : np.ndarray Decoded output, shape (n_generated, input_size) """ h = self.encoder_body(x) z_mean_all = self.fc_mean(h) z_logvar_all = self.fc_logvar(z_mean_all) n_total = x.shape[0] n_half = n_total // 2 # Task 1's encoded output → mean, Task 2's → logvar z_mean = z_mean_all[:n_half] z_logvar = z_logvar_all[n_half:] # Reparameterization eps = torch.randn_like(z_mean) z = z_mean + torch.exp(0.5 * z_logvar) * eps # Decode with lambda scaling output = self.decoder(z * lam) return output.numpy() def _generate_vae_individuals(pop_decs, pop_objs, pop_sfs, nt, lam): """ Generate VAE-guided individuals for knowledge transfer. Prepares population data, passes through untrained VAE, and extracts decision variables for use as crossover partners. Parameters ---------- pop_decs : list of np.ndarray Population decision variables per task (unified space) pop_objs : list of np.ndarray Population objective values per task pop_sfs : list of np.ndarray Skill factors per task nt : int Number of tasks (must be 2) lam : float Lambda scaling for VAE latent space Returns ------- vae_decs : list of np.ndarray VAE-generated decision vectors for crossover """ if nt != 2: return [] maxD = pop_decs[0].shape[1] desired_cols = 100 # Build data matrices: [Dec * 10000; MFObj; TaskLabel * 10000] data_tasks = [] for t in range(nt): n_t = pop_decs[t].shape[0] dec_scaled = pop_decs[t] * 10000.0 # (n_t, maxD) obj_vals = pop_objs[t] # (n_t, n_objs) task_label = np.full((n_t, 1), (t + 1) * 10000.0) # data per individual: [Dec*10000, MFObj, TaskLabel*10000] data_t = np.hstack([dec_scaled, obj_vals, task_label]) # (n_t, maxD+n_objs+1) data_tasks.append(data_t) # Pad or truncate to desired_cols per task for t in range(nt): n_t = data_tasks[t].shape[0] if n_t > desired_cols: data_tasks[t] = data_tasks[t][:desired_cols] elif n_t < desired_cols: extra_idx = np.random.randint(0, n_t, size=desired_cols - n_t) data_tasks[t] = np.vstack([data_tasks[t], data_tasks[t][extra_idx]]) # 50/50 
train split (train = test in MATLAB code) n_train = desired_cols // 2 for t in range(nt): perm = np.random.permutation(desired_cols) data_tasks[t] = data_tasks[t][perm[:n_train]] # (50, features) # Remove TaskLabel column for encoder input X1 = data_tasks[0][:, :-1] # (50, maxD + n_objs) X2 = data_tasks[1][:, :-1] # (50, maxD + n_objs) input_size = X1.shape[1] # Combine for encoding x_combined = np.vstack([X1, X2]).astype(np.float32) # (100, input_size) # Build VAE with random weights (no training, matching MATLAB istraining=false) vae = _SimpleVAE(input_size, hidden_size=256, latent_size=200) vae.eval() # Generate x_tensor = torch.from_numpy(x_combined) output = vae.generate(x_tensor, lam=lam) # (50, input_size) # Split into task 1 and task 2 n_gen = output.shape[0] n_half = n_gen // 2 new_x1 = output[:n_half] # (25, input_size) new_x2 = output[n_half:] # (25, input_size) # Extract Dec values (first maxD columns) vae_decs = [] for row in new_x1: vae_decs.append(row[:maxD].copy()) for row in new_x2: vae_decs.append(row[:maxD].copy()) return vae_decs