"""Utility functions for Bayesian optimization (ddmtolab.Methods.Algo_Methods.bo_utils)."""

import torch
import numpy as np
import gpytorch
from botorch.models import SingleTaskGP, MultiTaskGP
from botorch.fit import fit_gpytorch_mll
from botorch.acquisition import LogExpectedImprovement
from botorch.optim import optimize_acqf
from botorch.models.transforms.outcome import Standardize
from botorch.acquisition.objective import ScalarizedPosteriorTransform
from gpytorch.mlls import ExactMarginalLogLikelihood
from botorch.acquisition import UpperConfidenceBound



def gp_build(
    decs: np.ndarray,
    objs: np.ndarray,
    data_type: torch.dtype = torch.float,
) -> SingleTaskGP:
    """Build and fit a single-task Gaussian process on historical data.

    Parameters
    ----------
    decs : np.ndarray
        Historical decision variables, shape (n_samples, dim).
    objs : np.ndarray
        Historical objective values, shape (n_samples,) or (n_samples, 1).
    data_type : torch.dtype, optional
        Tensor dtype for the training data (default: torch.float).

    Returns
    -------
    SingleTaskGP
        Fitted GP model. It is trained on the *negated* objectives, so the
        GP's maximum corresponds to the original minimization optimum.
    """
    train_X = torch.tensor(decs, dtype=data_type)
    # BoTorch conventionally maximizes; negate the minimization objectives.
    train_Y = torch.tensor(-objs, dtype=data_type)
    if train_Y.dim() == 1:
        train_Y = train_Y.unsqueeze(-1)

    model = SingleTaskGP(
        train_X=train_X,
        train_Y=train_Y,
        outcome_transform=Standardize(m=1),
    )
    fit_gpytorch_mll(ExactMarginalLogLikelihood(model.likelihood, model))
    return model
def gp_predict(
    gp: SingleTaskGP,
    test_X: np.ndarray,
    data_type: torch.dtype = torch.float,
) -> tuple[np.ndarray, np.ndarray]:
    """Predict objective means and uncertainties with a trained GP.

    Parameters
    ----------
    gp : SingleTaskGP
        Trained Gaussian process model (fit on negated objectives).
    test_X : np.ndarray
        Test decision variables, shape (n_candidates, dim).
    data_type : torch.dtype, optional
        Tensor dtype used for the candidates (default: torch.float).

    Returns
    -------
    pred_objs : np.ndarray
        Predicted objective values, shape (n_candidates, 1).
    pred_std : np.ndarray
        Predicted standard deviations, shape (n_candidates, 1).
    """
    candidates = torch.tensor(test_X, dtype=data_type)

    gp.eval()
    with torch.no_grad():
        posterior = gp.posterior(candidates)
        mean = posterior.mean
        variance = posterior.variance

    # The GP was trained on -objs, so negate the mean back to the
    # original minimization scale; the std is sign-invariant.
    pred_objs = -mean.cpu().numpy()
    pred_std = variance.sqrt().cpu().numpy()
    return pred_objs, pred_std
def mo_gp_build(decs, objs, data_type=torch.float):
    """Build one Gaussian process per objective for multi-objective BO.

    Parameters
    ----------
    decs : np.ndarray
        Decision variables, shape (N, D).
    objs : np.ndarray
        Objective values, shape (N, M) with M objectives.
    data_type : torch.dtype, optional
        Dtype used when fitting each GP (default: torch.float).

    Returns
    -------
    list
        Trained GP models, one per objective column of ``objs``.
    """
    # Each objective column gets an independent single-task GP.
    return [
        gp_build(decs, objs[:, j:j + 1], data_type)
        for j in range(objs.shape[1])
    ]
def mo_gp_predict(models, x, data_type=torch.float, mse=False):
    """Predict objectives with per-objective GP models.

    Parameters
    ----------
    models : list
        Trained GP models (one per objective), as returned by mo_gp_build.
    x : np.ndarray
        Decision variables to predict, shape (N, D).
    data_type : torch.dtype, optional
        Dtype used for prediction (default: torch.float).
    mse : bool, optional
        If True, also return the prediction variance per objective
        (default: False).

    Returns
    -------
    pred_objs : np.ndarray
        Predicted objective values, shape (N, M).
    pred_mse : np.ndarray, optional
        Prediction variance per objective, shape (N, M).
        Only returned when ``mse=True``.
    """
    n_points = x.shape[0]
    n_objs = len(models)
    pred_objs = np.zeros((n_points, n_objs))
    pred_var = np.zeros((n_points, n_objs))

    for j, model in enumerate(models):
        mean_j, std_j = gp_predict(model, x, data_type)
        pred_objs[:, j] = mean_j.ravel()
        pred_var[:, j] = np.square(std_j).ravel()

    return (pred_objs, pred_var) if mse else pred_objs
