"""
Coevolutionary Framework for Constrained Multiobjective Optimization (CCMO)
This module implements CCMO for constrained multi-objective optimization problems.
References
----------
[1] Tian, Ye, et al. "A Coevolutionary Framework for Constrained Multiobjective Optimization Problems." IEEE Transactions on Evolutionary Computation 25.1 (2021): 102-116.
Notes
-----
Author: Jiangtao Shen
Email: j.shen5@exeter.ac.uk
Date: 2025.12.14
Version: 1.0
"""
from tqdm import tqdm
import time
import numpy as np
from ddmtolab.Algorithms.STMO.SPEA2 import SPEA2
from ddmtolab.Methods.Algo_Methods.algo_utils import *
[docs]
class CCMO:
"""
Coevolutionary Framework for Constrained Multiobjective Optimization.
CCMO uses two co-evolving populations:
- Population 1: Optimizes objectives with strict constraint handling (epsilon=0)
- Population 2: Optimizes objectives with relaxed constraints (epsilon=inf)
Attributes
----------
algorithm_information : dict
Dictionary containing algorithm capabilities and requirements
"""
algorithm_information = {
'n_tasks': '[1, K]',
'dims': 'unequal',
'objs': 'unequal',
'n_objs': '[2, M]',
'cons': 'unequal',
'n_cons': '[0, C]',
'expensive': 'False',
'knowledge_transfer': 'False',
'n': 'unequal',
'max_nfes': 'unequal'
}
@classmethod
def get_algorithm_information(cls, print_info=True):
return get_algorithm_information(cls, print_info)
[docs]
def __init__(self, problem, n=None, max_nfes=None, muc=20.0, mum=15.0,
save_data=True, save_path='./Data', name='CCMO',
disable_tqdm=True):
"""
Initialize CCMO algorithm.
Parameters
----------
problem : MTOP
Multi-task optimization problem instance
n : int or List[int], optional
Population size per task (default: 100)
max_nfes : int or List[int], optional
Maximum number of function evaluations per task (default: 10000)
muc : float, optional
Distribution index for simulated binary crossover (SBX) (default: 20.0)
mum : float, optional
Distribution index for polynomial mutation (PM) (default: 15.0)
save_data : bool, optional
Whether to save optimization data (default: True)
save_path : str, optional
Path to save results (default: './TestData')
name : str, optional
Name for the experiment (default: 'CCMO_test')
disable_tqdm : bool, optional
Whether to disable progress bar (default: True)
"""
self.problem = problem
self.n = n if n is not None else 100
self.max_nfes = max_nfes if max_nfes is not None else 10000
self.muc = muc
self.mum = mum
self.save_data = save_data
self.save_path = save_path
self.name = name
self.disable_tqdm = disable_tqdm
# Create SPEA2 instance for reusing methods
self._spea2 = SPEA2(problem, n, max_nfes, muc, mum, epsilon=0,
save_data=False, disable_tqdm=True)
[docs]
def optimize(self):
"""
Execute the CCMO algorithm.
Returns
-------
Results
Optimization results containing decision variables, objectives, constraints, and runtime
"""
start_time = time.time()
problem = self.problem
nt = problem.n_tasks
n_per_task = par_list(self.n, nt)
max_nfes_per_task = par_list(self.max_nfes, nt)
# Initialize two populations for each task
# Population 1: strict constraint handling (epsilon=0)
decs1 = initialization(problem, n_per_task)
objs1, cons1 = evaluation(problem, decs1)
# Population 2: relaxed constraint handling (epsilon=inf)
decs2 = initialization(problem, n_per_task)
objs2, cons2 = evaluation(problem, decs2)
nfes_per_task = [2 * n for n in n_per_task] # Two populations initialized
# History tracking uses population 1
all_decs, all_objs, all_cons = init_history(decs1, objs1, cons1)
# Calculate initial fitness for both populations using SPEA2 selection
fitness1 = []
fitness2 = []
for i in range(nt):
# Population 1: epsilon = 0 (strict constraints)
selected_indices1, selected_fitness1 = self._selection_spea2(
objs1[i], cons1[i], n_per_task[i], epsilon=0
)
objs1[i] = objs1[i][selected_indices1]
decs1[i] = decs1[i][selected_indices1]
cons1[i] = cons1[i][selected_indices1] if cons1[i] is not None else None
fitness1.append(selected_fitness1.copy())
# Population 2: epsilon = inf (relaxed constraints)
selected_indices2, selected_fitness2 = self._selection_spea2(
objs2[i], cons2[i], n_per_task[i], epsilon=np.inf
)
objs2[i] = objs2[i][selected_indices2]
decs2[i] = decs2[i][selected_indices2]
cons2[i] = cons2[i][selected_indices2] if cons2[i] is not None else None
fitness2.append(selected_fitness2.copy())
pbar = tqdm(total=sum(max_nfes_per_task), initial=sum(nfes_per_task),
desc=f"{self.name}", disable=self.disable_tqdm)
while sum(nfes_per_task) < sum(max_nfes_per_task):
# Skip tasks that have exhausted their evaluation budget
active_tasks = [i for i in range(nt) if nfes_per_task[i] < max_nfes_per_task[i]]
if not active_tasks:
break
for i in active_tasks:
# Parent selection from both populations
matingpool1 = tournament_selection(2, n_per_task[i], fitness1[i])
matingpool2 = tournament_selection(2, n_per_task[i], fitness2[i])
# Generate offspring from both populations
off_decs1 = ga_generation(decs1[i][matingpool1, :], muc=self.muc, mum=self.mum)
off_decs2 = ga_generation(decs2[i][matingpool2, :], muc=self.muc, mum=self.mum)
# Combine offspring from both populations
off_decs_combined = np.vstack([off_decs1, off_decs2])
off_objs, off_cons = evaluation_single(problem, off_decs_combined, i)
# Update both populations by merging with offspring
objs1[i], decs1[i], cons1[i] = vstack_groups(
(objs1[i], off_objs),
(decs1[i], off_decs_combined),
(cons1[i], off_cons)
)
objs2[i], decs2[i], cons2[i] = vstack_groups(
(objs2[i], off_objs),
(decs2[i], off_decs_combined),
(cons2[i], off_cons)
)
# Environmental selection for both populations using SPEA2 selection
# Population 1: epsilon = 0 (strict constraints)
selected_indices1, selected_fitness1 = self._selection_spea2(
objs1[i], cons1[i], n_per_task[i], epsilon=0
)
objs1[i] = objs1[i][selected_indices1]
decs1[i] = decs1[i][selected_indices1]
cons1[i] = cons1[i][selected_indices1] if cons1[i] is not None else None
fitness1[i] = selected_fitness1
# Population 2: epsilon = inf (relaxed constraints)
selected_indices2, selected_fitness2 = self._selection_spea2(
objs2[i], cons2[i], n_per_task[i], epsilon=np.inf
)
objs2[i] = objs2[i][selected_indices2]
decs2[i] = decs2[i][selected_indices2]
cons2[i] = cons2[i][selected_indices2] if cons2[i] is not None else None
fitness2[i] = selected_fitness2
# Update evaluation count (2*N offspring evaluated)
nfes_per_task[i] += 2 * n_per_task[i]
pbar.update(2 * n_per_task[i])
# Update history with population 1 (strict constraints)
append_history(all_decs[i], decs1[i], all_objs[i], objs1[i], all_cons[i], cons1[i])
pbar.close()
runtime = time.time() - start_time
# Save results using population 1
results = build_save_results(all_decs=all_decs, all_objs=all_objs, runtime=runtime, max_nfes=nfes_per_task,
all_cons=all_cons, bounds=problem.bounds, save_path=self.save_path,
filename=self.name, save_data=self.save_data)
return results
def _selection_spea2(self, objs, cons, N, epsilon=0):
"""
SPEA2-based environmental selection with epsilon constraint handling.
This method reuses SPEA2's selection logic with custom epsilon values.
Parameters
----------
objs : ndarray
Objective values with shape (pop_size, n_objs)
cons : ndarray or None
Constraint values with shape (pop_size, n_cons)
N : int
Number of individuals to select
epsilon : float, optional
Epsilon threshold for constraint violation (default: 0)
Returns
-------
selected_indices : ndarray
Indices of selected individuals
selected_fitness : ndarray
Fitness values of selected individuals
"""
# Temporarily set epsilon for constraint violation calculation
original_epsilon = self._spea2.epsilon
self._spea2.epsilon = epsilon
# Use SPEA2's selection method
selected_indices, selected_fitness = self._spea2._spea2_selection(objs, cons, N)
# Restore original epsilon
self._spea2.epsilon = original_epsilon
return selected_indices, selected_fitness