# Source code for ddmtolab.Algorithms.STMO.C_TAEA

"""
Constrained Two-Archive Evolutionary Algorithm (C-TAEA)

This module implements C-TAEA for constrained multi-objective optimization problems.

References
----------
    [1] Li, Ke, et al. "Two-archive evolutionary algorithm for constrained multi-objective optimization." IEEE Transactions on Evolutionary Computation 23.2 (2018): 303-315.

Notes
-----
Author: Jiangtao Shen
Email: j.shen5@exeter.ac.uk
Date: 2025.01.01
Version: 1.0
"""
from tqdm import tqdm
import time
import numpy as np
from scipy.spatial.distance import cdist
from ddmtolab.Methods.Algo_Methods.uniform_point import uniform_point
from ddmtolab.Methods.Algo_Methods.algo_utils import *


class C_TAEA:
    """
    Constrained Two-Archive Evolutionary Algorithm for constrained multi-objective optimization.

    C-TAEA uses two co-evolving archives:
    - Convergence Archive (CA): Focuses on convergence towards the Pareto front
    - Diversity Archive (DA): Maintains diversity in the objective space

    Attributes
    ----------
    algorithm_information : dict
        Dictionary containing algorithm capabilities and requirements
    """

    # Capability/requirement metadata describing which problem configurations this
    # algorithm supports; read by get_algorithm_information() below.
    algorithm_information = {
        'n_tasks': '[1, K]',
        'dims': 'unequal',
        'objs': 'unequal',
        'n_objs': '[2, M]',
        'cons': 'unequal',
        'n_cons': '[0, C]',
        'expensive': 'False',
        'knowledge_transfer': 'False',
        'n': 'unequal',
        'max_nfes': 'unequal'
    }

    @classmethod
    def get_algorithm_information(cls, print_info=True):
        # Inside the method body, the bare name resolves to the module-level helper
        # (star-imported from algo_utils), not to this classmethod — the class
        # attribute of the same name is not in the method's name-lookup chain.
        return get_algorithm_information(cls, print_info)
