# Source code for ddmtolab.Algorithms.STMO.DRLSAEA

"""
Deep Reinforcement Learning-assisted Surrogate-Assisted Evolutionary Algorithm (DRL-SAEA)

This module implements DRL-SAEA for computationally expensive constrained multi-objective
optimization. It uses a Double Deep Q-Network (DDQN) to dynamically select among three
constraint handling strategies for surrogate model management.

References
----------
    [1] S. Shao, Y. Tian, and Y. Zhang. Deep reinforcement learning assisted surrogate model management for expensive constrained multi-objective optimization. Swarm and Evolutionary Computation, 2025, 92: 101817.

Notes
-----
Author: Jiangtao Shen
Date: 2026.02.17
Version: 1.0
"""
from tqdm import tqdm
import time
import torch
import torch.nn as nn
import torch.optim as optim
import numpy as np
from scipy.spatial.distance import cdist
from ddmtolab.Methods.Algo_Methods.algo_utils import *
from ddmtolab.Methods.Algo_Methods.bo_utils import gp_build, gp_predict
import warnings

warnings.filterwarnings("ignore")


# ============================================================================
# DDQN Components
# ============================================================================

class QNetwork(nn.Module):
    """Feedforward Q-value estimator: tanh hidden layers, linear output head."""

    def __init__(self, n_states, n_actions, hidden_layers=(5, 5)):
        super().__init__()
        widths = [n_states, *hidden_layers]
        modules = []
        for fan_in, fan_out in zip(widths[:-1], widths[1:]):
            modules.append(nn.Linear(fan_in, fan_out))
            modules.append(nn.Tanh())
        modules.append(nn.Linear(widths[-1], n_actions))
        self.net = nn.Sequential(*modules)

    def forward(self, x):
        """Map a batch of state vectors to per-action Q-values."""
        return self.net(x)


class ExperienceReplay:
    """Fixed-capacity ring buffer of (S, A, R, S') transitions stored as rows."""

    def __init__(self, n_states, max_size=100):
        self.max_size = max_size
        self.n_states = n_states
        # Row layout: n_states state cols | 1 action col | 1 reward col | n_states next-state cols
        self.buffer = np.zeros((max_size, 2 * n_states + 2))
        self.index = 0          # total insertions so far
        self.overflow = False   # True once the buffer has wrapped at least once

    def insert(self, state, action, reward, next_state):
        """Write one transition, overwriting the oldest slot once full."""
        slot = self.index % self.max_size
        self.buffer[slot] = np.concatenate([state, [action, reward], next_state])
        self.index += 1
        if self.index >= self.max_size:
            self.overflow = True

    def get_batch(self, batch_size):
        """Sample batch_size rows with replacement; (None, None, None, None) if empty."""
        limit = self.max_size if self.overflow else self.index
        if limit == 0:
            return None, None, None, None
        picks = self.buffer[np.random.randint(0, limit, size=batch_size)]
        ns = self.n_states
        return (picks[:, :ns],
                picks[:, ns].astype(int),
                picks[:, ns + 1],
                picks[:, ns + 2:])

    @property
    def size(self):
        """Number of stored transitions (capped at max_size)."""
        return self.max_size if self.overflow else self.index


