# Source code for ddmtolab.Algorithms.STMO.MCEA_D

"""
Multiple Classifiers-assisted Evolutionary Algorithm based on Decomposition (MCEAD)

This module implements MCEAD for multi-objective optimization problems.

References
----------
    [1] T. Sonoda and M. Nakata. Multiple classifiers-assisted evolutionary algorithm based on decomposition for high-dimensional multi-objective problems. IEEE Transactions on Evolutionary Computation, 2022.

Notes
-----
Author: Haowei Guo
Email: ghw@mail.nwpu.edu.cn
Date: 2026.01.06
Version: 1.0
"""

import numpy as np
from tqdm import tqdm
import time
from sklearn.svm import SVC
from scipy.spatial.distance import cdist
from ddmtolab.Methods.Algo_Methods.uniform_point import uniform_point
from ddmtolab.Methods.Algo_Methods.algo_utils import *


class MCEA_D:
    """
    Multiple Classifiers-assisted Evolutionary Algorithm based on Decomposition
    for multi-objective optimization.

    Attributes
    ----------
    algorithm_information : dict
        Dictionary containing algorithm capabilities and requirements
    """

    algorithm_information = {
        'n_tasks': '[1, K]',
        'dims': 'unequal',
        'objs': 'unequal',
        'n_objs': '[2, M]',
        'cons': 'unequal',
        'n_cons': '[0, C]',
        'expensive': 'True',
        'knowledge_transfer': 'False',
        'n': 'unequal',
        'max_nfes': 'unequal'
    }

    @classmethod
    def get_algorithm_information(cls, print_info=True):
        # Delegates to the shared helper imported from algo_utils.
        return get_algorithm_information(cls, print_info)

    def __init__(self, problem, n=None, max_nfes=None, delta=0.9, nr=2,
                 r_max=10, save_data=True, save_path='./Data', name='MCEA-D',
                 disable_tqdm=True):
        """
        Initialize the MCEA-D algorithm.

        Parameters
        ----------
        problem : MTOP
            Problem instance.
        n : int or List[int], optional
            Population size per task (default: 50).
        max_nfes : int or List[int], optional
            Maximum number of function evaluations per task (default: 500).
        delta : float, optional
            Probability of choosing parents from neighborhood (default: 0.9).
        nr : int, optional
            Maximum number of solutions replaced by each offspring (default: 2).
        r_max : int, optional
            Maximum repeat time of offspring generation (default: 10).
        save_data : bool, optional
            Whether to save optimization data (default: True).
        save_path : str, optional
            Path for saving results (default: './Data').
        name : str, optional
            Name of the experiment/file (default: 'MCEA-D').
        disable_tqdm : bool, optional
            Whether to disable progress bar (default: True).
        """
        self.problem = problem
        self.n = n if n is not None else 50
        self.max_nfes = max_nfes if max_nfes is not None else 500
        self.delta = delta
        self.nr = nr
        self.r_max = r_max
        self.save_data = save_data
        self.save_path = save_path
        self.name = name
        self.disable_tqdm = disable_tqdm

    def optimize(self):
        """
        Execute the MCEA-D algorithm.

        Returns
        -------
        Results
            Optimization results containing decision variables, objectives,
            constraints, and runtime.
        """
        start_time = time.time()
        problem = self.problem
        nt = problem.n_tasks
        n_per_task = par_list(self.n, nt)
        max_nfes_per_task = par_list(self.max_nfes, nt)

        W_list = []
        B_list = []
        Z_list = []
        svm_groups = []

        # Stage 1: Pre-calculate weight vectors and update population size.
        # uniform_point may round N to a feasible simplex-lattice size.
        for t in range(nt):
            n_objs_t = problem.n_objs[t]
            W, N = uniform_point(n_per_task[t], n_objs_t)
            n_per_task[t] = N
            W_list.append(W)
            # Neighborhood: the T nearest weight vectors (includes the vector
            # itself, since its self-distance is 0).
            T = int(np.ceil(N / 10))
            dists = cdist(W, W, metric='euclidean')
            B = np.argsort(dists, axis=1)[:, :T]
            B_list.append(B)

        # Stage 2: Unified initialization (LHS) and initial evaluation.
        decs_list = initialization(problem, n=n_per_task, method='lhs')
        objs_list, cons_list = evaluation(problem, decs_list)
        # History databases accumulate every evaluated solution per task.
        db_decs = [d.copy() for d in decs_list]
        db_objs = [o.copy() for o in objs_list]
        db_cons = [c.copy() for c in cons_list]

        archive_decs_list = []
        archive_objs_list = []

        # Stage 3: Build environments task-wise (ideal point, archive, SVMs).
        for t in range(nt):
            N = n_per_task[t]
            objs = objs_list[t]
            decs = decs_list[t]
            # Ideal point Z: component-wise minimum of the objectives.
            Z = np.min(objs, axis=0)
            Z_list.append(Z)
            # Archive starts as a copy of the initial population.
            archive_decs_list.append(decs.copy())
            archive_objs_list.append(objs.copy())
            # One local SVM classifier per subproblem of task t.
            svm_list = [SVM(problem, task_id=t) for _ in range(N)]
            svm_groups.append(svm_list)

        nfes_per_task = n_per_task.copy()
        pbar = tqdm(total=sum(max_nfes_per_task), initial=sum(nfes_per_task),
                    desc=f"{self.name}", disable=self.disable_tqdm)

        # Stage 4: Main optimization loop.
        while sum(nfes_per_task) < sum(max_nfes_per_task):
            # Skip tasks that have exhausted their evaluation budget.
            active_tasks = [t for t in range(nt)
                            if nfes_per_task[t] < max_nfes_per_task[t]]
            if not active_tasks:
                break
            for t in active_tasks:
                N = n_per_task[t]
                W = W_list[t]
                B = B_list[t]
                Z = Z_list[t]
                current_svm_list = svm_groups[t]
                # MCEA-D logic: iterate over all subproblems; switch task
                # after one full cycle of N subproblems.
                for i in range(N):
                    if nfes_per_task[t] >= max_nfes_per_task[t]:
                        break
                    # 4.1: Retrain the local classifier from the archive.
                    current_svm_list[i].model_construction(
                        archive_decs_list[t], archive_objs_list[t], B[i], W, Z
                    )
                    # 4.2: Choose parents: neighborhood with prob delta,
                    # otherwise the whole (shuffled) population.
                    if np.random.rand() < self.delta:
                        P = B[i].copy()
                    else:
                        P = np.arange(N)
                        np.random.shuffle(P)
                    # 4.3: SVM-assisted offspring generation.
                    off_dec = solution_generation(
                        problem=problem,
                        population_decs=decs_list[t],
                        parents_indices=P,
                        svm_model=current_svm_list[i],
                        r_max=self.r_max,
                        current_idx=i,
                        task_id=t
                    )
                    # 4.4: Evaluate offspring (one real function evaluation).
                    off_obj, off_con = evaluation_single(
                        problem, off_dec.reshape(1, -1), t)
                    nfes_per_task[t] += 1
                    pbar.update(1)
                    db_decs[t] = np.vstack([db_decs[t], off_dec.reshape(1, -1)])
                    db_objs[t] = np.vstack([db_objs[t], off_obj])
                    db_cons[t] = np.vstack([db_cons[t], off_con])
                    # Update archive (grows unboundedly over the run).
                    archive_decs_list[t] = np.vstack((archive_decs_list[t], off_dec))
                    archive_objs_list[t] = np.vstack((archive_objs_list[t], off_obj))
                    # 4.5: Update ideal point.
                    Z = np.min(np.vstack((Z, off_obj)), axis=0)
                    Z_list[t] = Z
                    # 4.6: Update population via Tchebycheff comparison over P;
                    # at most nr members are replaced by the offspring.
                    g_old = np.max(np.abs(objs_list[t][P] - Z) * W[P], axis=1)
                    g_new = np.max(np.abs(off_obj - Z) * W[P], axis=1)
                    replace_mask = g_old >= g_new
                    replace_indices = P[replace_mask]
                    if len(replace_indices) > self.nr:
                        replace_indices = replace_indices[:self.nr]
                    if len(replace_indices) > 0:
                        decs_list[t][replace_indices] = off_dec
                        objs_list[t][replace_indices] = off_obj
                        if cons_list[t] is not None:
                            cons_list[t][replace_indices] = off_con

        pbar.close()
        runtime = time.time() - start_time
        all_decs, all_objs, all_cons = build_staircase_history(
            db_decs, db_objs, k=1, db_cons=db_cons)

        # Save results via the shared helper.
        results = build_save_results(
            all_decs=all_decs,
            all_objs=all_objs,
            all_cons=all_cons,
            runtime=runtime,
            max_nfes=nfes_per_task,
            bounds=problem.bounds,
            save_path=self.save_path,
            filename=self.name,
            save_data=self.save_data
        )
        return results
