Source code for ddmtolab.Algorithms.MTSO.SELF

"""
A Surrogate-Assisted Evolutionary Framework for Expensive Multitask Optimization Problems (SELF)

This module implements SELF using multi-task Gaussian processes and Bayesian optimization for expensive multi-task
optimization.

References
----------
    [1] Tan, Shenglian, et al. "A surrogate-assisted evolutionary framework for expensive multitask optimization problems." IEEE Transactions on Evolutionary Computation (2024).

Notes
-----
Author: Jiangtao Shen
Email: j.shen5@exeter.ac.uk
Date: 2025.11.18
Version: 1.0
"""
import numpy as np
from tqdm import tqdm
import torch
import time

from ddmtolab.Methods.Algo_Methods.bo_utils import mtgp_predict, mtgp_build, mtgp_task_corr
from ddmtolab.Methods.mtop import MTOP
from botorch.models import SingleTaskGP
from botorch.fit import fit_gpytorch_mll
from gpytorch.mlls import ExactMarginalLogLikelihood
from botorch.acquisition import LogExpectedImprovement
from botorch.models.transforms import Standardize
from ddmtolab.Methods.Algo_Methods.algo_utils import *
from ddmtolab.Algorithms.STSO.DE import DE
import warnings
warnings.filterwarnings("ignore")