class DDQN:
    """Double DQN: online ('agent') network, periodically-synced 'target' network, replay memory."""

    def __init__(self, n_states, n_actions, hidden_layers=(5, 5), max_er=100,
                 discount=0.999, lr=1e-3):
        self.n_states = n_states
        self.n_actions = n_actions
        self.discount = discount
        self.memory = ExperienceReplay(n_states, max_er)

        self.agent = QNetwork(n_states, n_actions, hidden_layers)
        self.target = QNetwork(n_states, n_actions, hidden_layers)
        self.copy_weights_agent_to_target()

        self.optimizer = optim.Adam(self.agent.parameters(), lr=lr)
        self.loss_fn = nn.MSELoss()

    def copy_weights_agent_to_target(self):
        """Hard-sync the target network with the online network."""
        self.target.load_state_dict(self.agent.state_dict())

    def action(self, state):
        """Pick an action; the greedy one is (n_actions+1)x as likely as each other action."""
        with torch.no_grad():
            q_vals = self.agent(torch.FloatTensor(state).unsqueeze(0)).squeeze(0).numpy()
        greedy = int(np.argmax(q_vals))
        # Candidate pool = greedy action repeated n_actions times plus every action once.
        pool = np.concatenate([np.full(self.n_actions, greedy),
                               np.arange(self.n_actions)])
        return int(np.random.choice(pool))

    def store(self, state, action, reward, next_state):
        """Append one transition to replay memory."""
        self.memory.insert(state, action, reward, next_state)

    def experience_replay(self, batch_size):
        """Run one gradient step on a sampled minibatch (no-op until enough samples)."""
        if self.memory.size < batch_size:
            return
        S, A, R, Sn = self.memory.get_batch(batch_size)
        states = torch.FloatTensor(S)
        next_states = torch.FloatTensor(Sn)
        rewards = torch.FloatTensor(R)

        q_current = self.agent(states)

        # Double-DQN rule: the online net picks the argmax next actions,
        # the target net evaluates them.
        with torch.no_grad():
            chosen = self.agent(next_states).argmax(dim=1)
            rows = torch.arange(batch_size)
            bootstrap = self.target(next_states)[rows, chosen]

        # Targets equal current Q everywhere except on the actions taken.
        q_target = q_current.detach().clone()
        q_target[rows, torch.as_tensor(A)] = rewards + self.discount * bootstrap

        self.optimizer.zero_grad()
        loss = self.loss_fn(q_current, q_target)
        loss.backward()
        self.optimizer.step()


def _env_selection(pop_dec, pop_obj, NI, M, status):
    """
    SPEA2-style environmental selection with duplicate removal and truncation.

    Parameters
    ----------
    pop_dec : np.ndarray, shape (N, D)
        Decision variables.
    pop_obj : np.ndarray, shape (N, M_ext)
        Objectives; columns beyond the first M are constraint-derived.
    NI : int
        Target population size.
    M : int
        Number of real objectives (first M columns of pop_obj).
    status : int
        Constraint-handling mode (1, 2, or 3) matching the DDQN action.

    Returns
    -------
    tuple
        Selected (pop_dec, pop_obj, fitness).
    """
    # Drop rows whose first-M objective vectors repeat (keep first occurrence).
    _, first_seen = np.unique(pop_obj[:, :M], axis=0, return_index=True)
    order = np.sort(first_seen)
    pop_dec = pop_dec[order]
    pop_obj = pop_obj[order]

    n = pop_dec.shape[0]
    if n == 0:
        return pop_dec, pop_obj, np.array([])

    # Mode-dependent fitness:
    #   1 - last column treated as an aggregate CV (constrained SPEA2)
    #   2 - trailing columns summed into one CV column appended as an objective
    #   3 - every column treated as an objective
    if status == 1:
        fitness = spea2_fitness(pop_obj[:, :M], np.maximum(0, pop_obj[:, -1:]))
    elif status == 2:
        cv = np.sum(np.maximum(0, pop_obj[:, M:]), axis=1, keepdims=True)
        fitness = spea2_fitness(np.hstack([pop_obj[:, :M], cv]))
    else:
        fitness = spea2_fitness(pop_obj)

    selected = fitness < 1
    n_sel = np.sum(selected)
    if n_sel < NI:
        # Too few non-dominated: top up by fitness rank.
        selected = np.zeros(n, dtype=bool)
        selected[np.argsort(fitness)[:min(NI, n)]] = True
    elif n_sel > NI:
        # Too many: crowding-based truncation among the non-dominated set.
        objs_sel = pop_obj[selected, :M] if status != 3 else pop_obj[selected]
        kept = spea2_truncation(objs_sel, NI)
        chosen = np.where(selected)[0][kept]
        selected = np.zeros(n, dtype=bool)
        selected[chosen] = True

    return pop_dec[selected], pop_obj[selected], fitness[selected]


