Source code for trop.cv

from __future__ import annotations

from typing import Iterable, Optional, Sequence, Tuple, Union, List

import numpy as np
from joblib import Parallel, delayed

from .estimator import TROP_TWFE_average


ArrayLike = Union[np.ndarray, Sequence[Sequence[float]]]


def _validate_panel(Y: np.ndarray, treated_periods: int) -> Tuple[int, int]:
    """Validate panel dimensions and treated_periods. Returns (N, T)."""
    if Y.ndim != 2:
        raise ValueError("Y must be a 2D array of shape (N, T).")
    N, T = Y.shape
    if treated_periods <= 0 or treated_periods >= T:
        raise ValueError(
            f"treated_periods must be in [1, T-1]. Got treated_periods={treated_periods}, T={T}."
        )
    if N < 2:
        raise ValueError(f"Y must have at least 2 units (N>=2). Got N={N}.")
    return int(N), int(T)


def _as_list(grid: Iterable[float], *, name: str = "lambda_grid") -> List[float]:
    """Convert an iterable of grid values to a non-empty list of floats."""
    grid_list = list(grid)
    if len(grid_list) == 0:
        raise ValueError(f"{name} must be non-empty.")
    return [float(x) for x in grid_list]


def _validate_and_normalize_cv_sampling(
    N: int,
    cv_sampling_method: str,
    *,
    n_treated_units: Optional[int],
    n_trials: Optional[int],
    K: Optional[int],
) -> Tuple[str, Optional[int], Optional[int], Optional[int]]:
    """
    Validate placebo-set sampling arguments and return normalized (method, n_treated_units, n_trials, K).

    Rules
    -----
    - resample: requires n_treated_units and n_trials; enforces 1 <= n_treated_units <= N-1, n_trials > 0
    - kfold   : requires K; enforces 2 <= K <= N (K>N would create empty folds)
    """
    method = str(cv_sampling_method).lower().strip()
    if method not in {"resample", "kfold"}:
        raise ValueError("cv_sampling_method must be one of {'resample', 'kfold'}.")

    def _as_int(name: str, x: Optional[int]) -> Optional[int]:
        if x is None:
            return None
        if isinstance(x, bool):
            raise TypeError(f"{name} must be an integer, not bool.")
        try:
            xi = int(x)
        except (TypeError, ValueError) as e:
            raise TypeError(f"{name} must be an integer.") from e
        if xi != x:
            raise TypeError(f"{name} must be an integer.")
        return xi

    n_treated_units_i = _as_int("n_treated_units", n_treated_units)
    n_trials_i = _as_int("n_trials", n_trials)
    K_i = _as_int("K", K)

    if method == "resample":
        if n_treated_units_i is None or n_trials_i is None:
            raise ValueError("resample requires both n_treated_units and n_trials.")
        if n_trials_i <= 0:
            raise ValueError(f"n_trials must be positive. Got n_trials={n_trials_i}.")
        if n_treated_units_i <= 0 or n_treated_units_i >= N:
            raise ValueError(
                f"n_treated_units must be in [1, N-1]. Got n_treated_units={n_treated_units_i}, N={N}."
            )
        return method, n_treated_units_i, n_trials_i, None

    if K_i is None:
        raise ValueError("kfold requires K.")
    if K_i < 2:
        raise ValueError(f"K must be at least 2 for kfold. Got K={K_i}.")
    if K_i > N:
        raise ValueError(f"K cannot exceed N for kfold (would create empty folds). Got K={K_i}, N={N}.")
    return method, None, None, K_i


def _generate_placebo_sets(
    N: int,
    cv_sampling_method: str = "resample",
    *,
    n_treated_units: Optional[int] = None,
    n_trials: Optional[int] = None,
    K: Optional[int] = None,
    random_state: Optional[int] = None,
) -> List[np.ndarray]:
    """
    Generate placebo treatment sets for cross-validation.

    - "resample": random placebo assignment repeated `n_trials` times
    - "kfold": K-fold cross-validation (each fold treated once)
    """
    method, n_treated_units_i, n_trials_i, K_i = _validate_and_normalize_cv_sampling(
        N, cv_sampling_method, n_treated_units=n_treated_units, n_trials=n_trials, K=K
    )

    rng = np.random.default_rng(random_state)
    units = np.arange(N)

    if method == "resample":
        assert n_treated_units_i is not None and n_trials_i is not None
        return [
            rng.choice(units, size=n_treated_units_i, replace=False).astype(int, copy=False)
            for _ in range(n_trials_i)
        ]

    assert K_i is not None
    shuffled = rng.permutation(units)
    folds = np.array_split(shuffled, K_i)
    return [fold.astype(int, copy=False) for fold in folds]


