"""
Surrogate-Assisted Cooperative Swarm Optimization (SA-COSO)
This module implements SA-COSO for expensive single-objective optimization problems.
References
----------
[1] Sun, Chaoli, et al. "Surrogate-assisted cooperative swarm optimization of high-dimensional expensive problems." IEEE Transactions on Evolutionary Computation 21.4 (2017): 644-660.
Notes
-----
Author: Jiangtao Shen
Email: j.shen5@exeter.ac.uk
Date: 2025.01.13
Version: 1.0
"""
import time
import numpy as np
from tqdm import tqdm
from scipy.interpolate import RBFInterpolator
from ddmtolab.Methods.Algo_Methods.algo_utils import *
from ddmtolab.Methods.mtop import MTOP
import warnings
warnings.filterwarnings("ignore")
class SA_COSO:
    """
    Surrogate-Assisted Cooperative Swarm Optimization (SA-COSO) for
    expensive single-objective problems.

    Two swarms cooperate during the search:

    1. FES-PSO: a small swarm whose fitness-estimation strategy reduces
       the number of true function evaluations.
    2. RBF-SLPSO: a large social-learning PSO swarm guided by an RBF
       surrogate model.
    """

    # Capability/requirement summary consumed by the framework.
    algorithm_information = {
        'n_tasks': '[1, K]',
        'dims': 'unequal',
        'objs': 'equal',
        'n_objs': '1',
        'cons': 'equal',
        'n_cons': '0',
        'expensive': 'True',
        'knowledge_transfer': 'False',
        'n_initial': 'unequal',
        'max_nfes': 'unequal',
    }

    @classmethod
    def get_algorithm_information(cls, print_info=True):
        # Delegate to the shared module-level helper from algo_utils.
        return get_algorithm_information(cls, print_info)
