"""
Kriging-Assisted Two-Archive Evolutionary Algorithm 2 (KTA2)
This module implements KTA2 for computationally expensive many-objective optimization.
It maintains two archives (convergence and diversity) and uses point-insensitive
Kriging models with adaptive sampling strategies.
References
----------
[1] Z. Song, H. Wang, C. He, and Y. Jin. A Kriging-assisted two-archive evolutionary algorithm for expensive many-objective optimization. IEEE Transactions on Evolutionary Computation, 2021, 25(6): 1013-1027.
Notes
-----
Author: Jiangtao Shen
Date: 2026.02.16
Version: 1.0
"""
from tqdm import tqdm
import time
import torch
import numpy as np
from scipy.spatial.distance import cdist
from scipy.stats import wilcoxon, rankdata
from ddmtolab.Methods.Algo_Methods.algo_utils import *
from ddmtolab.Methods.Algo_Methods.bo_utils import gp_build, gp_predict
import warnings
warnings.filterwarnings("ignore")
[docs]
class KTA2:
"""
Kriging-Assisted Two-Archive Evolutionary Algorithm 2 for expensive
many-objective optimization.

The algorithm keeps a convergence archive (CA) and a diversity archive (DA)
per task; each outer iteration builds one "sensitive" Kriging model plus two
point-insensitive models per objective, evolves predicted archive copies on
the surrogates, then re-evaluates a few adaptively sampled candidates.

Attributes
----------
algorithm_information : dict
Dictionary containing algorithm capabilities and requirements
(string descriptors consumed by the surrounding framework).
"""
# Capability declaration read by the framework; these strings describe the
# supported problem settings, not runtime configuration.
algorithm_information = {
'n_tasks': '[1, K]',  # one or several tasks
'dims': 'unequal',  # per-task dimensionalities may differ
'objs': 'unequal',  # per-task objective functions may differ
'n_objs': '[2, M]',  # multi-/many-objective only
'cons': 'equal',
'n_cons': '0',  # unconstrained problems only
'expensive': 'True',  # designed for expensive evaluations
'knowledge_transfer': 'False',  # no inter-task transfer
'n_initial': 'unequal',
'max_nfes': 'unequal'
}
@classmethod
def get_algorithm_information(cls, print_info=True):
# Delegates to the module-level helper of the same name (algo_utils).
return get_algorithm_information(cls, print_info)
[docs]
def __init__(self, problem, n_initial=None, max_nfes=None, n=100,
tau=0.75, phi=0.1, wmax=10, mu=5,
save_data=True, save_path='./Data', name='KTA2', disable_tqdm=True):
"""
Initialize KTA2 algorithm.

Only stores configuration; no evaluation or model building happens here.

Parameters
----------
problem : MTOP
Multi-task optimization problem instance
n_initial : int or List[int], optional
Number of initial samples per task (default: 11*dim-1, resolved
per task inside :meth:`optimize`)
max_nfes : int or List[int], optional
Maximum number of function evaluations per task (default: 200)
n : int or List[int], optional
Population/archive size per task (default: 100)
tau : float, optional
Proportion of training data for insensitive models (default: 0.75)
phi : float, optional
Fraction of DA for uncertainty sampling (default: 0.1)
wmax : int, optional
Number of inner surrogate evolution generations (default: 10)
mu : int, optional
Number of re-evaluated solutions per generation (default: 5)
save_data : bool, optional
Whether to save optimization data (default: True)
save_path : str, optional
Path to save results (default: './Data')
name : str, optional
Name for the experiment (default: 'KTA2')
disable_tqdm : bool, optional
Whether to disable progress bar (default: True)
"""
self.problem = problem
# A None value is kept as-is and resolved to 11*dim - 1 in optimize().
self.n_initial = n_initial
self.max_nfes = max_nfes if max_nfes is not None else 200
self.n = n
# Surrogate-management hyperparameters (tau/phi/wmax/mu from the paper).
self.tau = tau
self.phi = phi
self.wmax = wmax
self.mu = mu
# Bookkeeping / output options.
self.save_data = save_data
self.save_path = save_path
self.name = name
self.disable_tqdm = disable_tqdm
[docs]
def optimize(self):
"""
Execute the KTA2 algorithm.

Per outer iteration and per active task: build one sensitive Kriging
model and two point-insensitive models per objective, evolve predicted
copies of the two archives on the surrogates for ``wmax`` generations,
then select up to ``mu`` candidates via adaptive sampling for real
(expensive) evaluation and fold them into the archives.

Returns
-------
Results
Optimization results containing decision variables, objectives, and runtime
"""
data_type = torch.float
start_time = time.time()
problem = self.problem
nt = problem.n_tasks
dims = problem.dims
n_objs = problem.n_objs
# Set default initial samples: 11*dim - 1
if self.n_initial is None:
n_initial_per_task = [11 * dims[i] - 1 for i in range(nt)]
else:
n_initial_per_task = par_list(self.n_initial, nt)
max_nfes_per_task = par_list(self.max_nfes, nt)
n_per_task = par_list(self.n, nt)
# Initialize with LHS
decs = initialization(problem, n_initial_per_task, method='lhs')
objs, _ = evaluation(problem, decs)
nfes_per_task = n_initial_per_task.copy()
# Initialize archives for each task
CAs = [] # Convergence Archives: list of (objs, decs)
DAs = [] # Diversity Archives: list of (objs, decs)
for i in range(nt):
CA_size_i = n_per_task[i]
# CA starts from the initial samples (truncated by IBEA if oversized);
# DA starts as a plain copy of all initial samples.
CA_objs, CA_decs = self._update_CA(None, objs[i], decs[i], CA_size_i)
DA_objs, DA_decs = objs[i].copy(), decs[i].copy()
CAs.append((CA_objs, CA_decs))
DAs.append((DA_objs, DA_decs))
pbar = tqdm(total=sum(max_nfes_per_task), initial=sum(n_initial_per_task),
desc=f"{self.name}", disable=self.disable_tqdm)
while sum(nfes_per_task) < sum(max_nfes_per_task):
active_tasks = [i for i in range(nt) if nfes_per_task[i] < max_nfes_per_task[i]]
if not active_tasks:
break
for i in active_tasks:
M = n_objs[i]
dim = dims[i]
# p_i is the fractional norm parameter used by the DA truncation.
p_i = 1.0 / M
CA_size_i = n_per_task[i]
N = n_per_task[i]
# ===== Build Sensitive Models (one per objective on full data) =====
sensitive_models = []
for j in range(M):
model = gp_build(decs[i], objs[i][:, j:j + 1], data_type)
sensitive_models.append(model)
# ===== Build Insensitive Models (two per objective on subsets) =====
num_samples = decs[i].shape[0]
num_top = int(np.ceil(num_samples * self.tau))
insensitive_models = [[None, None] for _ in range(M)]
centers = np.zeros((M, 2))
for j in range(M):
sorted_idx = np.argsort(objs[i][:, j])
# Group 0: best tau fraction, Group 1: worst tau fraction
# NOTE(review): the worst group takes num_top + 1 samples while the
# best group takes num_top — symmetric tau-fraction groups would use
# sorted_idx[-num_top:]; confirm against the MATLAB reference.
group_indices = [sorted_idx[:num_top], sorted_idx[-(num_top + 1):]]
for g in range(2):
# Group centers later decide which insensitive model serves a point.
centers[j, g] = np.mean(objs[i][group_indices[g], j])
train_X = decs[i][group_indices[g]]
train_Y = objs[i][group_indices[g], j:j + 1]
insensitive_models[j][g] = gp_build(train_X, train_Y, data_type)
# ===== Inner Loop: Surrogate-based Evolution =====
# Work on copies so the real archives stay untouched until re-evaluation.
CA_objs_pred = CAs[i][0].copy()
CA_decs_pred = CAs[i][1].copy()
DA_objs_pred = DAs[i][0].copy()
DA_decs_pred = DAs[i][1].copy()
DA_mse_pred = np.zeros_like(DA_objs_pred)  # placeholder until first surrogate update
for w in range(self.wmax):
# Mating selection
parentC_decs, parentM_decs = self._mating_selection(
CA_objs_pred, CA_decs_pred, DA_objs_pred, DA_decs_pred, N
)
# Generate offspring
# Crossover only (proC=1, disC=20, proM=0): matches MATLAB {1,20,0,0}
off_C = _crossover_only(parentC_decs, muc=20)
# Mutation only (proC=0, proM=1, disM=20): matches MATLAB {0,0,1,20}
off_M = _mutation_only(parentM_decs, mum=20)
off_decs_all = np.vstack([off_C, off_M])
# Combine populations: DA + CA + offspring
pop_decs = np.vstack([DA_decs_pred, CA_decs_pred, off_decs_all])
# Predict using insensitive models
pop_objs, pop_mse = self._predict_with_insensitive_models(
pop_decs, sensitive_models, insensitive_models, centers, M, data_type
)
# Update predicted CA and DA
CA_objs_pred, CA_decs_pred, _ = self._k_update_CA(
pop_objs, pop_decs, pop_mse, CA_size_i
)
DA_objs_pred, DA_decs_pred, DA_mse_pred = self._k_update_DA(
pop_objs, pop_decs, pop_mse, N, p_i
)
# ===== Adaptive Sampling =====
# Pick up to mu candidates for expensive re-evaluation.
offspring_decs = self._adaptive_sampling(
CA_objs_pred, DA_objs_pred, CA_decs_pred, DA_decs_pred,
DA_mse_pred, DAs[i][0], DAs[i][1], self.mu, p_i, self.phi
)
# Remove duplicates against existing evaluated data
offspring_decs = remove_duplicates(offspring_decs, decs[i])
if offspring_decs.shape[0] > 0:
# Evaluate with real function
off_objs, _ = evaluation_single(problem, offspring_decs, i)
# Update all evaluated data
decs[i] = np.vstack([decs[i], offspring_decs])
objs[i] = np.vstack([objs[i], off_objs])
# Update real CA and DA
CA_objs, CA_decs = self._update_CA(
CAs[i], off_objs, offspring_decs, CA_size_i
)
DA_objs, DA_decs = self._update_DA(
DAs[i], off_objs, offspring_decs, N, p_i
)
CAs[i] = (CA_objs, CA_decs)
DAs[i] = (DA_objs, DA_decs)
nfes_per_task[i] += offspring_decs.shape[0]
pbar.update(offspring_decs.shape[0])
pbar.close()
runtime = time.time() - start_time
all_decs, all_objs = build_staircase_history(decs, objs, k=self.mu)
results = build_save_results(all_decs=all_decs, all_objs=all_objs, runtime=runtime,
max_nfes=nfes_per_task, bounds=problem.bounds,
save_path=self.save_path, filename=self.name,
save_data=self.save_data)
return results
# =========================================================================
# Helper Methods
# =========================================================================
@staticmethod
def _mating_selection(CA_objs, CA_decs, DA_objs, DA_decs, N):
"""
Select parents from CA and DA for offspring generation.
Parameters
----------
CA_objs : np.ndarray
Convergence archive objectives
CA_decs : np.ndarray
Convergence archive decisions
DA_objs : np.ndarray
Diversity archive objectives
DA_decs : np.ndarray
Diversity archive decisions
N : int
Population size
Returns
-------
parentC_decs : np.ndarray
Parents for SBX crossover (convergence-oriented)
parentM_decs : np.ndarray
Parents for polynomial mutation (diversity-oriented)
"""
CA_n = CA_objs.shape[0]
DA_n = DA_objs.shape[0]
half_N = int(np.ceil(N / 2))
# Select from CA with dominance comparison
idx1 = np.random.randint(0, CA_n, size=half_N)
idx2 = np.random.randint(0, CA_n, size=half_N)
any_less = np.any(CA_objs[idx1] < CA_objs[idx2], axis=1)
any_greater = np.any(CA_objs[idx1] > CA_objs[idx2], axis=1)
dominate = any_less.astype(int) - any_greater.astype(int)
# Keep parent1 if it dominates, parent2 otherwise
selected_CA = np.where(dominate == 1, idx1, idx2)
# Random parents from DA
selected_DA = np.random.randint(0, DA_n, size=half_N)
parentC_decs = np.vstack([CA_decs[selected_CA], DA_decs[selected_DA]])
# Mutation parents: random from CA
parentM_decs = CA_decs[np.random.randint(0, CA_n, size=N)]
return parentC_decs, parentM_decs
@staticmethod
def _update_CA(CA, new_objs, new_decs, max_size):
"""
Update Convergence Archive using IBEA fitness (UpdateCA.m).

Merges the newly evaluated solutions into the archive; if the merged
archive exceeds ``max_size``, members with the worst IBEA fitness are
removed one at a time.

Parameters
----------
CA : tuple or None
Current CA as (objs, decs) or None for initial
new_objs : np.ndarray
New objective values
new_decs : np.ndarray
New decision variables
max_size : int
Maximum archive size

Returns
-------
CA_objs : np.ndarray
Updated CA objectives
CA_decs : np.ndarray
Updated CA decisions
"""
if CA is None:
CA_objs = new_objs.copy()
CA_decs = new_decs.copy()
else:
CA_objs = np.vstack([CA[0], new_objs])
CA_decs = np.vstack([CA[1], new_decs])
N = CA_objs.shape[0]
if N <= max_size:
return CA_objs, CA_decs
# IBEA fitness-based selection
fitness, I, C = ibea_fitness(CA_objs, kappa=0.05)
choose = list(range(N))
while len(choose) > max_size:
fit_values = fitness[choose]
min_idx = np.argmin(fit_values)
to_remove = choose[min_idx]
# Removing a member cancels its indicator contribution: add back
# exp(-I(removed, .)/c/kappa) to the fitness of every individual.
# The update touches the full fitness vector, but only indices still
# present in `choose` are ever consulted afterwards.
if C[to_remove] > 1e-10:
fitness += np.exp(-I[to_remove, :] / C[to_remove] / 0.05)
choose.pop(min_idx)
return CA_objs[choose], CA_decs[choose]
@staticmethod
def _update_DA(DA, new_objs, new_decs, max_size, p):
    """
    Update the Diversity Archive (UpdateDA.m).

    Merges the newly evaluated solutions into the archive, keeps only the
    first non-dominated front, and — if still oversized — truncates to
    ``max_size`` while always trying to preserve the objective-wise
    extreme solutions.

    Parameters
    ----------
    DA : tuple or None
        Current DA as (objs, decs) or None
    new_objs : np.ndarray
        New objective values
    new_decs : np.ndarray
        New decision variables
    max_size : int
        Maximum archive size
    p : float
        Parameter for fractional distance norm

    Returns
    -------
    DA_objs : np.ndarray
        Updated DA objectives
    DA_decs : np.ndarray
        Updated DA decisions
    """
    # Merge archive with newcomers (archive may be absent on first call).
    if DA is None:
        merged_objs = new_objs.copy()
        merged_decs = new_decs.copy()
    else:
        merged_objs = np.vstack([DA[0], new_objs])
        merged_decs = np.vstack([DA[1], new_decs])
    # Keep only the first non-dominated front.
    total = merged_objs.shape[0]
    fronts, _ = nd_sort(merged_objs, total)
    keep = fronts == 1
    merged_objs = merged_objs[keep]
    merged_decs = merged_decs[keep]
    count = merged_objs.shape[0]
    if count <= max_size:
        return merged_objs, merged_decs
    # Seed the selection with the extreme solution of every objective
    # (both the minimizer and the maximizer).
    selected = np.zeros(count, dtype=bool)
    for m in range(merged_objs.shape[1]):
        selected[np.argmin(merged_objs[:, m])] = True
        selected[np.argmax(merged_objs[:, m])] = True
    n_sel = int(np.sum(selected))
    if n_sel > max_size:
        # Too many extremes for the budget: randomly drop the surplus.
        extreme_idx = np.where(selected)[0]
        drop = np.random.choice(extreme_idx, size=n_sel - max_size, replace=False)
        selected[drop] = False
    elif n_sel < max_size:
        # Greedy max-min truncation under the fractional L_p norm: keep
        # adding the solution farthest from everything already selected.
        deltas = merged_objs[:, np.newaxis, :] - merged_objs[np.newaxis, :, :]
        pairwise = np.sum(np.abs(deltas) ** p, axis=2) ** (1.0 / p)
        np.fill_diagonal(pairwise, np.inf)
        while np.sum(selected) < max_size:
            out_idx = np.where(~selected)[0]
            in_idx = np.where(selected)[0]
            nearest = np.min(pairwise[np.ix_(out_idx, in_idx)], axis=1)
            selected[out_idx[np.argmax(nearest)]] = True
    return merged_objs[selected], merged_decs[selected]
