Source code for ddmtolab.Algorithms.MTSO.LCB_EMT

"""
Lower Confidence Bound-based Evolutionary Multi-Tasking (LCB-EMT)

This module implements LCB-EMT for multi-task optimization with knowledge transfer
via Transfer Gaussian Process and similarity-based lower confidence bound.

References
----------
    [1] Wang, Zhenzhong, et al. "Evolutionary multitask optimization with lower confidence bound-based solution selection strategy." IEEE Transactions on Evolutionary Computation (2024).

Notes
-----
Author: Jiangtao Shen
Email: j.shen5@exeter.ac.uk
Date: 2025.12.19
Version: 1.0
"""
from tqdm import tqdm
import time
import torch
from ddmtolab.Methods.Algo_Methods.tgp import TGPRegression
from ddmtolab.Methods.Algo_Methods.algo_utils import *


def tgp_slcb_knowledge_transfer(target_task_idx, source_task_idx, decs, objs, dims, n_transfer):
    """
    Perform knowledge transfer from source task to target task using TGP and SLCB.

    This function implements the Transfer Gaussian Process (TGP) with Similarity-based
    Lower Confidence Bound (SLCB) strategy for selecting promising solutions to transfer
    between tasks in multi-task optimization.

    Parameters
    ----------
    target_task_idx : int
        Index of the target task (receiving knowledge)
    source_task_idx : int
        Index of the source task (providing knowledge)
    decs : List[np.ndarray]
        Decision variables for all tasks. Each element is a 2D array of shape (n_solutions, n_dims)
    objs : List[np.ndarray]
        Objective values for all tasks. Each element is a 1D or 2D array of objective values
    dims : List[int]
        Dimensions for all tasks
    n_transfer : int
        Number of solutions to transfer

    Returns
    -------
    np.ndarray
        Transferred solutions (decision variables) to add to target task offspring.
        Shape: (n_transfer, target_dim)

    Notes
    -----
    The function performs the following steps:
    1. Dimension alignment between source and target tasks
    2. Objective normalization for each task
    3. TGP model training with task indicators
    4. SLCB computation for solution selection
    5. Probabilistic selection of transfer solutions
    """
    target_dim = dims[target_task_idx]
    source_dim = dims[source_task_idx]

    # Step 1: Dimension alignment - adjust source task dimensions to match target task
    adjusted_source_decs = decs[source_task_idx].copy()

    if source_dim < target_dim:
        # Zero-padding for smaller dimensions
        padding = np.zeros((len(adjusted_source_decs), target_dim - source_dim))
        adjusted_source_decs = np.hstack([adjusted_source_decs, padding])
    elif source_dim > target_dim:
        # Truncation for larger dimensions
        adjusted_source_decs = adjusted_source_decs[:, :target_dim]

    # Step 2: Prepare training data
    target_decs = decs[target_task_idx]
    target_objs = objs[target_task_idx]
    source_objs = objs[source_task_idx]

    # Step 3: Normalize objectives for each task based on their own population
    # Normalize target task objectives
    target_objs_min = np.min(target_objs)
    target_objs_max = np.max(target_objs)
    if target_objs_max - target_objs_min > 1e-10:  # Avoid division by zero
        target_objs_norm = (target_objs - target_objs_min) / (target_objs_max - target_objs_min)
    else:
        target_objs_norm = np.zeros_like(target_objs)

    # Normalize source task objectives
    source_objs_min = np.min(source_objs)
    source_objs_max = np.max(source_objs)
    if source_objs_max - source_objs_min > 1e-10:  # Avoid division by zero
        source_objs_norm = (source_objs - source_objs_min) / (source_objs_max - source_objs_min)
    else:
        source_objs_norm = np.zeros_like(source_objs)

    # Step 4: Add task indicators (0 for target task, 1 for source task)
    target_task_ind = np.zeros((len(target_decs), 1))
    source_task_ind = np.ones((len(adjusted_source_decs), 1))

    # Step 5: Combine target and source data
    X_train = np.vstack([
        np.hstack([target_decs, target_task_ind]),
        np.hstack([adjusted_source_decs, source_task_ind])
    ])
    y_train = np.concatenate([target_objs_norm, source_objs_norm])

    # Step 6: Convert to PyTorch tensors
    X_train_torch = torch.tensor(X_train, dtype=torch.float32)
    y_train_torch = torch.tensor(y_train.flatten(), dtype=torch.float32)

    # Step 7: Train Transfer Gaussian Process model
    tgp_model = TGPRegression()
    tgp_model.train(X_train_torch, y_train_torch, n_restarts=3, verbose=False)

    # Step 8: Get learned inter-task similarity (lambda)
    lambda_gen = tgp_model.get_task_correlation()

    # Step 9: Prepare candidate solutions for transfer
    # Solution set S = {x' | x' = M(x), x in source task}
    # Here we use identity mapping M(x) = x
    S = adjusted_source_decs.copy()

    # Step 10: Predict mean (f) and uncertainty (sigma) for candidate solutions
    S_torch = torch.tensor(S, dtype=torch.float32)
    with torch.no_grad():
        mu_S, cov_S = tgp_model.predict(S_torch)
        sigma_S = torch.sqrt(torch.diag(cov_S))

    mu_S_np = mu_S.cpu().numpy()
    sigma_S_np = sigma_S.cpu().numpy()

    # Step 11: Compute Similarity-based Lower Confidence Bound (SLCB)
    SLCB = mu_S_np - lambda_gen * sigma_S_np

    # Step 12: Compute transfer probability for each solution
    exp_neg_SLCB = np.exp(-SLCB)
    p_transfer_unnorm = lambda_gen * exp_neg_SLCB / np.sum(exp_neg_SLCB)
    exp_p_transfer = np.exp(p_transfer_unnorm)
    p_transfer = exp_p_transfer / np.sum(exp_p_transfer)

    if np.any(np.isnan(p_transfer)) or np.any(np.isinf(p_transfer)):
        p_transfer = np.ones(len(S)) / len(S)

    # Step 13: Sample solutions according to probability distribution
    n_transfer_actual = min(n_transfer, len(S))
    selected_indices = np.random.choice(len(S), size=n_transfer_actual, replace=False, p=p_transfer)

    # Step 14: Extract selected solutions for transfer
    transferred_decs = S[selected_indices]

    return transferred_decs


