"""
Kriging-Assisted Two-Archive Search (KTS)
This module implements KTS for computationally expensive constrained/unconstrained
multi-objective optimization. It adaptively switches between two search modes:
Mode 0 (unconstrained/KTA2-style): two-archive CA/DA with convergence/diversity
Mode 1 (constrained/KCCMO-style): SPEA2-based fitness with K-means sampling
The switching is based on the correlation between convergence metric Q and
constraint violation CV.
References
----------
[1] Z. Song, H. Wang, and B. Xue. A Kriging-Assisted Two-Archive Evolutionary Algorithm for Expensive Multi-Objective Optimization. IEEE Transactions on Evolutionary Computation, 2024.
Notes
-----
Author: Jiangtao Shen
Date: 2026.02.17
Version: 1.0
"""
from tqdm import tqdm
import time
import torch
import numpy as np
from scipy.spatial.distance import cdist
from scipy.stats import wilcoxon, rankdata
from ddmtolab.Methods.Algo_Methods.algo_utils import *
from ddmtolab.Methods.Algo_Methods.bo_utils import gp_build, gp_predict
import warnings
warnings.filterwarnings("ignore")
class KTS:
    """
    Kriging-Assisted Two-Archive Search for expensive constrained/unconstrained
    multi-objective optimization with adaptive mode switching.

    Mode 0 (KTA2-style) evolves a convergence archive (CA) and a diversity
    archive (DA); mode 1 (KCCMO-style) evolves SPEA2-based populations and
    samples with K-means clustering. The mode is chosen per task from the
    correlation between the convergence metric Q and the constraint
    violation CV (see :meth:`_determine_search_mode`).
    """

    # Capability table consumed by get_algorithm_information().
    algorithm_information = {
        'n_tasks': '[1, K]',
        'dims': 'unequal',
        'objs': 'unequal',
        'n_objs': '[2, M]',
        'cons': 'equal',
        'n_cons': '[0, C]',
        'expensive': 'True',
        'knowledge_transfer': 'False',
        'n_initial': 'unequal',
        'max_nfes': 'unequal'
    }

    @classmethod
    def get_algorithm_information(cls, print_info=True):
        """Return (and optionally print) the algorithm capability table."""
        return get_algorithm_information(cls, print_info)

    def __init__(self, problem, n_initial=None, max_nfes=None, n=100,
                 tau=0.6, phi=-0.2, mu=20, phi1=0.1, wmax1=10, mu1=5,
                 save_data=True, save_path='./Data', name='KTS', disable_tqdm=True):
        """
        Initialize KTS algorithm.

        Parameters
        ----------
        problem : MTOP
            Multi-task optimization problem instance
        n_initial : int or List[int], optional
            Number of initial samples per task (default: 11*dim-1)
        max_nfes : int or List[int], optional
            Maximum number of function evaluations per task (default: 200)
        n : int or List[int], optional
            Population/archive size per task (default: 100)
        tau : float, optional
            Correlation threshold for mode 0 (default: 0.6)
        phi : float, optional
            Correlation threshold for mode 1 (default: -0.2)
        mu : int, optional
            Number of elite solutions for correlation (default: 20)
        phi1 : float, optional
            Uncertainty sampling fraction (default: 0.1)
        wmax1 : int, optional
            Inner surrogate evolution generations (default: 10)
        mu1 : int, optional
            Number of re-evaluated solutions per generation (default: 5)
        save_data : bool, optional
            Whether to save optimization data (default: True)
        save_path : str, optional
            Path to save results (default: './Data')
        name : str, optional
            Name for the experiment (default: 'KTS')
        disable_tqdm : bool, optional
            Whether to disable progress bar (default: True)
        """
        self.problem = problem
        self.n_initial = n_initial
        self.max_nfes = max_nfes if max_nfes is not None else 200
        self.n = n
        self.tau = tau
        self.phi = phi
        self.mu = mu
        self.phi1 = phi1
        self.wmax1 = wmax1
        self.mu1 = mu1
        self.save_data = save_data
        self.save_path = save_path
        self.name = name
        self.disable_tqdm = disable_tqdm

    def optimize(self):
        """
        Execute the KTS algorithm.

        Returns
        -------
        Results
            Optimization results containing decision variables, objectives, and runtime
        """
        data_type = torch.float
        start_time = time.time()
        problem = self.problem
        nt = problem.n_tasks
        dims = problem.dims
        n_objs = problem.n_objs
        n_cons = problem.n_cons if hasattr(problem, 'n_cons') else [0] * nt
        if self.n_initial is None:
            # Standard surrogate-modelling default: 11*d - 1 initial samples.
            n_initial_per_task = [11 * dims[i] - 1 for i in range(nt)]
        else:
            n_initial_per_task = par_list(self.n_initial, nt)
        max_nfes_per_task = par_list(self.max_nfes, nt)
        n_per_task = par_list(self.n, nt)

        # Initialize with LHS
        decs = initialization(problem, n_initial_per_task, method='lhs')
        objs, cons = evaluation(problem, decs)
        nfes_per_task = n_initial_per_task.copy()

        # Initialize archives for each task
        CAs = []   # Convergence Archives (IBEA-based)
        DA1s = []  # Diversity Archives (ND+Lp)
        P1s = []   # Population 1 (SPEA2, no constraints)
        P2s = []   # Population 2 (SPEA2, with constraints)
        for i in range(nt):
            N = n_per_task[i]
            M = n_objs[i]
            p_i = 1.0 / M  # Minkowski exponent for diversity distances
            # CA: IBEA-based
            CA_objs, CA_decs = _update_CA(None, objs[i], decs[i], N)
            CAs.append((CA_objs, CA_decs))
            # DA1: ND + diversity
            DA_objs, DA_decs = _update_DA(None, objs[i], decs[i], N, p_i)
            DA1s.append((DA_objs, DA_decs))
            # P1: SPEA2 without constraints
            P1_objs, P1_decs, P1_cons = _update_P(
                objs[i], decs[i], None, N, is_origin=False)
            P1s.append((P1_objs, P1_decs, P1_cons))
            # P2: SPEA2 with constraints
            P2_objs, P2_decs, P2_cons = _update_P(
                objs[i], decs[i], cons[i], N, is_origin=True)
            P2s.append((P2_objs, P2_decs, P2_cons))

        pbar = tqdm(total=sum(max_nfes_per_task), initial=sum(n_initial_per_task),
                    desc=f"{self.name}", disable=self.disable_tqdm)
        while sum(nfes_per_task) < sum(max_nfes_per_task):
            active_tasks = [i for i in range(nt) if nfes_per_task[i] < max_nfes_per_task[i]]
            if not active_tasks:
                break
            # Evaluations performed in this sweep; guards against an infinite
            # loop when duplicate removal empties every candidate set.
            progress = 0
            for i in active_tasks:
                M = n_objs[i]
                dim = dims[i]
                p_i = 1.0 / M
                N = n_per_task[i]
                C = n_cons[i]

                # ===== Determine Search Mode =====
                search_mode = self._determine_search_mode(
                    objs[i], cons[i], self.mu, self.tau, self.phi
                )

                # ===== Build GP models =====
                obj_models = []
                for j in range(M):
                    model = gp_build(decs[i], objs[i][:, j:j + 1], data_type)
                    obj_models.append(model)
                con_models = []
                if C > 0 and search_mode == 1:
                    # Constraint surrogates are only needed in KCCMO mode.
                    con_i = cons[i]
                    for j in range(C):
                        model = gp_build(decs[i], con_i[:, j:j + 1], data_type)
                        con_models.append(model)

                # ===== Inner Loop: Surrogate-based Evolution =====
                CCA_objs = CAs[i][0].copy()
                CCA_decs = CAs[i][1].copy()
                CCA_mse = np.zeros((CCA_objs.shape[0], M))
                # MATLAB: DA = DA1 (mode 0) or P1 (mode 1)
                if search_mode == 0:
                    CDA_objs = DA1s[i][0].copy()
                    CDA_decs = DA1s[i][1].copy()
                else:
                    CDA_objs = P1s[i][0].copy()
                    CDA_decs = P1s[i][1].copy()
                CDA_mse = np.zeros((CDA_objs.shape[0], M))
                CP2_objs = P2s[i][0].copy()
                CP2_decs = P2s[i][1].copy()
                CP2_cons = P2s[i][2].copy() if P2s[i][2] is not None else None
                CP2_mse = np.zeros((CP2_objs.shape[0], M))

                for w in range(self.wmax1):
                    if search_mode == 0:
                        # KTA2-style: crossover-only + mutation-only
                        parentC_decs, parentM_decs = _mating_selection(
                            CCA_objs, CCA_decs, CDA_objs, CDA_decs, N
                        )
                        off_C = _crossover_only(parentC_decs, muc=20)
                        off_M = _mutation_only(parentM_decs, mum=20)
                        off_decs = np.vstack([off_C, off_M])
                    else:
                        # KCCMO-style: full GA on each pool independently
                        fitness_p2 = spea2_fitness(CP2_objs, CP2_cons)
                        fitness_da = spea2_fitness(CDA_objs)
                        pool1 = _tournament_selection_fitness(fitness_p2, N)
                        pool2 = _tournament_selection_fitness(fitness_da, N)
                        off1 = _full_ga(CP2_decs[pool1])
                        off2 = _full_ga(CDA_decs[pool2])
                        off_decs = np.vstack([off1, off2])

                    # Predict objectives for offspring only
                    off_objs, off_mse = _predict_objectives(
                        off_decs, obj_models, M, data_type)
                    # Predict constraints for offspring if needed
                    off_cons = None
                    if search_mode == 1 and len(con_models) > 0:
                        off_cons = _predict_constraints(
                            off_decs, con_models, C, data_type)

                    # Update surrogate CA: CCA + offspring
                    ca_objs = np.vstack([CCA_objs, off_objs])
                    ca_decs = np.vstack([CCA_decs, off_decs])
                    ca_mse = np.vstack([CCA_mse, off_mse])
                    CCA_objs, CCA_decs, CCA_mse = _k_update_CA(
                        ca_objs, ca_decs, ca_mse, N)

                    # Update surrogate DA: CDA + offspring
                    da_objs = np.vstack([CDA_objs, off_objs])
                    da_decs = np.vstack([CDA_decs, off_decs])
                    da_mse = np.vstack([CDA_mse, off_mse])
                    if search_mode == 0:
                        CDA_objs, CDA_decs, CDA_mse = _k_update_DA(
                            da_objs, da_decs, da_mse, N, p_i)
                    else:
                        CDA_objs, CDA_decs, _, CDA_mse = _k_update_P(
                            da_objs, da_decs, None, da_mse, N,
                            is_origin=False)

                    # Update surrogate P2: CP2 + offspring
                    p2_objs = np.vstack([CP2_objs, off_objs])
                    p2_decs = np.vstack([CP2_decs, off_decs])
                    p2_mse = np.vstack([CP2_mse, off_mse])
                    p2_cons = None
                    if off_cons is not None and CP2_cons is not None:
                        p2_cons = np.vstack([CP2_cons, off_cons])
                    elif off_cons is not None:
                        p2_cons = off_cons
                    CP2_objs, CP2_decs, CP2_cons, CP2_mse = _k_update_P(
                        p2_objs, p2_decs, p2_cons, p2_mse, N,
                        is_origin=True)

                # ===== Adaptive Sampling =====
                if search_mode == 0:
                    # Remove solutions already evaluated (MATLAB: setxor)
                    keep_ca = _not_in_archive(CCA_decs, decs[i])
                    keep_da = _not_in_archive(CDA_decs, decs[i])
                    # KTA2-style sampling
                    offspring_decs = _adaptive_sampling(
                        CCA_objs[keep_ca], CDA_objs[keep_da],
                        CCA_decs[keep_ca], CDA_decs[keep_da],
                        CDA_mse[keep_da], DA1s[i][0], DA1s[i][1],
                        self.mu1, p_i, self.phi1
                    )
                else:
                    # KCCMO-style sampling with K-means
                    # MATLAB: KCCMO_sampling(CP2, P2, mu1)
                    ref_front = _get_best_objs(P2s[i][0], P2s[i][2])
                    offspring_decs = _kccmo_sampling(
                        CP2_objs, CP2_decs, CP2_cons,
                        ref_front, self.mu1
                    )

                # Remove duplicates of already-evaluated points
                offspring_decs = remove_duplicates(offspring_decs, decs[i])
                if offspring_decs.shape[0] > 0:
                    # Evaluate
                    off_objs, off_cons = evaluation_single(problem, offspring_decs, i)
                    # Update all evaluated data
                    decs[i] = np.vstack([decs[i], offspring_decs])
                    objs[i] = np.vstack([objs[i], off_objs])
                    if cons[i] is not None and off_cons is not None:
                        cons[i] = np.vstack([cons[i], off_cons])
                    # Update real archives
                    CA_objs, CA_decs = _update_CA(
                        CAs[i], off_objs, offspring_decs, N)
                    CAs[i] = (CA_objs, CA_decs)
                    DA_objs, DA_decs = _update_DA(
                        DA1s[i], off_objs, offspring_decs, N, p_i)
                    DA1s[i] = (DA_objs, DA_decs)
                    P1_objs, P1_decs, P1_cons = _update_P_real(
                        P1s[i], off_objs, offspring_decs, None, N, is_origin=False)
                    P1s[i] = (P1_objs, P1_decs, P1_cons)
                    P2_objs, P2_decs, P2_cons = _update_P_real(
                        P2s[i], off_objs, offspring_decs, off_cons, N, is_origin=True)
                    P2s[i] = (P2_objs, P2_decs, P2_cons)
                    nfes_per_task[i] += offspring_decs.shape[0]
                    progress += offspring_decs.shape[0]
                    pbar.update(offspring_decs.shape[0])
            if progress == 0:
                # No task consumed any budget in this sweep (all candidates
                # were duplicates); a further pass would loop forever.
                break
        pbar.close()

        runtime = time.time() - start_time
        all_decs, all_objs = build_staircase_history(decs, objs, k=self.mu1)
        results = build_save_results(all_decs=all_decs, all_objs=all_objs, runtime=runtime,
                                     max_nfes=nfes_per_task, bounds=problem.bounds,
                                     save_path=self.save_path, filename=self.name,
                                     save_data=self.save_data)
        return results

    @staticmethod
    def _determine_search_mode(objs_all, cons_all, mu, tau, phi):
        """
        Determine search mode based on correlation between convergence Q and CV.

        Parameters
        ----------
        objs_all : np.ndarray
            All evaluated objectives
        cons_all : np.ndarray or None
            All evaluated constraints
        mu : int
            Number of elite solutions for correlation
        tau : float
            Threshold for mode 0
        phi : float
            Threshold for mode 1

        Returns
        -------
        search_mode : int
            0 for unconstrained/KTA2 mode, 1 for constrained/KCCMO mode
        """
        N = objs_all.shape[0]
        # Compute constraint violation
        if cons_all is None or cons_all.shape[1] == 0:
            # No constraints -> always mode 0
            return 0
        cv = np.sum(np.maximum(0, cons_all), axis=1)
        # If all feasible, mode 0
        if np.all(cv < 1e-10):
            return 0
        # Compute convergence metric Q using IBEA indicator
        Q = _cal_Q(objs_all)
        # Sort Q descending and take last mu+1 (best convergence) — MATLAB:
        # [Q, index] = sort(Q,'descend'); CV = CV(index);
        # coef = corrcoef(Q(end-mu:end), CV(end-mu:end))
        sorted_indices = np.argsort(Q)[::-1]
        Q_sorted = Q[sorted_indices]
        cv_sorted = cv[sorted_indices]
        n_elite = min(mu + 1, N)
        Q_elite = Q_sorted[-n_elite:]
        cv_elite = cv_sorted[-n_elite:]
        # Degenerate elite set -> correlation undefined, default to mode 0
        if np.std(Q_elite) < 1e-10 or np.std(cv_elite) < 1e-10:
            return 0
        try:
            r_coef = np.corrcoef(Q_elite, cv_elite)[0, 1]
        except Exception:
            return 0
        if np.isnan(r_coef):
            return 0
        # Mode switching: strong negative correlation -> constrained mode,
        # strong positive -> unconstrained mode, otherwise pick randomly.
        if r_coef < phi:
            return 1
        elif r_coef >= tau:
            return 0
        else:
            return np.random.choice([0, 1])