def bo_next_point(
    dim_i: int,
    decs_i: np.ndarray,
    objs_i: np.ndarray,
    data_type: torch.dtype = torch.float,
) -> np.ndarray:
    """Propose the next sample via BO with log expected improvement.

    Parameters
    ----------
    dim_i : int
        Dimension of the decision variables.
    decs_i : np.ndarray
        Historical decision variables, shape (n_samples, dim_i).
    objs_i : np.ndarray
        Historical objective values, shape (n_samples,) or (n_samples, 1).
    data_type : torch.dtype, optional
        Tensor dtype (default: torch.float).

    Returns
    -------
    np.ndarray
        Next sampling point, shape (1, dim_i).
    """
    # Search space is the unit hypercube [0, 1]^dim_i.
    bounds = torch.stack(
        [
            torch.zeros(dim_i, dtype=data_type),
            torch.ones(dim_i, dtype=data_type),
        ],
        dim=0,
    )

    train_X = torch.tensor(decs_i, dtype=data_type)
    # Negate: BoTorch maximizes, our objectives are minimized.
    train_Y = torch.tensor(-objs_i, dtype=data_type)
    if train_Y.dim() == 1:
        train_Y = train_Y.unsqueeze(-1)

    model = SingleTaskGP(train_X=train_X, train_Y=train_Y)
    fit_gpytorch_mll(ExactMarginalLogLikelihood(model.likelihood, model))

    # log-EI is numerically more stable than plain EI near convergence.
    acq = LogExpectedImprovement(model=model, best_f=train_Y.max())
    candidate, _ = optimize_acqf(
        acq,
        bounds=bounds,
        q=1,
        num_restarts=5,
        raw_samples=20,
    )
    return candidate.detach().cpu().numpy()
def bo_next_point_lcb(
    dim_i: int,
    decs_i: np.ndarray,
    objs_i: np.ndarray,
    data_type: torch.dtype = torch.float,
    kappa: float = 2.0
) -> tuple:
    """Propose the next sample via BO with an LCB-style acquisition.

    Because the objectives are negated for maximization, minimizing the
    original lower confidence bound is equivalent to maximizing UCB here.

    Parameters
    ----------
    dim_i : int
        Dimension of the decision variables.
    decs_i : np.ndarray
        Historical decision variables, shape (n_samples, dim_i).
    objs_i : np.ndarray
        Historical objective values, shape (n_samples,) or (n_samples, 1).
    data_type : torch.dtype, optional
        Tensor dtype (default: torch.float).
    kappa : float, optional
        Exploration weight for LCB (default: 2.0).

    Returns
    -------
    candidate_np : np.ndarray
        Next sampling point, shape (1, dim_i).
    gp : SingleTaskGP
        Fitted Gaussian process model.
    """
    # Search space is the unit hypercube [0, 1]^dim_i.
    bounds = torch.stack(
        [
            torch.zeros(dim_i, dtype=data_type),
            torch.ones(dim_i, dtype=data_type),
        ],
        dim=0,
    )

    train_X = torch.tensor(decs_i, dtype=data_type)
    train_Y = torch.tensor(-objs_i, dtype=data_type)  # negate to maximize
    if train_Y.dim() == 1:
        train_Y = train_Y.unsqueeze(-1)

    gp = SingleTaskGP(train_X=train_X, train_Y=train_Y)
    fit_gpytorch_mll(ExactMarginalLogLikelihood(gp.likelihood, gp))

    # BoTorch's UCB uses mean + sqrt(beta) * std, so beta = kappa^2
    # reproduces the usual mean + kappa * std bound.
    acq = UpperConfidenceBound(model=gp, beta=kappa ** 2)
    candidate, _ = optimize_acqf(
        acq,
        bounds=bounds,
        q=1,
        num_restarts=5,
        raw_samples=20,
    )
    return candidate.detach().cpu().numpy(), gp