# ============================================================================
# State and Reward Generation
# ============================================================================

def _compute_hv(objs, ref_point):
    """Compute hypervolume for 2D objectives (fast exact computation)."""
    if objs.shape[0] == 0:
        return 0.0
    # Filter dominated and above ref
    valid = np.all(objs < ref_point, axis=1)
    objs = objs[valid]
    if objs.shape[0] == 0:
        return 0.0

    M = objs.shape[1]
    if M == 2:
        return _exact_hv_2d(objs, ref_point)
    else:
        # Monte Carlo approximation for M>=3
        n_samples = 10000
        samples = np.random.uniform(
            np.min(objs, axis=0),
            ref_point,
            size=(n_samples, M)
        )
        dominated = np.zeros(n_samples, dtype=bool)
        for i in range(objs.shape[0]):
            dominated |= np.all(samples >= objs[i], axis=1)
        vol = np.prod(ref_point - np.min(objs, axis=0))
        return vol * np.mean(dominated)


def _exact_hv_2d(objs, ref_point):
    """Exact 2D hypervolume via sweep line."""
    N = objs.shape[0]
    if N == 0:
        return 0.0
    # Non-dominated filter
    sorted_idx = np.argsort(objs[:, 0])
    sorted_objs = objs[sorted_idx]
    nd = [sorted_objs[0]]
    for i in range(1, N):
        if sorted_objs[i, 1] < nd[-1][1]:
            nd.append(sorted_objs[i])
    nd = np.array(nd)

    hv = 0.0
    for i in range(len(nd)):
        x_width = (nd[i + 1, 0] if i + 1 < len(nd) else ref_point[0]) - nd[i, 0]
        y_height = ref_point[1] - nd[i, 1]
        hv += x_width * y_height
    return hv


def _generate_sample(last_objs, last_cons, curr_objs, curr_cons,
                      last_fe_ratio, curr_fe_ratio):
    """
    Generate (last_state, curr_state, reward) for the DDQN from two archives.

    The reward combines the relative hypervolume gain and the relative
    constraint-violation reduction between the previous and current archives.

    Parameters
    ----------
    last_objs : np.ndarray, shape (N1, M)
        Objectives of the previous archive.
    last_cons : np.ndarray, shape (N1, C)
        Constraints of the previous archive.
    curr_objs : np.ndarray, shape (N2, M)
        Objectives of the current archive.
    curr_cons : np.ndarray, shape (N2, C)
        Constraints of the current archive.
    last_fe_ratio : float
        Previous FE / maxFE.
    curr_fe_ratio : float
        Current FE / maxFE.

    Returns
    -------
    last_state : np.ndarray, shape (4,)
    curr_state : np.ndarray, shape (4,)
    reward : float
    """
    # Shared reference point so the two hypervolume values are comparable.
    ref_point = np.max(np.vstack([last_objs, curr_objs]), axis=0)
    last_hv = _compute_hv(last_objs, ref_point)
    curr_hv = _compute_hv(curr_objs, ref_point)

    # Reward 1: relative HV improvement (0 when the baseline HV is degenerate).
    if last_hv == 0 or np.isnan(last_hv):
        reward1 = 0.0
    else:
        reward1 = (curr_hv - last_hv) / abs(last_hv)
    if np.isnan(reward1):
        reward1 = 0.0

    # Reward 2: relative total-CV reduction; positive when CV decreased.
    # Single signed expression equivalent to the +|delta|/-|delta| branching.
    last_cv_total = np.sum(np.maximum(0, last_cons))
    curr_cv_total = np.sum(np.maximum(0, curr_cons))
    if last_cv_total == 0:
        reward2 = 0.0
    else:
        reward2 = (last_cv_total - curr_cv_total) / last_cv_total

    reward = reward1 + reward2

    # State features: [sum of per-objective variance, objective sum,
    #                  total CV, FE budget ratio].
    last_state = np.array([
        np.sum(np.var(last_objs, axis=0)),
        np.sum(last_objs),
        last_cv_total,
        last_fe_ratio
    ])
    curr_state = np.array([
        np.sum(np.var(curr_objs, axis=0)),
        np.sum(curr_objs),
        curr_cv_total,
        curr_fe_ratio
    ])

    return last_state, curr_state, reward


