"""
Data Analyzer Module for Multi-Task Optimization Experiments
This module provides a comprehensive analysis and visualization pipeline for
multi-task optimization experiments, including metric calculation, statistical
comparison tables, convergence plots, runtime analysis, and Pareto front visualization.
Classes:
MetricResults: Dataclass for storing metric calculation results
TableConfig: Dataclass for table generation configuration
PlotConfig: Dataclass for plot generation configuration
DataAnalyzer: Main class for data analysis pipeline
Author: Jiangtao Shen
Email: j.shen5@exeter.ac.uk
Date: 2025.10.10
Version: 2.1
"""
import os
import pickle
import shutil
from pathlib import Path
from typing import Dict, List, Any, Optional, Tuple, Union, Callable
from dataclasses import dataclass, field
from matplotlib.ticker import FuncFormatter
from enum import Enum
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from openpyxl import load_workbook
from openpyxl.styles import Border, Side, Alignment, Font
from scipy import stats
from tqdm import tqdm
# Import from project modules
from ddmtolab.Methods.metrics import IGD, HV, GD, IGDp, FR, CV, DeltaP, Spread, Spacing
from ddmtolab.Methods.Algo_Methods.algo_utils import nd_sort
# =============================================================================
# Enums and Constants
# =============================================================================
class OptimizationDirection(Enum):
    """Optimization direction enumeration.

    MINIMIZE: smaller metric values are better (e.g. IGD, GD).
    MAXIMIZE: larger metric values are better (e.g. HV, FR).
    """
    MINIMIZE = "minimize"
    MAXIMIZE = "maximize"
class TableFormat(Enum):
    """Output table format enumeration (Excel workbook or LaTeX source)."""
    EXCEL = "excel"
    LATEX = "latex"
class StatisticType(Enum):
    """Statistical measure type enumeration.

    Selects which summary statistic is reported in tables and which run is
    picked as representative for plots.
    """
    MEAN = "mean"
    MEDIAN = "median"
    MAX = "max"
    MIN = "min"
# Default color palette for plots (hex RGB, cycled per algorithm).
DEFAULT_COLORS = [
    '#1f77b4', '#ff7f0e', '#2ca02c', '#d62728', '#9467bd',
    '#8c564b', '#e377c2', '#7f7f7f', '#bcbd22', '#17becf',
    '#e41a1c', '#377eb8', '#4daf4a', '#984ea3', '#ff8c00'
]
# Default matplotlib marker styles for plots (cycled per algorithm).
DEFAULT_MARKERS = ['o', 's', '^', 'v', 'D', 'p', '*', 'h', '<', '>', 'X', 'P', 'd', '8', 'H']
# =============================================================================
# Data Classes
# =============================================================================
@dataclass
class ScanResult:
    """
    Result of scanning a data directory.
    :no-index:

    Attributes
    ----------
    algorithms : List[str]
        Sorted list of algorithm names found in the directory.
    problems : List[str]
        Sorted list of problem names extracted from filenames.
    runs : int
        Number of independent runs per algorithm-problem combination.
    data_path : Path
        Path to the scanned data directory.
    """
    algorithms: List[str]
    problems: List[str]
    runs: int
    data_path: Path
@dataclass
class MetricResults:
    """
    Container for all metric calculation results.
    :no-index:

    Attributes
    ----------
    metric_values : Dict[str, Dict[str, Dict[int, List[np.ndarray]]]]
        Nested dictionary storing metric values per generation.
        Structure: metric_values[algorithm][problem][run] = List[np.ndarray]
        where each np.ndarray contains metric values per generation for each task.
    best_values : Dict[str, Dict[str, Dict[int, List[float]]]]
        Nested dictionary storing final best metric values.
        Structure: best_values[algorithm][problem][run] = List[float]
        where each float is the final best value for each task.
    objective_values : Dict[str, Dict[str, Dict[int, List[np.ndarray]]]]
        Nested dictionary storing original objective values.
        Structure: objective_values[algorithm][problem][run] = List[np.ndarray]
        where each np.ndarray has shape (n_solutions, n_objectives).
    runtime : Dict[str, Dict[str, Dict[int, float]]]
        Nested dictionary storing runtime in seconds.
        Structure: runtime[algorithm][problem][run] = float
    max_nfes : Dict[str, Dict[str, List[int]]]
        Nested dictionary storing maximum number of function evaluations.
        Structure: max_nfes[algorithm][problem] = List[int] (per task)
    metric_name : Optional[str]
        Name of the metric used (e.g., 'IGD', 'HV', or None for single-objective).
    """
    metric_values: Dict[str, Dict[str, Dict[int, Any]]]
    best_values: Dict[str, Dict[str, Dict[int, List[float]]]]
    objective_values: Dict[str, Dict[str, Dict[int, List[np.ndarray]]]]
    runtime: Dict[str, Dict[str, Dict[int, float]]]
    max_nfes: Dict[str, Dict[str, List[int]]]
    metric_name: Optional[str]
@dataclass
class TableConfig:
    """
    Configuration for table generation.
    :no-index:

    Attributes
    ----------
    table_format : TableFormat
        Output format (EXCEL or LATEX).
    statistic_type : StatisticType
        Type of statistic to display (MEAN, MEDIAN, MAX, MIN).
    significance_level : float
        P-value threshold for statistical significance testing.
        Default: 0.05
    rank_sum_test : bool
        Whether to perform Wilcoxon rank-sum test.
        Default: True
    save_path : Path
        Directory path to save output tables.
        Default: './Results'
    """
    table_format: TableFormat = TableFormat.EXCEL
    statistic_type: StatisticType = StatisticType.MEAN
    significance_level: float = 0.05
    rank_sum_test: bool = True
    save_path: Path = Path('./Results')
@dataclass
class PlotConfig:
    """
    Configuration for plot generation.
    :no-index:

    Attributes
    ----------
    figure_format : str
        Output figure format (e.g., 'pdf', 'png', 'svg').
        Default: 'pdf'
    statistic_type : StatisticType
        Type of statistic for selecting representative run.
    log_scale : bool
        Whether to use logarithmic scale for y-axis.
        Default: False
    show_pf : bool
        Whether to show true Pareto front in ND solution plots.
        Default: True
    show_nd : bool
        Whether to filter and show only non-dominated solutions.
        Default: True
    merge_plots : bool
        Whether to merge all plots into a single figure.
        Default: False
    merge_columns : int
        Number of columns in merged plot layout.
        Default: 3
    show_std_band : bool
        Whether to show standard deviation band on convergence curves.
        Default: False
    save_path : Path
        Directory path to save output figures.
    colors : List[str]
        Color palette for plotting algorithms.
        Defaults to a copy of DEFAULT_COLORS (copied so per-instance edits
        do not mutate the module-level palette).
    markers : List[str]
        Marker styles for plotting algorithms.
        Defaults to a copy of DEFAULT_MARKERS.
    """
    figure_format: str = 'pdf'
    statistic_type: StatisticType = StatisticType.MEAN
    log_scale: bool = False
    show_pf: bool = True
    show_nd: bool = True
    merge_plots: bool = False
    merge_columns: int = 3
    show_std_band: bool = False
    save_path: Path = Path('./Results')
    colors: List[str] = field(default_factory=lambda: DEFAULT_COLORS.copy())
    markers: List[str] = field(default_factory=lambda: DEFAULT_MARKERS.copy())
@dataclass
class ComparisonResult:
    """
    Result of statistical comparison between algorithms.
    :no-index:

    Attributes
    ----------
    symbol : str
        Comparison symbol: '+' (better), '-' (worse), '=' (no significant difference).
    p_value : Optional[float]
        P-value from statistical test, or None if test not performed.
    """
    symbol: str
    p_value: Optional[float] = None
@dataclass
class ComparisonCounts:
    """
    Aggregated comparison counts for an algorithm (versus the baseline).
    :no-index:

    Attributes
    ----------
    plus : int
        Number of significantly better results.
    minus : int
        Number of significantly worse results.
    equal : int
        Number of statistically equivalent results.
    """
    plus: int = 0
    minus: int = 0
    equal: int = 0
