# Source code for ddmtolab.Methods.data_analysis

"""
Data Analyzer Module for Multi-Task Optimization Experiments

This module provides a comprehensive analysis and visualization pipeline for
multi-task optimization experiments, including metric calculation, statistical
comparison tables, convergence plots, runtime analysis, and Pareto front visualization.

Classes:
    MetricResults: Dataclass for storing metric calculation results
    TableConfig: Dataclass for table generation configuration
    PlotConfig: Dataclass for plot generation configuration
    DataAnalyzer: Main class for data analysis pipeline

Author: Jiangtao Shen
Email: j.shen5@exeter.ac.uk
Date: 2025.10.10
Version: 2.1
"""

import os
import pickle
import shutil
from pathlib import Path
from typing import Dict, List, Any, Optional, Tuple, Union, Callable
from dataclasses import dataclass, field
from matplotlib.ticker import FuncFormatter
from enum import Enum

import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from openpyxl import load_workbook
from openpyxl.styles import Border, Side, Alignment, Font
from scipy import stats
from tqdm import tqdm

# Import from project modules
from ddmtolab.Methods.metrics import IGD, HV, GD, IGDp, FR, CV, DeltaP, Spread, Spacing
from ddmtolab.Methods.Algo_Methods.algo_utils import nd_sort


# =============================================================================
# Enums and Constants
# =============================================================================

class OptimizationDirection(Enum):
    """
    Optimization direction enumeration.

    States whether smaller or larger values of a quantity count as better.
    """
    MINIMIZE = "minimize"  # smaller values are better
    MAXIMIZE = "maximize"  # larger values are better


class TableFormat(Enum):
    """
    Output table format enumeration.

    Selects which kind of comparison table is produced.
    """
    EXCEL = "excel"  # spreadsheet output
    LATEX = "latex"  # LaTeX source output


class StatisticType(Enum):
    """
    Statistical measure type enumeration.

    Names the summary statistic computed over the independent runs of one
    algorithm on one task.
    """
    MEAN = "mean"  # arithmetic mean
    MEDIAN = "median"  # median value
    MAX = "max"  # largest value
    MIN = "min"  # smallest value


# Default color palette for plots: 15 hex color strings.
# PlotConfig takes a copy of this list as the default for its ``colors`` field,
# so changing a config's palette does not modify this module-level list.
DEFAULT_COLORS = [
    '#1f77b4', '#ff7f0e', '#2ca02c', '#d62728', '#9467bd',
    '#8c564b', '#e377c2', '#7f7f7f', '#bcbd22', '#17becf',
    '#e41a1c', '#377eb8', '#4daf4a', '#984ea3', '#ff8c00'
]

# Default markers for plots: 15 marker style strings, one per palette color.
# PlotConfig takes a copy of this list as the default for its ``markers`` field.
DEFAULT_MARKERS = ['o', 's', '^', 'v', 'D', 'p', '*', 'h', '<', '>', 'X', 'P', 'd', '8', 'H']


# =============================================================================
# Data Classes
# =============================================================================

@dataclass
class ScanResult:
    """
    Result of scanning a data directory.

    All four fields are required; the dataclass defines no defaults.

    :no-index:

    Attributes
    ----------
    algorithms : List[str]
        Sorted list of algorithm names found in the directory.
    problems : List[str]
        Sorted list of problem names extracted from filenames.
    runs : int
        Number of independent runs per algorithm-problem combination.
    data_path : Path
        Path to the scanned data directory.
    """
    algorithms: List[str]
    problems: List[str]
    runs: int
    data_path: Path


@dataclass
class MetricResults:
    """
    Container for all metric calculation results.

    All fields are required; the dataclass defines no defaults. Every mapping
    is keyed first by algorithm name and then by problem name.

    :no-index:

    Attributes
    ----------
    metric_values : Dict[str, Dict[str, Dict[int, List[np.ndarray]]]]
        Nested dictionary storing metric values per generation.
        Structure: metric_values[algorithm][problem][run] = List[np.ndarray]
        where each np.ndarray contains metric values per generation for each task.
        Note that the field annotation leaves the per-run value as ``Any``.
    best_values : Dict[str, Dict[str, Dict[int, List[float]]]]
        Nested dictionary storing final best metric values.
        Structure: best_values[algorithm][problem][run] = List[float]
        where each float is the final best value for each task.
    objective_values : Dict[str, Dict[str, Dict[int, List[np.ndarray]]]]
        Nested dictionary storing original objective values.
        Structure: objective_values[algorithm][problem][run] = List[np.ndarray]
        where each np.ndarray has shape (n_solutions, n_objectives).
    runtime : Dict[str, Dict[str, Dict[int, float]]]
        Nested dictionary storing runtime in seconds.
        Structure: runtime[algorithm][problem][run] = float
    max_nfes : Dict[str, Dict[str, List[int]]]
        Nested dictionary storing maximum number of function evaluations.
        Structure: max_nfes[algorithm][problem] = List[int] (per task)
    metric_name : Optional[str]
        Name of the metric used (e.g., 'IGD', 'HV', or None for single-objective).
    """
    # Per-run value is typed ``Any`` here; the docstring above describes the
    # layout the rest of the pipeline is documented to use.
    metric_values: Dict[str, Dict[str, Dict[int, Any]]]
    best_values: Dict[str, Dict[str, Dict[int, List[float]]]]
    objective_values: Dict[str, Dict[str, Dict[int, List[np.ndarray]]]]
    runtime: Dict[str, Dict[str, Dict[int, float]]]
    max_nfes: Dict[str, Dict[str, List[int]]]
    metric_name: Optional[str]


@dataclass
class TableConfig:
    """
    Configuration for table generation.

    Every field has a default, so ``TableConfig()`` is a valid configuration.

    :no-index:

    Attributes
    ----------
    table_format : TableFormat
        Output format (EXCEL or LATEX).
        Default: TableFormat.EXCEL
    statistic_type : StatisticType
        Type of statistic to display (MEAN, MEDIAN, MAX, MIN).
        Default: StatisticType.MEAN
    significance_level : float
        P-value threshold for statistical significance testing.
        Default: 0.05
    rank_sum_test : bool
        Whether to perform Wilcoxon rank-sum test.
        Default: True
    save_path : Path
        Directory path to save output tables.
        Default: Path('./Results')
    """
    table_format: TableFormat = TableFormat.EXCEL
    statistic_type: StatisticType = StatisticType.MEAN
    significance_level: float = 0.05
    rank_sum_test: bool = True
    save_path: Path = Path('./Results')


@dataclass
class PlotConfig:
    """
    Configuration for plot generation.

    Every field has a default, so ``PlotConfig()`` is a valid configuration.

    :no-index:

    Attributes
    ----------
    figure_format : str
        Output figure format (e.g., 'pdf', 'png', 'svg').
        Default: 'pdf'
    statistic_type : StatisticType
        Type of statistic for selecting representative run.
        Default: StatisticType.MEAN
    log_scale : bool
        Whether to use logarithmic scale for y-axis.
        Default: False
    show_pf : bool
        Whether to show true Pareto front in ND solution plots.
        Default: True
    show_nd : bool
        Whether to filter and show only non-dominated solutions.
        Default: True
    merge_plots : bool
        Whether to merge all plots into a single figure.
        Default: False
    merge_columns : int
        Number of columns in merged plot layout.
        Default: 3
    show_std_band : bool
        Whether to show standard deviation band on convergence curves.
        Default: False
    save_path : Path
        Directory path to save output figures.
        Default: Path('./Results')
    colors : List[str]
        Color palette for plotting algorithms.
        Default: a fresh copy of DEFAULT_COLORS
    markers : List[str]
        Marker styles for plotting algorithms.
        Default: a fresh copy of DEFAULT_MARKERS
    """
    figure_format: str = 'pdf'
    statistic_type: StatisticType = StatisticType.MEAN
    log_scale: bool = False
    show_pf: bool = True
    show_nd: bool = True
    merge_plots: bool = False
    merge_columns: int = 3
    show_std_band: bool = False
    save_path: Path = Path('./Results')
    # default_factory + .copy() gives each instance its own list, so mutating
    # one config's palette/markers never leaks into the module-level defaults.
    colors: List[str] = field(default_factory=lambda: DEFAULT_COLORS.copy())
    markers: List[str] = field(default_factory=lambda: DEFAULT_MARKERS.copy())


@dataclass
class ComparisonResult:
    """
    Result of statistical comparison between algorithms.

    :no-index:

    Attributes
    ----------
    symbol : str
        Comparison symbol: '+' (better), '-' (worse), '=' (no significant difference).
        Required; there is no default.
    p_value : Optional[float]
        P-value from statistical test, or None if test not performed.
        Default: None
    """
    symbol: str
    p_value: Optional[float] = None


@dataclass
class ComparisonCounts:
    """
    Aggregated comparison counts for an algorithm.

    The three counters start at zero and are incremented in place, one of them
    per compared problem/task.

    :no-index:

    Attributes
    ----------
    plus : int
        Number of significantly better results.
        Default: 0
    minus : int
        Number of significantly worse results.
        Default: 0
    equal : int
        Number of statistically equivalent results.
        Default: 0
    """
    plus: int = 0
    minus: int = 0
    equal: int = 0


# =============================================================================
# Utility Functions
# =============================================================================

class DataUtils:
    """
    Utility class for data loading and processing operations.
    """

    @staticmethod
    def load_pickle(file_path: Path) -> Dict[str, Any]:
        """
        Load and return a Python object from a pickle file.

        Parameters
        ----------
        file_path : Path
            Path to the pickle file.

        Returns
        -------
        Dict[str, Any]
            Unpickled Python object (typically a dictionary containing
            'all_objs', 'runtime', 'max_nfes' keys).

        Raises
        ------
        FileNotFoundError
            If the pickle file does not exist.
        pickle.UnpicklingError
            If the file cannot be unpickled.
        """
        with open(file_path, 'rb') as f:
            return pickle.load(f)

    @staticmethod
    def load_reference(
            settings: Dict[str, Any],
            problem: str,
            task_identifier: Union[str, int],
            M: int,
            D: Optional[int] = None,
            C: int = 0
    ) -> Optional[np.ndarray]:
        """
        Load reference data (Pareto Front or reference point) for a specific problem and task.

        Parameters
        ----------
        settings : Dict[str, Any]
            Dictionary containing problem configurations and reference definitions.
            Expected keys:

            - problem (str): Contains task definitions
            - 'n_ref' (int, optional): Number of reference points (default: 10000)
            - 'ref_path' (str, optional): Path to reference files (default: './MOReference')

        problem : str
            Name of the problem (e.g., "DTLZ1", "DTLZ2").
        task_identifier : Union[str, int]
            Task identifier - either task name (str like "T1") or index (int like 0).
        M : int
            Number of objectives (required).
        D : int, optional
            Number of decision variables (dimension).
        C : int, optional
            Number of constraints (default: 0).

        Returns
        -------
        Optional[np.ndarray]
            Reference data with shape (n_points, M), or None if not available.

        Notes
        -----
        Supports three types of reference definitions:

        1. Callable: Function that returns reference data

           - Must accept parameter N (number of reference points)
           - Must accept parameter M (number of objectives)
           - May optionally accept parameters D, C
           - Example signatures: ``func(N, M)``, ``func(N, M, D)``, ``func(N, M, D, C)``

        2. String: File path to .npy or .csv reference file
        3. Array-like: Direct reference data (list, tuple, np.ndarray)

        If 'all_tasks' key is present instead of individual task keys, the same
        reference data will be used for all tasks.
        """
        # Convert task index to task name if necessary
        task_name = f"T{task_identifier + 1}" if isinstance(task_identifier, int) else task_identifier

        # Check if problem exists in settings
        if problem not in settings:
            print(f"Warning: Problem '{problem}' not found in settings")
            return None

        problem_settings = settings[problem]

        # Check if task exists for this problem
        if task_name in problem_settings:
            ref_definition = problem_settings[task_name]
        elif 'all_tasks' in problem_settings:
            # Use the same reference for all tasks
            ref_definition = problem_settings['all_tasks']
        else:
            print(f"Warning: Task '{task_name}' and 'all_tasks' not found for problem '{problem}'")
            return None

        # Case 1: Callable function
        if callable(ref_definition):
            N = settings.get('n_ref', 10000)

            try:
                import inspect
                sig = inspect.signature(ref_definition)
                params = list(sig.parameters.keys())
                num_params = len(params)

                if num_params == 2:
                    # func(N, M)
                    return ref_definition(N, M)
                elif num_params == 3:
                    # func(N, M, D)
                    if D is None:
                        print(f"Warning: D not provided for {problem}_{task_name}, using 0")
                        D = 0
                    return ref_definition(N, M, D)
                elif num_params >= 4:
                    # func(N, M, D, C)
                    if D is None:
                        print(f"Warning: D not provided for {problem}_{task_name}, using 0")
                        D = 0
                    return ref_definition(N, M, D, C)
                else:
                    print(
                        f"Warning: Unexpected number of parameters ({num_params}) for reference function {problem}_{task_name}")
                    return None

            except Exception as e:
                print(f"Warning: Failed to call reference function for {problem}_{task_name}: {e}")
                return None

        # Case 2: String (file path or file name)
        elif isinstance(ref_definition, str):
            return DataUtils._load_reference_from_file(
                settings,
                ref_definition,
                problem,
                task_name
            )

        # Case 3: Array-like (list, tuple, numpy array)
        elif isinstance(ref_definition, (list, tuple, np.ndarray)):
            reference = np.array(ref_definition)
            # Ensure it's at least 2D
            if reference.ndim == 1:
                reference = reference.reshape(1, -1)
            return reference

        else:
            print(f"Warning: Unknown reference definition type for {problem}_{task_name}: {type(ref_definition)}")
            return None

    @staticmethod
    def _load_reference_from_file(
            settings: Dict[str, Any],
            ref_definition: str,
            problem: str,
            task_name: str
    ) -> Optional[np.ndarray]:
        """
        Load reference data from file.

        Parameters
        ----------
        settings : Dict[str, Any]
            Settings dictionary containing 'ref_path'.
        ref_definition : str
            File path or filename.
        problem : str
            Problem name for alternative path construction.
        task_name : str
            Task name for alternative path construction.

        Returns
        -------
        Optional[np.ndarray]
            Loaded reference data or None if loading fails.
        """
        ref_path = settings.get('ref_path', './MOReference')

        # Construct full path
        if not os.path.isabs(ref_definition):
            full_path = os.path.join(ref_path, ref_definition)
        else:
            full_path = ref_definition

        # Try to load the file
        try:
            if full_path.endswith('.npy'):
                return np.load(full_path)
            elif full_path.endswith('.csv'):
                return np.loadtxt(full_path, delimiter=',')
            else:
                print(f"Warning: Unsupported file format for '{full_path}'")
                return None
        except FileNotFoundError:
            # Try alternative naming conventions
            base_name = f"{problem}_{task_name}_ref"

            for ext in ['.npy', '.csv']:
                alt_path = os.path.join(ref_path, base_name + ext)
                if os.path.exists(alt_path):
                    try:
                        if ext == '.npy':
                            return np.load(alt_path)
                        else:
                            return np.loadtxt(alt_path, delimiter=',')
                    except Exception as e:
                        print(f"Error loading file '{alt_path}': {e}")

            print(f"Warning: File not found: '{full_path}'")
            return None
        except Exception as e:
            print(f"Error loading reference data from file '{full_path}': {e}")
            return None

    @staticmethod
    def get_metric_direction(metric_name: Optional[str]) -> OptimizationDirection:
        """
        Determine optimization direction based on metric type (Version 2 - More maintainable).

        Parameters
        ----------
        metric_name : Optional[str]
            Name of the metric or None for single-objective.

        Returns
        -------
        OptimizationDirection
            MINIMIZE or MAXIMIZE based on the metric's sign attribute.
        """
        if metric_name is None:
            return OptimizationDirection.MINIMIZE

        # Metric sign mapping (based on your code)
        # sign = -1 means minimize, sign = 1 means maximize
        metric_signs = {
            'IGD': -1,  # Inverted Generational Distance (minimize)
            'HV': 1,  # Hypervolume (maximize)
            'IGDp': -1,  # IGD+ (minimize)
            'GD': -1,  # Generational Distance (minimize)
            'DeltaP': -1,  # Delta_p (minimize)
            'Spacing': -1,  # Spacing (minimize)
            'Spread': -1,  # Spread (minimize)
            'FR': 1,  # Feasibility Rate (maximize)
            'CV': -1,  # Constraint Violation (minimize)
        }

        if metric_name not in metric_signs:
            raise ValueError(f'Unsupported metric: {metric_name}')

        sign = metric_signs[metric_name]
        return OptimizationDirection.MAXIMIZE if sign == 1 else OptimizationDirection.MINIMIZE