# =============================================================================
# Helper Functions
# =============================================================================
def _cal_Q(objs):
    """
    Convergence metric Q per solution, the reciprocal of the IBEA fitness.

    Q(i) = 1 / max(F(i), 1e-10), where F(i) is the IBEA fitness with
    kappa = 0.05; the floor avoids division by zero.

    Parameters
    ----------
    objs : np.ndarray
        Objective values, shape (N, M)

    Returns
    -------
    Q : np.ndarray
        Convergence metric, shape (N,)
    """
    ibea_fit = ibea_fitness(objs, kappa=0.05)[0]
    return 1.0 / np.maximum(ibea_fit, 1e-10)
def _cal_fitness_spea2_ref(pop_objs, pop_cons, ref_objs):
"""
Modified SPEA2 fitness using distance to reference front instead of density.
Parameters
----------
pop_objs : np.ndarray
Objectives, shape (N, M)
pop_cons : np.ndarray or None
Constraints, shape (N, C)
ref_objs : np.ndarray
Reference Pareto front objectives
Returns
-------
fitness : np.ndarray
Fitness values, shape (N,)
"""
N = pop_objs.shape[0]
if pop_cons is not None:
cv = np.sum(np.maximum(0, pop_cons), axis=1)
else:
cv = np.zeros(N)
dominate = np.zeros((N, N), dtype=bool)
for p in range(N):
for q in range(p + 1, N):
if cv[p] < cv[q]:
dominate[p, q] = True
elif cv[p] > cv[q]:
dominate[q, p] = True
else:
any_less = np.any(pop_objs[p] < pop_objs[q])
any_greater = np.any(pop_objs[p] > pop_objs[q])
if any_less and not any_greater:
dominate[p, q] = True
elif any_greater and not any_less:
dominate[q, p] = True
S = np.sum(dominate, axis=1).astype(float)
R = np.zeros(N)
for p in range(N):
dominators = np.where(dominate[:, p])[0]
R[p] = np.sum(S[dominators])
# Distance to reference front (min Euclidean distance)
dist_to_ref = cdist(pop_objs, ref_objs)
min_dist = np.min(dist_to_ref, axis=1)
D = 1.0 / (min_dist + 2.0)
fitness = R + D
return fitness
def _predict_objectives(pop_decs, obj_models, M, data_type):
    """Predict mean objectives and per-objective MSE via the GP surrogates.

    Returns (objs, mse), each of shape (N, M); the MSE column is the
    squared predictive standard deviation of the corresponding model.
    """
    n_points = pop_decs.shape[0]
    mean_mat = np.zeros((n_points, M))
    mse_mat = np.zeros((n_points, M))
    for col, model in enumerate(obj_models[:M]):
        mean, std = gp_predict(model, pop_decs, data_type)
        mean_mat[:, col] = mean.flatten()
        mse_mat[:, col] = std.flatten() ** 2
    return mean_mat, mse_mat