# =============================================================================
# Utility Functions
# =============================================================================
class DataUtils:
"""
Utility class for data loading and processing operations.
"""
@staticmethod
def load_pickle(file_path: Path) -> Dict[str, Any]:
"""
Load and return a Python object from a pickle file.
Parameters
----------
file_path : Path
Path to the pickle file.
Returns
-------
Dict[str, Any]
Unpickled Python object (typically a dictionary containing
'all_objs', 'runtime', 'max_nfes' keys).
Raises
------
FileNotFoundError
If the pickle file does not exist.
pickle.UnpicklingError
If the file cannot be unpickled.
"""
with open(file_path, 'rb') as f:
return pickle.load(f)
@staticmethod
def load_reference(
settings: Dict[str, Any],
problem: str,
task_identifier: Union[str, int],
M: int,
D: Optional[int] = None,
C: int = 0
) -> Optional[np.ndarray]:
"""
Load reference data (Pareto Front or reference point) for a specific problem and task.
Parameters
----------
settings : Dict[str, Any]
Dictionary containing problem configurations and reference definitions.
Expected keys:
- problem (str): Contains task definitions
- 'n_ref' (int, optional): Number of reference points (default: 10000)
- 'ref_path' (str, optional): Path to reference files (default: './MOReference')
problem : str
Name of the problem (e.g., "DTLZ1", "DTLZ2").
task_identifier : Union[str, int]
Task identifier - either task name (str like "T1") or index (int like 0).
M : int
Number of objectives (required).
D : int, optional
Number of decision variables (dimension).
C : int, optional
Number of constraints (default: 0).
Returns
-------
Optional[np.ndarray]
Reference data with shape (n_points, M), or None if not available.
Notes
-----
Supports three types of reference definitions:
1. Callable: Function that returns reference data
- Must accept parameter N (number of reference points)
- Must accept parameter M (number of objectives)
- May optionally accept parameters D, C
- Example signatures: ``func(N, M)``, ``func(N, M, D)``, ``func(N, M, D, C)``
2. String: File path to .npy or .csv reference file
3. Array-like: Direct reference data (list, tuple, np.ndarray)
If 'all_tasks' key is present instead of individual task keys, the same
reference data will be used for all tasks.
"""
# Convert task index to task name if necessary
task_name = f"T{task_identifier + 1}" if isinstance(task_identifier, int) else task_identifier
# Check if problem exists in settings
if problem not in settings:
print(f"Warning: Problem '{problem}' not found in settings")
return None
problem_settings = settings[problem]
# Check if task exists for this problem
if task_name in problem_settings:
ref_definition = problem_settings[task_name]
elif 'all_tasks' in problem_settings:
# Use the same reference for all tasks
ref_definition = problem_settings['all_tasks']
else:
print(f"Warning: Task '{task_name}' and 'all_tasks' not found for problem '{problem}'")
return None
# Case 1: Callable function
if callable(ref_definition):
N = settings.get('n_ref', 10000)
try:
import inspect
sig = inspect.signature(ref_definition)
params = list(sig.parameters.keys())
num_params = len(params)
if num_params == 2:
# func(N, M)
return ref_definition(N, M)
elif num_params == 3:
# func(N, M, D)
if D is None:
print(f"Warning: D not provided for {problem}_{task_name}, using 0")
D = 0
return ref_definition(N, M, D)
elif num_params >= 4:
# func(N, M, D, C)
if D is None:
print(f"Warning: D not provided for {problem}_{task_name}, using 0")
D = 0
return ref_definition(N, M, D, C)
else:
print(
f"Warning: Unexpected number of parameters ({num_params}) for reference function {problem}_{task_name}")
return None
except Exception as e:
print(f"Warning: Failed to call reference function for {problem}_{task_name}: {e}")
return None
# Case 2: String (file path or file name)
elif isinstance(ref_definition, str):
return DataUtils._load_reference_from_file(
settings,
ref_definition,
problem,
task_name
)
# Case 3: Array-like (list, tuple, numpy array)
elif isinstance(ref_definition, (list, tuple, np.ndarray)):
reference = np.array(ref_definition)
# Ensure it's at least 2D
if reference.ndim == 1:
reference = reference.reshape(1, -1)
return reference
else:
print(f"Warning: Unknown reference definition type for {problem}_{task_name}: {type(ref_definition)}")
return None
@staticmethod
def _load_reference_from_file(
settings: Dict[str, Any],
ref_definition: str,
problem: str,
task_name: str
) -> Optional[np.ndarray]:
"""
Load reference data from file.
Parameters
----------
settings : Dict[str, Any]
Settings dictionary containing 'ref_path'.
ref_definition : str
File path or filename.
problem : str
Problem name for alternative path construction.
task_name : str
Task name for alternative path construction.
Returns
-------
Optional[np.ndarray]
Loaded reference data or None if loading fails.
"""
ref_path = settings.get('ref_path', './MOReference')
# Construct full path
if not os.path.isabs(ref_definition):
full_path = os.path.join(ref_path, ref_definition)
else:
full_path = ref_definition
# Try to load the file
try:
if full_path.endswith('.npy'):
return np.load(full_path)
elif full_path.endswith('.csv'):
return np.loadtxt(full_path, delimiter=',')
else:
print(f"Warning: Unsupported file format for '{full_path}'")
return None
except FileNotFoundError:
# Try alternative naming conventions
base_name = f"{problem}_{task_name}_ref"
for ext in ['.npy', '.csv']:
alt_path = os.path.join(ref_path, base_name + ext)
if os.path.exists(alt_path):
try:
if ext == '.npy':
return np.load(alt_path)
else:
return np.loadtxt(alt_path, delimiter=',')
except Exception as e:
print(f"Error loading file '{alt_path}': {e}")
print(f"Warning: File not found: '{full_path}'")
return None
except Exception as e:
print(f"Error loading reference data from file '{full_path}': {e}")
return None
@staticmethod
def get_metric_direction(metric_name: Optional[str]) -> OptimizationDirection:
"""
Determine optimization direction based on metric type (Version 2 - More maintainable).
Parameters
----------
metric_name : Optional[str]
Name of the metric or None for single-objective.
Returns
-------
OptimizationDirection
MINIMIZE or MAXIMIZE based on the metric's sign attribute.
"""
if metric_name is None:
return OptimizationDirection.MINIMIZE
# Metric sign mapping (based on your code)
# sign = -1 means minimize, sign = 1 means maximize
metric_signs = {
'IGD': -1, # Inverted Generational Distance (minimize)
'HV': 1, # Hypervolume (maximize)
'IGDp': -1, # IGD+ (minimize)
'GD': -1, # Generational Distance (minimize)
'DeltaP': -1, # Delta_p (minimize)
'Spacing': -1, # Spacing (minimize)
'Spread': -1, # Spread (minimize)
'FR': 1, # Feasibility Rate (maximize)
'CV': -1, # Constraint Violation (minimize)
}
if metric_name not in metric_signs:
raise ValueError(f'Unsupported metric: {metric_name}')
sign = metric_signs[metric_name]
return OptimizationDirection.MAXIMIZE if sign == 1 else OptimizationDirection.MINIMIZE
# =============================================================================
# Statistics Module
# =============================================================================
class StatisticsCalculator:
    """
    Statistical helpers: summary statistics, rank-sum testing, and
    representative-run selection over the nested best-value dictionaries.
    """

    @staticmethod
    def calculate_statistic(
        data: List[float],
        statistic_type: StatisticType
    ) -> Tuple[float, Optional[float]]:
        """
        Compute the requested summary statistic of ``data``.

        Parameters
        ----------
        data : List[float]
            Numeric sample to summarise.
        statistic_type : StatisticType
            Which statistic to compute (MEAN, MEDIAN, MAX, MIN).

        Returns
        -------
        Tuple[float, Optional[float]]
            ``(statistic, std)``. The sample standard deviation is only
            provided for MEAN (0.0 for a single observation); every other
            statistic pairs with ``None``. An empty sample yields
            ``(nan, nan)``.
        """
        if not data:
            return np.nan, np.nan
        if statistic_type == StatisticType.MEAN:
            spread = np.std(data, ddof=1) if len(data) > 1 else 0.0
            return np.mean(data), spread
        reducers = {
            StatisticType.MEDIAN: np.median,
            StatisticType.MAX: np.max,
            StatisticType.MIN: np.min,
        }
        reducer = reducers.get(statistic_type)
        if reducer is None:
            # Unknown statistic type: same sentinel as the empty-data case.
            return np.nan, np.nan
        return reducer(data), None

    @staticmethod
    def perform_rank_sum_test(
        algo_data: List[float],
        base_data: List[float],
        significance_level: float = 0.05,
        direction: OptimizationDirection = OptimizationDirection.MINIMIZE
    ) -> ComparisonResult:
        """
        Compare two samples with the Wilcoxon rank-sum test.

        Parameters
        ----------
        algo_data : List[float]
            Sample from the algorithm under test.
        base_data : List[float]
            Sample from the baseline algorithm.
        significance_level : float, optional
            P-value threshold below which the difference counts as
            significant (default: 0.05).
        direction : OptimizationDirection, optional
            Whether smaller or larger values are better.

        Returns
        -------
        ComparisonResult
            '+' when the algorithm's median is significantly better,
            '-' when significantly worse, '=' otherwise (including empty
            samples and test failures, which carry ``p_value=None``).
        """
        if len(algo_data) == 0 or len(base_data) == 0:
            return ComparisonResult(symbol='=', p_value=None)
        try:
            _, p_value = stats.ranksums(algo_data, base_data)
            if p_value >= significance_level:
                return ComparisonResult(symbol='=', p_value=p_value)
            algo_med = np.median(algo_data)
            base_med = np.median(base_data)
            wins = (
                algo_med < base_med
                if direction == OptimizationDirection.MINIMIZE
                else algo_med > base_med
            )
            return ComparisonResult(symbol='+' if wins else '-', p_value=p_value)
        except Exception:
            # Degenerate samples can break the test; report "no difference".
            return ComparisonResult(symbol='=', p_value=None)

    @staticmethod
    def collect_task_data(
        all_best_values: Dict[str, Dict[str, Dict[int, List[float]]]],
        algo: str,
        prob: str,
        task_idx: int
    ) -> List[float]:
        """
        Gather the non-NaN best values of one algorithm/problem/task across runs.

        Parameters
        ----------
        all_best_values : Dict[str, Dict[str, Dict[int, List[float]]]]
            Nested dictionary containing best metric values.
        algo : str
            Algorithm name.
        prob : str
            Problem name.
        task_idx : int
            Task index (0-based).

        Returns
        -------
        List[float]
            Non-NaN metric values, one per valid run.
        """
        per_run = (
            all_best_values[algo][prob][run][task_idx]
            for run in all_best_values[algo][prob]
        )
        return [value for value in per_run if not np.isnan(value)]

    @staticmethod
    def select_representative_run(
        all_best_values: Dict[str, Dict[str, Dict[int, List[float]]]],
        algo: str,
        prob: str,
        task_idx: int,
        statistic_type: StatisticType
    ) -> Optional[int]:
        """
        Pick the run whose final value best matches the requested statistic.

        Parameters
        ----------
        all_best_values : Dict[str, Dict[str, Dict[int, List[float]]]]
            Nested dictionary containing best metric values.
        algo : str
            Algorithm name.
        prob : str
            Problem name.
        task_idx : int
            Task index (0-based).
        statistic_type : StatisticType
            MEDIAN picks the run closest to the sample median; MAX/MIN pick
            the extreme run; MEAN returns None (all runs are aggregated).

        Returns
        -------
        Optional[int]
            Run number of the representative run, or None for MEAN, an
            unknown statistic, or when no run has a valid (non-NaN) value.
        """
        if statistic_type == StatisticType.MEAN:
            return None
        # Pair each run id with its final value, dropping NaNs.
        valid_pairs = [
            (run, values[task_idx])
            for run, values in all_best_values[algo][prob].items()
            if not np.isnan(values[task_idx])
        ]
        if not valid_pairs:
            return None
        run_ids = np.array([run for run, _ in valid_pairs])
        finals = np.array([value for _, value in valid_pairs])
        if statistic_type == StatisticType.MEDIAN:
            pick = np.argmin(np.abs(finals - np.median(finals)))
        elif statistic_type == StatisticType.MAX:
            pick = np.argmax(finals)
        elif statistic_type == StatisticType.MIN:
            pick = np.argmin(finals)
        else:
            return None
        return run_ids[pick]