# ============================================================================
# Archive and Population Update
# ============================================================================

def _update_archive(arc_decs, arc_objs, arc_cons, N):
    """
    Deduplicate the archive and shrink it to at most N members via SPEA2.

    Parameters
    ----------
    arc_decs, arc_objs, arc_cons : np.ndarray
        Archive decisions, objectives, and constraints (row-aligned).
    N : int
        Maximum archive size.

    Returns
    -------
    tuple
        Updated (arc_decs, arc_objs, arc_cons).
    """
    # Keep the first occurrence of each distinct objective vector.
    _, first_seen = np.unique(arc_objs, axis=0, return_index=True)
    order = np.sort(first_seen)
    arc_decs = arc_decs[order]
    arc_objs = arc_objs[order]
    arc_cons = arc_cons[order]

    if arc_decs.shape[0] <= N:
        return arc_decs, arc_objs, arc_cons

    fitness = spea2_fitness(arc_objs, arc_cons)
    selected = fitness < 1
    n_sel = np.sum(selected)
    if n_sel < N:
        # Not enough non-dominated members: top up by fitness rank.
        selected = np.zeros(arc_decs.shape[0], dtype=bool)
        selected[np.argsort(fitness)[:N]] = True
    elif n_sel > N:
        # Too many: crowding-based truncation among the non-dominated set.
        kept = spea2_truncation(arc_objs[selected], N)
        chosen = np.where(selected)[0][kept]
        selected = np.zeros(arc_decs.shape[0], dtype=bool)
        selected[chosen] = True

    return arc_decs[selected], arc_objs[selected], arc_cons[selected]


def _update_population(pop_decs, pop_objs, pop_cons, new_decs, new_objs, new_cons,
                        N, status):
    """
    Merge newly evaluated solutions into the surrogate-training population.

    Drops population members that (nearly) duplicate a new solution in
    objective space, deduplicates the remainder, shrinks it to N members via
    SPEA2 selection, and finally appends the new solutions.

    Parameters
    ----------
    pop_decs, pop_objs, pop_cons : np.ndarray
        Current population (row-aligned).
    new_decs, new_objs, new_cons : np.ndarray
        Newly (expensively) evaluated solutions.
    N : int
        Target size before appending the new solutions (NI - mu).
    status : int
        Constraint-handling mode (1, 2, or 3) matching the DDQN action.

    Returns
    -------
    tuple
        Updated (pop_decs, pop_objs, pop_cons).
    """
    # Remove population rows whose objectives match a new solution to within
    # 1e-10 in every component (vectorized pairwise comparison).
    if new_objs.shape[0] > 0:
        close = np.abs(pop_objs[:, None, :] - new_objs[None, :, :]) < 1e-10
        is_dup = np.any(np.all(close, axis=2), axis=1)
        keep = np.where(~is_dup)[0]
        if keep.size > 0:
            pop_decs = pop_decs[keep]
            pop_objs = pop_objs[keep]
            pop_cons = pop_cons[keep]
        else:
            pop_decs = np.empty((0, pop_decs.shape[1]))
            pop_objs = np.empty((0, pop_objs.shape[1]))
            pop_cons = np.empty((0, pop_cons.shape[1]))

    # Deduplicate by objective vector (keep first occurrence).
    if pop_objs.shape[0] > 0:
        _, first_seen = np.unique(pop_objs, axis=0, return_index=True)
        order = np.sort(first_seen)
        pop_decs = pop_decs[order]
        pop_objs = pop_objs[order]
        pop_cons = pop_cons[order]

    # SPEA2 selection down to N survivors.
    if pop_decs.shape[0] > N:
        if status == 2:
            # CV aggregated into one extra objective column.
            cv = np.sum(np.maximum(0, pop_cons), axis=1, keepdims=True)
            fitness = spea2_fitness(np.hstack([pop_objs, cv]))
        else:
            # NOTE(review): statuses 1 and 3 both use constrained SPEA2 fitness
            # here, although status 3 ignores constraints elsewhere — confirm.
            fitness = spea2_fitness(pop_objs, pop_cons)

        selected = fitness < 1
        n_sel = np.sum(selected)
        if n_sel < N:
            selected = np.zeros(pop_decs.shape[0], dtype=bool)
            selected[np.argsort(fitness)[:N]] = True
        elif n_sel > N:
            kept = spea2_truncation(pop_objs[selected], N)
            chosen = np.where(selected)[0][kept]
            selected = np.zeros(pop_decs.shape[0], dtype=bool)
            selected[chosen] = True
        pop_decs = pop_decs[selected]
        pop_objs = pop_objs[selected]
        pop_cons = pop_cons[selected]

    # Append the new solutions unconditionally.
    if new_decs.shape[0] > 0:
        pop_decs = np.vstack([pop_decs, new_decs])
        pop_objs = np.vstack([pop_objs, new_objs])
        pop_cons = np.vstack([pop_cons, new_cons])

    return pop_decs, pop_objs, pop_cons


