"""
Test Data Analyzer Module for Single-Run Algorithm Testing
This module provides a lightweight analysis pipeline for quick algorithm testing,
reading pickle files directly from the data folder for single-run visualization.
Classes:
TestScanResult: Dataclass for storing test scan results
TestMetricResults: Dataclass for storing test metric results
TestDataAnalyzer: Main class for test data analysis
Usage:
analyzer = TestDataAnalyzer(data_path='./Data', save_path='./Results')
results = analyzer.run()
Author: Jiangtao Shen
Email: j.shen5@exeter.ac.uk
Date: 2025.10.10
Version: 2.0
"""
import shutil
from pathlib import Path
from typing import Dict, List, Any, Optional, Tuple, Union
from dataclasses import dataclass, field
import numpy as np
import matplotlib.pyplot as plt
from matplotlib.ticker import ScalarFormatter, LogFormatterSciNotation
# Import from project modules
from ddmtolab.Methods.metrics import IGD, HV, GD, IGDp, FR, CV, DeltaP, Spread, Spacing
from ddmtolab.Methods.Algo_Methods.algo_utils import nd_sort
from ddmtolab.Methods.data_analysis import (
OptimizationDirection,
DataUtils,
DEFAULT_COLORS,
DEFAULT_MARKERS,
)
# =============================================================================
# Data Classes
# =============================================================================
@dataclass
class TestScanResult:
"""
Result of scanning a test data directory.
Attributes:
algorithms: List[str]
Sorted list of algorithm names found in the directory.
problems: List[str]
List of problem names (extracted from data or default).
data_path: Path
Path to the scanned data directory.
file_mapping: Dict[str, Path]
Mapping from algorithm name to pickle file path.
"""
algorithms: List[str]
problems: List[str]
data_path: Path
file_mapping: Dict[str, Path]
@dataclass
class TestMetricResults:
"""
Container for test metric calculation results.
Attributes:
metric_values: Dict[str, List[np.ndarray]]
Metric values per generation per algorithm.
Structure: metric_values[algorithm] = List[np.ndarray] (per task)
best_values: Dict[str, List[float]]
Final best metric values per algorithm.
Structure: best_values[algorithm] = List[float] (per task)
objective_values: Dict[str, List[np.ndarray]]
Original objective values per algorithm.
Structure: objective_values[algorithm] = List[np.ndarray]
runtime: Dict[str, float]
Runtime in seconds per algorithm.
max_nfes: Dict[str, List[int]]
Maximum number of function evaluations per algorithm.
metric_name: Optional[str]
Name of the metric used.
problems: List[str]
List of problem names.
"""
metric_values: Dict[str, List[np.ndarray]]
best_values: Dict[str, List[float]]
objective_values: Dict[str, List[np.ndarray]]
runtime: Dict[str, float]
max_nfes: Dict[str, List[int]]
metric_name: Optional[str]
problems: List[str]
@dataclass
class PlotConfig:
"""
Configuration for plot generation.
Attributes:
figure_format: str
Output figure format (e.g., 'pdf', 'png', 'svg').
log_scale: bool
Whether to use logarithmic scale for y-axis.
show_pf: bool
Whether to show true Pareto front in ND solution plots.
show_nd: bool
Whether to filter and show only non-dominated solutions.
save_path: Path
Directory path to save output figures.
colors: List[str]
Color palette for plotting algorithms.
markers: List[str]
Marker styles for plotting algorithms.
"""
figure_format: str = 'pdf'
log_scale: bool = False
show_pf: bool = True
show_nd: bool = True
save_path: Path = Path('./Results')
colors: List[str] = field(default_factory=lambda: DEFAULT_COLORS.copy())
markers: List[str] = field(default_factory=lambda: DEFAULT_MARKERS.copy())
# =============================================================================
# Test Plot Generator
# =============================================================================
class TestPlotGenerator:
"""Class for generating visualization plots for test data."""
def __init__(self, config: PlotConfig):
"""Initialize TestPlotGenerator with configuration."""
self.config = config
@staticmethod
def _calculate_legend_fontsize(n_algorithms: int) -> int:
"""
Calculate legend font size based on number of algorithms.
Linear interpolation:
- 2 algorithms -> font size 14
- 15 algorithms -> font size 6
"""
if n_algorithms <= 2:
return 14
elif n_algorithms >= 15:
return 6
else:
return int(round(14 - (8 / 13) * (n_algorithms - 2)))
def plot_convergence_curves(
self,
metric_values: Dict[str, List[np.ndarray]],
max_nfes: Dict[str, List[int]],
algorithm_order: List[str],
problems: List[str],
metric_name: Optional[str] = None
) -> None:
"""Generate and save convergence curve plots for all algorithms and tasks."""
save_dir = Path(self.config.save_path)
save_dir.mkdir(parents=True, exist_ok=True)
first_algo = algorithm_order[0]
num_tasks = len(metric_values[first_algo])
for task_idx in range(num_tasks):
fig = self._create_convergence_figure(
metric_values, max_nfes, algorithm_order,
task_idx, num_tasks, problems, metric_name
)
if num_tasks == 1:
problem_name = problems[0] if problems else 'Test'
output_file = save_dir / f'{problem_name}_convergence.{self.config.figure_format}'
else:
problem_name = problems[task_idx] if task_idx < len(problems) else f'P{task_idx + 1}'
output_file = save_dir / f'{problem_name}-Task{task_idx + 1}_convergence.{self.config.figure_format}'
fig.savefig(output_file, dpi=300, bbox_inches='tight')
plt.close(fig)
print(f"Convergence plots saved to: {save_dir}")
def _create_convergence_figure(
self,
metric_values: Dict[str, List[np.ndarray]],
max_nfes: Dict[str, List[int]],
algorithm_order: List[str],
task_idx: int,
num_tasks: int,
problems: List[str],
metric_name: Optional[str]
) -> plt.Figure:
"""Create a single convergence curve figure."""
fig, ax = plt.subplots(figsize=(5, 3.5))
# Collect curve data for y-axis range and max NFEs for x-axis formatting
all_curves = []
actual_max_nfes = 0
# Adaptive line width and marker size based on number of algorithms
n_algos = len(algorithm_order)
if n_algos <= 4:
markersize, linewidth = 8, 2.5
elif n_algos <= 6:
markersize, linewidth = 7, 2.0
else:
markersize, linewidth = 6, 1.6
for idx, algo in enumerate(algorithm_order):
curve = np.array(metric_values[algo][task_idx]).ravel()
if len(curve) == 0:
continue
all_curves.append(curve)
nfes = max_nfes[algo][task_idx] if task_idx < len(max_nfes[algo]) else len(curve)
actual_max_nfes = max(actual_max_nfes, nfes)
x = np.linspace(0, nfes, len(curve))
marker_interval = max(1, len(curve) // 10)
ax.plot(
x, curve, label=algo,
color=self.config.colors[idx % len(self.config.colors)],
marker=self.config.markers[idx % len(self.config.markers)],
markevery=marker_interval,
markersize=markersize, linewidth=linewidth, linestyle='-', alpha=0.7
)
# Set axis labels
y_label = metric_name if metric_name is not None else 'Objective Value'
ax.set_xlabel('NFEs', fontsize=14)
ax.set_ylabel(y_label, fontsize=14)
if num_tasks == 1:
title = problems[0] if problems else 'Test Problem'
else:
prob_name = problems[task_idx] if task_idx < len(problems) else f'P{task_idx + 1}'
title = f'{prob_name} - Task {task_idx + 1}'
ax.set_title(title, fontsize=14)
ax.tick_params(axis='both', which='major', labelsize=14)
# Auto-adjust legend font size based on number of algorithms
legend_fontsize = self._calculate_legend_fontsize(len(algorithm_order))
ax.legend(loc='best', fontsize=legend_fontsize)
ax.grid(True, alpha=0.2, linestyle='-')
# Apply axis formatting
if self.config.log_scale:
ax.set_yscale('log')
# Check data range; use linear scale if range is too small
if len(all_curves) > 0:
all_data = np.concatenate([c for c in all_curves])
y_min, y_max = np.min(all_data), np.max(all_data)
if y_max / y_min < 10:
ax.set_yscale('linear')
self._apply_scientific_notation(ax, actual_xmax=actual_max_nfes)
else:
ax.yaxis.set_major_formatter(LogFormatterSciNotation())
if actual_max_nfes > 10000:
formatter = ScalarFormatter(useMathText=True)
formatter.set_scientific(True)
formatter.set_powerlimits((0, 0))
ax.xaxis.set_major_formatter(formatter)
else:
self._apply_scientific_notation(ax, actual_xmax=actual_max_nfes)
# Disable minor ticks (must be called after set_yscale)
ax.minorticks_off()
fig.tight_layout()
return fig
def _apply_scientific_notation(
self,
ax: plt.Axes,
actual_xmax: Optional[float] = None,
x_threshold: float = 10000,
y_threshold: float = 1000
) -> None:
"""Apply scientific notation to axes if values exceed threshold."""
xmax = actual_xmax if actual_xmax is not None else ax.get_xlim()[1]
ymax = ax.get_ylim()[1]
if xmax > x_threshold:
formatter = ScalarFormatter(useMathText=True)
formatter.set_scientific(True)
formatter.set_powerlimits((0, 0))
ax.xaxis.set_major_formatter(formatter)
if ymax > y_threshold:
formatter = ScalarFormatter(useMathText=True)
formatter.set_scientific(True)
formatter.set_powerlimits((0, 0))
ax.yaxis.set_major_formatter(formatter)
def plot_runtime(
self,
runtime: Dict[str, float],
algorithm_order: List[str]
) -> None:
"""Generate and save a bar plot showing runtime comparison."""
save_dir = Path(self.config.save_path)
save_dir.mkdir(parents=True, exist_ok=True)
n_algos = len(algorithm_order)
# Adaptive figure width based on number of algorithms
fig_width = max(4, min(6, 2 + n_algos * 0.8))
fig, ax = plt.subplots(figsize=(fig_width, 3.5))
x = np.arange(n_algos)
runtimes = [runtime[algo] for algo in algorithm_order]
colors = [self.config.colors[i % len(self.config.colors)] for i in range(n_algos)]
# Adaptive bar width: narrower bars when fewer algorithms
if n_algos == 1:
bar_width = 0.35
elif n_algos == 2:
bar_width = 0.45
elif n_algos <= 4:
bar_width = 0.55
else:
bar_width = 0.7
bars = ax.bar(x, runtimes, width=bar_width, color=colors, alpha=0.8,
edgecolor='black', linewidth=0.8)
# Add value labels on bars
for bar, val in zip(bars, runtimes):
height = bar.get_height()
ax.annotate(f'{val:.2f}s',
xy=(bar.get_x() + bar.get_width() / 2, height),
xytext=(0, 3),
textcoords="offset points",
ha='center', va='bottom', fontsize=10)
ax.set_ylabel('Runtime (s)', fontsize=12)
ax.set_xticks(x)
ax.set_xticklabels(algorithm_order, fontsize=12)
ax.tick_params(axis='both', which='major', labelsize=10)
ax.grid(True, axis='y', alpha=0.3, linestyle='-')
# Add top margin for labels (15% extra space)
y_max = max(runtimes) if runtimes else 1
ax.set_ylim(0, y_max * 1.15)
fig.tight_layout()
output_file = save_dir / f'runtime_comparison.{self.config.figure_format}'
fig.savefig(output_file, dpi=300, bbox_inches='tight')
plt.close(fig)
print(f"Runtime plot saved to: {output_file}")
def plot_nd_solutions(
self,
objective_values: Dict[str, List[np.ndarray]],
algorithm_order: List[str],
problems: List[str],
settings: Optional[Dict[str, Any]] = None
) -> None:
"""Generate and save non-dominated solution plots."""
nd_folder = Path(self.config.save_path) / 'ND_Solutions'
nd_folder.mkdir(parents=True, exist_ok=True)
first_algo = algorithm_order[0]
n_tasks = len(objective_values[first_algo])
for algo in algorithm_order:
for task_idx in range(n_tasks):
objectives = objective_values[algo][task_idx]
if objectives is None or objectives.shape[0] == 0:
continue
n_objectives = objectives.shape[1]
if n_objectives <= 1:
continue
# Filter non-dominated solutions if requested
if self.config.show_nd:
front_no, _ = nd_sort(objectives, objectives.shape[0])
nd_solutions = objectives[front_no == 1]
else:
nd_solutions = objectives
# Load true Pareto front if requested
true_pf = None
if self.config.show_pf and settings is not None:
prob = problems[task_idx] if task_idx < len(problems) else f'P{task_idx + 1}'
true_pf = DataUtils.load_reference(settings, prob, task_idx, n_objectives)
# Create plot
fig = self._create_nd_plot(nd_solutions, true_pf, n_objectives, n_tasks,
problems, task_idx, algo)
# Save figure
prob_name = problems[task_idx] if task_idx < len(problems) else f'P{task_idx + 1}'
if n_tasks == 1:
filename = f'{prob_name}-{algo}.{self.config.figure_format}'
else:
filename = f'{prob_name}-Task{task_idx + 1}-{algo}.{self.config.figure_format}'
fig.savefig(nd_folder / filename, dpi=300)
plt.close(fig)
print(f"ND solution plots saved to: {nd_folder}")
def _create_nd_plot(
self,
nd_solutions: np.ndarray,
true_pf: Optional[np.ndarray],
n_objectives: int,
n_tasks: int,
problems: List[str],
task_idx: int,
algo: str
) -> plt.Figure:
"""Create a non-dominated solution plot."""
fig = plt.figure(figsize=(4.5, 3.5))
if n_objectives == 2:
ax = fig.add_subplot(111)
if true_pf is not None and true_pf.shape[1] == 2:
sort_idx = np.argsort(true_pf[:, 0])
sorted_pf = true_pf[sort_idx]
ax.scatter(sorted_pf[:, 0], sorted_pf[:, 1],
c='gray', s=2, linewidth=0.1, zorder=1)
ax.scatter(nd_solutions[:, 0], nd_solutions[:, 1],
c='dodgerblue', s=60, alpha=0.8, edgecolors='black',
linewidth=0.8, zorder=2)
ax.set_xlabel('$f_1$', fontsize=12)
ax.set_ylabel('$f_2$', fontsize=12)
ax.grid(True, alpha=0.2, linestyle='-')
elif n_objectives == 3:
ax = fig.add_subplot(111, projection='3d')
if true_pf is not None and true_pf.shape[1] == 3:
ax.scatter(true_pf[:, 0], true_pf[:, 1], true_pf[:, 2],
c='gray', s=4, alpha=0.2, zorder=1, depthshade=True)
ax.scatter(nd_solutions[:, 0], nd_solutions[:, 1], nd_solutions[:, 2],
c='dodgerblue', s=60, alpha=0.8, edgecolors='black',
linewidth=0.8, zorder=2, depthshade=True)
ax.set_xlabel('$f_1$', fontsize=12)
ax.set_ylabel('$f_2$', fontsize=12)
ax.set_zlabel('$f_3$', fontsize=12)
ax.view_init(elev=20, azim=60)
else:
# Parallel coordinates for many-objective
ax = fig.add_subplot(111)
for i in range(nd_solutions.shape[0]):
ax.plot(range(n_objectives), nd_solutions[i, :],
'b-', alpha=0.3, linewidth=0.8)
ax.set_xlabel('Objective', fontsize=12)
ax.set_ylabel('Value', fontsize=12)
ax.set_xticks(range(n_objectives))
ax.set_xticklabels([rf'$f_{{{i + 1}}}$' for i in range(n_objectives)])
ax.grid(True, alpha=0.3, linestyle='--')
prob_name = problems[task_idx] if task_idx < len(problems) else f'P{task_idx + 1}'
if n_tasks == 1:
title = f'{prob_name} - {algo}'
else:
title = f'{prob_name} - Task {task_idx + 1} - {algo}'
plt.title(title, fontsize=10)
plt.tight_layout()
return fig
# =============================================================================
# Test Table Generator
# =============================================================================
class TestTableGenerator:
"""Class for generating Excel tables for test data."""
def __init__(self, save_path: Path = Path('./Results')):
"""Initialize TestTableGenerator."""
self.save_path = save_path
def generate_excel_table(
self,
best_values: Dict[str, List[float]],
runtime: Dict[str, float],
algorithm_order: List[str],
problems: List[str],
metric_name: Optional[str] = None
) -> Path:
"""
Generate an Excel table comparing algorithm performance.
Parameters:
best_values: Dict[str, List[float]]
Final metric values per algorithm per task.
runtime: Dict[str, float]
Runtime per algorithm.
algorithm_order: List[str]
Algorithm display order.
problems: List[str]
Problem/task names.
metric_name: Optional[str]
Metric name for table caption.
Returns:
Path: Path to generated Excel file.
"""
try:
from openpyxl import Workbook
from openpyxl.styles import Font, Alignment, PatternFill, Border, Side
except ImportError:
print("Warning: openpyxl not installed. Skipping Excel table generation.")
return None
save_dir = Path(self.save_path)
save_dir.mkdir(parents=True, exist_ok=True)
num_tasks = len(best_values[algorithm_order[0]])
direction = DataUtils.get_metric_direction(metric_name)
# Create workbook
wb = Workbook()
ws = wb.active
metric_str = metric_name if metric_name else 'Objective'
ws.title = f"Test Results ({metric_str})"
# Styles
header_font = Font(bold=True, color="FFFFFF")
header_fill = PatternFill(start_color="4472C4", end_color="4472C4", fill_type="solid")
best_font = Font(bold=True)
center_align = Alignment(horizontal='center', vertical='center')
thin_border = Border(
left=Side(style='thin'),
right=Side(style='thin'),
top=Side(style='thin'),
bottom=Side(style='thin')
)
# Header row
headers = ['Problem'] + algorithm_order
for col_idx, header in enumerate(headers, 1):
cell = ws.cell(row=1, column=col_idx, value=header)
cell.font = header_font
cell.fill = header_fill
cell.alignment = center_align
cell.border = thin_border
# Data rows
row_idx = 2
for task_idx in range(num_tasks):
prob_name = problems[task_idx] if task_idx < len(problems) else f'Task {task_idx + 1}'
# Find best value for this task
values = {}
for algo in algorithm_order:
val = best_values[algo][task_idx]
values[algo] = val
valid_values = {k: v for k, v in values.items() if not np.isnan(v)}
best_algo = None
if valid_values:
if direction == OptimizationDirection.MINIMIZE:
best_algo = min(valid_values, key=valid_values.get)
else:
best_algo = max(valid_values, key=valid_values.get)
# Write row
ws.cell(row=row_idx, column=1, value=prob_name).border = thin_border
for col_idx, algo in enumerate(algorithm_order, 2):
val = values[algo]
cell = ws.cell(row=row_idx, column=col_idx)
cell.value = f"{val:.4e}" if not np.isnan(val) else 'N/A'
cell.alignment = center_align
cell.border = thin_border
if algo == best_algo:
cell.font = best_font
row_idx += 1
# Runtime row
ws.cell(row=row_idx, column=1, value='Runtime (s)').border = thin_border
best_runtime_algo = min(runtime, key=runtime.get)
for col_idx, algo in enumerate(algorithm_order, 2):
cell = ws.cell(row=row_idx, column=col_idx)
cell.value = f"{runtime[algo]:.2f}"
cell.alignment = center_align
cell.border = thin_border
if algo == best_runtime_algo:
cell.font = best_font
# Adjust column widths
ws.column_dimensions['A'].width = 15
for col_idx in range(2, len(algorithm_order) + 2):
ws.column_dimensions[chr(64 + col_idx)].width = 15
# Save file
output_file = save_dir / 'test_results.xlsx'
wb.save(output_file)
print(f"Excel table saved to: {output_file}")
return output_file
# =============================================================================
# Main Test Data Analyzer Class
# =============================================================================
[docs]
class TestDataAnalyzer:
"""
Main class for analyzing single-run test data.
This class handles pickle files stored directly in the data folder,
providing a lightweight analysis pipeline without statistical analysis.
Attributes:
data_path: Path
Path to the data directory containing pickle files.
settings: Optional[Dict[str, Any]]
Problem settings including reference definitions.
algorithm_order: Optional[List[str]]
Custom ordering of algorithms for display.
plot_config: PlotConfig
Configuration for plot generation.
"""
def __init__(
self,
data_path: Union[str, Path] = './Data',
settings: Optional[Dict[str, Any]] = None,
algorithm_order: Optional[List[str]] = None,
save_path: Union[str, Path] = './Results',
figure_format: str = 'pdf',
log_scale: bool = False,
show_pf: bool = True,
show_nd: bool = True,
best_so_far: bool = True,
clear_results: bool = True,
file_suffix: str = '.pkl'
):
"""
Initialize TestDataAnalyzer.
Parameters:
data_path: Union[str, Path]
Path to data directory containing pickle files.
Default: './Data'
settings: Optional[Dict[str, Any]]
Problem settings dictionary for multi-objective metrics.
Default: None (single-objective mode)
algorithm_order: Optional[List[str]]
Custom ordering of algorithms for display.
Default: None (alphabetical order)
save_path: Union[str, Path]
Directory path to save all output files.
Default: './Results'
figure_format: str
Output figure format: 'pdf', 'png', 'svg', etc.
Default: 'pdf'
log_scale: bool
Whether to use logarithmic scale for convergence plot y-axis.
Default: False
show_pf: bool
Whether to show true Pareto front in ND solution plots.
Default: True
show_nd: bool
Whether to filter and show only non-dominated solutions.
Default: True
best_so_far: bool
Whether to use best-so-far metric values.
Default: True
clear_results: bool
Whether to clear existing results folder before analysis.
Default: True
file_suffix: str
Suffix pattern for pickle files.
Default: '.pkl'
"""
self.data_path = Path(data_path)
self.settings = settings
self.algorithm_order = algorithm_order
self.best_so_far = best_so_far
self.clear_results = clear_results
self.file_suffix = file_suffix
self.plot_config = PlotConfig(
figure_format=figure_format,
log_scale=log_scale,
show_pf=show_pf,
show_nd=show_nd,
save_path=Path(save_path)
)
self.table_save_path = Path(save_path)
# Internal state
self._scan_result: Optional[TestScanResult] = None
self._metric_results: Optional[TestMetricResults] = None
def scan_data(self) -> TestScanResult:
"""
Scan the data directory to detect pickle files.
Returns:
TestScanResult
Dataclass containing:
- algorithms: List[str] - Algorithm names extracted from filenames
- problems: List[str] - Problem names
- data_path: Path - Path to scanned directory
- file_mapping: Dict[str, Path] - Algorithm to file path mapping
Raises:
FileNotFoundError: If data_path does not exist.
ValueError: If no pickle files found.
"""
if not self.data_path.exists():
raise FileNotFoundError(f"Data path does not exist: {self.data_path}")
# Find all pickle files with specified suffix
file_mapping = {}
for pkl_file in self.data_path.glob(f'*{self.file_suffix}'):
# Extract algorithm name from filename (remove suffix)
algo_name = pkl_file.stem
file_mapping[algo_name] = pkl_file
if not file_mapping:
raise ValueError(f"No pickle files found in {self.data_path} with suffix '{self.file_suffix}'")
algorithms = sorted(file_mapping.keys())
# Try to extract problem names from first file
first_file = file_mapping[algorithms[0]]
data = DataUtils.load_pickle(first_file)
n_tasks = len(data['all_objs'])
# Generate problem names
if self.settings and 'problems' in self.settings:
problems = self.settings['problems'][:n_tasks]
else:
problems = [f'Task{i + 1}' for i in range(n_tasks)]
print(f"Found {len(algorithms)} algorithms: {algorithms}")
print(f"Found {n_tasks} tasks/problems: {problems}")
self._scan_result = TestScanResult(
algorithms=algorithms,
problems=problems,
data_path=self.data_path,
file_mapping=file_mapping
)
return self._scan_result
def calculate_metrics(self) -> TestMetricResults:
"""
Calculate metric values for all algorithms.
Returns:
TestMetricResults
Dataclass containing all computed metrics.
Raises:
RuntimeError: If scan_data() has not been called.
"""
if self._scan_result is None:
self.scan_data()
scan = self._scan_result
algo_order = self.algorithm_order if self.algorithm_order else scan.algorithms
metric_name = self.settings.get('metric') if self.settings else None
# Initialize storage dictionaries
all_values = {}
all_best_values = {}
all_objective_values = {}
all_runtime = {}
all_max_nfes = {}
for algo in algo_order:
pkl_path = scan.file_mapping[algo]
data = DataUtils.load_pickle(pkl_path)
metric_values, metric_values_best_so_far = self._calculate_single_algorithm_metrics(data)
selected = metric_values_best_so_far if self.best_so_far else metric_values
all_values[algo] = selected
# Extract final values
last_vals = [
np.asarray(task_arr).ravel()[-1] if len(task_arr) > 0 else np.nan
for task_arr in selected
]
all_best_values[algo] = last_vals
# Extract final objective values
last_objs = [data['all_objs'][t][-1] for t in range(len(data['all_objs']))]
all_objective_values[algo] = last_objs
all_runtime[algo] = data['runtime']
all_max_nfes[algo] = data['max_nfes']
print(f" {algo}: {len(data['all_objs'])} tasks, runtime={data['runtime']:.2f}s")
self._metric_results = TestMetricResults(
metric_values=all_values,
best_values=all_best_values,
objective_values=all_objective_values,
runtime=all_runtime,
max_nfes=all_max_nfes,
metric_name=metric_name,
problems=scan.problems
)
return self._metric_results
def _calculate_single_algorithm_metrics(
self,
data: Dict[str, Any]
) -> Tuple[List[np.ndarray], List[np.ndarray]]:
"""
Calculate metric values for a single algorithm's data.
Parameters:
data: Dict[str, Any]
Loaded pickle data.
Returns:
Tuple[List[np.ndarray], List[np.ndarray]]
Tuple of (metric_values, metric_values_best_so_far).
"""
all_objs = data['all_objs']
all_cons = data.get('all_cons', None)
n_tasks = len(all_objs)
metric_values = []
metric_values_best_so_far = []
for t in range(n_tasks):
n_gens = len(all_objs[t])
task_values = np.zeros(n_gens)
task_best_so_far = np.zeros(n_gens)
best_so_far = None
for gen in range(n_gens):
objs_gen = all_objs[t][gen]
cons_gen = all_cons[t][gen] if all_cons is not None else None
M = objs_gen.shape[1]
if M == 1:
# Single-objective
metric_value = np.min(objs_gen[:, 0])
sign = -1
else:
# Multi-objective
if self.settings is None:
# Default to simple dominated hypervolume approximation
metric_value = np.min(np.sum(objs_gen, axis=1))
sign = -1
else:
metric_name = self.settings.get('metric', 'IGD')
prob = self._scan_result.problems[t] if t < len(self._scan_result.problems) else f'P{t + 1}'
reference = DataUtils.load_reference(self.settings, prob, t, M)
if metric_name == 'IGD':
metric_instance = IGD()
metric_value = metric_instance.calculate(objs_gen, reference)
sign = metric_instance.sign
elif metric_name == 'HV':
metric_instance = HV()
# If reference is 1D or single row, treat as ref point; otherwise as PF
if reference.ndim == 1 or reference.shape[0] == 1:
ref_point = reference.flatten()
metric_value = metric_instance.calculate(objs_gen, reference=ref_point)
else:
metric_value = metric_instance.calculate(objs_gen, pf=reference)
sign = metric_instance.sign
elif metric_name == 'IGDp':
metric_instance = IGDp()
metric_value = metric_instance.calculate(objs_gen, reference)
sign = metric_instance.sign
elif metric_name == 'GD':
metric_instance = GD()
metric_value = metric_instance.calculate(objs_gen, reference)
sign = metric_instance.sign
elif metric_name == 'DeltaP':
metric_instance = DeltaP()
metric_value = metric_instance.calculate(objs_gen, reference)
sign = metric_instance.sign
elif metric_name == 'Spacing':
metric_instance = Spacing()
metric_value = metric_instance.calculate(objs_gen)
sign = metric_instance.sign
elif metric_name == 'Spread':
metric_instance = Spread()
metric_value = metric_instance.calculate(objs_gen, reference)
sign = metric_instance.sign
elif metric_name == 'FR':
if cons_gen is None:
raise ValueError('FR metric requires constraint data, but all_cons is not available')
metric_instance = FR()
metric_value = metric_instance.calculate(cons_gen)
sign = metric_instance.sign
elif metric_name == 'CV':
if cons_gen is None:
raise ValueError('CV metric requires constraint data, but all_cons is not available')
metric_instance = CV()
metric_value = metric_instance.calculate(cons_gen)
sign = metric_instance.sign
else:
raise ValueError(f'Unsupported metric: {metric_name}')
task_values[gen] = metric_value
if best_so_far is None:
best_so_far = metric_value
else:
if sign == -1:
best_so_far = min(best_so_far, metric_value)
else:
best_so_far = max(best_so_far, metric_value)
task_best_so_far[gen] = best_so_far
metric_values.append(task_values)
metric_values_best_so_far.append(task_best_so_far)
return metric_values, metric_values_best_so_far
def generate_convergence_plots(self) -> None:
"""Generate and save convergence curve plots."""
if self._metric_results is None:
self.calculate_metrics()
algo_order = self.algorithm_order if self.algorithm_order else self._scan_result.algorithms
plot_gen = TestPlotGenerator(self.plot_config)
plot_gen.plot_convergence_curves(
self._metric_results.metric_values,
self._metric_results.max_nfes,
algo_order,
self._metric_results.problems,
self._metric_results.metric_name
)
def generate_runtime_plots(self) -> None:
"""Generate and save runtime comparison bar plots."""
if self._metric_results is None:
self.calculate_metrics()
algo_order = self.algorithm_order if self.algorithm_order else self._scan_result.algorithms
plot_gen = TestPlotGenerator(self.plot_config)
plot_gen.plot_runtime(self._metric_results.runtime, algo_order)
def generate_nd_solution_plots(self) -> None:
"""Generate and save non-dominated solution plots."""
if self._metric_results is None:
self.calculate_metrics()
algo_order = self.algorithm_order if self.algorithm_order else self._scan_result.algorithms
plot_gen = TestPlotGenerator(self.plot_config)
plot_gen.plot_nd_solutions(
self._metric_results.objective_values,
algo_order,
self._metric_results.problems,
self.settings
)
def generate_excel_tables(self) -> Path:
"""
Generate Excel comparison tables.
Returns:
Path: Path to generated Excel file.
"""
if self._metric_results is None:
self.calculate_metrics()
algo_order = self.algorithm_order if self.algorithm_order else self._scan_result.algorithms
table_gen = TestTableGenerator(self.table_save_path)
# Generate results table
output_path = table_gen.generate_excel_table(
self._metric_results.best_values,
self._metric_results.runtime,
algo_order,
self._metric_results.problems,
self._metric_results.metric_name
)
return output_path
[docs]
def run(self) -> TestMetricResults:
"""
Execute the complete test analysis pipeline.
Returns:
TestMetricResults
Complete metric results from the analysis.
"""
print("=" * 60)
print("Starting Test Data Analysis Pipeline")
print("=" * 60)
# Step 0: Clear results folder if requested
if self.clear_results:
results_path = self.plot_config.save_path
if results_path.exists():
print(f'\nClearing existing results folder: {results_path}')
# Retry mechanism for Windows file locking issues
max_retries = 3
for attempt in range(max_retries):
try:
shutil.rmtree(results_path)
break
except PermissionError as e:
if attempt < max_retries - 1:
import time
print(f' Warning: Folder in use, retrying in 1s... ({attempt + 1}/{max_retries})')
time.sleep(1)
else:
print(f' Error: Cannot delete {results_path}. Please close any programs using this folder.')
raise e
results_path.mkdir(parents=True, exist_ok=True)
# Step 1: Scan data
print('\n[1/6] Scanning test data directory...')
self.scan_data()
# Step 2: Calculate metrics
print('\n[2/6] Calculating metric values...')
self.calculate_metrics()
# Step 3: Generate Excel tables
print('\n[3/6] Generating Excel tables...')
self.generate_excel_tables()
# Step 4: Plot convergence curves
print('\n[4/6] Plotting convergence curves...')
self.generate_convergence_plots()
# Step 5: Plot runtime
print('\n[5/6] Plotting runtime comparison...')
self.generate_runtime_plots()
# Step 6: Plot ND solutions (if multi-objective)
algo_order = self.algorithm_order if self.algorithm_order else self._scan_result.algorithms
first_algo = algo_order[0]
first_objs = self._metric_results.objective_values[first_algo][0]
if first_objs.shape[1] > 1:
print('\n[6/6] Plotting non-dominated solutions...')
self.generate_nd_solution_plots()
else:
print('\n[6/6] Skipping ND plots (single-objective)')
print("\n" + "=" * 60)
print("Test Data Analysis Completed")
print("=" * 60)
return self._metric_results
# =============================================================================
# Module Entry Point
# =============================================================================
if __name__ == '__main__':
"""
Usage Examples for TestDataAnalyzer
====================================
Example 1: Quick Start
----------------------
Analyze pickle files in ./Data folder:
from ddmtolab.Methods.test_data_analysis import TestDataAnalyzer
analyzer = TestDataAnalyzer(data_path='./Data')
results = analyzer.run()
Example 2: With Custom Settings (Multi-Objective)
-------------------------------------------------
Multi-objective analysis with IGD metric:
SETTINGS = {
'metric': 'IGD',
'ref_path': './MOReference',
'problems': ['ZDT1', 'ZDT2'],
'ZDT1': {'T1': 'ZDT1_ref.npy'},
'ZDT2': {'T1': 'ZDT2_ref.npy'}
}
analyzer = TestDataAnalyzer(
data_path='./Data',
settings=SETTINGS,
save_path='./Results',
figure_format='png'
)
results = analyzer.run()
Example 3: Custom Algorithm Order
---------------------------------
analyzer = TestDataAnalyzer(
data_path='./Data',
algorithm_order=['DE', 'GA', 'PSO'],
figure_format='pdf'
)
results = analyzer.run()
Example 4: Step-by-Step Analysis
--------------------------------
analyzer = TestDataAnalyzer(data_path='./Data')
# Scan files
scan_result = analyzer.scan_data()
print(f"Found: {scan_result.algorithms}")
# Calculate metrics
results = analyzer.calculate_metrics()
# Generate specific outputs
analyzer.generate_latex_tables()
analyzer.generate_convergence_plots()
Example 5: Plot Customization
-----------------------------
Control figure format, log scale, and Pareto front display:
analyzer = TestDataAnalyzer(
data_path='./Data',
figure_format='pdf', # Output format: 'pdf', 'png', 'svg'
log_scale=True, # Use log scale for y-axis
show_pf=True, # Show true Pareto front
show_nd=True # Filter non-dominated solutions
)
Expected File Structure
-----------------------
./Data/
├── GA.pkl
├── DE.pkl
├── PSO.pkl
└── ...
Each .pkl file should contain:
- 'all_objs': List[List[np.ndarray]] - Objectives per task per generation
- 'runtime': float - Total runtime in seconds
- 'max_nfes': List[int] - Max function evaluations per task
"""
# Demo run
print("TestDataAnalyzer - Demo")
print("=" * 50)
analyzer = TestDataAnalyzer(
data_path='./Data',
save_path='./Results',
figure_format='pdf',
clear_results=True
)
# Run complete analysis
try:
results = analyzer.run()
except (FileNotFoundError, ValueError) as e:
print(f"Demo skipped: {e}")
print("Create pickle files in ./Data/ to run analysis.")