"""
Kriging-assisted Reference Vector Guided Evolutionary Algorithm (K-RVEA)
This module implements K-RVEA for computationally expensive multi/many-objective optimization.
References
----------
[1] T. Chugh, Y. Jin, K. Miettinen, J. Hakanen, and K. Sindhya. A surrogate-assisted reference vector guided evolutionary algorithm for computationally expensive many-objective optimization. IEEE Transactions on Evolutionary Computation, 2018, 22(1): 129-142.
Notes
-----
Author: Jiangtao Shen
Date: 2025.01.11
Version: 2.0
"""
from tqdm import tqdm
import time
import torch
import numpy as np
from ddmtolab.Methods.Algo_Methods.algo_utils import *
from ddmtolab.Methods.Algo_Methods.bo_utils import mo_gp_build, mo_gp_predict
from ddmtolab.Methods.Algo_Methods.uniform_point import uniform_point
from ddmtolab.Algorithms.STMO.RVEA import rvea_selection
import warnings
warnings.filterwarnings("ignore")
[docs]
class K_RVEA:
"""
Kriging-assisted Reference Vector Guided Evolutionary Algorithm for expensive
multi/many-objective optimization.
Attributes
----------
algorithm_information : dict
Dictionary containing algorithm capabilities and requirements
"""
algorithm_information = {
'n_tasks': '[1, K]',
'dims': 'unequal',
'objs': 'unequal',
'n_objs': '[2, M]',
'cons': 'equal',
'n_cons': '0',
'expensive': 'True',
'knowledge_transfer': 'False',
'n_initial': 'unequal',
'max_nfes': 'unequal'
}
@classmethod
def get_algorithm_information(cls, print_info=True):
return get_algorithm_information(cls, print_info)
[docs]
def __init__(self, problem, n_initial=None, max_nfes=None, n=100, alpha=2.0, wmax=20, mu=5,
save_data=True, save_path='./Data', name='K-RVEA', disable_tqdm=True):
"""
Initialize K-RVEA algorithm.
Parameters
----------
problem : MTOP
Multi-task optimization problem instance
n_initial : int or List[int], optional
Number of initial samples per task (default: 11*dim-1)
max_nfes : int or List[int], optional
Maximum number of function evaluations per task (default: 200)
n : int or List[int], optional
Population size (number of reference vectors) per task (default: 100)
alpha : float, optional
Parameter controlling the rate of change of penalty (default: 2.0)
wmax : int, optional
Number of generations before updating Kriging models (default: 20)
mu : int, optional
Number of re-evaluated solutions at each generation (default: 5)
save_data : bool, optional
Whether to save optimization data (default: True)
save_path : str, optional
Path to save results (default: './Data')
name : str, optional
Name for the experiment (default: 'K-RVEA')
disable_tqdm : bool, optional
Whether to disable progress bar (default: True)
"""
self.problem = problem
self.n_initial = n_initial
self.max_nfes = max_nfes if max_nfes is not None else 200
self.n = n
self.alpha = alpha
self.wmax = wmax
self.mu = mu
self.save_data = save_data
self.save_path = save_path
self.name = name
self.disable_tqdm = disable_tqdm
[docs]
def optimize(self):
"""
Execute the K-RVEA algorithm.
Returns
-------
Results
Optimization results containing decision variables, objectives, and runtime
"""
data_type = torch.float
start_time = time.time()
problem = self.problem
nt = problem.n_tasks
dims = problem.dims
n_objs = problem.n_objs
# Set default initial samples: 11*dim - 1
if self.n_initial is None:
n_initial_per_task = [11 * dims[i] - 1 for i in range(nt)]
else:
n_initial_per_task = par_list(self.n_initial, nt)
max_nfes_per_task = par_list(self.max_nfes, nt)
n_per_task = par_list(self.n, nt)
# Generate uniformly distributed reference vectors for each task
V0 = []
for i in range(nt):
v_i, actual_n = uniform_point(n_per_task[i], n_objs[i])
V0.append(v_i)
n_per_task[i] = actual_n
# Initialize adaptive reference vectors
V = [v.copy() for v in V0]
# Generate initial samples using Latin Hypercube Sampling
decs = initialization(problem, n_initial_per_task, method='lhs')
objs, _ = evaluation(problem, decs)
nfes_per_task = n_initial_per_task.copy()
# A1: training archive (maintained at ~NI size for model building)
arc_decs = [d.copy() for d in decs]
arc_objs = [o.copy() for o in objs]
pbar = tqdm(total=sum(max_nfes_per_task), initial=sum(n_initial_per_task),
desc=f"{self.name}", disable=self.disable_tqdm)
while sum(nfes_per_task) < sum(max_nfes_per_task):
active_tasks = [i for i in range(nt) if nfes_per_task[i] < max_nfes_per_task[i]]
if not active_tasks:
break
for i in active_tasks:
m = n_objs[i]
dim = dims[i]
NI = n_initial_per_task[i]
# Build GP models for each objective using training archive
models = mo_gp_build(arc_decs[i], arc_objs[i], data_type)
# --- Inner optimization loop on surrogates ---
pop_decs = arc_decs[i].copy()
for w in range(1, self.wmax + 1):
# Generate offspring via GA operators
off_decs = ga_generation(pop_decs, muc=20.0, mum=20.0)
# Merge parent and offspring
pop_decs = np.vstack([pop_decs, off_decs])
# Predict objectives and MSE for all solutions
pop_objs, pop_mse = mo_gp_predict(models, pop_decs, data_type, mse=True)
# Environmental selection using angle-penalized distance
theta = (w / self.wmax) ** self.alpha
cons_zero = np.zeros((pop_decs.shape[0], 1))
index = rvea_selection(pop_objs, cons_zero, V[i], theta)
pop_decs = pop_decs[index]
pop_objs = pop_objs[index]
pop_mse = pop_mse[index]
# Adapt reference vectors periodically
adapt_interval = max(1, int(np.ceil(self.wmax * 0.1)))
if w % adapt_interval == 0:
obj_range = pop_objs.max(axis=0) - pop_objs.min(axis=0)
V[i] = V0[i] * obj_range
# --- Infill selection: select mu solutions for re-evaluation ---
num_inactive_archive = _count_inactive(arc_objs[i], V0[i])
theta_final = ((self.wmax + 1) / self.wmax) ** self.alpha
new_decs = _kriging_select(
pop_decs, pop_objs, pop_mse, V[i], V0[i],
num_inactive_archive, 0.05 * n_per_task[i], self.mu, theta_final
)
# Remove duplicates against all previously evaluated solutions
new_decs = remove_duplicates(new_decs, decs[i])
if new_decs.shape[0] > 0:
# Evaluate new solutions with expensive objective function
new_objs, _ = evaluation_single(problem, new_decs, i)
# Update cumulative dataset (A2 in MATLAB)
decs[i] = np.vstack([decs[i], new_decs])
objs[i] = np.vstack([objs[i], new_objs])
# Update training archive (A1 in MATLAB)
arc_decs[i], arc_objs[i] = _update_archive(
arc_decs[i], arc_objs[i], new_decs, new_objs,
V[i], self.mu, NI
)
nfes_per_task[i] += new_decs.shape[0]
pbar.update(new_decs.shape[0])
pbar.close()
runtime = time.time() - start_time
all_decs, all_objs = build_staircase_history(decs, objs, k=self.mu)
results = build_save_results(all_decs=all_decs, all_objs=all_objs, runtime=runtime,
max_nfes=nfes_per_task, bounds=problem.bounds,
save_path=self.save_path, filename=self.name,
save_data=self.save_data)
return results
def _count_inactive(objs, V):
"""
Count inactive reference vectors.
Parameters
----------
objs : np.ndarray
Objective values, shape (N, M)
V : np.ndarray
Reference vectors, shape (NV, M)
Returns
-------
num_inactive : int
Number of inactive reference vectors
"""
objs_translated = objs - objs.min(axis=0)
angle = np.arccos(np.clip(1 - cdist(objs_translated, V, metric='cosine'), -1, 1))
associate = np.argmin(angle, axis=1)
active = np.unique(associate)
return V.shape[0] - len(active)
def _get_active_info(objs, V):
"""
Get count of inactive vectors and indices of active vectors.
Parameters
----------
objs : np.ndarray
Objective values, shape (N, M)
V : np.ndarray
Reference vectors, shape (NV, M)
Returns
-------
num_inactive : int
Number of inactive reference vectors
active : np.ndarray
Indices of active reference vectors
"""
objs_translated = objs - objs.min(axis=0)
angle = np.arccos(np.clip(1 - cdist(objs_translated, V, metric='cosine'), -1, 1))
associate = np.argmin(angle, axis=1)
active = np.unique(associate)
num_inactive = V.shape[0] - len(active)
return num_inactive, active
def _kriging_select(pop_decs, pop_objs, pop_mse, V, V0, num_v1, delta, mu, theta):
"""
Select solutions for expensive re-evaluation (KrigingSelect in MATLAB).
Switches between exploitation (APD-based) and exploration (uncertainty-based)
depending on the change in inactive reference vectors.
Parameters
----------
pop_decs : np.ndarray
Population decisions after surrogate optimization
pop_objs : np.ndarray
Predicted objectives, shape (N, M)
pop_mse : np.ndarray
Predicted MSE, shape (N, M)
V : np.ndarray
Adapted reference vectors
V0 : np.ndarray
Original reference vectors
num_v1 : int
Number of inactive vectors in archive (against V0)
delta : float
Threshold for switching between exploitation and exploration
mu : int
Number of solutions to select
theta : float
APD penalty parameter
Returns
-------
selected_decs : np.ndarray
Selected decision variables for re-evaluation
"""
N, M = pop_objs.shape
# Compute NumV2: inactive vectors in surrogate population against V0
num_v2 = _count_inactive(pop_objs, V0)
# Get active reference vectors against V (adapted)
n_inactive_v, active_indices = _get_active_info(pop_objs, V)
n_clusters = min(mu, len(active_indices))
if n_clusters == 0:
indices = np.random.choice(N, size=min(mu, N), replace=False)
return pop_decs[indices]
Va = V[active_indices]
# Cluster active reference vectors
if n_clusters >= len(active_indices):
cluster_labels = np.arange(len(active_indices))
n_clusters = len(active_indices)
else:
cluster_labels = kmeans_clustering(Va, n_clusters)
# Translate objectives
objs_translated = pop_objs - pop_objs.min(axis=0)
# Compute gamma: smallest angle between each active vector and others
if Va.shape[0] > 1:
cosine = 1 - cdist(Va, Va, metric='cosine')
np.fill_diagonal(cosine, 0)
gamma = np.min(np.arccos(np.clip(cosine, -1, 1)), axis=1)
gamma = np.maximum(gamma, 1e-6)
else:
gamma = np.array([1.0])
# Associate each solution to its nearest active reference vector
angle = np.arccos(np.clip(1 - cdist(objs_translated, Va, metric='cosine'), -1, 1))
associate = np.argmin(angle, axis=1)
# Compute APD for all solutions
apd = np.ones(N)
for j in np.unique(associate):
current = np.where(associate == j)[0]
if len(current) > 0:
apd[current] = (1 + M * theta * angle[current, j] / gamma[j]) * \
np.sqrt(np.sum(objs_translated[current] ** 2, axis=1))
# Map solutions to clusters via their associated reference vector
cindex = cluster_labels[associate]
# Selection: exploitation or exploration based on inactive vector change
flag = num_v2 - num_v1
selected_indices = []
for c in np.unique(cindex):
current = np.where(cindex == c)[0]
if len(current) == 0:
continue
if flag <= delta:
# Exploitation: for each active vector in this cluster, find best APD solution,
# then select the overall best among them
t = np.unique(associate[current])
solution_best = []
for j in t:
current_s = np.where(associate == j)[0]
best_id = current_s[np.argmin(apd[current_s])]
solution_best.append(best_id)
solution_best = np.array(solution_best)
best = solution_best[np.argmin(apd[solution_best])]
selected_indices.append(best)
else:
# Exploration: select solution with highest mean MSE in this cluster
uncertainty = np.mean(pop_mse[current], axis=1)
best = current[np.argmax(uncertainty)]
selected_indices.append(best)
if len(selected_indices) == 0:
indices = np.random.choice(N, size=min(mu, N), replace=False)
return pop_decs[indices]
return pop_decs[selected_indices]
def _update_archive(arc_decs, arc_objs, new_decs, new_objs, V, mu, NI):
"""
Update training archive (UpdataArchive in MATLAB).
Maintains archive size at approximately NI by keeping newly evaluated solutions
and selecting diverse representatives from old solutions.
Parameters
----------
arc_decs : np.ndarray
Current archive decision variables
arc_objs : np.ndarray
Current archive objective values
new_decs : np.ndarray
Newly evaluated decision variables
new_objs : np.ndarray
Newly evaluated objective values
V : np.ndarray
Adapted reference vectors
mu : int
Number of newly evaluated solutions
NI : int
Target archive size
Returns
-------
updated_decs : np.ndarray
Updated archive decision variables
updated_objs : np.ndarray
Updated archive objective values
"""
# Merge and deduplicate
merged_decs = np.vstack([arc_decs, new_decs])
merged_objs = np.vstack([arc_objs, new_objs])
_, unique_idx = np.unique(merged_decs, axis=0, return_index=True)
unique_idx = np.sort(unique_idx)
merged_decs = merged_decs[unique_idx]
merged_objs = merged_objs[unique_idx]
if len(merged_decs) <= NI:
return merged_decs, merged_objs
# Separate new solutions from old in the merged (deduplicated) set
n_old_original = len(arc_decs)
new_mask = unique_idx >= n_old_original
old_mask = ~new_mask
old_decs = merged_decs[old_mask]
old_objs = merged_objs[old_mask]
kept_new_decs = merged_decs[new_mask]
kept_new_objs = merged_objs[new_mask]
n_select = NI - len(kept_new_decs)
if n_select <= 0:
return kept_new_decs[:NI], kept_new_objs[:NI]
if len(old_decs) <= n_select:
return np.vstack([old_decs, kept_new_decs]), np.vstack([old_objs, kept_new_objs])
# Select n_select diverse solutions from old using clustering
n_clusters = min(n_select, len(old_decs))
labels = kmeans_clustering(old_objs, n_clusters)
selected = []
for c in np.unique(labels):
cluster_idx = np.where(labels == c)[0]
selected.append(cluster_idx[np.random.randint(len(cluster_idx))])
selected = np.array(selected)
return np.vstack([old_decs[selected], kept_new_decs]), np.vstack([old_objs[selected], kept_new_objs])