# ============================================================================
# Normalize Constraint Violation
# ============================================================================

def _normalize_cv(pop_con):
    """
    Normalize constraint violations and return aggregated scalar CV per solution.

    Parameters
    ----------
    pop_con : np.ndarray, shape (N, C), raw constraint values

    Returns
    -------
    cv : np.ndarray, shape (N, 1), normalized aggregate CV
    """
    pop_con = np.maximum(0, pop_con)
    cmin = np.min(pop_con, axis=0)
    cmax = np.max(pop_con, axis=0)
    denom = cmax - cmin
    denom[denom == 0] = 1.0
    pop_con_norm = (pop_con - cmin) / denom
    # Handle NaN from zero columns
    pop_con_norm = np.nan_to_num(pop_con_norm, nan=0.0)
    return np.sum(pop_con_norm, axis=1, keepdims=True)


# ============================================================================
# Main Algorithm
# ============================================================================

class DRLSAEA:
    """
    Deep Reinforcement Learning-assisted Surrogate-Assisted Evolutionary
    Algorithm for expensive constrained multi-objective optimization.

    Uses DDQN to select among 3 constraint handling strategies:

    - Action 0: objectives + normalized aggregate CV
    - Action 1: objectives + individual normalized active constraints
    - Action 2: objectives only (unconstrained)

    Attributes
    ----------
    algorithm_information : dict
        Dictionary containing algorithm capabilities and requirements
    """

    algorithm_information = {
        'n_tasks': '[1, K]',
        'dims': 'unequal',
        'objs': 'unequal',
        'n_objs': '[2, M]',
        'cons': 'unequal',
        'n_cons': '[0, C]',
        'expensive': 'True',
        'knowledge_transfer': 'False',
        'n_initial': 'unequal',
        'max_nfes': 'unequal'
    }

    @classmethod
    def get_algorithm_information(cls, print_info=True):
        """Delegate to the shared helper that formats/prints capability info."""
        return get_algorithm_information(cls, print_info)

    def __init__(self, problem, n_initial=None, max_nfes=None, n=100, wmax=20,
                 mu=5, save_data=True, save_path='./Data', name='DRL-SAEA',
                 disable_tqdm=True):
        """
        Initialize DRL-SAEA algorithm.

        Parameters
        ----------
        problem : MTOP
            Multi-task optimization problem instance
        n_initial : int or List[int], optional
            Number of initial samples per task (default: 11*dim-1)
        max_nfes : int or List[int], optional
            Maximum number of function evaluations per task (default: 200)
        n : int or List[int], optional
            Archive/population size per task (default: 100)
        wmax : int, optional
            Number of inner GA generations on surrogates (default: 20)
        mu : int, optional
            Number of real evaluated solutions per iteration (default: 5)
        save_data : bool, optional
            Whether to save optimization data (default: True)
        save_path : str, optional
            Path to save results (default: './Data')
        name : str, optional
            Name for the experiment (default: 'DRL-SAEA')
        disable_tqdm : bool, optional
            Whether to disable progress bar (default: True)
        """
        self.problem = problem
        self.n_initial = n_initial
        self.max_nfes = max_nfes if max_nfes is not None else 200
        self.n = n
        self.wmax = wmax
        self.mu = mu
        self.save_data = save_data
        self.save_path = save_path
        self.name = name
        self.disable_tqdm = disable_tqdm

    def optimize(self):
        """
        Execute the DRL-SAEA algorithm.

        Returns
        -------
        Results
            Optimization results containing decision variables, objectives,
            and runtime
        """
        data_type = torch.float
        start_time = time.time()
        problem = self.problem
        nt = problem.n_tasks
        dims = problem.dims
        n_objs = problem.n_objs
        n_cons = problem.n_cons

        # Set default initial samples: 11*dim - 1
        if self.n_initial is None:
            n_initial_per_task = [11 * dims[i] - 1 for i in range(nt)]
        else:
            n_initial_per_task = par_list(self.n_initial, nt)
        max_nfes_per_task = par_list(self.max_nfes, nt)
        n_per_task = par_list(self.n, nt)

        # Generate initial samples using LHS
        decs = initialization(problem, n_initial_per_task, method='lhs')
        objs, cons = evaluation(problem, decs)
        nfes_per_task = n_initial_per_task.copy()

        # History tracking
        has_cons = any(c.shape[1] > 0 for c in cons)

        pbar = tqdm(total=sum(max_nfes_per_task), initial=sum(n_initial_per_task),
                    desc=f"{self.name}", disable=self.disable_tqdm)

        # Per-task optimization (tasks are optimized independently)
        for task_i in range(nt):
            m = n_objs[task_i]
            dim = dims[task_i]
            nc = n_cons[task_i]
            NI = n_initial_per_task[task_i]
            N_archive = n_per_task[task_i]

            # Population and archive for this task
            pop_decs = decs[task_i].copy()
            pop_objs = objs[task_i].copy()
            pop_cons = cons[task_i].copy() if nc > 0 else np.zeros((NI, 0))

            # Initialize archive
            arc_decs = pop_decs.copy()
            arc_objs = pop_objs.copy()
            arc_cons = pop_cons.copy()
            if nc > 0:
                arc_decs, arc_objs, arc_cons = _update_archive(
                    arc_decs, arc_objs, arc_cons, N_archive
                )
            else:
                # For unconstrained: just keep non-dominated
                front_no, _ = nd_sort(arc_objs, arc_objs.shape[0])
                nd_mask = front_no == 1
                if np.sum(nd_mask) > N_archive:
                    fitness = spea2_fitness(arc_objs)
                    rank = np.argsort(fitness)
                    nd_mask = np.zeros(arc_decs.shape[0], dtype=bool)
                    nd_mask[rank[:N_archive]] = True
                arc_decs = arc_decs[nd_mask]
                arc_objs = arc_objs[nd_mask]
                arc_cons = arc_cons[nd_mask] if nc > 0 else np.zeros((np.sum(nd_mask), 0))

            last_arc_objs = arc_objs.copy()
            last_arc_cons = arc_cons.copy() if nc > 0 else np.zeros((arc_objs.shape[0], 1))

            # DDQN setup
            num_actions = 3
            num_states = 4
            exp_replay_freq = 8
            copy_weights_freq = exp_replay_freq + 3  # 11
            batch_size = 16
            step = 0
            ddqn = DDQN(num_states, num_actions, (5, 5), max_er=100)

            # Initial state: [objective variance, objective sum, total CV, FE ratio]
            fe_ratio = nfes_per_task[task_i] / max_nfes_per_task[task_i]
            cv_total = np.sum(np.maximum(0, last_arc_cons)) if nc > 0 else 0.0
            state = np.array([
                np.sum(np.var(arc_objs, axis=0)),
                np.sum(arc_objs),
                cv_total,
                fe_ratio
            ])

            while nfes_per_task[task_i] < max_nfes_per_task[task_i]:
                action = ddqn.action(state)
                step += 1

                # Build surrogate models based on action
                if action == 0:
                    # Action 1 in MATLAB: objectives + normalized aggregate CV
                    status = 1
                    if nc > 0:
                        cv_col = _normalize_cv(pop_cons)
                        surr_objs = np.hstack([pop_objs, cv_col])
                    else:
                        surr_objs = np.hstack([pop_objs, np.zeros((pop_objs.shape[0], 1))])
                    M_surr = surr_objs.shape[1]
                    models = []
                    for j in range(M_surr):
                        gp = gp_build(pop_decs, surr_objs[:, j:j+1], data_type)
                        models.append(gp)
                    fitness = spea2_fitness(pop_objs, pop_cons if nc > 0 else None)
                elif action == 1:
                    # Action 2 in MATLAB: objectives + individual active
                    # normalized constraints
                    status = 2
                    if nc > 0:
                        raw_con = np.maximum(0, pop_cons)
                        max_con = np.max(raw_con, axis=0)
                        active_idx = np.where(max_con > 0)[0]
                        if len(active_idx) > 0:
                            active_con = raw_con[:, active_idx]
                            cmin = np.min(active_con, axis=0)
                            cmax = np.max(active_con, axis=0)
                            denom = cmax - cmin
                            denom[denom == 0] = 1.0
                            active_con_norm = (active_con - cmin) / denom
                            surr_objs = np.hstack([pop_objs, active_con_norm])
                        else:
                            surr_objs = pop_objs.copy()
                    else:
                        surr_objs = pop_objs.copy()
                    M_surr = surr_objs.shape[1]
                    models = []
                    for j in range(M_surr):
                        gp = gp_build(pop_decs, surr_objs[:, j:j+1], data_type)
                        models.append(gp)
                    # Fitness with aggregated CV as extra objective
                    if nc > 0:
                        cv_agg = np.sum(np.maximum(0, pop_cons), axis=1, keepdims=True)
                        fitness = spea2_fitness(np.hstack([pop_objs, cv_agg]))
                    else:
                        fitness = spea2_fitness(pop_objs)
                else:  # action == 2
                    # Action 3 in MATLAB: objectives only
                    status = 3
                    surr_objs = pop_objs.copy()
                    M_surr = m
                    models = []
                    for j in range(M_surr):
                        gp = gp_build(pop_decs, surr_objs[:, j:j+1], data_type)
                        models.append(gp)
                    fitness = spea2_fitness(pop_objs)

                # Inner GA evolution on surrogates (no expensive evaluations)
                inner_decs = pop_decs.copy()
                inner_objs = surr_objs.copy()
                inner_fitness = fitness.copy()
                for w in range(self.wmax):
                    # Tournament selection and GA generation
                    mating_pool = tournament_selection(2, NI, -inner_fitness)
                    off_decs = ga_generation(inner_decs[mating_pool], muc=20.0, mum=20.0)
                    inner_decs = np.vstack([inner_decs, off_decs])
                    # Predict all solutions using surrogates
                    N_inner = inner_decs.shape[0]
                    pred_objs = np.zeros((N_inner, M_surr))
                    for j in range(M_surr):
                        pred_j, _ = gp_predict(models[j], inner_decs, data_type)
                        pred_objs[:, j] = pred_j.ravel()
                    inner_objs = pred_objs
                    # Environmental selection
                    inner_decs, inner_objs, inner_fitness = _env_selection(
                        inner_decs, inner_objs, NI, m, status
                    )

                # Select mu best for expensive evaluation
                sel_decs, _, _ = _env_selection(inner_decs, inner_objs, self.mu, m, status)
                # Remove duplicates against everything already evaluated
                sel_decs = remove_duplicates(sel_decs, decs[task_i])
                if sel_decs.shape[0] == 0:
                    # If all duplicates, generate random solutions
                    # (assumes decision space normalized to [0, 1] — TODO confirm)
                    sel_decs = np.random.rand(self.mu, dim)
                    sel_decs = remove_duplicates(sel_decs, decs[task_i])
                    if sel_decs.shape[0] == 0:
                        continue

                # Expensive evaluation
                new_objs, new_cons = evaluation_single(problem, sel_decs, task_i)
                n_new = sel_decs.shape[0]

                # Update cumulative dataset
                decs[task_i] = np.vstack([decs[task_i], sel_decs])
                objs[task_i] = np.vstack([objs[task_i], new_objs])
                if nc > 0:
                    cons[task_i] = np.vstack([cons[task_i], new_cons])

                # Update population
                pop_decs, pop_objs, pop_cons = _update_population(
                    pop_decs, pop_objs,
                    pop_cons if nc > 0 else np.zeros((pop_decs.shape[0], max(1, nc))),
                    sel_decs, new_objs,
                    new_cons if nc > 0 else np.zeros((n_new, max(1, nc))),
                    NI - self.mu, status
                )
                # Restore cons shape for unconstrained
                if nc == 0:
                    pop_cons = np.zeros((pop_decs.shape[0], 0))

                # Update archive
                combined_arc_decs = np.vstack([arc_decs, sel_decs])
                combined_arc_objs = np.vstack([arc_objs, new_objs])
                if nc > 0:
                    combined_arc_cons = np.vstack([arc_cons, new_cons])
                else:
                    combined_arc_cons = np.zeros((combined_arc_objs.shape[0], 1))
                arc_decs, arc_objs, arc_cons_full = _update_archive(
                    combined_arc_decs, combined_arc_objs, combined_arc_cons, N_archive
                )
                if nc == 0:
                    arc_cons = np.zeros((arc_decs.shape[0], 0))
                else:
                    arc_cons = arc_cons_full

                prev_fe_ratio = nfes_per_task[task_i] / max_nfes_per_task[task_i]
                nfes_per_task[task_i] += n_new
                pbar.update(n_new)

                # Generate state/reward for DDQN from archive change
                curr_cons_for_state = arc_cons if nc > 0 else np.zeros((arc_objs.shape[0], 1))
                last_cons_for_state = last_arc_cons if nc > 0 else np.zeros((last_arc_objs.shape[0], 1))
                fe_ratio = nfes_per_task[task_i] / max_nfes_per_task[task_i]
                last_state, curr_state, reward = _generate_sample(
                    last_arc_objs, last_cons_for_state,
                    arc_objs, curr_cons_for_state,
                    prev_fe_ratio, fe_ratio
                )
                ddqn.store(state, action, reward, curr_state)
                state = curr_state
                last_arc_objs = arc_objs.copy()
                last_arc_cons = arc_cons.copy() if nc > 0 else np.zeros((arc_objs.shape[0], 1))

                # Periodic DDQN updates
                if step % exp_replay_freq == 0:
                    ddqn.experience_replay(batch_size)
                if step % copy_weights_freq == 0:
                    ddqn.copy_weights_agent_to_target()

        pbar.close()
        runtime = time.time() - start_time

        if has_cons:
            all_decs, all_objs, all_cons = build_staircase_history(
                decs, objs, k=self.mu, db_cons=cons)
        else:
            all_decs, all_objs = build_staircase_history(decs, objs, k=self.mu)
            all_cons = None

        results = build_save_results(
            all_decs, all_objs, runtime, max_nfes_per_task,
            all_cons=all_cons if has_cons else None,
            bounds=problem.bounds,
            save_path=self.save_path, filename=self.name,
            save_data=self.save_data
        )
        return results