# =============================================================================
# Table Generator Module
# =============================================================================
class TableGenerator:
    """
    Class for generating comparison tables in Excel and LaTeX formats.
    """

    def __init__(self, config: TableConfig):
        """
        Initialize TableGenerator with configuration.

        Parameters
        ----------
        config : TableConfig
            Configuration object for table generation.
        """
        self.config = config

    def generate(
        self,
        all_best_values: Dict[str, Dict[str, Dict[int, List[float]]]],
        algorithm_order: List[str],
        metric_name: Optional[str] = None
    ) -> Union[pd.DataFrame, str]:
        """
        Generate comparison table with statistical analysis.

        Parameters
        ----------
        all_best_values : Dict[str, Dict[str, Dict[int, List[float]]]]
            Nested dictionary containing best metric values.
            Structure: all_best_values[algorithm][problem][run] = List[float]
        algorithm_order : List[str]
            List of algorithm names in display order.
            The last algorithm is treated as the baseline for comparisons.
        metric_name : Optional[str], optional
            Metric name to determine optimization direction.

        Returns
        -------
        Union[pd.DataFrame, str]
            DataFrame for Excel format, LaTeX string for LaTeX format.
        """
        # Sort problems numerically by the digits in their names (e.g. P2
        # before P10); names without digits fall back to string order.
        problems = sorted(all_best_values[algorithm_order[0]].keys(),
                          key=lambda x: int(''.join(filter(str.isdigit, x))) if any(c.isdigit() for c in x) else x)
        # Determine optimization direction
        direction = DataUtils.get_metric_direction(metric_name)
        # Generate data rows
        rows, comparison_counts, algorithm_ranks = self._generate_data_rows(all_best_values, algorithm_order, problems, direction)
        # Generate and save table
        if self.config.table_format == TableFormat.EXCEL:
            return self._generate_excel_table(rows, algorithm_order, comparison_counts, algorithm_ranks, direction)
        else:
            return self._generate_latex_table(rows, algorithm_order, comparison_counts, algorithm_ranks, direction)

    def _generate_data_rows(
        self,
        all_best_values: Dict[str, Dict[str, Dict[int, List[float]]]],
        algorithm_order: List[str],
        problems: List[str],
        direction: OptimizationDirection
    ) -> Tuple[List[Dict[str, Any]], Dict[str, ComparisonCounts], Dict[str, List[int]]]:
        """
        Build one table row per problem/task and collect comparison statistics.

        Parameters
        ----------
        all_best_values : Dict[str, Dict[str, Dict[int, List[float]]]]
            Nested dictionary containing best metric values.
        algorithm_order : List[str]
            Algorithms in display order; the last is the comparison baseline.
        problems : List[str]
            Problems in display order.
        direction : OptimizationDirection
            Direction used for significance symbols and per-row ranks.

        Returns
        -------
        Tuple[List[Dict[str, Any]], Dict[str, ComparisonCounts], Dict[str, List[int]]]
            Tuple of (rows, comparison_counts, algorithm_ranks).
            algorithm_ranks[algo] = List[int] with one rank per row.
        """
        base_algo = algorithm_order[-1]
        rows = []
        comparison_counts = {algo: ComparisonCounts() for algo in algorithm_order[:-1]}
        algorithm_ranks = {algo: [] for algo in algorithm_order}
        for prob in problems:
            # Task count is read from the first run of the first algorithm;
            # all algorithms are assumed to share it for a given problem.
            first_algo = algorithm_order[0]
            first_run = list(all_best_values[first_algo][prob].keys())[0]
            num_tasks = len(all_best_values[first_algo][prob][first_run])
            for task_idx in range(num_tasks):
                row = {'Problem': prob, 'Task': task_idx + 1}
                algo_stat_values = {}
                base_data = StatisticsCalculator.collect_task_data(
                    all_best_values, base_algo, prob, task_idx
                )
                for algo in algorithm_order:
                    algo_data = StatisticsCalculator.collect_task_data(
                        all_best_values, algo, prob, task_idx
                    )
                    stat_value, std_value = StatisticsCalculator.calculate_statistic(
                        algo_data, self.config.statistic_type
                    )
                    algo_stat_values[algo] = stat_value
                    symbol = ''
                    if self.config.rank_sum_test and algo != base_algo:
                        result = StatisticsCalculator.perform_rank_sum_test(
                            algo_data, base_data,
                            self.config.significance_level, direction
                        )
                        symbol = result.symbol
                        if algo in comparison_counts:
                            if symbol == '+':
                                comparison_counts[algo].plus += 1
                            elif symbol == '-':
                                comparison_counts[algo].minus += 1
                            else:
                                comparison_counts[algo].equal += 1
                    cell_content = self._format_cell_content(stat_value, std_value, symbol)
                    row[algo] = cell_content
                row_ranks = self._calculate_row_ranks(algo_stat_values, direction)
                for algo, rank in row_ranks.items():
                    algorithm_ranks[algo].append(rank)
                rows.append(row)
        return rows, comparison_counts, algorithm_ranks

    def _calculate_row_ranks(
        self,
        algo_values: Dict[str, float],
        direction: OptimizationDirection
    ) -> Dict[str, int]:
        """
        Calculate the rank of each algorithm in a single row.

        Parameters
        ----------
        algo_values : Dict[str, float]
            Statistical values for each algorithm.
        direction : OptimizationDirection
            Optimization direction (MINIMIZE or MAXIMIZE).

        Returns
        -------
        Dict[str, int]
            Rank of each algorithm (1 is the best). Algorithms with NaN
            values receive NaN instead of a rank.
        """
        # Filter out NaN values
        valid_algos = {algo: val for algo, val in algo_values.items() if not np.isnan(val)}
        if not valid_algos:
            return {algo: np.nan for algo in algo_values.keys()}
        # Sort based on optimization direction
        if direction == OptimizationDirection.MINIMIZE:
            sorted_algos = sorted(valid_algos.items(), key=lambda x: x[1])
        else:
            sorted_algos = sorted(valid_algos.items(), key=lambda x: x[1], reverse=True)
        # Assign ranks
        ranks = {}
        for rank, (algo, _) in enumerate(sorted_algos, start=1):
            ranks[algo] = rank
        # Set NaN for algorithms with NaN values
        for algo in algo_values.keys():
            if algo not in ranks:
                ranks[algo] = np.nan
        return ranks

    def _format_cell_content(
        self,
        stat_value: float,
        std_value: Optional[float],
        symbol: str
    ) -> str:
        """
        Format a table cell with statistic value, optional std deviation, and comparison symbol.

        Parameters
        ----------
        stat_value : float
            Statistical value.
        std_value : Optional[float]
            Standard deviation (or None).
        symbol : str
            Comparison symbol ('+', '-', '=' or '').

        Returns
        -------
        str
            Formatted cell content string; 'N/A' for NaN statistics.
            Excel cells look like "1.2345e-01(1.23e-02) +"; LaTeX cells
            escape negative exponents ("e$-$") and append "~$+$"-style
            comparison markers.
        """
        if np.isnan(stat_value):
            return 'N/A'
        if self.config.table_format == TableFormat.EXCEL:
            if self.config.statistic_type == StatisticType.MEAN:
                cell_content = f"{stat_value:.4e}({std_value:.2e})"
            else:
                cell_content = f"{stat_value:.4e}"
            if symbol:
                cell_content += f" {symbol}"
        else:
            # LaTeX format: wrap the minus of a negative exponent in math mode
            if self.config.statistic_type == StatisticType.MEAN:
                stat_str = f"{stat_value:.4e}".replace('e-', 'e$-$')
                std_str = f"{std_value:.2e}".replace('e-', 'e$-$')
                cell_content = f"{stat_str}({std_str})"
            else:
                stat_str = f"{stat_value:.4e}".replace('e-', 'e$-$')
                cell_content = stat_str
            if symbol:
                symbol_map = {'+': '~$+$', '-': '~$-$', '=': '~='}
                cell_content += symbol_map.get(symbol, '')
        return cell_content

    @staticmethod
    def _parse_cell_value(cell: str) -> float:
        """
        Recover the numeric statistic from a formatted cell string.

        Strips the "(std)" suffix, any trailing significance marker
        (" +", " -", " =", or the LaTeX forms "~$+$", "~$-$", "~="),
        and undoes the LaTeX "e$-$" exponent escape.

        Parameters
        ----------
        cell : str
            Formatted cell content produced by _format_cell_content.

        Returns
        -------
        float
            The parsed statistic value.

        Raises
        ------
        ValueError, IndexError
            If the cell does not contain a parseable number.
        """
        token = cell.split('(')[0].split('~')[0].replace('e$-$', 'e-').strip()
        return float(token.split()[0])

    def _find_best_value_in_row(
        self,
        row: Dict[str, Any],
        algorithm_order: List[str],
        direction: OptimizationDirection
    ) -> Optional[str]:
        """
        Find the algorithm with the best performance in a table row.

        Parameters
        ----------
        row : Dict[str, Any]
            Dictionary mapping algorithm names to formatted cell values.
        algorithm_order : List[str]
            List of algorithm names.
        direction : OptimizationDirection
            Optimization direction.

        Returns
        -------
        Optional[str]
            Name of the best-performing algorithm or None.
        """
        best_val = None
        best_algo = None
        for algo in algorithm_order:
            cell = row[algo]
            if cell != 'N/A':
                try:
                    # FIX: previously the trailing LaTeX marker ("~$+$" etc.)
                    # was left attached for non-MEAN cells, so float() raised
                    # and no best value was ever found.
                    val = self._parse_cell_value(cell)
                    if direction == OptimizationDirection.MINIMIZE:
                        if best_val is None or val < best_val:
                            best_val = val
                            best_algo = algo
                    else:
                        if best_val is None or val > best_val:
                            best_val = val
                            best_algo = algo
                except Exception:
                    pass
        return best_algo

    def _generate_excel_table(
        self,
        rows: List[Dict[str, Any]],
        algorithm_order: List[str],
        comparison_counts: Dict[str, ComparisonCounts],
        algorithm_ranks: Dict[str, List[int]],
        direction: OptimizationDirection
    ) -> pd.DataFrame:
        """
        Generate and save a formatted Excel table.

        Parameters
        ----------
        rows : List[Dict[str, Any]]
            Table row data.
        algorithm_order : List[str]
            Algorithm display order.
        comparison_counts : Dict[str, ComparisonCounts]
            Comparison result counts.
        algorithm_ranks : Dict[str, List[int]]
            Per-row ranks of each algorithm (for the Average Rank row).
        direction : OptimizationDirection
            Optimization direction.

        Returns
        -------
        pd.DataFrame
            DataFrame containing the table data (including summary rows).
        """
        if self.config.rank_sum_test:
            summary_row = {'Problem': '+/-/=', 'Task': ''}
            for algo in algorithm_order[:-1]:
                counts = comparison_counts[algo]
                summary_row[algo] = f"{counts.plus}/{counts.minus}/{counts.equal}"
            summary_row[algorithm_order[-1]] = 'Base'
            rows.append(summary_row)
        # The Average Rank row is always appended
        avg_rank_row = {'Problem': 'Average Rank', 'Task': ''}
        for algo in algorithm_order:
            ranks = algorithm_ranks[algo]
            valid_ranks = [r for r in ranks if not np.isnan(r)]
            if valid_ranks:
                avg_rank = np.mean(valid_ranks)
                avg_rank_row[algo] = f"{avg_rank:.2f}"
            else:
                avg_rank_row[algo] = 'N/A'
        rows.append(avg_rank_row)
        # Create DataFrame
        df = pd.DataFrame(rows)
        columns = ['Problem', 'Task'] + algorithm_order
        df = df[columns]
        # Save and format
        save_dir = Path(self.config.save_path)
        save_dir.mkdir(parents=True, exist_ok=True)
        output_file = save_dir / f'results_table_{self.config.statistic_type.value}.xlsx'
        df.to_excel(output_file, index=False)
        # Apply Excel formatting
        self._apply_excel_formatting(output_file, df, algorithm_order, direction)
        print(f"Excel table saved to: {output_file}")
        return df

    def _apply_excel_formatting(
        self,
        output_file: Path,
        df: pd.DataFrame,
        algorithm_order: List[str],
        direction: OptimizationDirection
    ) -> None:
        """
        Apply borders, fonts, column widths, and best-value bolding to the
        saved Excel workbook (modified in place).

        Parameters
        ----------
        output_file : Path
            Path of the workbook written by _generate_excel_table.
        df : pd.DataFrame
            The table data, including the trailing summary rows.
        algorithm_order : List[str]
            Algorithm display order (columns 3..N in the sheet).
        direction : OptimizationDirection
            Optimization direction used to decide the best value per row.
        """
        wb = load_workbook(output_file)
        ws = wb.active
        # Define styles
        thin_border = Border(
            left=Side(style='thin'),
            right=Side(style='thin'),
            top=Side(style='thin'),
            bottom=Side(style='thin')
        )
        normal_font = Font(name='Times New Roman', size=11)
        bold_font = Font(name='Times New Roman', size=11, bold=True)
        # Apply formatting and auto-adjust column widths
        for column in ws.columns:
            max_length = 0
            column_letter = column[0].column_letter
            for cell in column:
                cell.border = thin_border
                cell.alignment = Alignment(horizontal='center', vertical='center')
                cell.font = normal_font
                try:
                    if cell.value:
                        cell_length = len(str(cell.value))
                        if cell_length > max_length:
                            max_length = cell_length
                except Exception:
                    pass
            ws.column_dimensions[column_letter].width = max_length + 2
        # Bold the best value in each data row
        num_summary_rows = 1  # Always has Average Rank row
        if self.config.rank_sum_test:
            num_summary_rows += 1  # Add +/-/= row
        num_data_rows = len(df) - num_summary_rows
        for row_idx in range(2, num_data_rows + 2):
            best_val = None
            best_col = None
            for col_idx, algo in enumerate(algorithm_order, start=3):
                cell = ws.cell(row=row_idx, column=col_idx)
                cell_value = cell.value
                if cell_value and cell_value != 'N/A':
                    try:
                        # FIX: take the first whitespace token so non-MEAN
                        # cells like "1.2345e+00 +" still parse; previously
                        # float() raised on the trailing symbol and no cell
                        # was ever bolded for MEDIAN/MAX/MIN tables.
                        val = self._parse_cell_value(str(cell_value))
                        if direction == OptimizationDirection.MINIMIZE:
                            if best_val is None or val < best_val:
                                best_val = val
                                best_col = col_idx
                        else:
                            if best_val is None or val > best_val:
                                best_val = val
                                best_col = col_idx
                    except Exception:
                        pass
            if best_col is not None:
                ws.cell(row=row_idx, column=best_col).font = bold_font
        # Bold the best (minimum) average rank
        avg_rank_row_idx = len(df) + 1  # Last row in the sheet (+1 for header)
        best_avg_rank = None
        best_avg_rank_col = None
        for col_idx, algo in enumerate(algorithm_order, start=3):
            cell = ws.cell(row=avg_rank_row_idx, column=col_idx)
            cell_value = cell.value
            if cell_value and cell_value != 'N/A':
                try:
                    avg_rank = float(cell_value)
                    if best_avg_rank is None or avg_rank < best_avg_rank:
                        best_avg_rank = avg_rank
                        best_avg_rank_col = col_idx
                except Exception:
                    pass
        if best_avg_rank_col is not None:
            ws.cell(row=avg_rank_row_idx, column=best_avg_rank_col).font = bold_font
        wb.save(output_file)

    def _generate_latex_table(
        self,
        rows: List[Dict[str, Any]],
        algorithm_order: List[str],
        comparison_counts: Dict[str, ComparisonCounts],
        algorithm_ranks: Dict[str, List[int]],
        direction: OptimizationDirection
    ) -> str:
        """
        Generate a LaTeX table string, save it to disk, and return it.

        Parameters
        ----------
        rows : List[Dict[str, Any]]
            Table row data.
        algorithm_order : List[str]
            Algorithm display order; the last one is the baseline.
        comparison_counts : Dict[str, ComparisonCounts]
            Comparison result counts for the +/-/= summary row.
        algorithm_ranks : Dict[str, List[int]]
            Per-row ranks of each algorithm (for the Average Rank row).
        direction : OptimizationDirection
            Optimization direction used for best-value highlighting.

        Returns
        -------
        str
            The complete LaTeX table source.
        """
        df = pd.DataFrame(rows)
        # Build table structure
        num_cols = len(algorithm_order) + 2
        col_format = '|'.join(['c'] * num_cols)
        col_format = '|' + col_format + '|'
        # Initialize LaTeX table
        latex_str = "\\begin{table*}[htbp]\n"
        latex_str += "\\renewcommand{\\arraystretch}{1.2}\n"
        latex_str += "\\centering\n"
        latex_str += "\\caption{Your caption here}\n"
        latex_str += "\\label{tab:results}\n"
        latex_str += "\\resizebox{1.0\\textwidth}{!}{\n"
        latex_str += f"\\begin{{tabular}}{{{col_format}}}\n"
        latex_str += "\\hline\n"
        # Header row
        header = "Problem & Task & " + " & ".join(algorithm_order) + " \\\\\n"
        latex_str += header
        latex_str += "\\hline\n"
        # Data rows (best value per row in bold)
        for _, row in df.iterrows():
            best_algo = self._find_best_value_in_row(row, algorithm_order, direction)
            row_str = f"{row['Problem']} & {row['Task']}"
            for algo in algorithm_order:
                cell = row[algo]
                if algo == best_algo:
                    cell = f"\\textbf{{{cell}}}"
                row_str += f" & {cell}"
            row_str += " \\\\\n"
            latex_str += row_str
        latex_str += "\\hline\n"
        # Summary row
        if self.config.rank_sum_test:
            summary_str = "\\multicolumn{2}{|c|}{+/$-$/=}"
            for algo in algorithm_order[:-1]:
                counts = comparison_counts[algo]
                summary_str += f" & {counts.plus}/{counts.minus}/{counts.equal}"
            summary_str += " & Base \\\\\n"
            latex_str += summary_str
            latex_str += "\\hline\n"
        # Average Rank row with best rank highlighted
        avg_rank_str = "\\multicolumn{2}{|c|}{Average Rank}"
        # Calculate average ranks and find the best
        avg_ranks = {}
        for algo in algorithm_order:
            ranks = algorithm_ranks[algo]
            valid_ranks = [r for r in ranks if not np.isnan(r)]
            if valid_ranks:
                avg_ranks[algo] = np.mean(valid_ranks)
            else:
                avg_ranks[algo] = np.nan
        # Find algorithm with best (minimum) average rank
        valid_avg_ranks = {algo: rank for algo, rank in avg_ranks.items() if not np.isnan(rank)}
        best_rank_algo = min(valid_avg_ranks, key=valid_avg_ranks.get) if valid_avg_ranks else None
        # Generate Average Rank row
        for algo in algorithm_order:
            if np.isnan(avg_ranks[algo]):
                cell_content = "N/A"
            else:
                cell_content = f"{avg_ranks[algo]:.2f}"
                # Bold the best rank
                if algo == best_rank_algo:
                    cell_content = f"\\textbf{{{cell_content}}}"
            avg_rank_str += f" & {cell_content}"
        avg_rank_str += " \\\\\n"
        latex_str += avg_rank_str
        latex_str += "\\hline\n"
        latex_str += "\\end{tabular}}\n"
        latex_str += "\\end{table*}\n"
        # Save to file
        save_dir = Path(self.config.save_path)
        save_dir.mkdir(parents=True, exist_ok=True)
        output_file = save_dir / f'results_table_{self.config.statistic_type.value}.tex'
        with open(output_file, 'w') as f:
            f.write(latex_str)
        print(f"LaTeX table saved to: {output_file}")
        return latex_str