@staticmethod
def _k_update_CA(pop_objs, pop_decs, pop_mse, max_size):
    """
    Truncate the predicted CA via IBEA environmental selection (K_UpdateCA.m).

    Parameters
    ----------
    pop_objs : np.ndarray
        Predicted objective values
    pop_decs : np.ndarray
        Decision variables
    pop_mse : np.ndarray
        Predicted MSE values
    max_size : int
        Maximum archive size

    Returns
    -------
    CA_objs, CA_decs, CA_mse : np.ndarray
        Selected archive members
    """
    total = pop_objs.shape[0]
    if total <= max_size:
        return pop_objs.copy(), pop_decs.copy(), pop_mse.copy()
    fitness, I, C = ibea_fitness(pop_objs, kappa=0.05)
    kept = list(range(total))
    while len(kept) > max_size:
        worst_pos = int(np.argmin(fitness[kept]))
        worst = kept[worst_pos]
        # Removing `worst` restores its indicator contribution to the
        # fitness of all remaining individuals.
        if C[worst] > 1e-10:
            fitness += np.exp(-I[worst, :] / C[worst] / 0.05)
        kept.pop(worst_pos)
    return pop_objs[kept], pop_decs[kept], pop_mse[kept]
@staticmethod
def _k_update_DA(pop_objs, pop_decs, pop_mse, max_size, p):
"""
Update predicted DA with ND sort + truncation on normalized objectives (K_UpdateDA.m).

Parameters
----------
pop_objs : np.ndarray
Predicted objective values
pop_decs : np.ndarray
Decision variables
pop_mse : np.ndarray
Predicted MSE values
max_size : int
Maximum archive size
p : float
Parameter for fractional distance norm

Returns
-------
DA_objs, DA_decs, DA_mse : np.ndarray
Selected archive members
"""
N = pop_objs.shape[0]
# Normalize objectives to [0, 1]; the normalized values are used only for
# the truncation distances below — the non-dominated sort runs on the raw
# objectives.
min_vals = np.min(pop_objs, axis=0)
max_vals = np.max(pop_objs, axis=0)
range_vals = max_vals - min_vals
range_vals[range_vals == 0] = 1.0
pop_objs_norm = (pop_objs - min_vals) / range_vals
# Non-dominated sorting on original objectives
front_no, _ = nd_sort(pop_objs, N)
nd_mask = front_no == 1
pop_objs = pop_objs[nd_mask]
pop_decs = pop_decs[nd_mask]
pop_mse = pop_mse[nd_mask]
pop_objs_norm = pop_objs_norm[nd_mask]
N = pop_objs.shape[0]
if N <= max_size:
return pop_objs, pop_decs, pop_mse
# Initial selection: random index from 1 to M (matching MATLAB)
# NOTE(review): `select` permutes the M *objective* indices but is then
# used as an index into the N *solutions* (guarded by `select[0] < N`).
# This seeds the truncation with a quasi-random member; confirm the
# intent against K_UpdateDA.m.
M = pop_objs_norm.shape[1]
choose = np.zeros(N, dtype=bool)
select = np.random.permutation(M)
if select[0] < N:
choose[select[0]] = True
else:
choose[0] = True
if np.sum(choose) < max_size:
# Truncation using normalized objectives
diff = pop_objs_norm[:, np.newaxis, :] - pop_objs_norm[np.newaxis, :, :]
dist = np.sum(np.abs(diff) ** p, axis=2) ** (1.0 / p)
np.fill_diagonal(dist, np.inf)
# Greedy max-min selection until the archive is full.
while np.sum(choose) < max_size:
remaining = np.where(~choose)[0]
chosen = np.where(choose)[0]
if len(remaining) == 0:
break
min_dists = np.min(dist[np.ix_(remaining, chosen)], axis=1)
best = np.argmax(min_dists)
choose[remaining[best]] = True
return pop_objs[choose], pop_decs[choose], pop_mse[choose]
@staticmethod
def _predict_with_insensitive_models(pop_decs, sensitive_models, insensitive_models,
                                     centers, M, data_type):
    """
    Predict objectives and MSE with the sensitive/insensitive model pair.

    For each objective the sensitive model produces a coarse prediction
    that routes every solution to whichever of the two insensitive models
    has the closer group center; the routed model then supplies the final
    mean and variance.

    Parameters
    ----------
    pop_decs : np.ndarray
        Population decision variables, shape (N, D)
    sensitive_models : list
        List of M sensitive GP models
    insensitive_models : list
        List of M lists, each containing 2 insensitive GP models
    centers : np.ndarray
        Centers for model selection, shape (M, 2)
    M : int
        Number of objectives
    data_type : torch.dtype
        Data type for GP prediction

    Returns
    -------
    pop_objs : np.ndarray
        Predicted objectives, shape (N, M)
    pop_mse : np.ndarray
        Predicted MSE, shape (N, M)
    """
    n = pop_decs.shape[0]
    pop_objs = np.zeros((n, M))
    pop_mse = np.zeros((n, M))
    for j in range(M):
        # Coarse prediction decides which insensitive model serves each point.
        coarse, _ = gp_predict(sensitive_models[j], pop_decs, data_type)
        coarse = coarse.flatten()
        nearer_group0 = (np.abs(coarse - centers[j, 0])
                         <= np.abs(coarse - centers[j, 1]))
        # Batch-predict each group with its routed insensitive model.
        for g, mask in enumerate((nearer_group0, ~nearer_group0)):
            sel = np.where(mask)[0]
            if sel.size == 0:
                continue
            mean, std = gp_predict(insensitive_models[j][g], pop_decs[sel], data_type)
            pop_objs[sel, j] = mean.flatten()
            pop_mse[sel, j] = std.flatten() ** 2  # variance from std
    return pop_objs, pop_mse