def _predict_constraints(pop_decs, con_models, C, data_type):
    """Predict constraint values via the GP surrogates.

    Returns an (N, C) matrix of predicted means; predictive uncertainty
    is discarded for constraints.
    """
    n_points = pop_decs.shape[0]
    con_mat = np.zeros((n_points, C))
    for col, model in enumerate(con_models[:C]):
        mean, _ = gp_predict(model, pop_decs, data_type)
        con_mat[:, col] = mean.flatten()
    return con_mat
def _mating_selection(CA_objs, CA_decs, DA_objs, DA_decs, N):
"""
KTA2-style mating selection from CA and DA.
Returns parents for crossover and mutation.
"""
CA_n = CA_objs.shape[0]
DA_n = DA_objs.shape[0]
half_N = int(np.ceil(N / 2))
# Select from CA with dominance comparison
idx1 = np.random.randint(0, CA_n, size=half_N)
idx2 = np.random.randint(0, CA_n, size=half_N)
any_less = np.any(CA_objs[idx1] < CA_objs[idx2], axis=1)
any_greater = np.any(CA_objs[idx1] > CA_objs[idx2], axis=1)
dominate = any_less.astype(int) - any_greater.astype(int)
selected_CA = np.where(dominate == 1, idx1, idx2)
selected_DA = np.random.randint(0, DA_n, size=half_N)
parentC_decs = np.vstack([CA_decs[selected_CA], DA_decs[selected_DA]])
# Mutation parents from CA
parentM_decs = CA_decs[np.random.randint(0, CA_n, size=N)]
return parentC_decs, parentM_decs
def _tournament_selection_fitness(fitness, n_select, tournament_size=2):
"""Binary tournament selection (lower fitness is better)."""
N = len(fitness)
selected = np.zeros(n_select, dtype=int)
for k in range(n_select):
candidates = np.random.randint(0, N, size=tournament_size)
selected[k] = candidates[np.argmin(fitness[candidates])]
return selected
def _update_CA(CA, new_objs, new_decs, max_size):
    """Update Convergence Archive using IBEA fitness.

    Merges ``new_objs``/``new_decs`` into archive ``CA`` (an (objs, decs)
    tuple, or None on the first call). If the merged archive exceeds
    ``max_size``, the worst-fitness member is removed one at a time while
    the remaining fitness values are compensated (IBEA environmental
    selection).

    Returns
    -------
    (np.ndarray, np.ndarray)
        Archive objectives and decisions, at most ``max_size`` rows.
    """
    if CA is None:
        CA_objs = new_objs.copy()
        CA_decs = new_decs.copy()
    else:
        CA_objs = np.vstack([CA[0], new_objs])
        CA_decs = np.vstack([CA[1], new_decs])
    N = CA_objs.shape[0]
    if N <= max_size:
        return CA_objs, CA_decs
    # I, C: per-pair indicator matrix and per-member normalizer as returned
    # by ibea_fitness — presumably the IBEA additive-epsilon terms; used
    # below exactly as in IBEA environmental selection. TODO confirm.
    fitness, I, C = ibea_fitness(CA_objs, kappa=0.05)
    choose = list(range(N))
    while len(choose) > max_size:
        fit_values = fitness[choose]
        min_idx = np.argmin(fit_values)
        to_remove = choose[min_idx]
        # Cancel the removed member's contribution to everyone's fitness;
        # the guard skips the update when the normalizer is ~0.
        if C[to_remove] > 1e-10:
            fitness += np.exp(-I[to_remove, :] / C[to_remove] / 0.05)
        choose.pop(min_idx)
    return CA_objs[choose], CA_decs[choose]
