Source code for ddmtolab.Algorithms.MTSO.MUMBO

"""
Multi-task Max-value Bayesian Optimization (MUMBO)

This module implements MUMBO for expensive multi-task optimization. The algorithm
uses an information-theoretic acquisition function based on mutual information
between candidate observations and the global optimum value g*. A multi-task
Gaussian process provides cross-task knowledge transfer, and the MUMBO acquisition
function exploits the bivariate predictive distribution to compute rho (predictive
correlation) between each task and the target task. The acquisition value is
divided by the evaluation cost of each task, enabling cost-aware task selection.

References
----------
    [1] Moss, Henry B., David S. Leslie, and Paul Rayson. "Mumbo: Multi-task
        max-value Bayesian optimization." Joint European Conference on Machine
        Learning and Knowledge Discovery in Databases. Springer, 2020.

    [2] Wang, Zi, and Stefanie Jegelka. "Max-value entropy search for efficient
        Bayesian optimization." ICML, 2017.

Notes
-----
Author: Jiangtao Shen
Email: j.shen5@exeter.ac.uk
Date: 2025.12.18
Version: 2.0
"""
from tqdm import tqdm
import torch
import time
import numpy as np
from scipy.stats import norm, gumbel_r
from scipy.integrate import simpson
from ddmtolab.Methods.Algo_Methods.algo_utils import *
from ddmtolab.Methods.Algo_Methods.bo_utils import mtgp_build
import warnings

warnings.filterwarnings("ignore")