# =============================================================================
# Plot Generator Module
# =============================================================================
class PlotGenerator:
"""
Class for generating various visualization plots.
"""
def __init__(self, config: PlotConfig):
    """
    Create a PlotGenerator bound to a plotting configuration.

    Parameters
    ----------
    config : PlotConfig
        Settings shared by all plotting methods (save path, figure
        format, colors, markers, merge options, etc.).
    """
    self.config = config
@staticmethod
def _calculate_legend_fontsize(n_algorithms: int) -> int:
    """
    Derive a legend font size from the number of plotted algorithms.

    The size decreases linearly from 14 pt (2 algorithms) to 6 pt
    (15 algorithms) and is clamped to the [6, 14] range outside that
    span.

    Parameters
    ----------
    n_algorithms : int
        Number of algorithms shown in the legend.

    Returns
    -------
    int
        Legend font size in points.
    """
    # Linear ramp y = 14 - (14-6)/(15-2) * (x-2), clamped to [6, 14].
    interpolated = 14.0 - (8.0 / 13.0) * (n_algorithms - 2)
    return max(6, min(14, int(round(interpolated))))
def plot_convergence_curves(
    self,
    metric_values: Dict[str, Dict[str, Dict[int, Any]]],
    best_values: Dict[str, Dict[str, Dict[int, List[float]]]],
    max_nfes: Dict[str, Dict[str, List[int]]],
    algorithm_order: List[str],
    metric_name: Optional[str] = None
) -> None:
    """
    Generate and save convergence curve plots for every problem/task.

    Depending on ``self.config.merge_plots``, either a single merged
    multi-panel figure or one standalone figure per (problem, task)
    pair is written to ``self.config.save_path``.

    Parameters
    ----------
    metric_values : Dict[str, Dict[str, Dict[int, Any]]]
        Per-generation metric values:
        metric_values[algorithm][problem][run] = List[np.ndarray].
    best_values : Dict[str, Dict[str, Dict[int, List[float]]]]
        Final best metric values, used to pick a representative run.
    max_nfes : Dict[str, Dict[str, List[int]]]
        Maximum function evaluations per task:
        max_nfes[algorithm][problem] = List[int].
    algorithm_order : List[str]
        Algorithm names in display order.
    metric_name : Optional[str], optional
        Y-axis label; defaults to 'Objective Value' downstream.

    Returns
    -------
    None
        Figures are saved to disk.
    """
    first_algo = algorithm_order[0]
    problems = sorted(metric_values[first_algo].keys())
    save_dir = Path(self.config.save_path)
    save_dir.mkdir(parents=True, exist_ok=True)
    if self.config.merge_plots:
        # Merged mode: every problem/task becomes a panel of one figure.
        self._plot_merged_convergence(
            metric_values, best_values, max_nfes,
            algorithm_order, problems, metric_name, save_dir
        )
    else:
        # Separate mode: one figure per (problem, task) pair.
        for prob in problems:
            # Run 1 of the first algorithm determines the task count.
            num_tasks = len(best_values[first_algo][prob][1])
            for task_idx in range(num_tasks):
                fig = self._create_convergence_figure(
                    num_tasks, metric_values, best_values, max_nfes,
                    algorithm_order, prob, task_idx, metric_name
                )
                stem = prob if num_tasks == 1 else f'{prob}-Task{task_idx + 1}'
                fig.savefig(save_dir / f'{stem}.{self.config.figure_format}',
                            dpi=300, bbox_inches='tight')
                plt.close(fig)
    print(f"All convergence plots saved to: {save_dir}")
