Source code for ddmtolab.Methods.mtop

import numpy as np
from typing import Callable, List, Tuple, Optional, Dict, Any, Union


class ObjectiveFunctionWrapper:
    """
    Pickle-compatible objective function wrapper for cross-platform parallel execution.

    This wrapper ensures that objective functions can be serialized and passed
    between processes in parallel computing environments across different platforms.

    Parameters
    ----------
    func : Callable
        The objective function to wrap.
    dim : int
        Dimension of the decision variables.

    Attributes
    ----------
    func : Callable
        The wrapped objective function.
    dim : int
        Dimension of the decision variables.

    Notes
    -----
    The wrapper normalizes function outputs to a consistent 2D array format
    with shape (n_samples, n_objectives), handling both vectorized and
    non-vectorized function implementations.

    Examples
    --------
    >>> def my_objective(x):
    ...     return np.sum(x**2, axis=1)
    >>> wrapper = ObjectiveFunctionWrapper(my_objective, dim=3)
    >>> X = np.array([[1, 2, 3], [4, 5, 6]])
    >>> result = wrapper(X)
    >>> result.shape
    (2, 1)
    """

    def __init__(self, func: Callable, dim: int):
        self.func = func
        self.dim = dim

    def __call__(self, X: np.ndarray) -> np.ndarray:
        """
        Evaluate the objective function on input samples.

        Parameters
        ----------
        X : np.ndarray
            Input array of shape (n_samples, dim) or (dim,).

        Returns
        -------
        np.ndarray
            Objective values of shape (n_samples, n_objectives).

        Raises
        ------
        ValueError
            If the output cannot be aligned to the input batch size.
        """
        X = np.atleast_2d(X)
        n = X.shape[0]
        try:
            out = self.func(X)  # try vectorized call
        except Exception:
            # try per-row
            rows = []
            for i in range(n):
                r = self.func(X[i])
                rows.append(np.atleast_1d(r))
            out = np.vstack(rows)

        out = np.asarray(out)
        # Normalize to 2D with n rows
        if out.ndim == 0:
            out = np.full((n, 1), float(out))
        elif out.ndim == 1:
            # ambiguous: if length == n -> (n,1); else if n==1 -> (1,len)
            if out.shape[0] == n:
                out = out.reshape(n, 1)
            else:
                if n == 1:
                    out = out.reshape(1, -1)
                else:
                    # cannot align
                    raise ValueError("Objective returned 1D array that cannot be aligned to input batch.")
        elif out.ndim == 2:
            if out.shape[0] != n:
                # maybe user returned shape (n_obj, n) accidental -> try transpose
                if out.shape[1] == n:
                    out = out.T
                else:
                    raise ValueError("Objective returned 2D array with incompatible number of rows.")
        else:
            raise ValueError("Objective returned array with ndim > 2, unsupported.")
        return out.astype(np.float64)


class ConstraintFunctionWrapper:
    """
    Pickle-compatible constraint function wrapper for cross-platform parallel execution.

    This wrapper ensures that constraint functions can be serialized and passed
    between processes in parallel computing environments across different platforms.

    Parameters
    ----------
    func : Callable
        The constraint function to wrap.
    k_local : int
        Number of constraints returned by this function.

    Attributes
    ----------
    func : Callable
        The wrapped constraint function.
    k_local : int
        Number of constraints.

    Notes
    -----
    The wrapper normalizes function outputs to a consistent 2D array format
    with shape (n_samples, k_local), handling both vectorized and
    non-vectorized function implementations.

    Examples
    --------
    >>> def my_constraint(x):
    ...     return x[0] - 0.5
    >>> wrapper = ConstraintFunctionWrapper(my_constraint, k_local=1)
    >>> X = np.array([[0.3, 0.4], [0.6, 0.7]])
    >>> result = wrapper(X)
    >>> result.shape
    (2, 1)
    """

    def __init__(self, func: Callable, k_local: int):
        self.func = func
        self.k_local = k_local

    def __call__(self, X: np.ndarray) -> np.ndarray:
        """
        Evaluate the constraint function on input samples.

        Parameters
        ----------
        X : np.ndarray
            Input array of shape (n_samples, dim) or (dim,).

        Returns
        -------
        np.ndarray
            Constraint values of shape (n_samples, k_local).

        Raises
        ------
        ValueError
            If the output cannot be aligned to the input batch size.
        """
        X = np.atleast_2d(X)
        n = X.shape[0]
        try:
            out = self.func(X)
        except Exception:
            rows = []
            for i in range(n):
                r = self.func(X[i])
                rows.append(np.atleast_1d(r))
            out = np.vstack(rows)
        out = np.asarray(out)
        # normalize to (n, k_local)
        if out.ndim == 0:
            out = np.full((n, 1), float(out))
        elif out.ndim == 1:
            if out.shape[0] == n:
                out = out.reshape(n, 1)
            else:
                if n == 1:
                    out = out.reshape(1, -1)
                else:
                    # if can reshape to (n, k_local), try:
                    if out.size == n * self.k_local:
                        out = out.reshape(n, self.k_local)
                    else:
                        raise ValueError("Constraint returned 1D that cannot be aligned to inputs.")
        elif out.ndim == 2:
            if out.shape[0] != n:
                if out.shape[1] == n:
                    out = out.T
                else:
                    raise ValueError("Constraint returned 2D with incompatible rows.")
        else:
            raise ValueError("Constraint returned ndim > 2, unsupported.")
        return out.astype(np.float64)


