Source code for ddmtolab.Algorithms.MTSO.BO_LCB_BCKT

"""
BO-LCB-BCKT: Bayesian Optimization with Lower Confidence Bound and Bayesian Competitive Knowledge Transfer

This module implements BO-LCB-BCKT for expensive multi-task optimization problems.

References
----------
    [1] Lu, Yi, et al. "Multi-Task Surrogate-Assisted Search with Bayesian Competitive Knowledge Transfer for Expensive Optimization." arXiv preprint arXiv:2510.23407 (2025).

Notes
-----
Author: Jiangtao Shen
Email: j.shen5@exeter.ac.uk
Date: 2026.01.09
Version: 2.0
"""
from tqdm import tqdm
from ddmtolab.Methods.Algo_Methods.bo_utils import bo_next_point_lcb
from ddmtolab.Methods.Algo_Methods.algo_utils import *
import warnings
import time
import numpy as np
import torch
from scipy.stats import spearmanr

warnings.filterwarnings("ignore")


class BO_LCB_BCKT:
    """
    BO-LCB-BCKT algorithm for expensive multi-task optimization.

    Attributes
    ----------
    algorithm_information : dict
        Dictionary containing algorithm capabilities and requirements.
    """

    algorithm_information = {
        'n_tasks': '[2, K]',
        'dims': 'unequal',
        'objs': 'equal',
        'n_objs': '1',
        'cons': 'equal',
        'n_cons': '0',
        'expensive': 'True',
        'knowledge_transfer': 'True',
        'n_initial': 'equal',
        'max_nfes': 'equal'
    }

    @classmethod
    def get_algorithm_information(cls, print_info=True):
        return get_algorithm_information(cls, print_info)
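    # Illustrative query of the capability table above; assumes the shared
    # helper from algo_utils returns this dict when print_info is False:
    #
    #     info = BO_LCB_BCKT.get_algorithm_information(print_info=False)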
    def __init__(self, problem, n_initial=None, max_nfes=None, gen_gap=10,
                 sigma_I_sq=0.05 ** 2, save_data=True, save_path='./Data',
                 name='BO-LCB-BCKT', disable_tqdm=True, padding='zero'):
        self.problem = problem
        self.n_initial = n_initial if n_initial is not None else 50
        self.max_nfes = max_nfes if max_nfes is not None else 100
        self.gen_gap = gen_gap
        self.sigma_I_sq = sigma_I_sq
        self.save_data = save_data
        self.save_path = save_path
        self.name = name
        self.disable_tqdm = disable_tqdm
        self.padding = padding
        self.dims = problem.dims
        self.d_max = np.max(problem.dims)
    def optimize(self):
        data_type = torch.float
        start_time = time.time()
        problem = self.problem
        nt = problem.n_tasks
        dims = problem.dims
        d_max = self.d_max
        n_initial = self.n_initial
        max_nfes = self.max_nfes
        gen_gap = self.gen_gap
        sigma2_sim = self.sigma_I_sq  # sigma_I^2: similarity observation variance (default 0.05**2)

        # ======================== Phase 1: Initialization ========================
        decs_real = initialization(problem, n_initial, method='lhs')
        objs_real, _ = evaluation(problem, decs_real)

        # Transform to unified space
        decs_uni = space_transfer(problem, decs_real, type='uni', padding=self.padding)

        # Current working data (unified space)
        decs = [decs_uni[i].copy() for i in range(nt)]
        objs = [objs_real[i].copy() for i in range(nt)]

        # Evaluation counters
        nfes_per_task = [n_initial] * nt
        total_nfes = sum(nfes_per_task)

        # Bayesian transferability: MUs and SIGMA2s (MATLAB-style conjugate Gaussian)
        MUs = np.full((nt, nt), np.nan)
        SIGMA2s = np.full((nt, nt), np.nan)

        # Transfer decision history
        transfer_actions = [[] for _ in range(nt)]

        # GP model cache
        gp_models = [None] * nt

        # gen_gap in total FEs (MATLAB: gen_gap = no_tasks * paras.gen_gap)
        gen_gap_total = nt * gen_gap

        pbar = tqdm(total=max_nfes * nt, initial=total_nfes,
                    desc=f"{self.name}", disable=self.disable_tqdm)

        # ==================== Phase 2: Main Optimization Loop ====================
        while total_nfes < max_nfes * nt:
            active_tasks = [i for i in range(nt) if nfes_per_task[i] < max_nfes]
            if not active_tasks:
                break

            # ---------- Step 1: Single-Task Optimization (BO-LCB) ----------
            solutions_in = {}
            improvements_in = {}
            for i in active_tasks:
                candidate_uni, gp_model = bo_next_point_lcb(
                    d_max, decs[i], objs[i], data_type=data_type
                )
                solutions_in[i] = candidate_uni
                gp_models[i] = gp_model
                imp_in = _improvement_internal(
                    gp_model, decs[i], objs[i], candidate_uni, data_type=data_type
                )
                improvements_in[i] = imp_in

            # ---------- Step 2: Bayesian Competitive Knowledge Transfer ----------
            solutions_candidate = {}
            trigger_transfer = (total_nfes % gen_gap_total == 0)

            for target_idx in active_tasks:
                if trigger_transfer:
                    # Save old MUs/SIGMA2s for rollback
                    MUs_old = MUs.copy()
                    SIGMA2s_old = SIGMA2s.copy()

                    # Execute knowledge competition
                    (solution_ex, improvement_ex,
                     source_idx, impn) = _knowledge_competition(
                        decs, objs, gp_models, target_idx,
                        MUs, SIGMA2s, sigma2_sim, data_type
                    )

                    if improvements_in[target_idx] >= improvement_ex:
                        # Internal wins: use internal solution, roll back Bayesian update
                        solutions_candidate[target_idx] = solutions_in[target_idx]
                        transfer_actions[target_idx].append(0)
                        MUs[source_idx, target_idx] = MUs_old[source_idx, target_idx]
                        SIGMA2s[source_idx, target_idx] = SIGMA2s_old[source_idx, target_idx]
                    else:
                        # External wins: use transferred solution
                        solutions_candidate[target_idx] = solution_ex
                        transfer_actions[target_idx].append(source_idx + 1)

                        # Hidden evaluation for Bayesian update (MATLAB behavior)
                        cand_real = solution_ex[:, :dims[target_idx]]
                        obj_hidden, _ = evaluation_single(problem, cand_real, target_idx)
                        obj_hidden_val = obj_hidden.flatten()[0]

                        # Compute actual transferability
                        min_target = np.min(objs[target_idx])
                        max_target = np.max(objs[target_idx])
                        if max_target != 0:
                            imp_actual = (min_target - obj_hidden_val) / max_target
                        else:
                            imp_actual = 0.0
                        if impn != 0:
                            mu_transfer = imp_actual / impn
                        else:
                            mu_transfer = 0.0

                        # Second Bayesian update with decaying variance
                        sigma2_transfer = sigma2_sim * np.exp(
                            -(total_nfes - n_initial * nt) / gen_gap_total
                        )
                        _bayesian_update_gaussian(
                            MUs, SIGMA2s, source_idx, target_idx,
                            mu_transfer, sigma2_transfer
                        )
                else:
                    solutions_candidate[target_idx] = solutions_in[target_idx]
                    transfer_actions[target_idx].append(0)

            # ---------- Step 3: Evaluation and Database Update ----------
            for target_idx in active_tasks:
                candidate_uni = solutions_candidate[target_idx]

                # Ensure uniqueness (Chebyshev distance, matching MATLAB)
                candidate_uni = _ensure_unique(candidate_uni, decs[target_idx])

                # Transform to real space for evaluation
                candidate_real = candidate_uni[:, :dims[target_idx]]
                obj, _ = evaluation_single(problem, candidate_real, target_idx)

                # Update database
                decs[target_idx] = np.vstack([decs[target_idx], candidate_uni])
                objs[target_idx] = np.vstack([objs[target_idx], obj])
                nfes_per_task[target_idx] += 1
                total_nfes += 1
                pbar.update(1)

        pbar.close()
        runtime = time.time() - start_time

        # ==================== Phase 3: Build Results (real space) ====================
        all_decs_uni, all_objs = build_staircase_history(decs, objs, k=1)
        final_decs_real = []
        for i in range(nt):
            task_decs_real = []
            for dec_uni in all_decs_uni[i]:
                dec_real = dec_uni[:, :dims[i]]
                task_decs_real.append(dec_real)
            final_decs_real.append(task_decs_real)

        results = build_save_results(
            all_decs=final_decs_real,
            all_objs=all_objs,
            runtime=runtime,
            max_nfes=nfes_per_task,
            bounds=problem.bounds,
            save_path=self.save_path,
            filename=self.name,
            save_data=self.save_data
        )
        results.transfer_states = transfer_actions
        return results
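# Worked example (illustrative numbers only) of the internal-improvement rule
# from Step 1: the GP models -obj, so the original-space lower confidence
# bound is LCB = -mu_gp - kappa * sigma_gp (see _improvement_internal below).
#
#     mu_db, std_db = np.array([0.2, 0.5]), np.array([0.1, 0.1])
#     lcb_db = -mu_db - 2.0 * std_db         # [-0.4, -0.7]
#     lcb_cand = -0.9 - 2.0 * 0.05           # -1.0
#     improvement = lcb_db.min() - lcb_cand  # 0.3 > 0: under the surrogate,
#     # the candidate beats the best database point.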
# ==================== Utility Functions ====================

def _ensure_unique(candidate, decs, epsilon=5e-3, max_trials=50):
    """Ensure uniqueness using Chebyshev distance (matching MATLAB)."""
    scales = np.linspace(0.1, 1.0, max_trials)
    for trial in range(max_trials):
        distances = np.max(np.abs(candidate - decs), axis=1)
        min_dist = np.min(distances)
        if min_dist >= epsilon:
            break
        perturbation = scales[trial] * (np.random.rand(candidate.shape[1]) - 0.5)
        candidate = np.clip(candidate + perturbation, 0, 1)
    return candidate


def _bayesian_update_gaussian(MUs, SIGMA2s, source_idx, target_idx,
                              observation, sigma2_obs):
    """Standard conjugate Gaussian Bayesian update of MU/SIGMA2, in place."""
    mu_old = MUs[source_idx, target_idx]
    s2_old = SIGMA2s[source_idx, target_idx]
    MUs[source_idx, target_idx] = (
        (mu_old * sigma2_obs + observation * s2_old) / (sigma2_obs + s2_old)
    )
    SIGMA2s[source_idx, target_idx] = (
        s2_old * sigma2_obs / (s2_old + sigma2_obs)
    )


def _knowledge_competition(decs, objs, gp_models, target_idx, MUs, SIGMA2s,
                           sigma2_sim, data_type):
    """
    Execute Bayesian Competitive Knowledge Transfer (matching MATLAB
    knowledge_competition.m).

    Returns (solution_ex, improvement_ex, source_idx, impn). MUs and SIGMA2s
    are updated IN-PLACE with similarity observations.
    """
    nt = len(decs)
    Xt = decs[target_idx]
    Yt = objs[target_idx].flatten()

    improvements = np.full(nt, -np.inf)
    impns = np.full(nt, -np.inf)
    solutions_external = [None] * nt

    for source_idx in range(nt):
        if source_idx == target_idx:
            continue
        Xs = decs[source_idx]
        Ys = objs[source_idx].flatten()

        # Compute similarity (SSRC) using source GP predictions on target data
        similarity, objs_val = _compute_similarity_ssrc(
            gp_models[source_idx], Xt, Yt, data_type
        )

        # Bayesian update with similarity as observation
        if np.isnan(MUs[source_idx, target_idx]):
            # First encounter: initialize
            MUs[source_idx, target_idx] = similarity
            SIGMA2s[source_idx, target_idx] = sigma2_sim
        else:
            # Subsequent encounters: conjugate Gaussian update
            _bayesian_update_gaussian(
                MUs, SIGMA2s, source_idx, target_idx, similarity, sigma2_sim
            )

        # Sample transferability from the posterior
        transferability = np.random.normal(
            MUs[source_idx, target_idx],
            np.sqrt(SIGMA2s[source_idx, target_idx])
        )

        # Compute external improvement (matching MATLAB improvement_external.m)
        imp, impn = _improvement_external(Ys, objs_val, Yt, transferability)
        improvements[source_idx] = imp
        impns[source_idx] = impn

        # Best solution from the source task
        best_idx = np.argmin(Ys)
        solutions_external[source_idx] = Xs[best_idx:best_idx + 1]

    # Select the best source
    best_source = np.argmax(improvements)
    return (solutions_external[best_source], improvements[best_source],
            best_source, impns[best_source])


def _compute_similarity_ssrc(gp_source, Xt, Yt, data_type):
    """
    Compute the Surrogate-based Spearman Rank Correlation (SSRC).

    The GP from bo_next_point_lcb is trained on -objs, so predictions are
    negated back. Returns (similarity, objs_val) where objs_val is in the
    ORIGINAL (non-negated) space.
    """
    Xt_torch = torch.tensor(Xt, dtype=data_type)
    with torch.no_grad():
        posterior = gp_source.posterior(Xt_torch)
        # GP predicts -obj, negate back to original space
        objs_val = -posterior.mean.cpu().numpy().flatten()

    # Spearman rank correlation
    if len(Yt) < 3:
        return 0.0, objs_val
    similarity, _ = spearmanr(Yt, objs_val)
    if np.isnan(similarity):
        similarity = 0.0
    return similarity, objs_val


def _improvement_external(objs_source, objs_val, objs_target, transferability):
    """
    Calculate external improvement (matching MATLAB improvement_external.m).

    Parameters
    ----------
    objs_source : np.ndarray
        Source task true objectives (flattened).
    objs_val : np.ndarray
        Source GP predictions on target data (original space).
    objs_target : np.ndarray
        Target task true objectives (flattened).
    transferability : float
        Sampled Bayesian transferability.

    Returns
    -------
    imp : float
        External improvement.
    impn_source : float
        Normalized source improvement (used for the actual transferability
        calculation).
    """
    combined = np.concatenate([objs_source, objs_val])
    max_combined = np.max(combined)
    if max_combined == 0:
        return -np.inf, 0.0
    impn_source = (np.min(objs_val) - np.min(objs_source)) / max_combined
    if impn_source < 0:
        return -np.inf, impn_source
    imp = transferability * impn_source * np.max(objs_target)
    return imp, impn_source


def _improvement_internal(gp_model, decs, objs, candidate,
                          data_type=torch.float, kappa=2.0):
    """
    Calculate internal improvement (LCB-based).

    The GP predicts -obj, so we negate back to compute the LCB in the
    original space: LCB_orig = -mu_gp - kappa * sigma_gp.
    """
    X_db_torch = torch.tensor(decs, dtype=data_type)
    with torch.no_grad():
        posterior_db = gp_model.posterior(X_db_torch)
        mu_db = posterior_db.mean.cpu().numpy().flatten()
        std_db = posterior_db.variance.sqrt().cpu().numpy().flatten()

    X_cand_torch = torch.tensor(candidate, dtype=data_type)
    with torch.no_grad():
        posterior_cand = gp_model.posterior(X_cand_torch)
        mu_cand = posterior_cand.mean.cpu().numpy().flatten()
        std_cand = posterior_cand.variance.sqrt().cpu().numpy().flatten()

    # LCB in original space: obj ≈ -mu_gp, so LCB = -mu_gp - kappa * sigma
    lcb_db = -mu_db - kappa * std_db
    lcb_cand = -mu_cand - kappa * std_cand

    improvement = np.min(lcb_db) - lcb_cand[0]
    return improvement
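

# Minimal self-check (illustrative): the conjugate Gaussian update above pulls
# the mean toward the observation and shrinks the variance. Uses only numpy
# and _bayesian_update_gaussian as defined in this file.
if __name__ == "__main__":
    MUs_demo = np.full((2, 2), np.nan)
    SIGMA2s_demo = np.full((2, 2), np.nan)

    # A first similarity observation initializes the prior, as in
    # _knowledge_competition: mu = 0.6, sigma^2 = 0.05**2.
    MUs_demo[0, 1], SIGMA2s_demo[0, 1] = 0.6, 0.05 ** 2

    # A second observation of 0.2 with equal observation variance moves the
    # mean to the midpoint (0.6 + 0.2) / 2 = 0.4 and halves the variance.
    _bayesian_update_gaussian(MUs_demo, SIGMA2s_demo, 0, 1, 0.2, 0.05 ** 2)
    print(MUs_demo[0, 1], SIGMA2s_demo[0, 1])  # 0.4 0.00125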