# =============================================================================
# Statistics Module
# =============================================================================

class StatisticsCalculator:
    """
    Class for statistical calculations and hypothesis testing.
    """

    @staticmethod
    def calculate_statistic(
            data: List[float],
            statistic_type: StatisticType
    ) -> Tuple[float, Optional[float]]:
        """
        Summarize a sample with the requested statistic.

        Parameters
        ----------
        data : List[float]
            Numeric values to summarize.
        statistic_type : StatisticType
            Statistic to compute (MEAN, MEDIAN, MAX, MIN).

        Returns
        -------
        Tuple[float, Optional[float]]
            ``(statistic_value, std_value)``.
            For MEAN, ``std_value`` is the sample standard deviation
            (``ddof=1``), or 0.0 when there is a single value.
            For MEDIAN, MAX and MIN, ``std_value`` is None.
            Empty data or an unrecognized statistic type gives
            ``(np.nan, np.nan)``.
        """
        sample_size = len(data)
        if sample_size == 0:
            return np.nan, np.nan

        if statistic_type == StatisticType.MEAN:
            center = np.mean(data)
            # A single observation has no sample spread; report zero.
            spread = np.std(data, ddof=1) if sample_size > 1 else 0.0
            return center, spread

        # The remaining statistics are reported without a deviation.
        reducers = (
            (StatisticType.MEDIAN, np.median),
            (StatisticType.MAX, np.max),
            (StatisticType.MIN, np.min),
        )
        for kind, reducer in reducers:
            if statistic_type == kind:
                return reducer(data), None

        return np.nan, np.nan

    @staticmethod
    def perform_rank_sum_test(
            algo_data: List[float],
            base_data: List[float],
            significance_level: float = 0.05,
            direction: OptimizationDirection = OptimizationDirection.MINIMIZE
    ) -> ComparisonResult:
        """
        Perform Wilcoxon rank-sum test to compare two algorithms.

        Parameters
        ----------
        algo_data : List[float]
            Data from the algorithm being tested.
        base_data : List[float]
            Data from the baseline algorithm.
        significance_level : float, optional
            P-value threshold for significance (default: 0.05).
        direction : OptimizationDirection, optional
            Optimization direction (MINIMIZE or MAXIMIZE).

        Returns
        -------
        ComparisonResult
            Result containing comparison symbol and p-value.
            Symbol: '+' (better), '-' (worse), '=' (no significant difference).
            If either sample is empty, or the test itself fails, the result is
            '=' with ``p_value=None``.

        Notes
        -----
        When the difference is significant, the winner is decided by comparing
        the two medians. If the medians are tied, the sign of the rank-sum
        statistic is used instead (positive means ``algo_data`` tends to be
        larger), so a tie in medians is no longer reported as "worse"
        regardless of the actual ordering.
        """
        if len(algo_data) == 0 or len(base_data) == 0:
            return ComparisonResult(symbol='=', p_value=None)

        # Deliberately broad: a failing test must not abort table generation,
        # the comparison is simply reported as inconclusive.
        try:
            statistic, p_value = stats.ranksums(algo_data, base_data)

            # Also taken when p_value is NaN (the comparison is then False).
            if not p_value < significance_level:
                return ComparisonResult(symbol='=', p_value=p_value)

            algo_median = np.median(algo_data)
            base_median = np.median(base_data)

            if algo_median != base_median:
                algo_is_larger = algo_median > base_median
            elif statistic == 0:
                # No ordering information at all.
                return ComparisonResult(symbol='=', p_value=p_value)
            else:
                # Tied medians: fall back to the direction of the rank shift.
                algo_is_larger = statistic > 0

            if direction == OptimizationDirection.MINIMIZE:
                algo_is_better = not algo_is_larger
            else:
                algo_is_better = algo_is_larger

            symbol = '+' if algo_is_better else '-'
            return ComparisonResult(symbol=symbol, p_value=p_value)
        except Exception:
            return ComparisonResult(symbol='=', p_value=None)

    @staticmethod
    def collect_task_data(
            all_best_values: Dict[str, Dict[str, Dict[int, List[float]]]],
            algo: str,
            prob: str,
            task_idx: int
    ) -> List[float]:
        """
        Gather one task's final values across all runs, dropping NaN entries.

        Parameters
        ----------
        all_best_values : Dict[str, Dict[str, Dict[int, List[float]]]]
            Best metric values, indexed as ``[algorithm][problem][run]`` with
            one value per task in the innermost list.
        algo : str
            Algorithm name.
        prob : str
            Problem name.
        task_idx : int
            Task index (0-based).

        Returns
        -------
        List[float]
            The non-NaN values for the task, in the iteration order of the runs.
        """
        per_run = all_best_values[algo][prob]
        # Pick the requested task's value out of every run, lazily.
        task_values = (run_values[task_idx] for run_values in per_run.values())
        return [value for value in task_values if not np.isnan(value)]

    @staticmethod
    def select_representative_run(
            all_best_values: Dict[str, Dict[str, Dict[int, List[float]]]],
            algo: str,
            prob: str,
            task_idx: int,
            statistic_type: StatisticType
    ) -> Optional[int]:
        """
        Pick the run whose final value best matches the requested statistic.

        Parameters
        ----------
        all_best_values : Dict[str, Dict[str, Dict[int, List[float]]]]
            Best metric values, indexed as ``[algorithm][problem][run]`` with
            one value per task in the innermost list.
        algo : str
            Algorithm name.
        prob : str
            Problem name.
        task_idx : int
            Task index (0-based).
        statistic_type : StatisticType
            MEDIAN selects the run closest to the median value, MAX the run
            with the largest value and MIN the run with the smallest value.
            MEAN selects no run.

        Returns
        -------
        Optional[int]
            Key of the selected run (taken from a NumPy array of the run
            keys), or None for MEAN, for an unrecognized statistic type, or
            when every run's value is NaN. Ties go to the first matching run.
        """
        # No single run is selected when the mean is requested.
        if statistic_type == StatisticType.MEAN:
            return None

        # Pair each run key with its value for this task, skipping NaN results.
        candidates = [
            (run, run_values[task_idx])
            for run, run_values in all_best_values[algo][prob].items()
        ]
        usable = [(run, value) for run, value in candidates if not np.isnan(value)]
        if not usable:
            return None

        runs = np.array([run for run, _ in usable])
        final_values = np.array([value for _, value in usable])

        if statistic_type == StatisticType.MEDIAN:
            # Closest run to the median; the median itself may not be attained.
            distance = np.abs(final_values - np.median(final_values))
            chosen = np.argmin(distance)
        elif statistic_type == StatisticType.MAX:
            chosen = np.argmax(final_values)
        elif statistic_type == StatisticType.MIN:
            chosen = np.argmin(final_values)
        else:
            return None

        return runs[chosen]


# =============================================================================
# Table Generator Module
# =============================================================================