[docs] class MTOP: """ Multi-Task Optimization Problem (MTOP) definition and management. This class allows defining multiple optimization tasks, each with decision variables, objectives, and constraints. It handles vectorized and non-vectorized objective/constraint functions, normalizes outputs to consistent 2D arrays, and provides unified evaluation interfaces. Parameters ---------- unified_eval_mode : bool, optional If True, evaluation results will be padded to maximum dimensions across all tasks (default is False). fill_value : float, optional Value used for padding in unified evaluation mode (default is 0.0). Attributes ---------- tasks : List[Dict[str, Any]] List of task dictionaries containing function wrappers and metadata. dims : List[int] List of decision variable dimensions for each task. bounds : List[Tuple[np.ndarray, np.ndarray]] List of (lower_bound, upper_bound) tuples for each task. n_tasks : int Total number of tasks. m_max : int Maximum number of objectives across all tasks. c_max : int Maximum number of constraints across all tasks. unified_eval_mode : bool Whether unified evaluation mode is enabled. fill_value : float Fill value for padding in unified evaluation mode. Examples -------- Create a simple MTOP with two tasks: >>> def sphere(x): ... return np.sum(x**2, axis=1) >>> def rosenbrock(x): ... x = np.atleast_2d(x) ... return np.sum(100*(x[:, 1:] - x[:, :-1]**2)**2 + (1 - x[:, :-1])**2, axis=1) >>> >>> mtop = MTOP() >>> mtop.add_task(sphere, dim=3) 0 >>> mtop.add_task(rosenbrock, dim=5, lower_bound=-5, upper_bound=5) 1 >>> >>> # Evaluate first task >>> X = np.random.rand(10, 3) >>> obj, con = mtop.evaluate_task(0, X) >>> obj.shape (10, 1) See Also -------- ObjectiveFunctionWrapper : Wrapper for objective functions ConstraintFunctionWrapper : Wrapper for constraint functions """ def __init__(self, unified_eval_mode: bool = False, fill_value: float = 0.0): self.tasks: List[Dict[str, Any]] = [] self.dims: List[int] = [] self.bounds: List[Tuple[np.ndarray, np.ndarray]] = [] # Unified evaluation mode settings self._unified_eval_mode = unified_eval_mode self._fill_value = fill_value # ------------------------- # Unified evaluation mode settings # -------------------------
[docs] def set_unified_eval_mode(self, enabled: bool, fill_value: float = 0.0) -> None: """ Set unified evaluation mode configuration. In unified evaluation mode, all task evaluations are padded to have the same dimensions (m_max objectives and c_max constraints). Parameters ---------- enabled : bool Enable or disable unified evaluation mode. fill_value : float, optional Value used for padding (default is 0.0). Examples -------- >>> mtop = MTOP() >>> mtop.set_unified_eval_mode(enabled=True, fill_value=0.0) >>> mtop.unified_eval_mode True """ self._unified_eval_mode = enabled self._fill_value = fill_value
@property def unified_eval_mode(self) -> bool: """ Check if unified evaluation mode is enabled. Returns ------- bool True if unified evaluation mode is enabled, False otherwise. """ return self._unified_eval_mode @property def fill_value(self) -> float: """ Get the fill value for unified evaluation mode. Returns ------- float The fill value used for padding. """ return self._fill_value @property def m_max(self) -> int: """ Maximum number of objectives across all tasks. Returns ------- int Maximum number of objectives, or 0 if no tasks are defined. """ if not self.tasks: return 0 return max(t['n_objectives'] for t in self.tasks) @property def c_max(self) -> int: """ Maximum number of constraints across all tasks. Returns ------- int Maximum number of constraints, or 0 if no tasks are defined. """ if not self.tasks: return 0 return max(t['n_constraints'] for t in self.tasks) # ------------------------- # Task addition interfaces # -------------------------
[docs] def add_task( self, objective_func: Union[Callable[[np.ndarray], Any], Tuple[Callable, ...]], dim: Union[int, Tuple[int, ...]], constraint_func: Optional[Union[Callable, List[Callable], Tuple[List[Callable], ...]]] = None, lower_bound: Optional[ Union[float, List[float], np.ndarray, Tuple[Union[float, List[float], np.ndarray], ...]]] = None, upper_bound: Optional[ Union[float, List[float], np.ndarray, Tuple[Union[float, List[float], np.ndarray], ...]]] = None ) -> Union[int, List[int]]: """ Add one or more tasks to MTOP. This method provides a flexible interface for adding tasks with various configurations. It supports both single task and multiple task additions. Parameters ---------- objective_func : Callable or Tuple[Callable, ...] Objective function(s) to evaluate. Can be: - A single callable: adds one task - A tuple of callables: adds multiple tasks Each function should accept X with shape (n, dim) and return objective values. dim : int or Tuple[int, ...] Dimension(s) of decision variables. Can be: - A single int: dimension for one task (or broadcast to all if multiple funcs) - A tuple of ints: dimensions for each task in objective_func tuple constraint_func : Callable, List[Callable], Tuple[List[Callable], ...], optional Constraint function(s). Can be: - None: no constraints (default) - A single callable: one constraint function - A list of callables: multiple constraint functions for one task - A tuple: constraint functions for each task (when adding multiple) lower_bound : float, List[float], np.ndarray, Tuple[...], optional Lower bound(s) for decision variables. Can be: - None: defaults to zeros array with length dim - float: broadcasts to all dimensions - array: must have length dim upper_bound : float, List[float], np.ndarray, Tuple[...], optional Upper bound(s) for decision variables. Can be: - None: defaults to ones array with length dim - float: broadcasts to all dimensions - array: must have length dim Returns ------- int or List[int] Task index (single task) or list of task indices (multiple tasks). Raises ------ ValueError If dimensions mismatch or bounds are incompatible. Examples -------- Add a single task with default bounds [0, 1]: >>> def sphere(x): ... return np.sum(x**2, axis=1) >>> mtop = MTOP() >>> idx = mtop.add_task(sphere, dim=3) >>> idx 0 Add a single task with custom bounds (scalar): >>> idx = mtop.add_task(sphere, dim=5, lower_bound=-5, upper_bound=5) >>> idx 1 Add multiple tasks at once: >>> def f1(x): return np.sum(x**2, axis=1) >>> def f2(x): return np.sum((x-1)**2, axis=1) >>> indices = mtop.add_task( ... objective_func=(f1, f2), ... dim=(3, 4), ... lower_bound=([-1]*3, [-2]*4), ... upper_bound=([1]*3, [2]*4) ... ) >>> indices [2, 3] Add task with constraints: >>> def con(x): return x[0] - 0.5 >>> idx = mtop.add_task(sphere, dim=2, constraint_func=con) >>> idx 4 """ if isinstance(objective_func, tuple): return self._add_multiple_tasks(objective_func, dim, constraint_func, lower_bound, upper_bound) else: return self._add_single_task(objective_func, dim, constraint_func, lower_bound, upper_bound)
def _add_single_task( self, objective_func: Callable[[np.ndarray], Any], dim: int, constraint_func: Optional[Union[Callable, List[Callable]]] = None, lower_bound: Optional[Union[float, List[float], np.ndarray]] = None, upper_bound: Optional[Union[float, List[float], np.ndarray]] = None ) -> int: """ Add a single task to MTOP. Parameters ---------- objective_func : Callable Objective function to evaluate. dim : int Dimension of decision variables. constraint_func : Callable or List[Callable], optional Constraint function(s) (default is None). lower_bound : float, List[float], or np.ndarray, optional Lower bound (default is zeros array with length dim). If scalar, broadcasts to all dimensions. upper_bound : float, List[float], or np.ndarray, optional Upper bound (default is ones array with length dim). If scalar, broadcasts to all dimensions. Returns ------- int Task index. Raises ------ ValueError If bounds size doesn't match dim. """ # Default bounds to [0, 1] if not provided if lower_bound is None: lower_bound = np.zeros(dim) else: lb = np.asarray(lower_bound, dtype=np.float64).reshape(-1) if lb.size == 1: # Broadcast scalar to all dimensions lower_bound = np.full(dim, lb[0]) else: lower_bound = lb if upper_bound is None: upper_bound = np.ones(dim) else: ub = np.asarray(upper_bound, dtype=np.float64).reshape(-1) if ub.size == 1: # Broadcast scalar to all dimensions upper_bound = np.full(dim, ub[0]) else: upper_bound = ub lb = np.asarray(lower_bound, dtype=np.float64).reshape(-1) ub = np.asarray(upper_bound, dtype=np.float64).reshape(-1) if lb.size != dim or ub.size != dim: raise ValueError(f"Bounds must be length {dim}") # Wrap objective and constraints (using independent wrapper classes, can be pickled) wrapped_obj = self._wrap_objective_func(objective_func, dim) # Infer n_objectives by testing a single sample test_out = wrapped_obj(np.zeros((1, dim))) if test_out.ndim != 2: raise ValueError("Wrapped objective must produce 2D array") n_obj = test_out.shape[1] constraint_wrappers, n_constraints = self._process_constraints(constraint_func, dim) task = { 'raw_objective': objective_func, 'objective': wrapped_obj, # callable: X (n,dim) -> (n, n_obj) 'n_objectives': n_obj, 'constraints': constraint_wrappers, # list of callables X -> (n, k_i) 'n_constraints': n_constraints } self.tasks.append(task) self.dims.append(dim) self.bounds.append((lb.reshape(1, -1), ub.reshape(1, -1))) return len(self.tasks) - 1 def _add_multiple_tasks( self, objective_funcs: Tuple[Callable, ...], dims: Union[int, Tuple[int, ...]], constraint_func: Optional[Union[List[Callable], Tuple[List[Callable], ...]]] = None, lower_bounds: Optional[Tuple[Union[float, List[float], np.ndarray], ...]] = None, upper_bounds: Optional[Tuple[Union[float, List[float], np.ndarray], ...]] = None ) -> List[int]: """ Add multiple tasks to MTOP. Parameters ---------- objective_funcs : Tuple[Callable, ...] Tuple of objective functions. dims : int or Tuple[int, ...] Single dimension (broadcast to all) or tuple of dimensions. constraint_func : List[Callable] or Tuple[List[Callable], ...], optional Constraint function(s) for each task (default is None for all). lower_bounds : Tuple[Union[float, List[float], np.ndarray], ...], optional Tuple of lower bounds for each task (default is zeros for each). Each can be scalar or array. upper_bounds : Tuple[Union[float, List[float], np.ndarray], ...], optional Tuple of upper bounds for each task (default is ones for each). Each can be scalar or array. Returns ------- List[int] List of task indices. Raises ------ ValueError If tuple lengths don't match number of objective functions. """ n_tasks = len(objective_funcs) # Broadcast dims if necessary if isinstance(dims, int): dims = (dims,) * n_tasks elif len(dims) != n_tasks: raise ValueError("dims length must match number of objective_funcs") # Prepare constraint_func tuple/list if constraint_func is None: constraint_func = (None,) * n_tasks elif not isinstance(constraint_func, (list, tuple)): constraint_func = tuple([constraint_func] * n_tasks) elif len(constraint_func) != n_tasks: raise ValueError("constraint_func length must match number of objective_funcs") # Handle bounds - allow None for default [0, 1] if lower_bounds is None: lower_bounds = (None,) * n_tasks elif len(lower_bounds) != n_tasks: raise ValueError("lower_bounds length must match number of objective_funcs") if upper_bounds is None: upper_bounds = (None,) * n_tasks elif len(upper_bounds) != n_tasks: raise ValueError("upper_bounds length must match number of objective_funcs") indices = [] for i in range(n_tasks): idx = self._add_single_task( objective_func=objective_funcs[i], dim=dims[i], constraint_func=constraint_func[i], lower_bound=lower_bounds[i], upper_bound=upper_bounds[i] ) indices.append(idx) return indices
[docs] def add_tasks(self, tasks_config: List[Dict[str, Any]]) -> List[int]: """ Add multiple tasks from configuration dictionaries. Parameters ---------- tasks_config : List[Dict[str, Any]] List of task configuration dictionaries. Each dict must contain: - 'objective_func' : Callable (required) - 'dim' : int (required) - 'constraint_func' : Callable or List[Callable] (optional) - 'lower_bound' : float, List[float], or np.ndarray (optional, default zeros) - 'upper_bound' : float, List[float], or np.ndarray (optional, default ones) Returns ------- List[int] List of task indices. Raises ------ TypeError If tasks_config is not a list. ValueError If any config dict is missing required keys. Examples -------- >>> def f1(x): return np.sum(x**2, axis=1) >>> def f2(x): return np.sum((x-1)**2, axis=1) >>> configs = [ ... {'objective_func': f1, 'dim': 3}, ... {'objective_func': f2, 'dim': 5, 'lower_bound': -5, 'upper_bound': 5} ... ] >>> mtop = MTOP() >>> indices = mtop.add_tasks(configs) >>> indices [0, 1] """ if not isinstance(tasks_config, list): raise TypeError("tasks_config must be a list of dicts") indices = [] for cfg in tasks_config: if 'objective_func' not in cfg: raise ValueError("Each task config must contain 'objective_func'") if 'dim' not in cfg: raise ValueError("Each task config must contain 'dim'") idx = self.add_task( objective_func=cfg['objective_func'], dim=cfg['dim'], constraint_func=cfg.get('constraint_func', None), lower_bound=cfg.get('lower_bound', None), upper_bound=cfg.get('upper_bound', None) ) indices.append(idx) return indices
# ------------------------- # Wrapping helpers # ------------------------- def _wrap_objective_func(self, func: Callable, dim: int) -> Callable[[np.ndarray], np.ndarray]: """ Wrap objective function with pickle-compatible wrapper. Parameters ---------- func : Callable Objective function to wrap. dim : int Dimension of decision variables. Returns ------- Callable Wrapped function that accepts X with shape (n, dim) and returns array with shape (n, n_objectives). Notes ----- Uses ObjectiveFunctionWrapper class to support pickle serialization for cross-platform parallel compatibility. """ return ObjectiveFunctionWrapper(func, dim) def _process_constraints(self, constraint_func, dim: int) -> Tuple[List[Callable[[np.ndarray], np.ndarray]], int]: """ Normalize constraint functions into list of pickle-compatible wrappers. Parameters ---------- constraint_func : None, Callable, or List[Callable] Constraint function(s) to process. dim : int Dimension of decision variables. Returns ------- Tuple[List[Callable], int] Tuple of (wrappers_list, total_n_constraints). If constraint_func is None, returns ([], 0). Raises ------ TypeError If constraint_func has invalid type. Notes ----- Uses ConstraintFunctionWrapper class to support pickle serialization for cross-platform parallel compatibility. """ if constraint_func is None: return [], 0 # If single callable provided, wrap into list if callable(constraint_func) and not isinstance(constraint_func, (list, tuple)): funcs = [constraint_func] elif isinstance(constraint_func, (list, tuple)): funcs = list(constraint_func) if not all(callable(f) for f in funcs): raise TypeError("All elements in constraint_func must be callables") else: raise TypeError("constraint_func must be callable or list/tuple of callables or None") wrappers = [] total = 0 for f in funcs: # probe to detect per-call output size try: probe = f(np.zeros(dim)) probe = np.atleast_1d(np.asarray(probe)) k = probe.size except Exception: # if probe fails, assume scalar per-call k = 1 total += k wrappers.append(ConstraintFunctionWrapper(f, k)) return wrappers, total # ------------------------- # Evaluation # -------------------------
[docs] def evaluate_task( self, task_idx: int, X: np.ndarray, eval_objectives: Union[bool, int, List[int]] = True, eval_constraints: Union[bool, int, List[int]] = True ) -> Tuple[np.ndarray, np.ndarray]: """ Evaluate a task with selective evaluation support. Parameters ---------- task_idx : int Index of the task to evaluate. X : np.ndarray Input array of shape (n_samples, dim) or (dim,). eval_objectives : bool, int, or List[int], optional Evaluation mode for objectives (default is True): - True: evaluate all objectives - False: skip objective evaluation, return empty array - int: evaluate only the i-th objective - List[int]: evaluate specified objectives by indices eval_constraints : bool, int, or List[int], optional Evaluation mode for constraints (default is True): - True: evaluate all constraints - False: skip constraint evaluation, return empty array - int: evaluate only the i-th constraint - List[int]: evaluate specified constraints by indices Returns ------- Tuple[np.ndarray, np.ndarray] Tuple of (objectives, constraints) as 2D numpy arrays: - objectives: shape (n_samples, n_evaluated_objectives) or padded to (n_samples, m_max) if unified_eval_mode is True - constraints: shape (n_samples, n_evaluated_constraints) or padded to (n_samples, c_max) if unified_eval_mode is True Raises ------ ValueError If task_idx is out of range or input dimension mismatch. Examples -------- Evaluate all objectives and constraints: >>> def sphere(x): ... return np.sum(x**2, axis=1) >>> mtop = MTOP() >>> mtop.add_task(sphere, dim=3) 0 >>> X = np.random.rand(10, 3) >>> obj, con = mtop.evaluate_task(0, X) >>> obj.shape (10, 1) Evaluate only specific objectives: >>> def multi_obj(x): ... x = np.atleast_2d(x) ... f1 = np.sum(x**2, axis=1) ... f2 = np.sum((x-1)**2, axis=1) ... f3 = np.sum(x, axis=1) ... return np.column_stack([f1, f2, f3]) >>> mtop2 = MTOP() >>> mtop2.add_task(multi_obj, dim=3) 0 >>> X = np.random.rand(10, 3) >>> obj, con = mtop2.evaluate_task(0, X, eval_objectives=[0, 2]) >>> obj.shape (10, 2) Skip constraint evaluation: >>> mtop3 = MTOP() >>> mtop3.add_task(sphere, dim=3) 0 >>> X = np.random.rand(10, 3) >>> obj, con = mtop3.evaluate_task(0, X, eval_constraints=False) >>> con.shape (10, 0) """ if not 0 <= task_idx < len(self.tasks): raise ValueError("task_idx out of range") X = np.atleast_2d(X) dim = self.dims[task_idx] if X.shape[1] != dim: raise ValueError(f"Input dimension mismatch: expected {dim}, got {X.shape[1]}") task = self.tasks[task_idx] n = X.shape[0] # Evaluate objectives obj = self._evaluate_objectives(task, X, eval_objectives) # Evaluate constraints cons = self._evaluate_constraints(task, X, eval_constraints) # Apply unified evaluation mode if enabled if self._unified_eval_mode: obj = self._pad_to_max(obj, self.m_max, axis=1) cons = self._pad_to_max(cons, self.c_max, axis=1) return obj, cons
def _evaluate_objectives( self, task: Dict, X: np.ndarray, eval_objectives: Union[bool, int, List[int]] ) -> np.ndarray: """ Evaluate objectives based on eval_objectives parameter. Parameters ---------- task : Dict Task dictionary containing objective function wrapper. X : np.ndarray Input array of shape (n_samples, dim). eval_objectives : bool, int, or List[int] Evaluation mode specification. Returns ------- np.ndarray Objective values array. Raises ------ ValueError If objective index is out of range. TypeError If eval_objectives has invalid type. """ n = X.shape[0] n_obj = task['n_objectives'] if eval_objectives is False: return np.empty((n, 0), dtype=np.float64) # Evaluate all objectives first all_obj = task['objective'](X) if all_obj.ndim != 2: raise RuntimeError("Internal objective wrapper failed to produce 2D array") if eval_objectives is True: return all_obj # Selective evaluation if isinstance(eval_objectives, int): if not 0 <= eval_objectives < n_obj: raise ValueError(f"Objective index {eval_objectives} out of range [0, {n_obj})") return all_obj[:, eval_objectives:eval_objectives + 1] if isinstance(eval_objectives, (list, tuple)): indices = list(eval_objectives) for idx in indices: if not 0 <= idx < n_obj: raise ValueError(f"Objective index {idx} out of range [0, {n_obj})") return all_obj[:, indices] raise TypeError("eval_objectives must be bool, int, or list of int") def _evaluate_constraints( self, task: Dict, X: np.ndarray, eval_constraints: Union[bool, int, List[int]] ) -> np.ndarray: """ Evaluate constraints based on eval_constraints parameter. Parameters ---------- task : Dict Task dictionary containing constraint function wrappers. X : np.ndarray Input array of shape (n_samples, dim). eval_constraints : bool, int, or List[int] Evaluation mode specification. Returns ------- np.ndarray Constraint values array. Raises ------ ValueError If constraint index is out of range. TypeError If eval_constraints has invalid type. """ n = X.shape[0] n_cons = task['n_constraints'] if eval_constraints is False: return np.empty((n, 0), dtype=np.float64) # No constraints defined -> return zeros with shape (n, 1) if n_cons == 0: return np.zeros((n, 1), dtype=np.float64) # Evaluate all constraints parts = [w(X) for w in task['constraints']] all_cons = np.hstack(parts) if len(parts) > 1 else parts[0] if all_cons.ndim != 2: raise RuntimeError("Internal constraint wrappers failed to produce 2D array") if eval_constraints is True: return all_cons # Selective evaluation if isinstance(eval_constraints, int): if not 0 <= eval_constraints < n_cons: raise ValueError(f"Constraint index {eval_constraints} out of range [0, {n_cons})") return all_cons[:, eval_constraints:eval_constraints + 1] if isinstance(eval_constraints, (list, tuple)): indices = list(eval_constraints) for idx in indices: if not 0 <= idx < n_cons: raise ValueError(f"Constraint index {idx} out of range [0, {n_cons})") return all_cons[:, indices] raise TypeError("eval_constraints must be bool, int, or list of int") def _pad_to_max(self, arr: np.ndarray, max_size: int, axis: int = 1) -> np.ndarray: """ Pad array to max_size along specified axis. Parameters ---------- arr : np.ndarray Array to pad. max_size : int Target size along the specified axis. axis : int, optional Axis along which to pad (default is 1). Returns ------- np.ndarray Padded array. Notes ----- Uses constant padding with the value specified by self._fill_value. """ if arr.shape[axis] >= max_size: return arr pad_width = [(0, 0)] * arr.ndim pad_width[axis] = (0, max_size - arr.shape[axis]) return np.pad(arr, pad_width, mode='constant', constant_values=self._fill_value)
[docs] def evaluate_tasks( self, task_indices: List[int], X_list: List[np.ndarray], eval_objectives: Union[bool, int, List[int], List[Union[bool, int, List[int]]]] = True, eval_constraints: Union[bool, int, List[int], List[Union[bool, int, List[int]]]] = True ) -> Tuple[List[np.ndarray], List[np.ndarray]]: """ Evaluate multiple tasks simultaneously. Parameters ---------- task_indices : List[int] List of task indices to evaluate. X_list : List[np.ndarray] List of input arrays, one for each task. eval_objectives : bool, int, List[int], or List[Union[...]], optional Evaluation mode for objectives (default is True): - Single mode: applied to all tasks - List of modes: per-task evaluation modes eval_constraints : bool, int, List[int], or List[Union[...]], optional Evaluation mode for constraints (default is True): - Single mode: applied to all tasks - List of modes: per-task evaluation modes Returns ------- Tuple[List[np.ndarray], List[np.ndarray]] Tuple of (list of objective arrays, list of constraint arrays). Raises ------ ValueError If task_indices and X_list length mismatch. Examples -------- >>> def f1(x): return np.sum(x**2, axis=1) >>> def f2(x): return np.sum((x-1)**2, axis=1) >>> mtop = MTOP() >>> mtop.add_task(f1, dim=3) 0 >>> mtop.add_task(f2, dim=4) 1 >>> mtop.add_task(f1, dim=5) 2 >>> task_indices = [0, 1, 2] >>> X_list = [np.random.rand(10, 3), np.random.rand(10, 4), np.random.rand(10, 5)] >>> objs, cons = mtop.evaluate_tasks(task_indices, X_list) >>> len(objs) 3 """ if len(task_indices) != len(X_list): raise ValueError("task_indices and X_list length mismatch") # Handle per-task evaluation modes if isinstance(eval_objectives, list) and len(eval_objectives) == len(task_indices): obj_modes = eval_objectives else: obj_modes = [eval_objectives] * len(task_indices) if isinstance(eval_constraints, list) and len(eval_constraints) == len(task_indices): con_modes = eval_constraints else: con_modes = [eval_constraints] * len(task_indices) objs, cons = [], [] for idx, X, obj_mode, con_mode in zip(task_indices, X_list, obj_modes, con_modes): o, c = self.evaluate_task(idx, X, eval_objectives=obj_mode, eval_constraints=con_mode) objs.append(o) cons.append(c) return objs, cons
# ------------------------- # Query / info # ------------------------- def get_n_objectives(self, task_idx: int) -> int: """ Get the number of objectives for a specific task. Parameters ---------- task_idx : int Index of the task. Returns ------- int Number of objectives. Raises ------ ValueError If task_idx is out of range. """ if not 0 <= task_idx < len(self.tasks): raise ValueError("task_idx out of range") return self.tasks[task_idx]['n_objectives'] def get_all_n_objectives(self) -> List[int]: """ Get the number of objectives for all tasks. Returns ------- List[int] List of number of objectives for each task. """ return [self.get_n_objectives(i) for i in range(self.n_tasks)] def get_n_constraints(self, task_idx: int) -> int: """ Get the number of constraints for a specific task. Parameters ---------- task_idx : int Index of the task. Returns ------- int Number of constraints. Raises ------ ValueError If task_idx is out of range. """ if not 0 <= task_idx < len(self.tasks): raise ValueError("task_idx out of range") return self.tasks[task_idx]['n_constraints'] def get_all_n_constraints(self) -> List[int]: """ Get the number of constraints for all tasks. Returns ------- List[int] List of number of constraints for each task. """ return [self.get_n_constraints(i) for i in range(self.n_tasks)]
[docs] def get_task_info(self, task_idx: int) -> Dict[str, Any]: """ Get comprehensive information about a specific task. Parameters ---------- task_idx : int Index of the task. Returns ------- Dict[str, Any] Dictionary containing task information: - 'dimension' : int - Decision variable dimension - 'n_objectives' : int - Number of objectives - 'n_constraints' : int - Number of constraints - 'lower_bounds' : np.ndarray - Lower bounds - 'upper_bounds' : np.ndarray - Upper bounds - 'objective_func' : Callable - Raw objective function - 'constraint_funcs' : List[Callable] - Constraint function wrappers Raises ------ ValueError If task_idx is out of range. Examples -------- >>> def sphere(x): return np.sum(x**2, axis=1) >>> mtop = MTOP() >>> mtop.add_task(sphere, dim=3) 0 >>> info = mtop.get_task_info(0) >>> print(f"Task 0 has {info['n_objectives']} objectives") Task 0 has 1 objectives """ if not 0 <= task_idx < len(self.tasks): raise ValueError("task_idx out of range") lb, ub = self.bounds[task_idx] t = self.tasks[task_idx] return { 'dimension': self.dims[task_idx], 'n_objectives': t['n_objectives'], 'n_constraints': t['n_constraints'], 'lower_bounds': lb, 'upper_bounds': ub, 'objective_func': t['raw_objective'], 'constraint_funcs': [w for w in t['constraints']] }
def __str__(self) -> str: """ Generate string representation of MTOP. Returns ------- str Multi-line string describing the MTOP configuration. """ lines = [f"MTOP with {len(self.tasks)} tasks:"] lines.append(f" Unified eval mode: {self._unified_eval_mode} (fill_value={self._fill_value})") lines.append(f" Max number of objectives (m_max): {self.m_max}") lines.append(f" Max number of constraints (c_max): {self.c_max}") for i, t in enumerate(self.tasks): lb, ub = self.bounds[i] lines.append( f" Task {i}: dim={self.dims[i]}, n_objs={t['n_objectives']}, n_cons={t['n_constraints']}, bounds=[{lb.min()}..{ub.max()}]") return "\n".join(lines) @property def n_tasks(self) -> int: """ Total number of tasks. Returns ------- int Number of tasks in the MTOP. """ return len(self.tasks) @property def n_objs(self) -> List[int]: """ List of number of objectives for all tasks. Returns ------- List[int] Number of objectives for each task. """ return self.get_all_n_objectives() @property def n_cons(self) -> List[int]: """ List of number of constraints for all tasks. Returns ------- List[int] Number of constraints for each task. """ return self.get_all_n_constraints()
# ------------------------- # Example usage and testing # ------------------------- if __name__ == "__main__": print("=" * 60) print("MTOP v2.1 - Testing (Cross-platform compatible)") print("=" * 60) # Define test functions def obj1(x): """Single objective: f(x) = sum(x^2)""" x = np.atleast_2d(x) return np.sum(x ** 2, axis=1) def obj2(x): """Multi-objective: f1 = sum(x^2), f2 = sum((x-1)^2), f3 = sum(x)""" x = np.atleast_2d(x) f1 = np.sum(x ** 2, axis=1) f2 = np.sum((x - 1) ** 2, axis=1) f3 = np.sum(x, axis=1) return np.column_stack([f1, f2, f3]) def con1(x): """Single constraint: g(x) = sum(x) - 1 <= 0""" x = np.atleast_1d(x) return np.sum(x) - 1 def con2(x): """Two constraints: g1 = x[0] - 0.5, g2 = x[1] - 0.5""" x = np.atleast_1d(x) return np.array([x[0] - 0.5, x[1] - 0.5]) # Create MTOP instance print("\n1. Creating MTOP with tasks using default bounds...") mtop = MTOP(unified_eval_mode=False) # Add task with explicit bounds idx1 = mtop.add_task( objective_func=obj1, dim=3, constraint_func=con1, lower_bound=[-5, -5, -5], upper_bound=[5, 5, 5] ) print(f"Task {idx1}: explicit bounds [-5, 5]") # Add task with default bounds [0, 1] idx2 = mtop.add_task( objective_func=obj2, dim=2, constraint_func=con2 # No bounds provided -> defaults to [0, 1] ) print(f"Task {idx2}: default bounds [0, 1]") # Add task with only lower bound (upper defaults to 1) idx3 = mtop.add_task( objective_func=obj1, dim=2, lower_bound=[-1, -1] # upper_bound defaults to [1, 1] ) print(f"Task {idx3}: lower_bound=[-1,-1], upper_bound defaults to [1,1]") # Add task with scalar bounds idx4 = mtop.add_task( objective_func=obj1, dim=5, lower_bound=-5, # scalar, broadcasts to [-5, -5, -5, -5, -5] upper_bound=5 # scalar, broadcasts to [5, 5, 5, 5, 5] ) print(f"Task {idx4}: scalar bounds -5 and 5 broadcast to all dimensions") print("\n" + str(mtop)) # Test evaluation print("\n2. Testing evaluation with default bounds...") # Task with default bounds [0, 1] X_default = np.array([[0.5, 0.5], [0.0, 1.0]]) obj_vals, con_vals = mtop.evaluate_task(1, X_default) print(f"\nTask 1 (default bounds [0,1]):") print(f" Input X:\n{X_default}") print(f" Objectives: {obj_vals}") print(f" Constraints: {con_vals}") # Verify bounds print("\n3. Verifying bounds...") for i in range(mtop.n_tasks): info = mtop.get_task_info(i) print(f"Task {i}: lower={info['lower_bounds'].flatten()}, upper={info['upper_bounds'].flatten()}") # Test add_tasks with default bounds print("\n4. Testing add_tasks with mixed bounds...") mtop2 = MTOP() configs = [ {'objective_func': obj1, 'dim': 3}, # default [0,1] {'objective_func': obj1, 'dim': 2, 'lower_bound': [-2, -2], 'upper_bound': [2, 2]}, {'objective_func': obj2, 'dim': 2} # default [0,1] ] indices = mtop2.add_tasks(configs) print(mtop2) # Test tuple-style add_task with default bounds print("\n5. Testing tuple-style add_task with default bounds...") mtop4 = MTOP() indices = mtop4.add_task( objective_func=(obj1, obj2), dim=(3, 2) # No bounds -> all default to [0, 1] ) print(mtop4) print("\n" + "=" * 60) print("All tests completed successfully!") print("Cross-platform compatibility verified!") print("=" * 60)