"""
Three-Level Radial Basis Function Method (TLRBF)
This module implements TLRBF for expensive single-objective optimization problems.
References
----------
[1] Li, Genghui, et al. "A three-level radial basis function method for expensive optimization." IEEE Transactions on Cybernetics 52.7 (2021): 5720-5731.
Notes
-----
Author: Jiangtao Shen
Email: j.shen5@exeter.ac.uk
Date: 2026.01.13
Version: 1.0
"""
from tqdm import tqdm
import time
import numpy as np
from scipy.interpolate import RBFInterpolator
from ddmtolab.Methods.Algo_Methods.algo_utils import *
from ddmtolab.Algorithms.STSO.GA import GA
from ddmtolab.Methods.mtop import MTOP
import warnings
warnings.filterwarnings("ignore")
[docs]
class TLRBF:
"""
Three-Level Radial Basis Function Method for expensive optimization problems.
This algorithm uses three search strategies in rotation:
1. Global search: Random sampling with distance filtering
2. Subregion search: FCM clustering + local RBF models
3. Local search: K-nearest neighbors + local RBF model
"""
algorithm_information = {
'n_tasks': '[1, K]',
'dims': 'unequal',
'objs': 'equal',
'n_objs': '1',
'cons': 'equal',
'n_cons': '0',
'expensive': 'True',
'knowledge_transfer': 'False',
'n_initial': 'unequal',
'max_nfes': 'unequal'
}
@classmethod
def get_algorithm_information(cls, print_info=True):
return get_algorithm_information(cls, print_info)
[docs]
def __init__(self, problem, n_initial=None, max_nfes=None, save_data=True,
save_path='./Data', name='TLRBF', disable_tqdm=True):
"""
Initialize TLRBF algorithm.
Parameters
----------
problem : MTOP
Multi-task optimization problem instance
n_initial : int or List[int], optional
Number of initial samples per task (default: 100)
max_nfes : int or List[int], optional
Maximum number of function evaluations per task (default: 300)
save_data : bool, optional
Whether to save optimization data (default: True)
save_path : str, optional
Path to save results (default: './TestData')
name : str, optional
Name for the experiment (default: 'TLRBF_test')
disable_tqdm : bool, optional
Whether to disable progress bar (default: True)
"""
self.problem = problem
self.n_initial = n_initial if n_initial is not None else 100
self.max_nfes = max_nfes if max_nfes is not None else 300
self.save_data = save_data
self.save_path = save_path
self.name = name
self.disable_tqdm = disable_tqdm
[docs]
def optimize(self):
"""
Execute the TLRBF algorithm.
Returns
-------
Results
Optimization results containing decision variables, objectives, and runtime
"""
start_time = time.time()
problem = self.problem
nt = problem.n_tasks
dims = problem.dims
n_initial_per_task = par_list(self.n_initial, nt)
max_nfes_per_task = par_list(self.max_nfes, nt)
# Generate initial samples using Latin Hypercube Sampling
decs = initialization(problem, self.n_initial, method='lhs')
objs, _ = evaluation(problem, decs)
nfes_per_task = n_initial_per_task.copy()
# Current working dataset (accumulated samples)
current_decs = [decs[i].copy() for i in range(nt)]
current_objs = [objs[i].copy() for i in range(nt)]
pbar = tqdm(total=sum(max_nfes_per_task), initial=sum(n_initial_per_task),
desc=f"{self.name}", disable=self.disable_tqdm)
while sum(nfes_per_task) < sum(max_nfes_per_task):
# Skip tasks that have exhausted their evaluation budget
active_tasks = [i for i in range(nt) if nfes_per_task[i] < max_nfes_per_task[i]]
if not active_tasks:
break
for i in active_tasks:
dim = dims[i]
# Determine search state: 0=global, 1=subregion, 2=local
state = (nfes_per_task[i] - n_initial_per_task[i]) % 3
if state == 0:
# ===== Global Search =====
candidate_np = self._global_search(current_decs[i], current_objs[i], dim)
elif state == 1:
# ===== Subregion Search =====
candidate_np = self._subregion_search(current_decs[i], current_objs[i], dim)
else: # state == 2
# ===== Local Search =====
candidate_np = self._local_search(current_decs[i], current_objs[i], dim)
# Ensure uniqueness before evaluation
candidate_np = self._ensure_uniqueness(candidate_np, current_decs[i], dim)
# Evaluate the candidate solution
obj, _ = evaluation_single(problem, candidate_np, i)
# Update current working dataset
current_decs[i] = np.vstack([current_decs[i], candidate_np])
current_objs[i] = np.vstack([current_objs[i], obj])
nfes_per_task[i] += 1
pbar.update(1)
pbar.close()
runtime = time.time() - start_time
# Convert database to staircase history structure for results
db_decs = [current_decs[i].copy() for i in range(nt)]
db_objs = [current_objs[i].copy() for i in range(nt)]
all_decs, all_objs = build_staircase_history(db_decs, db_objs, k=1)
# Build and save results
results = build_save_results(all_decs=all_decs, all_objs=all_objs, runtime=runtime, max_nfes=nfes_per_task,
bounds=problem.bounds, save_path=self.save_path, filename=self.name,
save_data=self.save_data)
return results
# Build RBF surrogate model
def _build_rbf_model(self, X, Y):
Y = Y.flatten()
# Calculate spread parameter
n_samples, dim = X.shape
if n_samples > 1:
# Compute pairwise distances
dist_matrix = cdist(X, X, metric='euclidean')
max_dist = dist_matrix.max()
# Adaptive spread estimation
spread = max_dist / (dim * n_samples) ** (1.0 / dim)
else:
spread = 1.0
# Use Gaussian RBF
try:
rbf_model = RBFInterpolator(X, Y, kernel='gaussian', epsilon=1.0 / spread)
except Exception:
# Fallback to thin_plate_spline if gaussian fails
rbf_model = RBFInterpolator(X, Y, kernel='thin_plate_spline')
return rbf_model
# Global search with distance filtering
def _global_search(self, decs_i, objs_i, dim):
alpha = 0.4
m = 200 * dim
# Build RBF model
rbf_model = self._build_rbf_model(decs_i, objs_i)
# Generate random candidates in [0,1]^dim
solutions_global = np.random.rand(m, dim)
# Distance filtering
dist = cdist(solutions_global, decs_i, metric='euclidean')
mindist = np.min(dist, axis=1)
deltag = alpha * np.max(mindist)
# Remove candidates too close to existing points
valid_mask = mindist > deltag
solutions_global = solutions_global[valid_mask]
if len(solutions_global) == 0:
# If all filtered out, return a random point
return np.random.rand(1, dim)
# Predict and select best
objs_pre = rbf_model(solutions_global)
idx = np.argmin(objs_pre)
candidate = solutions_global[idx:idx + 1, :]
return candidate
# Subregion search using FCM clustering
def _subregion_search(self, decs_i, objs_i, dim):
N = len(decs_i)
L1 = 100
L2 = 80
if N <= L1:
# Use all data
X_subregion = decs_i
Y_subregion = objs_i
lb_subregion = np.zeros(dim)
ub_subregion = np.ones(dim)
else:
# Compute number of clusters
no_clusters = 1 + int(np.ceil((N - L1) / L2))
# Normalize data for clustering
X_min = decs_i.min(axis=0)
X_max = decs_i.max(axis=0)
X_range = X_max - X_min
X_range[X_range < 1e-10] = 1.0
X_normalized = (decs_i - X_min) / X_range
# Perform KMeans clustering (approximation of FCM)
kmeans = KMeans(n_clusters=no_clusters, random_state=42, n_init=10)
labels = kmeans.fit_predict(X_normalized)
# Compute soft membership using inverse distance weighting
distances = kmeans.transform(X_normalized)
# Avoid division by zero
distances = np.maximum(distances, 1e-10)
U = 1.0 / distances
U = U / U.sum(axis=1, keepdims=True)
# Select top L1 points from each cluster based on membership
X_clusters = []
Y_clusters = []
mean_objs = []
for k in range(no_clusters):
# Sort by membership in cluster k (descending)
membership_k = U[:, k]
idx_sorted = np.argsort(-membership_k)[:L1]
X_clusters.append(decs_i[idx_sorted])
Y_clusters.append(objs_i[idx_sorted])
mean_objs.append(np.mean(objs_i[idx_sorted]))
# Probabilistic cluster selection (MATLAB: [~,idx]=sort(mper,'descend'); pro=idx/K)
mean_objs = np.array(mean_objs)
idx_desc = np.argsort(-mean_objs) # descending sort indices
pro = np.zeros(no_clusters)
for pos in range(no_clusters):
pro[pos] = (idx_desc[pos] + 1) / no_clusters # MATLAB 1-indexed
# Rejection sampling (MATLAB: while rand > pro(sid))
selected_cluster = None
while selected_cluster is None:
sid = np.random.randint(0, no_clusters)
if np.random.rand() <= pro[sid]:
selected_cluster = sid
X_subregion = X_clusters[selected_cluster]
Y_subregion = Y_clusters[selected_cluster]
# Define subregion bounds
lb_subregion = np.min(X_subregion, axis=0)
ub_subregion = np.max(X_subregion, axis=0)
# Build RBF model on subregion
rbf_model = self._build_rbf_model(X_subregion, Y_subregion)
# Optimize using GA on surrogate (50 generations)
candidate = self._optimize_surrogate(rbf_model, lb_subregion, ub_subregion, dim, max_gen=50)
return candidate
# Local search using k-nearest neighbors
def _local_search(self, decs_i, objs_i, dim):
k = min(2 * dim, len(decs_i) - 1)
k = max(k, 1) # Ensure at least 1 neighbor
# Find k nearest neighbors to the best point
idx_min = np.argmin(objs_i)
dist = cdist(decs_i, decs_i[idx_min:idx_min + 1], metric='euclidean').flatten()
idx_sorted = np.argsort(dist)[:k] # k points total including best (matches MATLAB)
X_local = decs_i[idx_sorted]
Y_local = objs_i[idx_sorted]
# Build RBF model on local region
rbf_model = self._build_rbf_model(X_local, Y_local)
# Define local bounds
lb_local = np.min(X_local, axis=0)
ub_local = np.max(X_local, axis=0)
# Optimize using GA on surrogate (50 generations, matching MATLAB)
candidate = self._optimize_surrogate(rbf_model, lb_local, ub_local, dim, max_gen=50)
return candidate
# Optimize surrogate model using Genetic Algorithm
def _optimize_surrogate(self, surrogate_model, lb, ub, dim, n_pop=50, max_gen=50):
# Ensure bounds are valid
lb = np.asarray(lb)
ub = np.asarray(ub)
# Handle case where lb == ub
range_mask = (ub - lb) < 1e-10
if np.any(range_mask):
ub[range_mask] = lb[range_mask] + 1e-6
# Create surrogate problem wrapper
def surrogate_func(x):
x_denorm = lb + x * (ub - lb)
return surrogate_model(x_denorm).reshape(-1, 1)
surrogate_problem = MTOP()
surrogate_problem.add_task(objective_func=surrogate_func, dim=dim)
# Run GA
max_nfes = n_pop * max_gen
ga = GA(surrogate_problem, n=n_pop, max_nfes=max_nfes, disable_tqdm=True)
results = ga.optimize()
# Get best solution in [0,1] space
best_x_normalized = results.best_decs[0].reshape(1, -1)
# Denormalize to [lb, ub] space
best_solution = lb + best_x_normalized * (ub - lb)
# Ensure solution is within [0, 1] (original problem bounds)
best_solution = np.clip(best_solution, 0.0, 1.0)
return best_solution
def _ensure_uniqueness(self, candidate, X, dim, epsilon=5e-3, max_trials=50):
"""Ensure candidate is not too close to existing samples (Chebyshev distance)."""
scales = np.linspace(0.1, 1.0, max_trials)
for t in range(max_trials):
dist = cdist(candidate, X, metric='chebyshev').min()
if dist >= epsilon:
break
perturbation = scales[t] * (np.random.rand(1, dim) - 0.5)
candidate = np.clip(candidate + perturbation, 0.0, 1.0)
return candidate