def _update_DA(DA, new_objs, new_decs, max_size, p):
    """Update Diversity Archive with non-dominated sorting and Lp truncation.

    Parameters
    ----------
    DA : tuple or None
        Current archive as (objs, decs); None on the first call.
    new_objs, new_decs : np.ndarray
        Newly evaluated objectives/decisions to merge in.
    max_size : int
        Archive capacity.
    p : float
        Minkowski exponent for the diversity distance (callers pass 1/M).

    Returns
    -------
    (np.ndarray, np.ndarray)
        Archive objectives and decisions, at most ``max_size`` rows.
    """
    if DA is None:
        DA_objs = new_objs.copy()
        DA_decs = new_decs.copy()
    else:
        DA_objs = np.vstack([DA[0], new_objs])
        DA_decs = np.vstack([DA[1], new_decs])
    N = DA_objs.shape[0]
    # Keep only the first non-dominated front.
    front_no, _ = nd_sort(DA_objs, N)
    nd_mask = front_no == 1
    DA_objs = DA_objs[nd_mask]
    DA_decs = DA_decs[nd_mask]
    N = DA_objs.shape[0]
    if N <= max_size:
        return DA_objs, DA_decs
    # Seed the selection with the per-objective extreme solutions.
    choose = np.zeros(N, dtype=bool)
    for m in range(DA_objs.shape[1]):
        choose[np.argmin(DA_objs[:, m])] = True
        choose[np.argmax(DA_objs[:, m])] = True
    if np.sum(choose) > max_size:
        # Too many extremes already: drop a random surplus.
        chosen_idx = np.where(choose)[0]
        to_remove = np.random.choice(chosen_idx, size=np.sum(choose) - max_size, replace=False)
        choose[to_remove] = False
    elif np.sum(choose) < max_size:
        # Greedy max-min fill under the Lp (Minkowski) distance.
        diff = DA_objs[:, np.newaxis, :] - DA_objs[np.newaxis, :, :]
        dist = np.sum(np.abs(diff) ** p, axis=2) ** (1.0 / p)
        np.fill_diagonal(dist, np.inf)
        while np.sum(choose) < max_size:
            remaining = np.where(~choose)[0]
            chosen = np.where(choose)[0]
            if len(remaining) == 0:
                break
            # Add the candidate farthest from the current selection.
            min_dists = np.min(dist[np.ix_(remaining, chosen)], axis=1)
            best = np.argmax(min_dists)
            choose[remaining[best]] = True
    return DA_objs[choose], DA_decs[choose]