def _simulate_ate(
    Y: np.ndarray,
    treated_units: np.ndarray,
    treated_periods: int,
    lambda_unit: float,
    lambda_time: float,
    lambda_nn: float,
    solver: Optional[str] = None,
    verbose: bool = False,
) -> float:
    """Simulate a single placebo ATE given indices of placebo-treated units."""
    W = np.zeros_like(Y, dtype=float)
    W[treated_units, -treated_periods:] = 1.0

    return TROP_TWFE_average(
        Y=Y,
        W=W,
        treated_units=treated_units,
        lambda_unit=lambda_unit,
        lambda_time=lambda_time,
        lambda_nn=lambda_nn,
        treated_periods=treated_periods,
        solver=solver,
        verbose=verbose,
    )


def _placebo_rmse_for_lambdas(
    *,
    Y: np.ndarray,
    placebo_sets: Sequence[np.ndarray],
    treated_periods: int,
    lambda_unit: float,
    lambda_time: float,
    lambda_nn: float,
    n_jobs: int,
    prefer: str,
    solver: Optional[str],
    verbose: bool,
) -> Optional[float]:
    """
    Compute placebo RMSE for a given (lambda_unit, lambda_time, lambda_nn).
    Returns None if *all* placebo trials are non-finite (e.g., solver failures).
    """
    ates = Parallel(n_jobs=n_jobs, prefer=prefer)(
        delayed(_simulate_ate)(
            Y,
            treated_units,
            treated_periods,
            lambda_unit,
            lambda_time,
            lambda_nn,
            solver,
            verbose,
        )
        for treated_units in placebo_sets
    )

    ates_arr = np.asarray(ates, dtype=float)
    ates_arr = ates_arr[np.isfinite(ates_arr)]
    if ates_arr.size == 0:
        return None
    return float(np.sqrt(np.mean(ates_arr ** 2)))


