"""
This script contains commonly used components for implementing algorithms.
Author: Jiangtao Shen
Email: j.shen5@exeter.ac.uk
Date: 2025.10.18
Version: 1.0
"""
import numpy as np
import pickle
import os
from dataclasses import asdict
from scipy.spatial.distance import cdist
from sklearn.cluster import KMeans
from dataclasses import dataclass
import copy
from typing import Any, List, Tuple, Union, Optional
@dataclass
class Results:
    """
    Container for optimization results.

    Attributes
    ----------
    best_decs : List[np.ndarray]
        Best decision variables for each task
    best_objs : List[np.ndarray]
        Best objective values for each task
    all_decs : List[List[np.ndarray]]
        Decision variables history for all tasks across generations
    all_objs : List[List[np.ndarray]]
        Objective values history for all tasks across generations
    runtime : float
        Total runtime in seconds
    max_nfes : List[int]
        Maximum function evaluations per task
    best_cons : Optional[List[np.ndarray]]
        Best constraint values for each task (None if unconstrained)
    all_cons : Optional[List[List[np.ndarray]]]
        Constraint values history for all tasks (None if unconstrained)
    bounds : Optional[List[Tuple[np.ndarray, np.ndarray]]]
        Bounds for each task, where each element is a (lower, upper) pair
    """
    best_decs: List[np.ndarray]
    best_objs: List[np.ndarray]
    all_decs: List[List[np.ndarray]]
    all_objs: List[List[np.ndarray]]
    runtime: float
    max_nfes: List[int]
    best_cons: Optional[List[np.ndarray]] = None
    all_cons: Optional[List[List[np.ndarray]]] = None
    bounds: Optional[List[Tuple[np.ndarray, np.ndarray]]] = None
def build_save_results(
    all_decs: List[List[np.ndarray]],
    all_objs: List[List[np.ndarray]],
    runtime: float,
    max_nfes: List[int],
    all_cons: Optional[List[List[np.ndarray]]] = None,
    bounds: Optional[List[Tuple[np.ndarray, np.ndarray]]] = None,
    save_path: Optional[str] = None,
    filename: Optional[str] = None,
    save_data: bool = True,
    **kwargs
) -> Results:
    """
    Extract best solutions, build results, and optionally save to file.

    Automatically detects single-objective vs multi-objective tasks:
    - Single-objective (n_objs=1): returns the best individual
    - Multi-objective (n_objs>1): returns the entire final population (Pareto front)

    Parameters
    ----------
    all_decs : List[List[np.ndarray]]
        Decision variables history for all tasks.
        all_decs[i][g] has shape (n_samples, dim) for task i at generation g.
    all_objs : List[List[np.ndarray]]
        Objective values history for all tasks.
        all_objs[i][g] has shape (n_samples, n_objs) for task i at generation g.
    runtime : float
        Total runtime in seconds
    max_nfes : List[int]
        Maximum function evaluations per task
    all_cons : List[List[np.ndarray]], optional
        Constraint values history for all tasks (default: None)
    bounds : List[Tuple[np.ndarray, np.ndarray]], optional
        Bounds (lower, upper) for each task (default: None)
    save_path : str, optional
        Directory path where the results will be saved (default: None)
    filename : str, optional
        Name of the output file without extension (default: None)
    save_data : bool, optional
        Whether to save the data to file (default: True)
    **kwargs : dict
        Additional data to include in the saved file

    Returns
    -------
    results : Results
        Results object containing best solutions and optimization history
    """
    nt = len(all_decs)
    best_decs = []
    best_objs = []
    best_cons = [] if all_cons is not None else None
    for i in range(nt):
        last_gen_objs = all_objs[i][-1]
        last_gen_decs = all_decs[i][-1]
        last_gen_cons = all_cons[i][-1] if all_cons is not None else None
        n_objs = last_gen_objs.shape[1]
        if n_objs == 1:
            # Single-objective: keep only the best individual of the final generation
            best_idx = np.argmin(last_gen_objs[:, 0])
            best_objs.append(last_gen_objs[best_idx])
            best_decs.append(last_gen_decs[best_idx])
            if all_cons is not None:
                best_cons.append(last_gen_cons[best_idx])
        else:
            # Multi-objective: the whole final population approximates the Pareto front
            best_objs.append(last_gen_objs)
            best_decs.append(last_gen_decs)
            if all_cons is not None:
                best_cons.append(last_gen_cons)
    results = Results(
        best_decs=best_decs,
        best_objs=best_objs,
        all_decs=all_decs,
        all_objs=all_objs,
        runtime=runtime,
        max_nfes=max_nfes,
        best_cons=best_cons,
        all_cons=all_cons,
        bounds=bounds,
    )
    # Save results to file if path and filename are provided
    if save_data and save_path is not None and filename is not None:
        os.makedirs(save_path, exist_ok=True)
        # Bug fix: the 'filename' argument was previously ignored when building the path
        full_path = os.path.join(save_path, f"{filename}.pkl")
        data_dict = asdict(results)
        data_dict.update(kwargs)
        with open(full_path, 'wb') as f:
            pickle.dump(data_dict, f)
    return results
def get_algorithm_information(algorithm_class: type, print_info: bool = True) -> dict:
    """
    Retrieve the information dictionary attached to an algorithm class.

    Parameters
    ----------
    algorithm_class : type
        Algorithm class exposing an 'algorithm_information' attribute
    print_info : bool, optional
        Whether to print a formatted summary to stdout (default: True)

    Returns
    -------
    algo_info : dict
        Algorithm information dictionary

    Raises
    ------
    AttributeError
        If the class does not define 'algorithm_information'.
    """
    _missing = object()
    algo_info = getattr(algorithm_class, 'algorithm_information', _missing)
    if algo_info is _missing:
        raise AttributeError(f"{algorithm_class.__name__} does not have 'algorithm_information' attribute")
    # One " - key: value" line per entry of the information dictionary
    detail_lines = [f" - {key}: {value}" for key, value in algo_info.items()]
    summary = f"🤖️ {algorithm_class.__name__} \nAlgorithm Information:\n" + '\n'.join(detail_lines)
    if print_info:
        print(summary)
    return algo_info
def init_history(
    decs: List[np.ndarray],
    objs: List[np.ndarray],
    cons: Optional[List[np.ndarray]] = None
) -> Union[tuple[List[List[np.ndarray]], List[List[np.ndarray]]],
           tuple[List[List[np.ndarray]], List[List[np.ndarray]], List[List[np.ndarray]]]]:
    """
    Initialize history storage for populations across generations.

    Parameters
    ----------
    decs : List[np.ndarray]
        Initial decision variables for each task.
        decs[i] has shape (n_samples, dim) for task i.
    objs : List[np.ndarray]
        Initial objective values for each task.
        objs[i] has shape (n_samples, n_objs) for task i.
    cons : List[np.ndarray], optional
        Initial constraint values for each task (default: None).
        cons[i] has shape (n_samples, n_cons) for task i.

    Returns
    -------
    all_decs : List[List[np.ndarray]]
        History storage for decision variables
    all_objs : List[List[np.ndarray]]
        History storage for objective values
    all_cons : List[List[np.ndarray]], optional
        History storage for constraint values (only returned if cons is not None)
    """
    # Copies ensure the stored history is decoupled from the live population arrays
    all_decs = [[d.copy()] for d in decs]
    all_objs = [[o.copy()] for o in objs]
    if cons is not None:
        all_cons = [[c.copy()] for c in cons]
        return all_decs, all_objs, all_cons
    return all_decs, all_objs
def vstack_groups(*args: Union[List[np.ndarray], tuple[np.ndarray, ...], None]) -> Union[
    np.ndarray, List[np.ndarray]]:
    """
    Stack population arrays vertically.

    Supports both single arrays (list of arrays) and tuples with variable number of arrays.

    Parameters
    ----------
    *args : Union[List[np.ndarray], tuple[np.ndarray, ...], None]
        Variable number of arguments, each can be:
        - List of arrays to stack vertically
        - Tuple of arrays (any number) to stack
        - None (will be skipped)

    Returns
    -------
    results : Union[np.ndarray, List[np.ndarray]]
        Stacked array if exactly one non-None input, a list of stacked arrays
        if several, or an empty list if all inputs were None.
    """
    # np.vstack accepts both lists and tuples, so one branch covers every input kind
    # (the original had two identical branches for tuple vs list).
    results = [np.vstack(arg) for arg in args if arg is not None]
    # Return an empty list rather than crashing when every argument was None
    return results if len(results) != 1 else results[0]
def append_history(*pairs: Any) -> Tuple[list, ...]:
    """
    Append current generation data to history storage.

    Parameters
    ----------
    *pairs : tuple
        Alternating pairs of (history_list, current_data).
        - history_list: List to store historical data
        - current_data: Either a single np.ndarray (single task) or List[np.ndarray] (multi-task)

    Returns
    -------
    results : tuple
        All updated history lists (all_1, all_2, ...)

    Raises
    ------
    ValueError
        If an odd number of arguments is passed (arguments must form pairs).
    """
    if len(pairs) % 2 != 0:
        raise ValueError("append_history expects alternating (history_list, current_data) pairs")
    results = []
    for i in range(0, len(pairs), 2):
        all_list, data = pairs[i], pairs[i + 1]
        if isinstance(data, np.ndarray):
            # Single task: history is a flat list of per-generation arrays
            all_list.append(data.copy())
        else:
            # Multi-task: data[j] is appended to the j-th task's history list
            for j, arr in enumerate(data):
                all_list[j].append(arr.copy())
        results.append(all_list)
    return tuple(results)