def _update_P(pop_objs, pop_decs, pop_cons, N, is_origin):
    """
    SPEA2 environmental selection down to at most N solutions.

    Parameters
    ----------
    pop_objs : np.ndarray
        Objectives
    pop_decs : np.ndarray
        Decisions
    pop_cons : np.ndarray or None
        Constraints
    N : int
        Target size
    is_origin : bool
        If True, include constraints in the fitness computation

    Returns
    -------
    selected_objs, selected_decs, selected_cons : np.ndarray
        Constraints are None when ``pop_cons`` is None.
    """
    if is_origin and pop_cons is not None:
        fitness = spea2_fitness(pop_objs, pop_cons)
    else:
        fitness = spea2_fitness(pop_objs)
    total = pop_objs.shape[0]
    # Fitness < 1 is the SPEA2 "non-dominated" set.
    nd_idx = np.where(fitness < 1.0)[0]
    if nd_idx.size < N:
        # Too few non-dominated: top up with the best-fitness remainder.
        selected = np.argsort(fitness)[:min(N, total)]
    elif nd_idx.size > N:
        # Too many: SPEA2 distance-based truncation within the set.
        selected = nd_idx[spea2_truncation(pop_objs[nd_idx], N)]
    else:
        selected = nd_idx
    kept_cons = pop_cons[selected] if pop_cons is not None else None
    return pop_objs[selected], pop_decs[selected], kept_cons
def _update_P_real(P, new_objs, new_decs, new_cons, N, is_origin):
    """Merge newly evaluated solutions into population P, then reselect.

    ``P`` is an (objs, decs, cons) tuple; cons may be None. When the
    population tracks constraints but the new points carry none, the new
    rows are padded with zeros.
    """
    merged_objs = np.vstack([P[0], new_objs])
    merged_decs = np.vstack([P[1], new_decs])
    merged_cons = None
    if is_origin and P[2] is not None:
        if new_cons is None:
            # NOTE(review): zero constraint rows mark unevaluated points as
            # feasible — confirm this matches the intended semantics.
            new_cons = np.zeros((new_objs.shape[0], P[2].shape[1]))
        merged_cons = np.vstack([P[2], new_cons])
    return _update_P(merged_objs, merged_decs, merged_cons, N, is_origin)
def _k_update_CA(pop_objs, pop_decs, pop_mse, max_size):
    """Truncate the surrogate-predicted CA to ``max_size`` via IBEA selection.

    Same environmental selection as ``_update_CA``, but the per-solution
    predicted MSE matrix is carried through the same row selection.
    """
    N = pop_objs.shape[0]
    if N <= max_size:
        return pop_objs.copy(), pop_decs.copy(), pop_mse.copy()
    fitness, I, C = ibea_fitness(pop_objs, kappa=0.05)
    choose = list(range(N))
    while len(choose) > max_size:
        fit_values = fitness[choose]
        min_idx = np.argmin(fit_values)
        to_remove = choose[min_idx]
        # Compensate remaining fitness for the removed member (IBEA
        # environmental selection); guard against a ~0 normalizer.
        if C[to_remove] > 1e-10:
            fitness += np.exp(-I[to_remove, :] / C[to_remove] / 0.05)
        choose.pop(min_idx)
    return pop_objs[choose], pop_decs[choose], pop_mse[choose]