[docs] def TROP_cv_single( Y_control: ArrayLike, treated_periods: int, fixed_lambdas: Tuple[float, float] = (0.0, 0.0), lambda_grid: Optional[Iterable[float]] = None, lambda_cv: str = "unit", *, cv_sampling_method: str = "resample", n_trials: Optional[int] = 200, n_treated_units: Optional[int] = 1, K: Optional[int] = None, n_jobs: int = -1, prefer: str = "threads", random_seed: int = 0, solver: Optional[str] = None, verbose: bool = False, ) -> float: """ Tune one TROP tuning parameter by placebo cross-validation on a control-only panel. For each candidate value in ``lambda_grid``, this routine assigns placebo treatment in the final ``treated_periods`` columns, computes the corresponding TROP estimate, and selects the lambda minimizing the RMSE of placebo ATEs (targeting zero). Parameters ---------- Y_control : array-like of shape (n_units, n_periods) Control-only outcome panel used for placebo cross-validation. treated_periods : int Number of placebo treated (post) periods, taken as the final columns. Must satisfy ``1 <= treated_periods <= n_periods - 1``. fixed_lambdas : tuple of float, default=(0.0, 0.0) Values held fixed for the two lambdas not being tuned. Interpretation depends on ``lambda_cv``: - if ``lambda_cv="unit"``: ``(lambda_time, lambda_nn)`` - if ``lambda_cv="time"``: ``(lambda_unit, lambda_nn)`` - if ``lambda_cv="nn"`` : ``(lambda_unit, lambda_time)`` lambda_grid : iterable of float or None, default=None Candidate values for the lambda being tuned. If None, uses ``np.arange(0.0, 2.0, 0.2)``. lambda_cv : {"unit", "time", "nn"}, default="unit" Which lambda to tune. cv_sampling_method : {"resample", "kfold"}, default="resample" Method used to construct placebo treated sets. - ``"resample"``: draw ``n_trials`` placebo sets, each containing ``n_treated_units`` units sampled without replacement. - ``"kfold"``: split units into ``K`` folds (after shuffling); treat each fold once. Placebo sets are generated once using ``random_seed`` and reused across all candidate lambdas. n_trials : int, default=200 Number of placebo trials when ``cv_sampling_method="resample"``. Required for ``"resample"``. Ignored for ``"kfold"``. n_treated_units : int, default=1 Number of placebo treated units per trial when ``cv_sampling_method="resample"``. Required for ``"resample"``. Ignored for ``"kfold"``. K : int, default=None Number of folds when ``cv_sampling_method="kfold"``. Required for ``"kfold"``. Ignored for ``"resample"``. n_jobs : int, default=-1 Number of parallel jobs. ``-1`` uses all available cores. prefer : {"threads", "processes"}, default="threads" Joblib backend preference. random_seed : int, default=0 Random seed used to generate placebo sets (and fold shuffling for kfold). solver : str or None, default=None CVXPY solver passed to ``TROP_TWFE_average``. verbose : bool, default=False Verbosity flag passed to ``TROP_TWFE_average``. Returns ------- best_lambda : float Selected lambda value minimizing the placebo RMSE. Raises ------ ValueError If inputs are invalid (panel shape, treated_periods bounds, empty grid, or inconsistent CV sampling arguments). RuntimeError If all placebo trials/folds fail or return non-finite ATEs for a candidate lambda (e.g., solver failures). """ Y = np.asarray(Y_control, dtype=float) N, _ = _validate_panel(Y, treated_periods) if lambda_cv not in {"unit", "time", "nn"}: raise ValueError("lambda_cv must be one of {'unit','time','nn'}.") if lambda_grid is None: lambda_grid_list = _as_list(np.arange(0.0, 2.0, 0.2), name="lambda_grid") else: lambda_grid_list = _as_list(lambda_grid, name="lambda_grid") if n_jobs == 0 or n_jobs < -1: raise ValueError("n_jobs must be -1 or a positive integer.") placebo_sets = _generate_placebo_sets( N, cv_sampling_method=cv_sampling_method, n_treated_units=n_treated_units, n_trials=n_trials, K=K, random_state=random_seed, ) scores: List[float] = [] for lamb in lambda_grid_list: if lamb < 0: raise ValueError("Lambda values must be nonnegative.") if lambda_cv == "unit": lambda_unit, lambda_time, lambda_nn = float(lamb), float(fixed_lambdas[0]), float(fixed_lambdas[1]) elif lambda_cv == "time": lambda_unit, lambda_time, lambda_nn = float(fixed_lambdas[0]), float(lamb), float(fixed_lambdas[1]) else: lambda_unit, lambda_time, lambda_nn = float(fixed_lambdas[0]), float(fixed_lambdas[1]), float(lamb) score = _placebo_rmse_for_lambdas( Y=Y, placebo_sets=placebo_sets, treated_periods=treated_periods, lambda_unit=lambda_unit, lambda_time=lambda_time, lambda_nn=lambda_nn, n_jobs=n_jobs, prefer=prefer, solver=solver, verbose=verbose, ) if score is None: raise RuntimeError( "All placebo trials failed or returned non-finite ATEs for " f"lambda={lamb} (lambda_cv='{lambda_cv}'). Consider changing solver/settings." ) scores.append(score) best_idx = int(np.argmin(scores)) return float(lambda_grid_list[best_idx])
[docs] def TROP_cv_cycle( Y_control: ArrayLike, treated_periods: int, unit_grid: Sequence[float], time_grid: Sequence[float], nn_grid: Sequence[float], lambdas_init: Optional[Tuple[float, float, float]] = None, *, max_iter: int = 50, cv_sampling_method: str = "resample", n_trials: Optional[int] = 200, n_treated_units: Optional[int] = 1, K: Optional[int] = None, n_jobs: int = -1, prefer: str = "threads", random_seed: int = 0, solver: Optional[str] = None, verbose: bool = False, ) -> Tuple[float, float, float]: """ Tune (lambda_unit, lambda_time, lambda_nn) by coordinate-descent placebo cross-validation. Iteratively updates one tuning parameter at a time using ``TROP_cv_single`` while holding the other two fixed, until the selected triplet stops changing or ``max_iter`` is reached. Each coordinate update selects the lambda minimizing the RMSE of placebo ATEs on a control-only panel (targeting zero placebo effects). Parameters ---------- Y_control : array-like of shape (n_units, n_periods) Control-only outcome panel used for placebo cross-validation. treated_periods : int Number of placebo treated (post) periods, taken as the final columns. Must satisfy ``1 <= treated_periods <= n_periods - 1``. unit_grid : sequence of float Candidate values for ``lambda_unit`` (unit-distance decay). time_grid : sequence of float Candidate values for ``lambda_time`` (time-distance decay). nn_grid : sequence of float Candidate values for ``lambda_nn`` (nuclear-norm penalty). lambdas_init : tuple of float or None, default=None Initial values ``(lambda_unit, lambda_time, lambda_nn)``. If None, each parameter is initialized to the mean of its grid. max_iter : int, default=50 Maximum number of coordinate-descent iterations. cv_sampling_method : {"resample", "kfold"}, default="resample" Method used to construct placebo treated sets for each coordinate update. - ``"resample"``: draw ``n_trials`` placebo sets, each containing ``n_treated_units`` units sampled without replacement. - ``"kfold"``: split units into ``K`` folds (after shuffling); treat each fold once. Placebo sets are generated using ``random_seed`` for each call to ``TROP_cv_single`` (so each coordinate uses the same placebo assignments across its grid). n_trials : int, default=200 Number of placebo trials when ``cv_sampling_method="resample"``. Ignored when ``cv_sampling_method="kfold"``. n_treated_units : int, default=1 Number of placebo treated units per trial when ``cv_sampling_method="resample"``. Ignored when ``cv_sampling_method="kfold"``. K : int, default=None Number of folds when ``cv_sampling_method="kfold"``. Required for ``"kfold"``. Ignored when ``cv_sampling_method="resample"``. n_jobs : int, default=-1 Number of parallel jobs. ``-1`` uses all available cores. prefer : {"threads", "processes"}, default="threads" Joblib backend preference. random_seed : int, default=0 Random seed used for placebo-set generation (and fold shuffling for kfold). solver : str or None, default=None CVXPY solver passed to ``TROP_TWFE_average``. verbose : bool, default=False Verbosity flag passed to ``TROP_TWFE_average``. Returns ------- lambda_unit : float Selected value for ``lambda_unit``. lambda_time : float Selected value for ``lambda_time``. lambda_nn : float Selected value for ``lambda_nn``. Raises ------ RuntimeError If the procedure does not converge (no fixed point) within ``max_iter``. """ Y = np.asarray(Y_control, dtype=float) _validate_panel(Y, treated_periods) unit_grid_list = _as_list(unit_grid, name="unit_grid") time_grid_list = _as_list(time_grid, name="time_grid") nn_grid_list = _as_list(nn_grid, name="nn_grid") if lambdas_init is None: lambda_unit = float(np.mean(unit_grid_list)) lambda_time = float(np.mean(time_grid_list)) lambda_nn = float(np.mean(nn_grid_list)) else: lambda_unit, lambda_time, lambda_nn = map(float, lambdas_init) for _ in range(max_iter): old = (lambda_unit, lambda_time, lambda_nn) lambda_unit = TROP_cv_single( Y, treated_periods, fixed_lambdas=(lambda_time, lambda_nn), lambda_grid=unit_grid_list, lambda_cv="unit", cv_sampling_method=cv_sampling_method, n_trials=n_trials, n_treated_units=n_treated_units, K=K, n_jobs=n_jobs, prefer=prefer, random_seed=random_seed, solver=solver, verbose=verbose, ) lambda_time = TROP_cv_single( Y, treated_periods, fixed_lambdas=(lambda_unit, lambda_nn), lambda_grid=time_grid_list, lambda_cv="time", cv_sampling_method=cv_sampling_method, n_trials=n_trials, n_treated_units=n_treated_units, K=K, n_jobs=n_jobs, prefer=prefer, random_seed=random_seed, solver=solver, verbose=verbose, ) lambda_nn = TROP_cv_single( Y, treated_periods, fixed_lambdas=(lambda_unit, lambda_time), lambda_grid=nn_grid_list, lambda_cv="nn", cv_sampling_method=cv_sampling_method, n_trials=n_trials, n_treated_units=n_treated_units, K=K, n_jobs=n_jobs, prefer=prefer, random_seed=random_seed, solver=solver, verbose=verbose, ) new = (lambda_unit, lambda_time, lambda_nn) if new == old: return new raise RuntimeError("TROP_cv_cycle did not converge (no fixed point) within max_iter.")
[docs] def TROP_cv_joint( Y_control: ArrayLike, treated_periods: int, unit_grid: Sequence[float], time_grid: Sequence[float], nn_grid: Sequence[float], *, cv_sampling_method: str = "resample", n_trials: Optional[int] = 200, n_treated_units: Optional[int] = 1, K: Optional[int] = None, n_jobs: int = -1, prefer: str = "threads", random_seed: int = 0, solver: Optional[str] = None, verbose: bool = False, ) -> Tuple[float, float, float]: """ Select (lambda_unit, lambda_time, lambda_nn) by joint placebo cross-validation. Performs a full grid search over ``unit_grid`` × ``time_grid`` × ``nn_grid``. For each candidate triple, repeatedly assigns placebo treatment to a subset of units and computes the corresponding TROP estimate. The selected triple is the one minimizing the root-mean-square error (RMSE) of placebo ATEs (targeting zero). Parameters ---------- Y_control : array-like of shape (n_units, n_periods) Control-only outcome panel used for placebo cross-validation. treated_periods : int Number of placebo treated (post) periods, taken as the final columns. Must satisfy ``1 <= treated_periods <= n_periods - 1``. unit_grid : sequence of float Candidate values for ``lambda_unit`` (unit-distance decay). time_grid : sequence of float Candidate values for ``lambda_time`` (time-distance decay). nn_grid : sequence of float Candidate values for ``lambda_nn`` (nuclear-norm penalty). cv_sampling_method : {"resample", "kfold"}, default="resample" Method used to construct placebo treated sets. - ``"resample"``: draw ``n_trials`` placebo sets, each containing ``n_treated_units`` units sampled without replacement. - ``"kfold"``: split units into ``K`` folds (after shuffling); treat each fold once. Placebo sets are generated once using ``random_seed`` and reused across all candidate triples. n_trials : int, default=200 Number of placebo trials when ``cv_sampling_method="resample"``. Ignored when ``cv_sampling_method="kfold"``. n_treated_units : int, default=1 Number of placebo treated units per trial when ``cv_sampling_method="resample"``. Ignored when ``cv_sampling_method="kfold"``. K : int, default=None Number of folds when ``cv_sampling_method="kfold"``. Required for ``"kfold"``. Ignored when ``cv_sampling_method="resample"``. n_jobs : int, default=-1 Number of parallel jobs. ``-1`` uses all available cores. prefer : {"threads", "processes"}, default="threads" Joblib backend preference. random_seed : int, default=0 Random seed used to generate placebo sets (and fold shuffling for kfold). solver : str or None, default=None CVXPY solver passed to ``TROP_TWFE_average``. verbose : bool, default=False Verbosity flag passed to ``TROP_TWFE_average``. Returns ------- lambda_unit : float Selected value for ``lambda_unit``. lambda_time : float Selected value for ``lambda_time``. lambda_nn : float Selected value for ``lambda_nn``. Raises ------ ValueError If inputs are invalid (panel shape, treated_periods bounds, empty grids, or inconsistent CV sampling arguments). RuntimeError If all parameter combinations fail (e.g., all placebo evaluations return non-finite ATEs due to solver failures). """ Y = np.asarray(Y_control, dtype=float) N, _ = _validate_panel(Y, treated_periods) unit_grid_list = _as_list(unit_grid, name="unit_grid") time_grid_list = _as_list(time_grid, name="time_grid") nn_grid_list = _as_list(nn_grid, name="nn_grid") placebo_sets = _generate_placebo_sets( N, cv_sampling_method=cv_sampling_method, n_treated_units=n_treated_units, n_trials=n_trials, K=K, random_state=random_seed, ) best_params: Optional[Tuple[float, float, float]] = None best_score: float = float("inf") for lambda_unit in unit_grid_list: for lambda_time in time_grid_list: for lambda_nn in nn_grid_list: score = _placebo_rmse_for_lambdas( Y=Y, placebo_sets=placebo_sets, treated_periods=treated_periods, lambda_unit=float(lambda_unit), lambda_time=float(lambda_time), lambda_nn=float(lambda_nn), n_jobs=n_jobs, prefer=prefer, solver=solver, verbose=verbose, ) if score is None: continue if score < best_score: best_score = score best_params = (float(lambda_unit), float(lambda_time), float(lambda_nn)) if best_params is None: raise RuntimeError("All parameter combinations failed during joint CV. Check solver/settings.") return best_params