# Source code for ddmtolab.Algorithms.MTSO.EBS

"""
EBS (Evolutionary Biocoenosis-based Symbiosis)

This module implements the EBS algorithm for evolutionary many-tasking optimization.

References
----------
    [1] Liaw, R. T., & Ting, C. K. (2019). Evolutionary many-tasking based on biocoenosis through symbiosis: A framework and benchmark problems. In 2019 IEEE Congress on Evolutionary Computation (CEC) (pp. 2266-2273). IEEE.

Notes
-----
Author: Jiangtao Shen
Email: j.shen5@exeter.ac.uk
Date: 2025.01.09
Version: 1.0
"""
from tqdm import tqdm
import copy
import time
import numpy as np
from ddmtolab.Methods.Algo_Methods.algo_utils import *


class EBS:
    """
    Evolutionary Biocoenosis-based Symbiosis for many-task optimization.

    EBS uses multiple CMA-ES instances with adaptive information exchange
    among tasks. Each task maintains two CMA-ES distributions:

    - One updated when knowledge transfer occurs
    - One updated when no knowledge transfer occurs

    The information exchange probability is controlled adaptively based on
    the improvement ratio from self-generated offspring versus offspring
    from other tasks.

    Attributes
    ----------
    algorithm_information : dict
        Dictionary containing algorithm capabilities and requirements
    """

    # Capability descriptor consumed by the framework's introspection helper.
    algorithm_information = {
        'n_tasks': '[2, K]',
        'dims': 'unequal',
        'objs': 'equal',
        'n_objs': '1',
        'cons': 'unequal',
        'n_cons': '[0, C]',
        'expensive': 'False',
        'knowledge_transfer': 'True',
        'n': 'unequal',
        'max_nfes': 'unequal'
    }

    @classmethod
    def get_algorithm_information(cls, print_info=True):
        """Return (and optionally print) the algorithm capability table.

        Delegates to the shared ``get_algorithm_information`` helper from
        ``algo_utils`` (star-imported at module level).
        """
        return get_algorithm_information(cls, print_info)