def _k_update_DA(pop_objs, pop_decs, pop_mse, max_size, p):
    """Truncate the surrogate-predicted DA: first ND front + Lp max-min fill.

    Diversity distances are computed on min-max normalized objectives; the
    predicted MSE matrix follows the same row selection.
    """
    N = pop_objs.shape[0]
    # Min-max normalize objectives for the diversity distance.
    min_vals = np.min(pop_objs, axis=0)
    max_vals = np.max(pop_objs, axis=0)
    range_vals = max_vals - min_vals
    range_vals[range_vals == 0] = 1.0  # avoid division by zero on flat columns
    pop_objs_norm = (pop_objs - min_vals) / range_vals
    # Keep only the first non-dominated front (sorted on raw objectives).
    front_no, _ = nd_sort(pop_objs, N)
    nd_mask = front_no == 1
    pop_objs = pop_objs[nd_mask]
    pop_decs = pop_decs[nd_mask]
    pop_mse = pop_mse[nd_mask]
    pop_objs_norm = pop_objs_norm[nd_mask]
    N = pop_objs.shape[0]
    if N <= max_size:
        return pop_objs, pop_decs, pop_mse
    M = pop_objs_norm.shape[1]
    choose = np.zeros(N, dtype=bool)
    # NOTE(review): `select` permutes *objective* indices (range M), yet
    # `select[0]` is then used as a *solution* index — effectively seeding
    # the selection with a random solution among the first M rows. This
    # looks like a translation slip from the MATLAB source; TODO confirm
    # against the reference implementation.
    select = np.random.permutation(M)
    if select[0] < N:
        choose[select[0]] = True
    else:
        choose[0] = True
    if np.sum(choose) < max_size:
        # Greedy max-min fill on normalized objectives (Lp distance).
        diff = pop_objs_norm[:, np.newaxis, :] - pop_objs_norm[np.newaxis, :, :]
        dist = np.sum(np.abs(diff) ** p, axis=2) ** (1.0 / p)
        np.fill_diagonal(dist, np.inf)
        while np.sum(choose) < max_size:
            remaining = np.where(~choose)[0]
            chosen = np.where(choose)[0]
            if len(remaining) == 0:
                break
            min_dists = np.min(dist[np.ix_(remaining, chosen)], axis=1)
            best = np.argmax(min_dists)
            choose[remaining[best]] = True
    return pop_objs[choose], pop_decs[choose], pop_mse[choose]
def _k_update_P(pop_objs, pop_decs, pop_cons, pop_mse, N, is_origin):
    """
    SPEA2 environmental selection on the surrogate-predicted population.

    Same selection rule as ``_update_P`` with the predicted MSE matrix
    carried along. Returns (objs, decs, cons, mse); cons is None when no
    constraints are tracked.
    """
    if is_origin and pop_cons is not None:
        fitness = spea2_fitness(pop_objs, pop_cons)
    else:
        fitness = spea2_fitness(pop_objs)
    total = pop_objs.shape[0]
    nd_idx = np.where(fitness < 1.0)[0]  # SPEA2 "non-dominated" set
    if nd_idx.size < N:
        # Too few non-dominated: top up with the best-fitness remainder.
        selected = np.argsort(fitness)[:min(N, total)]
    elif nd_idx.size > N:
        # Too many: SPEA2 distance-based truncation within the set.
        selected = nd_idx[spea2_truncation(pop_objs[nd_idx], N)]
    else:
        selected = nd_idx
    kept_cons = pop_cons[selected] if pop_cons is not None else None
    return pop_objs[selected], pop_decs[selected], kept_cons, pop_mse[selected]
def _adaptive_sampling(CA_objs, DA_objs, CA_decs, DA_decs, DA_mse,
                       real_DA_objs, real_DA_decs, mu, p, phi):
    """
    KTA2-style adaptive sampling with convergence/uncertainty/diversity strategies.

    Selects up to ``mu`` candidate decisions for expensive evaluation:

    - convergence: if CA is significantly better converged than DA
      (Wilcoxon test in ``_cal_convergence``), keep the best CA members
      by IBEA environmental selection;
    - uncertainty: if the predicted DA is less diverse than the real DA,
      repeatedly pick the highest-mean-MSE member of a random subset;
    - diversity: otherwise greedily add predicted DA members that maximize
      the min Lp distance to the already-evaluated real DA.

    Parameters
    ----------
    CA_objs, CA_decs : np.ndarray
        Surrogate CA objectives/decisions (already filtered of evaluated points).
    DA_objs, DA_decs, DA_mse : np.ndarray
        Surrogate DA objectives/decisions/predicted MSE.
    real_DA_objs, real_DA_decs : np.ndarray
        Real (evaluated) DA objectives/decisions.
    mu : int
        Number of candidates to return.
    p : float
        Minkowski exponent (1/M).
    phi : float
        Fraction of DA drawn per uncertainty pick.

    Returns
    -------
    np.ndarray
        Candidate decision vectors.
    """
    combined_objs = np.vstack([CA_objs, DA_objs])
    ideal_point = np.min(combined_objs, axis=0)
    flag = _cal_convergence(CA_objs, DA_objs, ideal_point)
    if flag == 1:
        # Convergence sampling from CA (IBEA environmental selection).
        N = CA_objs.shape[0]
        if N <= mu:
            return CA_decs.copy()
        fitness, I, C = ibea_fitness(CA_objs, kappa=0.05)
        choose = list(range(N))
        while len(choose) > mu:
            fit_values = fitness[choose]
            min_idx = np.argmin(fit_values)
            to_remove = choose[min_idx]
            # Compensate remaining fitness for the removed member.
            if C[to_remove] > 1e-10:
                fitness += np.exp(-I[to_remove, :] / C[to_remove] / 0.05)
            choose.pop(min_idx)
        return CA_decs[choose]
    else:
        pd_pred = _pure_diversity(DA_objs)
        pd_real = _pure_diversity(real_DA_objs)
        if pd_pred < pd_real:
            # Uncertainty sampling: best-of-random-subset by mean predicted MSE.
            DA_n = DA_mse.shape[0]
            subset_size = max(1, int(np.ceil(phi * DA_n)))
            chosen = []
            for _ in range(mu):
                perm = np.random.permutation(DA_n)
                subset_idx = perm[:subset_size]
                # Restrict to objective columns in case MSE has extra columns.
                n_obj_cols = min(DA_mse.shape[1], DA_objs.shape[1])
                uncertainty = np.mean(DA_mse[subset_idx, :n_obj_cols], axis=1)
                best = subset_idx[np.argmax(uncertainty)]
                chosen.append(best)
            return DA_decs[chosen]
        else:
            # Diversity sampling: greedy max-min fill of predicted DA members
            # against the already-evaluated real DA, on normalized objectives.
            all_objs = np.vstack([DA_objs, real_DA_objs])
            min_vals = np.min(all_objs, axis=0)
            max_vals = np.max(all_objs, axis=0)
            range_vals = max_vals - min_vals
            range_vals[range_vals == 0] = 1.0  # guard flat objectives
            DA_Nor = (real_DA_objs - min_vals) / range_vals
            DA_Nor_pre = (DA_objs - min_vals) / range_vals
            N_real = DA_Nor.shape[0]
            Pop = np.vstack([DA_Nor, DA_Nor_pre])
            Pop_dec = np.vstack([real_DA_decs, DA_decs])
            NN = Pop.shape[0]
            choose = np.zeros(NN, dtype=bool)
            choose[:N_real] = True  # real members are fixed "already chosen"
            target_size = N_real + mu
            diff = Pop[:, np.newaxis, :] - Pop[np.newaxis, :, :]
            dist_matrix = np.sum(np.abs(diff) ** p, axis=2) ** (1.0 / p)
            np.fill_diagonal(dist_matrix, np.inf)
            offspring = []
            while np.sum(choose) < target_size:
                remaining = np.where(~choose)[0]
                chosen_idx = np.where(choose)[0]
                if len(remaining) == 0:
                    break
                min_dists = np.min(dist_matrix[np.ix_(remaining, chosen_idx)], axis=1)
                best = np.argmax(min_dists)
                choose[remaining[best]] = True
                offspring.append(Pop_dec[remaining[best]])
            if len(offspring) == 0:
                # Nothing could be added — fall back to random DA members.
                idx = np.random.choice(DA_decs.shape[0],
                                       size=min(mu, DA_decs.shape[0]), replace=False)
                return DA_decs[idx]
            return np.array(offspring)
