"""
Bayesian Optimization with Lower Confidence Bound and Competitive Knowledge Transfer (BO-LCB-CKT)
This module implements BO-LCB-CKT for expensive sequential transfer optimization problems.
Key Features:
- Task 0: Target task (to be optimized)
- Tasks 1:k: Source tasks (provide knowledge base)
- Only task 0 is actively optimized; source tasks are pre-optimized once
References
----------
[1] Xue, Xiaoming, et al. "Surrogate-assisted search with competitive knowledge transfer for expensive optimization." IEEE Transactions on Evolutionary Computation (2024).
Notes
-----
Author: Jiangtao Shen
Email: j.shen5@exeter.ac.uk
Date: 2026.01.09
Version: 1.0
"""
from tqdm import tqdm
import torch
import numpy as np
from scipy.stats import spearmanr
from scipy.optimize import minimize
from ddmtolab.Methods.Algo_Methods.bo_utils import gp_build, gp_predict, bo_next_point_lcb
from ddmtolab.Methods.Algo_Methods.algo_utils import *
import warnings
import time
warnings.filterwarnings("ignore")
class BO_LCB_CKT:
    """
    Bayesian Optimization with Lower Confidence Bound and Competitive Knowledge Transfer.

    This algorithm optimizes task 0 (target) by leveraging knowledge from tasks 1:k (sources).
    Source tasks are pre-optimized once to build a knowledge base, then only the target task
    is optimized with competitive knowledge transfer.

    **Now supports tasks with different dimensions using space_transfer.**

    Attributes
    ----------
    algorithm_information : dict
        Dictionary containing algorithm capabilities and requirements
    """

    # Capability descriptors consumed by the surrounding framework; the string
    # values encode requirements (e.g. 'unequal' = per-task values may differ).
    algorithm_information = {
        'n_tasks': '[2, K]',
        'dims': 'unequal',
        'objs': 'equal',
        'n_objs': '1',
        'cons': 'equal',
        'n_cons': '0',
        'expensive': 'True',
        'knowledge_transfer': 'True',
        'n_initial': 'unequal',
        'max_nfes': 'unequal'
    }

    @classmethod
    def get_algorithm_information(cls, print_info=True):
        # Delegates to the module-level helper of the same name (brought in by
        # the algo_utils wildcard import); optionally prints the table.
        return get_algorithm_information(cls, print_info)
