# Source code for ddmtolab.Algorithms.STMO.DISK

"""
Distribution-Informed Surrogate-assisted Kriging (DISK)

This module implements DISK for computationally expensive multi-objective optimization.
It uses Distribution-Informed Probabilistic Dominance (DIPD) that combines prediction
uncertainty from Kriging with the probability distribution learned from Pareto-optimal
solutions. It features adaptive local search guided by weight vector identification
to fill gaps in the Pareto front.

References
----------
    [1] Z. Song, H. Wang, and H. Xu. DISK: A Kriging-Assisted Multi-Objective Optimization Algorithm with Distribution-Informed Probabilistic Dominance. IEEE Transactions on Evolutionary Computation, 2024.

Notes
-----
Author: Jiangtao Shen
Date: 2026.02.17
Version: 1.0
"""
from tqdm import tqdm
import time
import torch
import numpy as np
from scipy.spatial.distance import cdist
from scipy.stats import norm, multivariate_normal
from ddmtolab.Methods.Algo_Methods.algo_utils import *
from ddmtolab.Methods.Algo_Methods.bo_utils import gp_build, gp_predict
from ddmtolab.Methods.Algo_Methods.uniform_point import uniform_point
import warnings

warnings.filterwarnings("ignore")


class DISK:
    """
    Distribution-Informed Surrogate-assisted Kriging for expensive
    multi-objective optimization with adaptive local search.
    """

    # Static capability description consumed by the framework's
    # algorithm-introspection utilities.
    algorithm_information = {
        'n_tasks': '[1, K]', 'dims': 'unequal', 'objs': 'unequal',
        'n_objs': '[2, M]', 'cons': 'equal', 'n_cons': '0',
        'expensive': 'True', 'knowledge_transfer': 'False',
        'n_initial': 'unequal', 'max_nfes': 'unequal'
    }

    @classmethod
    def get_algorithm_information(cls, print_info=True):
        # Delegates to the module-level helper of the same name
        # (imported via algo_utils); the method intentionally shadows it.
        return get_algorithm_information(cls, print_info)

    def __init__(self, problem, n_initial=None, max_nfes=None, n=100, wmax=60,
                 alpha=5, save_data=True, save_path='./Data', name='DISK',
                 disable_tqdm=True):
        """
        Initialize DISK algorithm.

        Parameters
        ----------
        problem : MTOP
            Multi-task optimization problem instance
        n_initial : int or List[int], optional
            Number of initial samples per task (default: 11*dim-1)
        max_nfes : int or List[int], optional
            Maximum number of function evaluations per task (default: 200)
        n : int or List[int], optional
            Population/archive size per task (default: 100)
        wmax : int, optional
            Surrogate evolution generations (default: 60)
        alpha : int, optional
            Number of candidates per iteration (default: 5)
        save_data : bool, optional
            Whether to save optimization data (default: True)
        save_path : str, optional
            Path to save results (default: './Data')
        name : str, optional
            Name for the experiment (default: 'DISK')
        disable_tqdm : bool, optional
            Whether to disable progress bar (default: True)
        """
        self.problem = problem
        self.n_initial = n_initial
        self.max_nfes = max_nfes if max_nfes is not None else 200
        self.n = n
        self.wmax = wmax
        self.alpha = alpha
        self.save_data = save_data
        self.save_path = save_path
        self.name = name
        self.disable_tqdm = disable_tqdm

    def optimize(self):
        """
        Execute the DISK algorithm.

        Per iteration and per active task: fit a Gaussian distribution to the
        current non-dominated decision vectors, build per-objective GP models,
        evolve a population on the surrogates with DIPD selection, evaluate up
        to ``alpha`` diverse candidates, and optionally run a directional
        local search toward an under-covered region of the Pareto front.

        Returns
        -------
        Results
            Optimization results containing decision variables, objectives,
            and runtime
        """
        data_type = torch.float
        start_time = time.time()
        problem = self.problem
        nt = problem.n_tasks
        dims = problem.dims
        n_objs = problem.n_objs

        # Per-task budgets; the 11*D-1 default is the usual Kriging-EA rule.
        if self.n_initial is None:
            n_initial_per_task = [11 * dims[i] - 1 for i in range(nt)]
        else:
            n_initial_per_task = par_list(self.n_initial, nt)
        max_nfes_per_task = par_list(self.max_nfes, nt)
        n_per_task = par_list(self.n, nt)

        # Initialize with LHS
        decs = initialization(problem, n_initial_per_task, method='lhs')
        objs, _ = evaluation(problem, decs)
        nfes_per_task = n_initial_per_task.copy()

        # Initialize working populations (indices into the evaluated archive)
        pop_indices = []
        for i in range(nt):
            N = n_per_task[i]
            if decs[i].shape[0] <= N:
                pop_indices.append(np.arange(decs[i].shape[0]))
            else:
                idx = _env_selection_real(objs[i], N)
                pop_indices.append(np.where(idx)[0])

        pbar = tqdm(total=sum(max_nfes_per_task), initial=sum(n_initial_per_task),
                    desc=f"{self.name}", disable=self.disable_tqdm)

        while sum(nfes_per_task) < sum(max_nfes_per_task):
            active_tasks = [i for i in range(nt) if nfes_per_task[i] < max_nfes_per_task[i]]
            if not active_tasks:
                break

            for i in active_tasks:
                M = n_objs[i]
                D = dims[i]
                N = n_per_task[i]

                # ===== Learn distribution from Pareto front =====
                # Fit (mu, K) of a Gaussian over Pareto-optimal decision
                # vectors; fall back to front 2 / the whole archive when the
                # first front is too small to estimate a covariance.
                front_no, _ = nd_sort(objs[i], objs[i].shape[0])
                pf_decs = decs[i][front_no == 1]
                if pf_decs.shape[0] <= 1:
                    pf_decs2 = decs[i][front_no == 2] if np.any(front_no == 2) else np.empty((0, D))
                    pf_decs = np.vstack([pf_decs, pf_decs2]) if pf_decs2.shape[0] > 0 else pf_decs
                if pf_decs.shape[0] <= 1:
                    dist_mu = np.mean(decs[i], axis=0)
                    dist_K = np.cov(decs[i].T) + 1e-6 * np.eye(D)
                else:
                    dist_mu = np.mean(pf_decs, axis=0)
                    dist_K = np.cov(pf_decs.T)
                    if dist_K.ndim == 0:  # np.cov collapses to a scalar when D == 1
                        dist_K = np.array([[dist_K]])
                    dist_K += 1e-6 * np.eye(D)  # jitter for numerical stability

                # ===== Build GP models (one per objective) =====
                try:
                    obj_models = []
                    for j in range(M):
                        model = gp_build(decs[i], objs[i][:, j:j + 1], data_type)
                        obj_models.append(model)
                except Exception:
                    # Model fitting failed for this task this round; skip it.
                    continue

                # ===== Evolutionary search on surrogates =====
                A1_decs = decs[i][pop_indices[i]]
                OP_decs, OP_objs, OP_mse = _surrogate_evolution(
                    A1_decs, obj_models, M, N, self.wmax, data_type, dist_mu, dist_K
                )

                # ===== Candidate selection =====
                n_budget = min(self.alpha, max_nfes_per_task[i] - nfes_per_task[i])
                if n_budget <= 0:
                    break
                cand_decs, cand_objs = _new_select(
                    OP_decs, OP_objs, OP_mse, decs[i], objs[i], n_budget,
                    problem, i, dist_mu, dist_K
                )
                if cand_decs is None or cand_decs.shape[0] == 0:
                    continue

                # ===== Check improvement for local search =====
                # flag == 1 means no candidate dominated the archive front,
                # i.e. this round failed to improve -> try local search.
                flag = _judge_ls(cand_objs, objs[i])

                # Update database
                decs[i] = np.vstack([decs[i], cand_decs])
                objs[i] = np.vstack([objs[i], cand_objs])
                nfes_per_task[i] += cand_decs.shape[0]
                pbar.update(cand_decs.shape[0])

                # ===== Adaptive local search =====
                if flag == 1 and nfes_per_task[i] < max_nfes_per_task[i]:
                    # Re-train models on the enlarged archive
                    try:
                        obj_models = []
                        for j in range(M):
                            model = gp_build(decs[i], objs[i][:, j:j + 1], data_type)
                            obj_models.append(model)
                    except Exception:
                        pass
                    else:
                        # Aim the search at the widest gap in the current front.
                        W, ideal = _identify_w(objs[i], N, M)
                        ls_dec, ls_obj = _local_search(
                            OP_decs, OP_objs, OP_mse, W, ideal, obj_models, M, N,
                            self.wmax, data_type, decs[i], problem, i
                        )
                        if ls_dec is not None:
                            decs[i] = np.vstack([decs[i], ls_dec])
                            objs[i] = np.vstack([objs[i], ls_obj])
                            nfes_per_task[i] += ls_dec.shape[0]
                            pbar.update(ls_dec.shape[0])

                # ===== Update working population =====
                idx = _env_selection_real(objs[i], N)
                pop_indices[i] = np.where(idx)[0]

        pbar.close()
        runtime = time.time() - start_time
        all_decs, all_objs = build_staircase_history(decs, objs, k=self.alpha)
        results = build_save_results(all_decs=all_decs, all_objs=all_objs,
                                     runtime=runtime, max_nfes=nfes_per_task,
                                     bounds=problem.bounds, save_path=self.save_path,
                                     filename=self.name, save_data=self.save_data)
        return results
