Source code for ddmtolab.Algorithms.STSO.sep_CMA_ES

"""
sep-CMA-ES (Separable Covariance Matrix Adaptation Evolution Strategy)

This module implements the sep-CMA-ES algorithm for single-objective optimization problems.
sep-CMA-ES achieves linear time and space complexity by using a diagonal covariance matrix.

References
----------
    [1] Ros, R., & Hansen, N. (2008). A Simple Modification in CMA-ES Achieving Linear Time and Space Complexity. Parallel Problem Solving from Nature, PPSN X, 296-305. DOI: 10.1007/978-3-540-87700-4_30

Notes
-----
Author: Jiangtao Shen
Email: j.shen5@exeter.ac.uk
Date: 2025.01.27
Version: 1.0
"""
from tqdm import tqdm
import time
import numpy as np
from ddmtolab.Methods.Algo_Methods.algo_utils import *


[docs] class sep_CMA_ES: """ sep-CMA-ES for single-objective optimization. Attributes ---------- algorithm_information : dict Dictionary containing algorithm capabilities and requirements """ algorithm_information = { 'n_tasks': '[1, K]', 'dims': 'unequal', 'objs': 'equal', 'n_objs': '1', 'cons': 'unequal', 'n_cons': '[0, C]', 'expensive': 'False', 'knowledge_transfer': 'False', 'n': 'unequal', 'max_nfes': 'unequal' } @classmethod def get_algorithm_information(cls, print_info=True): return get_algorithm_information(cls, print_info)
[docs] def __init__(self, problem, n=None, max_nfes=None, sigma0=0.3, use_n=True, save_data=True, save_path='./Data', name='sep-CMA-ES', disable_tqdm=True): """ Initialize sep-CMA-ES Algorithm. Parameters ---------- problem : MTOP Multi-task optimization problem instance n : int or List[int], optional Population size per task (default: None, will use 4+3*log(D)) max_nfes : int or List[int], optional Maximum number of function evaluations per task (default: 10000) sigma0 : float, optional Initial step size (default: 0.3) use_n : bool, optional If True, use provided n; if False, use 4+3*log(D) (default: True) save_data : bool, optional Whether to save optimization data (default: True) save_path : str, optional Path to save results (default: './TestData') name : str, optional Name for the experiment (default: 'sep_CMA_ES_test') disable_tqdm : bool, optional Whether to disable progress bar (default: True) """ self.problem = problem self.n = n if n is not None else 100 self.max_nfes = max_nfes if max_nfes is not None else 10000 self.sigma0 = sigma0 self.use_n = use_n self.save_data = save_data self.save_path = save_path self.name = name self.disable_tqdm = disable_tqdm
[docs] def optimize(self): """ Execute the sep-CMA-ES Algorithm. Returns ------- Results Optimization results containing decision variables, objectives, and runtime """ start_time = time.time() problem = self.problem nt = problem.n_tasks max_nfes_per_task = par_list(self.max_nfes, nt) # Initialize parameters for each task params = [] for t in range(nt): dim = problem.dims[t] # Determine population size if self.use_n: lam = par_list(self.n, nt)[t] else: lam = int(4 + 3 * np.log(dim)) mu = int(np.round(lam / 2)) # Recombination weights weights = np.log(mu + 0.5) - np.log(np.arange(1, mu + 1)) weights = weights / np.sum(weights) mueff = 1.0 / np.sum(weights ** 2) # Step size control parameters (different from standard CMA-ES) cs = (mueff + 2) / (dim + mueff + 3) damps = 1 + cs + 2 * max(np.sqrt((mueff - 1) / (dim + 1)) - 1, 0) # Covariance update parameters (different from standard CMA-ES) cc = 4 / (4 + dim) ccov = (1 / mueff) * (2 / (dim + np.sqrt(2)) ** 2) + \ (1 - 1 / mueff) * min(1, (2 * mueff - 1) / ((dim + 2) ** 2 + mueff)) ccov = (dim + 2) / 3 * ccov # Initialize m_dec = np.random.rand(dim) ps = np.zeros(dim) pc = np.zeros(dim) C = np.ones(dim) # Diagonal covariance matrix (vector form) sigma = self.sigma0 chiN = np.sqrt(dim) * (1 - 1 / (4 * dim) + 1 / (21 * dim ** 2)) params.append({ 'dim': dim, 'lam': lam, 'mu': mu, 'weights': weights, 'mueff': mueff, 'cs': cs, 'damps': damps, 'cc': cc, 'ccov': ccov, 'm_dec': m_dec, 'ps': ps, 'pc': pc, 'C': C, 'sigma': sigma, 'chiN': chiN }) # Initialize tracking variables nfes_per_task = [0] * nt decs = [None] * nt objs = [None] * nt cons = [None] * nt all_decs = [[] for _ in range(nt)] all_objs = [[] for _ in range(nt)] all_cons = [[] for _ in range(nt)] pbar = tqdm(total=sum(max_nfes_per_task), desc=f"{self.name}", disable=self.disable_tqdm) while sum(nfes_per_task) < sum(max_nfes_per_task): active_tasks = [i for i in range(nt) if nfes_per_task[i] < max_nfes_per_task[i]] if not active_tasks: break for i in active_tasks: p = params[i] # Generate offspring using sep_cmaes_generation sample_decs = sep_cmaes_generation( m_dec=p['m_dec'], sigma=p['sigma'], C=p['C'], lam=p['lam'] ) # Evaluate samples sample_objs, sample_cons = evaluation_single(problem, sample_decs, i) # Sort by constraint violation first, then by objective cvs = np.sum(np.maximum(0, sample_cons), axis=1) sort_indices = np.lexsort((sample_objs.flatten(), cvs)) sample_decs = sample_decs[sort_indices] sample_objs = sample_objs[sort_indices] sample_cons = sample_cons[sort_indices] # Update current population decs[i] = sample_decs objs[i] = sample_objs cons[i] = sample_cons nfes_per_task[i] += p['lam'] pbar.update(p['lam']) # Append to history append_history(all_decs[i], decs[i], all_objs[i], objs[i], all_cons[i], cons[i]) # Update mean decision variables old_dec = p['m_dec'].copy() p['m_dec'] = p['weights'] @ sample_decs[:p['mu']] # Update evolution paths diff = (p['m_dec'] - old_dec) / p['sigma'] p['ps'] = (1 - p['cs']) * p['ps'] + \ np.sqrt(p['cs'] * (2 - p['cs']) * p['mueff']) * (diff / np.sqrt(p['C'])) ps_norm = np.linalg.norm(p['ps']) hsig = ps_norm / np.sqrt(1 - (1 - p['cs']) ** (2 * nfes_per_task[i] / p['lam'])) / p[ 'chiN'] < 1.4 + 2 / (p['dim'] + 1) p['pc'] = (1 - p['cc']) * p['pc'] + \ hsig * np.sqrt(p['cc'] * (2 - p['cc']) * p['mueff']) * diff # Update diagonal covariance matrix artmp = (sample_decs[:p['mu']] - old_dec) / p['sigma'] delta = (1 - hsig) * p['cc'] * (2 - p['cc']) p['C'] = (1 - p['ccov']) * p['C'] + \ (p['ccov'] / p['mueff']) * (p['pc'] ** 2 + delta * p['C']) + \ p['ccov'] * (1 - 1 / p['mueff']) * (artmp.T ** 2 @ p['weights']) # Update step size p['sigma'] = p['sigma'] * np.exp(p['cs'] / p['damps'] * (ps_norm / p['chiN'] - 1)) pbar.close() runtime = time.time() - start_time # Save results results = build_save_results( all_decs=all_decs, all_objs=all_objs, runtime=runtime, max_nfes=nfes_per_task, all_cons=all_cons, bounds=problem.bounds, save_path=self.save_path, filename=self.name, save_data=self.save_data ) return results
def sep_cmaes_generation(m_dec: np.ndarray, sigma: float, C: np.ndarray, lam: int = None) -> np.ndarray: """ Generate offspring population using sep-CMA-ES sampling strategy. Uses diagonal covariance matrix for linear time and space complexity. Parameters ---------- m_dec : np.ndarray Mean decision vector, shape (d,) sigma : float Step size (global scaling factor) C : np.ndarray Diagonal elements of covariance matrix, shape (d,) lam : int, optional Number of offspring to generate (default: None) Returns ------- offdecs : np.ndarray Offspring decision variables, shape (lam, d) """ d = len(m_dec) # If lam is None, generate a default number if lam is None: lam = int(4 + 3 * np.log(d)) offdecs = np.zeros((lam, d)) for i in range(lam): z = np.random.randn(d) offdec = m_dec + sigma * (np.sqrt(C) * z) offdec = np.clip(offdec, 0, 1) offdecs[i] = offdec return offdecs