class SVM:
    """
    Support Vector Machine (SVM) wrapper for classification-assisted optimization.

    This class manages a local SVM model for a specific subproblem in the MCEAD
    algorithm. It constructs a decision boundary to distinguish between promising
    and unpromising solutions based on the current population's performance in the
    neighborhood.
    """

    def __init__(self, problem, task_id):
        # Problem instance and the index of the task this model serves.
        self.problem = problem
        self.task_id = task_id
        # Trained SVC instance; stays None until model_construction succeeds.
        self.model = None
        # RBF-SVM hyperparameters; gamma_sklearn mirrors gamma_matlab
        # (presumably to match a reference MATLAB implementation — TODO confirm).
        self.C = 1.0
        self.gamma_matlab = 1.0
        self.gamma_sklearn = self.gamma_matlab
        # Access bounds from MTOP bounds list [(lb, ub), ...]
        self.lower = problem.bounds[task_id][0].flatten()
        self.upper = problem.bounds[task_id][1].flatten()
        self.n_vars = len(self.lower)

    def uniform_input(self, x):
        """Min-max normalize x into [0, 1] per variable (zero-width bounds guarded)."""
        denom = self.upper - self.lower
        denom[denom == 0] = 1e-10
        return (x - self.lower) / denom

    def model_construction(self, archive_decs, archive_objs, neighbor_indices, W, Z):
        """
        Construct and train the SVM model based on neighboring solutions.

        The model classifies solutions as 'good' (label 1) or 'bad' (label -1)
        based on their Tchebycheff aggregation values.

        Parameters
        ----------
        archive_decs : np.ndarray
            Decision vectors of all archived (evaluated) solutions.
        archive_objs : np.ndarray
            Objective values corresponding to `archive_decs`.
        neighbor_indices : np.ndarray
            Indices of the neighboring subproblems (row of B).
        W : np.ndarray
            All weight vectors of the task.
        Z : np.ndarray
            Current ideal point.
        """
        n_samples = len(archive_decs)
        # Everything starts as 'bad' (-1); one archive member per neighbor
        # subproblem gets promoted to 'good' (+1) below.
        labels = -1 * np.ones(n_samples)
        c_i = set()  # archive indices already labeled 'good'
        for neighbor_idx in neighbor_indices:
            w_vec = W[neighbor_idx]
            # Tchebycheff aggregation of every archive member for this weight.
            g_data = np.max(np.abs(archive_objs - Z) * w_vec, axis=1)
            sorted_indices = np.argsort(g_data)
            # Pick the best archive member not yet chosen by another neighbor.
            for best_idx in sorted_indices:
                if best_idx not in c_i:
                    c_i.add(best_idx)
                    labels[best_idx] = 1
                    break
        # Train on the normalized decision space.
        X_train = np.array([self.uniform_input(x) for x in archive_decs])
        y_train = labels
        # SVC requires at least two classes; otherwise fall back to no model.
        if len(np.unique(y_train)) > 1:
            self.model = SVC(C=self.C, kernel='rbf', gamma=self.gamma_sklearn, probability=False)
            self.model.fit(X_train, y_train)
        else:
            self.model = None

    def predict_class(self, x):
        """
        Predict the class and decision score for a candidate solution.

        Returns
        -------
        pred_class : int
            Predicted class label (1 for good, -1/0 for bad).
        score : float
            Decision function score (distance to hyperplane).
        """
        # No trained model: optimistically accept the candidate.
        if self.model is None:
            return 1, 0.0
        x_uni = self.uniform_input(x).reshape(1, -1)
        pred_class = self.model.predict(x_uni)[0]
        score = self.model.decision_function(x_uni)[0]
        return pred_class, score