def __init__(self, problem, n_initial=None, max_nfes=None,
gen_gap=10, ada_flag=False,
save_data=True, save_path='./Data',
name='BO-LCB-CKT', disable_tqdm=False):
"""
Initialize BO-LCB-CKT algorithm.
Parameters
----------
problem : MTOP
Multi-task optimization problem instance.
Task 0 is the target task, tasks 1:k are source tasks.
n_initial : int or List[int], optional
Number of initial samples per task (default: 50)
max_nfes : int or List[int], optional
Maximum number of function evaluations per task (default: [500, 100, 100, ...])
- First value: budget for target task (task 0)
- Remaining values: budget for source tasks
gen_gap : int, optional
Knowledge transfer trigger frequency (default: 10)
ada_flag : bool, optional
Whether to enable task adaptation (default: False)
save_data : bool, optional
Whether to save optimization data (default: True)
save_path : str, optional
Path to save results (default: './TestData')
name : str, optional
Name for the experiment (default: 'BO_LCB_CKT_test')
disable_tqdm : bool, optional
Whether to disable progress bar (default: False)
"""
self.problem = problem
self.nt = problem.n_tasks
self.dims = problem.dims
self.dim_max = max(self.dims) # Maximum dimension across all tasks
self.dim_target = self.dims[0] # Target task dimension
self.n_initial = n_initial if n_initial is not None else 20
# Default budget: target task gets more evaluations
if max_nfes is None:
self.max_nfes = [200] + [50] * (self.nt - 1)
else:
self.max_nfes = max_nfes
self.gen_gap = gen_gap
self.ada_flag = ada_flag
self.save_data = save_data
self.save_path = save_path
self.name = name
self.disable_tqdm = disable_tqdm
self.data_type = torch.float
def optimize(self):
"""
Execute the BO-LCB-CKT algorithm.
Returns
-------
Results
Optimization results containing decision variables, objectives, and runtime
"""
start_time = time.time()
problem = self.problem
nt = self.nt
n_initial_per_task = par_list(self.n_initial, nt)
max_nfes_per_task = par_list(self.max_nfes, nt)
# ===================================================================
# Phase 1: Initialize all tasks in real space
# ===================================================================
decs = initialization(problem, self.n_initial, method='lhs')
objs, _ = evaluation(problem, decs)
nfes_per_task = n_initial_per_task.copy()
# Unified progress bar
pbar = tqdm(total=sum(max_nfes_per_task), initial=sum(n_initial_per_task), desc=f"{self.name}",
disable=self.disable_tqdm)
# ===================================================================
# Phase 2: Build knowledge base from source tasks (tasks 1:k)
# ===================================================================
for i in range(1, nt):
# Get task dimension
dim_i = self.dims[i]
# Optimize source task with BO-LCB (in real space)
while nfes_per_task[i] < max_nfes_per_task[i]:
candidate, _ = bo_next_point_lcb(
dim_i, decs[i], objs[i], data_type=self.data_type
)
# Evaluate (in real space)
obj, _ = evaluation_single(problem, candidate, i)
# Update dataset
decs[i], objs[i] = vstack_groups((decs[i], candidate), (objs[i], obj))
nfes_per_task[i] += 1
pbar.update(1)
# ===================================================================
# Phase 3: Convert to unified space and build GP models
# ===================================================================
# Convert all source tasks to unified space (pad to dim_max)
decs_uni = space_transfer(problem, decs, type='uni', padding='zero')
objs_uni = [objs[i].copy() for i in range(nt)]
# Build knowledge base and surrogates in unified space
knowledge_base = []
surrogates_source = []
for i in range(1, nt):
knowledge_base.append({
'task_id': i,
'solutions_uni': decs_uni[i],
'solutions_real': decs[i],
'objs': objs[i],
'best_solution_uni': decs_uni[i][np.argmin(objs[i])],
'best_solution_real': decs[i][np.argmin(objs[i])],
'best_obj': np.min(objs[i])
})
# Build GP in unified space
gp = gp_build(decs_uni[i], objs[i], data_type=self.data_type)
surrogates_source.append(gp)
# ===================================================================
# Phase 4: Task adaptation (optional) - in unified space
# ===================================================================
ada_vectors = np.zeros((len(knowledge_base), self.dim_max))
if self.ada_flag:
ada_vectors = self._solution_adaptation(
surrogates_source, decs_uni[0], objs[0]
)
# ===================================================================
# Phase 5: Optimize target task (task 0) with CKT
# ===================================================================
target_idx = 0
transfer_states = []
while nfes_per_task[target_idx] < max_nfes_per_task[target_idx]:
# ============ Step 1: Internal Acquisition (BO-LCB) ============
# Work in real space for target task
solution_in, improvement_in = self._internal_acquisition(
decs[target_idx], objs[target_idx]
)
# ============ Step 2: Knowledge Competition (periodic) ============
if nfes_per_task[target_idx] % self.gen_gap == 0:
# Work in unified space for knowledge transfer
solution_ex_uni, improvement_ex, source_id = self._knowledge_competition(
decs_uni[target_idx], objs[target_idx], knowledge_base,
surrogates_source, ada_vectors
)
# Convert external solution from unified to real space
solution_ex = solution_ex_uni[:self.dim_target]
# Competitive selection
if improvement_in >= improvement_ex:
solution_candidate = solution_in
transfer_states.append(0) # Internal win
else:
solution_candidate = solution_ex
transfer_states.append(source_id + 1) # External win
else:
solution_candidate = solution_in
transfer_states.append(0)
# ============ Step 3: Ensure Diversity in real space ============
solution_candidate = self._ensure_diversity(
solution_candidate, decs[target_idx], epsilon=5e-3
)
# ============ Step 4: Evaluate in real space ============
obj_candidate, _ = evaluation_single(
problem, solution_candidate.reshape(1, -1), target_idx
)
# ============ Step 5: Update Database ============
# Update real space data
decs[target_idx], objs[target_idx] = vstack_groups(
(decs[target_idx], solution_candidate), (objs[target_idx], obj_candidate)
)
# Update unified space data for target task
solution_candidate_uni = np.pad(
solution_candidate,
(0, self.dim_max - self.dim_target),
mode='constant',
constant_values=0
)
decs_uni[target_idx] = np.vstack([decs_uni[target_idx], solution_candidate_uni])
nfes_per_task[target_idx] += 1
pbar.update(1)
pbar.close()
runtime = time.time() - start_time
all_decs, all_objs = build_staircase_history(decs, objs, k=1)
results = build_save_results(all_decs=all_decs, all_objs=all_objs, runtime=runtime, max_nfes=nfes_per_task,
bounds=problem.bounds, save_path=self.save_path, filename=self.name,
save_data=self.save_data)
# Add transfer statistics
results.transfer_states = transfer_states
return results
def _solution_adaptation(self, surrogates_source, solutions_target_uni, objs_target):
"""
Perform task adaptation in unified space.
Parameters
----------
surrogates_source : List[SingleTaskGP]
List of source task GP models (trained in unified space)
solutions_target_uni : ndarray
Target task decision variables (in unified space, dim_max)
objs_target : ndarray
Target task objectives
Returns
-------
ada_vectors : ndarray
Adaptation vectors for each source task (k x dim_max)
"""
num_sources = len(surrogates_source)
ada_vectors = np.zeros((num_sources, self.dim_max))
for i in range(num_sources):
def objective(theta):
# Adapt target solutions in unified space
# No clipping: allow GP extrapolation (matching MATLAB obj_ada.m)
X_adapted = solutions_target_uni - theta
try:
# Predict using source GP
objs_val, _ = gp_predict(
surrogates_source[i], X_adapted, data_type=self.data_type
)
objs_val = objs_val.flatten()
except:
objs_val = np.full(len(X_adapted), np.mean(objs_target))
# Compute ranks
rank_target = self._compute_ranks(objs_target.flatten())
rank_val = self._compute_ranks(objs_val)
# Spearman correlation
rho, _ = spearmanr(rank_target, rank_val)
if np.isnan(rho):
rho = 0.0
# Regularization term
alpha_theta = 1.0 - np.max(np.abs(theta))
# Adaptation-based similarity
s_adapted = alpha_theta * rho
return -s_adapted
# Multi-start optimization
no_local_opt = 10
bounds = [(-1.0, 1.0)] * self.dim_max # Use dim_max
best_theta = None
best_obj = np.inf
# Try zero vector first
try:
result_zero = minimize(
objective, np.zeros(self.dim_max),
method='L-BFGS-B', bounds=bounds,
options={'maxiter': 100, 'disp': False}
)
if result_zero.fun < best_obj:
best_obj = result_zero.fun
best_theta = result_zero.x
except:
pass
# Random starts
for _ in range(no_local_opt - 1):
x0 = np.random.uniform(-0.5, 0.5, self.dim_max)
try:
result = minimize(
objective, x0,
method='L-BFGS-B', bounds=bounds,
options={'maxiter': 100, 'disp': False}
)
if result.fun < best_obj:
best_obj = result.fun
best_theta = result.x
except:
continue
if best_theta is not None:
ada_vectors[i] = best_theta
return ada_vectors
def _internal_acquisition(self, decs, objs):
"""
Internal acquisition using BO-LCB (in real space for target task).
Parameters
----------
decs : ndarray
Current decision variables (real space, dim_target)
objs : ndarray
Current objectives
Returns
-------
solution : ndarray
Selected candidate solution (real space, dim_target)
improvement : float
Estimated internal improvement
"""
# Select next point using BO-LCB in real space
solution, gp = bo_next_point_lcb(
self.dim_target, decs, objs, data_type=self.data_type
)
# Compute internal improvement
improvement = self._improvement_internal(decs, objs, solution, gp)
return solution.flatten(), improvement
def _improvement_internal(self, decs, objs, solution_promising, gp):
"""
Estimate internal improvement (same as before).
"""
try:
kappa = 2.0
mu_db, std_db = gp_predict(gp, decs, data_type=self.data_type)
lcb_db = mu_db.flatten() - kappa * std_db.flatten()
mu_pr, std_pr = gp_predict(
gp, solution_promising.reshape(1, -1), data_type=self.data_type
)
lcb_pr = mu_pr[0, 0] - kappa * std_pr[0, 0]
improvement = np.min(lcb_db) - lcb_pr
return improvement
except:
return 0.0
def _knowledge_competition(self, decs_target_uni, objs_target, knowledge_base,
surrogates_source, ada_vectors):
"""
Knowledge competition in unified space.
Parameters
----------
decs_target_uni : ndarray
Target task decision variables (unified space, dim_max)
objs_target : ndarray
Target task objectives
knowledge_base : List[dict]
Source task data
surrogates_source : List[SingleTaskGP]
Source task GP models (trained in unified space)
ada_vectors : ndarray
Adaptation vectors (k x dim_max)
Returns
-------
solution_external_uni : ndarray
Selected external candidate (unified space, dim_max)
improvement_max : float
Maximum external improvement
source_id : int
ID of the winning source task
"""
num_sources = len(knowledge_base)
improvements = np.zeros(num_sources)
solutions_uni = np.zeros((num_sources, self.dim_max))
ranks_target = self._compute_ranks(objs_target.flatten())
for i in range(num_sources):
# Adapt target solutions in unified space
# No clipping: allow GP extrapolation (matching MATLAB knowledge_competition.m)
X_adapted = decs_target_uni - ada_vectors[i]
try:
objs_val, _ = gp_predict(
surrogates_source[i], X_adapted, data_type=self.data_type
)
objs_val = objs_val.flatten()
except:
objs_val = np.full(len(X_adapted), np.mean(objs_target))
ranks_val = self._compute_ranks(objs_val)
rho, _ = spearmanr(ranks_target, ranks_val)
if np.isnan(rho):
rho = 0.0
if self.ada_flag and np.any(ada_vectors[i] != 0):
alpha_theta = 1.0 - np.max(np.abs(ada_vectors[i]))
similarity = alpha_theta * rho
else:
similarity = rho
# Estimate improvement
objs_source = knowledge_base[i]['objs'].flatten()
improvement = self._improvement_external(
objs_source, objs_val, objs_target.flatten(), similarity
)
improvements[i] = improvement
# Prepare adapted solution (in unified space)
x_source_best_uni = knowledge_base[i]['best_solution_uni']
x_adapted = x_source_best_uni + ada_vectors[i]
x_adapted = np.clip(x_adapted, 0, 1)
solutions_uni[i] = x_adapted
# Select best source
source_id = np.argmax(improvements)
solution_external_uni = solutions_uni[source_id]
improvement_max = improvements[source_id]
return solution_external_uni, improvement_max, source_id
def _improvement_external(self, objs_source, objs_val, objs_target, similarity):
"""
Estimate external improvement using exponential decay model.
Matches MATLAB improvement_external.m:
- Fits pure exponential y(t) = a * exp(-b*t) to sorted-descending convergence curves
- Estimates time savings from source knowledge transfer
- Computes analogized improvement on target curve
Parameters
----------
objs_source : ndarray
Source task true objectives (unsorted)
objs_val : ndarray
Source GP predictions on target data
objs_target : ndarray
Target task true objectives (unsorted)
similarity : float
Rank-based similarity between source and target
Returns
-------
improvement : float
Estimated external improvement (non-negative)
"""
if similarity < 0:
return 0.0
# Sort descending (matching MATLAB: sort(...,'descend'))
objs_source_sorted = np.sort(objs_source)[::-1]
objs_target_sorted = np.sort(objs_target)[::-1]
n_source = len(objs_source_sorted)
n_target = len(objs_target_sorted)
# Pure exponential model: y(t) = a * exp(-b*t)
# Fit: log(y) = log(a) - b*t (requires y > 0)
# MATLAB: coe = [ones(n,1), (1:n)'] \ log(objs)
# ps = [exp(coe(1)), -coe(2)]
# Source curve fit
if np.any(objs_source_sorted <= 0):
return 0.0
t_source = np.arange(1, n_source + 1)
try:
# polyfit: log(y) = coeffs[0]*t + coeffs[1]
coeffs_source = np.polyfit(t_source, np.log(objs_source_sorted), 1)
a_source = np.exp(coeffs_source[1]) # exp(intercept)
b_source = -coeffs_source[0] # -slope
except:
return 0.0
# Target curve fit
if np.any(objs_target_sorted <= 0):
return 0.0
t_target = np.arange(1, n_target + 1)
try:
coeffs_target = np.polyfit(t_target, np.log(objs_target_sorted), 1)
a_target = np.exp(coeffs_target[1])
b_target = -coeffs_target[0]
except:
return 0.0
# Time position of min(objs_val) on source curve
# MATLAB: tv = max(0, -log(min(objs_val)/ps(1)) / ps(2))
min_val = np.min(objs_val)
try:
if min_val <= 0 or a_source <= 0 or b_source <= 0:
tv = 0.0
else:
tv = -np.log(min_val / a_source) / b_source
tv = max(0.0, tv)
except:
tv = 0.0
# Analogized time and predicted improvement
# MATLAB: imp = sim*(min(objs_target) - decay_target(sim*(n_target+n_source-tv), pt))
analogized_time = similarity * (n_target + n_source - tv)
predicted_obj = a_target * np.exp(-b_target * analogized_time)
improvement = similarity * (np.min(objs_target) - predicted_obj)
return max(0.0, improvement)
def _compute_ranks(self, values):
"""
Compute ranks (same as before).
"""
n = len(values)
ranks = np.zeros(n)
for i in range(n):
ranks[i] = np.sum(values < values[i]) + 1
return ranks
def _ensure_diversity(self, solution, database, epsilon=5e-3, max_trials=50):
"""
Ensure diversity (works in real space for target task).
"""
scales = np.linspace(0.1, 1.0, max_trials)
for trial in range(max_trials):
distances = np.max(np.abs(database - solution), axis=1)
min_dist = np.min(distances)
if min_dist >= epsilon:
break
scale = scales[trial]
perturbation = scale * (np.random.rand(self.dim_target) - 0.5)
solution = solution + perturbation
solution = np.clip(solution, 0, 1)
return solution