def __init__(self, problem, n_initial=None, max_nfes=None, n_fes=30, n_rbf=100,
mu=5, save_data=True, save_path='./Data',
name='SA-COSO', disable_tqdm=True):
"""
Initialize SA-COSO algorithm.
Parameters
----------
problem : MTOP
Multi-task optimization problem instance
n_initial : int or List[int], optional
Number of initial samples per task (default: n_fes + n_rbf)
max_nfes : int or List[int], optional
Maximum number of function evaluations per task (default: 300)
n_fes : int, optional
Population size of FES-assisted PSO (default: 30)
n_rbf : int, optional
Population size of RBF-assisted SL-PSO (default: 100)
mu : int, optional
Total number of samples per iteration (default: 5, must be >= 2)
- (mu - 1) samples from FES-PSO
- 1 sample from RBF-SLPSO
save_data : bool, optional
Whether to save optimization data (default: True)
save_path : str, optional
Path to save results (default: './TestData')
name : str, optional
Name for the experiment (default: 'SACOSO_test')
disable_tqdm : bool, optional
Whether to disable progress bar (default: True)
"""
self.problem = problem
self.n_fes = n_fes # FES-PSO population size
self.n_rbf = n_rbf # RBF-SLPSO population size
self.mu = mu # Total samples per iteration
self.n_sample_fes = mu - 1 # Samples from FES-PSO
self.n_sample_rbf = 1 # Samples from RBF-SLPSO (fixed to 1)
self.n_initial = n_initial if n_initial is not None else (n_fes + n_rbf)
self.max_nfes = max_nfes if max_nfes is not None else 300
self.save_data = save_data
self.save_path = save_path
self.name = name
self.disable_tqdm = disable_tqdm
# PSO parameters
self.c1 = 2.05 # Cognitive coefficient
self.c2 = 1.025 # Social coefficient (GbestFES)
self.c3 = 1.025 # Social coefficient (GbestRBF)
self.w = 0.7298 # Inertia weight (constriction factor)
# RBF parameters
self.max_node = 8
    def optimize(self):
        """
        Execute the SA-COSO algorithm.

        Alternates, per task and per iteration, a FES-PSO generation (small
        swarm, (mu - 1) true evaluations) with an RBF-SLPSO generation
        (large swarm, 1 true evaluation) until the evaluation budget of
        every task is exhausted.

        Returns
        -------
        Results
            Optimization results containing decision variables, objectives, and runtime
        """
        start_time = time.time()
        problem = self.problem
        nt = problem.n_tasks
        dims = problem.dims
        # Broadcast possibly-scalar budgets to per-task lists.
        n_initial_per_task = par_list(self.n_initial, nt)
        max_nfes_per_task = par_list(self.max_nfes, nt)
        # Generate initial samples using Latin Hypercube Sampling
        decs = initialization(problem, self.n_initial, method='lhs')
        objs, _ = evaluation(problem, decs)
        nfes_per_task = n_initial_per_task.copy()
        # Evaluation archive per task: hx = decision vectors, hf = objective values.
        hx = [decs[i].copy() for i in range(nt)]
        hf = [objs[i].flatten().copy() for i in range(nt)]
        # FES-PSO swarm state (positions/objectives, velocities, personal/global bests)
        swarm_fes = []
        vel_fes = []
        pbest_fes = []
        pbest_fes_val = []
        gbest_fes = []
        gbest_fes_val = []
        # RBF-SLPSO swarm state (positions/objectives, velocity deltas, global best)
        swarm_rbf = []
        delta_rbf = []
        gbest_rbf = []
        gbest_rbf_val = []
        # Running sum of archive bounding-box diagonals; its mean sets the RBF spread.
        spread_sum = [0.0] * nt
        for i in range(nt):
            dim = dims[i]
            n_total = n_initial_per_task[i]
            # Split initial population between the two swarms.
            n_fes_actual = min(self.n_fes, n_total)
            n_rbf_actual = min(self.n_rbf, n_total - n_fes_actual)
            # FES-PSO initialization
            pos_fes = hx[i][:n_fes_actual].copy()
            obj_fes = hf[i][:n_fes_actual].copy()
            swarm_fes.append({'pos': pos_fes, 'obj': obj_fes})
            # Random initial velocities in [-0.5, 0.5) per dimension.
            vel_i = np.random.rand(n_fes_actual, dim) - 0.5
            vel_fes.append(vel_i)
            pbest_fes.append(pos_fes.copy())
            pbest_fes_val.append(obj_fes.copy())
            best_idx = np.argmin(obj_fes)
            gbest_fes.append(pos_fes[best_idx].copy())
            gbest_fes_val.append(obj_fes[best_idx])
            # RBF-SLPSO initialization
            if n_rbf_actual > 0:
                pos_rbf = hx[i][n_fes_actual:n_fes_actual + n_rbf_actual].copy()
                obj_rbf = hf[i][n_fes_actual:n_fes_actual + n_rbf_actual].copy()
            else:
                # Initial archive too small for both swarms: evaluate extra random
                # points (in normalized [0, 1] space) and charge them to the budget.
                pos_rbf = np.random.rand(self.n_rbf, dim)
                obj_rbf_arr, _ = evaluation_single(problem, pos_rbf, i)
                obj_rbf = obj_rbf_arr.flatten()
                hx[i] = np.vstack([hx[i], pos_rbf])
                hf[i] = np.concatenate([hf[i], obj_rbf])
                nfes_per_task[i] += self.n_rbf
            swarm_rbf.append({'pos': pos_rbf, 'obj': obj_rbf})
            delta_i = np.random.rand(len(pos_rbf), dim) - 0.5
            delta_rbf.append(delta_i)
            best_idx_rbf = np.argmin(obj_rbf)
            gbest_rbf.append(pos_rbf[best_idx_rbf].copy())
            gbest_rbf_val.append(obj_rbf[best_idx_rbf])
        pbar = tqdm(total=sum(max_nfes_per_task), initial=sum(nfes_per_task),
                    desc=f"{self.name}", disable=self.disable_tqdm)
        iter_count = [0] * nt
        # Main loop: one FES-PSO + one RBF-SLPSO generation per active task
        # per pass, until the combined evaluation budget is spent.
        while sum(nfes_per_task) < sum(max_nfes_per_task):
            active_tasks = [i for i in range(nt) if nfes_per_task[i] < max_nfes_per_task[i]]
            if not active_tasks:
                break
            for i in active_tasks:
                dim = dims[i]
                iter_count[i] += 1
                # Determine global best across both swarms
                if gbest_fes_val[i] < gbest_rbf_val[i]:
                    gbest = gbest_fes[i].copy()
                else:
                    gbest = gbest_rbf[i].copy()
                # Update spread for RBF: mean archive bounding-box diagonal so far.
                pos_range = np.max(hx[i], axis=0) - np.min(hx[i], axis=0)
                spread_sum[i] += np.sqrt(np.sum(pos_range ** 2))
                spread = spread_sum[i] / iter_count[i]
                # Build RBF surrogate model
                rbf_model = self._build_rbf_model(hx[i], hf[i], spread, dim)
                # ===== FES-PSO Update =====
                # Mutates vel_fes[i] / pbest_fes[i] / pbest_fes_val[i] in place and
                # returns the newly (truly) evaluated (position, objective) pairs.
                new_evals_fes = self._fes_pso_update(
                    i, problem, dim, rbf_model,
                    swarm_fes[i], vel_fes[i], pbest_fes[i], pbest_fes_val[i],
                    gbest_fes[i], gbest_fes_val[i], gbest_rbf[i],
                    hx[i], hf[i], self.n_sample_fes
                )
                # Update history and counters for FES-PSO
                for pos, obj in new_evals_fes:
                    if nfes_per_task[i] >= max_nfes_per_task[i]:
                        break
                    hx[i] = np.vstack([hx[i], pos.reshape(1, -1)])
                    hf[i] = np.concatenate([hf[i], [obj]])
                    nfes_per_task[i] += 1
                    pbar.update(1)
                # Update FES gbest from the personal bests refreshed inside the update call.
                best_idx = np.argmin(pbest_fes_val[i])
                if pbest_fes_val[i][best_idx] < gbest_fes_val[i]:
                    gbest_fes[i] = pbest_fes[i][best_idx].copy()
                    gbest_fes_val[i] = pbest_fes_val[i][best_idx]
                # ===== RBF-SLPSO Update =====
                if nfes_per_task[i] < max_nfes_per_task[i]:
                    # Rebuild RBF model with updated data
                    spread = spread_sum[i] / iter_count[i]
                    rbf_model = self._build_rbf_model(hx[i], hf[i], spread, dim)
                    new_evals_rbf = self._rbf_slpso_update(
                        i, problem, dim, rbf_model,
                        swarm_rbf[i], delta_rbf[i], gbest,
                        hx[i], hf[i], self.n_sample_rbf
                    )
                    # Update history for RBF-SLPSO
                    for pos, obj in new_evals_rbf:
                        if nfes_per_task[i] >= max_nfes_per_task[i]:
                            break
                        hx[i] = np.vstack([hx[i], pos.reshape(1, -1)])
                        hf[i] = np.concatenate([hf[i], [obj]])
                        nfes_per_task[i] += 1
                        pbar.update(1)
                        # Update RBF gbest
                        if obj < gbest_rbf_val[i]:
                            gbest_rbf[i] = pos.copy()
                            gbest_rbf_val[i] = obj
        pbar.close()
        runtime = time.time() - start_time
        # Convert database to staircase history structure for results
        db_decs = [hx[i].copy() for i in range(nt)]
        db_objs = [hf[i].reshape(-1, 1).copy() for i in range(nt)]
        all_decs, all_objs = build_staircase_history(db_decs, db_objs, k=self.mu)
        # Build and save results
        results = build_save_results(all_decs=all_decs, all_objs=all_objs, runtime=runtime, max_nfes=nfes_per_task,
                                     bounds=problem.bounds, save_path=self.save_path, filename=self.name,
                                     save_data=self.save_data)
        return results