def select_by_index(index: np.ndarray, *arrays: Optional[np.ndarray]) -> Union[np.ndarray, List[Optional[np.ndarray]]]:
    """
    Select rows from arrays by index.

    Parameters
    ----------
    index : np.ndarray
        Indices to select, shape (n_selected,)
    *arrays : np.ndarray or None
        Variable number of arrays to select from.
        Each array has shape (n_samples,) or (n_samples, dim).
        None values are passed through unchanged.

    Returns
    -------
    results : Union[np.ndarray, List[Optional[np.ndarray]]]
        Selected array if single input, or list of selected arrays if multiple inputs.
        None inputs return None in the corresponding position.
    """
    picked = []
    for arr in arrays:
        # The None check must come before touching arr.ndim
        if arr is None:
            picked.append(None)
        else:
            picked.append(arr[index] if arr.ndim == 1 else arr[index, :])
    return picked if len(picked) > 1 else picked[0]
def par_list(par: Union[int, List[int]], n_tasks: int) -> List[int]:
    """
    Broadcast a parameter to a per-task list.

    Parameters
    ----------
    par : Union[int, List[int]]
        Parameter value(s) - a single integer or an already per-task sequence
    n_tasks : int
        Number of tasks

    Returns
    -------
    par_per_task : List[int]
        One parameter value per task:
        - int input: the value repeated n_tasks times
        - sequence input: a shallow list copy of the sequence
    """
    return [par] * n_tasks if isinstance(par, int) else list(par)
def build_staircase_history(
    db_decs: List[np.ndarray],
    db_objs: List[np.ndarray],
    k: int = 1,
    db_cons: Optional[List[np.ndarray]] = None
) -> Union[
    Tuple[List[List[np.ndarray]], List[List[np.ndarray]]],
    Tuple[List[List[np.ndarray]], List[List[np.ndarray]], List[List[np.ndarray]]]
]:
    """
    Convert flat database lists into a staircase history structure.

    For each task, builds a sequence of "generations" whose sizes grow by k
    (k, 2k, 3k, ...) with a final generation containing every stored solution.

    Parameters
    ----------
    db_decs : List[np.ndarray]
        Database of all decision variables per task. db_decs[i] has shape (n_i, dim_i).
    db_objs : List[np.ndarray]
        Database of all objective values per task. db_objs[i] has shape (n_i, n_objs).
    k : int, optional
        Step size controlling the staircase gradient (default: 1).
    db_cons : List[np.ndarray], optional
        Database of all constraint values per task (default: None).

    Returns
    -------
    all_decs : List[List[np.ndarray]]
        Staircased decision variable history.
    all_objs : List[List[np.ndarray]]
        Staircased objective value history.
    all_cons : List[List[np.ndarray]], optional
        Staircased constraint history (only returned if db_cons is not None).
    """
    has_cons = db_cons is not None
    all_decs: List[List[np.ndarray]] = []
    all_objs: List[List[np.ndarray]] = []
    all_cons: Optional[List[List[np.ndarray]]] = [] if has_cons else None
    for t, (task_db_decs, task_db_objs) in enumerate(zip(db_decs, db_objs)):
        total = task_db_decs.shape[0]
        # Prefix sizes k, 2k, ... strictly below total, plus one final full slice
        cuts = list(range(k, total, k)) + [total]
        all_decs.append([task_db_decs[:c].copy() for c in cuts])
        all_objs.append([task_db_objs[:c].copy() for c in cuts])
        if has_cons:
            all_cons.append([db_cons[t][:c].copy() for c in cuts])
    if has_cons:
        return all_decs, all_objs, all_cons
    return all_decs, all_objs
def reorganize_initial_data(
    data: List[np.ndarray],
    nt: int,
    n_initial_per_task: List[int],
    interval: int = 1
) -> List[List[np.ndarray]]:
    """
    Reorganize initial data by task and number of initial points.

    Parameters
    ----------
    data : List[np.ndarray]
        Original data list, where data[i] is the data array for task i
    nt : int
        Number of tasks
    n_initial_per_task : List[int]
        List of number of initial points for each task
    interval : int, optional
        Interval for selecting points. Default is 1.
        - interval=1: 1, 2, 3, 4, ... points
        - interval=2: 2, 4, 6, 8, ... points
        - interval=k: k, 2k, 3k, 4k, ... (plus remaining points if not divisible)

    Returns
    -------
    all_data : List[List[np.ndarray]]
        Reorganized data: all_data[i] is a list of growing prefixes of data[i]
    """
    all_data = []
    for i in range(nt):
        task_data = []
        n = n_initial_per_task[i]
        # Growing prefixes of size interval, 2*interval, ...
        for j in range(interval, n + 1, interval):
            task_data.append(data[i][:j].copy())
        # Append the full prefix when n is not a multiple of interval
        if n % interval != 0:
            task_data.append(data[i][:n].copy())
        all_data.append(task_data)
    return all_data
def _lhs_sample(n_samples: int, dim: int) -> np.ndarray:
    """Generate one Latin hypercube sample matrix of shape (n_samples, dim) in [0, 1]."""
    matrix = np.zeros((n_samples, dim))
    for j in range(dim):
        # Stratify [0, 1] into n_samples equal intervals, draw one point per
        # interval, then shuffle so rows are decorrelated across columns
        intervals = np.linspace(0, 1, n_samples + 1)
        samples = np.random.uniform(intervals[:-1], intervals[1:])
        np.random.shuffle(samples)
        matrix[:, j] = samples
    return matrix


def initialization(
    problem: 'MTOP',
    n: Union[int, List[int]],
    method: str = 'random',
    the_same: bool = False
) -> List[np.ndarray]:
    """
    Initialize decision variable matrices for multiple tasks.

    Parameters
    ----------
    problem : MTOP
        An instance of the MTOP class
    n : Union[int, List[int]]
        Number of samples per task.
        - If int: same number of samples for all tasks
        - If list: number of samples for each task, e.g., [30, 50]
    method : str, optional
        Sampling method: 'random' or 'lhs' (default: 'random')
    the_same : bool, optional
        If True, all tasks share the same sample points (default: False).
        For tasks with different dimensions, samples are generated in the
        maximum dimension and then truncated to each task's dimension.

    Returns
    -------
    decs : List[np.ndarray]
        List of decision variable matrices for each task.
        decs[i] has shape (n_i, d_i) for task i, values in [0, 1].

    Raises
    ------
    ValueError
        If n is a list whose length does not match the number of tasks.
    """
    d = problem.dims
    nt = problem.n_tasks
    # Handle n: broadcast a single integer to every task
    if isinstance(n, int):
        n_per_task = [n] * nt
    else:
        n_per_task = list(n)
        if len(n_per_task) != nt:
            raise ValueError(f"Length of n array ({len(n_per_task)}) must match number of tasks ({nt})")
    decs = []
    if the_same:
        # Generate samples in the maximum dimension and truncate per task so
        # that all tasks share the same underlying sample points
        max_dim = max(d)
        max_n = max(n_per_task)
        if method.lower() == 'lhs':
            matrix_full = _lhs_sample(max_n, max_dim)
        else:
            matrix_full = np.random.rand(max_n, max_dim)
        for i in range(nt):
            decs.append(matrix_full[:n_per_task[i], :d[i]])
    else:
        # Independent samples for each task
        for i in range(nt):
            if method.lower() == 'lhs':
                decs.append(_lhs_sample(n_per_task[i], d[i]))
            else:
                decs.append(np.random.rand(n_per_task[i], d[i]))
    return decs
def evaluation(
    problem,
    decs: List[np.ndarray],
    unified: bool = False,
    fill_value: float = 0.0,
    eval_objectives: Union[bool, List[Union[bool, int, List[int]]]] = True,
    eval_constraints: Union[bool, List[Union[bool, int, List[int]]]] = True
) -> Tuple[List[np.ndarray], List[np.ndarray]]:
    """
    Evaluate a list of decision variable matrices on multiple tasks.

    Parameters
    ----------
    problem : MTOP
        An instance of the MTOP class.
    decs : list of ndarray
        List of decision variable matrices for each task, shape [n, d_i], scaled in [0,1].
    unified : bool, optional
        If True, pad objectives to m_max and constraints to c_max. Default False.
    fill_value : float, optional
        Value used for padding in unified mode. Default 0.0.
    eval_objectives : bool or list, optional
        - True: evaluate all objectives for all tasks (default)
        - False: skip objective evaluation for all tasks
        - List: per-task specification, each element can be:
          - True/False: evaluate all/none
          - int: evaluate only the i-th objective
          - List[int]: evaluate specified objectives
    eval_constraints : bool or list, optional
        - True: evaluate all constraints for all tasks (default)
        - False: skip constraint evaluation for all tasks
        - List: per-task specification, same format as eval_objectives

    Returns
    -------
    objs : list of ndarray
        List of objective value matrices for each task.
        - Normal mode: shape [n, m_i] or [n, len(selected)]
        - Unified mode: shape [n, m_max]
    cons : list of ndarray
        List of constraint value matrices for each task.
        - Normal mode: shape [n, c_i] or [n, 1] if no constraints
        - Unified mode: shape [n, c_max]
    """
    nt = problem.n_tasks
    bounds = problem.bounds
    # Broadcast a scalar mode (or a wrong-length list) to one entry per task
    if not isinstance(eval_objectives, list) or len(eval_objectives) != nt:
        obj_modes = [eval_objectives] * nt
    else:
        obj_modes = eval_objectives
    if not isinstance(eval_constraints, list) or len(eval_constraints) != nt:
        con_modes = [eval_constraints] * nt
    else:
        con_modes = eval_constraints
    # Max dimensions are only needed for unified-mode padding
    m_max = problem.m_max if unified else None
    c_max = problem.c_max if unified else None
    objs = []
    cons = []
    for i in range(nt):
        # Scale decision variables from [0,1] to real bounds
        lb, ub = bounds[i]
        decs_real = decs[i] * (ub - lb) + lb
        objectives, constraints = problem.evaluate_task(
            i, decs_real,
            eval_objectives=obj_modes[i],
            eval_constraints=con_modes[i]
        )
        n_samples = decs[i].shape[0]
        n_con = problem.get_n_constraints(i)
        # Unconstrained tasks get a (n, 1) zero matrix so downstream code
        # can assume a constraint array always exists
        if con_modes[i] is not False and n_con == 0:
            constraints = np.zeros((n_samples, 1), dtype=np.float64)
        if unified:
            objectives = _pad_to_size(objectives, m_max, fill_value)
            constraints = _pad_to_size(constraints, c_max, fill_value)
        objs.append(objectives)
        cons.append(constraints)
    return objs, cons
