Source code for ddmtolab.Algorithms.STSO.KLPSO

"""
Knowledge Learning PSO (KLPSO)

This module implements Knowledge Learning PSO for single-objective optimization problems.
A neural network learns successful movement patterns from improved particles.
With probability LR, particles move using the learned knowledge instead of
standard PSO velocity update.

References
----------
    [1] Jiang, Yi, et al. "Knowledge Learning for Evolutionary Computation." IEEE Transactions on Evolutionary Computation, 2023. DOI: 10.1109/TEVC.2023.3278132

Notes
-----
Author: Jiangtao Shen
Email: j.shen5@exeter.ac.uk
Date: 2026.02.21
Version: 1.0
"""
import time
import numpy as np
import torch
import torch.nn as nn
from tqdm import tqdm
from ddmtolab.Methods.Algo_Methods.algo_utils import *


class _KLNet(nn.Module):
    """
    Feedforward network for learning position changes.

    Architecture: dim -> 16 (sigmoid) -> 16 (sigmoid) -> dim (linear)
    Matches MATLAB: newff(..., [16 16 D], {'logsig' 'logsig' 'purelin'})
    """

    def __init__(self, dim):
        super().__init__()
        self.net = nn.Sequential(
            nn.Linear(dim, 16),
            nn.Sigmoid(),
            nn.Linear(16, 16),
            nn.Sigmoid(),
            nn.Linear(16, dim),
        )

    def forward(self, x):
        return self.net(x)