class TableGenerator:
    """
    Class for generating comparison tables in Excel and LaTeX formats.
    """

    def __init__(self, config: TableConfig) -> None:
        """
        Initialize TableGenerator with configuration.

        Parameters
        ----------
        config : TableConfig
            Configuration object for table generation. It is stored as
            ``self.config``; the generator's methods read settings such as
            ``table_format``, ``statistic_type``, ``rank_sum_test`` and
            ``significance_level`` from it.
        """
        # The caller's object is kept by reference (no copy is made).
        self.config = config

    def generate(
            self,
            all_best_values: Dict[str, Dict[str, Dict[int, List[float]]]],
            algorithm_order: List[str],
            metric_name: Optional[str] = None
    ) -> Union[pd.DataFrame, str]:
        """
        Generate comparison table with statistical analysis.

        Parameters
        ----------
        all_best_values : Dict[str, Dict[str, Dict[int, List[float]]]]
            Nested dictionary containing best metric values.
            Structure: all_best_values[algorithm][problem][run] = List[float]
        algorithm_order : List[str]
            List of algorithm names in display order.
            The last algorithm is treated as the baseline for comparisons.
        metric_name : Optional[str], optional
            Metric name to determine optimization direction.

        Returns
        -------
        Union[pd.DataFrame, str]
            DataFrame for Excel format, LaTeX string for LaTeX format.

        Notes
        -----
        The problem list is taken from the first algorithm in
        ``algorithm_order``. Problems whose name contains digits are ordered by
        the integer formed from those digits and come first; names without
        digits follow in alphabetical order. Mixing both kinds of names is
        supported.
        """

        def _problem_sort_key(name: str) -> Tuple[int, int, str]:
            # Build a key of one uniform shape so numbered and un-numbered
            # problem names can be compared with each other. Returning a bare
            # int for some names and a bare str for others makes sorted() raise
            # TypeError as soon as both kinds occur together.
            digits = ''.join(filter(str.isdigit, name))
            if digits:
                try:
                    return 0, int(digits), ''
                except ValueError:
                    # Digit-like characters int() cannot parse: treat as text.
                    pass
            return 1, 0, name

        # Extract problems in display order
        problems = sorted(all_best_values[algorithm_order[0]].keys(), key=_problem_sort_key)

        # Determine optimization direction
        direction = DataUtils.get_metric_direction(metric_name)

        # Generate data rows
        rows, comparison_counts, algorithm_ranks = self._generate_data_rows(
            all_best_values, algorithm_order, problems, direction
        )

        # Generate and save table
        if self.config.table_format == TableFormat.EXCEL:
            return self._generate_excel_table(
                rows, algorithm_order, comparison_counts, algorithm_ranks, direction
            )
        return self._generate_latex_table(
            rows, algorithm_order, comparison_counts, algorithm_ranks, direction
        )

    def _generate_data_rows(
            self,
            all_best_values: Dict[str, Dict[str, Dict[int, List[float]]]],
            algorithm_order: List[str],
            problems: List[str],
            direction: OptimizationDirection
    ) -> Tuple[List[Dict[str, Any]], Dict[str, ComparisonCounts], Dict[str, List[int]]]:
        """
        Build one table row per (problem, task) together with summary tallies.

        Parameters
        ----------
        all_best_values : Dict[str, Dict[str, Dict[int, List[float]]]]
            Best metric values, indexed as ``[algorithm][problem][run]`` with
            one value per task in the innermost list.
        algorithm_order : List[str]
            Algorithm names in display order; the last one is the baseline
            every other algorithm is compared against.
        problems : List[str]
            Problem names in the order the rows should appear.
        direction : OptimizationDirection
            Whether smaller or larger values are better.

        Returns
        -------
        Tuple[List[Dict[str, Any]], Dict[str, ComparisonCounts], Dict[str, List[int]]]
            Tuple of (rows, comparison_counts, algorithm_ranks).
            Each row maps 'Problem', 'Task' (1-based) and every algorithm name
            to a cell value. ``comparison_counts`` holds the '+', '-', '='
            tallies of each non-baseline algorithm (only updated when the
            rank-sum test is enabled). ``algorithm_ranks[algo]`` lists the
            algorithm's rank in every row, in row order.
        """
        baseline = algorithm_order[-1]
        comparison_counts = {name: ComparisonCounts() for name in algorithm_order[:-1]}
        algorithm_ranks = {name: [] for name in algorithm_order}
        rows = []

        for prob in problems:
            # The task count is read from the first run of the first algorithm.
            reference_runs = all_best_values[algorithm_order[0]][prob]
            first_run = list(reference_runs.keys())[0]
            num_tasks = len(reference_runs[first_run])

            for task_idx in range(num_tasks):
                row = {'Problem': prob, 'Task': task_idx + 1}
                stat_by_algo = {}

                base_data = StatisticsCalculator.collect_task_data(
                    all_best_values, baseline, prob, task_idx
                )

                for algo in algorithm_order:
                    algo_data = StatisticsCalculator.collect_task_data(
                        all_best_values, algo, prob, task_idx
                    )
                    stat_value, std_value = StatisticsCalculator.calculate_statistic(
                        algo_data, self.config.statistic_type
                    )
                    stat_by_algo[algo] = stat_value

                    # The baseline is never compared against itself.
                    if self.config.rank_sum_test and algo != baseline:
                        symbol = StatisticsCalculator.perform_rank_sum_test(
                            algo_data, base_data,
                            self.config.significance_level, direction
                        ).symbol

                        tally = comparison_counts.get(algo)
                        if tally is not None:
                            if symbol == '+':
                                tally.plus += 1
                            elif symbol == '-':
                                tally.minus += 1
                            else:
                                tally.equal += 1
                    else:
                        symbol = ''

                    row[algo] = self._format_cell_content(stat_value, std_value, symbol)

                # Record where each algorithm placed in this row.
                for algo, rank in self._calculate_row_ranks(stat_by_algo, direction).items():
                    algorithm_ranks[algo].append(rank)

                rows.append(row)

        return rows, comparison_counts, algorithm_ranks

    def _calculate_row_ranks(
            self,
            algo_values: Dict[str, float],
            direction: OptimizationDirection
    ) -> Dict[str, int]:
        """
        Rank the algorithms of one table row from best (1) to worst.

        Parameters
        ----------
        algo_values : Dict[str, float]
            Statistic value of every algorithm in the row.
        direction : OptimizationDirection
            MINIMIZE ranks the smallest value first; any other direction ranks
            the largest value first.

        Returns
        -------
        Dict[str, int]
            Rank per algorithm. Algorithms whose value is NaN receive
            ``np.nan`` instead of a rank. Equal values get consecutive ranks
            in their input order.
        """
        # Only algorithms with an actual value take part in the ranking.
        ranked_pool = [
            (name, value) for name, value in algo_values.items()
            if not np.isnan(value)
        ]

        if not ranked_pool:
            return {name: np.nan for name in algo_values}

        largest_first = direction != OptimizationDirection.MINIMIZE
        ordered = sorted(ranked_pool, key=lambda pair: pair[1], reverse=largest_first)

        ranks = {name: position for position, (name, _) in enumerate(ordered, start=1)}

        # Algorithms left out above are reported with a NaN rank.
        for name in algo_values:
            ranks.setdefault(name, np.nan)

        return ranks

    def _format_cell_content(
            self,
            stat_value: float,
            std_value: Optional[float],
            symbol: str
    ) -> str:
        """
        Render one table cell as text.

        Parameters
        ----------
        stat_value : float
            Statistic to display.
        std_value : Optional[float]
            Standard deviation; only used when the configured statistic is MEAN.
        symbol : str
            Comparison symbol ('+', '-', '=') or an empty string for none.

        Returns
        -------
        str
            'N/A' when ``stat_value`` is NaN. Otherwise the statistic in
            ``.4e`` notation, followed by the deviation in parentheses
            (``.2e``) for MEAN, followed by the symbol. In LaTeX output the
            exponent's minus sign and the symbol are written in math mode.
        """
        if np.isnan(stat_value):
            return 'N/A'

        as_latex = self.config.table_format != TableFormat.EXCEL
        with_std = self.config.statistic_type == StatisticType.MEAN

        def _scientific(number, spec):
            # Scientific notation; LaTeX gets a math-mode minus in the exponent.
            text = format(number, spec)
            return text.replace('e-', 'e$-$') if as_latex else text

        cell_content = _scientific(stat_value, '.4e')
        if with_std:
            cell_content = f"{cell_content}({_scientific(std_value, '.2e')})"

        if symbol:
            if as_latex:
                latex_symbols = {'+': '~$+$', '-': '~$-$', '=': '~='}
                cell_content += latex_symbols.get(symbol, '')
            else:
                cell_content += f" {symbol}"

        return cell_content

    def _find_best_value_in_row(
            self,
            row: Dict[str, Any],
            algorithm_order: List[str],
            direction: OptimizationDirection
    ) -> Optional[str]:
        """
        Find the algorithm with the best performance in a table row.

        Each cell is parsed back into a number by taking its leading numeric
        token: the "(std)" part is cut off, the LaTeX exponent marker ``e$-$``
        is turned back into ``e-``, and a trailing comparison symbol (" +" in
        Excel cells, "~$+$" in LaTeX cells) is dropped. Cells that are 'N/A',
        not strings, or not parseable are ignored.

        Parameters
        ----------
        row : Dict[str, Any]
            Dictionary mapping algorithm names to formatted cell values.
        algorithm_order : List[str]
            List of algorithm names.
        direction : OptimizationDirection
            Optimization direction.

        Returns
        -------
        Optional[str]
            Name of the best-performing algorithm or None. On ties the
            algorithm appearing first in ``algorithm_order`` wins.
        """
        minimize = direction == OptimizationDirection.MINIMIZE
        best_val = None
        best_algo = None

        for algo in algorithm_order:
            cell = row[algo]
            if not isinstance(cell, str) or cell == 'N/A':
                continue

            # Without a "(std)" part the symbol is still attached to the value,
            # so split it off explicitly before converting.
            tokens = (
                cell.split('(')[0]
                .replace('e$-$', 'e-')
                .split('~')[0]
                .split()
            )
            if not tokens:
                continue

            try:
                val = float(tokens[0])
            except ValueError:
                continue

            if best_val is None:
                is_better = True
            elif minimize:
                is_better = val < best_val
            else:
                is_better = val > best_val

            if is_better:
                best_val = val
                best_algo = algo

        return best_algo

    def _generate_excel_table(
            self,
            rows: List[Dict[str, Any]],
            algorithm_order: List[str],
            comparison_counts: Dict[str, ComparisonCounts],
            algorithm_ranks: Dict[str, List[int]],
            direction: OptimizationDirection
    ) -> pd.DataFrame:
        """
        Generate and save a formatted Excel table.

        The data rows are followed by an optional "+/-/=" summary row (only when
        the rank-sum test is enabled) and an "Average Rank" row. The table is
        written to ``results_table_<statistic>.xlsx`` inside the configured save
        directory and then styled in place.

        Parameters
        ----------
        rows : List[Dict[str, Any]]
            Table row data. The list is not modified; summary rows are added to
            an internal copy.
        algorithm_order : List[str]
            Algorithm display order. The last entry is labelled 'Base' in the
            summary row.
        comparison_counts : Dict[str, ComparisonCounts]
            Comparison result counts.
        algorithm_ranks : Dict[str, List[int]]
            Per-algorithm list of ranks; NaN entries are ignored when averaging.
        direction : OptimizationDirection
            Optimization direction.

        Returns
        -------
        pd.DataFrame
            DataFrame containing the table data.
        """
        # Copy so the caller's list does not silently grow by the summary rows
        table_rows = list(rows)

        if self.config.rank_sum_test:
            summary_row = {'Problem': '+/-/=', 'Task': ''}
            for algo in algorithm_order[:-1]:
                counts = comparison_counts[algo]
                summary_row[algo] = f"{counts.plus}/{counts.minus}/{counts.equal}"
            summary_row[algorithm_order[-1]] = 'Base'
            table_rows.append(summary_row)

        avg_rank_row = {'Problem': 'Average Rank', 'Task': ''}
        for algo in algorithm_order:
            valid_ranks = [r for r in algorithm_ranks[algo] if not np.isnan(r)]
            if valid_ranks:
                avg_rank_row[algo] = f"{np.mean(valid_ranks):.2f}"
            else:
                avg_rank_row[algo] = 'N/A'
        table_rows.append(avg_rank_row)

        # Create DataFrame with a fixed column order
        df = pd.DataFrame(table_rows)
        columns = ['Problem', 'Task'] + algorithm_order
        df = df[columns]

        # Save and format
        save_dir = Path(self.config.save_path)
        save_dir.mkdir(parents=True, exist_ok=True)
        output_file = save_dir / f'results_table_{self.config.statistic_type.value}.xlsx'
        df.to_excel(output_file, index=False)

        # Apply Excel formatting
        self._apply_excel_formatting(output_file, df, algorithm_order, direction)

        print(f"Excel table saved to: {output_file}")
        return df

    def _apply_excel_formatting(
            self,
            output_file: Path,
            df: pd.DataFrame,
            algorithm_order: List[str],
            direction: OptimizationDirection
    ) -> None:
        """
        Style a previously saved Excel table in place.

        Every cell gets a thin border, centered alignment and a Times New Roman
        font; column widths are set from the longest cell text. The best value
        in each data row and the smallest average rank in the last row are set
        in bold.

        Parameters
        ----------
        output_file : Path
            Path of the workbook to load, modify and save.
        df : pd.DataFrame
            Table that was written to the workbook; only its length is used to
            locate data rows and the average-rank row.
        algorithm_order : List[str]
            Algorithm display order. Algorithm columns are assumed to start at
            the third worksheet column, after 'Problem' and 'Task'.
        direction : OptimizationDirection
            Optimization direction used to decide which data value is best.

        Returns
        -------
        None
            The workbook is saved back to ``output_file``.
        """
        wb = load_workbook(output_file)
        ws = wb.active

        # Define styles
        thin_border = Border(
            left=Side(style='thin'),
            right=Side(style='thin'),
            top=Side(style='thin'),
            bottom=Side(style='thin')
        )
        normal_font = Font(name='Times New Roman', size=11)
        bold_font = Font(name='Times New Roman', size=11, bold=True)

        def leading_number(value: Any) -> Optional[float]:
            """Return the number at the start of a cell, or None if there is none."""
            if not value or value == 'N/A':
                return None
            # Drop the "(std)" part, then any trailing comparison symbol such as
            # " +", which is still attached when no std is shown.
            tokens = str(value).split('(')[0].split()
            if not tokens:
                return None
            try:
                return float(tokens[0])
            except ValueError:
                return None

        def bold_best_in_row(row_idx: int, minimize: bool) -> None:
            """Bold the algorithm cell holding the best parseable value in a row."""
            best_val = None
            best_col = None
            for col_idx in range(3, 3 + len(algorithm_order)):
                val = leading_number(ws.cell(row=row_idx, column=col_idx).value)
                if val is None:
                    continue
                if best_val is None or (val < best_val if minimize else val > best_val):
                    best_val = val
                    best_col = col_idx
            if best_col is not None:
                ws.cell(row=row_idx, column=best_col).font = bold_font

        # Apply formatting and auto-adjust column widths
        for column in ws.columns:
            max_length = 0
            column_letter = column[0].column_letter

            for cell in column:
                cell.border = thin_border
                cell.alignment = Alignment(horizontal='center', vertical='center')
                cell.font = normal_font

                if cell.value:
                    max_length = max(max_length, len(str(cell.value)))

            ws.column_dimensions[column_letter].width = max_length + 2

        # Bold the best value in each data row
        num_summary_rows = 1  # Always has Average Rank row
        if self.config.rank_sum_test:
            num_summary_rows += 1  # Add +/-/= row

        num_data_rows = len(df) - num_summary_rows
        minimize = direction == OptimizationDirection.MINIMIZE

        # Worksheet row 1 is the header, so data starts at row 2
        for row_idx in range(2, num_data_rows + 2):
            bold_best_in_row(row_idx, minimize)

        # Bold the best (minimum) average rank; it is the last row in the table
        bold_best_in_row(len(df) + 1, True)

        wb.save(output_file)

    def _generate_latex_table(
            self,
            rows: List[Dict[str, Any]],
            algorithm_order: List[str],
            comparison_counts: Dict[str, ComparisonCounts],
            algorithm_ranks: Dict[str, List[int]],
            direction: OptimizationDirection
    ) -> str:
        """
        Generate and save a LaTeX table.

        The table has one row per entry in ``rows`` with the best cell in bold,
        an optional "+/-/=" summary row (only when the rank-sum test is
        enabled) and an "Average Rank" row with the smallest average in bold.
        The result is written to ``results_table_<statistic>.tex`` inside the
        configured save directory.

        Parameters
        ----------
        rows : List[Dict[str, Any]]
            Table row data with 'Problem', 'Task' and one key per algorithm.
        algorithm_order : List[str]
            Algorithm display order. The last entry is labelled 'Base' in the
            summary row.
        comparison_counts : Dict[str, ComparisonCounts]
            Comparison result counts.
        algorithm_ranks : Dict[str, List[int]]
            Per-algorithm list of ranks; NaN entries are ignored when averaging.
        direction : OptimizationDirection
            Optimization direction.

        Returns
        -------
        str
            The complete LaTeX source of the table.
        """
        df = pd.DataFrame(rows)

        # Build table structure: Problem + Task + one column per algorithm
        num_cols = len(algorithm_order) + 2
        col_format = '|' + '|'.join(['c'] * num_cols) + '|'

        # Collect the pieces and join once at the end
        parts = [
            "\\begin{table*}[htbp]\n",
            "\\renewcommand{\\arraystretch}{1.2}\n",
            "\\centering\n",
            "\\caption{Your caption here}\n",
            "\\label{tab:results}\n",
            "\\resizebox{1.0\\textwidth}{!}{\n",
            f"\\begin{{tabular}}{{{col_format}}}\n",
            "\\hline\n",
            # Header row
            "Problem & Task & " + " & ".join(algorithm_order) + " \\\\\n",
            "\\hline\n",
        ]

        # Data rows
        for _, row in df.iterrows():
            best_algo = self._find_best_value_in_row(row, algorithm_order, direction)

            cells = [f"{row['Problem']}", f"{row['Task']}"]
            for algo in algorithm_order:
                cell = row[algo]
                if algo == best_algo:
                    cell = f"\\textbf{{{cell}}}"
                cells.append(f"{cell}")
            parts.append(" & ".join(cells) + " \\\\\n")
            parts.append("\\hline\n")

        # Summary row
        if self.config.rank_sum_test:
            summary_cells = ["\\multicolumn{2}{|c|}{+/$-$/=}"]
            for algo in algorithm_order[:-1]:
                counts = comparison_counts[algo]
                summary_cells.append(f"{counts.plus}/{counts.minus}/{counts.equal}")
            summary_cells.append("Base")
            parts.append(" & ".join(summary_cells) + " \\\\\n")
            parts.append("\\hline\n")

        # Calculate average ranks, ignoring NaN entries
        avg_ranks = {}
        for algo in algorithm_order:
            valid_ranks = [r for r in algorithm_ranks[algo] if not np.isnan(r)]
            avg_ranks[algo] = np.mean(valid_ranks) if valid_ranks else np.nan

        # Find algorithm with best (minimum) average rank
        valid_avg_ranks = {algo: rank for algo, rank in avg_ranks.items() if not np.isnan(rank)}
        best_rank_algo = min(valid_avg_ranks, key=valid_avg_ranks.get) if valid_avg_ranks else None

        # Average Rank row with best rank highlighted
        rank_cells = ["\\multicolumn{2}{|c|}{Average Rank}"]
        for algo in algorithm_order:
            if np.isnan(avg_ranks[algo]):
                cell_content = "N/A"
            else:
                cell_content = f"{avg_ranks[algo]:.2f}"

            if algo == best_rank_algo:
                cell_content = f"\\textbf{{{cell_content}}}"

            rank_cells.append(cell_content)
        parts.append(" & ".join(rank_cells) + " \\\\\n")
        parts.append("\\hline\n")

        # The extra closing brace ends the \resizebox group
        parts.append("\\end{tabular}}\n")
        parts.append("\\end{table*}\n")

        latex_str = "".join(parts)

        # Save to file with an explicit encoding so non-ASCII names do not
        # depend on the platform default
        save_dir = Path(self.config.save_path)
        save_dir.mkdir(parents=True, exist_ok=True)
        output_file = save_dir / f'results_table_{self.config.statistic_type.value}.tex'
        with open(output_file, 'w', encoding='utf-8') as f:
            f.write(latex_str)
        print(f"LaTeX table saved to: {output_file}")

        return latex_str