def _kccmo_sampling(pop_objs, pop_decs, pop_cons, ref_objs, mu1):
    """
    KCCMO-style sampling: K-means clusters in objective space, then the
    best-fitness member of each cluster is chosen for expensive evaluation.

    Parameters
    ----------
    pop_objs : np.ndarray
        Population objectives
    pop_decs : np.ndarray
        Population decisions
    pop_cons : np.ndarray or None
        Population constraints
    ref_objs : np.ndarray
        Reference front objectives (from CA)
    mu1 : int
        Number of solutions to select (cluster count)

    Returns
    -------
    selected_decs : np.ndarray
        Selected decision variables (at most ``mu1`` rows)
    """
    N = pop_objs.shape[0]
    n_clusters = min(mu1, N)
    if N <= mu1:
        # Fewer candidates than requested: return everything.
        return pop_decs.copy()
    # Fitness: SPEA2 raw fitness with distance-to-reference density when a
    # reference front is available, plain SPEA2 fitness otherwise.
    if ref_objs is not None and ref_objs.shape[0] > 0:
        fitness = _cal_fitness_spea2_ref(pop_objs, pop_cons, ref_objs)
    else:
        fitness = spea2_fitness(pop_objs, pop_cons)
    # K-means clustering on objectives. NOTE(review): random_state=None
    # makes the clustering non-deterministic across runs.
    try:
        from sklearn.cluster import KMeans
        kmeans = KMeans(n_clusters=n_clusters, n_init=3, random_state=None)
        labels = kmeans.fit_predict(pop_objs)
    except ImportError:
        # Fallback: minimal Lloyd's algorithm (see _simple_kmeans).
        labels = _simple_kmeans(pop_objs, n_clusters)
    # Pick the best (lowest-fitness) member of each non-empty cluster.
    selected = []
    for c in range(n_clusters):
        cluster_idx = np.where(labels == c)[0]
        if len(cluster_idx) == 0:
            continue
        best_in_cluster = cluster_idx[np.argmin(fitness[cluster_idx])]
        selected.append(best_in_cluster)
    if len(selected) == 0:
        # Degenerate clustering: fall back to random selection.
        idx = np.random.choice(N, size=min(mu1, N), replace=False)
        return pop_decs[idx]
    return pop_decs[selected]