[docs] class SELF: """ Surrogate-Assisted Evolutionary Framework for expensive multi-task optimization. Attributes ---------- algorithm_information : dict Dictionary containing algorithm capabilities and requirements """ algorithm_information = { 'n_tasks': '[2, K]', 'dims': 'unequal', 'objs': 'equal', 'n_objs': '1', 'cons': 'equal', 'n_cons': '0', 'expensive': 'True', 'knowledge_transfer': 'True', 'n_initial': 'equal', 'max_nfes': 'unequal, controlled by SELF' } @classmethod def get_algorithm_information(cls, print_info=True): return get_algorithm_information(cls, print_info)
[docs] def __init__(self, problem, max_nfes=None, np=10, F=0.5, CR=0.9, ng=50, nl=50, save_data=True, save_path='./Data', name='SELF', disable_tqdm=True): """ Initialize SELF algorithm. Parameters ---------- problem : MTOP Multi-task optimization problem instance max_nfes : int or List[int], optional Maximum number of function evaluations per task (default: 200) np : int, optional Population size (default: 10) F : float, optional Mutation factor for DE (default: 0.5) CR : float, optional Crossover rate for DE (default: 0.9) ng : int, optional Number of trial vectors in global knowledge transfer phase (default: 50) nl : int, optional Sample size for training GP model in local knowledge transfer phase (default: 50) save_data : bool, optional Whether to save optimization data (default: True) save_path : str, optional Path to save results (default: './TestData') name : str, optional Name for the experiment (default: 'SELF_test') disable_tqdm : bool, optional Whether to disable progress bar (default: True) """ self.problem = problem self.max_nfes = max_nfes if max_nfes is not None else 200 self.np = np if max_nfes is not None else 10 self.F = F self.CR = CR self.ng = ng self.nl = nl self.save_data = save_data self.save_path = save_path self.name = name self.disable_tqdm = disable_tqdm
[docs] def optimize(self): """ Execute the SELF algorithm with three phases: global transfer, local optimization, and local transfer. Returns ------- Results Optimization results containing decision variables, objectives, and runtime """ start_time = time.time() data_type = torch.double problem = self.problem nt = problem.n_tasks dims = problem.dims n_initial_per_task = par_list(self.np, nt) nfes_per_task = [0] * nt max_nfes = self.max_nfes * nt # Initialize samples using Latin Hypercube Sampling decs = initialization(problem, self.np, method='lhs') objs, _ = evaluation(problem, decs) nfes = self.np * nt for i in range(nt): nfes_per_task[i] += self.np # Working population for evolutionary updates pop_decs = copy.deepcopy(decs) pop_objs = copy.deepcopy(objs) pbar = tqdm(total=max_nfes, initial=nfes, desc=f"{self.name}", disable=self.disable_tqdm) while nfes < max_nfes: # === Global Knowledge Transfer Phase === # Build multi-task Gaussian process and extract task correlations objs_normalized, _, _ = normalize(objs, axis=0, method='minmax') mtgp = mtgp_build(decs, objs_normalized, dims, data_type=data_type) task_corr = mtgp_task_corr(mtgp) for i in range(nt): for j in range(self.np): # Generate candidates using DE guided by current best off_decs = de_generation_with_core(pop_decs[i], pop_objs[i], pop_decs[i][j], self.ng, self.F, self.CR) # Predict objectives using MTGP surrogate pred_objs, pred_std = mtgp_predict(mtgp=mtgp, off_decs=off_decs, task_id=i, dims=dims, nt=nt, data_type=data_type) # Evaluate candidate with minimum predicted objective best_idx = np.argmin(pred_objs) best_off_dec = off_decs[[best_idx], :] true_obj, _ = evaluation_single(problem, best_off_dec, i) # Greedy update: replace if offspring is better if true_obj < pop_objs[i][j]: pop_decs[i][j] = best_off_dec[0] pop_objs[i][j] = true_obj[0] decs[i], objs[i] = vstack_groups((decs[i], best_off_dec), (objs[i], true_obj)) nfes += 1 nfes_per_task[i] += 1 pbar.update(1) # === Local Optimization Phase === for i in range(nt): # Select top nl individuals for local GP model if len(decs[i]) <= self.nl: nearest_decs = decs[i] nearest_objs = objs[i] else: best_indices = np.argsort(objs[i].flatten())[:self.nl] nearest_decs = decs[i][best_indices] nearest_objs = objs[i][best_indices] # Generate next point via Bayesian optimization with LogEI candidate = bo_next_point_de(nearest_decs, nearest_objs, dims[i], data_type) true_obj, _ = evaluation_single(problem, candidate, i) # Update working population if candidate improves if np.any(true_obj < pop_objs[i]): is_duplicate = np.any(np.all(np.isclose(pop_decs[i], candidate[0]), axis=1)) if not is_duplicate: worse_indices = np.where(pop_objs[i] > true_obj)[0] if len(worse_indices) > 0: worst_idx = worse_indices[np.argmax(pop_objs[i][worse_indices])] pop_decs[i][worst_idx] = candidate[0] pop_objs[i][worst_idx] = true_obj[0] decs[i], objs[i] = vstack_groups((decs[i], candidate), (objs[i], true_obj)) nfes += 1 nfes_per_task[i] += 1 pbar.update(1) # === Local Knowledge Transfer Phase === for i in range(nt): transfer_samples = [] # Probabilistic transfer based on task correlation for j in range(nt): if i == j: continue if np.random.rand() < abs(task_corr[i][j]): best_idx = np.argmin(pop_objs[j]) transfer_samples.append(pop_decs[j][best_idx]) if len(transfer_samples) > 0: transfer_samples = np.array(transfer_samples) # Adjust dimensions to match target task if transfer_samples.shape[1] > dims[i]: transfer_samples = transfer_samples[:, :dims[i]] elif transfer_samples.shape[1] < dims[i]: padding = np.zeros((transfer_samples.shape[0], dims[i] - transfer_samples.shape[1])) transfer_samples = np.hstack((transfer_samples, padding)) true_obj, _ = evaluation_single(problem, transfer_samples, i) # Select best transfer sample and update population best_idx = np.argmin(true_obj) best_sample = transfer_samples[[best_idx], :] best_sample_obj = true_obj[best_idx] if np.any(best_sample_obj < pop_objs[i]): is_duplicate = np.any(np.all(np.isclose(pop_decs[i], best_sample[0]), axis=1)) if not is_duplicate: worse_indices = np.where(pop_objs[i] > best_sample_obj)[0] if len(worse_indices) > 0: worst_idx = worse_indices[np.argmax(pop_objs[i][worse_indices])] pop_decs[i][worst_idx] = best_sample[0] pop_objs[i][worst_idx] = true_obj[0] decs[i], objs[i] = vstack_groups((decs[i], transfer_samples), (objs[i], true_obj)) nfes += len(transfer_samples) nfes_per_task[i] += len(transfer_samples) pbar.update(len(transfer_samples)) pbar.close() runtime = time.time() - start_time all_decs, all_objs = build_staircase_history(decs, objs, k=1) # Save results results = build_save_results(all_decs=all_decs, all_objs=all_objs, runtime=runtime, max_nfes=nfes_per_task, bounds=problem.bounds, save_path=self.save_path, filename=self.name, save_data=self.save_data) return results
def de_generation_with_core(parents, parents_objs, core_parent, n_off, F, CR): """ Generate offspring using hybrid DE mutation strategy. Parameters ---------- parents : np.ndarray Parent solutions of shape (n, d) parents_objs : np.ndarray Objective values of parents of shape (n,) or (n, 1) core_parent : np.ndarray Current individual being evolved of shape (1, d) or (d,) n_off : int Number of offspring to generate F : float Differential weight (mutation scale factor) CR : float Crossover rate in [0, 1] for binomial crossover Returns ------- offdecs : np.ndarray Offspring array of shape (n_off, d), clipped to [0, 1] Notes ----- Uses a hybrid strategy with 50% probability each: - DE/best/1 with binomial crossover: v = best + F*(r2 - r3) - DE/current/1 without crossover: v = current + F*(r2 - r3) """ n, d = parents.shape # Ensure correct dimensions if core_parent.ndim == 2: core_parent = core_parent.squeeze() if parents_objs.ndim == 2: parents_objs = parents_objs.flatten() # Find best parent best_id = np.argmin(parents_objs) best_parent = parents[best_id] offdecs = np.zeros((n_off, d), dtype=float) # Generate offspring for j in range(n_off): id_set = np.random.permutation(n) r1, r2, r3 = id_set[0], id_set[1], id_set[2] if np.random.rand() < 0.5: # DE/best/1 with binomial crossover v = best_parent + F * (parents[r2] - parents[r3]) trial = np.where(np.random.rand(d) < CR, v, parents[r1]) else: # DE/current/1 without crossover trial = core_parent + F * (parents[r2] - parents[r3]) offdecs[j] = np.clip(trial, 0.0, 1.0) return offdecs def bo_next_point_de(decs, objs, dim, data_type=torch.float): """ Generate next sampling point using Bayesian Optimization with Log Expected Improvement. Parameters ---------- decs : np.ndarray Decision variables (training data) of shape (n_samples, dim) objs : np.ndarray Objective values (training data) of shape (n_samples, 1) dim : int Dimension of the problem data_type : torch.dtype, optional Data type for torch tensors (default: torch.float) Returns ------- candidate_np : np.ndarray Next sampling point of shape (1, dim) Notes ----- Builds a Gaussian Process with the provided data, constructs a Log Expected Improvement acquisition function, and optimizes it using Differential Evolution to find the next most promising point to evaluate. """ # Prepare training data for Gaussian Process train_X = torch.tensor(decs, dtype=data_type) train_Y = torch.tensor(-objs, dtype=data_type) # Build and fit Gaussian Process model gp = SingleTaskGP(train_X=train_X, train_Y=train_Y, outcome_transform=Standardize(m=1)) mll = ExactMarginalLogLikelihood(gp.likelihood, gp) fit_gpytorch_mll(mll) # Build Log Expected Improvement acquisition function best_f = train_Y.max() logEI = LogExpectedImprovement(model=gp, best_f=best_f) # Wrap LogEI as numpy function for DE optimizer def logEI_func(x): if x.ndim == 1: x = x.reshape(1, -1) x_torch = torch.tensor(x, dtype=data_type) with torch.no_grad(): logei_value = logEI(x_torch) logei_np = logei_value.detach().cpu().numpy() return -logei_np.flatten() if x.shape[0] == 1 else -logei_np # Optimize LogEI using Differential Evolution problem = MTOP() problem.add_task(logEI_func, dim=dim) de = DE(problem, n=50, max_nfes=5000, F=0.5, CR=0.9, save_data=False, disable_tqdm=True) result = de.optimize() return result.best_decs