[docs] def __init__(self, problem, n=None, max_nfes=None, sigma0=0.3, use_n=True, gen_init=10, save_data=True, save_path='./Data', name='EBS', disable_tqdm=True): """ Initialize EBS Algorithm. Parameters ---------- problem : MTOP Multi-task optimization problem instance n : int or List[int], optional Population size per task (default: None, will use 4+3*log(D)) max_nfes : int or List[int], optional Maximum number of function evaluations per task (default: 10000) sigma0 : float, optional Initial step size for CMA-ES (default: 0.3) use_n : bool, optional If True, use provided n; if False, use 4+3*log(D) (default: True) gen_init : int, optional Number of initial generations for alternating CMA-ES before using gamma (default: 10) During this phase, two CMA-ES alternate (one without transfer, one with transfer) save_data : bool, optional Whether to save optimization data (default: True) save_path : str, optional Path to save results (default: './TestData') name : str, optional Name for the experiment (default: 'EBS_test') disable_tqdm : bool, optional Whether to disable progress bar (default: True) """ self.problem = problem self.n = n if n is not None else 100 self.max_nfes = max_nfes if max_nfes is not None else 10000 self.sigma0 = sigma0 self.use_n = use_n self.gen_init = gen_init self.save_data = save_data self.save_path = save_path self.name = name self.disable_tqdm = disable_tqdm
[docs] def optimize(self): """ Execute the EBS Algorithm. Returns ------- Results Optimization results containing decision variables, objectives, and runtime """ start_time = time.time() problem = self.problem nt = problem.n_tasks dims = problem.dims d_max = np.max(dims) # Unified dimension max_nfes_per_task = par_list(self.max_nfes, nt) # Initialize CMA-ES parameters for each task (using unified dimension) params = [] for t in range(nt): # Determine population size based on original task dimension if self.use_n: lam = par_list(self.n, nt)[t] else: lam = int(4 + 3 * np.log(dims[t])) # Distribution for self-generated offspring (no knowledge transfer) params_s = cmaes_init_params(d_max, lam=lam, sigma0=self.sigma0) # Distribution for knowledge transfer offspring (same starting point) params_o = copy.deepcopy(params_s) params_o['m_dec'] = params_s['m_dec'].copy() params.append({ 'real_dim': dims[t], # Real dimension for this task 'params_s': params_s, # Self distribution (no transfer) 'params_o': params_o, # Other distribution (with transfer) }) # Initialize tracking variables nfes_per_task = [0] * nt decs = [None] * nt # In unified space objs = [None] * nt cons = [None] * nt best_objs = [np.inf] * nt # Best-so-far objective for each task all_decs = [[] for _ in range(nt)] all_objs = [[] for _ in range(nt)] all_cons = [[] for _ in range(nt)] # Initialize information exchange statistics improvements_s = [0] * nt evals_s = [0] * nt improvements_o = [0] * nt evals_o = [0] * nt gamma = [0.0] * nt # Probability of information exchange (will be computed after init phase) # Initialize generation counter for each task (for alternating during init phase) gen_count = [0] * nt pbar = tqdm(total=sum(max_nfes_per_task), desc=f"{self.name}", disable=self.disable_tqdm) while sum(nfes_per_task) < sum(max_nfes_per_task): active_tasks = [i for i in range(nt) if nfes_per_task[i] < max_nfes_per_task[i]] if not active_tasks: break # Step 1: Determine transfer flags and generate offspring 
accordingly transfer_flags = [] offspring_list = [] for i in active_tasks: p = params[i] # Decide whether to perform knowledge transfer if gen_count[i] < self.gen_init: # During init phase: alternate between no transfer and transfer # Even generations: no transfer (is_transfer=False) # Odd generations: transfer (is_transfer=True) is_transfer = (gen_count[i] % 2 == 1) else: # After init phase: use gamma probability is_transfer = np.random.rand() < gamma[i] transfer_flags.append(is_transfer) # Select distribution based on transfer decision if is_transfer: ps = p['params_o'] # Use knowledge transfer distribution else: ps = p['params_s'] # Use self distribution # Generate offspring using selected CMA-ES distribution sample_decs = cmaes_sample( ps['m_dec'], ps['sigma'], ps['B'], ps['D'], ps['lam'] ) offspring_list.append(sample_decs) # Concatenate all offspring (all in unified space d_max) concat_offspring = np.vstack(offspring_list) # Step 2: For each task, select candidates based on transfer decision candidate_list = [] for idx, i in enumerate(active_tasks): p = params[i] is_transfer = transfer_flags[idx] if is_transfer: # Perform knowledge transfer: sample from concatenate offspring # Randomly select lambda candidates from concatenate offspring n_candidates = min(p['params_s']['lam'], concat_offspring.shape[0]) candidate_indices = np.random.choice( concat_offspring.shape[0], size=n_candidates, replace=False ) candidate_decs = concat_offspring[candidate_indices].copy() # If we need more candidates, duplicate some while candidate_decs.shape[0] < p['params_s']['lam']: extra_idx = np.random.choice(concat_offspring.shape[0]) candidate_decs = np.vstack([candidate_decs, concat_offspring[extra_idx:extra_idx + 1]]) candidate_list.append(candidate_decs) else: # No knowledge transfer: use self-generated offspring candidate_list.append(offspring_list[idx]) # Step 3: Evaluate, update population and CMA-ES parameters for idx, i in enumerate(active_tasks): p = params[i] 
candidate_decs = candidate_list[idx] # In unified space is_transfer = transfer_flags[idx] # Convert to real space for evaluation (truncate to real dimension) candidate_decs_real = candidate_decs[:, :dims[i]] candidate_decs_real = np.clip(candidate_decs_real, 0, 1) # Ensure bounds # Evaluate candidates (in real space) sample_objs, sample_cons = evaluation_single(problem, candidate_decs_real, i) # Sort by constraint violation first, then by objective cvs = np.sum(np.maximum(0, sample_cons), axis=1) sort_indices = constrained_sort(sample_objs, cvs) # Sort in unified space sorted_decs = candidate_decs[sort_indices] sorted_objs = sample_objs[sort_indices] sorted_cons = sample_cons[sort_indices] # Update evaluation counts if is_transfer: evals_o[i] += p['params_s']['lam'] else: evals_s[i] += p['params_s']['lam'] # Check if best-so-far is improved best_candidate_obj = sorted_objs[0, 0] if best_candidate_obj < best_objs[i]: best_objs[i] = best_candidate_obj if is_transfer: improvements_o[i] += 1 else: improvements_s[i] += 1 # Increment generation counter gen_count[i] += 1 # Update gamma after init phase is complete if gen_count[i] == self.gen_init: # Compute gamma based on accumulated statistics if evals_s[i] > 0 and evals_o[i] > 0: R_s = improvements_s[i] / evals_s[i] R_o = improvements_o[i] / evals_o[i] if (R_s + R_o) > 0: gamma[i] = R_o / (R_s + R_o) else: gamma[i] = 0.0 # Default if no improvements else: gamma[i] = 0.0 # Default elif gen_count[i] > self.gen_init: # Continue updating gamma based on accumulated statistics if evals_s[i] > 0 and evals_o[i] > 0: R_s = improvements_s[i] / evals_s[i] R_o = improvements_o[i] / evals_o[i] if (R_s + R_o) > 0: gamma[i] = R_o / (R_s + R_o) # Update current population (store in unified space) decs[i] = sorted_decs objs[i] = sorted_objs cons[i] = sorted_cons nfes_per_task[i] += p['params_s']['lam'] pbar.update(p['params_s']['lam']) # Convert to real space for history (truncate to real dimension) decs_real = sorted_decs[:, :dims[i]] 
append_history(all_decs[i], decs_real, all_objs[i], sorted_objs, all_cons[i], sorted_cons) # Update the appropriate CMA-ES distribution if is_transfer: # Update knowledge transfer CMA-ES cmaes_update(p['params_o'], sorted_decs, nfes_per_task[i]) else: # Update self CMA-ES cmaes_update(p['params_s'], sorted_decs, nfes_per_task[i]) pbar.close() runtime = time.time() - start_time # Save results (all_decs are already in real space) results = build_save_results( all_decs=all_decs, all_objs=all_objs, runtime=runtime, max_nfes=nfes_per_task, all_cons=all_cons, bounds=problem.bounds, save_path=self.save_path, filename=self.name, save_data=self.save_data ) return results