"""
Transfer Task-averaged Natural Gradient - Separable NES (TNG-SNES)
This module implements TNG-SNES for many-task single-objective optimization
using separable Natural Evolution Strategy with task-averaged gradient transfer.
References
----------
[1] Li, Yanchi, et al. "Transfer Task-averaged Natural Gradient for Efficient
Many-task Optimization." IEEE Transactions on Evolutionary Computation,
29(5): 1952-1965, 2025.
Notes
-----
Author: Jiangtao Shen (DDMTOLab adaptation)
Date: 2026.02.22
Version: 1.0
"""
import time
import numpy as np
from tqdm import tqdm
from ddmtolab.Methods.Algo_Methods.algo_utils import *
class TNG_SNES:
    """
    Transfer Task-averaged Natural Gradient for Many-Task Optimization (Separable NES).

    Uses separable NES with task-averaged natural gradient for knowledge transfer:

    - Each task maintains a Gaussian distribution N(x, diag(S^2))
    - Natural gradients are computed from fitness-ranked utility weights
    - Task-averaged gradient is transferred with adaptive utilization rate (rho)
    - Adaptive transfer control adjusts rho and alpha via virtual parameter comparison

    Attributes
    ----------
    algorithm_information : dict
        Dictionary containing algorithm capabilities and requirements
    """

    algorithm_information = {
        'n_tasks': '[2, K]',
        'dims': 'equal',
        'objs': 'equal',
        'n_objs': '1',
        'cons': 'unequal',
        'n_cons': '[0, C]',
        'expensive': 'False',
        'knowledge_transfer': 'True',
        'n': 'equal',
        'max_nfes': 'equal'
    }

    @classmethod
    def get_algorithm_information(cls, print_info=True):
        # Delegates to the module-level helper imported from algo_utils
        # (the star import shadows this method name at module scope).
        return get_algorithm_information(cls, print_info)

    def __init__(self, problem, n=None, max_nfes=None, sigma0=0.3, rho0=0.1,
                 alpha0=0.7, adj_gap=100, save_data=True, save_path='./Data',
                 name='TNG-SNES', disable_tqdm=True):
        """
        Initialize TNG-SNES algorithm.

        Parameters
        ----------
        problem : MTOP
            Multi-task optimization problem instance
        n : int, optional
            Population size per task (default: 100)
        max_nfes : int, optional
            Maximum number of function evaluations per task (default: 10000)
        sigma0 : float, optional
            Initial standard deviation for all dimensions (default: 0.3)
        rho0 : float, optional
            Initial utilization factor for gradient transfer (default: 0.1)
        alpha0 : float, optional
            Initial transfer rate / probability (default: 0.7)
        adj_gap : int, optional
            Generation interval for adaptive transfer control (default: 100)
        save_data : bool, optional
            Whether to save optimization data (default: True)
        save_path : str, optional
            Path to save results (default: './Data')
        name : str, optional
            Name for the experiment (default: 'TNG-SNES')
        disable_tqdm : bool, optional
            Whether to disable progress bar (default: True)
        """
        self.problem = problem
        self.n = n if n is not None else 100
        self.max_nfes = max_nfes if max_nfes is not None else 10000
        self.sigma0 = sigma0
        self.rho0 = rho0
        self.alpha0 = alpha0
        self.adj_gap = adj_gap
        self.save_data = save_data
        self.save_path = save_path
        self.name = name
        self.disable_tqdm = disable_tqdm

    def optimize(self):
        """
        Execute the TNG-SNES algorithm.

        Returns
        -------
        Results
            Optimization results containing decision variables, objectives, and runtime
        """
        start_time = time.time()
        problem = self.problem
        nt = problem.n_tasks
        dims = problem.dims
        n = self.n
        maxD = max(dims)
        max_nfes_per_task = par_list(self.max_nfes, nt)
        total_max_nfes = sum(max_nfes_per_task)

        # --- NES utility function (fitness-shaped weights) ---
        # Standard NES rank-based utilities: nonnegative log weights,
        # normalized to sum to zero.
        shape = np.maximum(0.0, np.log(n / 2 + 1.0) - np.log(np.arange(1, n + 1)))
        shape = shape / np.sum(shape) - 1.0 / n

        # --- Initialize distribution parameters ---
        x = np.zeros((maxD, nt))   # distribution mean per task
        S = np.zeros((maxD, nt))   # distribution std per task
        Gx = np.ones((maxD, nt))   # natural gradient of x
        GS = np.ones((maxD, nt))   # natural gradient of S
        etax = np.ones(nt)         # learning rate for x
        etaS = np.zeros(nt)        # learning rate for S
        for t in range(nt):
            # SNES default step size for the std parameters
            etaS[t] = (3 + np.log(dims[t])) / (5 * np.sqrt(dims[t]))
            # Mean of n uniform samples per dimension (centers near 0.5)
            x[:, t] = np.mean(np.random.rand(maxD, n), axis=1)
            S[:, t] = self.sigma0
        vx = x.copy()  # virtual x for adaptive transfer control
        vS = S.copy()  # virtual S for adaptive transfer control
        rho = np.full(nt, self.rho0)      # utilization factor per task
        alpha = np.full(nt, self.alpha0)  # transfer rate per task

        # --- History tracking ---
        nfes_per_task = [0] * nt
        total_nfes = 0
        all_decs = [[] for _ in range(nt)]
        all_objs = [[] for _ in range(nt)]
        all_cons = [[] for _ in range(nt)]
        # Current population storage (for history recording)
        cur_decs = [None] * nt
        cur_objs = [None] * nt
        cur_cons = [None] * nt

        pbar = tqdm(total=total_max_nfes, initial=0, desc=f"{self.name}",
                    disable=self.disable_tqdm)
        gen = 1  # 1-indexed to match MATLAB's Gen counter
        while total_nfes < total_max_nfes:
            # === First loop: sampling, evaluation, gradient computation ===
            for t in range(nt):
                if total_nfes >= total_max_nfes:
                    break
                # Sampling: Z ~ N(0,1), X = x + S * Z
                Z = np.random.randn(maxD, n)
                X = x[:, t:t + 1] + S[:, t:t + 1] * Z  # (maxD, n)
                # Boundary handling: compute boundary CV from unclipped samples
                X_clipped = np.clip(X, 0, 1)
                bound_cvs = np.sum((X - X_clipped) ** 2, axis=0)  # (n,)
                # Evaluate clipped samples (trimmed to task dimension)
                eval_decs = X_clipped[:dims[t]].T  # (n, dims[t])
                task_objs, task_cons = evaluation_single(problem, eval_decs, t)
                nfes_per_task[t] += n
                total_nfes += n
                pbar.update(n)
                # Constraint violation
                if task_cons.shape[1] > 0:
                    cv = np.sum(np.maximum(0, task_cons), axis=1)
                else:
                    cv = np.zeros(n)
                # Add boundary penalty: boundary violators rank worse than
                # the worst constraint-violating but in-bounds individual
                max_cv = np.max(cv)
                bound_cvs[bound_cvs > 0] += max_cv
                total_cv = cv + bound_cvs
                # Sort by total CV then objective (ascending)
                rank_t = np.lexsort((task_objs[:, 0], total_cv))
                # Assign utility weights based on rank
                weights = np.zeros(n)
                weights[rank_t] = shape
                # Store current population for history
                cur_decs[t] = eval_decs
                cur_objs[t] = task_objs
                cur_cons[t] = task_cons
                # --- Adaptive transfer control (every adj_gap generations) ---
                # Spends one extra batch of evaluations to compare the virtual
                # (transfer-boosted) distribution against the current one.
                if gen % self.adj_gap == 0 and total_nfes + n <= total_max_nfes:
                    # Generate virtual samples using same Z but virtual params
                    vX = vx[:, t:t + 1] + vS[:, t:t + 1] * Z
                    vX_clipped = np.clip(vX, 0, 1)
                    veval_decs = vX_clipped[:dims[t]].T
                    vobjs, vcons = evaluation_single(problem, veval_decs, t)
                    nfes_per_task[t] += n
                    total_nfes += n
                    pbar.update(n)
                    # Compare mean fitness (raw CV, not boundary-penalized)
                    if vcons.shape[1] > 0:
                        vcv = np.sum(np.maximum(0, vcons), axis=1)
                    else:
                        vcv = np.zeros(n)
                    # Large penalty factor makes feasibility dominate the comparison
                    Fit = 1e8 * np.mean(cv) + np.mean(task_objs[:, 0])
                    vFit = 1e8 * np.mean(vcv) + np.mean(vobjs[:, 0])
                    if vFit > Fit:
                        # Virtual (transfer) worse -> reduce transfer
                        rho[t] *= 2.0 / 3.0
                        alpha[t] *= 2.0 / 3.0
                    else:
                        # Virtual better -> increase transfer
                        rho[t] = min(1.0, 1.5 * rho[t])
                        alpha[t] = min(1.0, 1.5 * alpha[t])
                # Compute natural gradients for this task
                Gx[:, t] = Z @ weights          # (maxD,)
                GS[:, t] = (Z ** 2 - 1) @ weights  # (maxD,)

            # === Task-averaged natural gradient ===
            TaGx = np.mean(Gx, axis=1)  # (maxD,)
            TaGS = np.mean(GS, axis=1)  # (maxD,)

            # === Second loop: transfer and distribution update ===
            for t in range(nt):
                tGx = Gx[:, t].copy()
                tGS = GS[:, t].copy()
                # Compute virtual parameters (prep for next adj_gap check):
                # boosted transfer strength 1.5*rho simulates the "increase" branch
                if (gen + 1) % self.adj_gap == 0:
                    vtGx = tGx + 1.5 * rho[t] * TaGx
                    vtGS = tGS + 1.5 * rho[t] * TaGS
                    vdx = etax[t] * S[:, t] * vtGx
                    vdS = 0.5 * etaS[t] * vtGS
                    vx[:, t] = x[:, t] + vdx
                    vS[:, t] = S[:, t] * np.exp(vdS)
                # Transfer task-averaged natural gradient (stochastically with
                # probability alpha, but always on the generation preceding an
                # adaptive check so the comparison is meaningful)
                if np.random.rand() < alpha[t] or (gen + 1) % self.adj_gap == 0:
                    tGx += rho[t] * TaGx
                    tGS += rho[t] * TaGS
                # Update distribution parameters (exponential update keeps S > 0)
                dx = etax[t] * S[:, t] * tGx
                dS = 0.5 * etaS[t] * tGS
                x[:, t] += dx
                S[:, t] *= np.exp(dS)

            # Record history
            if cur_decs[0] is not None:
                for t in range(nt):
                    all_decs[t].append(cur_decs[t].copy())
                    all_objs[t].append(cur_objs[t].copy())
                    all_cons[t].append(cur_cons[t].copy())
            gen += 1

        pbar.close()
        runtime = time.time() - start_time

        # Trim excess evaluations
        all_decs, all_objs, nfes_per_task, all_cons = trim_excess_evaluations(
            all_decs, all_objs, nt, max_nfes_per_task, nfes_per_task, all_cons)

        # Build and save results
        results = build_save_results(
            all_decs=all_decs, all_objs=all_objs, runtime=runtime,
            max_nfes=max_nfes_per_task, all_cons=all_cons,
            bounds=problem.bounds, save_path=self.save_path,
            filename=self.name, save_data=self.save_data)
        return results