Source code for ddmtolab.Algorithms.STMO.TwoArch2

"""
Two-Archive Algorithm 2 (Two_Arch2)

This module implements Two_Arch2 for many-objective optimization problems.

References
----------
    [1] Wang, H., Jiao, L., & Yao, X. (2015). Two_Arch2: An improved two-archive algorithm for many-objective optimization. IEEE Transactions on Evolutionary Computation, 19(4), 524-541.

Notes
-----
Author: Jiangtao Shen
Email: j.shen5@exeter.ac.uk
Date: 2025.12.13
Version: 1.0
"""
from tqdm import tqdm
import time
from ddmtolab.Methods.Algo_Methods.algo_utils import *


[docs] class TwoArch2: """ Two-Archive Algorithm 2 for many-objective optimization. Attributes ---------- algorithm_information : dict Dictionary containing algorithm capabilities and requirements """ algorithm_information = { 'n_tasks': '[1, K]', 'dims': 'unequal', 'objs': 'unequal', 'n_objs': '[2, M]', 'cons': 'unequal', 'n_cons': '0', 'expensive': 'False', 'knowledge_transfer': 'False', 'n': 'unequal', 'max_nfes': 'unequal' } @classmethod def get_algorithm_information(cls, print_info=True): return get_algorithm_information(cls, print_info)
[docs] def __init__(self, problem, n=None, max_nfes=None, CA_size=None, p=None, save_data=True, save_path='./Data', name='Two_Arch2', disable_tqdm=True): """ Initialize Two_Arch2 algorithm. Parameters ---------- problem : MTOP Multi-task optimization problem instance n : int or List[int], optional Population size per task (default: 100) max_nfes : int or List[int], optional Maximum number of function evaluations per task (default: 10000) CA_size : int or None, optional Convergence archive size (default: None, will be set to population size) p : float or None, optional Parameter for fractional distance (default: None, will be set to 1/M) save_data : bool, optional Whether to save optimization data (default: True) save_path : str, optional Path to save results (default: './TestData') name : str, optional Name for the experiment (default: 'Two_Arch2_test') disable_tqdm : bool, optional Whether to disable progress bar (default: True) """ self.problem = problem self.n = n if n is not None else 100 self.max_nfes = max_nfes if max_nfes is not None else 10000 self.CA_size = CA_size self.p = p self.save_data = save_data self.save_path = save_path self.name = name self.disable_tqdm = disable_tqdm
[docs] def optimize(self): """ Execute the Two_Arch2 algorithm. Returns ------- Results Optimization results containing decision variables, objectives, constraints, and runtime """ start_time = time.time() problem = self.problem nt = problem.n_tasks n_per_task = par_list(self.n, nt) max_nfes_per_task = par_list(self.max_nfes, nt) # Initialize population and evaluate for each task decs = initialization(problem, n_per_task) objs, _ = evaluation(problem, decs) nfes_per_task = n_per_task.copy() all_decs, all_objs = init_history(decs, objs) # Initialize archives for each task CAs = [] # Convergence Archive DAs = [] # Diversity Archive for i in range(nt): # Set CA size and p parameter for this task CA_size_i = self.CA_size if self.CA_size is not None else n_per_task[i] p_i = self.p if self.p is not None else 1.0 / objs[i].shape[1] # Initialize archives CA_i = self._update_CA([], objs[i], decs[i], CA_size_i) DA_i = self._update_DA([], [], objs[i], decs[i], n_per_task[i], p_i) CAs.append(CA_i) DAs.append(DA_i) pbar = tqdm(total=sum(max_nfes_per_task), initial=sum(n_per_task), desc=f"{self.name}", disable=self.disable_tqdm) while sum(nfes_per_task) < sum(max_nfes_per_task): # Skip tasks that have exhausted their evaluation budget active_tasks = [i for i in range(nt) if nfes_per_task[i] < max_nfes_per_task[i]] if not active_tasks: break for i in active_tasks: # Get CA and DA for current task CA_objs_i, CA_decs_i = CAs[i] DA_objs_i, DA_decs_i = DAs[i] # Mating selection parentC_indices, parentM_indices = self._mating_selection( CA_objs_i, CA_decs_i, DA_objs_i, DA_decs_i, n_per_task[i] ) # Generate offspring through crossover and mutation # ParentC: for convergence (SBX + polynomial mutation) parentC_decs = np.vstack([CA_decs_i[parentC_indices[:len(parentC_indices) // 2]], DA_decs_i[parentC_indices[len(parentC_indices) // 2:]]]) off_decs_C = ga_generation(parentC_decs, muc=20.0, mum=0) # ParentM: for diversity (only mutation) parentM_decs = CA_decs_i[parentM_indices] off_decs_M = ga_generation(parentM_decs, muc=0, mum=20.0) # Combine offspring off_decs = np.vstack([off_decs_C, off_decs_M]) off_objs, _ = evaluation_single(problem, off_decs, i) # Update archives CA_objs_i, CA_decs_i = self._update_CA( (CA_objs_i, CA_decs_i), off_objs, off_decs, self.CA_size if self.CA_size is not None else n_per_task[i] ) DA_objs_i, DA_decs_i = self._update_DA( (DA_objs_i, DA_decs_i), (CA_objs_i, CA_decs_i), off_objs, off_decs, n_per_task[i], self.p if self.p is not None else 1.0 / objs[i].shape[1] ) # Update archives CAs[i] = (CA_objs_i, CA_decs_i) DAs[i] = (DA_objs_i, DA_decs_i) # Update main population with DA for tracking objs[i] = DA_objs_i decs[i] = DA_decs_i nfes_per_task[i] += off_decs.shape[0] pbar.update(off_decs.shape[0]) append_history(all_decs[i], decs[i], all_objs[i], objs[i]) pbar.close() runtime = time.time() - start_time # Save results (using DA as final population) results = build_save_results(all_decs=all_decs, all_objs=all_objs, runtime=runtime, max_nfes=nfes_per_task, bounds=problem.bounds, save_path=self.save_path, filename=self.name, save_data=self.save_data) results.CAs = CAs results.DAs = DAs return results
def _mating_selection(self, CA_objs, CA_decs, DA_objs, DA_decs, N): """ Mating selection for Two_Arch2. Parameters ---------- CA_objs : ndarray Convergence archive objectives CA_decs : ndarray Convergence archive decisions DA_objs : ndarray Diversity archive objectives DA_decs : ndarray Diversity archive decisions N : int Population size Returns ------- parentC_indices : ndarray Indices for convergence parents parentM_indices : ndarray Indices for diversity parents """ CA_size = CA_objs.shape[0] DA_size = DA_objs.shape[0] # Select parents from CA (for convergence) CA_parent1 = np.random.randint(0, CA_size, size=int(np.ceil(N / 2))) CA_parent2 = np.random.randint(0, CA_size, size=int(np.ceil(N / 2))) # Determine dominance between CA parents dominate = np.zeros(len(CA_parent1)) for idx in range(len(CA_parent1)): p1 = CA_parent1[idx] p2 = CA_parent2[idx] # Check if p1 dominates p2 less = np.any(CA_objs[p1] < CA_objs[p2]) greater = np.any(CA_objs[p1] > CA_objs[p2]) if less and not greater: dominate[idx] = 1 elif greater and not less: dominate[idx] = -1 # Select based on dominance parentC_from_CA = [] for idx in range(len(dominate)): if dominate[idx] == 1: parentC_from_CA.append(CA_parent1[idx]) else: parentC_from_CA.append(CA_parent2[idx]) # Add random parents from DA parentC_from_DA = np.random.randint(0, DA_size, size=int(np.ceil(N / 2))) # Combine parents for convergence parentC_indices = np.concatenate([parentC_from_CA, parentC_from_DA]) # Select parents from CA for mutation (diversity) parentM_indices = np.random.randint(0, CA_size, size=N) return parentC_indices, parentM_indices def _update_CA(self, CA, new_objs, new_decs, max_size): """ Update Convergence Archive (CA). Parameters ---------- CA : tuple or None Current CA (objs, decs) or None new_objs : ndarray New objectives to add new_decs : ndarray New decisions to add max_size : int Maximum size of CA Returns ------- CA_objs : ndarray Updated CA objectives CA_decs : ndarray Updated CA decisions """ if CA is None or len(CA) == 0: CA_objs = new_objs CA_decs = new_decs else: CA_objs, CA_decs = CA CA_objs = np.vstack([CA_objs, new_objs]) CA_decs = np.vstack([CA_decs, new_decs]) N = CA_objs.shape[0] if N <= max_size: return CA_objs, CA_decs # Normalize objectives min_vals = np.min(CA_objs, axis=0) max_vals = np.max(CA_objs, axis=0) range_vals = max_vals - min_vals range_vals[range_vals == 0] = 1 # Avoid division by zero CA_objs_norm = (CA_objs - min_vals) / range_vals # Calculate indicator matrix I I = np.zeros((N, N)) for i in range(N): for j in range(N): I[i, j] = np.max(CA_objs_norm[i] - CA_objs_norm[j]) # Calculate normalization constants C = np.max(np.abs(I), axis=0) C[C == 0] = 1.0 # Avoid division by zero # Calculate fitness (IBEA fitness) kappa = 0.05 C_matrix = np.tile(C, (N, 1)) # Add small epsilon to avoid division by zero epsilon = 1e-10 C_matrix = np.maximum(C_matrix, epsilon) # Calculate fitness with numerical stability exponent = -I / C_matrix / kappa # Clip exponent to avoid overflow/underflow exponent = np.clip(exponent, -100, 100) F = np.sum(-np.exp(exponent), axis=0) + 1 # Delete solutions based on fitness choose = np.arange(N) while len(choose) > max_size: # Find solution with minimum fitness min_idx = np.argmin(F[choose]) to_remove = choose[min_idx] # Update fitness values with numerical stability if C[to_remove] > epsilon: exponent_update = -I[to_remove, :] / C[to_remove] / kappa exponent_update = np.clip(exponent_update, -100, 100) F = F + np.exp(exponent_update) # Remove solution choose = np.delete(choose, min_idx) return CA_objs[choose], CA_decs[choose] def _update_DA(self, DA, CA, new_objs, new_decs, max_size, p): """ Update Diversity Archive (DA). Parameters ---------- DA : tuple or None Current DA (objs, decs) or None CA : tuple or None Current CA (objs, decs) or None new_objs : ndarray New objectives to add new_decs : ndarray New decisions to add max_size : int Maximum size of DA p : float Parameter for fractional distance Returns ------- DA_objs : ndarray Updated DA objectives DA_decs : ndarray Updated DA decisions """ # Combine current DA and new solutions if DA is None or len(DA) == 0: DA_objs = new_objs DA_decs = new_decs else: DA_objs, DA_decs = DA DA_objs = np.vstack([DA_objs, new_objs]) DA_decs = np.vstack([DA_decs, new_decs]) # Non-dominated sorting using existing nd_sort function N = DA_objs.shape[0] # Use nd_sort to get non-dominated front front_no, max_fno = nd_sort(DA_objs, N) # Get non-dominated solutions (front number 1) non_dominated_mask = front_no == 1 DA_objs = DA_objs[non_dominated_mask] DA_decs = DA_decs[non_dominated_mask] N = DA_objs.shape[0] if N <= max_size: return DA_objs, DA_decs # Select extreme solutions first choose = np.zeros(N, dtype=bool) # Minimum values for each objective for m in range(DA_objs.shape[1]): min_idx = np.argmin(DA_objs[:, m]) choose[min_idx] = True # Maximum values for each objective for m in range(DA_objs.shape[1]): max_idx = np.argmax(DA_objs[:, m]) choose[max_idx] = True # Adjust selection size if np.sum(choose) > max_size: # Randomly delete some solutions chosen_indices = np.where(choose)[0] to_remove = np.random.choice(chosen_indices, size=np.sum(choose) - max_size, replace=False) choose[to_remove] = False elif np.sum(choose) < max_size: # Add solutions using truncation strategy # Calculate distance matrix with p-norm distance = np.full((N, N), np.inf) # More efficient calculation using vectorization for i in range(N): diff = DA_objs - DA_objs[i] # Calculate p-norm distance if p == 1: # Manhattan distance distance[i, :] = np.sum(np.abs(diff), axis=1) elif p == 2: # Euclidean distance distance[i, :] = np.sqrt(np.sum(diff ** 2, axis=1)) else: # General p-norm distance[i, :] = np.sum(np.abs(diff) ** p, axis=1) ** (1 / p) # Set diagonal to infinity np.fill_diagonal(distance, np.inf) # Add solutions until reaching max_size while np.sum(choose) < max_size: remaining = np.where(~choose)[0] chosen = np.where(choose)[0] if len(chosen) == 0: # If no solutions chosen yet, choose randomly choose[np.random.choice(remaining)] = True else: # Calculate minimum distance to chosen solutions for each remaining solution min_distances = np.min(distance[np.ix_(remaining, chosen)], axis=1) # Select solution with maximum minimum distance max_idx = np.argmax(min_distances) choose[remaining[max_idx]] = True return DA_objs[choose], DA_decs[choose]