"""
OpenAI-ES (OpenAI Evolution Strategies)
This module implements the OpenAI-ES algorithm for single-objective optimization problems.
OpenAI-ES uses antithetic sampling and momentum-based gradient descent.
References
----------
[1] Salimans, T., Ho, J., Chen, X., Sidor, S., & Sutskever, I. (2017). Evolution Strategies as a Scalable Alternative to Reinforcement Learning. arXiv:1703.03864 [stat.ML].
Notes
-----
Author: Jiangtao Shen
Email: j.shen5@exeter.ac.uk
Date: 2025.01.27
Version: 1.0
"""
import time
from typing import Optional, Tuple

import numpy as np
from tqdm import tqdm

from ddmtolab.Methods.Algo_Methods.algo_utils import *
class OpenAI_ES:
    """
    OpenAI-ES for single-objective optimization.

    For every task the optimizer keeps a search mean ``x`` and a momentum
    vector ``v``. Each generation draws antithetic Gaussian perturbations
    around the mean, evaluates them (clipped to ``[0, 1]``) together with the
    mean itself, converts the objective values to centered ranks and moves the
    mean along the resulting momentum-smoothed search direction.

    Attributes
    ----------
    algorithm_information : dict
        Dictionary containing algorithm capabilities and requirements
    """

    algorithm_information = {
        'n_tasks': '[1, K]',
        'dims': 'unequal',
        'objs': 'equal',
        'n_objs': '1',
        'cons': 'unequal',
        'n_cons': '[0, C]',
        'expensive': 'False',
        'knowledge_transfer': 'False',
        'n': 'unequal',
        'max_nfes': 'unequal'
    }

    @classmethod
    def get_algorithm_information(cls, print_info=True):
        """
        Return the capability description of this algorithm.

        Delegates to the module-level ``get_algorithm_information`` helper
        (a name lookup inside a method body skips the class scope, so this is
        not a recursive call).

        Parameters
        ----------
        print_info : bool, optional
            Forwarded unchanged to the module-level helper (default: True)
        """
        return get_algorithm_information(cls, print_info)

    def __init__(self, problem, n=None, max_nfes=None, sigma=1.0, lr=1e-3, momentum=0.9,
                 save_data=True, save_path='./Data', name='OpenAI-ES', disable_tqdm=True):
        """
        Initialize OpenAI-ES Algorithm.

        Parameters
        ----------
        problem : MTOP
            Multi-task optimization problem instance
        n : int or List[int], optional
            Population size per task (default: None, will use 100). Odd values
            are rounded up to the next even number by ``optimize`` because
            antithetic sampling needs perturbation pairs.
        max_nfes : int or List[int], optional
            Maximum number of function evaluations per task
            (default: None, will use 10000)
        sigma : float, optional
            Noise standard deviation (default: 1.0)
        lr : float, optional
            Learning rate (default: 1e-3)
        momentum : float, optional
            Momentum coefficient (default: 0.9)
        save_data : bool, optional
            Whether to save optimization data (default: True)
        save_path : str, optional
            Path to save results (default: './Data')
        name : str, optional
            Name for the experiment (default: 'OpenAI-ES')
        disable_tqdm : bool, optional
            Whether to disable progress bar (default: True)
        """
        self.problem = problem
        self.n = n if n is not None else 100
        self.max_nfes = max_nfes if max_nfes is not None else 10000
        self.sigma = sigma
        self.lr = lr
        self.momentum = momentum
        self.save_data = save_data
        self.save_path = save_path
        self.name = name
        self.disable_tqdm = disable_tqdm

    @staticmethod
    def _even_population_size(n):
        """Round ``n`` up to an even number; raise ValueError if the result is below 2."""
        size = n + 1 if n % 2 != 0 else n
        if size < 2:
            # With no samples the mean and the gradient estimate become NaN.
            raise ValueError(f"Population size must be a positive integer, got {n}")
        return size

    @staticmethod
    def _centered_rank_utilities(fitness):
        """
        Map objective values to centered ranks in ``[-0.5, 0.5]``.

        The smallest (best, since objectives are minimized) value receives
        ``+0.5`` and the largest ``-0.5``. Callers must supply at least two
        values; a single value would divide by zero.
        """
        fitness = np.asarray(fitness).flatten()
        count = fitness.size
        order = np.argsort(fitness)
        ranks = np.zeros(count)
        ranks[order] = np.arange(count - 1, -1, -1)
        return ranks / (count - 1) - 0.5

    def optimize(self):
        """
        Execute the OpenAI-ES Algorithm.

        Whole generations of ``N + 1`` evaluations (``N`` samples plus the
        mean) are run while a task is below its budget, so the final count of
        a task may exceed its ``max_nfes`` by up to ``N``. The actual counts
        are what is passed on to ``build_save_results``.

        Returns
        -------
        Results
            Optimization results containing decision variables, objectives, and runtime

        Raises
        ------
        ValueError
            If a task's population size is not a positive integer.
        """
        start_time = time.time()
        problem = self.problem
        nt = problem.n_tasks
        max_nfes_per_task = par_list(self.max_nfes, nt)
        # Loop-invariant: expand the population-size setting once for all tasks.
        pop_sizes = par_list(self.n, nt)

        # Per-task optimizer state
        params = []
        for t in range(nt):
            dim = problem.dims[t]
            # Antithetic sampling needs an even number of samples.
            N = self._even_population_size(pop_sizes[t])
            # Normalize sigma by the mean bound width of the task; samples are
            # clipped to [0, 1] below, so the search presumably runs in a
            # unit-scaled decision space (confirm against the problem class).
            lb, ub = problem.bounds[t]
            sigma = self.sigma / np.mean(ub - lb)
            # Initial mean: average of N uniform random points in [0, 1]^dim
            # (mirrors the MATLAB reference implementation).
            x = np.mean(np.random.rand(dim, N), axis=1)
            v = np.zeros(dim)  # Momentum vector
            params.append({
                'dim': dim, 'N': N, 'sigma': sigma, 'x': x, 'v': v
            })

        # Tracking variables
        nfes_per_task = [0] * nt
        decs = [None] * nt
        objs = [None] * nt
        cons = [None] * nt
        all_decs = [[] for _ in range(nt)]
        all_objs = [[] for _ in range(nt)]
        all_cons = [[] for _ in range(nt)]

        # The context manager closes the progress bar even if evaluation raises.
        with tqdm(total=sum(max_nfes_per_task), desc=f"{self.name}",
                  disable=self.disable_tqdm) as pbar:
            while sum(nfes_per_task) < sum(max_nfes_per_task):
                active_tasks = [i for i in range(nt) if nfes_per_task[i] < max_nfes_per_task[i]]
                if not active_tasks:
                    break

                for i in active_tasks:
                    p = params[i]

                    # Antithetic sampling: every perturbation is paired with its negation.
                    Z_half = np.random.randn(p['dim'], p['N'] // 2)
                    Z = np.hstack([Z_half, -Z_half])  # Shape: (dim, N)
                    X = p['x'][:, np.newaxis] + p['sigma'] * Z  # Shape: (dim, N)

                    # One sample per row, clipped to the unit box (boundary handling).
                    sample_decs = np.clip(X.T, 0, 1)
                    # The current mean is evaluated as well.
                    mean_dec = np.clip(p['x'][np.newaxis, :], 0, 1)

                    # Evaluate samples and mean in one call; the mean is the last row.
                    all_sample_decs = np.vstack([sample_decs, mean_dec])
                    sample_objs, sample_cons = evaluation_single(problem, all_sample_decs, i)
                    mean_obj = sample_objs[-1:]
                    mean_con = sample_cons[-1:]
                    sample_objs = sample_objs[:-1]
                    sample_cons = sample_cons[:-1]

                    # Current population: mean first, followed by the samples.
                    decs[i] = np.vstack([mean_dec, sample_decs])
                    objs[i] = np.vstack([mean_obj, sample_objs])
                    cons[i] = np.vstack([mean_con, sample_cons])

                    nfes_per_task[i] += p['N'] + 1  # N samples + 1 mean
                    pbar.update(p['N'] + 1)

                    append_history(all_decs[i], decs[i], all_objs[i], objs[i], all_cons[i], cons[i])

                    # Centered rank shaping: best sample -> +0.5, worst -> -0.5.
                    shaped = self._centered_rank_utilities(sample_objs)

                    # Search direction estimated from the unclipped noise.
                    grad = (Z @ shaped) / (p['N'] * p['sigma'])

                    # Momentum update followed by the step on the mean.
                    p['v'] = self.momentum * p['v'] + (1 - self.momentum) * grad
                    p['x'] = p['x'] + self.lr * p['v']

        runtime = time.time() - start_time

        # Save results
        results = build_save_results(
            all_decs=all_decs,
            all_objs=all_objs,
            runtime=runtime,
            max_nfes=nfes_per_task,
            all_cons=all_cons,
            bounds=problem.bounds,
            save_path=self.save_path,
            filename=self.name,
            save_data=self.save_data
        )
        return results
def openai_es_generation(x: np.ndarray, sigma: float, N: Optional[int] = None) -> Tuple[np.ndarray, np.ndarray]:
    """
    Generate offspring population using OpenAI-ES antithetic sampling.

    Draws ``N / 2`` standard-normal perturbations, mirrors them to obtain
    ``N`` antithetic noise vectors, adds them (scaled by ``sigma``) to the
    mean and clips the result to ``[0, 1]``.

    Parameters
    ----------
    x : np.ndarray
        Mean vector, shape (d,). Any 1-D array-like is accepted.
    sigma : float
        Noise standard deviation
    N : int, optional
        Number of offspring to generate (default: None, will use 100).
        Odd values are rounded up to the next even number.

    Returns
    -------
    offdecs : np.ndarray
        Offspring decision variables clipped to [0, 1], shape (N, d)
    Z : np.ndarray
        Unclipped noise samples, shape (d, N); the second half of the
        columns is the negation of the first half

    Raises
    ------
    ValueError
        If ``x`` is not one-dimensional.
    """
    # Accept lists/tuples as well as arrays; column indexing below needs an ndarray.
    mean = np.asarray(x, dtype=float)
    if mean.ndim != 1:
        raise ValueError(f"x must be a 1-D mean vector, got shape {mean.shape}")
    d = mean.shape[0]

    # Default population size
    if N is None:
        N = 100
    # Antithetic pairs require an even number of samples.
    if N % 2 != 0:
        N += 1

    # Antithetic sampling
    Z_half = np.random.randn(d, N // 2)
    Z = np.hstack([Z_half, -Z_half])

    # One offspring per row, clipped to the unit box
    offdecs = np.clip((mean[:, np.newaxis] + sigma * Z).T, 0, 1)
    return offdecs, Z