@staticmethod
def _adaptive_sampling(CA_objs, DA_objs, CA_decs, DA_decs, DA_mse,
real_DA_objs, real_DA_decs, mu, p, phi):
"""
Select solutions for expensive re-evaluation using adaptive strategy.

Three strategies:
1. Convergence sampling: IBEA-based selection from predicted CA
(used when the Wilcoxon test says CA converges significantly better)
2. Uncertainty sampling: highest MSE from predicted DA
(used when the predicted DA is less diverse than the real DA)
3. Diversity sampling: truncation-based from predicted DA

Parameters
----------
CA_objs, DA_objs : np.ndarray
Predicted archive objectives
CA_decs, DA_decs : np.ndarray
Predicted archive decisions
DA_mse : np.ndarray
Predicted DA MSE values
real_DA_objs, real_DA_decs : np.ndarray
Real (evaluated) DA objectives and decisions
mu : int
Number of solutions to select
p : float
Distance norm parameter
phi : float
Fraction of DA for uncertainty sampling

Returns
-------
offspring_decs : np.ndarray
Selected decision variables for re-evaluation
"""
# Compute ideal point
combined_objs = np.vstack([CA_objs, DA_objs])
ideal_point = np.min(combined_objs, axis=0)
# Check convergence
flag = KTA2._cal_convergence(CA_objs, DA_objs, ideal_point)
if flag == 1:
# Strategy 1: Convergence sampling from predicted CA
# (IBEA environmental selection, same scheme as _update_CA).
N = CA_objs.shape[0]
if N <= mu:
return CA_decs.copy()
fitness, I, C = ibea_fitness(CA_objs, kappa=0.05)
choose = list(range(N))
while len(choose) > mu:
fit_values = fitness[choose]
min_idx = np.argmin(fit_values)
to_remove = choose[min_idx]
if C[to_remove] > 1e-10:
fitness += np.exp(-I[to_remove, :] / C[to_remove] / 0.05)
choose.pop(min_idx)
return CA_decs[choose]
else:
# Compare the spread of the predicted DA against the evaluated DA.
pd_pred = KTA2._pure_diversity(DA_objs)
pd_real = KTA2._pure_diversity(real_DA_objs)
if pd_pred < pd_real:
# Strategy 2: Uncertainty sampling from predicted DA
# Each of the mu draws picks the most uncertain member of a fresh
# random subset of size ceil(phi*|DA|).
# NOTE: the same member can be selected more than once here.
DA_n = DA_mse.shape[0]
subset_size = max(1, int(np.ceil(phi * DA_n)))
chosen = []
for _ in range(mu):
perm = np.random.permutation(DA_n)
subset_idx = perm[:subset_size]
uncertainty = np.mean(DA_mse[subset_idx], axis=1)
best = subset_idx[np.argmax(uncertainty)]
chosen.append(best)
return DA_decs[chosen]
else:
# Strategy 3: Diversity sampling via truncation
# Normalize both archives jointly so distances are comparable.
all_objs = np.vstack([DA_objs, real_DA_objs])
min_vals = np.min(all_objs, axis=0)
max_vals = np.max(all_objs, axis=0)
range_vals = max_vals - min_vals
range_vals[range_vals == 0] = 1.0
DA_Nor = (real_DA_objs - min_vals) / range_vals
DA_Nor_pre = (DA_objs - min_vals) / range_vals
N_real = DA_Nor.shape[0]
Pop = np.vstack([DA_Nor, DA_Nor_pre])
Pop_dec = np.vstack([real_DA_decs, DA_decs])
NN = Pop.shape[0]
# Start with all real DA chosen
choose = np.zeros(NN, dtype=bool)
choose[:N_real] = True
target_size = N_real + mu
# Compute pairwise distances
diff = Pop[:, np.newaxis, :] - Pop[np.newaxis, :, :]
dist_matrix = np.sum(np.abs(diff) ** p, axis=2) ** (1.0 / p)
np.fill_diagonal(dist_matrix, np.inf)
offspring = []
# Greedy max-min selection: repeatedly add the predicted solution
# farthest from everything already chosen.
while np.sum(choose) < target_size:
remaining = np.where(~choose)[0]
chosen_idx = np.where(choose)[0]
if len(remaining) == 0:
break
min_dists = np.min(dist_matrix[np.ix_(remaining, chosen_idx)], axis=1)
best = np.argmax(min_dists)
choose[remaining[best]] = True
offspring.append(Pop_dec[remaining[best]])
if len(offspring) == 0:
# Fallback (e.g. predicted DA empty): sample randomly instead.
idx = np.random.choice(DA_decs.shape[0], size=min(mu, DA_decs.shape[0]),
replace=False)
return DA_decs[idx]
return np.array(offspring)
@staticmethod
def _cal_convergence(pop_obj1, pop_obj2, z_min):
"""
Check if predicted CA converges better than predicted DA using
Wilcoxon signed rank test (Cal_Convergence.m).

Parameters
----------
pop_obj1 : np.ndarray
Predicted CA objectives
pop_obj2 : np.ndarray
Predicted DA objectives
z_min : np.ndarray
Ideal point

Returns
-------
flag : int
1 if CA converges significantly better, 0 otherwise
"""
N1 = pop_obj1.shape[0]
N2 = pop_obj2.shape[0]
# The paired test requires equally sized samples.
if N1 != N2:
return 0
try:
# Translate and normalize (matching MATLAB implementation)
pop_obj = np.vstack([pop_obj1, pop_obj2]) - z_min
# NOTE(review): pop_obj is already translated by z_min on the line
# above, so subtracting z_min again here double-shifts the denominator
# unless z_min is ~0 — confirm against Cal_Convergence.m.
denominator = np.max(pop_obj, axis=0) - z_min
denominator[np.abs(denominator) < 1e-10] = 1.0
pop_obj = pop_obj / denominator
# Compute distance to origin: sqrt(sum of elements)
# (not a Euclidean norm — the summands are not squared; clip guards
# against tiny negative sums from the normalization)
distance1 = np.sqrt(np.clip(np.sum(pop_obj[:N1], axis=1), 0, None))
distance2 = np.sqrt(np.clip(np.sum(pop_obj[N1:], axis=1), 0, None))
# Remove pairs with negligible difference
diff = distance1 - distance2
abs_diff = np.abs(diff)
eps_tol = np.finfo(float).eps * (np.abs(distance1) + np.abs(distance2))
nonzero = abs_diff > eps_tol
# Too few informative pairs to run the test.
if np.sum(nonzero) < 2:
return 0
diff_nz = diff[nonzero]
abs_diff_nz = abs_diff[nonzero]
n = len(diff_nz)
# Compute rank sums
ranks = rankdata(abs_diff_nz)
r1 = np.sum(ranks[diff_nz < 0]) # sum of ranks where CA closer (better)
r2 = n * (n + 1) / 2 - r1
# Wilcoxon signed rank test
_, p_value = wilcoxon(distance1[nonzero], distance2[nonzero])
flag = 1 if p_value <= 0.05 else 0
# If significant but DA converges better, set flag to 0
# (r1 < r2 means the CA side holds the smaller rank mass, i.e. DA wins)
if flag == 1 and (r1 - r2) < 0:
flag = 0
return flag
except Exception:
# wilcoxon can raise (e.g. all differences zero); treat as "no evidence".
return 0
@staticmethod
def _pure_diversity(pop_obj):
"""
Compute pure diversity metric using spanning tree approach (PD function).

Uses Minkowski distance with p=0.1 to build a maximum spanning tree
and returns the sum of edge weights. Larger scores indicate a more
spread-out set of objective vectors.

Parameters
----------
pop_obj : np.ndarray
Objective values, shape (N, M)

Returns
-------
score : float
Pure diversity score
"""
N = pop_obj.shape[0]
# Degenerate population: no edges, zero diversity.
if N <= 1:
return 0.0
# Connectivity matrix (each node connected to itself initially)
C = np.eye(N, dtype=bool)
# Minkowski distance with p=0.1
D = cdist(pop_obj, pop_obj, metric='minkowski', p=0.1)
np.fill_diagonal(D, np.inf)
score = 0.0
# Greedy construction: each outer iteration commits exactly one tree edge.
for k in range(N - 1):
while True:
# For every node: distance to (d) and index of (J) its nearest
# still-available neighbour; pick the node whose nearest neighbour
# is farthest away.
d = np.min(D, axis=1)
J = np.argmin(D, axis=1)
i = np.argmax(d)
j = J[i]
# Mark edge as visited
# (set to inf so it is never considered again; -inf rows mark
# nodes already absorbed into the tree)
if D[j, i] != -np.inf:
D[j, i] = np.inf
if D[i, j] != -np.inf:
D[i, j] = np.inf
# Check if i and j are already connected via BFS
# (iteratively expand the set reachable from i until it stabilises)
P = C[i].copy()
while not P[j]:
new_P = np.any(C[P], axis=0)
if np.array_equal(P, new_P):
break
P = new_P
# Accept the edge only if it does not close a cycle.
if not P[j]:
break
C[i, j] = True
C[j, i] = True
# Node i is absorbed: exclude its remaining candidate edges.
D[i, :] = -np.inf
score += d[i]
return score
def _crossover_only(parents, muc=20):
    """
    Generate offspring using SBX crossover only (no mutation).

    Matches MATLAB OperatorGA with {proC=1, disC=20, proM=0, disM=0}.
    Parents are shuffled and paired (row j with row n//2 + j); an odd
    leftover parent is crossed with a random other parent.

    Parameters
    ----------
    parents : np.ndarray
        Parent population, shape (n, d)
    muc : float
        Distribution index for crossover

    Returns
    -------
    offdecs : np.ndarray
        Offspring decision variables, shape (n, d)
    """
    n, d = parents.shape
    # Shuffle a copy so the caller's array is left untouched.
    parents = parents.copy()
    np.random.shuffle(parents)
    offspring = []
    num_pairs = n // 2
    for j in range(num_pairs):
        child1, child2 = crossover(parents[j, :], parents[num_pairs + j, :], mu=muc)
        offspring.extend((child1, child2))
    if n % 2 == 1:
        # Odd count: cross the leftover parent with a random *other* parent.
        # Bug fix: with n == 1, np.random.randint(0, 0) raises ValueError —
        # fall back to pairing the single parent with itself (SBX of two
        # identical parents reproduces the parent).
        partner_idx = np.random.randint(0, n - 1) if n > 1 else 0
        child1, _ = crossover(parents[-1, :], parents[partner_idx, :], mu=muc)
        offspring.append(child1)
    # Empty input yields an empty (0, d) array, matching the original.
    return np.vstack(offspring) if offspring else np.zeros((0, d))
def _mutation_only(parents, mum=20):
    """
    Generate offspring using polynomial mutation only (no crossover).

    Matches MATLAB OperatorGA with {proC=0, disC=0, proM=1, disM=20}.
    Every parent is mutated independently, producing one child per parent.

    Parameters
    ----------
    parents : np.ndarray
        Parent population, shape (n, d)
    mum : float
        Distribution index for mutation

    Returns
    -------
    offdecs : np.ndarray
        Offspring decision variables, shape (n, d)
    """
    count, dim = parents.shape
    mutated = np.zeros((count, dim))
    for idx in range(count):
        mutated[idx] = mutation(parents[idx, :], mu=mum)
    return mutated