def mtgp_build(
    decs: list[np.ndarray],
    objs: list[np.ndarray],
    dims: list[int],
    data_type: torch.dtype = torch.float
) -> MultiTaskGP:
    """Build and fit a multi-task Gaussian process over several tasks.

    Tasks with fewer decision variables than the widest task are
    zero-padded so all tasks share one input space; a task-index feature
    in [0, 1] is appended as the last column.

    Parameters
    ----------
    decs : list[np.ndarray]
        Decision-variable matrix per task.
    objs : list[np.ndarray]
        Objective-value matrix per task.
    dims : list[int]
        Decision-space dimensionality per task.
    data_type : torch.dtype
        Tensor dtype (default: torch.float).

    Returns
    -------
    MultiTaskGP
        Fitted multi-task GP model (trained on negated objectives).
    """
    nt = len(decs)
    max_dim = max(dims)
    # Evenly spaced task labels in [0, 1], one per task.
    task_range = torch.linspace(0, 1, nt).unsqueeze(-1)

    xs, ys, labels = [], [], []
    for i in range(nt):
        x_i = torch.tensor(decs[i], dtype=data_type)
        # Negate: BoTorch maximizes, our objectives are minimized.
        y_i = torch.tensor(-objs[i], dtype=data_type)
        label_i = torch.full(
            (x_i.shape[0], 1), task_range[i].item(), dtype=data_type
        )
        # Zero-pad narrower tasks up to the common dimensionality.
        if dims[i] < max_dim:
            pad = torch.zeros(x_i.shape[0], max_dim - dims[i], dtype=data_type)
            x_i = torch.cat([x_i, pad], dim=1)
        xs.append(x_i)
        ys.append(y_i)
        labels.append(label_i)

    train_X = torch.cat(xs, dim=0)
    train_Y = torch.cat(ys, dim=0)
    train_i = torch.cat(labels, dim=0)
    # Task label rides along as the last input feature.
    train_X_with_task = torch.cat([train_X, train_i], dim=1)

    mtgp = MultiTaskGP(
        train_X=train_X_with_task,
        train_Y=train_Y,
        task_feature=-1,
    )
    fit_gpytorch_mll(ExactMarginalLogLikelihood(mtgp.likelihood, mtgp))
    return mtgp
def mtgp_predict(
    mtgp: MultiTaskGP,
    off_decs: np.ndarray,
    task_id: int,
    dims: list[int],
    nt: int,
    obj_min_vals: list[float] | None = None,
    obj_max_vals: list[float] | None = None,
    data_type: torch.dtype = torch.float
) -> tuple[np.ndarray, np.ndarray]:
    """Use a multi-task GP to predict objectives and uncertainties.

    Parameters
    ----------
    mtgp : MultiTaskGP
        Trained multi-task Gaussian process model.
    off_decs : np.ndarray
        Candidate decision variables, shape (n_candidates, dim).
    task_id : int
        Task index for prediction.
    dims : list[int]
        Dimensionality of each task's decision space.
    nt : int
        Total number of tasks.
    obj_min_vals : list[float] | None
        Minimum objective values per task (for denormalization).
    obj_max_vals : list[float] | None
        Maximum objective values per task (for denormalization).
    data_type : torch.dtype, optional
        Tensor dtype (default: torch.float).

    Returns
    -------
    pred_objs : np.ndarray
        Predicted objective values, shape (n_candidates, 1).
    pred_std : np.ndarray
        Predicted standard deviations, shape (n_candidates, 1).
    """
    test_X = torch.tensor(off_decs, dtype=data_type)
    max_dim = max(dims)
    if dims[task_id] < max_dim:
        # FIX: pad with zeros, not torch.rand. mtgp_build pads the training
        # data with zeros, so random padding queried the GP at inputs
        # inconsistent with training and made predictions non-deterministic.
        padding = torch.zeros(
            test_X.shape[0], max_dim - dims[task_id], dtype=data_type
        )
        test_X = torch.cat([test_X, padding], dim=1)

    # Append the task label as the last feature (same encoding as training).
    task_range = torch.linspace(0, 1, nt)
    task_idx = torch.full(
        (test_X.shape[0], 1), task_range[task_id].item(), dtype=data_type
    )
    test_X_with_task = torch.cat([test_X, task_idx], dim=1)

    mtgp.eval()
    with torch.no_grad():
        posterior = mtgp.posterior(test_X_with_task)
        pred_mean = posterior.mean
        pred_var = posterior.variance

    # Model was trained on -objs; negate back to the minimization scale.
    pred_objs = -pred_mean.cpu().numpy()
    pred_std = torch.sqrt(pred_var).cpu().numpy()

    # Denormalize if min/max values are provided.
    if obj_min_vals is not None and obj_max_vals is not None:
        min_val = obj_min_vals[task_id]
        max_val = obj_max_vals[task_id]
        range_val = max_val - min_val
        # Guard against a degenerate (constant-objective) range.
        if range_val < 1e-10:
            range_val = 1.0
        pred_objs = pred_objs * range_val + min_val
        pred_std = pred_std * range_val

    return pred_objs, pred_std