def evaluation_single(
    problem,
    decs: np.ndarray,
    index: int,
    unified: bool = False,
    fill_value: float = 0.0,
    eval_objectives: Union[bool, int, List[int]] = True,
    eval_constraints: Union[bool, int, List[int]] = True
) -> Tuple[np.ndarray, np.ndarray]:
    """
    Evaluate decision variables on one specific task.

    Parameters
    ----------
    problem : MTOP
        An instance of the MTOP class.
    decs : ndarray
        Decision variable matrix for the task, shape [n, d], scaled in [0,1].
        A 1-D vector is promoted to a single-row matrix.
    index : int
        Index of the task to evaluate.
    unified : bool, optional
        If True, pad objectives to m_max and constraints to c_max. Default False.
    fill_value : float, optional
        Value used for padding in unified mode. Default 0.0.
    eval_objectives : bool, int, or list of int, optional
        Objective selection: True (all), False (skip), an index, or a list of indices.
    eval_constraints : bool, int, or list of int, optional
        Constraint selection: True (all), False (skip), an index, or a list of indices.

    Returns
    -------
    objs : ndarray
        Objective values, shape [n, m] (or [n, m_max] in unified mode).
    cons : ndarray
        Constraint values, shape [n, c], [n, 1] if the task has no
        constraints, or [n, c_max] in unified mode.
    """
    decs = np.atleast_2d(decs)
    # Map normalized [0, 1] variables into the task's real search space
    lower, upper = problem.bounds[index]
    real_decs = decs * (upper - lower) + lower
    objs, cons = problem.evaluate_task(
        index, real_decs,
        eval_objectives=eval_objectives,
        eval_constraints=eval_constraints
    )
    # An unconstrained task still yields a (n, 1) zero matrix unless skipped
    if eval_constraints is not False and problem.get_n_constraints(index) == 0:
        cons = np.zeros((decs.shape[0], 1), dtype=np.float64)
    if unified:
        objs = _pad_to_size(objs, problem.m_max, fill_value)
        cons = _pad_to_size(cons, problem.c_max, fill_value)
    return objs, cons
def _pad_to_size(arr: np.ndarray, target_size: int, fill_value: float) -> np.ndarray:
"""
Pad array to target size along axis 1.
Parameters
----------
arr : np.ndarray
Input array to pad, shape (n_samples, dim)
target_size : int
Target size for axis 1
fill_value : float
Value to use for padding
Returns
-------
padded_arr : np.ndarray
Padded array, shape (n_samples, target_size).
Returns original array if already at or above target size.
"""
if arr.shape[1] >= target_size:
return arr
pad_width = target_size - arr.shape[1]
return np.pad(arr, ((0, 0), (0, pad_width)), mode='constant', constant_values=fill_value)
def crossover(
    par_dec1: np.ndarray,
    par_dec2: np.ndarray,
    mu: float = 2
) -> tuple[np.ndarray, np.ndarray]:
    """
    Simulated Binary Crossover (SBX) between two parent vectors.

    Parameters
    ----------
    par_dec1 : np.ndarray
        First parent decision vector, values scaled in [0, 1], shape (dim,)
    par_dec2 : np.ndarray
        Second parent decision vector, values scaled in [0, 1], shape (dim,)
    mu : float, optional
        Distribution index for crossover (default: 2)

    Returns
    -------
    off_dec1 : np.ndarray
        First offspring decision vector, values clipped to [0, 1], shape (dim,)
    off_dec2 : np.ndarray
        Second offspring decision vector, values clipped to [0, 1], shape (dim,)
    """
    dim = len(par_dec1)
    # NOTE: the three np.random calls below deliberately match the original
    # draw order so a seeded RNG stream produces identical offspring.
    u = np.random.rand(dim)
    low_mask = u <= 0.5
    beta = np.empty(dim)
    beta[low_mask] = (2 * u[low_mask]) ** (1 / (mu + 1))
    beta[~low_mask] = (2 * (1 - u[~low_mask])) ** (-1 / (mu + 1))
    # Randomly flip the sign of beta per gene, then disable crossover on
    # roughly half of the genes by forcing beta to 1 there
    beta *= (-1) ** np.random.randint(0, 2, size=dim)
    beta[np.random.rand(dim) < 0.5] = 1
    off_dec1 = np.clip(0.5 * ((1 + beta) * par_dec1 + (1 - beta) * par_dec2), 0, 1)
    off_dec2 = np.clip(0.5 * ((1 + beta) * par_dec2 + (1 - beta) * par_dec1), 0, 1)
    return off_dec1, off_dec2
def mutation(dec: np.ndarray, mu: float = 5) -> np.ndarray:
    """
    Polynomial mutation of a decision vector.

    Parameters
    ----------
    dec : np.ndarray
        Parent decision vector, values scaled in [0, 1], shape (dim,)
    mu : float, optional
        Distribution index for mutation (default: 5)

    Returns
    -------
    mutated_dec : np.ndarray
        Mutated decision vector, values clipped to [0, 1], shape (dim,);
        the input vector itself is left unmodified.
    """
    dim = len(dec)
    child = dec.copy()
    gene_rate = 1 / dim  # each gene mutates with probability 1/dim
    # NOTE: per-gene RNG draws match the original order for seeded reproducibility
    for idx in range(dim):
        if np.random.rand() >= gene_rate:
            continue
        u = np.random.rand()
        x = child[idx]
        if u <= 0.5:
            child[idx] = x + ((2 * u + (1 - 2 * u) * (1 - x) ** (mu + 1)) ** (1 / (mu + 1)) - 1)
        else:
            child[idx] = x + (1 - (2 * (1 - u) + 2 * (u - 0.5) * x ** (mu + 1)) ** (1 / (mu + 1)))
    return np.clip(child, 0, 1)
def ga_generation(parents: np.ndarray, muc: float, mum: float) -> np.ndarray:
    """
    Generate offspring population using genetic algorithm operators.

    Applies simulated binary crossover (SBX) and polynomial mutation
    to create offspring from parent population.

    Parameters
    ----------
    parents : np.ndarray
        Parent population, shape (n, d). The array is NOT modified
        (the previous implementation shuffled it in place).
    muc : float
        Distribution index for crossover
    mum : float
        Distribution index for mutation

    Returns
    -------
    offdecs : np.ndarray
        Offspring decision variables, shape (n, d)
    """
    n, d = parents.shape
    # Bug fix: shuffle a permuted copy instead of np.random.shuffle(parents),
    # which reordered the caller's population as a side effect
    shuffled = parents[np.random.permutation(n)]
    num_pairs = n // 2
    children = []
    # Mate parent j with parent num_pairs + j
    for j in range(num_pairs):
        offdec1, offdec2 = crossover(shuffled[j, :], shuffled[num_pairs + j, :], mu=muc)
        children.append(mutation(offdec1, mu=mum))
        children.append(mutation(offdec2, mu=mum))
    # Odd population size: mate the leftover parent with a random earlier one
    if n % 2 == 1:
        # Guard n == 1 (randint(0, 0) would raise); self-mating is the only option
        mate_idx = np.random.randint(0, n - 1) if n > 1 else 0
        offdec1, _ = crossover(shuffled[-1, :], shuffled[mate_idx, :], mu=muc)
        children.append(mutation(offdec1, mu=mum))
    return np.vstack(children) if children else np.zeros((0, d))
def de_generation(parents: np.ndarray, F: float, CR: float) -> np.ndarray:
    """
    Generate offspring for a population using Differential Evolution (DE).

    Uses DE/rand/1/bin strategy: random base vector, one difference vector,
    and binomial crossover.

    Parameters
    ----------
    parents : np.ndarray
        Array of parent solutions, shape (n, d); requires n >= 4 so that
        three distinct donors other than the target can be drawn
    F : float
        Differential weight (mutation scale factor)
    CR : float
        Crossover rate in [0, 1] for binomial crossover

    Returns
    -------
    offdecs : np.ndarray
        Offspring array, shape (n, d), clipped to [0, 1]
    """
    n, d = parents.shape
    offdecs = np.zeros((n, d), dtype=float)
    for j in range(n):
        # Choose 3 distinct donor indices, all different from the target j
        indices = np.arange(n)
        indices = indices[indices != j]
        a, b, c = np.random.choice(indices, 3, replace=False)
        x_1, x_2, x_3 = parents[a], parents[b], parents[c]
        # DE mutation (rand/1)
        mutant = x_1 + F * (x_2 - x_3)
        # Binomial crossover; one randomly chosen gene is always taken
        # from the mutant so the trial differs from the target
        trial = mutant.copy()
        replace_mask = np.random.rand(d) > CR
        replace_mask[np.random.randint(d)] = False
        trial[replace_mask] = parents[j][replace_mask]
        # Clip to the normalized [0, 1] search space
        offdecs[j] = np.clip(trial, 0.0, 1.0)
    return offdecs