[docs] class MUMBO: """ Multi-task Max-value Bayesian Optimization (MUMBO). Attributes ---------- algorithm_information : dict Dictionary containing algorithm capabilities and requirements. """ algorithm_information = { 'n_tasks': '[2, K]', 'dims': 'unequal', 'objs': 'equal', 'n_objs': '1', 'cons': 'equal', 'n_cons': '0', 'expensive': 'True', 'knowledge_transfer': 'True', 'n_initial': 'unequal', 'max_nfes': 'unequal' } @classmethod def get_algorithm_information(cls, print_info=True): return get_algorithm_information(cls, print_info)
[docs] def __init__(self, problem, n_initial=None, max_nfes=None, task_cost=None, n_gstar_samples=1, n_candidates=20, n_quad=10, save_data=True, save_path='./Data', name='MUMBO', disable_tqdm=True): """ Initialize MUMBO. Parameters ---------- problem : MTOP Multi-task optimization problem instance. n_initial : int or List[int], optional Number of initial samples per task (default: 50). max_nfes : int or List[int], optional Maximum number of function evaluations per task (default: 100). task_cost : List[float] or None, optional Evaluation cost for each task. If None, all tasks have equal cost [1, 1, ..., 1]. n_gstar_samples : int, optional Number of g* samples from Gumbel distribution (default: 1). n_candidates : int, optional Number of random candidates per task for acquisition (default: 20). n_quad : int, optional Number of quadrature points for Simpson integration (default: 10). save_data : bool, optional Whether to save optimization data (default: True). save_path : str, optional Path to save results (default: './Data'). name : str, optional Name for the experiment (default: 'MUMBO'). disable_tqdm : bool, optional Whether to disable progress bar (default: True). """ self.problem = problem self.n_initial = n_initial if n_initial is not None else 50 self.max_nfes = max_nfes if max_nfes is not None else 100 self.task_cost = task_cost self.n_gstar_samples = n_gstar_samples self.n_candidates = n_candidates self.n_quad = n_quad self.save_data = save_data self.save_path = save_path self.name = name self.disable_tqdm = disable_tqdm
[docs] def optimize(self): """ Execute MUMBO. Returns ------- Results Optimization results containing decision variables, objectives, and runtime. """ data_type = torch.double start_time = time.time() problem = self.problem nt = problem.n_tasks dims = problem.dims n_initial_per_task = par_list(self.n_initial, nt) max_nfes_per_task = par_list(self.max_nfes, nt) # Task evaluation costs if self.task_cost is not None: task_cost = np.array(self.task_cost, dtype=float) else: task_cost = np.ones(nt, dtype=float) # Initialize samples using Latin Hypercube Sampling decs = initialization(problem, self.n_initial, method='lhs') objs, _ = evaluation(problem, decs) nfes_per_task = n_initial_per_task.copy() pbar = tqdm(total=sum(max_nfes_per_task), initial=sum(n_initial_per_task), desc=f"{self.name}", disable=self.disable_tqdm) while sum(nfes_per_task) < sum(max_nfes_per_task): active_tasks = [i for i in range(nt) if nfes_per_task[i] < max_nfes_per_task[i]] if not active_tasks: break # Build multi-task GP with normalized objectives objs_normalized, _, _ = normalize(objs, axis=0, method='minmax') mtgp = mtgp_build(decs, objs_normalized, dims, data_type=data_type) # Sample g* values from Gumbel extreme-value distribution g_samples = _sample_gstar(mtgp, decs, dims, nt, data_type, n_gstar_samples=self.n_gstar_samples) # Select (task, x) pair with highest acquisition/cost ratio best_task, best_x = _select_next_point( mtgp, g_samples, active_tasks, dims, nt, task_cost, data_type, n_candidates=self.n_candidates, n_quad=self.n_quad ) # Evaluate on selected task candidate_np = best_x.reshape(1, -1) obj, _ = evaluation_single(problem, candidate_np, best_task) # Update data decs[best_task], objs[best_task] = vstack_groups( (decs[best_task], candidate_np), (objs[best_task], obj) ) nfes_per_task[best_task] += 1 pbar.update(1) pbar.close() runtime = time.time() - start_time all_decs, all_objs = build_staircase_history(decs, objs, k=1) results = build_save_results( all_decs=all_decs, all_objs=all_objs, runtime=runtime, max_nfes=nfes_per_task, bounds=problem.bounds, save_path=self.save_path, filename=self.name, save_data=self.save_data ) return results
# --------------------------------------------------------------------------- # Helper functions # --------------------------------------------------------------------------- def _get_task_range(nt, data_type=torch.double): """Get task index encoding vector consistent with mtgp_build.""" return torch.linspace(0, 1, nt, dtype=data_type) def _bivariate_stats_batch(mtgp, candidates_np, task_id, dims, nt, data_type=torch.double): """ Compute bivariate GP statistics for a batch of candidates on task z vs target task z0. For each candidate x_j, computes the joint posterior of (g, y) where g = f(x, z0) and y = f(x, z) + eps, then extracts mu_g, sigma_g, sigma_f_noisy, and rho. Parameters ---------- mtgp : MultiTaskGP Trained multi-task GP. candidates_np : np.ndarray Candidate points, shape (n, d). task_id : int Task index z for the candidate evaluation. dims : list[int] Dimensionality of each task. nt : int Number of tasks. data_type : torch.dtype Data type. Returns ------- mu_g : np.ndarray, shape (n,) sigma_g : np.ndarray, shape (n,) sigma_f_noisy : np.ndarray, shape (n,) rho : np.ndarray, shape (n,) """ max_dim = max(dims) task_range = _get_task_range(nt, data_type) n = candidates_np.shape[0] # Pad candidates to max_dim if candidates_np.shape[1] < max_dim: pad = np.zeros((n, max_dim - candidates_np.shape[1])) x_padded = np.hstack([candidates_np, pad]) else: x_padded = candidates_np x_t = torch.tensor(x_padded, dtype=data_type) # (n, max_dim) target_task_val = task_range[0].item() cand_task_val = task_range[task_id].item() # Build interleaved input: [x0_target, x0_cand, x1_target, x1_cand, ...] # Shape: (2*n, max_dim+1) x_target = torch.cat([x_t, torch.full((n, 1), target_task_val, dtype=data_type)], dim=1) x_cand = torch.cat([x_t, torch.full((n, 1), cand_task_val, dtype=data_type)], dim=1) # Interleave: row 0,1 = pair 0; row 2,3 = pair 1; ... x_joint = torch.stack([x_target, x_cand], dim=1).reshape(2 * n, -1) # Get noise variance noise_tensor = mtgp.likelihood.noise.detach().cpu() if noise_tensor.numel() > 1: noise_var = noise_tensor.mean().item() else: noise_var = noise_tensor.item() mtgp.eval() with torch.no_grad(): post = mtgp.posterior(x_joint) mu_all = post.mean.squeeze(-1).cpu().numpy() # (2*n,) cov_full = post.mvn.covariance_matrix.cpu().numpy() # (2*n, 2*n) # Extract per-pair statistics from the block-diagonal-like structure mu_g_arr = np.empty(n) sigma_g_arr = np.empty(n) sigma_f_noisy_arr = np.empty(n) rho_arr = np.empty(n) for j in range(n): idx0 = 2 * j # target idx1 = 2 * j + 1 # candidate mu_g_val = mu_all[idx0] sigma_g_sq = cov_full[idx0, idx0] sigma_f_sq = cov_full[idx1, idx1] sigma_gf = cov_full[idx0, idx1] sg = np.sqrt(max(sigma_g_sq, 1e-20)) sf_noisy = np.sqrt(max(sigma_f_sq + noise_var, 1e-20)) denom = sg * sf_noisy if denom < 1e-20: r = 0.0 else: r = np.clip(sigma_gf / denom, -0.999, 0.999) mu_g_arr[j] = mu_g_val sigma_g_arr[j] = sg sigma_f_noisy_arr[j] = sf_noisy rho_arr[j] = r return mu_g_arr, sigma_g_arr, sigma_f_noisy_arr, rho_arr def _sample_gstar(mtgp, decs, dims, nt, data_type, n_gstar_samples=1): """ Sample g* values using Gumbel extreme-value distribution. Use mean-field approximation: evaluate GP posterior mean on a large random grid, then fit a Gumbel distribution to sample potential optimal values. Parameters ---------- mtgp : MultiTaskGP Trained multi-task GP. decs : list[np.ndarray] Current decision variables per task. dims : list[int] Dimensionality per task. nt : int Number of tasks. data_type : torch.dtype Data type. n_gstar_samples : int Number of g* samples. Returns ------- g_samples : np.ndarray Array of g* samples, shape (n_gstar_samples,). """ max_dim = max(dims) target_dim = dims[0] task_range = _get_task_range(nt, data_type) target_task_val = task_range[0].item() # Generate random grid: 1000*d points + existing evaluated points n_grid = min(1000 * target_dim, 5000) grid_np = np.random.rand(n_grid, max_dim) # Add existing target task points to grid existing = decs[0].copy() if existing.shape[1] < max_dim: pad = np.zeros((existing.shape[0], max_dim - existing.shape[1])) existing = np.hstack([existing, pad]) grid_np = np.vstack([grid_np, existing]) # Evaluate GP posterior mean on target task grid_t = torch.tensor(grid_np, dtype=data_type) task_col = torch.full((grid_t.shape[0], 1), target_task_val, dtype=data_type) grid_with_task = torch.cat([grid_t, task_col], dim=1) mtgp.eval() with torch.no_grad(): post = mtgp.posterior(grid_with_task) mu = post.mean.squeeze(-1).cpu().numpy() # Fit Gumbel distribution and sample y_max = mu.max() y_mean = mu.mean() y_std = mu.std() if y_std < 1e-10: y_std = 1e-10 # Gumbel location/scale from extreme value theory loc = y_max scale = max(y_std * 0.1, 1e-6) g_samples = gumbel_r.rvs(loc=loc, scale=scale, size=n_gstar_samples) return g_samples def _mumbo_acquisition_single(mu_g, sigma_g, mu_f, sigma_f_noisy, rho, g_samples, n_quad=10): """ Compute MUMBO acquisition value for a single (x, z) pair. Implements equation (11) from the tex: alpha = (1/N) sum_{g*} [ rho^2 * gamma * phi(gamma) / (2 * Phi(gamma)) - log Phi(gamma) + E_{theta ~ ESN}[ log Phi((gamma - rho*theta) / sqrt(1-rho^2)) ] ] Parameters ---------- mu_g, sigma_g : float Target task posterior mean and std. mu_f, sigma_f_noisy : float Candidate task posterior mean and noisy std. rho : float Predictive correlation. g_samples : np.ndarray Samples of g*. n_quad : int Number of quadrature points for Simpson integration. Returns ------- alpha : float MUMBO acquisition value. """ if sigma_g < 1e-20: return 0.0 rho_sq = rho ** 2 sqrt_1_minus_rho_sq = np.sqrt(max(1.0 - rho_sq, 1e-20)) total = 0.0 for g_star in g_samples: gamma = (g_star - mu_g) / sigma_g phi_gamma = norm.pdf(gamma) cdf_gamma = norm.cdf(gamma) if cdf_gamma < 1e-30: cdf_gamma = 1e-30 # First two analytic terms term1 = rho_sq * gamma * phi_gamma / (2.0 * cdf_gamma) term2 = -np.log(cdf_gamma) # Third term: expectation over ESN distribution via Simpson rule # ESN mean and variance (eq. 9-10) ratio = phi_gamma / cdf_gamma # phi/Phi (inverse Mills ratio) esn_mean = rho * ratio esn_var = 1.0 - rho_sq * ratio * (gamma + ratio) esn_var = max(esn_var, 1e-20) esn_std = np.sqrt(esn_var) # Integration bounds: mean +/- 4*std lo = esn_mean - 4.0 * esn_std hi = esn_mean + 4.0 * esn_std if hi - lo < 1e-15: term3 = 0.0 else: theta_grid = np.linspace(lo, hi, n_quad) # ESN density p(theta) (eq. 7) phi_theta = norm.pdf(theta_grid) arg_inner = (gamma - rho * theta_grid) / sqrt_1_minus_rho_sq Phi_inner = norm.cdf(arg_inner) esn_density = (1.0 / cdf_gamma) * phi_theta * Phi_inner # Integrand: p(theta) * log Phi(...) log_Phi_inner = np.log(np.maximum(Phi_inner, 1e-30)) integrand = esn_density * log_Phi_inner term3 = simpson(integrand, x=theta_grid) total += term1 + term2 + term3 alpha = total / len(g_samples) return max(alpha, 0.0) def _select_next_point(mtgp, g_samples, active_tasks, dims, nt, task_cost, data_type, n_candidates=20, n_quad=10): """ Select the next (task, x) pair by maximizing alpha/cost. For each active task, generate random candidates in batch, compute MUMBO acquisition values, and select the (task, x) pair with highest ratio. Parameters ---------- mtgp : MultiTaskGP Trained multi-task GP. g_samples : np.ndarray Samples of g*. active_tasks : list[int] Indices of tasks with remaining budget. dims : list[int] Dimensionality per task. nt : int Number of tasks. task_cost : np.ndarray Evaluation cost per task. data_type : torch.dtype Data type. n_candidates : int Number of random candidates per task. n_quad : int Number of quadrature points for Simpson integration. Returns ------- best_task : int Selected task index. best_x : np.ndarray Selected candidate point, shape (d,). """ best_ratio = -np.inf best_task = active_tasks[0] best_x = np.random.rand(dims[active_tasks[0]]) for task_id in active_tasks: dim = dims[task_id] candidates = np.random.rand(n_candidates, dim) # Batch bivariate stats computation mu_g, sigma_g, sigma_f_noisy, rho = _bivariate_stats_batch( mtgp, candidates, task_id, dims, nt, data_type ) # Compute acquisition for each candidate for j in range(n_candidates): alpha = _mumbo_acquisition_single( mu_g[j], sigma_g[j], 0.0, sigma_f_noisy[j], rho[j], g_samples, n_quad=n_quad ) ratio = alpha / task_cost[task_id] if ratio > best_ratio: best_ratio = ratio best_task = task_id best_x = candidates[j].copy() return best_task, best_x