[docs] class LCB_EMT: """ Lower Confidence Bound-based Evolutionary Multi-Tasking. This algorithm uses Transfer Gaussian Process (TGP) to model task relationships and employs a Similarity-based Lower Confidence Bound (SLCB) strategy to select promising solutions for knowledge transfer between tasks. Attributes ---------- algorithm_information : dict Dictionary containing algorithm capabilities and requirements """ algorithm_information = { 'n_tasks': '[2, K]', 'dims': 'unequal', 'objs': 'equal', 'n_objs': '1', 'cons': 'unequal', 'n_cons': '[0, C]', 'expensive': 'False', 'knowledge_transfer': 'True', 'n': 'unequal', 'max_nfes': 'unequal' } @classmethod def get_algorithm_information(cls, print_info=True): return get_algorithm_information(cls, print_info)
[docs] def __init__(self, problem, n=None, max_nfes=None, Nt=10, TGap=20, muc=2, mum=5, save_data=True, save_path='./Data', name='LCB-EMT', disable_tqdm=True): """ Initialize LCB-EMT algorithm. Parameters ---------- problem : MTOP Multi-task optimization problem instance n : int or List[int], optional Population size per task (default: 100) max_nfes : int or List[int], optional Maximum number of function evaluations per task (default: 10000) Nt : int, optional Number of solutions to transfer per knowledge transfer operation (default: 20) TGap : int, optional Transfer interval in generations (default: 20) muc : float, optional Distribution index for simulated binary crossover (SBX) (default: 2.0) mum : float, optional Distribution index for polynomial mutation (PM) (default: 5.0) save_data : bool, optional Whether to save optimization data (default: True) save_path : str, optional Path to save results (default: './TestData') name : str, optional Name for the experiment (default: 'LCBEMT_test') disable_tqdm : bool, optional Whether to disable progress bar (default: True) """ self.problem = problem self.n = n if n is not None else 100 self.max_nfes = max_nfes if max_nfes is not None else 10000 self.Nt = Nt self.TGap = TGap self.muc = muc self.mum = mum self.save_data = save_data self.save_path = save_path self.name = name self.disable_tqdm = disable_tqdm
[docs] def optimize(self): """ Execute the LCB-EMT algorithm. Returns ------- Results Optimization results containing decision variables, objectives, and runtime """ start_time = time.time() problem = self.problem nt = problem.n_tasks dims = problem.dims n_per_task = par_list(self.n, nt) max_nfes_per_task = par_list(self.max_nfes, nt) # Initialize population and evaluate for each task decs = initialization(problem, n_per_task) objs, cons = evaluation(problem, decs) nfes_per_task = n_per_task.copy() # Store initial populations initial_decs = [d.copy() for d in decs] gen = 1 all_decs, all_objs, all_cons = init_history(decs, objs, cons) pbar = tqdm(total=sum(max_nfes_per_task), initial=sum(n_per_task), desc=f"{self.name}", disable=self.disable_tqdm) while sum(nfes_per_task) < sum(max_nfes_per_task): # Identify active tasks (tasks that haven't exhausted their evaluation budget) active_tasks = [i for i in range(nt) if nfes_per_task[i] < max_nfes_per_task[i]] if not active_tasks: break for i in active_tasks: # Generate offspring using genetic algorithm operators off_decs = ga_generation(decs[i], self.muc, self.mum) # Knowledge transfer using TGP and SLCB if self.Nt > 0 and gen % self.TGap == 0 and len(active_tasks) >= 2: # Select source task using circular indexing: mod(k+1, K) k = i # current task index (target task) K = len(active_tasks) # number of active tasks source_idx_in_active = (active_tasks.index(k) + 1) % K source_task = active_tasks[source_idx_in_active] # Perform knowledge transfer using independent function transferred_decs = tgp_slcb_knowledge_transfer( target_task_idx=i, source_task_idx=source_task, decs=decs, objs=objs, dims=dims, n_transfer=self.Nt ) # Add transferred solutions to offspring population off_decs = np.vstack([off_decs, transferred_decs]) else: off_decs = off_decs.copy() # Evaluate offspring population offobjs, offcons = evaluation_single(problem, off_decs, i) # Merge parent and offspring populations objs[i], decs[i], cons[i] = vstack_groups( (objs[i], offobjs), (decs[i], off_decs), (cons[i], offcons) ) # Environmental selection: keep the best n individuals index = selection_elit(objs[i], n_per_task[i], cons[i]) objs[i], decs[i], cons[i] = select_by_index(index, objs[i], decs[i], cons[i]) # Update evaluation counter and progress bar nfes_per_task[i] += off_decs.shape[0] pbar.update(off_decs.shape[0]) # Store optimization history append_history(all_decs[i], decs[i], all_objs[i], objs[i], all_cons[i], cons[i]) gen += 1 pbar.close() runtime = time.time() - start_time # Build and save results results = build_save_results(all_decs=all_decs, all_objs=all_objs, runtime=runtime, max_nfes=nfes_per_task, all_cons=all_cons, bounds=problem.bounds, save_path=self.save_path, filename=self.name, save_data=self.save_data) return results