def space_transfer(
    problem: 'MTOP',
    decs: List[np.ndarray],
    objs: Optional[List[np.ndarray]] = None,
    cons: Optional[List[np.ndarray]] = None,
    type: str = 'real',
    padding: str = 'zero'
) -> Union[List[np.ndarray],
           Tuple[List[np.ndarray], List[np.ndarray]],
           Tuple[List[np.ndarray], List[np.ndarray], List[np.ndarray]]]:
    """
    Transfer decision variables, objectives, and constraints between unified
    and real spaces.

    Parameters
    ----------
    problem : MTOP
        An instance of the MTOP class containing task configuration
    decs : List[np.ndarray]
        Decision matrices: (n_i, d_i) in real space, (n_i, d_max) in unified space.
    objs : List[np.ndarray], optional
        Objective matrices: (n_i, m_i) real / (n_i, m_max) unified (default: None).
    cons : List[np.ndarray], optional
        Constraint matrices: (n_i, c_i) real / (n_i, c_max) unified (default: None).
    type : str, optional
        'uni' pads every matrix to the maximum dimension across tasks;
        'real' truncates back to each task's original dimensions (default: 'real').
    padding : str, optional
        Padding strategy for type='uni': 'zero' (default), 'mid' (0.5), or
        'random' (uniform in [0, 1]). Ignored for type='real'.

    Returns
    -------
    new_decs, or (new_decs, new_objs), or (new_decs, new_objs, new_cons)
        Depending on which of objs / cons were supplied.

    Notes
    -----
    Tasks without constraints (ncons[i] == 0) are padded with zeros to
    (n, c_max) in unified space and restored to (n, 0) in real space, so
    mixed constrained/unconstrained problems keep consistent shapes.
    """
    # Work on copies so the caller's arrays are never modified
    new_decs = [d.copy() for d in decs]
    new_objs = None if objs is None else [o.copy() for o in objs]
    new_cons = None if cons is None else [c.copy() for c in cons]
    n_tasks = problem.n_tasks
    dims = problem.dims
    nobjs = problem.n_objs
    ncons = problem.n_cons

    def _pad_block(n_rows: int, width: int) -> np.ndarray:
        # Build the padding columns according to the requested strategy
        if padding == 'random':
            return np.random.rand(n_rows, width)
        if padding == 'mid':
            return np.full((n_rows, width), 0.5)
        return np.zeros((n_rows, width))  # 'zero' and any unrecognized value

    if type == 'uni':
        # Decision variables: pad each task up to the widest dimension
        d_max = np.max(dims)
        for i in range(n_tasks):
            gap = d_max - dims[i]
            if gap > 0:
                new_decs[i] = np.hstack([new_decs[i], _pad_block(new_decs[i].shape[0], gap)])
        # Objectives: pad each task up to the largest objective count
        if new_objs is not None:
            m_max = np.max(nobjs)
            for i in range(n_tasks):
                gap = m_max - nobjs[i]
                if gap > 0:
                    new_objs[i] = np.hstack([new_objs[i], _pad_block(new_objs[i].shape[0], gap)])
        # Constraints: pad, honoring tasks that have no constraints at all
        if new_cons is not None:
            c_max = np.max(ncons)
            for i in range(n_tasks):
                rows = new_cons[i].shape[0]
                if c_max == 0:
                    # No task has constraints: normalize every matrix to (n, 0)
                    new_cons[i] = np.zeros((rows, 0))
                elif ncons[i] == 0:
                    # Unconstrained task among constrained ones: zero-fill to c_max
                    new_cons[i] = np.zeros((rows, c_max))
                else:
                    gap = c_max - ncons[i]
                    if gap > 0:
                        new_cons[i] = np.hstack([new_cons[i], _pad_block(rows, gap)])
    elif type == 'real':
        # Truncate everything back to each task's native dimensions
        for i in range(n_tasks):
            new_decs[i] = new_decs[i][:, :dims[i]]
        if new_objs is not None:
            for i in range(n_tasks):
                new_objs[i] = new_objs[i][:, :nobjs[i]]
        if new_cons is not None:
            for i in range(n_tasks):
                if ncons[i] == 0:
                    new_cons[i] = np.zeros((new_cons[i].shape[0], 0))
                else:
                    new_cons[i] = new_cons[i][:, :ncons[i]]
    # Assemble the return value based on which optional inputs were given
    if objs is None and cons is None:
        return new_decs
    out = [new_decs]
    if objs is not None:
        out.append(new_objs)
    if cons is not None:
        out.append(new_cons)
    return tuple(out)
def nd_sort(objs: np.ndarray, *args) -> Tuple[np.ndarray, int]:
    """
    Perform non-dominated sorting on a population of objective values.

    Parameters
    ----------
    objs : np.ndarray
        Objective value matrix, shape (n, m)
    *args : tuple
        Optional arguments:
        - (n_sort,): Number of solutions to sort
        - (cons, n_sort): Constraint matrix and number of solutions to sort

    Returns
    -------
    front_no : np.ndarray
        Non-dominated front number for each solution, shape (n,).
        Unsorted solutions keep front number np.inf.
    max_fno : int
        Maximum front number assigned

    Raises
    ------
    ValueError
        If neither one nor two positional extras are supplied.
    """
    pop_obj = objs.copy()
    n, m = pop_obj.shape
    # Parse arguments
    if len(args) == 1:
        # nd_sort(objs, n_sort)
        n_sort = args[0]
    elif len(args) == 2:
        # nd_sort(objs, cons, n_sort)
        pop_con = args[0]
        n_sort = args[1]
        # Constrained domination: push infeasible solutions beyond the worst
        # feasible objective values, offset by their total constraint violation
        if pop_con is not None:
            infeasible = np.any(pop_con > 0, axis=1)
            if np.any(infeasible):
                max_obj = np.max(pop_obj, axis=0)
                constraint_violation = np.sum(np.maximum(0, pop_con[infeasible, :]), axis=1)
                pop_obj[infeasible, :] = max_obj + constraint_violation[:, np.newaxis]
    else:
        raise ValueError("Invalid number of arguments. Use nd_sort(objs, n_sort) or nd_sort(objs, cons, n_sort)")
    # Deduplicate rows; np.unique also sorts them lexicographically, which the
    # ENS-style domination check below relies on (objective 0 is pre-sorted)
    unique_obj, inverse_indices = np.unique(pop_obj, axis=0, return_inverse=True)
    # Count occurrences of each unique row
    table = np.bincount(inverse_indices, minlength=len(unique_obj))
    n_unique, m = unique_obj.shape
    front_no = np.full(n_unique, np.inf)
    max_fno = 0
    # Assign fronts until at least n_sort solutions (or all) are ranked
    while np.sum(table[front_no < np.inf]) < min(n_sort, len(inverse_indices)):
        max_fno += 1
        for i in range(n_unique):
            if front_no[i] == np.inf:
                dominated = False
                # Only earlier rows can dominate row i (lexicographic order)
                for j in range(i - 1, -1, -1):
                    if front_no[j] == max_fno:
                        # Row j dominates row i iff j is <= i on every
                        # remaining objective (objective 0 holds by sort order)
                        m_idx = 1
                        while m_idx < m and unique_obj[i, m_idx] >= unique_obj[j, m_idx]:
                            m_idx += 1
                        dominated = (m_idx == m)
                        if dominated or m == 2:
                            break
                if not dominated:
                    front_no[i] = max_fno
    # Map front numbers of unique rows back to the original population order
    front_no = front_no[inverse_indices]
    return front_no, max_fno
[docs]
def crowding_distance(pop_obj: np.ndarray, front_no: Optional[np.ndarray] = None) -> np.ndarray:
    """
    Compute NSGA-II-style crowding distances for a set of solutions.

    Parameters
    ----------
    pop_obj : np.ndarray
        Objective matrix, shape (n, m).
    front_no : np.ndarray, optional
        Non-dominated front number per solution, shape (n,). When omitted,
        every solution is treated as belonging to one common front.

    Returns
    -------
    crowd_dis : np.ndarray
        Crowding distance per solution, shape (n,). Boundary solutions of
        each front (extremes along any objective) receive np.inf.
    """
    n_sol, n_obj = pop_obj.shape
    if front_no is None:
        # No fronts supplied: treat the whole population as one front
        front_no = np.ones(n_sol)
    distances = np.zeros(n_sol)
    # Process each finite front independently (inf fronts are skipped)
    for front_id in np.setdiff1d(np.unique(front_no), [np.inf]):
        members = np.where(front_no == front_id)[0]
        if len(members) < 2:
            # A front with a single member is maximally isolated
            distances[members] = np.inf
            continue
        obj_slice = pop_obj[members, :]
        upper = obj_slice.max(axis=0)
        lower = obj_slice.min(axis=0)
        for obj_idx in range(n_obj):
            order = np.argsort(obj_slice[:, obj_idx])
            # Extremes along this objective always survive selection
            distances[members[order[0]]] = np.inf
            distances[members[order[-1]]] = np.inf
            span = upper[obj_idx] - lower[obj_idx]
            if not span > 0:
                # Degenerate objective: contributes nothing to the distance
                continue
            for pos in range(1, len(members) - 1):
                neighbour_gap = (
                    obj_slice[order[pos + 1], obj_idx]
                    - obj_slice[order[pos - 1], obj_idx]
                )
                distances[members[order[pos]]] += neighbour_gap / span
    return distances
[docs]
def tournament_selection(
    K: int,
    N: int,
    *fitness_arrays: np.ndarray,
    rng: Optional[np.random.Generator] = None
) -> np.ndarray:
    """
    Perform K-ary tournament selection on a population.

    Individuals are ranked lexicographically by the given fitness arrays
    (the first array is the primary key). LOWER fitness values are
    considered better, matching the minimization convention used by the
    other selection utilities in this module (the previous docstring
    incorrectly stated that higher fitness wins).

    Parameters
    ----------
    K : int
        Tournament size. If K <= 1, selection is purely random with replacement.
    N : int
        Number of individuals to select
    *fitness_arrays : np.ndarray
        One or more arrays of fitness values (smaller is better).
        All arrays must have the same length.
    rng : np.random.Generator, optional
        NumPy random number generator. If None, a new default RNG is created.

    Returns
    -------
    selected : np.ndarray
        Array of selected individual indices, shape (N,), dtype=int

    Raises
    ------
    ValueError
        If no fitness array is given or the arrays differ in length.
    """
    if not fitness_arrays:
        raise ValueError("At least one fitness array is required")
    # Flatten every fitness array and validate consistent lengths
    fitness_list = [np.asarray(arr).ravel() for arr in fitness_arrays]
    pop_size = fitness_list[0].shape[0]
    if any(f.shape[0] != pop_size for f in fitness_list[1:]):
        raise ValueError("All fitness arrays must have the same length")
    if rng is None:
        rng = np.random.default_rng()
    if K <= 1:
        # Degenerate tournament: uniform random selection with replacement
        selected = rng.integers(0, pop_size, size=N, dtype=int)
    else:
        # Rank individuals lexicographically; np.lexsort takes its keys in
        # reverse priority order, so the first fitness array is primary.
        order = np.lexsort(tuple(fitness_list[::-1]))
        ranks = np.empty(pop_size, dtype=int)
        ranks[order] = np.arange(pop_size)
        # Sample K contestants for each of N tournaments (with replacement)
        parents = rng.integers(0, pop_size, size=(K, N))
        # The contestant with the best (smallest) rank wins each tournament
        winners_pos = np.argmin(ranks[parents], axis=0)
        selected = parents[winners_pos, np.arange(N)]
    return selected.astype(int)
