Source code for ddmtolab.Algorithms.STMO.CPS_MOEA

"""
Classification and Pareto Domination Based Multi-Objective Evolutionary Algorithm (CPS-MOEA)

This module implements CPS-MOEA for multi-objective optimization problems.

References
----------
    [1] J. Zhang, A. Zhou, and G. Zhang. "A classification and Pareto domination based multiobjective evolutionary algorithm." Proceedings of the IEEE Congress on Evolutionary Computation, 2015, 2883-2890.

Notes
-----
Author: Converted from MATLAB implementation
Date: 2025.01.22
Version: 1.0
"""
from tqdm import tqdm
import time
import numpy as np
from ddmtolab.Methods.Algo_Methods.algo_utils import *


[docs] class CPS_MOEA: """ Classification and Pareto Domination Based Multi-Objective Evolutionary Algorithm. This algorithm uses KNN-based classification to distinguish between good and bad solutions, and uses this information to guide offspring generation via differential evolution with surrogate-assisted pre-selection. Attributes ---------- algorithm_information : dict Dictionary containing algorithm capabilities and requirements """ algorithm_information = { 'n_tasks': '[1, K]', 'dims': 'unequal', 'objs': 'unequal', 'n_objs': '[2, 3]', 'cons': 'unequal', 'n_cons': '[0, C]', 'expensive': 'True', 'knowledge_transfer': 'False', 'n': 'unequal', 'max_nfes': 'unequal' } @classmethod def get_algorithm_information(cls, print_info=True): return get_algorithm_information(cls, print_info)
[docs] def __init__(self, problem, n=None, max_nfes=None, M=3, CR=1.0, F=0.5, proM=1.0, disM=20.0, k_neighbors=5, save_data=True, save_path='./Data', name='CPS-MOEA', disable_tqdm=True): """ Initialize CPS-MOEA algorithm. Parameters ---------- problem : MTOP Multi-task optimization problem instance n : int or List[int], optional Population size per task (default: 100) max_nfes : int or List[int], optional Maximum number of function evaluations per task (default: 10000) M : int, optional Number of candidate offsprings generated per solution (default: 3) CR : float, optional Crossover probability for differential evolution (default: 1.0) F : float, optional Scaling factor for differential evolution (default: 0.5) proM : float, optional Expected number of mutated variables (default: 1.0) disM : float, optional Distribution index for polynomial mutation (default: 20.0) k_neighbors : int, optional Number of neighbors for KNN classification (default: 5) save_data : bool, optional Whether to save optimization data (default: True) save_path : str, optional Path to save results (default: './TestData') name : str, optional Name for the experiment (default: 'CPS-MOEA_test') disable_tqdm : bool, optional Whether to disable progress bar (default: True) """ self.problem = problem self.n = n if n is not None else 100 self.max_nfes = max_nfes if max_nfes is not None else 10000 self.M = M self.CR = CR self.F = F self.proM = proM self.disM = disM self.k_neighbors = k_neighbors self.save_data = save_data self.save_path = save_path self.name = name self.disable_tqdm = disable_tqdm # Initialize KNN model storage self.knn_models = []
[docs] def optimize(self): """ Execute the CPS-MOEA algorithm. Returns ------- Results Optimization results containing decision variables, objectives, constraints, and runtime """ start_time = time.time() problem = self.problem nt = problem.n_tasks n_per_task = par_list(self.n, nt) max_nfes_per_task = par_list(self.max_nfes, nt) # Initialize population and evaluate for each task decs = initialization(problem, n_per_task) objs, cons = evaluation(problem, decs) nfes_per_task = n_per_task.copy() db_decs = [d.copy() for d in decs] db_objs = [o.copy() for o in objs] db_cons = [c.copy() for c in cons] # Initialize Pgood and Pbad for each task Pgood_decs, Pgood_objs, Pgood_cons = [], [], [] Pbad_decs, Pbad_objs, Pbad_cons = [], [], [] # Initialize KNN models for each task self.knn_models = [{'data': None, 'label': None} for _ in range(nt)] for i in range(nt): half_n = n_per_task[i] // 2 pgood_d, pgood_o, pgood_c, pbad_d, pbad_o, pbad_c = self._nds_split( decs[i], objs[i], cons[i], half_n ) Pgood_decs.append(pgood_d) Pgood_objs.append(pgood_o) Pgood_cons.append(pgood_c) Pbad_decs.append(pbad_d) Pbad_objs.append(pbad_o) Pbad_cons.append(pbad_c) # Train initial KNN model self._train_knn(i, pgood_d, pbad_d) pbar = tqdm(total=sum(max_nfes_per_task), initial=sum(n_per_task), desc=f"{self.name}", disable=self.disable_tqdm) while sum(nfes_per_task) < sum(max_nfes_per_task): # Skip tasks that have exhausted their evaluation budget active_tasks = [i for i in range(nt) if nfes_per_task[i] < max_nfes_per_task[i]] if not active_tasks: break for i in active_tasks: # Generate offsprings using DE and KNN-based classification off_decs = self._generate_offspring(i, decs[i], n_per_task[i]) off_objs, off_cons = evaluation_single(problem, off_decs, i) db_decs[i] = np.vstack([db_decs[i], off_decs]) db_objs[i] = np.vstack([db_objs[i], off_objs]) db_cons[i] = np.vstack([db_cons[i], off_cons]) # Merge current population and offspring objs[i], decs[i], cons[i] = vstack_groups( (objs[i], off_objs), (decs[i], off_decs), (cons[i], off_cons) ) # Environmental selection: NDS-based selection to keep N individuals rank, front_no, crowd_dis = nsga2_sort(objs[i], cons[i]) index = np.argsort(rank)[:n_per_task[i]] objs[i], decs[i], cons[i] = select_by_index(index, objs[i], decs[i], cons[i]) # Update Pgood and Pbad sets front_no_off = self._compute_front_no(off_objs, off_cons) good_mask = front_no_off == 1 # Merge offspring with Pgood and Pbad new_pgood_decs, new_pgood_objs, new_pgood_cons = vstack_groups( (Pgood_decs[i], off_decs[good_mask]), (Pgood_objs[i], off_objs[good_mask]), (Pgood_cons[i], off_cons[good_mask] if off_cons is not None else None) ) new_pbad_decs, new_pbad_objs, new_pbad_cons = vstack_groups( (Pbad_decs[i], off_decs[~good_mask]), (Pbad_objs[i], off_objs[~good_mask]), (Pbad_cons[i], off_cons[~good_mask] if off_cons is not None else None) ) # Select best half for Pgood and Pbad half_n = n_per_task[i] // 2 Pgood_decs[i], Pgood_objs[i], Pgood_cons[i], _, _, _ = self._nds_split( new_pgood_decs, new_pgood_objs, new_pgood_cons, half_n ) Pbad_decs[i], Pbad_objs[i], Pbad_cons[i], _, _, _ = self._nds_split( new_pbad_decs, new_pbad_objs, new_pbad_cons, half_n ) # Update KNN model self._train_knn(i, Pgood_decs[i], Pbad_decs[i]) nfes_per_task[i] += n_per_task[i] pbar.update(n_per_task[i]) pbar.close() runtime = time.time() - start_time all_decs, all_objs, all_cons = build_staircase_history(db_decs, db_objs, k=self.n, db_cons=db_cons) # Save results results = build_save_results( all_decs=all_decs, all_objs=all_objs, runtime=runtime, max_nfes=nfes_per_task, all_cons=all_cons, bounds=problem.bounds, save_path=self.save_path, filename=self.name, save_data=self.save_data ) return results
def _nds_split(self, decs, objs, cons, K): """ Sort population and split into good (Pgood) and bad (Pbad) sets. Parameters ---------- decs : np.ndarray Decision variables objs : np.ndarray Objective values cons : np.ndarray or None Constraint values K : int Size of each set Returns ------- Pgood_decs, Pgood_objs, Pgood_cons, Pbad_decs, Pbad_objs, Pbad_cons """ rank, front_no, crowd_dis = nsga2_sort(objs, cons) sorted_idx = np.argsort(rank) # Select top K as Pgood pgood_idx = sorted_idx[:K] # Select bottom K as Pbad pbad_idx = sorted_idx[-K:] pgood_decs = decs[pgood_idx] pgood_objs = objs[pgood_idx] pgood_cons = cons[pgood_idx] if cons is not None else None pbad_decs = decs[pbad_idx] pbad_objs = objs[pbad_idx] pbad_cons = cons[pbad_idx] if cons is not None else None return pgood_decs, pgood_objs, pgood_cons, pbad_decs, pbad_objs, pbad_cons def _train_knn(self, task_idx, good_decs, bad_decs): """ Train KNN model for classification. Parameters ---------- task_idx : int Task index good_decs : np.ndarray Decision variables of good solutions bad_decs : np.ndarray Decision variables of bad solutions """ data = np.vstack([good_decs, bad_decs]) labels = np.hstack([np.ones(len(good_decs), dtype=bool), np.zeros(len(bad_decs), dtype=bool)]) self.knn_models[task_idx] = {'data': data, 'label': labels} def _predict_knn(self, task_idx, test_decs): """ Predict labels using KNN model. Parameters ---------- task_idx : int Task index test_decs : np.ndarray Test decision variables Returns ------- np.ndarray Boolean array indicating predicted class (True=good, False=bad) """ model = self.knn_models[task_idx] # Compute pairwise distances distances = np.sqrt(((test_decs[:, np.newaxis, :] - model['data'][np.newaxis, :, :]) ** 2).sum(axis=2)) # Find k nearest neighbors k_nearest_idx = np.argsort(distances, axis=1)[:, :self.k_neighbors] # Get labels of k nearest neighbors k_nearest_labels = model['label'][k_nearest_idx] # Predict: majority voting (>50% of neighbors are good) predictions = np.sum(k_nearest_labels, axis=1) > (self.k_neighbors / 2) return predictions def _generate_offspring(self, task_idx, parent_decs, N): """ Generate offspring using DE and KNN-based classification pre-selection. Parameters ---------- task_idx : int Task index parent_decs : np.ndarray Parent decision variables N : int Number of offsprings to generate Returns ------- np.ndarray Selected offspring decision variables """ M = self.M pop_size = len(parent_decs) # Generate M candidate offsprings for each parent all_candidates = [] for _ in range(M): # Random selection for DE parent1_idx = np.arange(pop_size) parent2_idx = np.random.randint(0, pop_size, size=pop_size) parent3_idx = np.random.randint(0, pop_size, size=pop_size) parent1 = parent_decs[parent1_idx] parent2 = parent_decs[parent2_idx] parent3 = parent_decs[parent3_idx] # Differential evolution operator candidates = self._operator_de(parent1, parent2, parent3) all_candidates.append(candidates) # Stack all candidates: shape (N, M, D) all_candidates = np.array(all_candidates).transpose(1, 0, 2) # (N, M, D) # Flatten for KNN prediction flat_candidates = all_candidates.reshape(-1, all_candidates.shape[-1]) # KNN classification predictions = self._predict_knn(task_idx, flat_candidates) predictions = predictions.reshape(N, M).astype(float) # Add random noise for tie-breaking predictions += np.random.rand(N, M) * 0.01 # Select best candidate for each parent best_idx = np.argmax(predictions, axis=1) offspring = all_candidates[np.arange(N), best_idx, :] return offspring def _operator_de(self, parent1, parent2, parent3): """ Differential evolution operator with polynomial mutation in [0,1] space. Parameters ---------- parent1, parent2, parent3 : np.ndarray Parent decision variables Returns ------- np.ndarray Offspring decision variables """ N, D = parent1.shape # Differential evolution: offspring = parent1 + F * (parent2 - parent3) site = np.random.rand(N, D) < self.CR offspring = parent1.copy() offspring[site] = offspring[site] + self.F * (parent2[site] - parent3[site]) # Polynomial mutation site = np.random.rand(N, D) < (self.proM / D) mu = np.random.rand(N, D) # Ensure offspring is within [0,1] before mutation offspring = np.clip(offspring, 0.0, 1.0) # Mutation for mu <= 0.5 temp = site & (mu <= 0.5) delta = (2.0 * mu + (1.0 - 2.0 * mu) * (1.0 - offspring) ** (self.disM + 1)) ** (1.0 / (self.disM + 1)) - 1.0 offspring[temp] = offspring[temp] + delta[temp] # Mutation for mu > 0.5 temp = site & (mu > 0.5) delta = 1.0 - (2.0 * (1.0 - mu) + 2.0 * (mu - 0.5) * (1.0 - (1.0 - offspring)) ** (self.disM + 1)) ** (1.0 / (self.disM + 1)) offspring[temp] = offspring[temp] + delta[temp] # Final bound check to ensure [0,1] offspring = np.clip(offspring, 0.0, 1.0) return offspring def _compute_front_no(self, objs, cons=None): """ Compute front numbers for solutions. Parameters ---------- objs : np.ndarray Objective values cons : np.ndarray, optional Constraint values Returns ------- np.ndarray Front number for each solution """ pop_size = objs.shape[0] if cons is not None: front_no, _ = nd_sort(objs, cons, pop_size) else: front_no, _ = nd_sort(objs, pop_size) return front_no
def nsga2_sort(objs, cons=None): """ Sort solutions based on NSGA-II criteria using non-dominated sorting and crowding distance. Parameters ---------- objs : np.ndarray Objective value matrix of shape (pop_size, n_obj) cons : np.ndarray, optional Constraint matrix of shape (pop_size, n_con). If None, no constraints are considered (default: None) Returns ------- rank : np.ndarray Ranking of each solution (0-based index after sorting) of shape (pop_size,). rank[i] indicates the position of solution i in the sorted order front_no : np.ndarray Non-dominated front number of each solution of shape (pop_size,) crowd_dis : np.ndarray Crowding distance of each solution of shape (pop_size,) Notes ----- Solutions are sorted first by front number (ascending), then by crowding distance (descending). Larger crowding distance values indicate better diversity preservation. """ pop_size = objs.shape[0] # Perform non-dominated sorting if cons is not None: front_no, _ = nd_sort(objs, cons, pop_size) else: front_no, _ = nd_sort(objs, pop_size) # Calculate crowding distance for diversity preservation crowd_dis = crowding_distance(objs, front_no) # Sort by front number (ascending), then by crowding distance (descending) sorted_indices = np.lexsort((-crowd_dis, front_no)) # Create rank array: rank[i] gives the sorted position of solution i rank = np.empty(pop_size, dtype=int) rank[sorted_indices] = np.arange(pop_size) return rank, front_no, crowd_dis # from Problems.STMO.DTLZ import DTLZ, SETTINGS # problem = DTLZ().DTLZ1() # results = CPSMOEA(problem).optimize()