[docs] def __init__(self, problem, n=None, max_nfes=None, muc=20.0, mum=15.0, save_data=True, save_path='./Data', name='C-TAEA', disable_tqdm=True): """ Initialize C-TAEA algorithm. Parameters ---------- problem : MTOP Multi-task optimization problem instance n : int or List[int], optional Population size per task (default: 100) max_nfes : int or List[int], optional Maximum number of function evaluations per task (default: 10000) muc : float, optional Distribution index for simulated binary crossover (SBX) (default: 20.0) mum : float, optional Distribution index for polynomial mutation (PM) (default: 15.0) save_data : bool, optional Whether to save optimization data (default: True) save_path : str, optional Path to save results (default: './TestData') name : str, optional Name for the experiment (default: 'CTAEA_test') disable_tqdm : bool, optional Whether to disable progress bar (default: True) """ self.problem = problem self.n = n if n is not None else 100 self.max_nfes = max_nfes if max_nfes is not None else 10000 self.muc = muc self.mum = mum self.save_data = save_data self.save_path = save_path self.name = name self.disable_tqdm = disable_tqdm
    def optimize(self):
        """
        Execute the C-TAEA algorithm.

        Runs the two-archive evolutionary loop over all tasks of the problem:
        per generation it measures the non-dominated proportions of CA and DA,
        biases mating selection accordingly, generates offspring with SBX/PM,
        and updates both archives. History and final results are taken from CA.

        Returns
        -------
        Results
            Optimization results containing decision variables, objectives,
            constraints, and runtime
        """
        start_time = time.time()
        problem = self.problem
        nt = problem.n_tasks
        no = problem.n_objs
        # Expand possibly-scalar settings into one value per task.
        n_per_task = par_list(self.n, nt)
        max_nfes_per_task = par_list(self.max_nfes, nt)

        # Generate uniformly distributed weight vectors for each task.
        W = []
        for i in range(nt):
            # uniform_point may round the requested size to a feasible simplex
            # lattice size, so n_per_task is kept in sync with the weights.
            W_i, n = uniform_point(n_per_task[i], no[i])
            W.append(W_i)
            n_per_task[i] = n

        # Initialize population and evaluate for each task.
        decs = initialization(problem, n_per_task)
        objs, cons = evaluation(problem, decs)
        nfes_per_task = n_per_task.copy()

        # Initialize CA and DA archives for each task. Passing None as the
        # current archive makes _update_CA/_update_DA build from scratch.
        CAs = []
        DAs = []
        for i in range(nt):
            CA_i = self._update_CA(None, objs[i], decs[i], cons[i], W[i], n_per_task[i])
            DA_i = self._update_DA(CA_i, None, objs[i], decs[i], cons[i], W[i], n_per_task[i])
            CAs.append(CA_i)
            DAs.append(DA_i)

        # History tracking uses CA (the convergence archive is the reported population).
        all_decs, all_objs, all_cons = init_history(
            [CA_i['decs'] for CA_i in CAs],
            [CA_i['objs'] for CA_i in CAs],
            [CA_i['cons'] for CA_i in CAs]
        )

        pbar = tqdm(total=sum(max_nfes_per_task), initial=sum(nfes_per_task),
                    desc=f"{self.name}", disable=self.disable_tqdm)

        while sum(nfes_per_task) < sum(max_nfes_per_task):
            # Skip tasks that have exhausted their evaluation budget.
            active_tasks = [i for i in range(nt) if nfes_per_task[i] < max_nfes_per_task[i]]
            if not active_tasks:
                break

            for i in active_tasks:
                CA_i = CAs[i]
                DA_i = DAs[i]

                # Calculate the ratio of non-dominated solutions in CA and DA
                # over the combined archive Hm = CA ∪ DA.
                Hm_objs = np.vstack([CA_i['objs'], DA_i['objs']])
                Hm_cons = np.vstack([CA_i['cons'], DA_i['cons']]) if CA_i['cons'] is not None else None

                # Non-dominated sorting for the combined archive.
                front_no, _ = nd_sort(Hm_objs, Hm_cons, Hm_objs.shape[0])

                # Proportions of rank-1 members contributed by CA (Pc) and DA (Pd).
                CA_size = CA_i['objs'].shape[0]
                FrontNo_C = front_no[:CA_size]
                Nc = np.sum(FrontNo_C == 1)
                Pc = Nc / len(front_no)
                FrontNo_D = front_no[CA_size:]
                Nd = np.sum(FrontNo_D == 1)
                Pd = Nd / len(front_no)

                # Proportion of non-dominated solutions within CA alone; used as
                # the probability of drawing the second parent from CA.
                front_no_CA, _ = nd_sort(CA_i['objs'], CA_i['cons'], CA_i['objs'].shape[0])
                NC = np.sum(front_no_CA == 1)
                PC = NC / CA_size

                # Generate offspring: one mating event per population slot.
                off_decs_list = []
                for j in range(n_per_task[i]):
                    # Select first parent from the archive currently contributing
                    # more non-dominated solutions.
                    if Pc > Pd:
                        P1 = self._mating_selection(CA_i)
                    else:
                        P1 = self._mating_selection(DA_i)

                    # Select second parent from CA with probability PC.
                    pf = np.random.rand()
                    if pf < PC:
                        P2 = self._mating_selection(CA_i)
                    else:
                        P2 = self._mating_selection(DA_i)

                    # Combine parents into a 2-row mating pool.
                    # NOTE(review): if ga_generation returns two children per
                    # parent pair, each generation evaluates up to 2*N offspring —
                    # confirm against ga_generation's contract.
                    mating_pool = np.vstack([P1['decs'], P2['decs']])
                    offspring = ga_generation(mating_pool, muc=self.muc, mum=self.mum)
                    off_decs_list.append(offspring)

                # Combine all offspring and evaluate them on task i only.
                off_decs = np.vstack(off_decs_list)
                off_objs, off_cons = evaluation_single(problem, off_decs, i)

                # Update CA first; the fresh CA then guides the DA update.
                CA_i = self._update_CA(CA_i, off_objs, off_decs, off_cons, W[i], n_per_task[i])
                DA_i = self._update_DA(CA_i, DA_i, off_objs, off_decs, off_cons, W[i], n_per_task[i])
                CAs[i] = CA_i
                DAs[i] = DA_i

                # Update evaluation count by the number of offspring evaluated.
                nfes_per_task[i] += off_decs.shape[0]
                pbar.update(off_decs.shape[0])

                # Update history with CA.
                append_history(all_decs[i], CA_i['decs'], all_objs[i], CA_i['objs'],
                               all_cons[i], CA_i['cons'])

        pbar.close()
        runtime = time.time() - start_time

        # Save results using CA-based history.
        results = build_save_results(
            all_decs=all_decs,
            all_objs=all_objs,
            runtime=runtime,
            max_nfes=nfes_per_task,
            all_cons=all_cons,
            bounds=problem.bounds,
            save_path=self.save_path,
            filename=self.name,
            save_data=self.save_data
        )
        return results
    def _mating_selection(self, archive):
        """
        Mating selection for C-TAEA using binary tournament selection.

        Feasibility dominates: a feasible candidate beats an infeasible one;
        two feasible candidates are compared by non-dominated sorting.

        Parameters
        ----------
        archive : dict
            Archive containing 'objs', 'decs', and 'cons'

        Returns
        -------
        dict
            Selected individual with keys 'objs', 'decs', 'cons'
        """
        number = archive['objs'].shape[0]
        x_1 = np.random.randint(0, number)
        x_2 = np.random.randint(0, number)
        # Constraint violation: sum of positive constraint values (0 = feasible).
        CV1 = np.sum(np.maximum(0, archive['cons'][x_1])) if archive['cons'] is not None else 0
        CV2 = np.sum(np.maximum(0, archive['cons'][x_2])) if archive['cons'] is not None else 0

        if CV1 > 0 and CV2 > 0:
            # Both infeasible, select randomly.
            # NOTE(review): this draws a fresh random index from the whole
            # archive rather than picking between x_1 and x_2 — confirm this
            # is intended.
            x = np.random.randint(0, number)
            selected_idx = x
        elif CV1 <= 0 and CV2 > 0:
            # x_1 is feasible
            selected_idx = x_1
        elif CV1 > 0 and CV2 <= 0:
            # x_2 is feasible
            selected_idx = x_2
        else:
            # Both feasible, use non-dominated sorting; ties favour x_1.
            rnd_objs = np.vstack([archive['objs'][x_1], archive['objs'][x_2]])
            rnd_cons = np.vstack([archive['cons'][x_1], archive['cons'][x_2]]) if archive['cons'] is not None else None
            front_no, _ = nd_sort(rnd_objs, rnd_cons, 2)
            if front_no[0] <= front_no[1]:
                selected_idx = x_1
            else:
                selected_idx = x_2

        # Slice (not index) to keep 2-D row arrays.
        return {
            'objs': archive['objs'][selected_idx:selected_idx + 1],
            'decs': archive['decs'][selected_idx:selected_idx + 1],
            'cons': archive['cons'][selected_idx:selected_idx + 1] if archive['cons'] is not None else None
        }

    def _update_CA(self, CA, new_objs, new_decs, new_cons, W, N):
        """
        Update Convergence Archive (CA).

        Merges the current CA with the new solutions, then keeps exactly N
        members: feasible solutions are preferred, truncated by
        _truncate_feasible when too many, or topped up with infeasible
        solutions via _fill_with_infeasible when too few.

        Parameters
        ----------
        CA : dict or None
            Current CA with keys 'objs', 'decs', 'cons' (None on first call)
        new_objs : ndarray
            New objectives to add
        new_decs : ndarray
            New decisions to add
        new_cons : ndarray
            New constraints to add
        W : ndarray
            Weight vectors
        N : int
            Archive size

        Returns
        -------
        dict
            Updated CA with keys 'objs', 'decs', 'cons'
        """
        # Merge CA and new solutions.
        if CA is None:
            Hc_objs = new_objs
            Hc_decs = new_decs
            Hc_cons = new_cons
        else:
            Hc_objs = np.vstack([CA['objs'], new_objs])
            Hc_decs = np.vstack([CA['decs'], new_decs])
            Hc_cons = np.vstack([CA['cons'], new_cons]) if CA['cons'] is not None else new_cons

        # Constraint violation per solution (unconstrained problems count as feasible).
        CV = np.sum(np.maximum(0, Hc_cons), axis=1) if Hc_cons is not None else np.zeros(Hc_objs.shape[0])

        # Collect feasible solutions (CV exactly zero).
        Sc_mask = CV == 0
        Sc_objs = Hc_objs[Sc_mask]
        Sc_decs = Hc_decs[Sc_mask]
        Sc_cons = Hc_cons[Sc_mask] if Hc_cons is not None else None

        if len(Sc_objs) == N:
            return {'objs': Sc_objs, 'decs': Sc_decs, 'cons': Sc_cons}
        elif len(Sc_objs) > N:
            # More feasible solutions than needed.
            S_objs, S_decs, S_cons = self._truncate_feasible(Sc_objs, Sc_decs, Sc_cons, W, N)
            return {'objs': S_objs, 'decs': S_decs, 'cons': S_cons}
        else:
            # Fewer feasible solutions than needed; add best infeasible ones.
            SI_mask = ~Sc_mask
            SI_objs = Hc_objs[SI_mask]
            SI_decs = Hc_decs[SI_mask]
            SI_cons = Hc_cons[SI_mask] if Hc_cons is not None else None
            S_objs, S_decs, S_cons = self._fill_with_infeasible(
                Sc_objs, Sc_decs, Sc_cons, SI_objs, SI_decs, SI_cons, W, N
            )
            return {'objs': S_objs, 'decs': S_decs, 'cons': S_cons}

    def _truncate_feasible(self, objs, decs, cons, W, N):
        """
        Truncate feasible solutions using non-dominated sorting and crowding.

        Keeps whole fronts until one overflows N, then iteratively removes the
        solution with the worst Tchebycheff value from the most crowded
        weight-vector subregion until exactly N remain.

        Parameters
        ----------
        objs : ndarray
            Objective values
        decs : ndarray
            Decision variables
        cons : ndarray or None
            Constraint values
        W : ndarray
            Weight vectors
        N : int
            Target size

        Returns
        -------
        tuple
            Truncated (objs, decs, cons)
        """
        # Non-dominated sorting.
        front_no, max_fno = nd_sort(objs, cons, objs.shape[0])

        # Add solutions front by front; the overflowing front is added whole
        # and trimmed by the while-loop below.
        S_indices = []
        for i in range(1, max_fno + 1):
            current_front = np.where(front_no == i)[0]
            if len(S_indices) + len(current_front) <= N:
                S_indices.extend(current_front)
            else:
                # Need to select from this front
                S_indices.extend(current_front)
                break
        S_indices = np.array(S_indices)
        S_objs = objs[S_indices]
        S_decs = decs[S_indices]
        S_cons = cons[S_indices] if cons is not None else None

        # Truncate one solution at a time until the target size is reached.
        while len(S_objs) > N:
            # Normalization to [0, 1] per objective for the angle computation.
            Zmax = np.max(S_objs, axis=0)
            Zmin = np.min(S_objs, axis=0)
            range_vals = Zmax - Zmin
            range_vals[range_vals == 0] = 1  # Avoid division by zero
            SPopObj = (S_objs - Zmin) / range_vals

            # Associate each solution with its closest subregion (max cosine similarity).
            cosine = 1 - cdist(SPopObj, W, metric='cosine')
            Region = np.argmax(cosine, axis=1)

            # Find the most crowded subregion.
            unique_regions, counts = np.unique(Region, return_counts=True)
            most_crowded = unique_regions[np.argmax(counts)]

            # Solutions residing in the most crowded subregion.
            crowded_mask = Region == most_crowded
            crowded_indices = np.where(crowded_mask)[0]
            S_crowded_objs = S_objs[crowded_mask]

            # Pairwise distances within the crowded subregion.
            dist = cdist(S_crowded_objs, S_crowded_objs)
            np.fill_diagonal(dist, np.inf)

            # Find the closest pair.
            # NOTE(review): min_dist/closest_indices/row/closest_in_S are
            # computed but never used below — the removal is decided purely by
            # the Tchebycheff worst, not by the closest pair as in the
            # reference C-TAEA. Confirm whether this is intentional or dead code.
            min_dist = np.min(dist)
            closest_indices = np.where(dist == min_dist)
            row = closest_indices[0][0]
            closest_in_S = crowded_indices[row]

            # Tchebycheff distance (on raw objectives, ideal point Z from the
            # crowded subset) used to pick the member to remove.
            St_objs = S_objs[crowded_indices]
            Region_St = Region[crowded_indices]
            Z = np.min(St_objs, axis=0)
            g_tch = np.max(np.abs(St_objs - Z) / W[Region_St], axis=1)
            worst_idx = np.argmax(g_tch)
            worst_in_S = crowded_indices[worst_idx]

            # Remove the worst solution.
            mask = np.ones(len(S_objs), dtype=bool)
            mask[worst_in_S] = False
            S_objs = S_objs[mask]
            S_decs = S_decs[mask]
            S_cons = S_cons[mask] if S_cons is not None else None

        return S_objs, S_decs, S_cons

    def _fill_with_infeasible(self, Sc_objs, Sc_decs, Sc_cons, SI_objs, SI_decs, SI_cons, W, N):
        """
        Fill archive with infeasible solutions when there are not enough feasible ones.

        Ranks the infeasible solutions on the bi-objective (constraint
        violation, Tchebycheff distance) and appends them front by front;
        within the last partially-used front, lower violation wins.

        Parameters
        ----------
        Sc_objs : ndarray
            Feasible objectives
        Sc_decs : ndarray
            Feasible decisions
        Sc_cons : ndarray or None
            Feasible constraints
        SI_objs : ndarray
            Infeasible objectives
        SI_decs : ndarray
            Infeasible decisions
        SI_cons : ndarray
            Infeasible constraints
        W : ndarray
            Weight vectors
        N : int
            Target size

        Returns
        -------
        tuple
            Combined (objs, decs, cons)
        """
        # Create two-objective optimization problem: minimize CV and Tchebycheff.
        CV_SI = np.sum(np.maximum(0, SI_cons), axis=1, keepdims=True)

        # Associate each infeasible solution with its closest subregion.
        cosine = 1 - cdist(SI_objs, W, metric='cosine')
        Region_SI = np.argmax(cosine, axis=1)
        Z = np.min(SI_objs, axis=0)
        g_tch = np.max(np.abs(SI_objs - Z) / W[Region_SI], axis=1, keepdims=True)

        # Combine CV and Tchebycheff as bi-objective.
        PopObj = np.hstack([CV_SI, g_tch])

        # Non-dominated sorting on the auxiliary bi-objective.
        front_no, max_fno = nd_sort(PopObj, None, PopObj.shape[0])

        # Start with feasible solutions.
        S_objs = Sc_objs
        S_decs = Sc_decs
        S_cons = Sc_cons

        # Add infeasible solutions front by front.
        for i in range(1, max_fno + 1):
            current_front = np.where(front_no == i)[0]
            if len(S_objs) + len(current_front) <= N:
                S_objs = np.vstack([S_objs, SI_objs[current_front]])
                S_decs = np.vstack([S_decs, SI_decs[current_front]])
                S_cons = np.vstack([S_cons, SI_cons[current_front]]) if S_cons is not None else SI_cons[current_front]
            else:
                # Need to select from this front: keep the members with the
                # smallest constraint violation.
                needed = N - len(S_objs)
                last_front_indices = current_front
                CV_last = CV_SI[last_front_indices].flatten()
                sorted_indices = np.argsort(CV_last)[:needed]
                S_objs = np.vstack([S_objs, SI_objs[last_front_indices[sorted_indices]]])
                S_decs = np.vstack([S_decs, SI_decs[last_front_indices[sorted_indices]]])
                S_cons = np.vstack([S_cons, SI_cons[last_front_indices[sorted_indices]]]) if S_cons is not None else \
                    SI_cons[last_front_indices[sorted_indices]]
                break
            if len(S_objs) >= N:
                break

        return S_objs, S_decs, S_cons

    def _update_DA(self, CA, DA, new_objs, new_decs, new_cons, W, N):
        """
        Update Diversity Archive (DA).

        Merges the current DA with the new solutions and rebuilds a size-N DA
        by iteratively filling subregions that are under-represented in CA,
        preferring non-dominated candidates with small Tchebycheff distance.

        Parameters
        ----------
        CA : dict
            Current CA with keys 'objs', 'decs', 'cons'
        DA : dict or None
            Current DA with keys 'objs', 'decs', 'cons' (None on first call)
        new_objs : ndarray
            New objectives to add
        new_decs : ndarray
            New decisions to add
        new_cons : ndarray
            New constraints to add
        W : ndarray
            Weight vectors
        N : int
            Archive size

        Returns
        -------
        dict
            Updated DA with keys 'objs', 'decs', 'cons'
        """
        # Merge DA and new solutions.
        if DA is None:
            Hd_objs = new_objs.copy()
            Hd_decs = new_decs.copy()
            Hd_cons = new_cons.copy() if new_cons is not None else None
        else:
            Hd_objs = np.vstack([DA['objs'], new_objs])
            Hd_decs = np.vstack([DA['decs'], new_decs])
            Hd_cons = np.vstack([DA['cons'], new_cons]) if DA['cons'] is not None else new_cons

        # Associate solutions in CA with subregions.
        cosine_CA = 1 - cdist(CA['objs'], W, metric='cosine')
        Region_CA = np.argmax(cosine_CA, axis=1)

        # Build DA iteratively - use a different approach to avoid index tracking issues.
        selected_indices = []
        available_mask = np.ones(len(Hd_objs), dtype=bool)
        itr = 1
        max_itr = 10 * N  # Prevent infinite loop

        while len(selected_indices) < N and itr < max_itr:
            # One subregion per weight vector (N weight vectors in W).
            for region_idx in range(N):
                if len(selected_indices) >= N:
                    break
                # Count how many CA solutions are in this region.
                current_c_count = np.sum(Region_CA == region_idx)
                if current_c_count < itr:
                    # Need to add solutions from Hd to this subregion.
                    for j in range(itr - current_c_count):
                        if len(selected_indices) >= N:
                            break
                        # No candidates left anywhere — stop filling this region.
                        if np.sum(available_mask) == 0:
                            break
                        # Re-associate the still-available Hd members each pass
                        # (the available subset shrinks as members are taken).
                        cosine_Hd = 1 - cdist(Hd_objs[available_mask], W, metric='cosine')
                        Region_Hd_available = np.argmax(cosine_Hd, axis=1)
                        # Get indices in the available subset that belong to current region.
                        candidates_in_subset = np.where(Region_Hd_available == region_idx)[0]
                        if len(candidates_in_subset) > 0:
                            # Map back to original Hd indices.
                            available_indices = np.where(available_mask)[0]
                            candidates_in_Hd = available_indices[candidates_in_subset]
                            # Get objectives and constraints for candidates.
                            cand_objs = Hd_objs[candidates_in_Hd]
                            cand_cons = Hd_cons[candidates_in_Hd] if Hd_cons is not None else None
                            # Non-dominated sorting restricted to the candidates.
                            front_no, _ = nd_sort(cand_objs, cand_cons, len(candidates_in_Hd))
                            nd_mask = front_no == 1
                            nd_indices_in_Hd = candidates_in_Hd[nd_mask]
                            if len(nd_indices_in_Hd) > 0:
                                # Calculate Tchebycheff distance for non-dominated solutions.
                                nd_objs = Hd_objs[nd_indices_in_Hd]
                                Z = np.min(nd_objs, axis=0)
                                # Get regions for nd solutions.
                                cosine_nd = 1 - cdist(nd_objs, W, metric='cosine')
                                Region_nd = np.argmax(cosine_nd, axis=1)
                                g_tch = np.max(np.abs(nd_objs - Z) / W[Region_nd], axis=1)
                                # Select the best one (minimum Tchebycheff).
                                best_local_idx = np.argmin(g_tch)
                                best_global_idx = nd_indices_in_Hd[best_local_idx]
                                # Add to selected and mark as unavailable.
                                selected_indices.append(best_global_idx)
                                available_mask[best_global_idx] = False
                        else:
                            # No candidate maps into this region; move on.
                            break
            itr += 1

        # Build final DA from selected indices.
        if len(selected_indices) == 0:
            # Fallback: just take first N solutions.
            selected_indices = list(range(min(N, len(Hd_objs))))

        # Ensure we have exactly N solutions.
        if len(selected_indices) < N:
            # Fill with remaining available solutions.
            remaining_indices = np.where(available_mask)[0]
            needed = N - len(selected_indices)
            if len(remaining_indices) > 0:
                selected_indices.extend(remaining_indices[:needed].tolist())
            else:
                # If no more available, just duplicate some existing ones.
                dup_count = 0
                while len(selected_indices) < N:
                    selected_indices.append(selected_indices[dup_count % len(selected_indices)])
                    dup_count += 1
        selected_indices = selected_indices[:N]

        return {
            'objs': Hd_objs[selected_indices],
            'decs': Hd_decs[selected_indices],
            'cons': Hd_cons[selected_indices] if Hd_cons is not None else None
        }