def selection_elit(
    objs: np.ndarray,
    n: int,
    cons: Optional[np.ndarray] = None,
    epsilon: float = 0
) -> np.ndarray:
    """
    Elite selection for single-objective (optionally constrained) problems.

    Individuals are ordered primarily by total constraint violation and
    secondarily by objective value (both ascending, smaller is better);
    the first ``n`` indices of that ordering are returned.

    Parameters
    ----------
    objs : np.ndarray
        Objective values of the population (smaller is better), shape (N, 1)
    n : int
        Number of individuals to select
    cons : np.ndarray, optional
        Constraint values, one column per constraint, shape (N, c).
        Default is None (unconstrained).
    epsilon : float, optional
        Violations strictly below this threshold are zeroed, i.e. treated
        as feasible (default: 0)

    Returns
    -------
    indices : np.ndarray
        Selected individual indices (0-based), shape (n,)
    """
    pop_size = objs.shape[0]
    if cons is None:
        violations = np.zeros(pop_size)
    else:
        # Total positive violation per individual
        violations = np.sum(np.maximum(0, cons), axis=1)
        violations[violations < epsilon] = 0
    # lexsort: last key (violations) is primary, objectives break ties
    order = np.lexsort((objs.flatten(), violations.flatten()))
    return order[:n]
def trim_excess_evaluations(
    all_decs: List[List[np.ndarray]],
    all_objs: List[List[np.ndarray]],
    nt: int,
    max_nfes_per_task: List[int],
    nfes_per_task: List[int],
    all_cons: Optional[List[List[np.ndarray]]] = None
) -> Union[
    Tuple[List[List[np.ndarray]], List[List[np.ndarray]], List[int]],
    Tuple[List[List[np.ndarray]], List[List[np.ndarray]], List[int], List[List[np.ndarray]]]
]:
    """
    Drop evaluations recorded beyond each task's budget.

    For each task whose evaluation count exceeds its budget, surplus rows
    are removed from the tail of the history: whole trailing generations
    are discarded first, and the final remaining generation is sliced when
    only part of it is surplus. Inputs are deep-copied, never mutated.

    Parameters
    ----------
    all_decs : List[List[np.ndarray]]
        Decision variable history per task (one list of generations each)
    all_objs : List[List[np.ndarray]]
        Objective value history per task
    nt : int
        Number of tasks
    max_nfes_per_task : List[int]
        Maximum evaluation budget per task
    nfes_per_task : List[int]
        Evaluations actually recorded per task
    all_cons : List[List[np.ndarray]], optional
        Constraint value history per task

    Returns
    -------
    tuple
        ``(decs, objs, nfes)`` — or ``(decs, objs, nfes, cons)`` when
        ``all_cons`` is provided — with the surplus entries removed and
        the per-task evaluation counts reduced accordingly.
    """
    decs = copy.deepcopy(all_decs)
    objs = copy.deepcopy(all_objs)
    nfes = copy.deepcopy(nfes_per_task)
    cons = copy.deepcopy(all_cons) if all_cons is not None else None

    for task in range(nt):
        surplus = nfes[task] - max_nfes_per_task[task]
        # Nothing to trim, or no recorded history to trim from
        if surplus <= 0 or not decs[task]:
            continue
        # The count is reduced by the full surplus up front
        nfes[task] -= surplus
        # Peel generations off the tail until the surplus is consumed
        while surplus > 0 and decs[task]:
            tail_size = len(decs[task][-1])
            if surplus >= tail_size:
                # The entire last generation is surplus
                decs[task].pop()
                objs[task].pop()
                if cons is not None:
                    cons[task].pop()
                surplus -= tail_size
            else:
                # Only the last `surplus` rows of the last generation go
                decs[task][-1] = decs[task][-1][:-surplus]
                objs[task][-1] = objs[task][-1][:-surplus]
                if cons is not None:
                    cons[task][-1] = cons[task][-1][:-surplus]
                surplus = 0

    if cons is not None:
        return decs, objs, nfes, cons
    return decs, objs, nfes
def normalize(
    data: Union[np.ndarray, List[float], List[np.ndarray], List[List[float]]],
    axis: int = 0,
    method: str = 'minmax'
) -> Tuple[
    Union[np.ndarray, List[np.ndarray]],
    Union[np.ndarray, List[np.ndarray]],
    Union[np.ndarray, List[np.ndarray]]
]:
    """
    Normalize input data (arrays or matrices).

    Parameters
    ----------
    data : Union[np.ndarray, List[float], List[np.ndarray], List[List[float]]]
        Supports the following input formats:
        - Single 1D array: [1, 2, 3] (a flat list of scalars or np.ndarray)
        - Single 2D matrix: must be passed as an np.ndarray. NOTE: a plain
          nested list such as [[1, 2], [3, 4]] is NOT recognized as a single
          matrix — ``is_single_input`` only accepts np.ndarray and flat
          scalar lists, so a nested list is processed as a LIST of separate
          1D arrays, each normalized independently.
        - List of multiple arrays/matrices: [array1, array2] or [matrix1, matrix2]
    axis : int, optional
        Axis along which to normalize (only applies to 2D+ data), default is 0.
        - 0: Column-wise normalization (default, recommended for multi-objective optimization)
        - 1: Row-wise normalization
        - For 1D arrays, global normalization is always used
    method : str, optional
        Normalization method, default is 'minmax'.
        - 'minmax': Min-max normalization, scales to [0, 1] (default)
        - 'zscore': Z-score normalization, mean=0, std=1

    Returns
    -------
    normalized : Union[np.ndarray, List[np.ndarray]]
        Normalized result (same format as input)
    stat1 : Union[np.ndarray, List[np.ndarray]]
        Min values (minmax) or mean values (zscore). For 2D input the
        keepdims axis is squeezed away before returning.
    stat2 : Union[np.ndarray, List[np.ndarray]]
        Max values (minmax) or std values (zscore). The RAW max/std is
        returned even where a degenerate range/std (< 1e-10) was replaced
        by 1.0 internally to avoid division by zero.

    Raises
    ------
    ValueError
        If ``method`` or ``axis`` is not one of the supported values.
    """
    if method not in ['minmax', 'zscore']:
        raise ValueError("method must be 'minmax' or 'zscore'")
    if axis not in [0, 1]:
        raise ValueError("axis must be 0 or 1")
    # Determine if input is a single array/matrix or a list of arrays.
    # Only a non-object np.ndarray or a flat list whose first element is a
    # scalar counts as "single"; everything else is treated as a collection.
    def is_single_input(data):
        if isinstance(data, np.ndarray) and data.dtype != object:
            return True
        if isinstance(data, list) and len(data) > 0 and np.isscalar(data[0]):
            return True
        return False
    single_input = is_single_input(data)
    if single_input:
        data_list = [np.array(data)]
    else:
        data_list = [np.array(d) for d in data]
    normalized = []
    stat1_list = []  # min (minmax) or mean (zscore), one entry per array
    stat2_list = []  # max (minmax) or std (zscore), one entry per array
    for arr in data_list:
        arr = np.array(arr)
        arr_ndim = arr.ndim
        if arr_ndim == 1:
            # 1D array: global normalization over all elements
            if method == 'minmax':
                stat1 = np.min(arr)
                stat2 = np.max(arr)
                range_val = stat2 - stat1
                # Constant array: use unit range so the output is all zeros
                if range_val < 1e-10:
                    range_val = 1.0
                arr_norm = (arr - stat1) / range_val
            else:  # zscore
                stat1 = np.mean(arr)
                stat2 = np.std(arr)
                # Constant array: substitute unit std to avoid division by zero
                stat2_safe = stat2 if stat2 >= 1e-10 else 1.0
                arr_norm = (arr - stat1) / stat2_safe
            normalized.append(arr_norm)
            stat1_list.append(stat1)
            stat2_list.append(stat2)
        else:
            # 2D+ array: normalize along the requested axis with broadcasting
            if method == 'minmax':
                stat1 = np.min(arr, axis=axis, keepdims=True)
                stat2 = np.max(arr, axis=axis, keepdims=True)
                range_val = stat2 - stat1
                # Degenerate columns/rows get a unit range (output zeros there)
                range_val = np.where(range_val < 1e-10, 1.0, range_val)
                arr_norm = (arr - stat1) / range_val
            else:  # zscore
                stat1 = np.mean(arr, axis=axis, keepdims=True)
                stat2 = np.std(arr, axis=axis, keepdims=True)
                # Degenerate columns/rows get unit std (output zeros there)
                stat2_safe = np.where(stat2 < 1e-10, 1.0, stat2)
                arr_norm = (arr - stat1) / stat2_safe
            # Drop the keepdims axis so the returned stats are 1D
            stat1 = np.squeeze(stat1)
            stat2 = np.squeeze(stat2)
            normalized.append(arr_norm)
            stat1_list.append(stat1)
            stat2_list.append(stat2)
    # Return single result if input was single
    if single_input:
        return normalized[0], stat1_list[0], stat2_list[0]
    return normalized, stat1_list, stat2_list