def _simple_kmeans(X, k, max_iter=50):
"""Simple K-means fallback if sklearn unavailable."""
N = X.shape[0]
idx = np.random.choice(N, size=k, replace=False)
centers = X[idx].copy()
labels = np.zeros(N, dtype=int)
for _ in range(max_iter):
dist = cdist(X, centers)
new_labels = np.argmin(dist, axis=1)
if np.array_equal(labels, new_labels):
break
labels = new_labels
for c in range(k):
members = X[labels == c]
if len(members) > 0:
centers[c] = members.mean(axis=0)
return labels
def _cal_convergence(pop_obj1, pop_obj2, z_min):
"""
Wilcoxon signed-rank test comparing CA vs DA convergence.
Returns 1 if CA converges significantly better.
"""
N1 = pop_obj1.shape[0]
N2 = pop_obj2.shape[0]
if N1 != N2:
return 0
try:
pop_obj = np.vstack([pop_obj1, pop_obj2]) - z_min
denominator = np.max(pop_obj, axis=0) - z_min
denominator[np.abs(denominator) < 1e-10] = 1.0
pop_obj = pop_obj / denominator
distance1 = np.sqrt(np.clip(np.sum(pop_obj[:N1], axis=1), 0, None))
distance2 = np.sqrt(np.clip(np.sum(pop_obj[N1:], axis=1), 0, None))
diff = distance1 - distance2
abs_diff = np.abs(diff)
eps_tol = np.finfo(float).eps * (np.abs(distance1) + np.abs(distance2))
nonzero = abs_diff > eps_tol
if np.sum(nonzero) < 2:
return 0
diff_nz = diff[nonzero]
abs_diff_nz = abs_diff[nonzero]
ranks = rankdata(abs_diff_nz)
r1 = np.sum(ranks[diff_nz < 0])
_, p_value = wilcoxon(distance1[nonzero], distance2[nonzero])
flag = 1 if p_value <= 0.05 else 0
r2 = np.sum(ranks) - r1
if flag == 1 and (r1 - r2) < 0:
flag = 0
return flag
except Exception:
return 0
def _pure_diversity(pop_obj):
    """
    Pure diversity (PD) score via a maximum spanning tree under the
    Minkowski distance with p = 0.1.

    Repeatedly accepts the largest nearest-neighbor edge that does not close
    a cycle (cycle detection via the boolean connectivity matrix C) and
    accumulates the accepted edge lengths. Larger scores indicate a more
    spread-out set.

    Parameters
    ----------
    pop_obj : np.ndarray
        Objective matrix, shape (N, M).

    Returns
    -------
    float
        Diversity score (0.0 for fewer than two points).
    """
    N = pop_obj.shape[0]
    if N <= 1:
        return 0.0
    C = np.eye(N, dtype=bool)  # connectivity between nodes (self-connected)
    D = cdist(pop_obj, pop_obj, metric='minkowski', p=0.1)
    np.fill_diagonal(D, np.inf)
    score = 0.0
    for k in range(N - 1):
        while True:
            # Candidate edge: for each node its nearest neighbor; take the
            # node whose nearest neighbor is farthest away (max-min edge).
            d = np.min(D, axis=1)
            J = np.argmin(D, axis=1)
            i_node = np.argmax(d)
            j_node = J[i_node]
            # Mark this edge as examined in both directions so it is not
            # proposed again (-inf rows belong to retired nodes).
            if D[j_node, i_node] != -np.inf:
                D[j_node, i_node] = np.inf
            if D[i_node, j_node] != -np.inf:
                D[i_node, j_node] = np.inf
            # Transitive closure from i_node: if j_node is already
            # reachable, adding this edge would close a cycle — try again.
            P = C[i_node].copy()
            while not P[j_node]:
                new_P = np.any(C[P], axis=0)
                if np.array_equal(P, new_P):
                    break  # closure converged, j_node unreachable
                P = new_P
            if not P[j_node]:
                break  # acyclic — accept this edge
        C[i_node, j_node] = True
        C[j_node, i_node] = True
        D[i_node, :] = -np.inf  # retire i_node from future candidate rows
        score += d[i_node]
    return score
def _crossover_only(parents, muc=20):
    """SBX crossover only (no mutation).

    Parents are shuffled and paired; each pair yields two children. An odd
    leftover parent is crossed with another random parent (with itself when
    it is the only one) and contributes a single child.

    Parameters
    ----------
    parents : np.ndarray
        Parent decisions, shape (n, d).
    muc : int, optional
        SBX distribution index (default: 20).

    Returns
    -------
    np.ndarray
        Offspring decisions, shape (2*(n//2) + n%2, d).
    """
    n, d = parents.shape
    pool = parents.copy()
    np.random.shuffle(pool)
    num_pairs = n // 2
    # Collect children in a list and stack once (the previous incremental
    # np.vstack was quadratic in the number of offspring).
    children = []
    for j in range(num_pairs):
        child1, child2 = crossover(pool[j, :], pool[num_pairs + j, :], mu=muc)
        children.append(child1)
        children.append(child2)
    if n % 2 == 1:
        # BUG FIX: with a single parent, np.random.randint(0, 0) raised
        # ValueError; fall back to self-crossover in that degenerate case.
        mate = np.random.randint(0, n - 1) if n > 1 else 0
        child1, _ = crossover(pool[-1, :], pool[mate, :], mu=muc)
        children.append(child1)
    if not children:
        return np.zeros((0, d))
    return np.vstack(children)
def _mutation_only(parents, mum=20):
    """Polynomial mutation only (no crossover); one child per parent row."""
    n, d = parents.shape
    if n == 0:
        return np.zeros((0, d))
    return np.vstack([mutation(row, mu=mum) for row in parents])
def _full_ga(parents, muc=20, mum=20):
    """Standard GA variation: SBX crossover, then polynomial mutation
    applied in place to every child."""
    children = _crossover_only(parents, muc=muc)
    for idx in range(children.shape[0]):
        children[idx] = mutation(children[idx], mu=mum)
    return children
def _not_in_archive(new_decs, archive_decs):
"""Return boolean mask of rows in new_decs NOT present in archive_decs."""
if new_decs.shape[0] == 0 or archive_decs.shape[0] == 0:
return np.ones(new_decs.shape[0], dtype=bool)
dist = cdist(new_decs, archive_decs)
return np.min(dist, axis=1) > 1e-5
def _get_best_objs(pop_objs, pop_cons):
    """
    Non-dominated feasible objectives (MATLAB: Population.best.objs).

    If any solution is feasible (total constraint violation ~0), returns
    the first non-dominated front of the feasible subset; otherwise returns
    the first front of the whole population.
    """
    if pop_cons is not None:
        violation = np.sum(np.maximum(0, pop_cons), axis=1)
        feasible_mask = violation < 1e-10
        candidates = pop_objs[feasible_mask] if feasible_mask.any() else pop_objs
    else:
        candidates = pop_objs
    fronts, _ = nd_sort(candidates, candidates.shape[0])
    return candidates[fronts == 1]