[docs] class KLPSO: """ Knowledge Learning PSO algorithm for single-objective optimization. Attributes ---------- algorithm_information : dict Dictionary containing algorithm capabilities and requirements """ algorithm_information = { 'n_tasks': '[1, K]', 'dims': 'unequal', 'objs': 'equal', 'n_objs': '1', 'cons': 'unequal', 'n_cons': '[0, C]', 'expensive': 'False', 'knowledge_transfer': 'False', 'n': 'unequal', 'max_nfes': 'unequal' } @classmethod def get_algorithm_information(cls, print_info=True): return get_algorithm_information(cls, print_info)
[docs] def __init__(self, problem, n=None, max_nfes=None, lr=0.2, ep=10, min_w=0.4, max_w=0.9, c1=0.2, c2=0.2, save_data=True, save_path='./Data', name='KLPSO', disable_tqdm=True): """ Initialize KLPSO algorithm. Parameters ---------- problem : MTOP Multi-task optimization problem instance n : int or List[int], optional Population size per task (default: 100) max_nfes : int or List[int], optional Maximum number of function evaluations per task (default: 10000) lr : float, optional Learning rate - probability of using neural net prediction (default: 0.2) ep : int, optional Number of training epochs per generation (default: 10) min_w : float, optional Minimum inertia weight (default: 0.4) max_w : float, optional Maximum inertia weight (default: 0.9) c1 : float, optional Cognitive coefficient (default: 0.2) c2 : float, optional Social coefficient (default: 0.2) save_data : bool, optional Whether to save optimization data (default: True) save_path : str, optional Path to save results (default: './Data') name : str, optional Name for the experiment (default: 'KLPSO') disable_tqdm : bool, optional Whether to disable progress bar (default: True) """ self.problem = problem self.n = n if n is not None else 100 self.max_nfes = max_nfes if max_nfes is not None else 10000 self.lr = lr self.ep = ep self.min_w = min_w self.max_w = max_w self.c1 = c1 self.c2 = c2 self.save_data = save_data self.save_path = save_path self.name = name self.disable_tqdm = disable_tqdm
[docs] def optimize(self): """ Execute the KLPSO algorithm. Returns ------- Results Optimization results containing decision variables, objectives, and runtime """ start_time = time.time() problem = self.problem dims = problem.dims nt = problem.n_tasks n_per_task = par_list(self.n, nt) max_nfes_per_task = par_list(self.max_nfes, nt) # Initialize population in [0,1] space and evaluate for each task decs = initialization(problem, n_per_task) objs, cons = evaluation(problem, decs) nfes_per_task = n_per_task.copy() all_decs, all_objs, all_cons = init_history(decs, objs, cons) # Initialize particle velocities to zero vel = [np.zeros_like(d) for d in decs] # Initialize personal best positions, objectives, and constraints pbest_decs = [d.copy() for d in decs] pbest_objs = [o.copy() for o in objs] pbest_cons = [c.copy() for c in cons] # Initialize global best for each task gbest_decs = [] gbest_objs = [] gbest_cons = [] for i in range(nt): cvs = np.sum(np.maximum(0, cons[i]), axis=1) sort_indices = np.lexsort((objs[i].flatten(), cvs)) best_idx = sort_indices[0] gbest_decs.append(decs[i][best_idx:best_idx + 1, :]) gbest_objs.append(objs[i][best_idx:best_idx + 1, :]) gbest_cons.append(cons[i][best_idx:best_idx + 1, :]) # Neural networks for knowledge learning (one per task) nets = [None] * nt optimizers = [None] * nt trained = [False] * nt total_nfes = sum(max_nfes_per_task) pbar = tqdm(total=total_nfes, initial=sum(n_per_task), desc=f"{self.name}", disable=self.disable_tqdm) while sum(nfes_per_task) < total_nfes: # Skip tasks that have exhausted their evaluation budget active_tasks = [i for i in range(nt) if nfes_per_task[i] < max_nfes_per_task[i]] if not active_tasks: break for i in active_tasks: # Linearly decrease inertia weight from max_w to min_w w = self.max_w - (self.max_w - self.min_w) * nfes_per_task[i] / max_nfes_per_task[i] # Save old state for training data collection old_decs = decs[i].copy() old_objs = objs[i].copy() old_cons = cons[i].copy() # Determine which particles use neural net vs standard PSO if trained[i]: use_net = np.random.rand(n_per_task[i]) < self.lr else: use_net = np.zeros(n_per_task[i], dtype=bool) # Standard PSO update for non-net particles pso_idx = np.where(~use_net)[0] if len(pso_idx) > 0: r1 = np.random.rand(len(pso_idx), 1) r2 = np.random.rand(len(pso_idx), 1) vel[i][pso_idx] = (w * vel[i][pso_idx] + self.c1 * r1 * (pbest_decs[i][pso_idx] - decs[i][pso_idx]) + self.c2 * r2 * (gbest_decs[i] - decs[i][pso_idx])) decs[i][pso_idx] = decs[i][pso_idx] + vel[i][pso_idx] # Neural net update for selected particles net_idx = np.where(use_net)[0] if len(net_idx) > 0: with torch.no_grad(): x_t = torch.tensor(decs[i][net_idx], dtype=torch.float32) y_pred = nets[i](x_t).numpy() r_scale = 2 * np.random.rand(len(net_idx), 1) decs[i][net_idx] = decs[i][net_idx] + r_scale * y_pred # Clip to [0, 1] boundary decs[i] = np.clip(decs[i], 0, 1) # Evaluation objs[i], cons[i] = evaluation_single(problem, decs[i], i) # PBest update current_cvs = np.sum(np.maximum(0, cons[i]), axis=1) pbest_cvs = np.sum(np.maximum(0, pbest_cons[i]), axis=1) improved_pb = (current_cvs < pbest_cvs) | \ ((current_cvs == pbest_cvs) & (objs[i].flatten() < pbest_objs[i].flatten())) pbest_decs[i][improved_pb] = decs[i][improved_pb] pbest_objs[i][improved_pb] = objs[i][improved_pb] pbest_cons[i][improved_pb] = cons[i][improved_pb] # GBest update pbest_cvs = np.sum(np.maximum(0, pbest_cons[i]), axis=1) sort_indices = np.lexsort((pbest_objs[i].flatten(), pbest_cvs)) best_idx = sort_indices[0] gbest_cv = np.sum(np.maximum(0, gbest_cons[i])) best_cv = pbest_cvs[best_idx] if (best_cv < gbest_cv) or \ (best_cv == gbest_cv and pbest_objs[i][best_idx] < gbest_objs[i][0]): gbest_decs[i] = pbest_decs[i][best_idx:best_idx + 1, :] gbest_objs[i] = pbest_objs[i][best_idx:best_idx + 1, :] gbest_cons[i] = pbest_cons[i][best_idx:best_idx + 1, :] # Identify improved particles for training data collection # Matches MATLAB Selection_Tournament with Ep=0: # both feasible AND new has lower obj, OR # both infeasible AND new has lower CV old_cvs = np.sum(np.maximum(0, old_cons), axis=1) new_cvs = np.sum(np.maximum(0, cons[i]), axis=1) both_feasible = (old_cvs <= 0) & (new_cvs <= 0) both_infeasible = (old_cvs > 0) & (new_cvs > 0) replace = (both_feasible & (objs[i].flatten() < old_objs.flatten())) | \ (both_infeasible & (new_cvs < old_cvs)) if np.any(replace): in_data = old_decs[replace] out_data = decs[i][replace] - old_decs[replace] # Initialize network on first improvement if not trained[i]: nets[i] = _KLNet(dims[i]) optimizers[i] = torch.optim.SGD(nets[i].parameters(), lr=0.1, momentum=0.9) trained[i] = True # Train network on this generation's improved particles in_tensor = torch.tensor(in_data, dtype=torch.float32) out_tensor = torch.tensor(out_data, dtype=torch.float32) loss_fn = nn.MSELoss() nets[i].train() for _ in range(self.ep): pred = nets[i](in_tensor) loss = loss_fn(pred, out_tensor) optimizers[i].zero_grad() loss.backward() optimizers[i].step() nfes_per_task[i] += n_per_task[i] pbar.update(n_per_task[i]) # Append current population to history append_history(all_decs[i], decs[i], all_objs[i], objs[i], all_cons[i], cons[i]) pbar.close() runtime = time.time() - start_time # Save results results = build_save_results(all_decs=all_decs, all_objs=all_objs, runtime=runtime, max_nfes=nfes_per_task, all_cons=all_cons, bounds=problem.bounds, save_path=self.save_path, filename=self.name, save_data=self.save_data) return results