def denormalize(
    data: Union[np.ndarray, List[float], List[np.ndarray], List[List[float]]],
    stat1: Union[np.ndarray, List[np.ndarray]],
    stat2: Union[np.ndarray, List[np.ndarray]],
    axis: int = 0,
    method: str = 'minmax'
) -> Union[np.ndarray, List[np.ndarray]]:
    """
    Inverse of ``normalize``: map normalized data back to the original scale.

    Parameters
    ----------
    data : Union[np.ndarray, List[float], List[np.ndarray], List[List[float]]]
        Normalized data (same format as normalize() output).
        - Single 1D array: [0, 0.25, 0.5, 0.75, 1]
        - Single 2D matrix: should be an np.ndarray. NOTE: a plain nested
          list is treated as a LIST of separate 1D arrays, mirroring the
          recognition rule in normalize().
        - List of multiple arrays/matrices
    stat1 : Union[np.ndarray, List[np.ndarray]]
        Min values (minmax) or mean values (zscore) from normalize()
    stat2 : Union[np.ndarray, List[np.ndarray]]
        Max values (minmax) or std values (zscore) from normalize()
    axis : int, optional
        Must match the axis used in normalize(), default is 0.
        - 0: Column-wise normalization (default)
        - 1: Row-wise normalization
    method : str, optional
        Must match the method used in normalize(), default is 'minmax'.
        - 'minmax': Inverse min-max normalization (default)
        - 'zscore': Inverse z-score normalization

    Returns
    -------
    restored : Union[np.ndarray, List[np.ndarray]]
        Restored data in original scale (same format as input). Degenerate
        ranges/stds (|value| < 1e-10) are replaced by 1.0, mirroring the
        safeguard applied during normalization.

    Raises
    ------
    ValueError
        If ``method`` or ``axis`` is not one of the supported values.
    """
    if method not in ['minmax', 'zscore']:
        raise ValueError("method must be 'minmax' or 'zscore'")
    if axis not in [0, 1]:
        raise ValueError("axis must be 0 or 1")
    # Determine if input is a single array/matrix or a list of arrays
    # (same recognition rule as in normalize()).
    def is_single_input(data):
        if isinstance(data, np.ndarray) and data.dtype != object:
            return True
        if isinstance(data, list) and len(data) > 0 and np.isscalar(data[0]):
            return True
        return False
    single_input = is_single_input(data)
    if single_input:
        data_list = [np.array(data)]
        stat1_list = [stat1]
        stat2_list = [stat2]
    else:
        data_list = [np.array(d) for d in data]
        stat1_list = stat1
        stat2_list = stat2
    restored = []
    for arr, s1, s2 in zip(data_list, stat1_list, stat2_list):
        arr = np.array(arr)
        arr_ndim = arr.ndim
        # Work in 2D so one broadcasting path handles both 1D and 2D input
        arr_2d = np.atleast_2d(arr)
        # Promote stats to 2D for broadcasting against arr_2d
        s1 = np.atleast_2d(s1)
        s2 = np.atleast_2d(s2)
        if axis == 0:
            # Column-wise stats must broadcast as a single row, shape (1, n_cols)
            if s1.shape[0] != 1:
                s1 = s1.reshape(1, -1)
                s2 = s2.reshape(1, -1)
        elif axis == 1:
            # Row-wise stats must broadcast as a single column, shape (n_rows, 1)
            if s1.shape[1] != 1:
                s1 = s1.reshape(-1, 1)
                s2 = s2.reshape(-1, 1)
        if method == 'minmax':
            # Inverse min-max: x = norm * (max - min) + min
            range_val = s2 - s1
            range_val = np.where(np.abs(range_val) < 1e-10, 1.0, range_val)
            arr_restored = arr_2d * range_val + s1
        else:  # zscore
            # Inverse z-score: x = norm * std + mean
            s2_safe = np.where(np.abs(s2) < 1e-10, 1.0, s2)
            arr_restored = arr_2d * s2_safe + s1
        # Collapse back to 1D when the input array was 1D
        if arr_ndim == 1:
            arr_restored = arr_restored.flatten()
        restored.append(arr_restored)
    # Return single result if input was single
    if single_input:
        return restored[0]
    return restored
def remove_duplicates(new_decs, existing_decs=None, tol=1e-6):
    """
    Remove duplicate solutions from new decision variables.

    Two solutions are considered duplicates when their Euclidean distance
    does not exceed ``tol``. The tolerance now applies consistently both
    within ``new_decs`` and against ``existing_decs`` (previously the
    within-set pass used a fixed 8-decimal rounding and ignored ``tol``,
    contradicting this docstring).

    Parameters
    ----------
    new_decs : np.ndarray
        New decision variables to be filtered, shape (N, D)
    existing_decs : np.ndarray, optional
        Existing decision variables to check against, shape (M, D).
        If None, only remove duplicates within new_decs.
    tol : float, optional
        Euclidean-distance tolerance for duplicate detection (default: 1e-6)

    Returns
    -------
    unique_decs : np.ndarray
        Unique decision variables, shape (K, D) where K <= N
    """
    if new_decs.shape[0] == 0:
        return np.empty((0, new_decs.shape[1]))
    # Step 1: greedy within-set deduplication using the same tolerance;
    # the first occurrence of each cluster of near-equal rows is kept.
    kept = []
    for i in range(new_decs.shape[0]):
        candidate = new_decs[i].reshape(1, -1)
        if kept and np.min(cdist(candidate, new_decs[kept])) <= tol:
            continue
        kept.append(i)
    unique_decs = new_decs[kept]
    # Step 2: drop solutions already present in existing_decs (if provided)
    if existing_decs is not None and existing_decs.shape[0] > 0:
        nearest = np.min(cdist(unique_decs, existing_decs), axis=1)
        mask = nearest > tol
        if not np.any(mask):
            return np.empty((0, new_decs.shape[1]))
        unique_decs = unique_decs[mask]
    return unique_decs
def kmeans_clustering(data, k):
    """
    Cluster data points with sklearn's K-means.

    Parameters
    ----------
    data : np.ndarray
        Data points, shape (N, D)
    k : int
        Number of clusters

    Returns
    -------
    labels : np.ndarray
        Cluster label per point, shape (N,), integers in [0, k-1].
        When k >= N, clustering is skipped and every point simply
        becomes its own cluster.
    """
    n_points = data.shape[0]
    if k >= n_points:
        # Not enough points to form k distinct clusters
        return np.arange(n_points)
    model = KMeans(n_clusters=k, n_init='auto', random_state=None)
    return model.fit_predict(data)
def ibea_fitness(objs, kappa):
    """
    Compute IBEA indicator-based fitness values for a population.

    Parameters
    ----------
    objs : ndarray
        Objective values with shape (N, M), where N is the number of
        individuals and M is the number of objectives.
    kappa : float
        Fitness scaling factor.

    Returns
    -------
    fitness : ndarray
        Fitness values for each individual, shape (N,).
    I : ndarray
        Pairwise indicator matrix, shape (N, N), where
        ``I[i, j] = max_k(norm_i[k] - norm_j[k])``.
    C : ndarray
        Per-column normalization constants, shape (N,).
    """
    # Scale every objective into [0, 1]; constant objectives map to 0
    lo = np.min(objs, axis=0)
    hi = np.max(objs, axis=0)
    span = hi - lo
    span[span == 0] = 1.0
    scaled = (objs - lo) / span
    # Vectorized additive indicator over all ordered pairs
    I = np.max(scaled[:, np.newaxis, :] - scaled[np.newaxis, :, :], axis=2)
    # Column-wise normalization constants, guarded against zero
    C = np.max(np.abs(I), axis=0)
    C[C == 0] = 1e-6
    # Aggregate exponentially-weighted losses into a fitness value
    fitness = 1 - np.sum(np.exp(-I / C / kappa), axis=0)
    return fitness, I, C
def is_duplicate(x, X, epsilon=1e-10):
    """
    Check whether position(s) x duplicate entries of X or each other.

    Parameters
    ----------
    x : np.ndarray
        Position(s) to check, shape (dim,), (1, dim) or (n_points, dim)
    X : np.ndarray
        Existing positions, shape (n_samples, dim)
    epsilon : float
        Euclidean-distance tolerance for duplicate detection

    Returns
    -------
    bool or list of bool
        A single bool for one query point. For multiple points, one bool
        per row of x; a later row is also flagged when it duplicates an
        earlier non-duplicate row of x itself.
    """
    pts = x.reshape(1, -1) if x.ndim == 1 else x
    if pts.shape[0] == 1:
        # Single query: compare only against the archive X
        if len(X) == 0:
            return False
        return bool(np.min(cdist(pts, X, metric='euclidean')) < epsilon)
    n_points = pts.shape[0]
    flags = [False] * n_points
    # Pass 1: flag points that duplicate an archive entry
    if len(X) > 0:
        dist_to_archive = cdist(pts, X, metric='euclidean')
        survivors = []
        for idx in range(n_points):
            if (dist_to_archive[idx] < epsilon).any():
                flags[idx] = True
            else:
                survivors.append(idx)
    else:
        survivors = list(range(n_points))
    # Pass 2: among survivors, flag rows duplicating an earlier unique row
    accepted = []
    for idx in survivors:
        if any(np.linalg.norm(pts[idx] - pts[prev]) < epsilon for prev in accepted):
            flags[idx] = True
        else:
            accepted.append(idx)
    return flags
# =============================================================================
# RBF Model (Multiquadric)
# =============================================================================
[docs]
def rbf_build(X_train, Y_train, bf_c=1.0):
    """
    Fit a Multiquadric RBF interpolation model.

    Solves ``Phi @ coefs = Y - mean(Y)`` where
    ``Phi[i, j] = sqrt(||x_i - x_j||^2 + bf_c^2)``.

    Parameters
    ----------
    X_train : np.ndarray
        Training inputs, shape (n, d)
    Y_train : np.ndarray
        Training outputs, shape (n,) or (n, 1)
    bf_c : float
        Multiquadric shape parameter (default: 1.0)

    Returns
    -------
    model : dict
        Keys: 'n' (sample count), 'mean_y' (output mean),
        'bf_c' (shape parameter), 'coefs' (RBF weights).
    """
    targets = Y_train.flatten()
    y_mean = np.mean(targets)
    # Multiquadric basis matrix over all training pairs
    phi = np.sqrt(cdist(X_train, X_train, metric='sqeuclidean') + bf_c ** 2)
    residual = targets - y_mean
    try:
        weights = np.linalg.solve(phi, residual)
    except np.linalg.LinAlgError:
        # Singular system: fall back to a least-squares solution
        weights = np.linalg.lstsq(phi, residual, rcond=None)[0]
    return {
        'n': X_train.shape[0],
        'mean_y': y_mean,
        'bf_c': bf_c,
        'coefs': weights
    }