# ============================================================================= # Surrogate-based Evolutionary Search # ============================================================================= def _surrogate_evolution(pop_decs, obj_models, M, N, wmax, data_type, dist_mu, dist_K): """ Evolutionary search guided by GP surrogates with DIPD selection. Parameters ---------- pop_decs : np.ndarray Initial population decisions obj_models : list GP models for each objective M : int Number of objectives N : int Population size wmax : int Number of generations data_type : torch.dtype Data type for GP dist_mu, dist_K : np.ndarray Distribution parameters for DIPD Returns ------- pop_decs, pop_objs, pop_mse : np.ndarray Evolved population """ pop_objs, pop_mse = _gp_predict_all(pop_decs, obj_models, M, data_type) for w in range(wmax): off_decs = ga_generation(pop_decs, muc=20, mum=20) pop_decs = np.vstack([pop_decs, off_decs]) pop_objs, pop_mse = _gp_predict_all(pop_decs, obj_models, M, data_type) # DIPD-based environmental selection selected = _s_env_selection(pop_decs, pop_objs, pop_mse, N, dist_mu, dist_K) pop_decs = pop_decs[selected] pop_objs = pop_objs[selected] pop_mse = pop_mse[selected] return pop_decs, pop_objs, pop_mse def _gp_predict_all(pop_decs, obj_models, M, data_type): """Predict objectives and MSE for all solutions.""" N = pop_decs.shape[0] pop_objs = np.zeros((N, M)) pop_mse = np.zeros((N, M)) for j in range(M): pred, std = gp_predict(obj_models[j], pop_decs, data_type) pop_objs[:, j] = pred.flatten() pop_mse[:, j] = np.abs(std.flatten()) ** 2 pop_objs = np.real(pop_objs) pop_mse = np.abs(np.real(pop_mse)) return pop_objs, pop_mse # ============================================================================= # Distribution-Informed Probabilistic Dominance Sorting # ============================================================================= def _ndsort_dipd(pop_decs, pop_objs, obj_mse, n_sort, dist_mu, dist_K): """ Non-dominated sorting with 
Distribution-Informed Probabilistic Dominance. Combines prediction uncertainty with distribution likelihood from Pareto front. Parameters ---------- pop_decs : np.ndarray Decision variables, shape (N, D) pop_objs : np.ndarray Predicted objectives, shape (N, M) obj_mse : np.ndarray Prediction MSE, shape (N, M) n_sort : int Number of solutions to sort dist_mu : np.ndarray Distribution mean, shape (D,) dist_K : np.ndarray Distribution covariance, shape (D, D) Returns ------- front_no : np.ndarray Front assignments max_fno : int Last front number """ N, M = pop_objs.shape # Compute probability density under learned distribution try: rv = multivariate_normal(mean=dist_mu, cov=dist_K, allow_singular=True) Pro = rv.pdf(pop_decs) except Exception: Pro = np.ones(N) Pro = np.maximum(Pro, 1e-300) # Pairwise probabilistic dominance # mean_diff[i,j,k] = obj_i_k - obj_j_k mean_diff = pop_objs[:, np.newaxis, :] - pop_objs[np.newaxis, :, :] sigma_sq = obj_mse[:, np.newaxis, :] + obj_mse[np.newaxis, :, :] sigma = np.sqrt(np.maximum(sigma_sq, 1e-20)) # P(obj_i <= obj_j) for each objective z = -mean_diff / sigma x_PD = norm.cdf(z) # P(i is better than j) y_PD = 1.0 - x_PD # P(j is better than i) # Weight by distribution probability — MATLAB: # x_PD = -x_PD .* Pro[i], y_PD = -y_PD .* Pro[j] # Dominance: all(-x*Pro[i] <= -y*Pro[j]) ↔ all(x*Pro[i] >= y*Pro[j]) x_PD = x_PD * Pro[:, np.newaxis, np.newaxis] # weighted by Pro[i] y_PD = y_PD * Pro[np.newaxis, :, np.newaxis] # weighted by Pro[j] # Dominance: i dominates j if all(x_PD[i,j,:] >= y_PD[i,j,:]) and not all equal dominates = np.all(x_PD >= y_PD, axis=2) & ~np.all( np.abs(x_PD - y_PD) < 1e-10, axis=2) np.fill_diagonal(dominates, False) # Non-dominated sorting front_no = np.full(N, np.inf) max_fno = 0 remaining = np.ones(N, dtype=bool) n_assigned = 0 while n_assigned < min(n_sort, N): max_fno += 1 current = np.where(remaining)[0] if len(current) == 0: break sub_dom = dominates[np.ix_(current, current)] dom_count = np.sum(sub_dom, 
axis=0) min_count = np.min(dom_count) frontal = current[dom_count == min_count] front_no[frontal] = max_fno remaining[frontal] = False n_assigned += len(frontal) return front_no, max_fno # ============================================================================= # Environmental Selection # ============================================================================= def _s_env_selection(pop_decs, pop_objs, pop_mse, N, dist_mu, dist_K): """Surrogate environmental selection using DIPD + angle-based diversity.""" n_total = pop_objs.shape[0] front_no, max_fno = _ndsort_dipd(pop_decs, pop_objs, pop_mse, N, dist_mu, dist_K) # Normalize objectives zmin = np.min(pop_objs, axis=0) zmax = np.max(pop_objs, axis=0) obj_range = np.maximum(zmax - zmin, 1e-9) norm_objs = (pop_objs - zmin) / obj_range selected = np.where(front_no < max_fno)[0] last_front = np.where(front_no == max_fno)[0] n_remaining = N - len(selected) if n_remaining <= 0: return selected[:N] if len(last_front) <= n_remaining: selected = np.concatenate([selected, last_front]) elif max_fno == 1: chosen = _truncation_angle(norm_objs[last_front], n_remaining) selected = np.concatenate([selected, last_front[chosen]]) else: chosen = _dist_selection_angle(norm_objs[selected], norm_objs[last_front], n_remaining) selected = np.concatenate([selected, last_front[chosen]]) return selected def _env_selection_real(pop_objs, N): """Real evaluation environmental selection (standard ND + angle diversity).""" n_total = pop_objs.shape[0] if n_total <= N: return np.ones(n_total, dtype=bool) zmin = np.min(pop_objs, axis=0) zmax = np.max(pop_objs, axis=0) obj_range = np.maximum(zmax - zmin, 1e-9) norm_objs = (pop_objs - zmin) / obj_range front_no, max_fno = nd_sort(pop_objs, N) next_mask = np.zeros(n_total, dtype=bool) next_mask[front_no < max_fno] = True last_front = np.where(front_no == max_fno)[0] n_remaining = N - np.sum(next_mask) if n_remaining <= 0: return next_mask if len(last_front) <= n_remaining: next_mask[last_front] 
= True elif max_fno == 1: chosen = _truncation_angle(norm_objs[last_front], n_remaining) next_mask[last_front[chosen]] = True else: sel_objs = norm_objs[next_mask] chosen = _dist_selection_angle(sel_objs, norm_objs[last_front], n_remaining) next_mask[last_front[chosen]] = True return next_mask def _angle_distance(A, B): """Compute pairwise angle-based distance: arccos(1 - cosine_distance).""" # Cosine similarity norm_A = np.linalg.norm(A, axis=1, keepdims=True) norm_B = np.linalg.norm(B, axis=1, keepdims=True) norm_A = np.maximum(norm_A, 1e-10) norm_B = np.maximum(norm_B, 1e-10) cos_sim = (A @ B.T) / (norm_A @ norm_B.T) cos_sim = np.clip(cos_sim, -1, 1) return np.arccos(cos_sim) def _truncation_angle(pop_objs, K): """Select K solutions by removing most crowded (angle-based distance).""" N = pop_objs.shape[0] if N <= K: return np.arange(N) dist = _angle_distance(pop_objs, pop_objs) np.fill_diagonal(dist, np.inf) deleted = np.zeros(N, dtype=bool) while np.sum(~deleted) > K: remaining = np.where(~deleted)[0] sub_dist = dist[np.ix_(remaining, remaining)] sorted_d = np.sort(sub_dist, axis=1) # Sort by nearest-neighbor distance (least diverse first) order = np.lexsort(sorted_d.T) deleted[remaining[order[0]]] = True return np.where(~deleted)[0] def _dist_selection_angle(selected_objs, candidate_objs, n_select): """Select n_select candidates maximizing min angle-distance to selected.""" N1 = selected_objs.shape[0] N2 = candidate_objs.shape[0] if N2 <= n_select: return np.arange(N2) all_objs = np.vstack([selected_objs, candidate_objs]) dist = _angle_distance(all_objs, all_objs) np.fill_diagonal(dist, np.inf) chosen_set = list(range(N1)) remaining = list(range(N1, N1 + N2)) chosen = [] for _ in range(n_select): if not remaining: break best_idx = None best_min_d = -1.0 for idx in remaining: min_d = np.min([dist[idx, c] for c in chosen_set]) if min_d > best_min_d: best_min_d = min_d best_idx = idx chosen_set.append(best_idx) remaining.remove(best_idx) chosen.append(best_idx - 
N1) return chosen # ============================================================================= # Candidate Selection (NewSelect) # ============================================================================= def _new_select(OP_decs, OP_objs, OP_mse, db_decs, db_objs, alpha, problem, task_idx, dist_mu, dist_K): """ Select alpha candidates from surrogate-evolved population. Uses DIPD filtering then angle-based diversity selection. Evaluates candidates iteratively, updating Pareto front. Returns ------- cand_decs, cand_objs : np.ndarray or None Selected and evaluated candidates """ # Find novel solutions if db_decs.shape[0] > 0: dist_to_db = cdist(OP_decs, db_decs) min_dist = np.min(dist_to_db, axis=1) novel_mask = min_dist > 1e-50 else: novel_mask = np.ones(OP_decs.shape[0], dtype=bool) novel_idx = np.where(novel_mask)[0] if len(novel_idx) == 0: return None, None if len(novel_idx) <= alpha: pop_new = OP_decs[novel_idx] new_objs, _ = evaluation_single(problem, pop_new, task_idx) return pop_new, new_objs # Extract and normalize pop_decs = OP_decs[novel_idx] pop_objs = OP_objs[novel_idx] pop_mse = OP_mse[novel_idx] # Get database Pareto front n_db = db_objs.shape[0] front_no_db, _ = nd_sort(db_objs, n_db) pf_objs = db_objs[front_no_db == 1] pf_objs = np.unique(pf_objs, axis=0) if pf_objs.shape[0] > 1 else pf_objs # Normalize all_objs_combined = np.vstack([pf_objs, pop_objs]) zmin = np.min(all_objs_combined, axis=0) zmax = np.max(all_objs_combined, axis=0) obj_range = np.maximum(zmax - zmin, 1e-9) norm_pop_objs = (pop_objs - zmin) / obj_range norm_pop_mse = pop_mse / (obj_range ** 2) norm_pf_objs = (pf_objs - zmin) / obj_range # DIPD sorting - first front only (MATLAB: nSort=1) front_no, _ = _ndsort_dipd(pop_decs, norm_pop_objs, norm_pop_mse, 1, dist_mu, dist_K) first_front = np.where(front_no == 1)[0] if len(first_front) == 0: first_front = np.arange(min(alpha, pop_decs.shape[0])) if len(first_front) <= alpha: sel_decs = pop_decs[first_front] sel_objs, _ = 
evaluation_single(problem, sel_decs, task_idx) return sel_decs, sel_objs # Diversity-based iterative selection cand_decs_list = [] cand_objs_list = [] selectable = np.ones(len(first_front), dtype=bool) current_pf = norm_pf_objs.copy() for _ in range(alpha): avail = np.where(selectable)[0] if len(avail) == 0: break avail_objs = norm_pop_objs[first_front[avail]] # Angle distance to current Pareto front if current_pf.shape[0] > 0: angle_dist = _angle_distance(avail_objs, current_pf) min_angles = np.min(angle_dist, axis=1) else: min_angles = np.ones(len(avail)) best = np.argmax(min_angles) best_global = first_front[avail[best]] # Evaluate new_dec = pop_decs[best_global:best_global + 1] new_obj, _ = evaluation_single(problem, new_dec, task_idx) cand_decs_list.append(new_dec[0]) cand_objs_list.append(new_obj[0]) # Update Pareto front with new solution all_real_objs = np.vstack([db_objs] + [np.array(cand_objs_list)]) fno, _ = nd_sort(all_real_objs, all_real_objs.shape[0]) new_pf = all_real_objs[fno == 1] new_pf = np.unique(new_pf, axis=0) if new_pf.shape[0] > 1 else new_pf current_pf = (new_pf - zmin) / obj_range selectable[avail[best]] = False if len(cand_decs_list) == 0: return None, None return np.array(cand_decs_list), np.array(cand_objs_list) # ============================================================================= # Weight Vector Identification # ============================================================================= def _identify_w(db_objs, N, M): """ Identify weight vector pointing to gap in Pareto front. 
Parameters ---------- db_objs : np.ndarray All evaluated objectives N : int Population size (for generating reference vectors) M : int Number of objectives Returns ------- W : np.ndarray Weight vector, shape (M,) ideal : np.ndarray Adjusted ideal point, shape (M,) """ # Generate candidate weight vectors V, _ = uniform_point(max(10 * N, 100), M) # Extract Pareto front n_all = db_objs.shape[0] front_no, _ = nd_sort(db_objs, n_all) pf_objs = db_objs[front_no == 1] if pf_objs.shape[0] == 0: return np.ones(M) / M, np.zeros(M) # Compute adjusted ideal point nadir = np.max(pf_objs, axis=0) ideal_raw = np.min(pf_objs, axis=0) ideal = ideal_raw - (nadir - ideal_raw) / 10 - 0.1 # Shift objectives shifted_pf = pf_objs - ideal shifted_pf = np.maximum(shifted_pf, 1e-10) # Compute angles between weight vectors and PF solutions angle_dist = _angle_distance(V, shifted_pf) # Find vector with max min-angle to any PF solution min_angles = np.min(angle_dist, axis=1) best = np.argmax(min_angles) W = V[best] return W, ideal # ============================================================================= # Local Search # ============================================================================= def _local_search(OP_decs, OP_objs, OP_mse, W, ideal, obj_models, M, N, wmax, data_type, db_decs, problem, task_idx): """ Focused local search in direction W using weighted Chebyshev + uncertainty. 
fitness = max(|obj - ideal| * W) - 2 * mean(std) Returns ------- ls_dec, ls_obj : np.ndarray or None Best solution found and its true objective """ pop_decs = OP_decs.copy() pop_objs = OP_objs.copy() pop_mse = OP_mse.copy() for w in range(wmax): # 4 offspring operators off1 = ga_generation(pop_decs, muc=20, mum=20) off2 = _de_current_rand_1(pop_decs) off3 = _de_rand_1(pop_decs) off4 = _de_current_rand_1(pop_decs) pop_decs = np.vstack([pop_decs, off1, off2, off3, off4]) pop_decs = np.unique(np.round(pop_decs, 10), axis=0) pop_decs = np.clip(pop_decs, 0, 1) pop_objs, pop_mse = _gp_predict_all(pop_decs, obj_models, M, data_type) pop_std = np.sqrt(np.maximum(pop_mse, 0)) # Directional fitness: weighted Chebyshev - exploration bonus fitness = np.max(np.abs(pop_objs - ideal) * W, axis=1) - 2 * np.mean(pop_std, axis=1) # Keep top N sorted_idx = np.argsort(fitness) n_keep = min(N, len(sorted_idx)) pop_decs = pop_decs[sorted_idx[:n_keep]] pop_objs = pop_objs[sorted_idx[:n_keep]] pop_mse = pop_mse[sorted_idx[:n_keep]] # Select best pop_std = np.sqrt(np.maximum(pop_mse, 0)) fitness = np.max(np.abs(pop_objs - ideal) * W, axis=1) - 2 * np.mean(pop_std, axis=1) best_idx = np.argmin(fitness) best_dec = pop_decs[best_idx:best_idx + 1] # Check for duplicate if db_decs.shape[0] > 0: min_d = np.min(cdist(best_dec, db_decs)) if min_d <= 1e-50: return None, None best_obj, _ = evaluation_single(problem, best_dec, task_idx) return best_dec, best_obj def _judge_ls(cand_objs, db_objs): """ Judge if local search should be triggered. Returns 1 if candidates improved the Pareto front. 
""" front_no_c, _ = nd_sort(cand_objs, cand_objs.shape[0]) front_no_db, _ = nd_sort(db_objs, db_objs.shape[0]) pf_c = cand_objs[front_no_c == 1] pf_db = db_objs[front_no_db == 1] # Check if any candidate dominates any archive PF member for c_sol in pf_c: for db_sol in pf_db: if np.all(c_sol <= db_sol) and not np.all(c_sol == db_sol): return 0 # Candidate dominates archive → no improvement needed return 1 # No domination → trigger local search # ============================================================================= # DE Operators # ============================================================================= def _de_current_rand_1(pop_decs): """DE/current-to-rand/1 with polynomial mutation.""" N, D = pop_decs.shape Fm_set = np.array([0.6, 0.8, 1.0]) CRm_set = np.array([0.1, 0.2, 1.0]) F = Fm_set[np.random.randint(0, 3, size=N)].reshape(-1, 1) CR = CRm_set[np.random.randint(0, 3, size=N)].reshape(-1, 1) F_mat = np.tile(F, (1, D)) site = np.random.rand(N, D) < CR p1 = pop_decs[np.random.permutation(N)] p2 = pop_decs[np.random.permutation(N)] p3 = pop_decs[np.random.permutation(N)] # DE/current-to-rand/1: x + F*(r1-x) + F*(r2-r3) mutant = pop_decs + F_mat * (p1 - pop_decs) + F_mat * (p2 - p3) offspring = pop_decs.copy() offspring[site] = mutant[site] offspring = np.clip(offspring, 0, 1) offspring = _poly_mutation(offspring, mum=20) return offspring def _de_rand_1(pop_decs): """DE/rand/1 with polynomial mutation.""" N, D = pop_decs.shape Fm_set = np.array([0.6, 0.8, 1.0]) CRm_set = np.array([0.1, 0.2, 1.0]) F = Fm_set[np.random.randint(0, 3, size=N)].reshape(-1, 1) CR = CRm_set[np.random.randint(0, 3, size=N)].reshape(-1, 1) F_mat = np.tile(F, (1, D)) site = np.random.rand(N, D) < CR p1 = pop_decs[np.random.permutation(N)] p2 = pop_decs[np.random.permutation(N)] p3 = pop_decs[np.random.permutation(N)] # DE/rand/1: r1 + F*(r2-r3) mutant = p1 + F_mat * (p2 - p3) offspring = pop_decs.copy() offspring[site] = mutant[site] offspring = np.clip(offspring, 0, 1) 
offspring = _poly_mutation(offspring, mum=20) return offspring def _poly_mutation(pop_decs, mum=20): """Polynomial mutation with proM=1, bounds [0,1].""" N, D = pop_decs.shape site = np.random.rand(N, D) < 1.0 / D mu = np.random.rand(N, D) temp_low = site & (mu <= 0.5) temp_high = site & (mu > 0.5) offspring = pop_decs.copy() if np.any(temp_low): x = offspring[temp_low] delta = (2 * mu[temp_low] + (1 - 2 * mu[temp_low]) * (1 - x) ** (mum + 1)) ** (1.0 / (mum + 1)) - 1 offspring[temp_low] = x + delta if np.any(temp_high): x = offspring[temp_high] delta = 1 - (2 * (1 - mu[temp_high]) + 2 * (mu[temp_high] - 0.5) * (1 - (1 - x)) ** (mum + 1)) ** (1.0 / (mum + 1)) offspring[temp_high] = x + delta return np.clip(offspring, 0, 1)