
Datasets

Synthetic Streams

onlinecml.datasets.linear_causal.LinearCausalStream

Synthetic streaming dataset with a linear DGP and constant ATE.

Generates a stream of (features, treatment, outcome, true_cate) tuples from a linear data-generating process with confounding. The true CATE is constant (equal to true_ate) for all units.

Parameters:

  • n (int): Number of observations to generate. Default 1000.
  • n_features (int): Number of covariates. Default 5.
  • true_ate (float): The true constant Average Treatment Effect. Default 2.0.
  • confounding_strength (float): Controls how strongly covariates predict treatment assignment. 0.0 = no confounding (RCT), 1.0 = strong confounding. Default 0.5.
  • noise_std (float): Standard deviation of the outcome noise. Default 1.0.
  • seed (int or None): Random seed for reproducibility. If None, results are random. Default None.
Notes

Data-generating process:

  • beta ~ N(0, I_p) — fixed coefficient vector per stream iteration
  • X_i ~ N(0, I_p) — independent covariates
  • logit(P(W=1|X)) = confounding_strength * X @ beta / sqrt(p)
  • W_i ~ Bernoulli(sigmoid(logit_p))
  • Y_i = X @ beta + W_i * true_ate + eps, eps ~ N(0, noise_std^2)

The coefficient vector beta is re-sampled once per __iter__ call. Re-iterating with the same seed gives the same stream.

Because CATE is constant, true_cate == true_ate for all units.
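
A minimal standalone sketch of this DGP (plain numpy, not the library class) shows why the confounding matters: a naive treated-minus-control difference in means overestimates the true ATE, because units with high X @ beta are both more likely to be treated and have higher baseline outcomes. Here confounding_strength is raised to 1.5 (vs. the default 0.5) to make the bias easy to see.

```python
import math

import numpy as np

# Standalone re-implementation of the documented DGP (not the library class).
# confounding_strength is set to 1.5 (vs. the default 0.5) to exaggerate the bias.
rng = np.random.default_rng(0)
n, p, true_ate, strength = 20_000, 5, 2.0, 1.5
beta = rng.standard_normal(p)                        # fixed coefficient vector
X = rng.standard_normal((n, p))                      # independent covariates
logit_p = strength * (X @ beta) / math.sqrt(p)
W = rng.binomial(1, 1.0 / (1.0 + np.exp(-logit_p)))  # confounded assignment
Y = X @ beta + W * true_ate + rng.normal(0.0, 1.0, size=n)

# Naive difference in means: biased upward under confounding.
naive = Y[W == 1].mean() - Y[W == 0].mean()
print(f"true ATE = {true_ate}, naive diff-in-means = {naive:.3f}")
```

An estimator that adjusts for X (e.g. a regression of Y on X and W) recovers true_ate; the gap between the naive estimate and true_ate is exactly what the confounding_strength knob controls.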

Examples:

>>> stream = LinearCausalStream(n=500, n_features=3, true_ate=2.0, seed=42)
>>> for x, w, y, tau in stream:
...     pass  # process one observation at a time
>>> len(stream)
500
Source code in onlinecml/datasets/linear_causal.py
class LinearCausalStream:
    """Synthetic streaming dataset with a linear DGP and constant ATE.

    Generates a stream of (features, treatment, outcome, true_cate) tuples
    from a linear data-generating process with confounding. The true CATE
    is constant (equal to ``true_ate``) for all units.

    Parameters
    ----------
    n : int
        Number of observations to generate. Default 1000.
    n_features : int
        Number of covariates. Default 5.
    true_ate : float
        The true constant Average Treatment Effect. Default 2.0.
    confounding_strength : float
        Controls how strongly covariates predict treatment assignment.
        0.0 = no confounding (RCT), 1.0 = strong confounding. Default 0.5.
    noise_std : float
        Standard deviation of the outcome noise. Default 1.0.
    seed : int or None
        Random seed for reproducibility. If None, results are random.

    Notes
    -----
    Data-generating process:

    - ``beta ~ N(0, I_p)`` — fixed coefficient vector per stream iteration
    - ``X_i ~ N(0, I_p)`` — independent covariates
    - ``logit(P(W=1|X)) = confounding_strength * X @ beta / sqrt(p)``
    - ``W_i ~ Bernoulli(sigmoid(logit_p))``
    - ``Y_i = X @ beta + W_i * true_ate + eps``, ``eps ~ N(0, noise_std^2)``

    The coefficient vector ``beta`` is re-sampled once per ``__iter__``
    call. Re-iterating with the same seed gives the same stream.

    Because CATE is constant, ``true_cate == true_ate`` for all units.

    Examples
    --------
    >>> stream = LinearCausalStream(n=500, n_features=3, true_ate=2.0, seed=42)
    >>> for x, w, y, tau in stream:
    ...     pass  # process one observation at a time
    >>> len(stream)
    500
    """

    def __init__(
        self,
        n: int = 1000,
        n_features: int = 5,
        true_ate: float = 2.0,
        confounding_strength: float = 0.5,
        noise_std: float = 1.0,
        seed: int | None = None,
    ) -> None:
        self.n = n
        self.n_features = n_features
        self.true_ate = true_ate
        self.confounding_strength = confounding_strength
        self.noise_std = noise_std
        self.seed = seed

    def __iter__(self) -> Iterator[tuple[dict, int, float, float]]:
        """Iterate over the stream, yielding one observation at a time.

        Yields
        ------
        x : dict
            Feature dictionary with keys ``'x0'``, ``'x1'``, ...
        treatment : int
            Treatment indicator (0 or 1).
        outcome : float
            Observed outcome.
        true_cate : float
            The true CATE for this unit (equals ``true_ate`` for all units).
        """
        rng = np.random.default_rng(self.seed)
        beta = rng.standard_normal(self.n_features)
        norm = math.sqrt(self.n_features)

        for _ in range(self.n):
            X_i = rng.standard_normal(self.n_features)
            logit_p = self.confounding_strength * (X_i @ beta) / norm
            p = 1.0 / (1.0 + math.exp(-logit_p))
            W_i = int(rng.binomial(1, p))
            eps = rng.normal(0.0, self.noise_std)
            Y_i = float(X_i @ beta) + W_i * self.true_ate + eps
            x_dict = {f"x{j}": float(X_i[j]) for j in range(self.n_features)}
            yield x_dict, W_i, Y_i, float(self.true_ate)

    def __len__(self) -> int:
        """Total number of observations in this stream."""
        return self.n

__iter__()

Iterate over the stream, yielding one observation at a time.

Yields:

  • x (dict): Feature dictionary with keys 'x0', 'x1', ...
  • treatment (int): Treatment indicator (0 or 1).
  • outcome (float): Observed outcome.
  • true_cate (float): The true CATE for this unit (equals true_ate for all units).

Source code in onlinecml/datasets/linear_causal.py
def __iter__(self) -> Iterator[tuple[dict, int, float, float]]:
    """Iterate over the stream, yielding one observation at a time.

    Yields
    ------
    x : dict
        Feature dictionary with keys ``'x0'``, ``'x1'``, ...
    treatment : int
        Treatment indicator (0 or 1).
    outcome : float
        Observed outcome.
    true_cate : float
        The true CATE for this unit (equals ``true_ate`` for all units).
    """
    rng = np.random.default_rng(self.seed)
    beta = rng.standard_normal(self.n_features)
    norm = math.sqrt(self.n_features)

    for _ in range(self.n):
        X_i = rng.standard_normal(self.n_features)
        logit_p = self.confounding_strength * (X_i @ beta) / norm
        p = 1.0 / (1.0 + math.exp(-logit_p))
        W_i = int(rng.binomial(1, p))
        eps = rng.normal(0.0, self.noise_std)
        Y_i = float(X_i @ beta) + W_i * self.true_ate + eps
        x_dict = {f"x{j}": float(X_i[j]) for j in range(self.n_features)}
        yield x_dict, W_i, Y_i, float(self.true_ate)

__len__()

Total number of observations in this stream.

Source code in onlinecml/datasets/linear_causal.py
def __len__(self) -> int:
    """Total number of observations in this stream."""
    return self.n

onlinecml.datasets.heterogeneous_causal.HeterogeneousCausalStream

Synthetic streaming dataset with heterogeneous treatment effects.

Generates a stream of (features, treatment, outcome, true_cate) tuples where the CATE varies across units according to a specified functional form. Designed for benchmarking CATE estimators.

Parameters:

  • n (int): Number of observations to generate. Default 1000.
  • n_features (int): Number of covariates (at least 2 required for the nonlinear DGP). Default 5.
  • true_ate (float): Base treatment effect. For 'linear' and 'nonlinear' heterogeneity with standard-normal covariates, the marginal average E[tau(X)] equals true_ate; for 'step' it equals 1.25 * true_ate. Default 2.0.
  • heterogeneity (str): Functional form of the CATE:
      • 'linear': tau(X) = true_ate * (1 + 0.5 * X[0])
      • 'nonlinear': tau(X) = true_ate + X[0] + sin(X[1]) * 0.5
      • 'step': tau(X) = 2 * true_ate if X[0] > 0 else 0.5 * true_ate
    Default 'nonlinear'.
  • confounding_strength (float): Controls how strongly covariates predict treatment assignment. Default 0.5.
  • noise_std (float): Standard deviation of the outcome noise. Default 1.0.
  • seed (int or None): Random seed for reproducibility. Default None.
Notes

For standard-normal covariates, the theoretical marginal ATE E[tau(X)] is:

  • 'linear': E[true_ate * (1 + 0.5*X0)] = true_ate * (1 + 0) = true_ate
  • 'nonlinear': E[true_ate + X0 + sin(X1)*0.5] = true_ate + 0 + 0 = true_ate
  • 'step': E[2*true_ate * 1{X0>0} + 0.5*true_ate * 1{X0<=0}] = 2*true_ate*0.5 + 0.5*true_ate*0.5 = 1.25 * true_ate, so the step DGP's population ATE does NOT equal true_ate

Use population_ate() to get the theoretical marginal ATE.
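
The closed-form value for the 'step' case is easy to verify by Monte Carlo (plain numpy, independent of the library):

```python
import numpy as np

# For 'step', tau(X) = 2*true_ate if X0 > 0 else 0.5*true_ate with X0 ~ N(0, 1),
# so E[tau(X)] = 2*true_ate*0.5 + 0.5*true_ate*0.5 = 1.25*true_ate.
rng = np.random.default_rng(42)
true_ate = 2.0
x0 = rng.standard_normal(1_000_000)
tau = np.where(x0 > 0, 2.0 * true_ate, 0.5 * true_ate)
print(tau.mean())  # close to 1.25 * true_ate = 2.5
```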

Examples:

>>> stream = HeterogeneousCausalStream(n=500, true_ate=2.0, seed=42)
>>> x, w, y, tau = next(iter(stream))
>>> isinstance(tau, float)
True
Source code in onlinecml/datasets/heterogeneous_causal.py
class HeterogeneousCausalStream:
    """Synthetic streaming dataset with heterogeneous treatment effects.

    Generates a stream of (features, treatment, outcome, true_cate) tuples
    where the CATE varies across units according to a specified functional
    form. Designed for benchmarking CATE estimators.

    Parameters
    ----------
    n : int
        Number of observations to generate. Default 1000.
    n_features : int
        Number of covariates (at least 2 required for nonlinear DGP).
        Default 5.
    true_ate : float
        Base treatment effect. For ``'linear'`` and ``'nonlinear'``
        heterogeneity with standard-normal covariates, the marginal average
        ``E[tau(X)]`` equals ``true_ate``; for ``'step'`` it equals
        ``1.25 * true_ate``. Default 2.0.
    heterogeneity : str
        Functional form of the CATE:

        - ``'linear'`` — ``tau(X) = true_ate * (1 + 0.5 * X[0])``
        - ``'nonlinear'`` — ``tau(X) = true_ate + X[0] + sin(X[1]) * 0.5``
        - ``'step'`` — ``tau(X) = 2 * true_ate if X[0] > 0 else 0.5 * true_ate``

        Default ``'nonlinear'``.
    confounding_strength : float
        Controls how strongly covariates predict treatment assignment.
        Default 0.5.
    noise_std : float
        Standard deviation of the outcome noise. Default 1.0.
    seed : int or None
        Random seed for reproducibility.

    Notes
    -----
    For standard-normal covariates, the theoretical marginal ATE
    ``E[tau(X)]`` is:

    - ``'linear'``: ``E[true_ate * (1 + 0.5*X0)] = true_ate * (1 + 0) = true_ate``
    - ``'nonlinear'``: ``E[true_ate + X0 + sin(X1)*0.5] = true_ate + 0 + 0 = true_ate``
    - ``'step'``: ``E[2*true_ate * 1{X0>0} + 0.5*true_ate * 1{X0<=0}] = 1.25 * true_ate``
      (the step DGP's population ATE does NOT equal ``true_ate``)

    Use ``population_ate()`` to get the theoretical marginal ATE.

    Examples
    --------
    >>> stream = HeterogeneousCausalStream(n=500, true_ate=2.0, seed=42)
    >>> x, w, y, tau = next(iter(stream))
    >>> isinstance(tau, float)
    True
    """

    def __init__(
        self,
        n: int = 1000,
        n_features: int = 5,
        true_ate: float = 2.0,
        heterogeneity: str = "nonlinear",
        confounding_strength: float = 0.5,
        noise_std: float = 1.0,
        seed: int | None = None,
    ) -> None:
        if heterogeneity not in ("linear", "nonlinear", "step"):
            raise ValueError(
                f"heterogeneity must be 'linear', 'nonlinear', or 'step'; got {heterogeneity!r}"
            )
        if n_features < 2 and heterogeneity == "nonlinear":
            raise ValueError("nonlinear heterogeneity requires at least 2 features")
        self.n = n
        self.n_features = n_features
        self.true_ate = true_ate
        self.heterogeneity = heterogeneity
        self.confounding_strength = confounding_strength
        self.noise_std = noise_std
        self.seed = seed

    def _tau(self, X_i: np.ndarray) -> float:
        """Compute the individual CATE for a feature vector.

        Parameters
        ----------
        X_i : np.ndarray
            Feature vector of shape ``(n_features,)``.

        Returns
        -------
        float
            Individual CATE for this unit.
        """
        if self.heterogeneity == "linear":
            return float(self.true_ate * (1.0 + 0.5 * X_i[0]))
        elif self.heterogeneity == "nonlinear":
            return float(self.true_ate + X_i[0] + math.sin(X_i[1]) * 0.5)
        else:  # 'step'
            return float(2.0 * self.true_ate if X_i[0] > 0 else 0.5 * self.true_ate)

    def population_ate(self) -> float:
        """Return the theoretical marginal ATE, ``E[tau(X)]``.

        For ``'linear'`` and ``'nonlinear'`` DGPs with standard-normal
        covariates, this equals ``true_ate``. For ``'step'``, it equals
        ``1.25 * true_ate``.

        Returns
        -------
        float
            Theoretical population average treatment effect.
        """
        if self.heterogeneity in ("linear", "nonlinear"):
            return float(self.true_ate)
        else:  # 'step': E[tau] = 2*tau*0.5 + 0.5*tau*0.5 = 1.25*tau
            return float(1.25 * self.true_ate)

    def __iter__(self) -> Iterator[tuple[dict, int, float, float]]:
        """Iterate over the stream, yielding one observation at a time.

        Yields
        ------
        x : dict
            Feature dictionary with keys ``'x0'``, ``'x1'``, ...
        treatment : int
            Treatment indicator (0 or 1).
        outcome : float
            Observed outcome.
        true_cate : float
            The true individual CATE for this unit.
        """
        rng = np.random.default_rng(self.seed)
        beta = rng.standard_normal(self.n_features)
        norm = math.sqrt(self.n_features)

        for _ in range(self.n):
            X_i = rng.standard_normal(self.n_features)
            logit_p = self.confounding_strength * (X_i @ beta) / norm
            p = 1.0 / (1.0 + math.exp(-logit_p))
            W_i = int(rng.binomial(1, p))
            tau_i = self._tau(X_i)
            eps = rng.normal(0.0, self.noise_std)
            Y_i = float(X_i @ beta) + W_i * tau_i + eps
            x_dict = {f"x{j}": float(X_i[j]) for j in range(self.n_features)}
            yield x_dict, W_i, Y_i, tau_i

    def __len__(self) -> int:
        """Total number of observations in this stream."""
        return self.n

__iter__()

Iterate over the stream, yielding one observation at a time.

Yields:

  • x (dict): Feature dictionary with keys 'x0', 'x1', ...
  • treatment (int): Treatment indicator (0 or 1).
  • outcome (float): Observed outcome.
  • true_cate (float): The true individual CATE for this unit.

Source code in onlinecml/datasets/heterogeneous_causal.py
def __iter__(self) -> Iterator[tuple[dict, int, float, float]]:
    """Iterate over the stream, yielding one observation at a time.

    Yields
    ------
    x : dict
        Feature dictionary with keys ``'x0'``, ``'x1'``, ...
    treatment : int
        Treatment indicator (0 or 1).
    outcome : float
        Observed outcome.
    true_cate : float
        The true individual CATE for this unit.
    """
    rng = np.random.default_rng(self.seed)
    beta = rng.standard_normal(self.n_features)
    norm = math.sqrt(self.n_features)

    for _ in range(self.n):
        X_i = rng.standard_normal(self.n_features)
        logit_p = self.confounding_strength * (X_i @ beta) / norm
        p = 1.0 / (1.0 + math.exp(-logit_p))
        W_i = int(rng.binomial(1, p))
        tau_i = self._tau(X_i)
        eps = rng.normal(0.0, self.noise_std)
        Y_i = float(X_i @ beta) + W_i * tau_i + eps
        x_dict = {f"x{j}": float(X_i[j]) for j in range(self.n_features)}
        yield x_dict, W_i, Y_i, tau_i

__len__()

Total number of observations in this stream.

Source code in onlinecml/datasets/heterogeneous_causal.py
def __len__(self) -> int:
    """Total number of observations in this stream."""
    return self.n

population_ate()

Return the theoretical marginal ATE, E[tau(X)].

For 'linear' and 'nonlinear' DGPs with standard-normal covariates, this equals true_ate. For 'step', it equals 1.25 * true_ate.

Returns:

  • float: Theoretical population average treatment effect.

Source code in onlinecml/datasets/heterogeneous_causal.py
def population_ate(self) -> float:
    """Return the theoretical marginal ATE, ``E[tau(X)]``.

    For ``'linear'`` and ``'nonlinear'`` DGPs with standard-normal
    covariates, this equals ``true_ate``. For ``'step'``, it equals
    ``1.25 * true_ate``.

    Returns
    -------
    float
        Theoretical population average treatment effect.
    """
    if self.heterogeneity in ("linear", "nonlinear"):
        return float(self.true_ate)
    else:  # 'step': E[tau] = 2*tau*0.5 + 0.5*tau*0.5 = 1.25*tau
        return float(1.25 * self.true_ate)

onlinecml.datasets.drifting_causal.DriftingCausalStream

Synthetic streaming dataset where the ATE shifts at a known changepoint.

Generates observations from a linear DGP with confounding. The true ATE is true_ate for the first changepoint observations, then shifts to shifted_ate for the remainder. The changepoint is announced via the changepoint attribute so that downstream drift monitors can be evaluated against the known ground truth.

Parameters:

  • n (int): Total number of observations to generate. Default 2000.
  • n_features (int): Number of covariates. Default 5.
  • true_ate (float): ATE before the changepoint. Default 2.0.
  • shifted_ate (float): ATE after the changepoint. Default -1.0.
  • changepoint (int or None): Index (0-based) at which the ATE shifts. Defaults to n // 2.
  • confounding_strength (float): Strength of confounding. 0.0 = RCT, 1.0 = strong. Default 0.5.
  • noise_std (float): Standard deviation of outcome noise. Default 1.0.
  • seed (int or None): Random seed for reproducibility. Default None.
Notes

The true CATE returned per observation reflects the current ATE segment: true_ate before the changepoint and shifted_ate after.
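
A standalone sketch (plain numpy, with confounding turned off so a difference in means is unbiased) illustrates how the segment structure shows up in estimates:

```python
import numpy as np

# Standalone sketch of the drifting DGP with randomized (RCT) assignment,
# so a per-segment difference in means is an unbiased ATE estimate.
rng = np.random.default_rng(0)
n, p, cp = 20_000, 5, 10_000
true_ate, shifted_ate = 2.0, -1.0
beta = rng.standard_normal(p)
X = rng.standard_normal((n, p))
W = rng.binomial(1, 0.5, size=n)  # no confounding here
ate = np.where(np.arange(n) < cp, true_ate, shifted_ate)
Y = X @ beta + W * ate + rng.normal(0.0, 1.0, size=n)

def diff_in_means(y, w):
    return y[w == 1].mean() - y[w == 0].mean()

pre = diff_in_means(Y[:cp], W[:cp])   # close to true_ate
post = diff_in_means(Y[cp:], W[cp:])  # close to shifted_ate
print(f"pre-changepoint: {pre:.2f}, post-changepoint: {post:.2f}")
```

A drift monitor that tracks a running ATE estimate should flag the discrepancy shortly after observation cp; the known changepoint attribute gives the ground truth to score it against.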

Examples:

>>> stream = DriftingCausalStream(n=1000, true_ate=2.0, shifted_ate=-1.0, seed=0)
>>> for x, w, y, tau in stream:
...     pass  # tau shifts from 2.0 to -1.0 at step 500
Source code in onlinecml/datasets/drifting_causal.py
class DriftingCausalStream:
    """Synthetic streaming dataset where the ATE shifts at a known changepoint.

    Generates observations from a linear DGP with confounding. The true ATE
    is ``true_ate`` for the first ``changepoint`` observations, then shifts
    to ``shifted_ate`` for the remainder. The changepoint is announced via the
    ``changepoint`` attribute so that downstream drift monitors can be evaluated
    against the known ground truth.

    Parameters
    ----------
    n : int
        Total number of observations to generate. Default 2000.
    n_features : int
        Number of covariates. Default 5.
    true_ate : float
        ATE before the changepoint. Default 2.0.
    shifted_ate : float
        ATE after the changepoint. Default -1.0.
    changepoint : int or None
        Index (0-based) at which the ATE shifts. Defaults to ``n // 2``.
    confounding_strength : float
        Strength of confounding. 0.0 = RCT, 1.0 = strong. Default 0.5.
    noise_std : float
        Standard deviation of outcome noise. Default 1.0.
    seed : int or None
        Random seed for reproducibility.

    Notes
    -----
    The true CATE returned per observation reflects the current ATE segment:
    ``true_ate`` before the changepoint and ``shifted_ate`` after.

    Examples
    --------
    >>> stream = DriftingCausalStream(n=1000, true_ate=2.0, shifted_ate=-1.0, seed=0)
    >>> for x, w, y, tau in stream:
    ...     pass  # tau shifts from 2.0 to -1.0 at step 500
    """

    def __init__(
        self,
        n: int = 2000,
        n_features: int = 5,
        true_ate: float = 2.0,
        shifted_ate: float = -1.0,
        changepoint: int | None = None,
        confounding_strength: float = 0.5,
        noise_std: float = 1.0,
        seed: int | None = None,
    ) -> None:
        self.n = n
        self.n_features = n_features
        self.true_ate = true_ate
        self.shifted_ate = shifted_ate
        self.changepoint = changepoint if changepoint is not None else n // 2
        self.confounding_strength = confounding_strength
        self.noise_std = noise_std
        self.seed = seed

    def __iter__(self) -> Iterator[tuple[dict, int, float, float]]:
        """Iterate over the stream, yielding one observation at a time.

        Yields
        ------
        x : dict
            Feature dictionary with keys ``'x0'``, ``'x1'``, ...
        treatment : int
            Treatment indicator (0 or 1).
        outcome : float
            Observed outcome.
        true_cate : float
            True CATE for this unit. Equals ``true_ate`` before the changepoint
            and ``shifted_ate`` from the changepoint onward.
        """
        rng = np.random.default_rng(self.seed)
        beta = rng.standard_normal(self.n_features)
        norm = math.sqrt(self.n_features)

        for i in range(self.n):
            ate = self.true_ate if i < self.changepoint else self.shifted_ate
            X_i = rng.standard_normal(self.n_features)
            logit_p = self.confounding_strength * (X_i @ beta) / norm
            p = 1.0 / (1.0 + math.exp(-logit_p))
            W_i = int(rng.binomial(1, p))
            eps = rng.normal(0.0, self.noise_std)
            Y_i = float(X_i @ beta) + W_i * ate + eps
            x_dict = {f"x{j}": float(X_i[j]) for j in range(self.n_features)}
            yield x_dict, W_i, Y_i, float(ate)

    def __len__(self) -> int:
        """Total number of observations in this stream."""
        return self.n

__iter__()

Iterate over the stream, yielding one observation at a time.

Yields:

  • x (dict): Feature dictionary with keys 'x0', 'x1', ...
  • treatment (int): Treatment indicator (0 or 1).
  • outcome (float): Observed outcome.
  • true_cate (float): True CATE for this unit. Equals true_ate before the changepoint and shifted_ate from the changepoint onward.

Source code in onlinecml/datasets/drifting_causal.py
def __iter__(self) -> Iterator[tuple[dict, int, float, float]]:
    """Iterate over the stream, yielding one observation at a time.

    Yields
    ------
    x : dict
        Feature dictionary with keys ``'x0'``, ``'x1'``, ...
    treatment : int
        Treatment indicator (0 or 1).
    outcome : float
        Observed outcome.
    true_cate : float
        True CATE for this unit. Equals ``true_ate`` before the changepoint
        and ``shifted_ate`` from the changepoint onward.
    """
    rng = np.random.default_rng(self.seed)
    beta = rng.standard_normal(self.n_features)
    norm = math.sqrt(self.n_features)

    for i in range(self.n):
        ate = self.true_ate if i < self.changepoint else self.shifted_ate
        X_i = rng.standard_normal(self.n_features)
        logit_p = self.confounding_strength * (X_i @ beta) / norm
        p = 1.0 / (1.0 + math.exp(-logit_p))
        W_i = int(rng.binomial(1, p))
        eps = rng.normal(0.0, self.noise_std)
        Y_i = float(X_i @ beta) + W_i * ate + eps
        x_dict = {f"x{j}": float(X_i[j]) for j in range(self.n_features)}
        yield x_dict, W_i, Y_i, float(ate)

__len__()

Total number of observations in this stream.

Source code in onlinecml/datasets/drifting_causal.py
def __len__(self) -> int:
    """Total number of observations in this stream."""
    return self.n

onlinecml.datasets.unbalanced_causal.UnbalancedCausalStream

Synthetic streaming dataset with extreme treatment probabilities.

Generates observations where most units are assigned to one arm, creating near-positivity violations. Designed to stress-test overlap diagnostics and the stability of IPW-based estimators under extreme propensity scores.

Parameters:

  • n (int): Number of observations to generate. Default 1000.
  • n_features (int): Number of covariates. Default 5.
  • true_ate (float): True Average Treatment Effect. Default 2.0.
  • treatment_rate (float): Target marginal probability of treatment. Values close to 0 or 1 create the most severe positivity violations. Default 0.1.
  • confounding_strength (float): Controls how strongly X predicts treatment on top of the marginal imbalance. 0.0 = only marginal imbalance, 1.0 = strong additional confounding. Default 1.5.
  • noise_std (float): Standard deviation of outcome noise. Default 1.0.
  • seed (int or None): Random seed for reproducibility. Default None.
Notes

The logit for treatment assignment is:

logit(P(W=1|X)) = logit(treatment_rate) + confounding_strength * X @ beta / sqrt(p)
This ensures that the marginal treatment rate is approximately treatment_rate while adding covariate-driven confounding.
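
The intercept construction can be checked directly with a plain numpy sketch: applying the logit to treatment_rate and adding the covariate term yields propensity scores whose mean stays near treatment_rate, with extreme values near 0 that stress IPW.

```python
import math

import numpy as np

# Standalone check of the intercept construction. The sigmoid is nonlinear,
# so strong confounding pulls the realized rate somewhat toward 0.5; it
# stays "approximately treatment_rate", not exactly equal to it.
rng = np.random.default_rng(42)
n, p, rate, strength = 50_000, 5, 0.1, 1.5
beta = rng.standard_normal(p)
X = rng.standard_normal((n, p))
intercept = math.log(rate / (1.0 - rate))  # logit(treatment_rate)
logit_p = intercept + strength * (X @ beta) / math.sqrt(p)
prop = 1.0 / (1.0 + np.exp(-logit_p))
W = rng.binomial(1, prop)
print(f"realized treatment rate: {W.mean():.3f}")
print(f"propensity range: [{prop.min():.4f}, {prop.max():.4f}]")
```

The tiny propensities in the printed range are the point of this stream: inverse-propensity weights of order 1/prop.min() are what make IPW-based estimators unstable here.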

Examples:

>>> stream = UnbalancedCausalStream(n=500, treatment_rate=0.1, seed=42)
>>> rates = [w for _, w, _, _ in stream]
>>> abs(sum(rates) / len(rates) - 0.1) < 0.1  # roughly 10% treated
True
Source code in onlinecml/datasets/unbalanced_causal.py
class UnbalancedCausalStream:
    """Synthetic streaming dataset with extreme treatment probabilities.

    Generates observations where most units are assigned to one arm, creating
    near-positivity violations. Designed to stress-test overlap diagnostics
    and the stability of IPW-based estimators under extreme propensity scores.

    Parameters
    ----------
    n : int
        Number of observations to generate. Default 1000.
    n_features : int
        Number of covariates. Default 5.
    true_ate : float
        True Average Treatment Effect. Default 2.0.
    treatment_rate : float
        Target marginal probability of treatment. Values close to 0 or 1
        create the most severe positivity violations. Default 0.1.
    confounding_strength : float
        Controls how strongly X predicts treatment on top of the marginal
        imbalance. 0.0 = only marginal imbalance, 1.0 = strong additional
        confounding. Default 1.5.
    noise_std : float
        Standard deviation of outcome noise. Default 1.0.
    seed : int or None
        Random seed for reproducibility.

    Notes
    -----
    The logit for treatment assignment is:

    .. math::

        \\text{logit}(P(W=1|X)) = \\text{logit}(\\text{treatment\\_rate})
            + \\text{confounding\\_strength} \\cdot X\\beta / \\sqrt{p}

    This ensures that the marginal treatment rate is approximately
    ``treatment_rate`` while adding covariate-driven confounding.

    Examples
    --------
    >>> stream = UnbalancedCausalStream(n=500, treatment_rate=0.1, seed=42)
    >>> rates = [w for _, w, _, _ in stream]
    >>> abs(sum(rates) / len(rates) - 0.1) < 0.1  # roughly 10% treated
    True
    """

    def __init__(
        self,
        n: int = 1000,
        n_features: int = 5,
        true_ate: float = 2.0,
        treatment_rate: float = 0.1,
        confounding_strength: float = 1.5,
        noise_std: float = 1.0,
        seed: int | None = None,
    ) -> None:
        if not 0.0 < treatment_rate < 1.0:
            raise ValueError("treatment_rate must be strictly between 0 and 1.")
        self.n = n
        self.n_features = n_features
        self.true_ate = true_ate
        self.treatment_rate = treatment_rate
        self.confounding_strength = confounding_strength
        self.noise_std = noise_std
        self.seed = seed

    def __iter__(self) -> Iterator[tuple[dict, int, float, float]]:
        """Iterate over the stream, yielding one observation at a time.

        Yields
        ------
        x : dict
            Feature dictionary with keys ``'x0'``, ``'x1'``, ...
        treatment : int
            Treatment indicator (0 or 1).
        outcome : float
            Observed outcome.
        true_cate : float
            True CATE for this unit (constant, equals ``true_ate``).
        """
        rng = np.random.default_rng(self.seed)
        beta = rng.standard_normal(self.n_features)
        norm = math.sqrt(self.n_features)
        # Intercept that sets marginal treatment rate
        intercept = math.log(self.treatment_rate / (1.0 - self.treatment_rate))

        for _ in range(self.n):
            X_i = rng.standard_normal(self.n_features)
            logit_p = intercept + self.confounding_strength * (X_i @ beta) / norm
            p = 1.0 / (1.0 + math.exp(-logit_p))
            W_i = int(rng.binomial(1, p))
            eps = rng.normal(0.0, self.noise_std)
            Y_i = float(X_i @ beta) + W_i * self.true_ate + eps
            x_dict = {f"x{j}": float(X_i[j]) for j in range(self.n_features)}
            yield x_dict, W_i, Y_i, float(self.true_ate)

    def __len__(self) -> int:
        """Total number of observations in this stream."""
        return self.n

__iter__()

Iterate over the stream, yielding one observation at a time.

Yields:

  • x (dict): Feature dictionary with keys 'x0', 'x1', ...
  • treatment (int): Treatment indicator (0 or 1).
  • outcome (float): Observed outcome.
  • true_cate (float): True CATE for this unit (constant, equals true_ate).

Source code in onlinecml/datasets/unbalanced_causal.py
def __iter__(self) -> Iterator[tuple[dict, int, float, float]]:
    """Iterate over the stream, yielding one observation at a time.

    Yields
    ------
    x : dict
        Feature dictionary with keys ``'x0'``, ``'x1'``, ...
    treatment : int
        Treatment indicator (0 or 1).
    outcome : float
        Observed outcome.
    true_cate : float
        True CATE for this unit (constant, equals ``true_ate``).
    """
    rng = np.random.default_rng(self.seed)
    beta = rng.standard_normal(self.n_features)
    norm = math.sqrt(self.n_features)
    # Intercept that sets marginal treatment rate
    intercept = math.log(self.treatment_rate / (1.0 - self.treatment_rate))

    for _ in range(self.n):
        X_i = rng.standard_normal(self.n_features)
        logit_p = intercept + self.confounding_strength * (X_i @ beta) / norm
        p = 1.0 / (1.0 + math.exp(-logit_p))
        W_i = int(rng.binomial(1, p))
        eps = rng.normal(0.0, self.noise_std)
        Y_i = float(X_i @ beta) + W_i * self.true_ate + eps
        x_dict = {f"x{j}": float(X_i[j]) for j in range(self.n_features)}
        yield x_dict, W_i, Y_i, float(self.true_ate)

__len__()

Total number of observations in this stream.

Source code in onlinecml/datasets/unbalanced_causal.py
def __len__(self) -> int:
    """Total number of observations in this stream."""
    return self.n

onlinecml.datasets.continuous_treatment.ContinuousTreatmentStream

Synthetic streaming dataset with a continuous treatment (dose-response).

Generates observations where the treatment W is a continuous random variable (uniform or normal) rather than binary. The outcome follows a dose-response model Y = X @ beta + g(W) + noise, where g is a known dose-response function.

The fourth element yielded per observation is the marginal causal effect dE[Y]/dW at the observed dose W:

  • 'linear': g(W) = true_effect * W; marginal = true_effect
  • 'quadratic': g(W) = true_effect * W^2; marginal = 2 * true_effect * W
  • 'threshold': g(W) = true_effect * (W > 0.0); the derivative is 0 almost everywhere (g is non-differentiable at W = 0, so the stream yields g(W) instead of a derivative)
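
Written out as plain functions (a sketch of the documented forms, not the library's internals), the dose-response curves and their derivatives are:

```python
import math  # not needed by these forms, kept for parity with the DGP code

def g(w: float, form: str, true_effect: float = 1.0) -> float:
    """Dose-response function: mean outcome shift at dose w."""
    if form == "linear":
        return true_effect * w
    if form == "quadratic":
        return true_effect * w ** 2
    return true_effect * (1.0 if w > 0.0 else 0.0)  # 'threshold'

def dg(w: float, form: str, true_effect: float = 1.0) -> float:
    """Marginal effect dE[Y]/dW at dose w."""
    if form == "linear":
        return true_effect
    if form == "quadratic":
        return 2.0 * true_effect * w
    return 0.0  # 'threshold': flat almost everywhere (jump at w = 0)

print(g(0.5, "quadratic"), dg(0.5, "quadratic"))  # 0.25 1.0
```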

Parameters:

  • n (int): Number of observations. Default 1000.
  • n_features (int): Number of covariates. Default 5.
  • true_effect (float): Scaling of the dose-response function. Default 1.0.
  • dose_response (str): One of 'linear', 'quadratic', 'threshold'. Default 'linear'.
  • w_distribution (str): Treatment distribution: 'uniform' or 'normal'. Default 'uniform'.
  • w_min (float): Lower bound for uniform treatment draw. Default -1.0.
  • w_max (float): Upper bound for uniform treatment draw. Default 1.0.
  • w_mean (float): Mean for normal treatment draw. Default 0.0.
  • w_std (float): Standard deviation for normal treatment draw. Default 1.0.
  • confounding_strength (float): How much X shifts the expected treatment value. 0 = exogenous. Default 0.3.
  • noise_std (float): Outcome noise standard deviation. Default 1.0.
  • seed (int or None): Random seed for reproducibility. Default None.

Examples:

>>> stream = ContinuousTreatmentStream(n=200, dose_response='linear', seed=0)
>>> for x, w, y, marginal in stream:
...     assert isinstance(w, float)
...     assert isinstance(marginal, float)
Source code in onlinecml/datasets/continuous_treatment.py
class ContinuousTreatmentStream:
    """Synthetic streaming dataset with a continuous treatment (dose-response).

    Generates observations where the treatment ``W`` is a continuous random
    variable (uniform or normal) rather than binary. The outcome follows a
    dose-response model ``Y = X @ beta + g(W) + noise``, where ``g`` is a
    known dose-response function.

    The fourth element yielded per observation is the **marginal causal
    effect** ``dE[Y]/dW`` at the observed dose ``W``:

    - ``'linear'``    → ``g(W) = true_effect * W``;          marginal = ``true_effect``
    - ``'quadratic'`` → ``g(W) = true_effect * W^2``;        marginal = ``2 * true_effect * W``
    - ``'threshold'`` → ``g(W) = true_effect * (W > 0.0)``; non-differentiable, so ``g(W)`` itself is yielded as the effect

    Parameters
    ----------
    n : int
        Number of observations. Default 1000.
    n_features : int
        Number of covariates. Default 5.
    true_effect : float
        Scaling of the dose-response function. Default 1.0.
    dose_response : str
        One of ``'linear'``, ``'quadratic'``, ``'threshold'``. Default ``'linear'``.
    w_distribution : str
        Treatment distribution: ``'uniform'`` or ``'normal'``. Default ``'uniform'``.
    w_min : float
        Lower bound for uniform treatment draw. Default -1.0.
    w_max : float
        Upper bound for uniform treatment draw. Default 1.0.
    w_mean : float
        Mean for normal treatment draw. Default 0.0.
    w_std : float
        Standard deviation for normal treatment draw. Default 1.0.
    confounding_strength : float
        How much ``X`` shifts the expected treatment value. 0 = exogenous.
        Default 0.3.
    noise_std : float
        Outcome noise standard deviation. Default 1.0.
    seed : int or None
        Random seed for reproducibility.

    Examples
    --------
    >>> stream = ContinuousTreatmentStream(n=200, dose_response='linear', seed=0)
    >>> for x, w, y, marginal in stream:
    ...     assert isinstance(w, float)
    ...     assert isinstance(marginal, float)
    """

    _DOSE_RESPONSES = ("linear", "quadratic", "threshold")
    _DISTRIBUTIONS  = ("uniform", "normal")

    def __init__(
        self,
        n: int = 1000,
        n_features: int = 5,
        true_effect: float = 1.0,
        dose_response: str = "linear",
        w_distribution: str = "uniform",
        w_min: float = -1.0,
        w_max: float = 1.0,
        w_mean: float = 0.0,
        w_std: float = 1.0,
        confounding_strength: float = 0.3,
        noise_std: float = 1.0,
        seed: int | None = None,
    ) -> None:
        if dose_response not in self._DOSE_RESPONSES:
            raise ValueError(f"dose_response must be one of {self._DOSE_RESPONSES}.")
        if w_distribution not in self._DISTRIBUTIONS:
            raise ValueError(f"w_distribution must be one of {self._DISTRIBUTIONS}.")
        self.n = n
        self.n_features = n_features
        self.true_effect = true_effect
        self.dose_response = dose_response
        self.w_distribution = w_distribution
        self.w_min = w_min
        self.w_max = w_max
        self.w_mean = w_mean
        self.w_std = w_std
        self.confounding_strength = confounding_strength
        self.noise_std = noise_std
        self.seed = seed

    # ------------------------------------------------------------------

    def _g(self, w: float) -> float:
        """Dose-response function value at dose ``w``."""
        if self.dose_response == "linear":
            return self.true_effect * w
        if self.dose_response == "quadratic":
            return self.true_effect * w ** 2
        # threshold
        return self.true_effect * float(w > 0.0)

    def _marginal(self, w: float) -> float:
        """Marginal causal effect dE[Y]/dW at dose ``w``."""
        if self.dose_response == "linear":
            return self.true_effect
        if self.dose_response == "quadratic":
            return 2.0 * self.true_effect * w
        # threshold is non-differentiable; yield g(W) as the "effect"
        return self._g(w)

    # ------------------------------------------------------------------

    def __iter__(self) -> Iterator[tuple[dict, float, float, float]]:
        """Iterate over the stream, yielding one observation at a time.

        Yields
        ------
        x : dict
            Feature dictionary ``{'x0': ..., 'x1': ..., ...}``.
        w : float
            Continuous treatment dose.
        y : float
            Observed outcome.
        marginal_effect : float
            True marginal causal effect ``dE[Y]/dW`` at the observed dose.
        """
        rng = np.random.default_rng(self.seed)
        beta = rng.standard_normal(self.n_features)
        norm = math.sqrt(self.n_features)

        for _ in range(self.n):
            X_i = rng.standard_normal(self.n_features)
            x_effect = float(X_i @ beta) / norm  # covariate signal

            # Treatment draw (possibly confounded)
            if self.w_distribution == "uniform":
                shift = self.confounding_strength * x_effect * (self.w_max - self.w_min) * 0.5
                W_i = float(rng.uniform(self.w_min + shift, self.w_max + shift))
            else:
                shift = self.confounding_strength * x_effect * self.w_std
                W_i = float(rng.normal(self.w_mean + shift, self.w_std))

            eps = rng.normal(0.0, self.noise_std)
            Y_i = x_effect + self._g(W_i) + eps
            x_dict = {f"x{j}": float(X_i[j]) for j in range(self.n_features)}
            yield x_dict, W_i, Y_i, self._marginal(W_i)

    def __len__(self) -> int:
        """Total number of observations in this stream."""
        return self.n

__iter__()

Iterate over the stream, yielding one observation at a time.

Yields:

Name Type Description
x dict

Feature dictionary {'x0': ..., 'x1': ..., ...}.

w float

Continuous treatment dose.

y float

Observed outcome.

marginal_effect float

True marginal causal effect dE[Y]/dW at the observed dose.

Source code in onlinecml/datasets/continuous_treatment.py
def __iter__(self) -> Iterator[tuple[dict, float, float, float]]:
    """Iterate over the stream, yielding one observation at a time.

    Yields
    ------
    x : dict
        Feature dictionary ``{'x0': ..., 'x1': ..., ...}``.
    w : float
        Continuous treatment dose.
    y : float
        Observed outcome.
    marginal_effect : float
        True marginal causal effect ``dE[Y]/dW`` at the observed dose.
    """
    rng = np.random.default_rng(self.seed)
    beta = rng.standard_normal(self.n_features)
    norm = math.sqrt(self.n_features)

    for _ in range(self.n):
        X_i = rng.standard_normal(self.n_features)
        x_effect = float(X_i @ beta) / norm  # covariate signal

        # Treatment draw (possibly confounded)
        if self.w_distribution == "uniform":
            shift = self.confounding_strength * x_effect * (self.w_max - self.w_min) * 0.5
            W_i = float(rng.uniform(self.w_min + shift, self.w_max + shift))
        else:
            shift = self.confounding_strength * x_effect * self.w_std
            W_i = float(rng.normal(self.w_mean + shift, self.w_std))

        eps = rng.normal(0.0, self.noise_std)
        Y_i = x_effect + self._g(W_i) + eps
        x_dict = {f"x{j}": float(X_i[j]) for j in range(self.n_features)}
        yield x_dict, W_i, Y_i, self._marginal(W_i)

__len__()

Total number of observations in this stream.

Source code in onlinecml/datasets/continuous_treatment.py
def __len__(self) -> int:
    """Total number of observations in this stream."""
    return self.n
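
With the linear dose response and `confounding_strength=0`, the marginal effect equals the simple-regression slope of `Y` on `W`, so a streaming covariance estimator recovers `true_effect`. The sketch below re-creates the documented DGP inline rather than importing `onlinecml` (a minimal check, not a general estimator):

```python
import numpy as np

rng = np.random.default_rng(0)
n, p, true_effect = 5000, 5, 1.0
beta = rng.standard_normal(p)
norm = np.sqrt(p)

# Streaming sums for the simple-regression slope cov(W, Y) / var(W).
sw = sy = sww = swy = 0.0
for _ in range(n):
    x = rng.standard_normal(p)
    w = float(rng.uniform(-1.0, 1.0))  # exogenous dose (no confounding)
    y = float(x @ beta) / norm + true_effect * w + rng.normal(0.0, 1.0)
    sw += w; sy += y; sww += w * w; swy += w * y

slope = (swy - sw * sy / n) / (sww - sw * sw / n)
print(round(slope, 1))  # close to true_effect = 1.0
```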

Real-World Loaders

onlinecml.datasets.real_world

Real-world causal inference benchmark dataset loaders.

All loaders return River-compatible iterators that yield (x_dict, treatment, outcome, true_cate) tuples, where true_cate is None for datasets without known individual treatment effects.

Datasets are downloaded on first use and cached in ~/.onlinecml/data/.

load_ihdp(split=1, shuffle=False, seed=None)

Load the IHDP (Infant Health Development Program) dataset.

Semi-synthetic benchmark for CATE estimation. True individual treatment effects are available for this dataset.

Parameters:

Name Type Description Default
split int

Dataset split (1–10). Default 1.

1
shuffle bool

If True, shuffle rows before iterating.

False
seed int or None

Random seed for shuffling.

None

Yields:

Name Type Description
x dict

25 covariates (x1–x25).

treatment int

1 = received intensive intervention, 0 = control.

outcome float

Observed outcome (cognitive test score).

true_cate float

True individual treatment effect for this unit.

References

Hill, J.L. (2011). Bayesian nonparametric modeling for causal inference. Journal of Computational and Graphical Statistics, 20(1), 217-240.

Source code in onlinecml/datasets/real_world.py
def load_ihdp(split: int = 1, shuffle: bool = False, seed: int | None = None) -> Iterator:
    """Load the IHDP (Infant Health Development Program) dataset.

    Semi-synthetic benchmark for CATE estimation. True individual
    treatment effects are available for this dataset.

    Parameters
    ----------
    split : int
        Dataset split (1–10). Default 1.
    shuffle : bool
        If True, shuffle rows before iterating.
    seed : int or None
        Random seed for shuffling.

    Yields
    ------
    x : dict
        25 covariates (x1–x25).
    treatment : int
        1 = received intensive intervention, 0 = control.
    outcome : float
        Observed outcome (cognitive test score).
    true_cate : float
        True individual treatment effect for this unit.

    References
    ----------
    Hill, J.L. (2011). Bayesian nonparametric modeling for causal
    inference. Journal of Computational and Graphical Statistics,
    20(1), 217-240.
    """
    filename = f"ihdp_npci_{split}.csv"
    cache = _get_cache_path(filename)
    url = _IHDP_URL.replace("ihdp_npci_1.csv", filename)

    if not cache.exists():
        try:
            _download(url, cache)
        except Exception as e:
            raise RuntimeError(
                f"Failed to download IHDP split {split}: {e}\n"
                f"Try manually placing the file at {cache}"
            ) from e

    rows = []
    with open(cache, newline="") as f:
        reader = csv.reader(f)
        header = next(reader)
        for row in reader:
            rows.append(row)

    if shuffle:
        import random
        rng = random.Random(seed)
        rng.shuffle(rows)

    # IHDP CSV columns: treatment, y_factual, y_cfactual, mu0, mu1, x1..x25
    for row in rows:
        treatment = int(float(row[0]))
        y_factual = float(row[1])
        y_cf = float(row[2])
        mu0 = float(row[3])
        mu1 = float(row[4])
        true_cate = mu1 - mu0
        x = {f"x{i+1}": float(row[5 + i]) for i in range(25)}
        yield x, treatment, y_factual, true_cate
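
Since every loader yields `(x, treatment, outcome, true_cate)` tuples, an online estimator can be evaluated against a running mean of `true_cate` when it is available (as for IHDP). A sketch with a hand-written toy stream standing in for `load_ihdp(...)`, comparing the ATE target against a streaming difference-in-means:

```python
# Toy stream of (x, treatment, outcome, true_cate) tuples, a stand-in
# for the real IHDP loader (hypothetical values).
toy_stream = [
    ({"x1": 0.2}, 1, 5.1, 4.0),
    ({"x1": -1.0}, 0, 1.2, 4.0),
    ({"x1": 0.5}, 1, 6.0, 4.0),
    ({"x1": 0.9}, 0, 0.8, 4.0),
]

cate_sum = n = 0
sums = {0: 0.0, 1: 0.0}
counts = {0: 0, 1: 0}
for x, w, y, tau in toy_stream:
    cate_sum += tau; n += 1          # running mean of true_cate -> ATE target
    sums[w] += y; counts[w] += 1     # streaming difference-in-means

true_ate = cate_sum / n
naive_ate = sums[1] / counts[1] - sums[0] / counts[0]
print(true_ate, round(naive_ate, 2))  # → 4.0 4.55
```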

load_lalonde(shuffle=False, seed=None)

Load the LaLonde (1986) National Supported Work dataset.

A classic benchmark for causal inference. The treatment is participation in a job training program (NSW). The outcome is real earnings in 1978.

Parameters:

Name Type Description Default
shuffle bool

If True, shuffle the rows before iterating. Default False.

False
seed int or None

Random seed for shuffling.

None

Yields:

Name Type Description
x dict

Covariates: age, education, black, hispanic, married, nodegree, re74 (earnings 1974), re75 (earnings 1975).

treatment int

1 = participated in job training, 0 = control.

outcome float

Real earnings in 1978.

true_cate None

Individual CATE is not known for observational studies.

Notes

The dataset is downloaded on first use and cached in ~/.onlinecml/data/.

References

LaLonde, R.J. (1986). Evaluating the econometric evaluations of training programs with experimental data. American Economic Review, 76(4), 604-620.

Source code in onlinecml/datasets/real_world.py
def load_lalonde(shuffle: bool = False, seed: int | None = None) -> Iterator:
    """Load the LaLonde (1986) National Supported Work dataset.

    A classic benchmark for causal inference. The treatment is
    participation in a job training program (NSW). The outcome is
    real earnings in 1978.

    Parameters
    ----------
    shuffle : bool
        If True, shuffle the rows before iterating. Default False.
    seed : int or None
        Random seed for shuffling.

    Yields
    ------
    x : dict
        Covariates: age, education, black, hispanic, married, nodegree,
        re74 (earnings 1974), re75 (earnings 1975).
    treatment : int
        1 = participated in job training, 0 = control.
    outcome : float
        Real earnings in 1978.
    true_cate : None
        Individual CATE is not known for observational studies.

    Notes
    -----
    The dataset is downloaded on first use and cached in
    ``~/.onlinecml/data/``.

    References
    ----------
    LaLonde, R.J. (1986). Evaluating the econometric evaluations of
    training programs with experimental data. American Economic Review,
    76(4), 604-620.
    """
    cache = _get_cache_path("lalonde_nsw_dw.csv")
    if not cache.exists():
        try:
            _download(_LALONDE_URL, cache)
        except Exception as e:
            raise RuntimeError(
                f"Failed to download LaLonde dataset: {e}\n"
                f"Try manually placing the file at {cache}"
            ) from e

    rows = []
    with open(cache, newline="") as f:
        reader = csv.DictReader(f)
        for row in reader:
            rows.append(row)

    if shuffle:
        import random
        rng = random.Random(seed)
        rng.shuffle(rows)

    for row in rows:
        x = {
            "age": float(row["age"]),
            "educ": float(row["educ"]),
            "black": float(row["black"]),
            "hisp": float(row.get("hisp", row.get("hispanic", 0))),
            "married": float(row["married"]),
            "nodegree": float(row["nodegree"]),
            "re74": float(row["re74"]),
            "re75": float(row["re75"]),
        }
        treatment = int(float(row["treat"]))
        outcome = float(row["re78"])
        yield x, treatment, outcome, None
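
For observational loaders like `load_lalonde`, `true_cate` is `None` and a plain difference-in-means is confounded; inverse-propensity weighting (IPW) is one standard correction that fits a streaming setting, since the Horvitz-Thompson terms accumulate online. A sketch on a synthetic confounded stream with a known propensity (a hypothetical stand-in, not the LaLonde data itself):

```python
import numpy as np

rng = np.random.default_rng(1)
n, true_ate = 20000, 2.0

# Synthetic confounded stream: x raises both P(W=1) and the outcome.
num_t = num_c = 0.0
for _ in range(n):
    x = rng.standard_normal()
    e = 0.8 if x > 0 else 0.2                  # known propensity P(W=1|x)
    w = int(rng.binomial(1, e))
    y = x + true_ate * w + rng.normal(0.0, 0.5)
    # Horvitz-Thompson terms, accumulated one observation at a time
    num_t += w * y / e
    num_c += (1 - w) * y / (1 - e)

ipw_ate = num_t / n - num_c / n
print(round(ipw_ate, 1))  # close to true_ate = 2.0
```

With real observational data the propensity is unknown and must itself be estimated, which is where the bias enters.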

load_news(n=None, seed=None)

Load a synthetic high-dimensional approximation of the News dataset.

Because the original News dataset requires proprietary preprocessing, this loader generates a synthetic version with matching statistical properties: ~3000 features (sparse), binary treatment, continuous outcome, and heterogeneous treatment effects.

Parameters:

Name Type Description Default
n int or None

Number of observations. Default 5000.

None
seed int or None

Random seed.

None

Yields:

Name Type Description
x dict

Sparse feature dictionary (100 non-zero features out of 3000).

treatment int

Binary treatment indicator.

outcome float

Continuous outcome.

true_cate float

True individual CATE.

Notes

This is a synthetic proxy. For the original dataset processing code, see Johansson et al. (2016).

Source code in onlinecml/datasets/real_world.py
def load_news(n: int | None = None, seed: int | None = None) -> Iterator:
    """Load a synthetic high-dimensional approximation of the News dataset.

    Because the original News dataset requires proprietary preprocessing,
    this loader generates a synthetic version with matching statistical
    properties: ~3000 features (sparse), binary treatment, continuous
    outcome, and heterogeneous treatment effects.

    Parameters
    ----------
    n : int or None
        Number of observations. Default 5000.
    seed : int or None
        Random seed.

    Yields
    ------
    x : dict
        Sparse feature dictionary (100 non-zero features out of 3000).
    treatment : int
        Binary treatment indicator.
    outcome : float
        Continuous outcome.
    true_cate : float
        True individual CATE.

    Notes
    -----
    This is a synthetic proxy. For the original dataset processing code,
    see Johansson et al. (2016).
    """
    import math
    import numpy as np

    n = n or 5000
    rng = np.random.default_rng(seed)
    n_features = 3000
    n_nonzero = 100

    for _ in range(n):
        # Sparse feature vector: 100 non-zero out of 3000
        idx = rng.choice(n_features, size=n_nonzero, replace=False)
        vals = rng.exponential(1.0, size=n_nonzero)
        x = {f"f{i}": float(v) for i, v in zip(idx, vals)}

        # Confounded treatment
        score = float(sum(vals[:10])) - 5.0
        p = 1.0 / (1.0 + math.exp(-0.3 * score))
        treatment = int(rng.binomial(1, p))

        # Heterogeneous CATE: depends on first non-zero feature
        tau = float(vals[0]) * 0.5
        outcome = score * 0.1 + treatment * tau + rng.normal(0, 0.5)
        yield x, treatment, float(outcome), tau
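
Because the feature dictionaries here contain only the non-zero keys, an online linear model can update just the weights that appear in each observation. A minimal dict-based SGD sketch (an illustration, not part of onlinecml's API):

```python
from collections import defaultdict
import random

def learn_one(weights, x, y, lr=0.05):
    """One SGD step on squared error, touching only the keys present in x."""
    pred = sum(weights[k] * v for k, v in x.items())
    err = y - pred
    for k, v in x.items():
        weights[k] += lr * err * v
    return err

weights = defaultdict(float)
rng = random.Random(0)
# Toy sparse stream: the target depends only on feature "f0".
for _ in range(500):
    v = rng.uniform(0.5, 1.5)
    learn_one(weights, {"f0": v}, 3.0 * v)

print(round(weights["f0"], 1))  # ≈ 3.0
```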

load_twins(shuffle=False, seed=None)

Load the Twin births dataset.

Each observation is a pair of twins. The treatment is being the heavier twin (weight ≥ 2000g). The outcome is 1-year mortality. True individual treatment effects are not available.

Parameters:

Name Type Description Default
shuffle bool

If True, shuffle rows before iterating.

False
seed int or None

Random seed for shuffling.

None

Yields:

Name Type Description
x dict

30 covariates (birth characteristics, demographics).

treatment int

1 = heavier twin, 0 = lighter twin.

outcome float

1-year mortality (0 = alive, 1 = deceased).

true_cate None

Not available for real-world twin data.

References

Louizos, C. et al. (2017). Causal effect inference with deep latent-variable models. NeurIPS 2017.

Source code in onlinecml/datasets/real_world.py
def load_twins(shuffle: bool = False, seed: int | None = None) -> Iterator:
    """Load the Twin births dataset.

    Each observation is a pair of twins. The treatment is being the
    heavier twin (weight ≥ 2000g). The outcome is 1-year mortality.
    True individual treatment effects are not available.

    Parameters
    ----------
    shuffle : bool
        If True, shuffle rows before iterating.
    seed : int or None
        Random seed for shuffling.

    Yields
    ------
    x : dict
        30 covariates (birth characteristics, demographics).
    treatment : int
        1 = heavier twin, 0 = lighter twin.
    outcome : float
        1-year mortality (0 = alive, 1 = deceased).
    true_cate : None
        Not available for real-world twin data.

    References
    ----------
    Louizos, C. et al. (2017). Causal effect inference with deep
    latent-variable models. NeurIPS 2017.
    """
    cache = _get_cache_path("twins.csv")

    if not cache.exists():
        try:
            _download(_TWINS_URL, cache)
        except Exception as e:
            raise RuntimeError(
                f"Failed to download Twins dataset: {e}\n"
                f"Try manually placing the file at {cache}"
            ) from e

    rows = []
    with open(cache, newline="") as f:
        reader = csv.DictReader(f)
        for row in reader:
            rows.append(row)

    if shuffle:
        import random
        rng = random.Random(seed)
        rng.shuffle(rows)

    for row in rows:
        try:
            treatment = int(float(row.get("t", row.get("treat", 0))))
            outcome = float(row.get("y", row.get("mort", 0)))
            x = {k: float(v) for k, v in row.items() if k not in ("t", "treat", "y", "mort")}
            yield x, treatment, outcome, None
        except (ValueError, KeyError):
            continue