def operator_de(problem, parent1, parent2, parent3, task_id, params=None):
    """
    Perform Differential Evolution (DE) crossover and Polynomial Mutation.

    Parameters
    ----------
    problem : MTOP
        Problem instance providing the variable bounds.
    parent1, parent2, parent3 : np.ndarray
        Parent decision vectors; parent1 is the base vector.
    task_id : int
        Index of the task whose bounds apply.
    params : tuple, optional
        (CR, F, proM, disM); defaults to (1.0, 0.5, 1.0, 20.0).

    Returns
    -------
    offspring : np.ndarray
        The generated offspring decision vector.
    """
    if params is None:
        # CR=1, F=0.5, proM=1, disM=20
        CR, F, proM, disM = 1.0, 0.5, 1.0, 20.0
    else:
        CR, F, proM, disM = params

    lower = problem.bounds[task_id][0].flatten()
    upper = problem.bounds[task_id][1].flatten()
    D = len(lower)

    offspring = parent1.copy()
    # DE crossover: add the scaled difference of parent2/parent3 at CR-selected sites.
    site_de = np.random.rand(D) < CR
    diff = F * (parent2 - parent3)
    offspring[site_de] = offspring[site_de] + diff[site_de]

    # Polynomial Mutation
    site_pm = np.random.rand(D) < (proM / D)
    mu = np.random.rand(D)
    # Clip into bounds before mutation so the PM formulas see feasible values.
    offspring = np.clip(offspring, lower, upper)

    # PM branch for mu <= 0.5 (perturbation toward the lower bound).
    temp = site_pm & (mu <= 0.5)
    l_temp, u_temp = lower[temp], upper[temp]
    o_temp, m_temp = offspring[temp], mu[temp]
    if np.any(temp):
        val = 2.0 * m_temp + (1.0 - 2.0 * m_temp) * \
            (1.0 - (o_temp - l_temp) / (u_temp - l_temp)) ** (disM + 1)
        delta = val ** (1.0 / (disM + 1)) - 1.0
        offspring[temp] = o_temp + (u_temp - l_temp) * delta

    # PM branch for mu > 0.5 (perturbation toward the upper bound).
    # The two masks are disjoint, so reading `offspring` again here is safe.
    temp = site_pm & (mu > 0.5)
    l_temp, u_temp = lower[temp], upper[temp]
    o_temp, m_temp = offspring[temp], mu[temp]
    if np.any(temp):
        val = 2.0 * (1.0 - m_temp) + 2.0 * (m_temp - 0.5) * \
            (1.0 - (u_temp - o_temp) / (u_temp - l_temp)) ** (disM + 1)
        delta = 1.0 - val ** (1.0 / (disM + 1))
        offspring[temp] = o_temp + (u_temp - l_temp) * delta

    # Final repair into bounds.
    offspring = np.clip(offspring, lower, upper)
    return offspring