# Build RBF surrogate model
def _build_rbf_model(self, hx, hf, spread, dim):
if spread <= 0:
spread = 1.0
try:
rbf_interpolator = RBFInterpolator(hx, hf, kernel='gaussian', epsilon=1.0 / spread)
except:
rbf_interpolator = RBFInterpolator(hx, hf, kernel='thin_plate_spline')
def rbf_model(x):
if x.ndim == 1:
x = x.reshape(1, -1)
pred = rbf_interpolator(x)
return pred.flatten()
return rbf_model
    # FES-PSO update with fixed number of samples
    def _fes_pso_update(self, task_idx, problem, dim, rbf_model,
                        swarm, vel, pbest, pbest_val,
                        gbest_fes, gbest_fes_val, gbest_rbf,
                        hx, hf, n_sample):
        """
        Advance the FES-PSO swarm by one generation and truly evaluate the
        ``n_sample`` most promising (surrogate-ranked) non-duplicate positions.

        ``swarm``, ``vel``, ``pbest`` and ``pbest_val`` are mutated in place;
        the caller appends the returned evaluations to the archive.
        NOTE(review): ``gbest_fes_val`` and ``hf`` are accepted but never read
        here — presumably kept for interface symmetry with the caller.

        Returns
        -------
        list[tuple[np.ndarray, float]]
            (position, true objective) pairs that were actually evaluated.
        """
        new_evals = []
        n = len(swarm['pos'])
        # PSO velocity and position update
        r1 = np.random.rand(n, dim)
        r2 = np.random.rand(n, dim)
        r3 = np.random.rand(n, dim)
        # Velocity update: influenced by pbest, gbest_fes, and gbest_rbf
        vel_new = self.w * (vel +
                            self.c1 * r1 * (pbest - swarm['pos']) +
                            self.c2 * r2 * (gbest_fes - swarm['pos']) +
                            self.c3 * r3 * (gbest_rbf - swarm['pos']))
        # Velocity clamping (±0.5 in [0,1] space, matching MATLAB ±0.5*(ub-lb))
        vel_new = np.clip(vel_new, -0.5, 0.5)
        # Slice-assign so the caller's velocity array is updated in place.
        vel[:] = vel_new
        # Position update
        pos_new = swarm['pos'] + vel_new
        pos_new = np.clip(pos_new, 0.0, 1.0)
        swarm['pos'] = pos_new
        # Use surrogate model for fitness approximation
        srgt_obj = rbf_model(pos_new)
        swarm['obj'] = srgt_obj.copy()
        # Select top n_sample particles by surrogate prediction (ascending order)
        sorted_idx = np.argsort(srgt_obj)
        # Evaluate top n_sample non-duplicate particles
        eval_count = 0
        for idx in sorted_idx:
            if eval_count >= n_sample:
                break
            if not is_duplicate(pos_new[idx], hx):
                obj_arr, _ = evaluation_single(problem, pos_new[idx:idx + 1], task_idx)
                obj = obj_arr[0, 0]
                # Replace the surrogate estimate with the true objective.
                swarm['obj'][idx] = obj
                new_evals.append((pos_new[idx].copy(), obj))
                eval_count += 1
                # Update personal best
                if obj < pbest_val[idx]:
                    pbest[idx] = pos_new[idx].copy()
                    pbest_val[idx] = obj
        return new_evals
    # RBF-assisted SL-PSO update with fixed number of samples
    def _rbf_slpso_update(self, task_idx, problem, dim, rbf_model,
                          swarm, delta, gbest, hx, hf, n_sample):
        """
        Advance the RBF-assisted social-learning PSO swarm by one generation
        and truly evaluate the ``n_sample`` best surrogate-ranked
        non-duplicate particles.

        ``swarm`` and ``delta`` are mutated in place; the caller appends the
        returned evaluations to the archive.
        NOTE(review): ``gbest`` is accepted but never read in this body —
        learning comes purely from the demonstrator pool; confirm against the
        reference implementation.

        Returns
        -------
        list[tuple[np.ndarray, float]]
            (position, true objective) pairs that were actually evaluated.
        """
        new_evals = []
        n = len(swarm['pos'])
        # Combine swarm with historical best as demonstrators
        n_demons = min(n, len(hf))
        idx_sorted = np.argsort(hf)[:n_demons]
        demons_pos = np.vstack([swarm['pos'], hx[idx_sorted]])
        demons_obj = np.concatenate([swarm['obj'], hf[idx_sorted]])
        # Social Learning PSO update: each particle learns dimension-wise
        # from demonstrators with better (surrogate or true) fitness.
        for j in range(n):
            # Find better individuals (demonstrators)
            better_mask = demons_obj < swarm['obj'][j]
            better_idx = np.where(better_mask)[0]
            if len(better_idx) > 0:
                # Choose demonstrators for each dimension
                chosen = np.zeros(dim)
                if len(better_idx) == 1:
                    chosen = demons_pos[better_idx[0]]
                elif len(better_idx) == 2:
                    # Randomly mix the two demonstrators dimension-wise.
                    mix = np.random.rand(dim) > 0.5
                    chosen[mix] = demons_pos[better_idx[0], mix]
                    chosen[~mix] = demons_pos[better_idx[1], ~mix]
                else:
                    # Pick an independent random demonstrator per dimension.
                    for d in range(dim):
                        rand_idx = better_idx[np.random.randint(len(better_idx))]
                        chosen[d] = demons_pos[rand_idx, d]
                # Update velocity (delta) and position; delta[j] is mutated
                # in place so the caller's array stays current.
                delta[j] = np.random.rand(dim) * delta[j] + np.random.rand(dim) * (chosen - swarm['pos'][j])
                delta[j] = np.clip(delta[j], -0.5, 0.5)
                swarm['pos'][j] = swarm['pos'][j] + delta[j]
        # Boundary handling
        swarm['pos'] = np.clip(swarm['pos'], 0.0, 1.0)
        # Update fitness using surrogate
        swarm['obj'] = rbf_model(swarm['pos'])
        # Select top n_sample particles by surrogate prediction
        sorted_idx = np.argsort(swarm['obj'])
        # Evaluate top n_sample non-duplicate particles
        eval_count = 0
        for idx in sorted_idx:
            if eval_count >= n_sample:
                break
            if not is_duplicate(swarm['pos'][idx], hx):
                obj_arr, _ = evaluation_single(problem, swarm['pos'][idx:idx + 1], task_idx)
                obj = obj_arr[0, 0]
                # Replace the surrogate estimate with the true objective.
                swarm['obj'][idx] = obj
                new_evals.append((swarm['pos'][idx].copy(), obj))
                eval_count += 1
        return new_evals