# Source code for ddmtolab.Algorithms.STSO.EO

"""
Equilibrium Optimizer (EO)

This module implements the Equilibrium Optimizer for single-objective optimization problems.

References
----------
    [1] Faramarzi, A., Heidarinejad, M., Stephens, B., & Mirjalili, S. (2020). Equilibrium optimizer: A novel optimization algorithm. Knowledge-Based Systems, 191, 105190.

Notes
-----
Author: Jiangtao Shen
Email: j.shen5@exeter.ac.uk
Date: 2025.12.12
Version: 1.0
"""
import time
from tqdm import tqdm
import numpy as np
from ddmtolab.Methods.Algo_Methods.algo_utils import *


class EO:
    """
    Equilibrium Optimizer for single-objective optimization.

    Each task keeps its own population and its own pool of four equilibrium
    candidates (the best solutions seen so far, ranked by constraint
    violation first and objective value second). New positions are generated
    around a member of the pool (or the pool average) and replace their
    parent only when they are better.

    Attributes
    ----------
    algorithm_information : dict
        Dictionary containing algorithm capabilities and requirements
    """

    algorithm_information = {
        'n_tasks': '[1, K]',
        'dims': 'unequal',
        'objs': 'equal',
        'n_objs': '1',
        'cons': 'unequal',
        'n_cons': '[0, C]',
        'expensive': 'False',
        'knowledge_transfer': 'False',
        'n': 'unequal',
        'max_nfes': 'unequal'
    }

    @classmethod
    def get_algorithm_information(cls, print_info=True):
        """Delegate to the module-level ``get_algorithm_information`` helper for this class."""
        return get_algorithm_information(cls, print_info)

    def __init__(self, problem, n=None, max_nfes=None, a1=2, a2=1, v=1, gp=0.5,
                 save_data=True, save_path='./Data', name='EO', disable_tqdm=True):
        """
        Initialize Equilibrium Optimizer.

        Parameters
        ----------
        problem : MTOP
            Multi-task optimization problem instance
        n : int or List[int], optional
            Population size per task (default: 100)
        max_nfes : int or List[int], optional
            Maximum number of function evaluations per task (default: 10000)
        a1 : float, optional
            Exploration constant (default: 2)
        a2 : float, optional
            Exploitation constant (default: 1)
        v : float, optional
            Volume coefficient (default: 1)
        gp : float, optional
            Generation probability (default: 0.5)
        save_data : bool, optional
            Whether to save optimization data (default: True)
        save_path : str, optional
            Path to save results (default: './Data')
        name : str, optional
            Name for the experiment (default: 'EO')
        disable_tqdm : bool, optional
            Whether to disable progress bar (default: True)
        """
        self.problem = problem
        self.n = n if n is not None else 100
        self.max_nfes = max_nfes if max_nfes is not None else 10000
        self.a1 = a1
        self.a2 = a2
        self.v = v
        self.gp = gp
        self.save_data = save_data
        self.save_path = save_path
        self.name = name
        self.disable_tqdm = disable_tqdm

    @staticmethod
    def _update_equilibrium_pool(ceq_decs, ceq_objs, ceq_cons, decs, objs, cvs):
        """
        Merge a population into the ranked equilibrium pool, in place.

        The pool is kept sorted best-first: lower constraint violation wins,
        ties on violation are broken by lower objective value.

        Parameters
        ----------
        ceq_decs : np.ndarray
            Pool decision vectors, one row per slot (modified in place)
        ceq_objs : np.ndarray
            Pool objective values, one row per slot, value in column 0
            (modified in place)
        ceq_cons : np.ndarray
            Pool constraint violations, one row per slot, value in column 0
            (modified in place)
        decs : np.ndarray
            Population decision vectors, one row per individual
        objs : np.ndarray
            Population objective values, value in column 0
        cvs : np.ndarray
            Total constraint violation of each individual
        """
        n_slots = ceq_decs.shape[0]
        for j in range(decs.shape[0]):
            p_obj = objs[j, 0]
            p_cv = cvs[j]
            for k in range(n_slots):
                c_obj = ceq_objs[k, 0]
                c_cv = ceq_cons[k, 0]

                if p_cv == c_cv and p_obj == c_obj:
                    # Same quality as slot k. Individuals that survive a
                    # generation unchanged are seen again on every call;
                    # without this check the incumbent best would be
                    # re-inserted one slot lower each generation until the
                    # whole pool is the same point.
                    if np.array_equal(decs[j], ceq_decs[k]):
                        break
                    # A different solution that merely ties: try lower slots.
                    continue

                if (p_cv < c_cv) or (p_cv == c_cv and p_obj < c_obj):
                    # Insert at position k and shift the worse entries down
                    # (the last one falls out of the pool).
                    if k < n_slots - 1:
                        ceq_decs[k + 1:] = ceq_decs[k:-1].copy()
                        ceq_objs[k + 1:] = ceq_objs[k:-1].copy()
                        ceq_cons[k + 1:] = ceq_cons[k:-1].copy()
                    ceq_decs[k] = decs[j].copy()
                    ceq_objs[k, 0] = p_obj
                    ceq_cons[k, 0] = p_cv
                    break

    def optimize(self):
        """
        Execute the Equilibrium Optimizer algorithm.

        Returns
        -------
        Results
            Optimization results containing decision variables, objectives, and runtime
        """
        start_time = time.time()
        problem = self.problem
        nt = problem.n_tasks
        n_per_task = par_list(self.n, nt)
        max_nfes_per_task = par_list(self.max_nfes, nt)

        # Initialize population in [0,1] space and evaluate for each task
        decs = initialization(problem, n_per_task)
        objs, cons = evaluation(problem, decs)
        nfes_per_task = n_per_task.copy()

        all_decs, all_objs, all_cons = init_history(decs, objs, cons)

        # Initialize equilibrium candidates (Ceq) for each task
        # Ceq contains 4 best solutions
        ceq_decs = [None] * nt
        ceq_objs = [None] * nt
        ceq_cons = [None] * nt

        for i in range(nt):
            dim = problem.dims[i]
            # Initialize 4 equilibrium candidates with worst values
            ceq_decs[i] = np.zeros((4, dim))
            ceq_objs[i] = np.full((4, 1), np.inf)
            ceq_cons[i] = np.full((4, 1), np.inf)

        total_nfes = sum(max_nfes_per_task)
        pbar = tqdm(total=total_nfes, initial=sum(n_per_task), desc=f"{self.name}",
                    disable=self.disable_tqdm)

        # NOTE(review): a task always spends a full generation (n evaluations),
        # so it can overshoot its own budget; with this sum-based condition the
        # loop may then stop while another task still has evaluations left.
        # Kept as-is; confirm whether that is the intended convention.
        while sum(nfes_per_task) < total_nfes:
            # Skip tasks that have exhausted their evaluation budget
            active_tasks = [i for i in range(nt) if nfes_per_task[i] < max_nfes_per_task[i]]
            if not active_tasks:
                break

            for i in active_tasks:
                dim = problem.dims[i]

                # Update equilibrium candidates (Ceq) with the current population
                cvs = np.sum(np.maximum(0, cons[i]), axis=1)
                self._update_equilibrium_pool(ceq_decs[i], ceq_objs[i], ceq_cons[i],
                                              decs[i], objs[i], cvs)

                # Calculate average equilibrium candidate (Ceq_ave)
                ceq_ave_dec = np.mean(ceq_decs[i], axis=0, keepdims=True)

                # Create equilibrium pool (4 Ceq + 1 Ceq_ave)
                c_pool_decs = np.vstack([ceq_decs[i], ceq_ave_dec])

                # Calculate ratio for exponential term
                ratio = (1 - nfes_per_task[i] / max_nfes_per_task[i]) ** \
                        (self.a2 * nfes_per_task[i] / max_nfes_per_task[i])

                # Generate offspring
                new_decs = np.zeros_like(decs[i])

                for j in range(n_per_task[i]):
                    # Random parameters
                    lam = np.random.rand(dim)
                    r = np.random.rand(dim)

                    # Randomly select from equilibrium pool
                    ceq_dec = c_pool_decs[np.random.randint(0, 5)]

                    # Exponential term (F)
                    F = self.a1 * np.sign(r - 0.5) * (np.exp(-lam * ratio) - 1)

                    # Generation control parameter (GCP)
                    r1 = np.random.rand()
                    r2 = np.random.rand()
                    GCP = 0.5 * r1 * np.ones(dim) * (r2 >= self.gp)

                    # Generation (G)
                    G0 = GCP * (ceq_dec - lam * decs[i][j])
                    G = G0 * F

                    # Update position (1e-10 guards against division by a zero lam)
                    new_decs[j] = ceq_dec + (decs[i][j] - ceq_dec) * F + \
                                  (G / (lam + 1e-10) * self.v) * (1 - F)

                # Boundary constraint handling: clip to [0,1] space
                new_decs = np.clip(new_decs, 0, 1)

                # Evaluate new solutions
                new_objs, new_cons = evaluation_single(problem, new_decs, i)

                # Tournament selection: keep better solution
                new_cvs = np.sum(np.maximum(0, new_cons), axis=1)
                old_cvs = np.sum(np.maximum(0, cons[i]), axis=1)

                for j in range(n_per_task[i]):
                    # Compare by constraint violation first, then objective
                    if (new_cvs[j] < old_cvs[j]) or \
                            (new_cvs[j] == old_cvs[j] and new_objs[j] < objs[i][j]):
                        decs[i][j] = new_decs[j]
                        objs[i][j] = new_objs[j]
                        cons[i][j] = new_cons[j]

                nfes_per_task[i] += n_per_task[i]
                pbar.update(n_per_task[i])

                # Append current population to history
                append_history(all_decs[i], decs[i], all_objs[i], objs[i], all_cons[i], cons[i])

        pbar.close()
        runtime = time.time() - start_time

        # Save results
        results = build_save_results(all_decs=all_decs, all_objs=all_objs, runtime=runtime,
                                     max_nfes=nfes_per_task, all_cons=all_cons,
                                     bounds=problem.bounds, save_path=self.save_path,
                                     filename=self.name, save_data=self.save_data)

        return results