def solution_generation(problem, population_decs, parents_indices, svm_model, r_max, current_idx, task_id):
    """
    Generate a new solution using DE and SVM assistance.

    Attempts to generate a solution that is predicted as 'good' (class 1) by the
    SVM. If no such solution is found within `r_max` tries, returns the one with
    the highest decision score.

    Parameters
    ----------
    problem : MTOP
        Problem instance.
    population_decs : np.ndarray
        Current population decision vectors of the task.
    parents_indices : np.ndarray
        Candidate parent indices (neighborhood or whole population).
    svm_model : SVM
        Local classifier used to screen candidates.
    r_max : int
        Maximum number of generation attempts.
    current_idx : int
        Index of the current subproblem (base parent).
    task_id : int
        Index of the task.

    Returns
    -------
    y_best : np.ndarray
        The best generated offspring solution found.
    """
    x_i = population_decs[current_idx]
    d_i_max = -np.inf
    y_best = None
    P = parents_indices.copy()

    for r in range(r_max):
        # NOTE(review): on the first attempt P is used as passed in (unshuffled),
        # so P[0]/P[1] are the caller's first two indices; shuffling happens only
        # after each candidate is built — confirm this ordering is intended.
        p1_idx = P[0]
        p2_idx = P[1]
        x_p1 = population_decs[p1_idx]
        x_p2 = population_decs[p2_idx]

        candidate = operator_de(problem, x_i, x_p1, x_p2, task_id)
        np.random.shuffle(P)

        c_pred, score = svm_model.predict_class(candidate)
        if c_pred == 1:
            # Accepted immediately: classifier predicts a promising solution.
            return candidate
        else:
            # Track the rejected candidate with the highest decision score
            # as a fallback (r == 0 seeds the tracker unconditionally).
            if r == 0:
                d_i_max = score
                y_best = candidate
            elif score > d_i_max:
                d_i_max = score
                y_best = candidate

    return y_best