# =============================================================================
# Plot Generator Module
# =============================================================================

class PlotGenerator:
    """
    Class for generating various visualization plots.
    """

    def __init__(self, config: PlotConfig):
        """
        Initialize PlotGenerator with configuration.

        Parameters
        ----------
        config : PlotConfig
            Configuration object for plot generation.
        """
        self.config = config

    @staticmethod
    def _calculate_legend_fontsize(n_algorithms: int) -> int:
        """
        Calculate legend font size based on number of algorithms.

        Linear interpolation:
        - 2 algorithms → font size 14
        - 15 algorithms → font size 6

        Parameters
        ----------
        n_algorithms : int
            Number of algorithms.

        Returns
        -------
        int
            Calculated legend font size.
        """
        if n_algorithms <= 2:
            return 14
        elif n_algorithms >= 15:
            return 6
        else:
            # Linear interpolation: y = 14 - (14-6)/(15-2) * (x-2)
            return int(round(14 - (8 / 13) * (n_algorithms - 2)))

    def plot_convergence_curves(
            self,
            metric_values: Dict[str, Dict[str, Dict[int, Any]]],
            best_values: Dict[str, Dict[str, Dict[int, List[float]]]],
            max_nfes: Dict[str, Dict[str, List[int]]],
            algorithm_order: List[str],
            metric_name: Optional[str] = None
    ) -> None:
        """
        Generate and save convergence curve plots for all algorithms, problems, and tasks.

        In merged mode a single figure with one subplot per problem/task is
        produced. Otherwise one figure per problem/task is saved, named
        ``<problem>`` for single-task problems and ``<problem>-Task<k>``
        otherwise.

        Parameters
        ----------
        metric_values : Dict[str, Dict[str, Dict[int, Any]]]
            Metric values per generation.
            Structure: metric_values[algorithm][problem][run] = List[np.ndarray]
        best_values : Dict[str, Dict[str, Dict[int, List[float]]]]
            Best metric values for representative run selection. The number of
            tasks of a problem is read from the first algorithm's run 1, or
            from its first available run when run 1 is missing.
        max_nfes : Dict[str, Dict[str, List[int]]]
            Maximum number of function evaluations per task.
            Structure: max_nfes[algorithm][problem] = List[int]
        algorithm_order : List[str]
            List of algorithm names to plot.
        metric_name : Optional[str], optional
            Metric name for y-axis label.

        Returns
        -------
        None
            Saves figures to disk.
        """
        problems = sorted(metric_values[algorithm_order[0]].keys())
        save_dir = Path(self.config.save_path)
        save_dir.mkdir(parents=True, exist_ok=True)

        if self.config.merge_plots:
            # Merged plot mode: all problems/tasks in one figure
            self._plot_merged_convergence(
                metric_values, best_values, max_nfes,
                algorithm_order, problems, metric_name, save_dir
            )
            return

        # Separate plot mode: one figure per problem/task
        reference_runs = best_values[algorithm_order[0]]
        for prob in problems:
            runs = reference_runs[prob]
            if not runs:
                # Nothing recorded for this problem, so there is nothing to plot
                continue

            # Prefer run 1 as before, but do not fail when it is absent
            first_run_data = runs[1] if 1 in runs else next(iter(runs.values()))
            num_tasks = len(first_run_data)

            for task_idx in range(num_tasks):
                fig = self._create_convergence_figure(
                    num_tasks, metric_values, best_values, max_nfes,
                    algorithm_order, prob, task_idx, metric_name
                )

                if num_tasks == 1:
                    output_file = save_dir / f'{prob}.{self.config.figure_format}'
                else:
                    output_file = save_dir / f'{prob}-Task{task_idx + 1}.{self.config.figure_format}'

                fig.savefig(output_file, dpi=300, bbox_inches='tight')
                plt.close(fig)

        print(f"All convergence plots saved to: {save_dir}")

    def _create_convergence_figure(
            self,
            num_tasks: int,
            metric_values: Dict,
            best_values: Dict,
            max_nfes: Dict,
            algorithm_order: List[str],
            prob: str,
            task_idx: int,
            metric_name: Optional[str],
            ax: Optional[plt.Axes] = None,
            show_legend: bool = True
    ) -> Optional[plt.Figure]:
        """
        Create a single convergence curve figure.

        One line is drawn per algorithm for the given problem and task. With
        ``config.show_std_band`` the line is the mean over runs and a shaded
        band of one standard deviation is added; otherwise the line is the
        curve of a representative run chosen via
        ``StatisticsCalculator.select_representative_run``. Algorithms whose
        curve is empty are skipped. The x positions are evenly spaced between
        0 and the task's entry in ``max_nfes``.

        Parameters
        ----------
        num_tasks : int
            Total number of tasks. Only used to decide whether the title
            includes the task number.
        metric_values : Dict
            Metric values dictionary.
        best_values : Dict
            Best values dictionary.
        max_nfes : Dict
            Max NFEs dictionary, indexed as max_nfes[algorithm][problem][task_idx].
        algorithm_order : List[str]
            Algorithm order.
        prob : str
            Problem name.
        task_idx : int
            Task index.
        metric_name : Optional[str]
            Metric name for label. Falls back to 'Objective Value' when None.
        ax : Optional[plt.Axes], optional
            Existing axes to plot on. If None, creates new figure.
        show_legend : bool, optional
            Whether to show legend. Default: True.

        Returns
        -------
        Optional[plt.Figure]
            Matplotlib figure object (None if ax was provided).
        """
        fig = None
        if ax is None:
            fig, ax = plt.subplots(figsize=(5, 3.5))

        # Collect curve data for y-axis range and max NFEs for x-axis formatting
        all_curves = []
        actual_max_nfes = 0

        # Adaptive line width and marker size based on number of algorithms
        n_algos = len(algorithm_order)
        if n_algos <= 4:
            markersize, linewidth = 8, 2.5
        elif n_algos <= 6:
            markersize, linewidth = 7, 2.0
        else:
            markersize, linewidth = 6, 1.6

        for idx, algo in enumerate(algorithm_order):
            if self.config.show_std_band:
                mean_curve, std_curve = self._get_convergence_mean_std(
                    metric_values, algo, prob, task_idx
                )
                curve = mean_curve
            else:
                selected_run = StatisticsCalculator.select_representative_run(
                    best_values, algo, prob, task_idx, self.config.statistic_type
                )
                curve = self._get_convergence_curve(metric_values, algo, prob, task_idx, selected_run)

            # No data for this algorithm: leave it out of the plot and the legend
            if len(curve) == 0:
                continue

            all_curves.append(curve)

            nfes = max_nfes[algo][prob][task_idx]
            actual_max_nfes = max(actual_max_nfes, nfes)
            x = np.linspace(0, nfes, len(curve))
            # Roughly ten markers per curve, at least every point for short curves
            marker_interval = max(1, len(curve) // 10)

            # Colors and markers cycle when there are more algorithms than entries
            color = self.config.colors[idx % len(self.config.colors)]

            ax.plot(
                x, curve, label=algo,
                color=color,
                marker=self.config.markers[idx % len(self.config.markers)],
                markevery=marker_interval,
                markersize=markersize, linewidth=linewidth, linestyle='-', alpha=0.7
            )

            # The band is drawn only when a std curve is available
            if self.config.show_std_band and len(std_curve) > 0:
                ax.fill_between(
                    x, curve - std_curve, curve + std_curve,
                    alpha=0.15, color=color
                )

        # Set axis labels
        y_label = metric_name if metric_name is not None else 'Objective Value'
        ax.set_xlabel('NFEs', fontsize=14)
        ax.set_ylabel(y_label, fontsize=14)

        title = f'{prob}' if num_tasks == 1 else f'{prob} - Task {task_idx + 1}'
        ax.set_title(title, fontsize=14)
        ax.tick_params(axis='both', which='major', labelsize=14)

        # Auto-adjust legend font size based on number of algorithms
        if show_legend:
            legend_fontsize = self._calculate_legend_fontsize(len(algorithm_order))
            ax.legend(loc='best', fontsize=legend_fontsize)

        ax.grid(True, alpha=0.2, linestyle='-')

        # Apply axis formatting after all settings are complete
        if self.config.log_scale:
            ax.set_yscale('log')
            # Check data range; use linear scale if range is too small
            if len(all_curves) > 0:
                all_data = np.concatenate([c for c in all_curves])
                y_min, y_max = np.min(all_data), np.max(all_data)

                # Log scale ineffective for less than one order of magnitude
                # NOTE(review): y_min <= 0 is not handled separately here; a
                # negative minimum also takes the linear branch - confirm intended
                if y_max / y_min < 10:
                    print(
                        f"Warning: Data range too small for log scale ({y_min:.4f} to {y_max:.4f}), using linear scale")
                    ax.set_yscale('linear')
                    self._apply_scientific_notation(ax, actual_xmax=actual_max_nfes)
                else:
                    # Use log scale, still need x-axis scientific notation
                    from matplotlib.ticker import LogFormatterSciNotation
                    ax.yaxis.set_major_formatter(LogFormatterSciNotation())
                    # Use scientific notation for x-axis if > 10000
                    if actual_max_nfes > 10000:
                        from matplotlib.ticker import ScalarFormatter
                        formatter = ScalarFormatter(useMathText=True)
                        formatter.set_scientific(True)
                        formatter.set_powerlimits((0, 0))
                        ax.xaxis.set_major_formatter(formatter)
                        ax.xaxis.major.formatter._useMathText = True
        else:
            # Apply scientific notation only for linear scale
            self._apply_scientific_notation(ax, actual_xmax=actual_max_nfes)

        # Disable minor ticks (must be called after set_yscale)
        ax.minorticks_off()

        # tight_layout is applied only to a figure created here; a caller that
        # passed its own axes is left to lay out its figure itself
        if fig is not None:
            fig.tight_layout()
        return fig

    def _plot_merged_convergence(
            self,
            metric_values: Dict,
            best_values: Dict,
            max_nfes: Dict,
            algorithm_order: List[str],
            problems: List[str],
            metric_name: Optional[str],
            save_dir: Path
    ) -> None:
        """
        Create a merged figure with all convergence curves.

        One subplot is drawn per (problem, task) pair on a grid with
        ``config.merge_columns`` columns. Subplots carry no legend of their
        own; a single shared legend is placed above the first row. The figure
        is saved as ``convergence_merged.<figure_format>``.

        Parameters
        ----------
        metric_values : Dict
            Metric values dictionary.
        best_values : Dict
            Best values dictionary. The number of tasks of a problem is read
            from the first algorithm's run 1, or from its first available run
            when run 1 is missing.
        max_nfes : Dict
            Max NFEs dictionary.
        algorithm_order : List[str]
            Algorithm order.
        problems : List[str]
            List of problem names.
        metric_name : Optional[str]
            Metric name for label.
        save_dir : Path
            Directory to save the figure.
        """
        # Collect all subplot info (problem, task_idx, num_tasks)
        reference_runs = best_values[algorithm_order[0]]
        subplot_info = []
        for prob in problems:
            runs = reference_runs[prob]
            if not runs:
                continue
            # Prefer run 1 as before, but do not fail when it is absent
            first_run_data = runs[1] if 1 in runs else next(iter(runs.values()))
            num_tasks = len(first_run_data)
            for task_idx in range(num_tasks):
                subplot_info.append((prob, task_idx, num_tasks))

        n_plots = len(subplot_info)
        if n_plots == 0:
            return

        n_cols = self.config.merge_columns
        n_rows = (n_plots + n_cols - 1) // n_cols

        # squeeze=False always yields a 2-D axes array, whatever the grid shape
        fig, axes = plt.subplots(
            n_rows, n_cols,
            figsize=(5 * n_cols, 3.5 * n_rows),
            squeeze=False
        )
        axes_flat = axes.flatten()

        # Plot each subplot
        for i, (prob, task_idx, num_tasks) in enumerate(subplot_info):
            self._create_convergence_figure(
                num_tasks, metric_values, best_values, max_nfes,
                algorithm_order, prob, task_idx, metric_name,
                ax=axes_flat[i], show_legend=False
            )

        # Hide unused subplots
        for i in range(n_plots, len(axes_flat)):
            axes_flat[i].set_visible(False)

        # Build the shared legend from every subplot: an algorithm without data
        # in the first subplot would otherwise be missing from the legend.
        handle_by_label = {}
        for used_ax in axes_flat[:n_plots]:
            for handle, label in zip(*used_ax.get_legend_handles_labels()):
                handle_by_label.setdefault(label, handle)
        labels = [algo for algo in algorithm_order if algo in handle_by_label]
        handles = [handle_by_label[algo] for algo in labels]
        legend_fontsize = 18

        # Calculate legend columns
        n_legend_cols = min(len(algorithm_order), 6)

        # First apply tight_layout to position subplots
        fig.tight_layout(h_pad=2.0, w_pad=1.5)

        # Get the top position of the first row of subplots (in figure coordinates)
        first_row_top = axes_flat[0].get_position().y1

        # Fixed padding between legend and first row (absolute size in cm)
        legend_padding_cm = 1.0
        fig_height_inch = fig.get_size_inches()[1]
        legend_padding = legend_padding_cm / 2.54 / fig_height_inch  # cm -> inch -> figure coords

        fig.legend(
            handles, labels,
            loc='lower center',
            ncol=n_legend_cols,
            fontsize=legend_fontsize,
            bbox_to_anchor=(0.5, first_row_top + legend_padding)
        )

        output_file = save_dir / f'convergence_merged.{self.config.figure_format}'
        fig.savefig(output_file, dpi=300, bbox_inches='tight')
        plt.close(fig)

        print(f"Merged convergence plot saved to: {output_file}")

    def _get_convergence_curve(
            self,
            metric_values: Dict,
            algo: str,
            prob: str,
            task_idx: int,
            run: Optional[int]
    ) -> np.ndarray:
        """
        Extract convergence curve for a specific configuration.

        Parameters
        ----------
        metric_values : Dict
            Metric values dictionary.
        algo : str
            Algorithm name.
        prob : str
            Problem name.
        task_idx : int
            Task index.
        run : Optional[int]
            Specific run number (None for mean across runs).

        Returns
        -------
        np.ndarray
            Convergence curve values.
        """
        if run is not None:
            return np.array(metric_values[algo][prob][run][task_idx])
        else:
            all_curves = []
            for r in metric_values[algo][prob].keys():
                curve = np.array(metric_values[algo][prob][r][task_idx])
                if len(curve) > 0:
                    all_curves.append(curve)

            if len(all_curves) == 0:
                return np.array([])

            min_len = min(len(c) for c in all_curves)
            truncated_curves = [c[:min_len] for c in all_curves]
            return np.mean(truncated_curves, axis=0)

    def _get_convergence_mean_std(
            self,
            metric_values: Dict,
            algo: str,
            prob: str,
            task_idx: int
    ) -> Tuple[np.ndarray, np.ndarray]:
        """
        Compute mean and standard deviation of convergence curves across all runs.

        Parameters
        ----------
        metric_values : Dict
            Metric values dictionary.
        algo : str
            Algorithm name.
        prob : str
            Problem name.
        task_idx : int
            Task index.

        Returns
        -------
        Tuple[np.ndarray, np.ndarray]
            (mean_curve, std_curve). Both empty arrays if no data.
        """
        all_curves = []
        for r in metric_values[algo][prob].keys():
            curve = np.array(metric_values[algo][prob][r][task_idx])
            if len(curve) > 0:
                all_curves.append(curve)

        if len(all_curves) < 2:
            return np.array([]), np.array([])

        min_len = min(len(c) for c in all_curves)
        truncated_curves = np.array([c[:min_len] for c in all_curves])
        return np.mean(truncated_curves, axis=0).ravel(), np.std(truncated_curves, axis=0, ddof=1).ravel()

    def _apply_scientific_notation(
            self,
            ax: plt.Axes,
            actual_xmax: Optional[float] = None,
            x_threshold: float = 10000,
            y_threshold: float = 1000
    ) -> None:
        """
        Switch an axis to scientific tick labels when its upper end is large.

        Parameters
        ----------
        ax : plt.Axes
            Matplotlib axes object.
        actual_xmax : Optional[float], optional
            Actual maximum x value from data (not affected by matplotlib padding).
            If None, uses ax.get_xlim()[1].
        x_threshold : float, optional
            The x-axis is switched when its maximum is strictly greater than
            this value. Default is 10000.
        y_threshold : float, optional
            The y-axis is switched when its upper limit is strictly greater
            than this value. Default is 1000.
        """
        from matplotlib.ticker import ScalarFormatter

        def _use_scientific(axis) -> None:
            # Same formatter setup for either axis
            sci = ScalarFormatter(useMathText=True)
            sci.set_scientific(True)
            sci.set_powerlimits((0, 0))
            axis.set_major_formatter(sci)
            axis.major.formatter._useMathText = True

        # Prefer the data maximum over the padded axis limit when it is given
        x_upper = ax.get_xlim()[1] if actual_xmax is None else actual_xmax
        y_upper = ax.get_ylim()[1]

        if x_upper > x_threshold:
            _use_scientific(ax.xaxis)

        if y_upper > y_threshold:
            _use_scientific(ax.yaxis)

    def plot_runtime(
            self,
            runtime: Dict[str, Dict[str, Dict[int, float]]],
            algorithm_order: List[str]
    ) -> None:
        """
        Generate and save a bar plot showing average runtime comparison.

        Problems form the groups on the x-axis and each algorithm contributes
        one bar per group, with the sample standard deviation over runs as
        error bar. Problems whose name contains digits are ordered by the
        number formed from those digits and come first; names without digits
        follow in alphabetical order.

        Parameters
        ----------
        runtime : Dict[str, Dict[str, Dict[int, float]]]
            Runtime dictionary.
            Structure: runtime[algorithm][problem][run] = float (seconds)
        algorithm_order : List[str]
            List of algorithm names in display order.

        Returns
        -------
        None
            Saves figure to disk.
        """
        def problem_sort_key(name: str) -> Tuple[int, Any]:
            # The leading tag keeps numbers and strings in separate groups, so
            # an int is never compared with a str when names are mixed.
            digits = ''.join(filter(str.isdigit, name))
            return (0, int(digits)) if digits else (1, name)

        problems = sorted(runtime[algorithm_order[0]].keys(), key=problem_sort_key)
        save_dir = Path(self.config.save_path)
        save_dir.mkdir(parents=True, exist_ok=True)

        fig, ax = plt.subplots(figsize=(6, 3.5))

        n_algorithms = len(algorithm_order)
        n_problems = len(problems)
        # All bars of one problem share 80% of the slot width
        bar_width = 0.8 / n_algorithms
        x_groups = np.arange(n_problems)

        for idx, algo in enumerate(algorithm_order):
            means = []
            stds = []

            for prob in problems:
                runtimes = list(runtime[algo][prob].values())
                means.append(np.mean(runtimes))

                # Only calculate std if there are at least 2 data points
                if len(runtimes) > 1:
                    stds.append(np.std(runtimes, ddof=1))
                else:
                    stds.append(0.0)  # No error bar for single data point

            # Center the block of bars on the group's tick position
            x_offset = x_groups + (idx - n_algorithms / 2 + 0.5) * bar_width

            ax.bar(
                x_offset, means, bar_width,
                yerr=stds, label=algo,
                color=self.config.colors[idx % len(self.config.colors)],
                alpha=0.8, capsize=2,
                error_kw={'linewidth': 1.2, 'ecolor': 'black', 'alpha': 0.6}
            )

        ax.set_ylabel('Runtime (s)', fontsize=12)
        ax.set_xticks(x_groups)
        ax.set_xticklabels(problems, fontsize=12)
        ax.tick_params(axis='both', which='major', labelsize=10)
        ax.legend(loc='best', fontsize=12, framealpha=0.7)
        ax.grid(True, axis='y', alpha=0.3, linestyle='-')

        fig.tight_layout()

        output_file = save_dir / f'runtime_comparison.{self.config.figure_format}'
        fig.savefig(output_file, dpi=300, bbox_inches='tight')
        plt.close(fig)

        print(f"Runtime plot saved to: {output_file}")

    def plot_nd_solutions(
            self,
            best_values: Dict[str, Dict[str, Dict[int, List[float]]]],
            objective_values: Dict[str, Dict[str, Dict[int, List[np.ndarray]]]],
            algorithm_order: List[str],
            settings: Optional[Dict[str, Any]] = None
    ) -> None:
        """
        Generate and save non-dominated solution plots.

        Figures are written to ``<config.save_path>/ND_Solutions``. When
        ``config.merge_plots`` is set, the work is delegated to
        ``_plot_merged_nd_solutions``; otherwise one figure is saved per
        algorithm / problem / task. Tasks with a single objective, runs whose
        objective array is empty, and algorithm/problem pairs without any run
        are skipped.

        Parameters
        ----------
        best_values : Dict[str, Dict[str, Dict[int, List[float]]]]
            Best values for representative run selection.
        objective_values : Dict[str, Dict[str, Dict[int, List[np.ndarray]]]]
            Original objective values.
            Structure: objective_values[algorithm][problem][run] = List[np.ndarray]
            where each np.ndarray has shape (n_solutions, n_objectives).
        algorithm_order : List[str]
            List of algorithm names. The problem list is taken from the first entry.
        settings : Optional[Dict[str, Any]], optional
            Problem settings for loading true Pareto fronts.

        Returns
        -------
        None
            Saves figures to disk.
        """
        nd_folder = Path(self.config.save_path) / 'ND_Solutions'
        nd_folder.mkdir(parents=True, exist_ok=True)

        problems = list(objective_values[algorithm_order[0]].keys())

        if self.config.merge_plots:
            # Merged plot mode: all algorithms for each problem/task in one figure
            self._plot_merged_nd_solutions(
                best_values, objective_values, algorithm_order, problems, settings, nd_folder
            )
            return

        # Separate plot mode: one figure per algorithm/problem/task
        for algo in algorithm_order:
            for prob in problems:
                runs = objective_values[algo][prob]
                if not runs:
                    # Nothing recorded for this algorithm/problem pair.
                    continue

                first_run = next(iter(runs))
                n_tasks = len(runs[first_run])

                for task_idx in range(n_tasks):
                    # The first run decides the objective count for this task.
                    n_objectives = runs[first_run][task_idx].shape[1]
                    if n_objectives <= 1:
                        continue

                    selected_run = StatisticsCalculator.select_representative_run(
                        best_values, algo, prob, task_idx, self.config.statistic_type
                    )
                    if selected_run is None:
                        # Keep the historical default (run 1) when it exists, but do not
                        # fail with a KeyError when run ids do not start at 1.
                        selected_run = 1 if 1 in runs else first_run

                    objectives = runs[selected_run][task_idx]
                    if objectives.shape[0] == 0:
                        continue

                    # Filter non-dominated solutions if requested: keep the rows that
                    # nd_sort assigns to front number 1.
                    if self.config.show_nd:
                        front_no, _ = nd_sort(objectives, objectives.shape[0])
                        nd_solutions = objectives[front_no == 1]
                    else:
                        nd_solutions = objectives

                    # Load true Pareto front if requested
                    true_pf = None
                    if self.config.show_pf and settings is not None:
                        true_pf = DataUtils.load_reference(settings, prob, task_idx, M=n_objectives)

                    # Create appropriate plot based on number of objectives
                    fig = self._create_nd_plot(
                        nd_solutions, true_pf, n_objectives, n_tasks, prob, task_idx, algo
                    )

                    # Single-task problems omit the task suffix from the file name.
                    if n_tasks == 1:
                        filename = f'{prob}-{algo}.{self.config.figure_format}'
                    else:
                        filename = f'{prob}-Task{task_idx + 1}-{algo}.{self.config.figure_format}'

                    fig.savefig(nd_folder / filename, dpi=300)
                    plt.close(fig)

        print(f"All non-dominated solutions plots saved to: {nd_folder}\n")

    def _plot_merged_nd_solutions(
            self,
            best_values: Dict,
            objective_values: Dict,
            algorithm_order: List[str],
            problems: List[str],
            settings: Optional[Dict[str, Any]],
            nd_folder: Path
    ) -> None:
        """
        Create merged figures for non-dominated solutions.

        Each merged figure contains all algorithms for a specific problem/task,
        laid out as a single row with one column per algorithm. The number of
        tasks and objectives is read from the first run of the first algorithm.
        Single-objective tasks are skipped; an algorithm whose selected run has
        no solutions (or no runs at all) leaves its panel undrawn.

        Parameters
        ----------
        best_values : Dict
            Best values dictionary.
        objective_values : Dict
            Objective values dictionary.
        algorithm_order : List[str]
            Algorithm order.
        problems : List[str]
            List of problem names.
        settings : Optional[Dict[str, Any]]
            Problem settings.
        nd_folder : Path
            Output folder path.
        """
        for prob in problems:
            reference_runs = objective_values[algorithm_order[0]][prob]
            first_run = list(reference_runs.keys())[0]
            n_tasks = len(reference_runs[first_run])

            for task_idx in range(n_tasks):
                # Get n_objectives for this specific task
                n_objectives = reference_runs[first_run][task_idx].shape[1]
                if n_objectives <= 1:
                    continue

                # One column per algorithm, single row
                n_cols = len(algorithm_order)
                n_rows = 1
                figsize = (4.5 * n_cols, 3.5 * n_rows)

                # 3D panels are added one by one because they need a projection;
                # 2D / parallel-coordinate panels are created up front.
                is_3d = n_objectives == 3
                if is_3d:
                    fig = plt.figure(figsize=figsize)
                    axes_flat = None
                else:
                    # squeeze=False always yields a 2D array, so no special case is
                    # needed when there is only one algorithm.
                    fig, axes = plt.subplots(n_rows, n_cols, figsize=figsize, squeeze=False)
                    axes_flat = axes.flatten()

                # Load true Pareto front once
                true_pf = None
                if self.config.show_pf and settings is not None:
                    true_pf = DataUtils.load_reference(settings, prob, task_idx, M=n_objectives)

                # Build subplot title prefix
                task_label = f'{prob} - Task {task_idx + 1}' if n_tasks > 1 else prob

                for idx, algo in enumerate(algorithm_order):
                    algo_runs = objective_values[algo][prob]

                    selected_run = StatisticsCalculator.select_representative_run(
                        best_values, algo, prob, task_idx, self.config.statistic_type
                    )
                    if selected_run is None:
                        # Keep the historical default (run 1) when it exists, but do not
                        # fail with a KeyError when run ids do not start at 1.
                        selected_run = 1 if 1 in algo_runs else next(iter(algo_runs), None)
                        if selected_run is None:
                            continue

                    objectives = algo_runs[selected_run][task_idx]
                    if objectives.shape[0] == 0:
                        continue

                    # Filter non-dominated solutions: keep the rows that nd_sort
                    # assigns to front number 1.
                    if self.config.show_nd:
                        front_no, _ = nd_sort(objectives, objectives.shape[0])
                        nd_solutions = objectives[front_no == 1]
                    else:
                        nd_solutions = objectives

                    # Subplot title: "P1 - Task 1 - RVEA" or "P1 - RVEA"
                    subplot_title = f'{task_label} - {algo}'

                    # Create subplot
                    if is_3d:
                        ax = fig.add_subplot(n_rows, n_cols, idx + 1, projection='3d')
                        self._plot_nd_subplot_3d(ax, nd_solutions, true_pf, subplot_title)
                    else:
                        ax = axes_flat[idx]
                        self._plot_nd_subplot_2d(ax, nd_solutions, true_pf, n_objectives, subplot_title)

                fig.tight_layout()

                # Single-task problems omit the task suffix from the file name.
                if n_tasks == 1:
                    filename = f'{prob}_merged.{self.config.figure_format}'
                else:
                    filename = f'{prob}-Task{task_idx + 1}_merged.{self.config.figure_format}'

                fig.savefig(nd_folder / filename, dpi=300, bbox_inches='tight')
                plt.close(fig)

        print(f"Merged non-dominated solutions plots saved to: {nd_folder}\n")

    def _plot_nd_subplot_2d(
            self,
            ax: plt.Axes,
            nd_solutions: np.ndarray,
            true_pf: Optional[np.ndarray],
            n_objectives: int,
            title: str
    ) -> None:
        """
        Draw one panel of a merged ND-solutions figure on ``ax``.

        Two objectives are shown as a scatter plot (with the true Pareto front
        underneath when one with two columns is supplied); any other objective
        count is shown as parallel coordinates, one line per solution.
        """
        if n_objectives != 2:
            # Parallel coordinates: objective index on x, objective value on y.
            objective_axis = range(n_objectives)
            for solution in nd_solutions:
                ax.plot(objective_axis, solution,
                        'b-', alpha=0.3, linewidth=0.8)

            tick_labels = [rf'$f_{{{i + 1}}}$' for i in range(n_objectives)]
            ax.set_xlabel('Objective', fontsize=12)
            ax.set_ylabel('Value', fontsize=12)
            ax.set_xticks(range(n_objectives))
            ax.set_xticklabels(tick_labels)
            ax.grid(True, alpha=0.3, linestyle='--')
        else:
            pf_is_2d = true_pf is not None and true_pf.shape[1] == 2
            if pf_is_2d:
                # Reference front ordered by f1, drawn below the solutions (zorder=1).
                pf_by_f1 = true_pf[np.argsort(true_pf[:, 0])]
                ax.scatter(pf_by_f1[:, 0], pf_by_f1[:, 1],
                           c='gray', s=2, linewidth=0.1, zorder=1)

            f1_values = nd_solutions[:, 0]
            f2_values = nd_solutions[:, 1]
            ax.scatter(f1_values, f2_values,
                       c='dodgerblue', s=60, alpha=0.8, edgecolors='black',
                       linewidth=0.8, zorder=2)

            ax.set_xlabel('$f_1$', fontsize=12)
            ax.set_ylabel('$f_2$', fontsize=12)
            ax.grid(True, alpha=0.2, linestyle='-')

        ax.set_title(title, fontsize=12)

    def _plot_nd_subplot_3d(
            self,
            ax: plt.Axes,
            nd_solutions: np.ndarray,
            true_pf: Optional[np.ndarray],
            title: str
    ) -> None:
        """
        Draw one three-objective panel of a merged ND-solutions figure on ``ax``.

        The true Pareto front is drawn first (faint gray) when one with three
        columns is supplied, followed by the solutions themselves.
        """
        pf_is_3d = true_pf is not None and true_pf.shape[1] == 3
        if pf_is_3d:
            pf_f1, pf_f2, pf_f3 = true_pf[:, 0], true_pf[:, 1], true_pf[:, 2]
            ax.scatter(pf_f1, pf_f2, pf_f3,
                       c='gray', s=4, alpha=0.2, zorder=1, depthshade=True)

        sol_f1, sol_f2, sol_f3 = nd_solutions[:, 0], nd_solutions[:, 1], nd_solutions[:, 2]
        ax.scatter(sol_f1, sol_f2, sol_f3,
                   c='dodgerblue', s=60, alpha=0.8, edgecolors='black',
                   linewidth=0.8, zorder=2, depthshade=True)

        # Label the three objective axes with the same font size.
        axis_labelers = (
            (ax.set_xlabel, '$f_1$'),
            (ax.set_ylabel, '$f_2$'),
            (ax.set_zlabel, '$f_3$'),
        )
        for set_label, text in axis_labelers:
            set_label(text, fontsize=10)

        ax.set_title(title, fontsize=12)
        ax.view_init(elev=20, azim=60)

    def _create_nd_plot(
            self,
            nd_solutions: np.ndarray,
            true_pf: Optional[np.ndarray],
            n_objectives: int,
            n_tasks: int,
            prob: str,
            task_idx: int,
            algo: str
    ) -> plt.Figure:
        """
        Create a non-dominated solution plot.

        The plot type depends on ``n_objectives``: a 2D scatter for two
        objectives, a 3D scatter for three, and parallel coordinates otherwise.
        The true Pareto front is only drawn in the scatter variants, and only
        when its column count matches the number of objectives.

        Parameters
        ----------
        nd_solutions : np.ndarray
            Non-dominated solutions array with shape (n_solutions, n_objectives).
        true_pf : Optional[np.ndarray]
            True Pareto front array.
        n_objectives : int
            Number of objectives.
        n_tasks : int
            Total number of tasks. The task number is included in the title
            only when this is not 1.
        prob : str
            Problem name.
        task_idx : int
            Task index.
        algo : str
            Algorithm name.

        Returns
        -------
        plt.Figure
            Matplotlib figure object. The caller is responsible for saving and
            closing it.
        """
        fig = plt.figure(figsize=(4.5, 3.5))

        if n_objectives == 2:
            ax = fig.add_subplot(111)

            if true_pf is not None and true_pf.shape[1] == 2:
                sort_idx = np.argsort(true_pf[:, 0])
                sorted_pf = true_pf[sort_idx]
                ax.scatter(sorted_pf[:, 0], sorted_pf[:, 1],
                           c='gray', s=2, linewidth=0.1, label='True PF', zorder=1)

            ax.scatter(nd_solutions[:, 0], nd_solutions[:, 1],
                       c='dodgerblue', s=60, alpha=0.8, edgecolors='black',
                       linewidth=0.8, label='ND Solutions', zorder=2)

            ax.set_xlabel('$f_1$', fontsize=12)
            ax.set_ylabel('$f_2$', fontsize=12)
            ax.grid(True, alpha=0.2, linestyle='-')

        elif n_objectives == 3:
            ax = fig.add_subplot(111, projection='3d')

            if true_pf is not None and true_pf.shape[1] == 3:
                ax.scatter(true_pf[:, 0], true_pf[:, 1], true_pf[:, 2],
                           c='gray', s=4, alpha=0.2, label='True PF', zorder=1, depthshade=True)

            ax.scatter(nd_solutions[:, 0], nd_solutions[:, 1], nd_solutions[:, 2],
                       c='dodgerblue', s=60, alpha=0.8, edgecolors='black',
                       linewidth=0.8, label='ND Solutions', zorder=2, depthshade=True)

            ax.set_xlabel('$f_1$', fontsize=12)
            ax.set_ylabel('$f_2$', fontsize=12)
            ax.set_zlabel('$f_3$', fontsize=12)

            ax.view_init(elev=20, azim=60)

        else:
            # Parallel coordinates for many-objective: one line per solution
            ax = fig.add_subplot(111)

            objective_axis = range(n_objectives)
            for i in range(nd_solutions.shape[0]):
                ax.plot(objective_axis, nd_solutions[i, :],
                        'b-', alpha=0.3, linewidth=0.8)

            ax.set_xlabel('Objective', fontsize=12)
            ax.set_ylabel('Value', fontsize=12)
            ax.set_xticks(range(n_objectives))
            ax.set_xticklabels([rf'$f_{{{i + 1}}}$' for i in range(n_objectives)])
            ax.grid(True, alpha=0.3, linestyle='--')

        title = f'{prob} - {algo}' if n_tasks == 1 else f'{prob} - Task{task_idx + 1} - {algo}'
        # Address the axes/figure created above explicitly rather than relying on
        # pyplot's implicit "current" figure, which another figure could replace.
        ax.set_title(title, fontsize=10)
        fig.tight_layout()

        return fig


# =============================================================================
# Main Data Analyzer Class
# =============================================================================

class DataAnalyzer:
    """
    Main class for comprehensive data analysis and visualization of multi-task optimization experiments.

    This class provides a complete pipeline for:
    - Scanning data directories to detect algorithms, problems, and runs
    - Calculating performance metrics (IGD, HV, or objective values)
    - Generating statistical comparison tables (Excel or LaTeX)
    - Creating convergence curve plots
    - Visualizing runtime comparisons
    - Plotting non-dominated solutions

    Attributes
    ----------
    data_path : Path
        Path to the data directory containing experiment results.
    settings : Optional[Dict[str, Any]]
        Problem settings including reference definitions and metric configuration.
    algorithm_order : Optional[List[str]]
        Custom ordering of algorithms for display.
    table_config : TableConfig
        Configuration for table generation.
    plot_config : PlotConfig
        Configuration for plot generation.
    """

    def __init__(
            self,
            data_path: Union[str, Path] = './Data',
            settings: Optional[Dict[str, Any]] = None,
            algorithm_order: Optional[List[str]] = None,
            save_path: Union[str, Path] = './Results',
            table_format: str = 'excel',
            figure_format: str = 'pdf',
            statistic_type: str = 'mean',
            significance_level: float = 0.05,
            rank_sum_test: bool = True,
            log_scale: bool = False,
            show_pf: bool = True,
            show_nd: bool = True,
            merge_plots: bool = False,
            merge_columns: int = 3,
            show_std_band: bool = False,
            best_so_far: bool = True,
            clear_results: bool = True,
            convergence_k: Optional[int] = None
    ):
        """
        Initialize DataAnalyzer with configuration parameters.

        Parameters
        ----------
        data_path : Union[str, Path], optional
            Path to data directory containing algorithm subdirectories.
            Each subdirectory should contain pickle files named: ALGO_problem_run.pkl
            Default: './Data'
        settings : Optional[Dict[str, Any]], optional
            Problem settings dictionary containing:
            - Problem names as keys (e.g., 'P1', 'P2')
            - Task definitions as nested dictionaries
            - 'metric': str ('IGD' or 'HV')
            - 'ref_path': str (path to reference files)
            - 'n_ref': int (number of reference points)
            Default: None (single-objective mode)
        algorithm_order : Optional[List[str]], optional
            Custom ordering of algorithms for display.
            The last algorithm is used as baseline for statistical tests.
            Default: None (alphabetical order)
        save_path : Union[str, Path], optional
            Directory path to save all output files.
            Default: './Results'
        table_format : str, optional
            Output table format: 'excel' or 'latex'.
            Default: 'excel'
        figure_format : str, optional
            Output figure format: 'pdf', 'png', 'svg', etc.
            Default: 'pdf'
        statistic_type : str, optional
            Type of statistic: 'mean', 'median', 'max', 'min'.
            Default: 'mean'
        significance_level : float, optional
            P-value threshold for statistical significance testing.
            Default: 0.05
        rank_sum_test : bool, optional
            Whether to perform Wilcoxon rank-sum test.
            Default: True
        log_scale : bool, optional
            Whether to use logarithmic scale for convergence plot y-axis.
            Default: False
        show_pf : bool, optional
            Whether to show true Pareto front in ND solution plots.
            Default: True
        show_nd : bool, optional
            Whether to filter and show only non-dominated solutions.
            Default: True
        merge_plots : bool, optional
            Whether to merge all plots into a single figure.
            Default: False
        merge_columns : int, optional
            Number of columns in merged plot layout.
            Default: 3
        show_std_band : bool, optional
            Whether to show standard deviation band on convergence curves.
            Default: False
        best_so_far : bool, optional
            Whether to use best-so-far metric values.
            Default: True
        clear_results : bool, optional
            Whether to clear existing results folder before analysis.
            Default: True
        convergence_k : Optional[int], optional
            Number of data points to sample from convergence curves for export.
            If None, no convergence data is exported.
            Default: None
        """
        self.data_path = Path(data_path)
        self.settings = settings
        self.algorithm_order = algorithm_order
        self.best_so_far = best_so_far
        self.clear_results = clear_results

        # Parse enums (an unknown string raises ValueError from the Enum lookup)
        stat_type = StatisticType(statistic_type)
        tbl_format = TableFormat(table_format)

        # Initialize configurations; tables and plots share the same save path
        self.table_config = TableConfig(
            table_format=tbl_format,
            statistic_type=stat_type,
            significance_level=significance_level,
            rank_sum_test=rank_sum_test,
            save_path=Path(save_path)
        )
        self.plot_config = PlotConfig(
            figure_format=figure_format,
            statistic_type=stat_type,
            log_scale=log_scale,
            show_pf=show_pf,
            show_nd=show_nd,
            merge_plots=merge_plots,
            merge_columns=merge_columns,
            show_std_band=show_std_band,
            save_path=Path(save_path)
        )
        self.convergence_k = convergence_k

        # Internal state, filled lazily by scan_data() / calculate_metrics()
        self._scan_result: Optional[ScanResult] = None
        self._metric_results: Optional[MetricResults] = None

    @staticmethod
    def _problem_sort_key(name: str) -> Tuple[int, int, str]:
        """
        Build a sort key that orders problem names by the number embedded in them.

        All digits of the name are concatenated and read as one integer
        ('P10' -> 10). Names without digits are placed after the numbered ones
        and ordered alphabetically. Returning a tuple keeps numbered and
        un-numbered names comparable, so a mixed set cannot raise TypeError.
        """
        digits = ''.join(filter(str.isdigit, name))
        if digits:
            return (0, int(digits), name)
        return (1, 0, name)

    def _get_algorithm_order(self) -> List[str]:
        """Return the user-supplied algorithm order, or the scanned one when none was given."""
        return self.algorithm_order if self.algorithm_order else self._scan_result.algorithms

    def scan_data(self) -> 'ScanResult':
        """
        Scan the data directory to detect algorithms, problems, run counts.

        Every sub-directory of ``data_path`` is treated as an algorithm. Inside
        it, files named ``<algo>_<problem>_<run>.pkl`` are grouped by problem.
        The run count is taken from the first algorithm / first problem pair.

        Returns
        -------
        ScanResult
            Dataclass containing:
            - algorithms: List[str] - Sorted list of algorithm names
            - problems: List[str] - Sorted list of problem names
            - runs: int - Number of independent runs
            - data_path: Path - Path to scanned directory

        Raises
        ------
        FileNotFoundError
            If data_path does not exist.
        ValueError
            If no algorithm directories or pickle files found.
        """
        if not self.data_path.exists():
            raise FileNotFoundError(f"Data path does not exist: {self.data_path}")

        algorithms = []
        problems = []
        runs_dict = {}

        for algo_dir in self.data_path.iterdir():
            if not algo_dir.is_dir():
                continue

            algo = algo_dir.name
            algorithms.append(algo)
            runs_dict[algo] = {}
            prefix = algo + '_'

            for pkl in algo_dir.glob('*.pkl'):
                stem = pkl.stem
                if not stem.startswith(prefix):
                    continue
                # The text after the last underscore is the run id; everything
                # before it (which may itself contain underscores) is the problem.
                remainder = stem[len(prefix):]
                last_underscore = remainder.rfind('_')
                if last_underscore > 0:
                    prob = remainder[:last_underscore]
                    runs_dict[algo].setdefault(prob, []).append(pkl)
                    if prob not in problems:
                        problems.append(prob)

        if not algorithms:
            raise ValueError(f"No algorithm directories found in: {self.data_path}")
        if not problems:
            raise ValueError(f"No pickle files found in: {self.data_path}")

        algorithms.sort()
        problems.sort(key=self._problem_sort_key)

        first_algo = algorithms[0]
        first_prob = problems[0]
        if first_prob not in runs_dict[first_algo]:
            raise ValueError(
                f"No pickle files found for algorithm '{first_algo}' on problem '{first_prob}'"
            )
        runs = len(runs_dict[first_algo][first_prob])

        print(f"Found {len(algorithms)} algorithms: {algorithms}")
        print(f"Found {len(problems)} problems: {problems}")
        print(f"Run times: {runs}")

        self._scan_result = ScanResult(
            algorithms=algorithms,
            problems=problems,
            runs=runs,
            data_path=self.data_path
        )
        return self._scan_result

    def calculate_metrics(self) -> 'MetricResults':
        """
        Calculate metric values for all algorithms, problems, and runs.

        Runs scan_data() first if it has not been called yet. Pickle files are
        expected at ``data_path/<algo>/<algo>_<problem>_<run>.pkl`` with run ids
        1..runs.

        Returns
        -------
        MetricResults
            Dataclass containing all computed metrics:
            - metric_values: Metric values per generation (best-so-far or raw,
              depending on ``best_so_far``)
            - best_values: Final best-so-far metric value per task
            - objective_values: Objective values of the last generation per task
            - runtime: Runtime as stored in each pickle file
            - max_nfes: Maximum function evaluations (read from the first run)
            - metric_name: Name of metric used
        """
        if self._scan_result is None:
            self.scan_data()

        scan = self._scan_result
        algo_order = self._get_algorithm_order()
        metric_name = self.settings.get('metric') if self.settings else None

        # Initialize storage dictionaries
        all_values = {algo: {prob: {} for prob in scan.problems} for algo in algo_order}
        all_values_best_so_far = {algo: {prob: {} for prob in scan.problems} for algo in algo_order}
        all_best_values = {algo: {prob: {} for prob in scan.problems} for algo in algo_order}
        original_objective_values = {algo: {prob: {} for prob in scan.problems} for algo in algo_order}
        all_runtime = {algo: {prob: {} for prob in scan.problems} for algo in algo_order}
        all_max_nfes = {algo: {prob: None for prob in scan.problems} for algo in algo_order}

        total = len(algo_order) * len(scan.problems) * scan.runs

        # The context manager closes the progress bar even if a run fails to load.
        with tqdm(total=total, desc="Calculating metric values",
                  dynamic_ncols=False, delay=0.2) as pbar:
            for algo in algo_order:
                for prob in scan.problems:
                    for run in range(1, scan.runs + 1):
                        pkl_file = f"{algo}_{prob}_{run}.pkl"
                        pkl_path = self.data_path / algo / pkl_file
                        data = DataUtils.load_pickle(pkl_path)

                        metric_values, metric_values_best_bs = self._get_single_run_metric_value(data, prob)
                        all_values[algo][prob][run] = metric_values
                        all_values_best_so_far[algo][prob][run] = metric_values_best_bs

                        # Final value of each task's best-so-far curve (NaN if the task has no data)
                        last_vals = [
                            np.asarray(task_arr).ravel()[-1] if len(task_arr) > 0 else np.nan
                            for task_arr in metric_values_best_bs
                        ]
                        all_best_values[algo][prob][run] = last_vals

                        # Objectives of the last recorded generation of every task
                        last_objs = [task_objs[-1] for task_objs in data['all_objs']]
                        original_objective_values[algo][prob][run] = last_objs

                        all_runtime[algo][prob][run] = data['runtime']
                        if all_max_nfes[algo][prob] is None:
                            all_max_nfes[algo][prob] = data['max_nfes']

                        pbar.update(1)

        selected = all_values_best_so_far if self.best_so_far else all_values

        self._metric_results = MetricResults(
            metric_values=selected,
            best_values=all_best_values,
            objective_values=original_objective_values,
            runtime=all_runtime,
            max_nfes=all_max_nfes,
            metric_name=metric_name
        )
        return self._metric_results

    def _compute_metric(
            self,
            metric_name: Optional[str],
            objs: np.ndarray,
            cons: Optional[np.ndarray],
            reference: Any
    ) -> Tuple[Any, Any]:
        """
        Evaluate one multi-objective metric for a single generation.

        Parameters
        ----------
        metric_name : Optional[str]
            One of 'IGD', 'IGDp', 'GD', 'DeltaP', 'Spread', 'HV', 'Spacing', 'FR', 'CV'.
        objs : np.ndarray
            Objective values of the generation.
        cons : Optional[np.ndarray]
            Constraint values of the generation; required for 'FR' and 'CV'.
        reference : Any
            Reference data returned by DataUtils.load_reference for this task.

        Returns
        -------
        Tuple[Any, Any]
            (metric value, the metric object's ``sign`` attribute). The caller
            treats a sign of -1 as "smaller is better".

        Raises
        ------
        ValueError
            If the metric is not supported, or constraint data is missing for
            'FR' / 'CV'.
        """
        # Built lazily so the metric classes are only looked up when needed.
        reference_metrics = {'IGD': IGD, 'IGDp': IGDp, 'GD': GD, 'DeltaP': DeltaP, 'Spread': Spread}
        constraint_metrics = {'FR': FR, 'CV': CV}

        if metric_name in reference_metrics:
            metric_instance = reference_metrics[metric_name]()
            metric_value = metric_instance.calculate(objs, reference)
            return metric_value, metric_instance.sign

        if metric_name == 'HV':
            metric_instance = HV()
            # If reference is 1D or single row, treat as ref point; otherwise as PF
            if reference.ndim == 1 or reference.shape[0] == 1:
                ref_point = reference.flatten()
                metric_value = metric_instance.calculate(objs, reference=ref_point)
            else:
                metric_value = metric_instance.calculate(objs, pf=reference)
            return metric_value, metric_instance.sign

        if metric_name == 'Spacing':
            metric_instance = Spacing()
            metric_value = metric_instance.calculate(objs)
            return metric_value, metric_instance.sign

        if metric_name in constraint_metrics:
            if cons is None:
                raise ValueError(f'{metric_name} metric requires constraint data, but all_cons is not available')
            metric_instance = constraint_metrics[metric_name]()
            metric_value = metric_instance.calculate(cons)
            return metric_value, metric_instance.sign

        raise ValueError(f'Unsupported metric: {metric_name}')

    def _get_single_run_metric_value(
            self,
            data: Dict[str, Any],
            prob: str
    ) -> Tuple[List[np.ndarray], List[np.ndarray]]:
        """
        Calculate metric values for a single run.

        For single-objective generations the metric is the minimum objective
        value. For multi-objective generations the metric named in
        ``settings['metric']`` is evaluated.

        Parameters
        ----------
        data : Dict[str, Any]
            Loaded pickle data containing 'all_decs' and 'all_objs' keys, and
            optionally 'all_cons'.
        prob : str
            Problem name for loading references.

        Returns
        -------
        Tuple[List[np.ndarray], List[np.ndarray]]
            Tuple of (metric_values, metric_values_best_so_far).
            Each is a list of arrays of shape (n_generations, 1), one per task.

        Raises
        ------
        ValueError
            If multi-objective data is given without settings, the metric is
            unsupported, or a constraint metric is requested without 'all_cons'.
        """
        all_decs = data['all_decs']
        all_objs = data['all_objs']
        all_cons = data.get('all_cons', None)

        n_tasks = len(all_objs)
        n_gens_per_task = [len(all_objs[t]) for t in range(n_tasks)]

        metric_values = [np.zeros((n_gens_per_task[t], 1)) for t in range(n_tasks)]
        metric_values_best_so_far = [np.zeros((n_gens_per_task[t], 1)) for t in range(n_tasks)]

        for t in range(n_tasks):
            task_key = f'T{t + 1}'
            best_so_far = None
            reference = None

            # The reference is loaded once per task, sized from the first generation.
            if self.settings is not None and n_gens_per_task[t] > 0:
                M = all_objs[t][0].shape[1]
                D = all_decs[t][0].shape[1]
                C = all_cons[t][0].shape[1] if all_cons is not None else 0
                reference = DataUtils.load_reference(
                    self.settings, prob, task_key, M=M, D=D, C=C
                )

            for gen in range(n_gens_per_task[t]):
                objs_tgen = all_objs[t][gen]
                cons_tgen = all_cons[t][gen] if all_cons is not None else None
                M = objs_tgen.shape[1]

                if M == 1:
                    # Single objective: best (smallest) objective value of the generation
                    metric_value = np.min(objs_tgen[:, 0])
                    sign = -1
                else:
                    if self.settings is None:
                        raise ValueError('Multi-objective metric calculation requires settings parameter')
                    metric_name = self.settings.get('metric')
                    metric_value, sign = self._compute_metric(
                        metric_name, objs_tgen, cons_tgen, reference
                    )

                metric_values[t][gen, 0] = metric_value

                # sign == -1 means smaller is better; anything else means larger is better
                if best_so_far is None:
                    best_so_far = metric_value
                elif sign == -1:
                    best_so_far = min(best_so_far, metric_value)
                else:
                    best_so_far = max(best_so_far, metric_value)
                metric_values_best_so_far[t][gen, 0] = best_so_far

        return metric_values, metric_values_best_so_far

    def generate_tables(self) -> Union[pd.DataFrame, str]:
        """
        Generate comparison tables with statistical analysis.

        Runs calculate_metrics() first if it has not been called yet.

        Returns
        -------
        Union[pd.DataFrame, str]
            DataFrame for Excel format, LaTeX string for LaTeX format.
        """
        if self._metric_results is None:
            self.calculate_metrics()

        algo_order = self._get_algorithm_order()
        table_gen = TableGenerator(self.table_config)
        return table_gen.generate(
            self._metric_results.best_values,
            algo_order,
            self._metric_results.metric_name
        )

    def generate_convergence_plots(self) -> None:
        """
        Generate and save convergence curve plots.

        Runs calculate_metrics() first if it has not been called yet.

        Returns
        -------
        None
            Saves figures to disk at configured save_path.
        """
        if self._metric_results is None:
            self.calculate_metrics()

        algo_order = self._get_algorithm_order()
        plot_gen = PlotGenerator(self.plot_config)
        plot_gen.plot_convergence_curves(
            self._metric_results.metric_values,
            self._metric_results.best_values,
            self._metric_results.max_nfes,
            algo_order,
            self._metric_results.metric_name
        )

    def generate_runtime_plots(self) -> None:
        """
        Generate and save runtime comparison bar plots.

        Runs calculate_metrics() first if it has not been called yet.

        Returns
        -------
        None
            Saves figure to disk at configured save_path.
        """
        if self._metric_results is None:
            self.calculate_metrics()

        algo_order = self._get_algorithm_order()
        plot_gen = PlotGenerator(self.plot_config)
        plot_gen.plot_runtime(self._metric_results.runtime, algo_order)

    def generate_nd_solution_plots(self) -> None:
        """
        Generate and save non-dominated solution visualization plots.

        Runs calculate_metrics() first if it has not been called yet.

        Returns
        -------
        None
            Saves figures to disk at configured save_path/ND_Solutions/.
        """
        if self._metric_results is None:
            self.calculate_metrics()

        algo_order = self._get_algorithm_order()
        plot_gen = PlotGenerator(self.plot_config)
        plot_gen.plot_nd_solutions(
            self._metric_results.best_values,
            self._metric_results.objective_values,
            algo_order,
            self.settings
        )

    def export_convergence_data(self, k: Optional[int] = None) -> None:
        """
        Export convergence curve data to text files.

        For each problem-task combination, exports a file containing
        evaluation counts paired with convergence values for all algorithms.
        Files are written to ``<save_path>/Convergence_Data`` and named
        ``Problem<i>_task<j>.txt``. Runs calculate_metrics() first if needed.

        Parameters
        ----------
        k : Optional[int], optional
            Number of data points to sample from each convergence curve.
            If None, uses self.convergence_k. If both are None, exports all points.
        """
        if self._metric_results is None:
            self.calculate_metrics()

        k = k if k is not None else self.convergence_k
        algo_order = self._get_algorithm_order()

        metric_values = self._metric_results.metric_values
        best_values = self._metric_results.best_values
        max_nfes = self._metric_results.max_nfes

        # NOTE(review): this is a plain lexicographic sort ('P10' before 'P2'), unlike
        # the numeric ordering used by scan_data(); the index in the file name follows
        # this order. Confirm that is intended.
        problems = sorted(metric_values[algo_order[0]].keys())

        save_dir = Path(self.plot_config.save_path) / 'Convergence_Data'
        save_dir.mkdir(parents=True, exist_ok=True)

        plot_gen = PlotGenerator(self.plot_config)

        for prob_idx, prob in enumerate(problems):
            # The task count is read from run 1 of the first algorithm.
            first_run_data = best_values[algo_order[0]][prob][1]
            num_tasks = len(first_run_data)

            for task_idx in range(num_tasks):
                filename = f'Problem{prob_idx + 1}_task{task_idx + 1}.txt'
                filepath = save_dir / filename

                # Explicit encoding so algorithm names are written identically on every platform
                with open(filepath, 'w', encoding='utf-8') as f:
                    for algo in algo_order:
                        selected_run = StatisticsCalculator.select_representative_run(
                            best_values, algo, prob, task_idx, self.plot_config.statistic_type
                        )
                        curve = plot_gen._get_convergence_curve(
                            metric_values, algo, prob, task_idx, selected_run
                        )
                        if len(curve) == 0:
                            continue

                        # Spread the curve evenly over [0, max NFEs of this task]
                        nfes = max_nfes[algo][prob][task_idx]
                        x = np.linspace(0, nfes, len(curve))

                        # Sample k points if requested
                        if k is not None and k > 0 and len(curve) > k:
                            indices = np.linspace(0, len(curve) - 1, k, dtype=int)
                            x = x[indices]
                            curve = curve[indices]

                        f.write(f'# Algorithm: {algo}\n')
                        f.write('# NFEs\tValue\n')
                        for xi, yi in zip(x, curve):
                            f.write(f'{float(xi):.6g}\t{float(yi):.6g}\n')
                        f.write('\n')

        print(f"Convergence data exported to: {save_dir}")

    def run(self) -> 'MetricResults':
        """
        Execute the complete analysis pipeline.

        This method runs all analysis steps in sequence:
        1. Clear existing results (if configured)
        2. Scan data directory
        3. Calculate metrics
        4. Generate statistical tables
        5. Generate convergence plots
           (and export convergence data when ``convergence_k`` is set)
        6. Generate runtime plots
        7. Generate non-dominated solution plots

        Returns
        -------
        MetricResults
            Complete metric results from the analysis.
        """
        print("=" * 60)
        print('🚀🚀🚀 Starting Data Analysis Pipeline! 🚀🚀🚀')
        print("=" * 60)

        # Step 0: Clear results folder if requested
        if self.clear_results:
            results_path = self.table_config.save_path
            if results_path.exists():
                print(f'\n♻️ Clearing existing results folder: {results_path}')
                shutil.rmtree(results_path)
            results_path.mkdir(parents=True, exist_ok=True)

        # Step 1: Scan data
        print('\n🔍 Scanning data directory...')
        self.scan_data()

        # Step 2: Calculate metrics
        print('\n📊 Calculating metric values...')
        self.calculate_metrics()

        # Step 3: Generate tables
        print('\n📋 Generating statistical tables...')
        self.generate_tables()

        # Step 4: Plot convergence curves
        print('\n📈 Plotting convergence curves...')
        self.generate_convergence_plots()

        # Step 4.5: Export convergence data
        if self.convergence_k is not None:
            print('\n📂 Exporting convergence data...')
            self.export_convergence_data()

        # Step 5: Plot runtime
        print('\n⏱️ Plotting runtime comparison...')
        self.generate_runtime_plots()

        # Step 6: Plot non-dominated solutions
        print('\n🎯 Plotting non-dominated solutions...')
        self.generate_nd_solution_plots()

        print("=" * 60)
        print('🎉🎉🎉 Data Analysis Completed! 🎉🎉🎉')
        print("=" * 60)

        return self._metric_results
# =============================================================================
# Module Entry Point and Usage Examples
# =============================================================================

if __name__ == '__main__':
    # NOTE: the string below is a plain expression statement kept purely as
    # in-file usage documentation; it has no runtime effect.
    """
    Usage Examples for DataAnalyzer Module
    ======================================

    This module provides a comprehensive analysis pipeline for multi-task
    optimization experiments. Below are various usage patterns.

    Example 1: Quick Start - Full Pipeline
    --------------------------------------
    Run complete analysis with default settings::

        from ddmtolab.Methods.data_analysis import DataAnalyzer

        analyzer = DataAnalyzer(data_path='./Data')
        results = analyzer.run()

    Example 2: Multi-Objective Optimization with Custom Settings
    ------------------------------------------------------------
    Analyze multi-objective results with IGD metric::

        from ddmtolab.Methods.data_analysis import DataAnalyzer

        # Define problem settings with Pareto front references
        SETTINGS = {
            'metric': 'IGD',
            'ref_path': './MOReference',
            'n_ref': 10000,
            'P1': {
                'T1': 'P1_T1_ref.npy',
                'T2': 'P1_T2_ref.npy',
            },
            'P2': {
                'T1': lambda n, m: generate_pf(n, m),  # Callable reference
            }
        }

        analyzer = DataAnalyzer(
            data_path='./Data',
            settings=SETTINGS,
            save_path='./Results',
            table_format='latex',
            figure_format='pdf'
        )
        results = analyzer.run()

    Example 3: Step-by-Step Analysis
    --------------------------------
    Execute individual analysis steps for fine-grained control::

        from ddmtolab.Methods.data_analysis import DataAnalyzer

        analyzer = DataAnalyzer(
            data_path='./Data',
            settings=SETTINGS,
            algorithm_order=['NSGA-II', 'MOEA/D', 'MyAlgo'],  # Last is baseline
            clear_results=False
        )

        # Step 1: Scan data directory
        scan_result = analyzer.scan_data()
        print(f"Found algorithms: {scan_result.algorithms}")
        print(f"Found problems: {scan_result.problems}")

        # Step 2: Calculate metrics
        metric_results = analyzer.calculate_metrics()

        # Step 3: Generate only specific outputs
        analyzer.generate_tables()             # Statistical comparison tables
        analyzer.generate_convergence_plots()  # Convergence curves
        analyzer.generate_runtime_plots()      # Runtime bar charts
        analyzer.generate_nd_solution_plots()  # Pareto front visualizations

    Example 4: Custom Table Generation
    ----------------------------------
    Generate tables with specific statistical settings::

        from pathlib import Path

        from ddmtolab.Methods.data_analysis import (
            DataAnalyzer, TableGenerator, TableConfig,
            TableFormat, StatisticType
        )

        # Create custom table configuration
        table_config = TableConfig(
            table_format=TableFormat.LATEX,
            statistic_type=StatisticType.MEDIAN,
            significance_level=0.01,
            rank_sum_test=True,
            save_path=Path('./CustomResults')
        )

        # Use with analyzer
        analyzer = DataAnalyzer(data_path='./Data', settings=SETTINGS)
        analyzer.scan_data()
        analyzer.calculate_metrics()

        # Generate table with custom config
        table_gen = TableGenerator(table_config)
        latex_table = table_gen.generate(
            analyzer._metric_results.best_values,
            algorithm_order=['Algo1', 'Algo2', 'Baseline'],
            metric_name='IGD'
        )

    Example 5: Custom Plot Generation
    ---------------------------------
    Create plots with specific visual settings::

        from pathlib import Path

        from ddmtolab.Methods.data_analysis import (
            DataAnalyzer, PlotGenerator, PlotConfig, StatisticType
        )

        # Create custom plot configuration
        plot_config = PlotConfig(
            figure_format='png',
            statistic_type=StatisticType.MEDIAN,
            log_scale=True,
            show_pf=True,
            show_nd=True,
            save_path=Path('./Figures'),
            colors=['#E41A1C', '#377EB8', '#4DAF4A'],  # Custom colors
            markers=['o', 's', '^']
        )

        analyzer = DataAnalyzer(data_path='./Data', settings=SETTINGS)
        analyzer.scan_data()
        analyzer.calculate_metrics()

        # Generate plots with custom config
        plot_gen = PlotGenerator(plot_config)
        plot_gen.plot_convergence_curves(
            analyzer._metric_results.metric_values,
            analyzer._metric_results.best_values,
            analyzer._metric_results.max_nfes,
            algorithm_order=['Algo1', 'Algo2'],
            metric_name='IGD'
        )

    Example 6: Customizing Plot Font Sizes and Legend
    -------------------------------------------------
    Control font sizes, line styles, and legend appearance in convergence plots.

    The following parameters can be customized in
    PlotGenerator._create_convergence_figure():

    Font sizes (hardcoded, modify in source if needed)::

        ax.set_xlabel('NFEs', fontsize=14)    # X-axis label font size
        ax.set_ylabel(y_label, fontsize=14)   # Y-axis label font size
        ax.set_title(title, fontsize=14)      # Title font size
        ax.tick_params(..., labelsize=14)     # Tick label font size

    Line width and marker size (adaptive based on algorithm count)::

        # 1-4 algorithms: markersize=8, linewidth=2.5
        # 5-6 algorithms: markersize=7, linewidth=2.0
        # 7+ algorithms:  markersize=6, linewidth=1.6

    Legend font size (adaptive, see _calculate_legend_fontsize())::

        # 2 algorithms:  fontsize=14
        # 15 algorithms: fontsize=6
        # Linear interpolation for values in between

    Merged plot legend (fixed size)::

        # In _plot_combined_convergence_for_problem():
        legend_fontsize = 18                     # Fixed legend font size
        legend_padding_cm = 0.3                  # Gap between legend and plots (cm)
        n_legend_cols = min(len(algorithms), 6)  # Max 6 columns in legend

    To modify these values, edit the corresponding methods in the
    PlotGenerator class.

    Example 7: Access Raw Results
    -----------------------------
    Access computed metrics for custom analysis::

        from ddmtolab.Methods.data_analysis import DataAnalyzer

        analyzer = DataAnalyzer(data_path='./Data', settings=SETTINGS)
        results = analyzer.run()

        # Access metric values
        # Structure: results.metric_values[algo][problem][run][task_idx]
        algo1_p1_run1_task0 = results.metric_values['Algo1']['P1'][1][0]

        # Access best values
        # Structure: results.best_values[algo][problem][run] = [task0_val, task1_val, ...]
        best_vals = results.best_values['Algo1']['P1'][1]

        # Access objective values (Pareto solutions)
        # Structure: results.objective_values[algo][problem][run][task_idx] = np.ndarray
        pareto_solutions = results.objective_values['Algo1']['P1'][1][0]

        # Access runtime
        runtime_seconds = results.runtime['Algo1']['P1'][1]

        # Access max NFEs per task
        max_nfes_list = results.max_nfes['Algo1']['P1']

    Example 8: Using Utility Classes Directly
    -----------------------------------------
    Use statistics and data utilities independently::

        from ddmtolab.Methods.data_analysis import (
            StatisticsCalculator, DataUtils,
            StatisticType, OptimizationDirection
        )
        import numpy as np

        # Calculate statistics
        data = [1.2, 1.5, 1.1, 1.3, 1.4]
        mean, std = StatisticsCalculator.calculate_statistic(data, StatisticType.MEAN)

        # Perform statistical comparison
        algo_data = [1.0, 1.1, 0.9, 1.2]
        base_data = [2.0, 2.1, 1.9, 2.2]
        result = StatisticsCalculator.perform_rank_sum_test(
            algo_data, base_data,
            significance_level=0.05,
            direction=OptimizationDirection.MINIMIZE
        )
        print(f"Comparison: {result.symbol}, p-value: {result.p_value}")

        # Load reference data
        reference = DataUtils.load_reference(
            settings=SETTINGS,
            problem='P1',
            task_identifier=0,  # or 'T1'
            n_objectives=2
        )

    Data Directory Structure
    ------------------------
    Expected directory structure for input data::

        ./Data/
        ├── Algorithm1/
        │   ├── Algorithm1_Problem1_1.pkl
        │   ├── Algorithm1_Problem1_2.pkl
        │   ├── Algorithm1_Problem2_1.pkl
        │   └── ...
        ├── Algorithm2/
        │   ├── Algorithm2_Problem1_1.pkl
        │   └── ...
        └── ...

    Each .pkl file should contain a dictionary with keys:

    - 'all_objs': List[List[np.ndarray]] - Objectives per task per generation
    - 'runtime': float - Total runtime in seconds
    - 'max_nfes': List[int] - Max function evaluations per task

    Output Structure
    ----------------
    Generated output files::

        ./Results/
        ├── results_table_mean.xlsx      # or .tex for LaTeX
        ├── Problem1.pdf                 # Convergence plot (single task)
        ├── Problem2-Task1.pdf           # Convergence plot (multi-task)
        ├── Problem2-Task2.pdf
        ├── runtime_comparison.pdf       # Runtime bar chart
        └── ND_Solutions/
            ├── Problem1-Algorithm1.pdf  # Pareto front plot
            ├── Problem1-Algorithm2.pdf
            └── ...
    """

    # Demo: Run analysis with sample configuration
    print("DataAnalyzer Module - Demo Run")
    print("=" * 50)

    # Example configuration (modify paths as needed)
    demo_data_dir = './Data'
    demo_results_dir = './Results'

    # The demo enables clear_results, and run() wipes the results folder
    # before it scans for data. Bail out first when there is no data folder,
    # so a stray invocation cannot destroy existing results for nothing.
    if not Path(demo_data_dir).is_dir():
        raise SystemExit(
            f"Demo data directory '{demo_data_dir}' not found; "
            f"nothing was analyzed and '{demo_results_dir}' was left untouched."
        )

    analyzer = DataAnalyzer(
        data_path=demo_data_dir,
        save_path=demo_results_dir,
        table_format='excel',
        figure_format='pdf',
        statistic_type='mean',
        significance_level=0.05,
        rank_sum_test=True,
        log_scale=False,
        show_pf=True,
        show_nd=True,
        clear_results=True
    )

    # Run complete analysis pipeline
    results = analyzer.run()