[docs]
def rbf_predict(model, X_train, X_query):
    """
    Evaluate a Multiquadric RBF model at query points.

    Parameters
    ----------
    model : dict
        Model dictionary produced by rbf_build
    X_train : np.ndarray
        Training inputs the model was built on, shape (n, d)
    X_query : np.ndarray
        Query points, shape (nq, d)

    Returns
    -------
    Y_pred : np.ndarray
        Predicted values, shape (nq,)
    """
    c = model['bf_c']
    # Multiquadric basis between query and training sites
    phi = np.sqrt(cdist(X_query, X_train, metric='sqeuclidean') + c ** 2)
    return model['mean_y'] + phi @ model['coefs']
# =============================================================================
# dsmerge: Merge duplicate design sites
# =============================================================================
[docs]
def dsmerge(S, Y, ds=1e-14):
    """
    Merge data for duplicate/near-duplicate design sites.

    Repeatedly z-scores the sites, finds points whose pairwise distance in
    that normalized space falls below ``ds``, and collapses each such
    cluster onto a "center" point (the point with the most neighbors),
    averaging both the sites and the responses. Sweeps repeat until no
    pair is closer than ``ds``.

    Parameters
    ----------
    S : np.ndarray
        Design sites, shape (m, n)
    Y : np.ndarray
        Responses, shape (m,) or (m, 1)
    ds : float
        Threshold for near-duplicate detection in normalized space
        (default: 1e-14)

    Returns
    -------
    mS : np.ndarray
        Merged design sites
    mY : np.ndarray
        Merged responses (flattened to 1D)
    """
    # Work on copies so the caller's arrays are untouched
    S = S.copy()
    Y = Y.flatten().copy()
    more = True
    while more:
        m = S.shape[0]
        if m <= 1:
            break
        # Normalize sites (z-score); constant dimensions get unit scale
        mean_s = np.mean(S, axis=0)
        std_s = np.std(S, axis=0)
        std_s[std_s == 0] = 1.0
        sc_s = (S - mean_s) / std_s
        # Pairwise distances in normalized space; mask out self-distances
        D = cdist(sc_s, sc_s, metric='euclidean')
        np.fill_diagonal(D, np.inf)
        # Number of near-duplicates of each point
        mult = np.sum(D < ds, axis=0)
        if np.max(mult) == 0:
            break
        # Indices chosen as cluster centers during this sweep
        ladr = []
        while np.max(mult) > 0:
            # Pick the point with the most neighbors as the cluster center
            jj = np.argmax(mult)
            ladr.append(jj)
            # Neighbors per the distances computed at sweep start; D is NOT
            # refreshed after in-place merges within this sweep
            ngb = np.where(D[:, jj] < ds)[0]
            if len(ngb) == 0:
                mult[jj] = 0
                continue
            # Merge: replace the center by the cluster average (S and Y)
            all_pts = np.concatenate([[jj], ngb])
            S[jj] = np.mean(S[all_pts], axis=0)
            Y[jj] = np.mean(Y[all_pts])
            # Mark the whole cluster as handled for this sweep
            mult[ngb] = 0
            mult[jj] = 0
        if len(ladr) == 0:
            break
        # Remove merged neighbors, keeping only the cluster centers
        keep = list(set(range(m)))
        for center in ladr:
            ngb = np.where(D[:, center] < ds)[0]
            for n_idx in ngb:
                if n_idx != center and n_idx in keep:
                    keep.remove(n_idx)
        keep = sorted(keep)
        S = S[keep]
        Y = Y[keep]
        # Another sweep is needed only if near-duplicates remain
        if len(S) <= 1:
            break
        mean_s2 = np.mean(S, axis=0)
        std_s2 = np.std(S, axis=0)
        std_s2[std_s2 == 0] = 1.0
        sc_s2 = (S - mean_s2) / std_s2
        D2 = cdist(sc_s2, sc_s2, metric='euclidean')
        np.fill_diagonal(D2, np.inf)
        if np.min(D2) >= ds:
            more = False
    return S, Y
# =============================================================================
# Archive Update
# =============================================================================
[docs]
def merge_archive(arc_decs, arc_objs, new_decs, new_objs):
    """
    Append new solutions to the archive, dropping duplicate decision rows.

    The first occurrence of each distinct decision vector is kept, and
    the original row order (archive first, then new) is preserved.

    Parameters
    ----------
    arc_decs, arc_objs : np.ndarray
        Current archive decisions and objectives
    new_decs, new_objs : np.ndarray
        New decisions and objectives to merge in

    Returns
    -------
    merged_decs, merged_objs : np.ndarray
        De-duplicated, order-preserving union of archive and new solutions
    """
    decs = np.vstack([arc_decs, new_decs])
    objs = np.vstack([arc_objs, new_objs])
    # np.unique yields the index of each row's first occurrence; sorting
    # those indices restores the original ordering
    first_seen = np.sort(np.unique(decs, axis=0, return_index=True)[1])
    return decs[first_seen], objs[first_seen]
# =============================================================================
# SPEA2 Fitness
# =============================================================================
[docs]
def spea2_fitness(pop_obj, pop_con=None):
    """
    Compute SPEA2 fitness values under constrained dominance.

    Parameters
    ----------
    pop_obj : np.ndarray, shape (N, M)
        Objective values.
    pop_con : np.ndarray, shape (N, C), optional
        Constraint values (positive entries count as violations).

    Returns
    -------
    fitness : np.ndarray, shape (N,)
        Strength-based raw fitness plus k-th nearest-neighbor density.
        Lower is better; values below 1 indicate non-dominated solutions.
    """
    n = pop_obj.shape[0]
    if n == 0:
        return np.array([])
    # Aggregate positive constraint violation per individual
    if pop_con is None:
        cv = np.zeros(n)
    elif pop_con.ndim == 2:
        cv = np.sum(np.maximum(0, pop_con), axis=1)
    else:
        cv = np.maximum(0, pop_con)
    # Pairwise constrained-dominance matrix: dom[a, b] == a dominates b.
    # Lower total violation dominates outright; ties fall back to Pareto
    # dominance on the objectives.
    dom = np.zeros((n, n), dtype=bool)
    for a in range(n - 1):
        for b in range(a + 1, n):
            if cv[a] < cv[b]:
                dom[a, b] = True
            elif cv[a] > cv[b]:
                dom[b, a] = True
            else:
                a_better = np.any(pop_obj[a] < pop_obj[b])
                a_worse = np.any(pop_obj[a] > pop_obj[b])
                if a_better and not a_worse:
                    dom[a, b] = True
                elif a_worse and not a_better:
                    dom[b, a] = True
    # Strength: number of individuals each one dominates
    strength = dom.sum(axis=1)
    # Raw fitness: total strength of one's dominators
    raw = np.array([strength[dom[:, i]].sum() for i in range(n)], dtype=float)
    # Density from the k-th nearest neighbor in objective space
    dmat = cdist(pop_obj, pop_obj)
    np.fill_diagonal(dmat, np.inf)
    nn_sorted = np.sort(dmat, axis=1)
    k = max(1, int(np.floor(np.sqrt(n))))
    density = 1.0 / (nn_sorted[:, min(k - 1, n - 1)] + 2.0)
    return raw + density
# =============================================================================
# SPEA2 Truncation (Lexicographic)
# =============================================================================
[docs]
def spea2_truncation(pop_obj, N):
    """
    Keep N solutions by repeatedly deleting the most crowded one.

    At each step the solution whose sorted nearest-neighbor distance
    vector is lexicographically smallest (SPEA2's truncation criterion)
    is removed from the surviving set.

    Parameters
    ----------
    pop_obj : np.ndarray, shape (n, M)
        Objectives.
    N : int
        Number of solutions to keep.

    Returns
    -------
    selected : np.ndarray
        Indices of the surviving solutions.
    """
    total = pop_obj.shape[0]
    if total <= N:
        return np.arange(total)
    dmat = cdist(pop_obj, pop_obj)
    np.fill_diagonal(dmat, np.inf)
    alive = np.ones(total, dtype=bool)
    while alive.sum() > N:
        idx = np.where(alive)[0]
        # Each row: ascending distances to the other survivors
        neigh = np.sort(dmat[np.ix_(idx, idx)], axis=1)
        # lexsort keys run last-to-first, so reverse the columns to make
        # the nearest-neighbor distance the primary key
        order = np.lexsort(neigh[:, ::-1].T)
        alive[idx[order[0]]] = False
    return np.where(alive)[0]
# =============================================================================
# SPEA2 Truncation (Fast, Min-Distance)
# =============================================================================
[docs]
def spea2_truncation_fast(pop_obj, N):
    """
    Keep N solutions by repeatedly deleting the one with the smallest
    nearest-neighbor distance — a cheaper variant of SPEA2 truncation
    that skips the full lexicographic comparison.

    Parameters
    ----------
    pop_obj : np.ndarray, shape (n, M)
        Objectives.
    N : int
        Number of solutions to keep.

    Returns
    -------
    selected : np.ndarray
        Indices of the surviving solutions.
    """
    total = pop_obj.shape[0]
    if total <= N:
        return np.arange(total)
    dmat = cdist(pop_obj, pop_obj)
    np.fill_diagonal(dmat, np.inf)
    alive = np.ones(total, dtype=bool)
    for _ in range(total - N):
        idx = np.where(alive)[0]
        nearest = np.min(dmat[np.ix_(idx, idx)], axis=1)
        # Ties resolve to the lowest index, as np.argmin does
        alive[idx[np.argmin(nearest)]] = False
    return np.where(alive)[0]