def _create_convergence_figure(
    self,
    num_tasks: int,
    metric_values: Dict,
    best_values: Dict,
    max_nfes: Dict,
    algorithm_order: List[str],
    prob: str,
    task_idx: int,
    metric_name: Optional[str],
    ax: Optional[plt.Axes] = None,
    show_legend: bool = True
) -> plt.Figure:
    """
    Create a single convergence-curve plot for one (problem, task) pair.

    Parameters
    ----------
    num_tasks : int
        Total number of tasks (controls the title format).
    metric_values : Dict
        Per-generation metric values per algorithm/problem/run.
    best_values : Dict
        Final best values, used to pick a representative run.
    max_nfes : Dict
        Maximum function evaluations per algorithm/problem/task.
    algorithm_order : List[str]
        Algorithm names in display order.
    prob : str
        Problem name.
    task_idx : int
        Task index.
    metric_name : Optional[str]
        Y-axis label; 'Objective Value' when None.
    ax : Optional[plt.Axes], optional
        Existing axes to draw on. If None, a new figure is created.
    show_legend : bool, optional
        Whether to draw a per-axes legend. Default: True.

    Returns
    -------
    plt.Figure
        New figure object, or None when drawing onto a provided ``ax``.
    """
    fig = None
    if ax is None:
        fig, ax = plt.subplots(figsize=(5, 3.5))
    # Curves actually plotted (for y-range checks) and the true data
    # x-maximum (matplotlib padding would distort threshold tests).
    all_curves = []
    actual_max_nfes = 0
    # Thinner lines / smaller markers as more algorithms share the axes.
    n_algos = len(algorithm_order)
    if n_algos <= 4:
        markersize, linewidth = 8, 2.5
    elif n_algos <= 6:
        markersize, linewidth = 7, 2.0
    else:
        markersize, linewidth = 6, 1.6
    for idx, algo in enumerate(algorithm_order):
        std_curve = np.array([])
        if self.config.show_std_band:
            curve, std_curve = self._get_convergence_mean_std(
                metric_values, algo, prob, task_idx
            )
        else:
            selected_run = StatisticsCalculator.select_representative_run(
                best_values, algo, prob, task_idx, self.config.statistic_type
            )
            curve = self._get_convergence_curve(metric_values, algo, prob, task_idx, selected_run)
        if len(curve) == 0:
            continue
        all_curves.append(curve)
        nfes = max_nfes[algo][prob][task_idx]
        actual_max_nfes = max(actual_max_nfes, nfes)
        x = np.linspace(0, nfes, len(curve))
        # Roughly 10 markers per curve, regardless of its length.
        marker_interval = max(1, len(curve) // 10)
        color = self.config.colors[idx % len(self.config.colors)]
        ax.plot(
            x, curve, label=algo,
            color=color,
            marker=self.config.markers[idx % len(self.config.markers)],
            markevery=marker_interval,
            markersize=markersize, linewidth=linewidth, linestyle='-', alpha=0.7
        )
        if self.config.show_std_band and len(std_curve) > 0:
            ax.fill_between(
                x, curve - std_curve, curve + std_curve,
                alpha=0.15, color=color
            )
    # Axis labels, title, and ticks.
    y_label = metric_name if metric_name is not None else 'Objective Value'
    ax.set_xlabel('NFEs', fontsize=14)
    ax.set_ylabel(y_label, fontsize=14)
    title = f'{prob}' if num_tasks == 1 else f'{prob} - Task {task_idx + 1}'
    ax.set_title(title, fontsize=14)
    ax.tick_params(axis='both', which='major', labelsize=14)
    if show_legend:
        # Legend font shrinks as the algorithm count grows.
        legend_fontsize = self._calculate_legend_fontsize(len(algorithm_order))
        ax.legend(loc='best', fontsize=legend_fontsize)
    ax.grid(True, alpha=0.2, linestyle='-')
    # Apply axis scaling/formatting after all artists are in place.
    if self.config.log_scale:
        ax.set_yscale('log')
        if len(all_curves) > 0:
            all_data = np.concatenate(all_curves)
            y_min, y_max = np.min(all_data), np.max(all_data)
            # Fall back to linear scale when log is invalid (non-positive
            # values) or ineffective (< one order of magnitude of range).
            # The y_min <= 0 guard also prevents a divide-by-zero in the
            # ratio test below.
            if y_min <= 0:
                print(
                    f"Warning: Non-positive values ({y_min:.4f} to {y_max:.4f}) invalid for log scale, using linear scale")
                ax.set_yscale('linear')
                self._apply_scientific_notation(ax, actual_xmax=actual_max_nfes)
            elif y_max / y_min < 10:
                print(
                    f"Warning: Data range too small for log scale ({y_min:.4f} to {y_max:.4f}), using linear scale")
                ax.set_yscale('linear')
                self._apply_scientific_notation(ax, actual_xmax=actual_max_nfes)
            else:
                # Keep log scale; the y-axis needs a log-aware formatter.
                from matplotlib.ticker import LogFormatterSciNotation
                ax.yaxis.set_major_formatter(LogFormatterSciNotation())
                # Scientific notation for large x values.
                if actual_max_nfes > 10000:
                    from matplotlib.ticker import ScalarFormatter
                    formatter = ScalarFormatter(useMathText=True)
                    formatter.set_scientific(True)
                    formatter.set_powerlimits((0, 0))
                    ax.xaxis.set_major_formatter(formatter)
    else:
        # Linear y-axis: threshold-based scientific notation on both axes.
        self._apply_scientific_notation(ax, actual_xmax=actual_max_nfes)
    # Must come after set_yscale, which re-enables minor ticks.
    ax.minorticks_off()
    if fig is not None:
        fig.tight_layout()
    return fig
def _plot_merged_convergence(
    self,
    metric_values: Dict,
    best_values: Dict,
    max_nfes: Dict,
    algorithm_order: List[str],
    problems: List[str],
    metric_name: Optional[str],
    save_dir: Path
) -> None:
    """
    Create a merged figure with all convergence curves.

    One subplot is drawn per (problem, task) pair, laid out in a grid
    with ``self.config.merge_columns`` columns, and a single shared
    legend is placed just above the first row of subplots.

    Parameters
    ----------
    metric_values : Dict
        Metric values dictionary.
    best_values : Dict
        Best values dictionary.
    max_nfes : Dict
        Max NFEs dictionary.
    algorithm_order : List[str]
        Algorithm order.
    problems : List[str]
        List of problem names.
    metric_name : Optional[str]
        Metric name for label.
    save_dir : Path
        Directory to save the figure.

    Returns
    -------
    None
        Saves ``convergence_merged.<format>`` to ``save_dir``.
    """
    # Collect all subplot info (problem, task_idx)
    subplot_info = []
    for prob in problems:
        # Run 1 of the first algorithm defines the task count per problem.
        first_run_data = best_values[algorithm_order[0]][prob][1]
        num_tasks = len(first_run_data)
        for task_idx in range(num_tasks):
            subplot_info.append((prob, task_idx, num_tasks))
    n_plots = len(subplot_info)
    if n_plots == 0:
        return
    n_cols = self.config.merge_columns
    # Ceiling division: enough rows to fit every subplot.
    n_rows = (n_plots + n_cols - 1) // n_cols
    # Create figure with subplots
    fig, axes = plt.subplots(n_rows, n_cols, figsize=(5 * n_cols, 3.5 * n_rows))
    # Flatten axes array for easy iteration
    # (plt.subplots squeezes singleton dimensions, so normalize to 2-D first).
    if n_rows == 1 and n_cols == 1:
        axes = np.array([[axes]])
    elif n_rows == 1:
        axes = axes.reshape(1, -1)
    elif n_cols == 1:
        axes = axes.reshape(-1, 1)
    axes_flat = axes.flatten()
    # Plot each subplot (legends suppressed; one shared legend is added below).
    for i, (prob, task_idx, num_tasks) in enumerate(subplot_info):
        ax = axes_flat[i]
        self._create_convergence_figure(
            num_tasks, metric_values, best_values, max_nfes,
            algorithm_order, prob, task_idx, metric_name,
            ax=ax, show_legend=False
        )
    # Hide unused subplots
    for i in range(n_plots, len(axes_flat)):
        axes_flat[i].set_visible(False)
    # Add single legend at the top of the figure.
    # Handles/labels are the same across subplots, so take the first axes.
    handles, labels = axes_flat[0].get_legend_handles_labels()
    legend_fontsize = 18
    # Calculate legend columns
    n_legend_cols = min(len(algorithm_order), 6)
    # First apply tight_layout to position subplots
    fig.tight_layout(h_pad=2.0, w_pad=1.5)
    # Get the top position of the first row of subplots (in figure coordinates)
    first_row_top = axes_flat[0].get_position().y1
    # Fixed padding between legend and first row (absolute size in cm)
    legend_padding_cm = 1.0
    fig_height_inch = fig.get_size_inches()[1]
    legend_padding = legend_padding_cm / 2.54 / fig_height_inch  # cm -> inch -> figure coords
    fig.legend(
        handles, labels,
        loc='lower center',
        ncol=n_legend_cols,
        fontsize=legend_fontsize,
        bbox_to_anchor=(0.5, first_row_top + legend_padding)
    )
    output_file = save_dir / f'convergence_merged.{self.config.figure_format}'
    fig.savefig(output_file, dpi=300, bbox_inches='tight')
    plt.close(fig)
    print(f"Merged convergence plot saved to: {output_file}")
def _get_convergence_curve(
    self,
    metric_values: Dict,
    algo: str,
    prob: str,
    task_idx: int,
    run: Optional[int]
) -> np.ndarray:
    """
    Extract one convergence curve for an algorithm/problem/task.

    Parameters
    ----------
    metric_values : Dict
        Metric values dictionary.
    algo : str
        Algorithm name.
    prob : str
        Problem name.
    task_idx : int
        Task index.
    run : Optional[int]
        Run number to extract; None averages over all runs.

    Returns
    -------
    np.ndarray
        Convergence curve (empty array when no run has data).
    """
    runs = metric_values[algo][prob]
    if run is not None:
        return np.array(runs[run][task_idx])
    # No specific run requested: average across all runs with data.
    curves = [np.array(runs[r][task_idx]) for r in runs]
    curves = [c for c in curves if len(c) > 0]
    if not curves:
        return np.array([])
    # Runs can have differing generation counts; align on the shortest.
    shortest = min(len(c) for c in curves)
    return np.mean([c[:shortest] for c in curves], axis=0)
def _get_convergence_mean_std(
    self,
    metric_values: Dict,
    algo: str,
    prob: str,
    task_idx: int
) -> Tuple[np.ndarray, np.ndarray]:
    """
    Compute the mean and sample standard deviation of the convergence
    curves across all runs of one algorithm/problem/task.

    Parameters
    ----------
    metric_values : Dict
        Metric values dictionary.
    algo : str
        Algorithm name.
    prob : str
        Problem name.
    task_idx : int
        Task index.

    Returns
    -------
    Tuple[np.ndarray, np.ndarray]
        (mean_curve, std_curve); both empty when fewer than two runs
        have data.
    """
    runs = metric_values[algo][prob]
    curves = [np.array(runs[r][task_idx]) for r in runs]
    curves = [c for c in curves if len(c) > 0]
    # Sample std (ddof=1) needs at least two runs.
    if len(curves) < 2:
        return np.array([]), np.array([])
    # Align runs of differing generation counts on the shortest one.
    shortest = min(len(c) for c in curves)
    stacked = np.array([c[:shortest] for c in curves])
    mean_curve = np.mean(stacked, axis=0).ravel()
    std_curve = np.std(stacked, axis=0, ddof=1).ravel()
    return mean_curve, std_curve
def _apply_scientific_notation(
    self,
    ax: plt.Axes,
    actual_xmax: Optional[float] = None,
    x_threshold: float = 10000,
    y_threshold: float = 1000
) -> None:
    """
    Switch axes to scientific (power-of-ten) tick labels when their
    maxima exceed the given thresholds.

    Parameters
    ----------
    ax : plt.Axes
        Matplotlib axes object.
    actual_xmax : Optional[float], optional
        Actual maximum x value from data (not affected by matplotlib
        padding). If None, uses ax.get_xlim()[1].
    x_threshold : float, optional
        Threshold for x-axis to use scientific notation. Default is 10000.
    y_threshold : float, optional
        Threshold for y-axis to use scientific notation. Default is 1000.
    """
    from matplotlib.ticker import ScalarFormatter

    def _make_formatter() -> ScalarFormatter:
        # useMathText renders the offset as "×10^k" rather than "1e k";
        # powerlimits (0, 0) forces scientific notation at any magnitude.
        # (Passing useMathText=True to the constructor makes the former
        # private `_useMathText` poke redundant.)
        formatter = ScalarFormatter(useMathText=True)
        formatter.set_scientific(True)
        formatter.set_powerlimits((0, 0))
        return formatter

    # Prefer the true data maximum over ax.get_xlim(), which includes
    # matplotlib's automatic padding and could flip the threshold test.
    xmax = actual_xmax if actual_xmax is not None else ax.get_xlim()[1]
    if xmax > x_threshold:
        ax.xaxis.set_major_formatter(_make_formatter())
    if ax.get_ylim()[1] > y_threshold:
        ax.yaxis.set_major_formatter(_make_formatter())
def plot_runtime(
    self,
    runtime: Dict[str, Dict[str, Dict[int, float]]],
    algorithm_order: List[str]
) -> None:
    """
    Generate and save a grouped bar plot of average runtimes.

    One group of bars is drawn per problem (one bar per algorithm),
    with error bars showing the sample standard deviation across runs.

    Parameters
    ----------
    runtime : Dict[str, Dict[str, Dict[int, float]]]
        Runtime dictionary.
        Structure: runtime[algorithm][problem][run] = float (seconds)
    algorithm_order : List[str]
        List of algorithm names in display order.

    Returns
    -------
    None
        Saves the figure to ``self.config.save_path``.
    """
    def _problem_key(name: str) -> Tuple[int, str]:
        # Sort primarily by the number embedded in the name (e.g. 'P10'),
        # falling back to -1 for purely textual names. Returning a tuple
        # keeps key types homogeneous, avoiding the int-vs-str TypeError
        # the previous lambda raised when numbered and plain problem
        # names were mixed.
        digits = ''.join(filter(str.isdigit, name))
        return (int(digits) if digits else -1, name)

    problems = sorted(runtime[algorithm_order[0]].keys(), key=_problem_key)
    save_dir = Path(self.config.save_path)
    save_dir.mkdir(parents=True, exist_ok=True)
    fig, ax = plt.subplots(figsize=(6, 3.5))
    n_algorithms = len(algorithm_order)
    n_problems = len(problems)
    # All bars of a group fit within 0.8 of the unit group spacing.
    bar_width = 0.8 / n_algorithms
    x_groups = np.arange(n_problems)
    for idx, algo in enumerate(algorithm_order):
        means = []
        stds = []
        for prob in problems:
            runtimes = [runtime[algo][prob][run] for run in runtime[algo][prob].keys()]
            means.append(np.mean(runtimes))
            # Sample std needs at least 2 data points; otherwise no error bar.
            if len(runtimes) > 1:
                stds.append(np.std(runtimes, ddof=1))
            else:
                stds.append(0.0)
        # Center each algorithm's bars around its group position.
        x_offset = x_groups + (idx - n_algorithms / 2 + 0.5) * bar_width
        ax.bar(
            x_offset, means, bar_width,
            yerr=stds, label=algo,
            color=self.config.colors[idx % len(self.config.colors)],
            alpha=0.8, capsize=2,
            error_kw={'linewidth': 1.2, 'ecolor': 'black', 'alpha': 0.6}
        )
    ax.set_ylabel('Runtime (s)', fontsize=12)
    ax.set_xticks(x_groups)
    ax.set_xticklabels(problems, fontsize=12)
    ax.tick_params(axis='both', which='major', labelsize=10)
    ax.legend(loc='best', fontsize=12, framealpha=0.7)
    ax.grid(True, axis='y', alpha=0.3, linestyle='-')
    fig.tight_layout()
    output_file = save_dir / f'runtime_comparison.{self.config.figure_format}'
    fig.savefig(output_file, dpi=300, bbox_inches='tight')
    plt.close(fig)
    print(f"Runtime plot saved to: {output_file}")
def plot_nd_solutions(
    self,
    best_values: Dict[str, Dict[str, Dict[int, List[float]]]],
    objective_values: Dict[str, Dict[str, Dict[int, List[np.ndarray]]]],
    algorithm_order: List[str],
    settings: Optional[Dict[str, Any]] = None
) -> None:
    """
    Generate and save non-dominated solution plots.

    Each figure shows the solutions of a representative run for one
    algorithm/problem/task, optionally filtered to the first
    non-dominated front and overlaid on the true Pareto front.

    Parameters
    ----------
    best_values : Dict[str, Dict[str, Dict[int, List[float]]]]
        Best values used for representative run selection.
    objective_values : Dict[str, Dict[str, Dict[int, List[np.ndarray]]]]
        Original objective values:
        objective_values[algorithm][problem][run] = List[np.ndarray],
        each array of shape (n_solutions, n_objectives).
    algorithm_order : List[str]
        Algorithm names to plot.
    settings : Optional[Dict[str, Any]], optional
        Problem settings used to load true Pareto fronts.

    Returns
    -------
    None
        Saves figures under ``<save_path>/ND_Solutions``.
    """
    nd_folder = Path(self.config.save_path) / 'ND_Solutions'
    nd_folder.mkdir(parents=True, exist_ok=True)
    problems = list(objective_values[algorithm_order[0]].keys())
    if self.config.merge_plots:
        # Merged mode: one figure per problem/task with all algorithms.
        self._plot_merged_nd_solutions(
            best_values, objective_values, algorithm_order, problems, settings, nd_folder
        )
        print(f"All non-dominated solutions plots saved to: {nd_folder}\n")
        return
    # Separate mode: one figure per algorithm/problem/task.
    for algo in algorithm_order:
        for prob in problems:
            runs = objective_values[algo][prob]
            first_run = next(iter(runs))
            n_tasks = len(runs[first_run])
            for task_idx in range(n_tasks):
                n_objectives = runs[first_run][task_idx].shape[1]
                if n_objectives <= 1:
                    # Single-objective tasks have no front to visualize.
                    continue
                selected_run = StatisticsCalculator.select_representative_run(
                    best_values, algo, prob, task_idx, self.config.statistic_type
                )
                if selected_run is None:
                    selected_run = 1
                objectives = runs[selected_run][task_idx]
                if objectives.shape[0] == 0:
                    continue
                # Optionally keep only the first non-dominated front.
                if self.config.show_nd:
                    front_no, _ = nd_sort(objectives, objectives.shape[0])
                    nd_solutions = objectives[front_no == 1]
                else:
                    nd_solutions = objectives
                # Optionally overlay the true Pareto front.
                true_pf = None
                if self.config.show_pf and settings is not None:
                    true_pf = DataUtils.load_reference(settings, prob, task_idx, M=n_objectives)
                fig = self._create_nd_plot(
                    nd_solutions, true_pf, n_objectives, n_tasks, prob, task_idx, algo
                )
                if n_tasks == 1:
                    filename = f'{prob}-{algo}.{self.config.figure_format}'
                else:
                    filename = f'{prob}-Task{task_idx + 1}-{algo}.{self.config.figure_format}'
                fig.savefig(nd_folder / filename, dpi=300)
                plt.close(fig)
    print(f"All non-dominated solutions plots saved to: {nd_folder}\n")
def _plot_merged_nd_solutions(
    self,
    best_values: Dict,
    objective_values: Dict,
    algorithm_order: List[str],
    problems: List[str],
    settings: Optional[Dict[str, Any]],
    nd_folder: Path
) -> None:
    """
    Create merged figures for non-dominated solutions.

    Each merged figure contains all algorithms (one column each) for a
    specific problem/task.

    Parameters
    ----------
    best_values : Dict
        Best values dictionary.
    objective_values : Dict
        Objective values dictionary.
    algorithm_order : List[str]
        Algorithm order.
    problems : List[str]
        List of problem names.
    settings : Optional[Dict[str, Any]]
        Problem settings.
    nd_folder : Path
        Output folder path.
    """
    for prob in problems:
        first_algo = algorithm_order[0]
        first_run = list(objective_values[first_algo][prob].keys())[0]
        n_tasks = len(objective_values[first_algo][prob][first_run])
        for task_idx in range(n_tasks):
            # Get n_objectives for this specific task
            task_objs = objective_values[first_algo][prob][first_run][task_idx]
            n_objectives = task_objs.shape[1]
            if n_objectives <= 1:
                continue
            n_algos = len(algorithm_order)
            # One column per algorithm, single row
            n_cols = n_algos
            n_rows = 1
            # 3-objective tasks need per-subplot 3D projections.
            is_3d = n_objectives == 3
            # Fix: only build a 2-D axes array in the non-3D branch.
            # Previously axes.flatten() ran unconditionally, raising
            # NameError (or reusing stale axes) when the task is 3-D.
            axes_flat = None
            if is_3d:
                fig = plt.figure(figsize=(4.5 * n_cols, 3.5 * n_rows))
            else:
                fig, axes = plt.subplots(n_rows, n_cols, figsize=(4.5 * n_cols, 3.5 * n_rows))
                # plt.subplots squeezes a single axes; normalize to array.
                if n_cols == 1:
                    axes = np.array([axes])
                axes_flat = axes.flatten()
            # Load true Pareto front once per task.
            true_pf = None
            if self.config.show_pf and settings is not None:
                true_pf = DataUtils.load_reference(settings, prob, task_idx, M=n_objectives)
            # Subplot title prefix: "P1 - Task 1" or just "P1".
            task_label = f'{prob} - Task {task_idx + 1}' if n_tasks > 1 else prob
            for idx, algo in enumerate(algorithm_order):
                selected_run = StatisticsCalculator.select_representative_run(
                    best_values, algo, prob, task_idx, self.config.statistic_type
                )
                if selected_run is None:
                    selected_run = 1
                objectives = objective_values[algo][prob][selected_run][task_idx]
                if objectives.shape[0] == 0:
                    continue
                # Optionally keep only the first non-dominated front.
                if self.config.show_nd:
                    front_no, _ = nd_sort(objectives, objectives.shape[0])
                    nd_solutions = objectives[front_no == 1]
                else:
                    nd_solutions = objectives
                subplot_title = f'{task_label} - {algo}'
                if is_3d:
                    ax = fig.add_subplot(n_rows, n_cols, idx + 1, projection='3d')
                    self._plot_nd_subplot_3d(ax, nd_solutions, true_pf, subplot_title)
                else:
                    ax = axes_flat[idx]
                    self._plot_nd_subplot_2d(ax, nd_solutions, true_pf, n_objectives, subplot_title)
            fig.tight_layout()
            if n_tasks == 1:
                filename = f'{prob}_merged.{self.config.figure_format}'
            else:
                filename = f'{prob}-Task{task_idx + 1}_merged.{self.config.figure_format}'
            fig.savefig(nd_folder / filename, dpi=300, bbox_inches='tight')
            plt.close(fig)
    print(f"Merged non-dominated solutions plots saved to: {nd_folder}\n")
def _plot_nd_subplot_2d(
    self,
    ax: plt.Axes,
    nd_solutions: np.ndarray,
    true_pf: Optional[np.ndarray],
    n_objectives: int,
    title: str
) -> None:
    """Render one merged-figure panel: 2D scatter for bi-objective tasks,
    parallel coordinates otherwise."""
    if n_objectives == 2:
        # Draw the reference front first so solutions sit on top of it.
        if true_pf is not None and true_pf.shape[1] == 2:
            order = np.argsort(true_pf[:, 0])
            pf_sorted = true_pf[order]
            ax.scatter(pf_sorted[:, 0], pf_sorted[:, 1],
                       c='gray', s=2, linewidth=0.1, zorder=1)
        ax.scatter(nd_solutions[:, 0], nd_solutions[:, 1],
                   c='dodgerblue', s=60, alpha=0.8, edgecolors='black',
                   linewidth=0.8, zorder=2)
        ax.set_xlabel('$f_1$', fontsize=12)
        ax.set_ylabel('$f_2$', fontsize=12)
        ax.grid(True, alpha=0.2, linestyle='-')
    else:
        # Parallel coordinates: one polyline per solution.
        for row in nd_solutions:
            ax.plot(range(n_objectives), row,
                    'b-', alpha=0.3, linewidth=0.8)
        ax.set_xlabel('Objective', fontsize=12)
        ax.set_ylabel('Value', fontsize=12)
        ax.set_xticks(range(n_objectives))
        ax.set_xticklabels([rf'$f_{{{i + 1}}}$' for i in range(n_objectives)])
        ax.grid(True, alpha=0.3, linestyle='--')
    ax.set_title(title, fontsize=12)
def _plot_nd_subplot_3d(
    self,
    ax: plt.Axes,
    nd_solutions: np.ndarray,
    true_pf: Optional[np.ndarray],
    title: str
) -> None:
    """Render one merged-figure panel as a 3D objective-space scatter."""
    # Draw the reference front first so solutions sit on top of it.
    if true_pf is not None and true_pf.shape[1] == 3:
        ax.scatter(true_pf[:, 0], true_pf[:, 1], true_pf[:, 2],
                   c='gray', s=4, alpha=0.2, zorder=1, depthshade=True)
    ax.scatter(nd_solutions[:, 0], nd_solutions[:, 1], nd_solutions[:, 2],
               c='dodgerblue', s=60, alpha=0.8, edgecolors='black',
               linewidth=0.8, zorder=2, depthshade=True)
    for label, setter in zip(('$f_1$', '$f_2$', '$f_3$'),
                             (ax.set_xlabel, ax.set_ylabel, ax.set_zlabel)):
        setter(label, fontsize=10)
    ax.set_title(title, fontsize=12)
    # Fixed viewpoint shared by all 3D panels.
    ax.view_init(elev=20, azim=60)
def _create_nd_plot(
    self,
    nd_solutions: np.ndarray,
    true_pf: Optional[np.ndarray],
    n_objectives: int,
    n_tasks: int,
    prob: str,
    task_idx: int,
    algo: str
) -> plt.Figure:
    """
    Create a standalone non-dominated solution plot.

    Bi-objective tasks are drawn as a 2D scatter, tri-objective tasks
    as a 3D scatter, and many-objective tasks as parallel coordinates.

    Parameters
    ----------
    nd_solutions : np.ndarray
        Non-dominated solutions of shape (n_solutions, n_objectives).
    true_pf : Optional[np.ndarray]
        True Pareto front, overlaid when its dimensionality matches.
    n_objectives : int
        Number of objectives.
    n_tasks : int
        Total number of tasks (controls the title format).
    prob : str
        Problem name.
    task_idx : int
        Task index.
    algo : str
        Algorithm name.

    Returns
    -------
    plt.Figure
        Matplotlib figure object.
    """
    fig = plt.figure(figsize=(4.5, 3.5))
    if n_objectives == 2:
        ax = fig.add_subplot(111)
        # Reference front beneath the solutions.
        if true_pf is not None and true_pf.shape[1] == 2:
            order = np.argsort(true_pf[:, 0])
            pf_sorted = true_pf[order]
            ax.scatter(pf_sorted[:, 0], pf_sorted[:, 1],
                       c='gray', s=2, linewidth=0.1, label='True PF', zorder=1)
        ax.scatter(nd_solutions[:, 0], nd_solutions[:, 1],
                   c='dodgerblue', s=60, alpha=0.8, edgecolors='black',
                   linewidth=0.8, label='ND Solutions', zorder=2)
        ax.set_xlabel('$f_1$', fontsize=12)
        ax.set_ylabel('$f_2$', fontsize=12)
        ax.grid(True, alpha=0.2, linestyle='-')
    elif n_objectives == 3:
        ax = fig.add_subplot(111, projection='3d')
        if true_pf is not None and true_pf.shape[1] == 3:
            ax.scatter(true_pf[:, 0], true_pf[:, 1], true_pf[:, 2],
                       c='gray', s=4, alpha=0.2, label='True PF', zorder=1, depthshade=True)
        ax.scatter(nd_solutions[:, 0], nd_solutions[:, 1], nd_solutions[:, 2],
                   c='dodgerblue', s=60, alpha=0.8, edgecolors='black',
                   linewidth=0.8, label='ND Solutions', zorder=2, depthshade=True)
        ax.set_xlabel('$f_1$', fontsize=12)
        ax.set_ylabel('$f_2$', fontsize=12)
        ax.set_zlabel('$f_3$', fontsize=12)
        # Fixed viewpoint shared with the merged 3D panels.
        ax.view_init(elev=20, azim=60)
    else:
        # Many-objective: parallel coordinates, one polyline per solution.
        ax = fig.add_subplot(111)
        for row in nd_solutions:
            ax.plot(range(n_objectives), row,
                    'b-', alpha=0.3, linewidth=0.8)
        ax.set_xlabel('Objective', fontsize=12)
        ax.set_ylabel('Value', fontsize=12)
        ax.set_xticks(range(n_objectives))
        ax.set_xticklabels([rf'$f_{{{i + 1}}}$' for i in range(n_objectives)])
        ax.grid(True, alpha=0.3, linestyle='--')
    title = f'{prob} - {algo}' if n_tasks == 1 else f'{prob} - Task{task_idx + 1} - {algo}'
    plt.title(title, fontsize=10)
    plt.tight_layout()
    return fig
# =============================================================================
# Main Data Analyzer Class
# =============================================================================
class DataAnalyzer:
"""
Main class for comprehensive data analysis and visualization of multi-task optimization experiments.
This class provides a complete pipeline for:
- Scanning data directories to detect algorithms, problems, and runs
- Calculating performance metrics (IGD, HV, or objective values)
- Generating statistical comparison tables (Excel or LaTeX)
- Creating convergence curve plots
- Visualizing runtime comparisons
- Plotting non-dominated solutions
Attributes
----------
data_path : Path
Path to the data directory containing experiment results.
settings : Optional[Dict[str, Any]]
Problem settings including reference definitions and metric configuration.
algorithm_order : Optional[List[str]]
Custom ordering of algorithms for display.
table_config : TableConfig
Configuration for table generation.
plot_config : PlotConfig
Configuration for plot generation.
"""
def __init__(
    self,
    data_path: Union[str, Path] = './Data',
    settings: Optional[Dict[str, Any]] = None,
    algorithm_order: Optional[List[str]] = None,
    save_path: Union[str, Path] = './Results',
    table_format: str = 'excel',
    figure_format: str = 'pdf',
    statistic_type: str = 'mean',
    significance_level: float = 0.05,
    rank_sum_test: bool = True,
    log_scale: bool = False,
    show_pf: bool = True,
    show_nd: bool = True,
    merge_plots: bool = False,
    merge_columns: int = 3,
    show_std_band: bool = False,
    best_so_far: bool = True,
    clear_results: bool = True,
    convergence_k: Optional[int] = None
):
    """
    Initialize DataAnalyzer with configuration parameters.

    Parameters
    ----------
    data_path : Union[str, Path], optional
        Data directory with one subdirectory per algorithm, each
        holding pickle files named ``ALGO_problem_run.pkl``.
        Default: './Data'
    settings : Optional[Dict[str, Any]], optional
        Problem settings dictionary with problem names as keys and,
        at top level, 'metric' ('IGD' or 'HV'), 'ref_path', and
        'n_ref'. Default: None (single-objective mode).
    algorithm_order : Optional[List[str]], optional
        Display order of algorithms; the last entry is the baseline
        for statistical tests. Default: None (alphabetical).
    save_path : Union[str, Path], optional
        Output directory for all generated files. Default: './Results'
    table_format : str, optional
        'excel' or 'latex'. Default: 'excel'
    figure_format : str, optional
        Figure file format ('pdf', 'png', 'svg', ...). Default: 'pdf'
    statistic_type : str, optional
        'mean', 'median', 'max', or 'min'. Default: 'mean'
    significance_level : float, optional
        P-value threshold for significance tests. Default: 0.05
    rank_sum_test : bool, optional
        Perform the Wilcoxon rank-sum test. Default: True
    log_scale : bool, optional
        Logarithmic y-axis for convergence plots. Default: False
    show_pf : bool, optional
        Overlay the true Pareto front in ND plots. Default: True
    show_nd : bool, optional
        Filter plots to non-dominated solutions only. Default: True
    merge_plots : bool, optional
        Merge all plots into single figures. Default: False
    merge_columns : int, optional
        Columns in merged plot layouts. Default: 3
    show_std_band : bool, optional
        Draw a std-deviation band on convergence curves. Default: False
    best_so_far : bool, optional
        Use best-so-far metric values. Default: True
    clear_results : bool, optional
        Clear the results folder before analysis. Default: True
    convergence_k : Optional[int], optional
        Number of convergence points to export; None disables the
        export. Default: None

    Raises
    ------
    ValueError
        If ``statistic_type`` or ``table_format`` is not a recognized
        option (raised by the enum conversions).
    """
    self.data_path = Path(data_path)
    self.settings = settings
    self.algorithm_order = algorithm_order
    self.best_so_far = best_so_far
    self.clear_results = clear_results
    self.convergence_k = convergence_k
    # Validate string options by converting them to their enum forms;
    # an invalid value raises ValueError here rather than later.
    statistic = StatisticType(statistic_type)
    self.table_config = TableConfig(
        table_format=TableFormat(table_format),
        statistic_type=statistic,
        significance_level=significance_level,
        rank_sum_test=rank_sum_test,
        save_path=Path(save_path)
    )
    self.plot_config = PlotConfig(
        figure_format=figure_format,
        statistic_type=statistic,
        log_scale=log_scale,
        show_pf=show_pf,
        show_nd=show_nd,
        merge_plots=merge_plots,
        merge_columns=merge_columns,
        show_std_band=show_std_band,
        save_path=Path(save_path)
    )
    # Lazily populated by scan_data() / calculate_metrics().
    self._scan_result: Optional[ScanResult] = None
    self._metric_results: Optional[MetricResults] = None
def scan_data(self) -> ScanResult:
    """
    Scan the data directory to detect algorithms, problems, run counts.

    Expects one subdirectory per algorithm, each containing pickle
    files named ``ALGO_problem_run.pkl``. The run count is taken from
    the first algorithm/problem pair.

    Returns
    -------
    ScanResult
        Dataclass containing:
        - algorithms: List[str] - Sorted list of algorithm names
        - problems: List[str] - Sorted list of problem names
        - runs: int - Number of independent runs
        - data_path: Path - Path to scanned directory

    Raises
    ------
    FileNotFoundError
        If data_path does not exist.
    ValueError
        If no algorithm directories or pickle files found.
    """
    # Validate the path explicitly so the documented exceptions are
    # actually raised (previously an empty directory hit IndexError).
    if not self.data_path.exists():
        raise FileNotFoundError(f"Data path does not exist: {self.data_path}")
    algorithms = []
    problems = []
    runs_dict = {}
    for algo_dir in [d for d in self.data_path.iterdir() if d.is_dir()]:
        algo = algo_dir.name
        algorithms.append(algo)
        runs_dict[algo] = {}
        for pkl in algo_dir.glob('*.pkl'):
            stem = pkl.stem
            prefix = algo + '_'
            if stem.startswith(prefix):
                # Problem name is everything between the algorithm
                # prefix and the trailing "_<run>" suffix.
                remainder = stem[len(prefix):]
                last_underscore = remainder.rfind('_')
                if last_underscore > 0:
                    prob = remainder[:last_underscore]
                    runs_dict[algo].setdefault(prob, []).append(pkl)
                    if prob not in problems:
                        problems.append(prob)
    if not algorithms:
        raise ValueError(f"No algorithm directories found in: {self.data_path}")
    if not problems:
        raise ValueError(f"No valid pickle files (ALGO_problem_run.pkl) found in: {self.data_path}")
    algorithms.sort()

    def _problem_key(name: str) -> Tuple[int, str]:
        # Tuple key keeps comparisons homogeneous, so problem sets that
        # mix numbered names ('P10') and plain names sort without the
        # TypeError the previous int-or-str lambda produced.
        digits = ''.join(filter(str.isdigit, name))
        return (int(digits) if digits else -1, name)

    problems.sort(key=_problem_key)
    runs = len(runs_dict[algorithms[0]][problems[0]])
    print(f"Found {len(algorithms)} algorithms: {algorithms}")
    print(f"Found {len(problems)} problems: {problems}")
    print(f"Run times: {runs}")
    self._scan_result = ScanResult(
        algorithms=algorithms,
        problems=problems,
        runs=runs,
        data_path=self.data_path
    )
    return self._scan_result
def calculate_metrics(self) -> MetricResults:
    """
    Calculate metric values for all algorithms, problems, and runs.

    Loads every ``ALGO_problem_run.pkl``, computes per-generation
    metric values (raw and best-so-far), and records final objectives,
    runtimes, and the per-task evaluation budgets.

    Returns
    -------
    MetricResults
        Dataclass with metric_values (raw or best-so-far, per
        ``self.best_so_far``), best_values, objective_values, runtime,
        max_nfes, and metric_name.
    """
    # Lazily scan the data directory on first use.
    if self._scan_result is None:
        self.scan_data()
    scan = self._scan_result
    algo_order = self.algorithm_order if self.algorithm_order else scan.algorithms
    metric_name = self.settings.get('metric') if self.settings else None

    def _per_pair(initial):
        # Fresh nested dict {algo: {prob: initial()}} for each store.
        return {algo: {prob: initial() for prob in scan.problems} for algo in algo_order}

    all_values = _per_pair(dict)
    all_values_best_so_far = _per_pair(dict)
    all_best_values = _per_pair(dict)
    original_objective_values = _per_pair(dict)
    all_runtime = _per_pair(dict)
    all_max_nfes = {algo: {prob: None for prob in scan.problems} for algo in algo_order}
    total = len(algo_order) * len(scan.problems) * scan.runs
    pbar = tqdm(total=total, desc="Calculating metric values", dynamic_ncols=False, delay=0.2)
    for algo in algo_order:
        for prob in scan.problems:
            for run in range(1, scan.runs + 1):
                pkl_path = self.data_path / algo / f"{algo}_{prob}_{run}.pkl"
                data = DataUtils.load_pickle(pkl_path)
                per_gen, per_gen_best = self._get_single_run_metric_value(data, prob)
                all_values[algo][prob][run] = per_gen
                all_values_best_so_far[algo][prob][run] = per_gen_best
                # Final best-so-far value per task (NaN when a task has no data).
                all_best_values[algo][prob][run] = [
                    np.asarray(task_arr).ravel()[-1] if len(task_arr) > 0 else np.nan
                    for task_arr in per_gen_best
                ]
                # Last generation's raw objective matrix per task.
                original_objective_values[algo][prob][run] = [
                    objs[-1] for objs in data['all_objs']
                ]
                all_runtime[algo][prob][run] = data['runtime']
                # Budgets are identical across runs; record the first seen.
                if all_max_nfes[algo][prob] is None:
                    all_max_nfes[algo][prob] = data['max_nfes']
                pbar.update(1)
    pbar.close()
    selected = all_values_best_so_far if self.best_so_far else all_values
    self._metric_results = MetricResults(
        metric_values=selected,
        best_values=all_best_values,
        objective_values=original_objective_values,
        runtime=all_runtime,
        max_nfes=all_max_nfes,
        metric_name=metric_name
    )
    return self._metric_results
def _get_single_run_metric_value(
self,
data: Dict[str, Any],
prob: str
) -> Tuple[List[np.ndarray], List[np.ndarray]]:
"""
Calculate metric values for a single run.
Parameters
----------
data : Dict[str, Any]
Loaded pickle data containing 'all_objs' key.
prob : str
Problem name for loading references.
Returns
-------
Tuple[List[np.ndarray], List[np.ndarray]]
Tuple of (metric_values, metric_values_best_so_far).
Each is a list of arrays, one per task.
"""
all_decs = data['all_decs']
all_objs = data['all_objs']
all_cons = data.get('all_cons', None)
n_tasks = len(all_objs)
n_gens_per_task = [len(all_objs[t]) for t in range(n_tasks)]
metric_values = [np.zeros((n_gens_per_task[t], 1)) for t in range(n_tasks)]
metric_values_best_so_far = [np.zeros((n_gens_per_task[t], 1)) for t in range(n_tasks)]
for t in range(n_tasks):
task_key = f'T{t + 1}'
best_so_far = None
reference = None
if self.settings is not None and n_gens_per_task[t] > 0:
M = all_objs[t][0].shape[1]
D = all_decs[t][0].shape[1]
C = all_cons[t][0].shape[1] if all_cons is not None else 0
reference = DataUtils.load_reference(
self.settings,
prob,
task_key,
M=M,
D=D,
C=C
)
for gen in range(n_gens_per_task[t]):
objs_tgen = all_objs[t][gen]
cons_tgen = all_cons[t][gen] if all_cons is not None else None
M = objs_tgen.shape[1]
if M == 1:
metric_value = np.min(objs_tgen[:, 0])
sign = -1
else:
if self.settings is None:
raise ValueError('Multi-objective metric calculation requires settings parameter')
metric_name = self.settings.get('metric')
if metric_name == 'IGD':
metric_instance = IGD()
metric_value = metric_instance.calculate(objs_tgen, reference)
sign = metric_instance.sign
elif metric_name == 'HV':
metric_instance = HV()
# If reference is 1D or single row, treat as ref point; otherwise as PF
if reference.ndim == 1 or reference.shape[0] == 1:
ref_point = reference.flatten()
metric_value = metric_instance.calculate(objs_tgen, reference=ref_point)
else:
metric_value = metric_instance.calculate(objs_tgen, pf=reference)
sign = metric_instance.sign
elif metric_name == 'IGDp':
metric_instance = IGDp()
metric_value = metric_instance.calculate(objs_tgen, reference)
sign = metric_instance.sign
elif metric_name == 'GD':
metric_instance = GD()
metric_value = metric_instance.calculate(objs_tgen, reference)
sign = metric_instance.sign
elif metric_name == 'DeltaP':
metric_instance = DeltaP()
metric_value = metric_instance.calculate(objs_tgen, reference)
sign = metric_instance.sign
elif metric_name == 'Spacing':
metric_instance = Spacing()
metric_value = metric_instance.calculate(objs_tgen)
sign = metric_instance.sign
elif metric_name == 'Spread':
metric_instance = Spread()
metric_value = metric_instance.calculate(objs_tgen, reference)
sign = metric_instance.sign
elif metric_name == 'FR':
if cons_tgen is None:
raise ValueError('FR metric requires constraint data, but all_cons is not available')
metric_instance = FR()
metric_value = metric_instance.calculate(cons_tgen)
sign = metric_instance.sign
elif metric_name == 'CV':
if cons_tgen is None:
raise ValueError('CV metric requires constraint data, but all_cons is not available')
metric_instance = CV()
metric_value = metric_instance.calculate(cons_tgen)
sign = metric_instance.sign
else:
raise ValueError(f'Unsupported metric: {metric_name}')
metric_values[t][gen, 0] = metric_value
if best_so_far is None:
best_so_far = metric_value
else:
if sign == -1:
best_so_far = min(best_so_far, metric_value)
else:
best_so_far = max(best_so_far, metric_value)
metric_values_best_so_far[t][gen, 0] = best_so_far
return metric_values, metric_values_best_so_far
def generate_tables(self) -> Union[pd.DataFrame, str]:
    """
    Generate comparison tables with statistical analysis.

    Metrics are computed lazily: if calculate_metrics() has not been
    called yet, it is invoked automatically first.

    Returns
    -------
    Union[pd.DataFrame, str]
        DataFrame for Excel format, LaTeX string for LaTeX format.
    """
    if self._metric_results is None:
        self.calculate_metrics()
    ordering = self.algorithm_order or self._scan_result.algorithms
    results = self._metric_results
    generator = TableGenerator(self.table_config)
    return generator.generate(results.best_values, ordering, results.metric_name)
def generate_convergence_plots(self) -> None:
    """
    Generate and save convergence curve plots.

    Metrics are computed lazily if calculate_metrics() has not been
    called yet. Figures are saved to disk at the configured save_path.
    """
    if self._metric_results is None:
        self.calculate_metrics()
    ordering = self.algorithm_order or self._scan_result.algorithms
    results = self._metric_results
    PlotGenerator(self.plot_config).plot_convergence_curves(
        results.metric_values,
        results.best_values,
        results.max_nfes,
        ordering,
        results.metric_name,
    )
def generate_runtime_plots(self) -> None:
    """
    Generate and save runtime comparison bar plots.

    Metrics are computed lazily if calculate_metrics() has not been
    called yet. The figure is saved to disk at the configured save_path.
    """
    if self._metric_results is None:
        self.calculate_metrics()
    ordering = self.algorithm_order or self._scan_result.algorithms
    PlotGenerator(self.plot_config).plot_runtime(
        self._metric_results.runtime, ordering
    )
def generate_nd_solution_plots(self) -> None:
    """
    Generate and save non-dominated solution visualization plots.

    Metrics are computed lazily if calculate_metrics() has not been
    called yet. Figures are saved under save_path/ND_Solutions/.
    """
    if self._metric_results is None:
        self.calculate_metrics()
    ordering = self.algorithm_order or self._scan_result.algorithms
    results = self._metric_results
    PlotGenerator(self.plot_config).plot_nd_solutions(
        results.best_values,
        results.objective_values,
        ordering,
        self.settings,
    )
def export_convergence_data(self, k: Optional[int] = None) -> None:
    """
    Export convergence curve data to text files.

    One file is written per problem-task combination, each containing
    evaluation counts paired with convergence values for all algorithms.

    Parameters
    ----------
    k : Optional[int], optional
        Number of data points to sample from each convergence curve.
        If None, uses self.convergence_k. If both are None, exports all points.
    """
    if self._metric_results is None:
        self.calculate_metrics()
    n_points = self.convergence_k if k is None else k
    ordering = self.algorithm_order or self._scan_result.algorithms
    results = self._metric_results
    metric_values = results.metric_values
    best_values = results.best_values
    max_nfes = results.max_nfes
    out_dir = Path(self.plot_config.save_path) / 'Convergence_Data'
    out_dir.mkdir(parents=True, exist_ok=True)
    plot_gen = PlotGenerator(self.plot_config)
    first_algo = ordering[0]
    for prob_idx, prob in enumerate(sorted(metric_values[first_algo].keys())):
        # Number of tasks is read from run 1 of the first algorithm.
        num_tasks = len(best_values[first_algo][prob][1])
        for task_idx in range(num_tasks):
            filepath = out_dir / f'Problem{prob_idx + 1}_task{task_idx + 1}.txt'
            with open(filepath, 'w') as f:
                for algo in ordering:
                    selected_run = StatisticsCalculator.select_representative_run(
                        best_values, algo, prob, task_idx,
                        self.plot_config.statistic_type
                    )
                    curve = plot_gen._get_convergence_curve(
                        metric_values, algo, prob, task_idx, selected_run
                    )
                    if len(curve) == 0:
                        continue
                    x = np.linspace(0, max_nfes[algo][prob][task_idx], len(curve))
                    # Downsample to n_points evenly spaced samples if requested
                    if n_points is not None and n_points > 0 and len(curve) > n_points:
                        idx = np.linspace(0, len(curve) - 1, n_points, dtype=int)
                        x, curve = x[idx], curve[idx]
                    f.write(f'# Algorithm: {algo}\n')
                    f.write('# NFEs\tValue\n')
                    for xi, yi in zip(x, curve):
                        f.write(f'{float(xi):.6g}\t{float(yi):.6g}\n')
                    f.write('\n')
    print(f"Convergence data exported to: {out_dir}")
def run(self) -> MetricResults:
    """
    Execute the complete analysis pipeline.

    This method runs all analysis steps in sequence:
    1. Clear existing results (if configured)
    2. Scan data directory
    3. Calculate metrics
    4. Generate statistical tables
    5. Generate convergence plots
    6. Generate runtime plots
    7. Generate non-dominated solution plots

    Returns
    -------
    MetricResults
        Complete metric results from the analysis.
    """
    # NOTE: a stray Sphinx "[docs]" extraction artifact preceding this
    # method was removed; it was not valid Python in this context.
    print("=" * 60)
    print('🚀🚀🚀 Starting Data Analysis Pipeline! 🚀🚀🚀')
    print("=" * 60)
    # Step 0: Clear results folder if requested
    if self.clear_results:
        results_path = self.table_config.save_path
        if results_path.exists():
            print(f'\n♻️ Clearing existing results folder: {results_path}')
            shutil.rmtree(results_path)
        results_path.mkdir(parents=True, exist_ok=True)
    # Step 1: Scan data
    print('\n🔍 Scanning data directory...')
    self.scan_data()
    # Step 2: Calculate metrics
    print('\n📊 Calculating metric values...')
    self.calculate_metrics()
    # Step 3: Generate tables
    print('\n📋 Generating statistical tables...')
    self.generate_tables()
    # Step 4: Plot convergence curves
    print('\n📈 Plotting convergence curves...')
    self.generate_convergence_plots()
    # Step 4.5: Export convergence data (only if a sample count was configured)
    if self.convergence_k is not None:
        print('\n📂 Exporting convergence data...')
        self.export_convergence_data()
    # Step 5: Plot runtime
    print('\n⏱️ Plotting runtime comparison...')
    self.generate_runtime_plots()
    # Step 6: Plot non-dominated solutions
    print('\n🎯 Plotting non-dominated solutions...')
    self.generate_nd_solution_plots()
    print("=" * 60)
    print('🎉🎉🎉 Data Analysis Completed! 🎉🎉🎉')
    print("=" * 60)
    return self._metric_results
# =============================================================================
# Module Entry Point and Usage Examples
# =============================================================================
if __name__ == '__main__':
    """
    Usage Examples for DataAnalyzer Module
    ======================================
    This module provides a comprehensive analysis pipeline for multi-task
    optimization experiments. Below are various usage patterns.

    Example 1: Quick Start - Full Pipeline
    --------------------------------------
    Run complete analysis with default settings::

        from data_analyzer import DataAnalyzer
        analyzer = DataAnalyzer(data_path='./Data')
        results = analyzer.run()

    Example 2: Multi-Objective Optimization with Custom Settings
    ------------------------------------------------------------
    Analyze multi-objective results with IGD metric::

        from data_analyzer import DataAnalyzer
        # Define problem settings with Pareto front references
        SETTINGS = {
            'metric': 'IGD',
            'ref_path': './MOReference',
            'n_ref': 10000,
            'P1': {
                'T1': 'P1_T1_ref.npy',
                'T2': 'P1_T2_ref.npy',
            },
            'P2': {
                'T1': lambda n, m: generate_pf(n, m),  # Callable reference
            }
        }
        analyzer = DataAnalyzer(
            data_path='./Data',
            settings=SETTINGS,
            save_path='./Results',
            table_format='latex',
            figure_format='pdf'
        )
        results = analyzer.run()

    Example 3: Step-by-Step Analysis
    --------------------------------
    Execute individual analysis steps for fine-grained control::

        from data_analyzer import DataAnalyzer
        analyzer = DataAnalyzer(
            data_path='./Data',
            settings=SETTINGS,
            algorithm_order=['NSGA-II', 'MOEA/D', 'MyAlgo'],  # Last is baseline
            clear_results=False
        )
        # Step 1: Scan data directory
        scan_result = analyzer.scan_data()
        print(f"Found algorithms: {scan_result.algorithms}")
        print(f"Found problems: {scan_result.problems}")
        # Step 2: Calculate metrics
        metric_results = analyzer.calculate_metrics()
        # Step 3: Generate only specific outputs
        analyzer.generate_tables()              # Statistical comparison tables
        analyzer.generate_convergence_plots()   # Convergence curves
        analyzer.generate_runtime_plots()       # Runtime bar charts
        analyzer.generate_nd_solution_plots()   # Pareto front visualizations

    Example 4: Custom Table Generation
    ----------------------------------
    Generate tables with specific statistical settings::

        from data_analyzer import (
            DataAnalyzer, TableGenerator, TableConfig,
            TableFormat, StatisticType
        )
        # Create custom table configuration
        table_config = TableConfig(
            table_format=TableFormat.LATEX,
            statistic_type=StatisticType.MEDIAN,
            significance_level=0.01,
            rank_sum_test=True,
            save_path=Path('./CustomResults')
        )
        # Use with analyzer
        analyzer = DataAnalyzer(data_path='./Data', settings=SETTINGS)
        analyzer.scan_data()
        analyzer.calculate_metrics()
        # Generate table with custom config
        table_gen = TableGenerator(table_config)
        latex_table = table_gen.generate(
            analyzer._metric_results.best_values,
            algorithm_order=['Algo1', 'Algo2', 'Baseline'],
            metric_name='IGD'
        )

    Example 5: Custom Plot Generation
    ---------------------------------
    Create plots with specific visual settings::

        from data_analyzer import DataAnalyzer, PlotGenerator, PlotConfig, StatisticType
        # Create custom plot configuration
        plot_config = PlotConfig(
            figure_format='png',
            statistic_type=StatisticType.MEDIAN,
            log_scale=True,
            show_pf=True,
            show_nd=True,
            save_path=Path('./Figures'),
            colors=['#E41A1C', '#377EB8', '#4DAF4A'],  # Custom colors
            markers=['o', 's', '^']
        )
        analyzer = DataAnalyzer(data_path='./Data', settings=SETTINGS)
        analyzer.calculate_metrics()
        # Generate plots with custom config
        plot_gen = PlotGenerator(plot_config)
        plot_gen.plot_convergence_curves(
            analyzer._metric_results.metric_values,
            analyzer._metric_results.best_values,
            analyzer._metric_results.max_nfes,
            algorithm_order=['Algo1', 'Algo2'],
            metric_name='IGD'
        )

    Example 6: Customizing Plot Font Sizes and Legend
    -------------------------------------------------
    Control font sizes, line styles, and legend appearance in convergence plots.
    The following parameters can be customized in PlotGenerator._create_convergence_figure():

    Font sizes (hardcoded, modify in source if needed)::

        ax.set_xlabel('NFEs', fontsize=14)   # X-axis label font size
        ax.set_ylabel(y_label, fontsize=14)  # Y-axis label font size
        ax.set_title(title, fontsize=14)     # Title font size
        ax.tick_params(..., labelsize=14)    # Tick label font size

    Line width and marker size (adaptive based on algorithm count)::

        # 1-4 algorithms: markersize=8, linewidth=2.5
        # 5-6 algorithms: markersize=7, linewidth=2.0
        # 7+ algorithms: markersize=6, linewidth=1.6

    Legend font size (adaptive, see _calculate_legend_fontsize())::

        # 2 algorithms: fontsize=14
        # 15 algorithms: fontsize=6
        # Linear interpolation for values in between

    Merged plot legend (fixed size)::

        # In _plot_combined_convergence_for_problem():
        legend_fontsize = 18                  # Fixed legend font size
        legend_padding_cm = 0.3               # Gap between legend and plots (cm)
        n_legend_cols = min(len(algorithms), 6)  # Max 6 columns in legend

    To modify these values, edit the corresponding methods in PlotGenerator class.

    Example 7: Access Raw Results
    -----------------------------
    Access computed metrics for custom analysis::

        from data_analyzer import DataAnalyzer
        analyzer = DataAnalyzer(data_path='./Data', settings=SETTINGS)
        results = analyzer.run()
        # Access metric values
        # Structure: results.metric_values[algo][problem][run][task_idx]
        algo1_p1_run1_task0 = results.metric_values['Algo1']['P1'][1][0]
        # Access best values
        # Structure: results.best_values[algo][problem][run] = [task0_val, task1_val, ...]
        best_vals = results.best_values['Algo1']['P1'][1]
        # Access objective values (Pareto solutions)
        # Structure: results.objective_values[algo][problem][run][task_idx] = np.ndarray
        pareto_solutions = results.objective_values['Algo1']['P1'][1][0]
        # Access runtime
        runtime_seconds = results.runtime['Algo1']['P1'][1]
        # Access max NFEs per task
        max_nfes_list = results.max_nfes['Algo1']['P1']

    Example 8: Using Utility Classes Directly
    -----------------------------------------
    Use statistics and data utilities independently::

        from data_analyzer import (
            StatisticsCalculator, DataUtils,
            StatisticType, OptimizationDirection
        )
        import numpy as np
        # Calculate statistics
        data = [1.2, 1.5, 1.1, 1.3, 1.4]
        mean, std = StatisticsCalculator.calculate_statistic(data, StatisticType.MEAN)
        # Perform statistical comparison
        algo_data = [1.0, 1.1, 0.9, 1.2]
        base_data = [2.0, 2.1, 1.9, 2.2]
        result = StatisticsCalculator.perform_rank_sum_test(
            algo_data, base_data,
            significance_level=0.05,
            direction=OptimizationDirection.MINIMIZE
        )
        print(f"Comparison: {result.symbol}, p-value: {result.p_value}")
        # Load reference data (signature matches the internal usage:
        # positional settings, problem, task key, then dimension keywords)
        reference = DataUtils.load_reference(
            SETTINGS,
            'P1',
            'T1',
            M=2,
            D=10,
            C=0
        )

    Data Directory Structure
    ------------------------
    Expected directory structure for input data::

        ./Data/
        ├── Algorithm1/
        │   ├── Algorithm1_Problem1_1.pkl
        │   ├── Algorithm1_Problem1_2.pkl
        │   ├── Algorithm1_Problem2_1.pkl
        │   └── ...
        ├── Algorithm2/
        │   ├── Algorithm2_Problem1_1.pkl
        │   └── ...
        └── ...

    Each .pkl file should contain a dictionary with keys:
    - 'all_objs': List[List[np.ndarray]] - Objectives per task per generation
    - 'runtime': float - Total runtime in seconds
    - 'max_nfes': List[int] - Max function evaluations per task

    Output Structure
    ----------------
    Generated output files::

        ./Results/
        ├── results_table_mean.xlsx     # or .tex for LaTeX
        ├── Problem1.pdf                # Convergence plot (single task)
        ├── Problem2-Task1.pdf          # Convergence plot (multi-task)
        ├── Problem2-Task2.pdf
        ├── runtime_comparison.pdf      # Runtime bar chart
        └── ND_Solutions/
            ├── Problem1-Algorithm1.pdf # Pareto front plot
            ├── Problem1-Algorithm2.pdf
            └── ...
    """
    # Demo: Run analysis with sample configuration
    print("DataAnalyzer Module - Demo Run")
    print("=" * 50)
    # Example configuration (modify paths as needed)
    analyzer = DataAnalyzer(
        data_path='./Data',
        save_path='./Results',
        table_format='excel',
        figure_format='pdf',
        statistic_type='mean',
        significance_level=0.05,
        rank_sum_test=True,
        log_scale=False,
        show_pf=True,
        show_nd=True,
        clear_results=True
    )
    # Run complete analysis pipeline
    results = analyzer.run()