def mtgp_task_corr(
    mtgp: MultiTaskGP
) -> np.ndarray:
    """Extract the task correlation matrix from a multi-task GP model.

    Parameters
    ----------
    mtgp : MultiTaskGP
        Trained multi-task Gaussian process model.

    Returns
    -------
    task_corr : np.ndarray
        Task correlation matrix (normalized task covariance matrix).

    Raises
    ------
    ValueError
        If no IndexKernel is found in the model's covariance module.
    """
    # Set model to evaluation mode
    mtgp.eval()

    # Find the IndexKernel (task kernel) inside the product covariance.
    index_kernel = None
    for kernel in mtgp.covar_module.kernels:
        if isinstance(kernel, gpytorch.kernels.IndexKernel):
            index_kernel = kernel
            break
    # FIX: fail with a clear message instead of an opaque AttributeError
    # on `None.covar_factor` when the model has no IndexKernel.
    if index_kernel is None:
        raise ValueError(
            "No IndexKernel found in mtgp.covar_module; cannot extract "
            "task correlations."
        )

    # Extract task covariance B = W W^T without gradient computation.
    with torch.no_grad():
        covar_factor = index_kernel.covar_factor
        task_covar = (covar_factor @ covar_factor.T).cpu().numpy()

    # Normalize the covariance matrix into a correlation matrix.
    diag = np.sqrt(np.diag(task_covar))
    task_corr = task_covar / np.outer(diag, diag)
    return task_corr
def mtbo_next_point(
    mtgp: MultiTaskGP,
    task_id: int,
    objs: list[np.ndarray],
    dims: list[int],
    nt: int,
    data_type: torch.dtype = torch.float
) -> np.ndarray:
    """Propose the next sample for one task via multi-task BO with log-EI.

    Parameters
    ----------
    mtgp : MultiTaskGP
        Trained multi-task Gaussian process model.
    task_id : int
        Task for which to propose the next point.
    objs : list[np.ndarray]
        Objective-value matrix per task.
    dims : list[int]
        Decision-space dimensionality per task.
    nt : int
        Total number of tasks.
    data_type : torch.dtype, optional
        Tensor dtype (default: torch.float).

    Returns
    -------
    np.ndarray
        Next sampling point, shape (1, dims[task_id]).
    """
    # Bounds over [0, 1]^max_dim, with the task feature pinned to this
    # task's label so the acquisition only searches within the task.
    max_dim = max(dims)
    lower = torch.zeros(max_dim + 1, dtype=data_type)
    upper = torch.ones(max_dim + 1, dtype=data_type)
    task_label = torch.linspace(0, 1, nt)[task_id].item()
    lower[-1] = task_label
    upper[-1] = task_label
    bounds = torch.stack([lower, upper], dim=0)

    # Incumbent for this task on the negated (maximization) scale.
    best_f = torch.tensor(-objs[task_id], dtype=data_type).squeeze().max()

    # Scalarize the (single-output) posterior so log-EI accepts it.
    transform = ScalarizedPosteriorTransform(
        weights=torch.tensor([1.0], dtype=data_type)
    )
    acq = LogExpectedImprovement(
        model=mtgp,
        best_f=best_f,
        posterior_transform=transform,
    )

    candidate, _ = optimize_acqf(
        acq,
        bounds=bounds,
        q=1,
        num_restarts=5,
        raw_samples=20,
    )
    # Strip padding columns and the task feature before returning.
    return candidate[:, :dims[task_id]].detach().cpu().numpy()