# =============================================================================
# Dimension Alignment
# =============================================================================
def align_dimensions(dec, target_dim, fill='random'):
    """
    Pad or truncate decision variables to a target dimensionality.

    Works on a single vector (shape ``(dim,)``) or a batch
    (shape ``(n, dim)``). Shorter inputs are padded on the right,
    longer inputs are truncated; a copy is always returned.

    Parameters
    ----------
    dec : np.ndarray
        Decision variable(s), shape (dim,) for 1D or (n, dim) for 2D
    target_dim : int
        Target dimension
    fill : str, optional
        'random' pads with U[0, 1) samples; any other value pads with
        zeros (default: 'random')

    Returns
    -------
    aligned_dec : np.ndarray
        Aligned decision variable(s), shape (target_dim,) or (n, target_dim)
    """
    is_vector = dec.ndim == 1
    cur = dec.shape[0] if is_vector else dec.shape[1]
    if cur == target_dim:
        return dec.copy()
    if cur > target_dim:
        # Drop trailing dimensions
        return dec[:target_dim].copy() if is_vector else dec[:, :target_dim].copy()
    extra = target_dim - cur
    if is_vector:
        pad = np.random.rand(extra) if fill == 'random' else np.zeros(extra)
        return np.concatenate([dec, pad])
    rows = dec.shape[0]
    pad = np.random.rand(rows, extra) if fill == 'random' else np.zeros((rows, extra))
    return np.hstack([dec, pad])
# =============================================================================
# Constrained Sort
# =============================================================================
def constrained_sort(objs, cvs):
    """
    Order individuals by (constraint violation, objective), both ascending.

    Parameters
    ----------
    objs : np.ndarray
        Objective values, shape (n,) or (n, 1)
    cvs : np.ndarray
        Total constraint violation per individual, shape (n,) or (n, 1)

    Returns
    -------
    sorted_indices : np.ndarray
        Permutation that sorts by cvs first and objs second, shape (n,)
    """
    objective_key = np.asarray(objs).flatten()
    violation_key = np.asarray(cvs).flatten()
    # lexsort's last key is the primary sort key
    return np.lexsort((objective_key, violation_key))
# =============================================================================
# CMA-ES Utilities
# =============================================================================
def cmaes_init_params(dim, lam=None, sigma0=0.3):
    """
    Build the initial CMA-ES state dictionary for a given dimension.

    Parameters
    ----------
    dim : int
        Problem dimension
    lam : int, optional
        Population size (lambda). Defaults to ``4 + floor(3 * log(dim))``.
    sigma0 : float, optional
        Initial step size (default: 0.3)

    Returns
    -------
    params : dict
        CMA-ES state containing:
        - **Strategy parameters**: dim, lam, mu, weights, mueff
        - **Control parameters**: cs, damps, cc, c1, cmu, chiN
        - **State variables**: m_dec, ps, pc, B, D, C, invsqrtC, sigma, eigenFE
        The mean ``m_dec`` starts at a uniform-random point in [0, 1)^dim.
    """
    lam = int(4 + 3 * np.log(dim)) if lam is None else lam
    mu = lam // 2
    # Log-linear recombination weights, normalized to sum to 1
    w = np.log(mu + 0.5) - np.log(np.arange(1, mu + 1))
    w = w / w.sum()
    mueff = 1.0 / np.sum(w ** 2)
    # Standard CMA-ES adaptation constants
    cs = (mueff + 2) / (dim + mueff + 5)
    damps = 1 + cs + 2 * max(np.sqrt((mueff - 1) / (dim + 1)) - 1, 0)
    cc = (4 + mueff / dim) / (4 + dim + 2 * mueff / dim)
    c1 = 2 / ((dim + 1.3) ** 2 + mueff)
    cmu = min(1 - c1, 2 * (mueff - 2 + 1 / mueff) / ((dim + 2) ** 2 + 2 * mueff / 2))
    # Expected norm of a standard-normal vector in `dim` dimensions
    chiN = np.sqrt(dim) * (1 - 1 / (4 * dim) + 1 / (21 * dim ** 2))
    return {
        'dim': dim, 'lam': lam, 'mu': mu, 'weights': w, 'mueff': mueff,
        'cs': cs, 'damps': damps, 'cc': cc, 'c1': c1, 'cmu': cmu, 'chiN': chiN,
        'm_dec': np.random.rand(dim),
        'ps': np.zeros(dim), 'pc': np.zeros(dim),
        'B': np.eye(dim), 'D': np.ones(dim),
        'C': np.eye(dim), 'invsqrtC': np.eye(dim),
        'sigma': sigma0, 'eigenFE': 0
    }
def cmaes_sample(m_dec, sigma, B, D, lam, clip=True):
    """
    Draw lam offspring from ``N(m_dec, sigma^2 * C)`` with
    ``C = B @ diag(D^2) @ B.T``.

    Parameters
    ----------
    m_dec : np.ndarray
        Mean decision vector, shape (d,)
    sigma : float
        Global step size
    B : np.ndarray
        Eigenvector matrix of the covariance decomposition, shape (d, d)
    D : np.ndarray
        Square roots of the covariance eigenvalues, shape (d,)
    lam : int
        Number of offspring to generate
    clip : bool, optional
        If True, clip offspring into [0, 1] (default: True)

    Returns
    -------
    offdecs : np.ndarray
        Offspring decision variables, shape (lam, d)
    """
    dim = len(m_dec)
    # Standard-normal samples, scaled per-axis by D and rotated by B
    normals = np.random.randn(lam, dim)
    samples = m_dec + sigma * ((normals * D) @ B.T)
    return np.clip(samples, 0, 1) if clip else samples
def cmaes_update(params, sorted_decs, nfes):
    """
    Update CMA-ES parameters given fitness-sorted offspring.

    Updates mean, evolution paths, covariance matrix, step size, and
    performs eigendecomposition when needed. Modifies params dict in-place.

    Parameters
    ----------
    params : dict
        CMA-ES parameter dictionary from :func:`cmaes_init_params`
    sorted_decs : np.ndarray
        Offspring sorted by fitness (best first), shape (>=mu, dim)
    nfes : int
        Current number of function evaluations (for eigendecomposition timing)

    Returns
    -------
    restarted : bool
        True if eigendecomposition failed and covariance was reset to identity
    """
    p = params
    # Update mean: weighted recombination of the mu best offspring
    old_dec = p['m_dec'].copy()
    p['m_dec'] = p['weights'] @ sorted_decs[:p['mu']]
    # Update evolution paths; diff is the mean shift in sigma-normalized space
    diff = (p['m_dec'] - old_dec) / p['sigma']
    p['ps'] = (1 - p['cs']) * p['ps'] + \
        np.sqrt(p['cs'] * (2 - p['cs']) * p['mueff']) * (p['invsqrtC'] @ diff)
    ps_norm = np.linalg.norm(p['ps'])
    # Heaviside gate (hsig): suppresses the rank-one covariance update while
    # ||ps|| is anomalously large. The normalization factor removes the
    # initialization bias of ps; 2*nfes/lam approximates twice the generation
    # count. Guard factor > 0 avoids a divide-by-zero on the first call(s).
    factor = 1 - (1 - p['cs']) ** (2 * nfes / p['lam'])
    if factor > 0:
        hsig = float(ps_norm / np.sqrt(factor) / p['chiN'] < 1.4 + 2 / (p['dim'] + 1))
    else:
        hsig = 1.0
    p['pc'] = (1 - p['cc']) * p['pc'] + \
        hsig * np.sqrt(p['cc'] * (2 - p['cc']) * p['mueff']) * diff
    # Update covariance matrix: rank-one term from pc plus rank-mu term from
    # the recombination-weighted offspring steps (artmp). delta compensates
    # the variance loss in C when hsig == 0 (pc update was stalled).
    artmp = (sorted_decs[:p['mu']] - old_dec) / p['sigma']
    delta = (1 - hsig) * p['cc'] * (2 - p['cc'])
    p['C'] = (1 - p['c1'] - p['cmu']) * p['C'] + \
        p['c1'] * (np.outer(p['pc'], p['pc']) + delta * p['C']) + \
        p['cmu'] * (artmp.T @ np.diag(p['weights']) @ artmp)
    # Update step size via cumulative step-size adaptation:
    # grow sigma when ||ps|| exceeds its expectation chiN, shrink otherwise
    p['sigma'] *= np.exp(p['cs'] / p['damps'] * (ps_norm / p['chiN'] - 1))
    # Eigendecomposition (periodic): O(dim^3) cost is amortized by only
    # refreshing B/D after enough evaluations since the last refresh
    restarted = False
    if nfes - p['eigenFE'] > p['lam'] / (p['c1'] + p['cmu']) / p['dim'] / 10:
        p['eigenFE'] = nfes  # record when this decomposition happened
        if not np.all(np.isfinite(p['C'])):
            restarted = True
        else:
            # Enforce exact symmetry before eigh; floating-point drift can
            # make C slightly asymmetric across generations
            p['C'] = np.triu(p['C']) + np.triu(p['C'], 1).T
            try:
                eigvals, eigvecs = np.linalg.eigh(p['C'])
                if np.min(eigvals) <= 0:
                    # C lost positive definiteness — treat as degenerate
                    restarted = True
                else:
                    p['B'] = eigvecs
                    p['D'] = np.sqrt(eigvals)
            except np.linalg.LinAlgError:
                restarted = True
        if restarted:
            # Degenerate covariance: fall back to an isotropic search
            # distribution (identity C); sigma is intentionally kept as-is
            p['B'] = np.eye(p['dim'])
            p['D'] = np.ones(p['dim'])
            p['C'] = np.eye(p['dim'])
        # NOTE(review): indentation was lost in extraction; recomputing
        # C^(-1/2) here, inside the periodic branch after B/D are refreshed,
        # matches the reference CMA-ES implementation — confirm against the
        # original file.
        p['invsqrtC'] = p['B'] @ np.diag(1.0 / p['D']) @ p['B'].T
    return restarted