Source code for ddmtolab.Algorithms.STMO.MOEA_D_FRRMAB

"""
MOEA/D with Fitness-Rate-Rank-based Multi-Armed Bandit (MOEA/D-FRRMAB)

This module implements MOEA/D-FRRMAB for multi-objective optimization problems.

References
----------
    [1] Li, Ke, et al. "Adaptive operator selection with bandits for a multiobjective evolutionary algorithm based on decomposition." IEEE Transactions on Evolutionary Computation 18.1 (2014): 114-130.

Notes
-----
Author: Jiangtao Shen
Email: j.shen5@exeter.ac.uk
Date: 2025.01.01
Version: 1.0
"""
from tqdm import tqdm
import time
import numpy as np
from scipy.spatial.distance import cdist
from ddmtolab.Methods.Algo_Methods.uniform_point import uniform_point
from ddmtolab.Methods.Algo_Methods.algo_utils import *


[docs] class MOEA_D_FRRMAB: """ MOEA/D with Fitness-Rate-Rank-based Multi-Armed Bandit. This algorithm uses a multi-armed bandit approach to adaptively select differential evolution operators during optimization. Attributes ---------- algorithm_information : dict Dictionary containing algorithm capabilities and requirements """ algorithm_information = { 'n_tasks': '[1, K]', 'dims': 'unequal', 'objs': 'unequal', 'n_objs': '[2, M]', 'cons': 'unequal', 'n_cons': '0', 'expensive': 'False', 'knowledge_transfer': 'False', 'n': 'unequal', 'max_nfes': 'unequal' } @classmethod def get_algorithm_information(cls, print_info=True): return get_algorithm_information(cls, print_info)
[docs] def __init__(self, problem, n=None, max_nfes=None, C=5, W=None, D=1, T=20, nr=2, save_data=True, save_path='./Data', name='MOEA-D-FRRMAB', disable_tqdm=True): """ Initialize MOEA/D-FRRMAB algorithm. Parameters ---------- problem : MTOP Multi-task optimization problem instance n : int or List[int], optional Population size per task (default: 100) max_nfes : int or List[int], optional Maximum number of function evaluations per task (default: 10000) C : float, optional Scaling factor in bandit-based operator selection (default: 5) W : int or List[int], optional Size of sliding window (default: ceil(n/2)) D : float, optional Decaying factor in calculating credit value (default: 1) T : int, optional Size of neighborhood (default: 20) nr : int, optional Maximum number of solutions replaced by each offspring (default: 2) save_data : bool, optional Whether to save optimization data (default: True) save_path : str, optional Path to save results (default: './TestData') name : str, optional Name for the experiment (default: 'MOEADFRRMAB_test') disable_tqdm : bool, optional Whether to disable progress bar (default: True) """ self.problem = problem self.n = n if n is not None else 100 self.max_nfes = max_nfes if max_nfes is not None else 10000 self.C = C self.W = W self.D = D self.T = T self.nr = nr self.save_data = save_data self.save_path = save_path self.name = name self.disable_tqdm = disable_tqdm
[docs] def optimize(self): """ Execute the MOEA/D-FRRMAB algorithm. Returns ------- Results Optimization results containing decision variables, objectives, constraints, and runtime """ start_time = time.time() problem = self.problem nt = problem.n_tasks no = problem.n_objs n_per_task = par_list(self.n, nt) max_nfes_per_task = par_list(self.max_nfes, nt) # Set sliding window size for each task if self.W is None: W_per_task = [int(np.ceil(n / 2)) for n in n_per_task] else: W_per_task = par_list(self.W, nt) # Generate uniformly distributed weight vectors for each task Weight = [] B = [] # Neighborhood for each task for i in range(nt): Weight_i, n = uniform_point(n_per_task[i], no[i]) Weight.append(Weight_i) n_per_task[i] = n # Detect the neighbors of each solution distance = cdist(Weight_i, Weight_i) B_i = np.argsort(distance, axis=1)[:, :self.T] B.append(B_i) # Initialize population and evaluate for each task decs = initialization(problem, n_per_task) objs, _ = evaluation(problem, decs) nfes_per_task = n_per_task.copy() all_decs, all_objs = init_history(decs, objs) # Initialize algorithm-specific variables for each task Z = [] # Ideal point Pi = [] # Utility for each subproblem oldObj = [] # Old Tchebycheff function value FRR = [] # Credit value of each operator SW = [] # Sliding window for i in range(nt): Z_i = np.min(objs[i], axis=0) Z.append(Z_i) Pi_i = np.ones(n_per_task[i]) Pi.append(Pi_i) # Calculate old Tchebycheff values oldObj_i = np.max(np.abs((objs[i] - Z_i) * Weight[i]), axis=1) oldObj.append(oldObj_i) # Initialize FRR (4 operators) FRR_i = np.zeros(4) FRR.append(FRR_i) # Initialize sliding window: [operator_indices; rewards] SW_i = np.zeros((2, W_per_task[i])) SW.append(SW_i) pbar = tqdm(total=sum(max_nfes_per_task), initial=sum(nfes_per_task), desc=f"{self.name}", disable=self.disable_tqdm) while sum(nfes_per_task) < sum(max_nfes_per_task): # Skip tasks that have exhausted their evaluation budget active_tasks = [i for i in range(nt) if nfes_per_task[i] < max_nfes_per_task[i]] if not active_tasks: break for i in active_tasks: # Perform 5 sub-generations for subgen in range(5): # Choose I: boundary solutions + tournament selection # Boundary solutions are those with weight close to axes boundary = np.where(np.sum(Weight[i] < 1e-3, axis=1) == no[i] - 1)[0] n_select = int(np.floor(n_per_task[i] / 5)) - len(boundary) if n_select > 0: # Tournament selection based on negative utility tournament_indices = self._tournament_selection(10, n_select, -Pi[i]) I = np.concatenate([boundary, tournament_indices]) else: I = boundary # For each solution in I for idx in I: if nfes_per_task[i] >= max_nfes_per_task[i]: break # Bandit-based operator selection op = self._frrmab(FRR[i], SW[i], self.C) # Choose the parents if np.random.rand() < 0.9: # From neighborhood P = B[i][idx, np.random.permutation(self.T)] else: # From entire population P = np.random.permutation(n_per_task[i]) # Generate an offspring using the selected DE operator off_dec = self._four_de( op, decs[i][idx], decs[i][P[0]], decs[i][P[1]], decs[i][P[2]], decs[i][P[3]], decs[i][P[4]] if len(P) > 4 else decs[i][P[0]] ) # Evaluate offspring off_obj, _ = evaluation_single(problem, off_dec.reshape(1, -1), i) off_obj = off_obj[0] # Update the ideal point Z[i] = np.minimum(Z[i], off_obj) # Update the solutions in P by Tchebycheff approach P_subset = P[:self.nr * 5] # Consider more neighbors for replacement g_old = np.max(np.abs((objs[i][P_subset] - Z[i]) * Weight[i][P_subset]), axis=1) g_new = np.max(np.abs((off_obj - Z[i]) * Weight[i][P_subset]), axis=1) # Find solutions that can be replaced better_mask = g_old >= g_new replace_indices = np.where(better_mask)[0][:self.nr] if len(replace_indices) > 0: # Replace solutions actual_replace = P_subset[replace_indices] for r_idx in actual_replace: decs[i][r_idx] = off_dec objs[i][r_idx] = off_obj # Calculate Fitness Improvement Rate (FIR) FIR = np.sum((g_old[replace_indices] - g_new[replace_indices]) / g_old[replace_indices]) else: FIR = 0 # Update sliding window SW[i] = np.column_stack([SW[i][:, 1:], [op, FIR]]) # Update FRR through credit assignment FRR[i] = self._credit_assignment(SW[i], self.D) nfes_per_task[i] += 1 pbar.update(1) # Update Pi every 10 generations current_gen = int(np.ceil(nfes_per_task[i] / n_per_task[i])) if current_gen % 10 == 0: # Calculate new Tchebycheff values newObj_i = np.max(np.abs((objs[i] - Z[i]) * Weight[i]), axis=1) DELTA = (oldObj[i] - newObj_i) / oldObj[i] # Update utility Temp = DELTA < 0.001 Pi[i][~Temp] = 1 Pi[i][Temp] = (0.95 + 0.05 * DELTA[Temp] / 0.001) * Pi[i][Temp] oldObj[i] = newObj_i # Update history append_history(all_decs[i], decs[i], all_objs[i], objs[i]) pbar.close() runtime = time.time() - start_time # Save results results = build_save_results( all_decs=all_decs, all_objs=all_objs, runtime=runtime, max_nfes=nfes_per_task, bounds=problem.bounds, save_path=self.save_path, filename=self.name, save_data=self.save_data ) return results
def _tournament_selection(self, tournament_size, n_select, fitness): """ Tournament selection based on fitness values. Parameters ---------- tournament_size : int Number of individuals in each tournament n_select : int Number of individuals to select fitness : ndarray Fitness values (higher is better after negation) Returns ------- ndarray Selected indices """ pop_size = len(fitness) selected = [] for _ in range(n_select): # Randomly select tournament_size individuals candidates = np.random.choice(pop_size, size=min(tournament_size, pop_size), replace=False) # Select the best one winner = candidates[np.argmax(fitness[candidates])] selected.append(winner) return np.array(selected) def _frrmab(self, FRR, SW, C): """ Fitness-Rate-Rank-based Multi-Armed Bandit operator selection. Parameters ---------- FRR : ndarray Credit value of each operator (4 operators) SW : ndarray Sliding window [operator_indices; rewards] C : float Scaling factor Returns ------- int Selected operator index (0-3) """ # If any operator has zero credit or hasn't been used if np.any(FRR == 0) or np.any(SW[0, :] == 0): # Random selection return np.random.randint(0, 4) else: # Count how many times each operator has been used n = np.zeros(4) for op_idx in range(4): n[op_idx] = np.sum(SW[0, :] == op_idx) # UCB (Upper Confidence Bound) selection ucb = FRR + C * np.sqrt(2 * np.log(np.sum(n)) / (n + 1e-10)) return np.argmax(ucb) def _credit_assignment(self, SW, D): """ Credit assignment for operators based on their performance. Parameters ---------- SW : ndarray Sliding window [operator_indices; rewards] D : float Decaying factor Returns ------- ndarray Credit values (FRR) for each operator """ K = 4 # Number of operators Reward = np.zeros(K) # Calculate total reward for each operator for i in range(K): Reward[i] = np.sum(SW[1, SW[0, :] == i]) # Rank operators by reward (descending) Rank = np.argsort(-Reward) # Sort descending Rank_inverse = np.zeros(K, dtype=int) Rank_inverse[Rank] = np.arange(K) # Apply decaying factor based on rank Decay = (D ** Rank_inverse) * Reward # Normalize to get FRR total = np.sum(Decay) if total > 0: FRR = Decay / total else: FRR = np.ones(K) / K return FRR def _four_de(self, op, x, x1, x2, x3, x4, x5): """ Four different DE operators with polynomial mutation. Parameters ---------- op : int Operator index (0-3) x : ndarray Current solution x1, x2, x3, x4, x5 : ndarray Parent solutions Returns ------- ndarray Offspring solution """ # Parameters CR = 1.0 F = 0.5 proM = 1.0 disM = 20 K = 0.5 D = len(x) # Differential evolution if op == 0: # DE/rand/1 v = x + F * (x1 - x2) elif op == 1: # DE/rand/2 v = x + F * (x1 - x2) + F * (x3 - x4) elif op == 2: # DE/current-to-rand/2 v = x + K * (x - x1) + F * (x2 - x3) + F * (x4 - x5) else: # op == 3 # DE/current-to-rand/1 v = x + K * (x - x1) + F * (x2 - x3) # Crossover offspring = x.copy() CR_adjust = CR + (1 if op > 1 else 0) # Higher CR for current-to-rand operators site = np.random.rand(D) < CR_adjust offspring[site] = v[site] # Boundary handling (clip to [0, 1] since we're in normalized space) offspring = np.clip(offspring, 0, 1) # Polynomial mutation site = np.random.rand(D) < proM / D mu = np.random.rand(D) # Lower half temp = site & (mu <= 0.5) delta = (2 * mu + (1 - 2 * mu) * (1 - offspring) ** (disM + 1)) ** (1 / (disM + 1)) - 1 offspring[temp] = offspring[temp] + delta[temp] # Upper half temp = site & (mu > 0.5) delta = 1 - (2 * (1 - mu) + 2 * (mu - 0.5) * (1 - (1 - offspring)) ** (disM + 1)) ** (1 / (disM + 1)) offspring[temp] = offspring[temp] + delta[temp] # Final boundary handling offspring = np.clip(offspring, 0, 1) return offspring