Skip to content

Meta-Learners

onlinecml.metalearners.s_learner.OnlineSLearner

Bases: BaseOnlineEstimator

Online S-Learner for CATE estimation via a single augmented model.

Trains a single outcome model on features augmented with the treatment indicator. CATE is estimated as the difference in predictions when the treatment indicator is set to 1 vs 0, holding covariates fixed.

Parameters:

Name Type Description Default
model Regressor or None

A River regressor. Must support learn_one(x, y) and predict_one(x). Defaults to LinearRegression().

None
treatment_feature str

Name of the synthetic treatment feature added to x. Default '__treatment__'. Change if this key conflicts with existing feature names.

'__treatment__'
Notes

Predict-then-learn: At each step, CATE is predicted before the model is updated. This avoids look-ahead bias in the running ATE.

Limitation: Because treatment is just another feature, the S-Learner can under-regularize the treatment effect and produce biased CATE estimates when treatment is rare or the model is mis-specified. For better CATE estimates, prefer OnlineTLearner or OnlineRLearner.

Examples:

>>> from onlinecml.datasets import LinearCausalStream
>>> from river.linear_model import LinearRegression
>>> model = OnlineSLearner(LinearRegression())
>>> for x, w, y, _ in LinearCausalStream(n=200, seed=42):
...     model.learn_one(x, w, y)
>>> isinstance(model.predict_ate(), float)
True
Source code in onlinecml/metalearners/s_learner.py
class OnlineSLearner(BaseOnlineEstimator):
    """Online S-Learner for CATE estimation via a single augmented model.

    Trains a single outcome model on features augmented with the treatment
    indicator. CATE is estimated as the difference in predictions when the
    treatment indicator is set to 1 vs 0, holding covariates fixed.

    Parameters
    ----------
    model : river.base.Regressor or None
        A River regressor. Must support ``learn_one(x, y)`` and
        ``predict_one(x)``. Defaults to ``LinearRegression()``.
    treatment_feature : str
        Name of the synthetic treatment feature added to ``x``.
        Default ``'__treatment__'``. Change if this key conflicts with
        existing feature names.

    Notes
    -----
    **Predict-then-learn:** At each step, CATE is predicted *before* the
    model is updated. This avoids look-ahead bias in the running ATE.

    **Limitation:** Because treatment is just another feature, the
    S-Learner can under-regularize the treatment effect and produce
    biased CATE estimates when treatment is rare or the model is
    mis-specified. For better CATE estimates, prefer ``OnlineTLearner``
    or ``OnlineRLearner``.

    Examples
    --------
    >>> from onlinecml.datasets import LinearCausalStream
    >>> from river.linear_model import LinearRegression
    >>> model = OnlineSLearner(LinearRegression())
    >>> for x, w, y, _ in LinearCausalStream(n=200, seed=42):
    ...     model.learn_one(x, w, y)
    >>> isinstance(model.predict_ate(), float)
    True
    """

    def __init__(
        self,
        model=None,
        treatment_feature: str = "__treatment__",
    ) -> None:
        self.model = model if model is not None else LinearRegression()
        self.treatment_feature = treatment_feature
        # Non-constructor state
        self._n_seen: int = 0
        self._ate_stats: RunningStats = RunningStats()

    def learn_one(
        self,
        x: dict,
        treatment: int,
        outcome: float,
        propensity: float | None = None,
    ) -> None:
        """Process one observation and update the outcome model.

        Parameters
        ----------
        x : dict
            Feature dictionary for this observation.
        treatment : int
            Treatment indicator (0 = control, 1 = treated).
        outcome : float
            Observed outcome.
        propensity : float or None
            Not used by S-Learner; included for API compatibility.
        """
        # Step 1: predict CATE before updating the model (predict-then-learn)
        x_treated = {**x, self.treatment_feature: 1}
        x_control = {**x, self.treatment_feature: 0}
        cate = self.model.predict_one(x_treated) - self.model.predict_one(x_control)

        # Step 2: update running ATE
        self._ate_stats.update(cate)
        self._n_seen += 1

        # Step 3: update outcome model with the actual treatment assignment
        x_augmented = {**x, self.treatment_feature: treatment}
        self.model.learn_one(x_augmented, outcome)

    def predict_one(self, x: dict) -> float:
        """Predict the CATE for a single unit.

        Parameters
        ----------
        x : dict
            Feature dictionary for the unit.

        Returns
        -------
        float
            Estimated CATE: ``model(x, W=1) - model(x, W=0)``.
        """
        x_treated = {**x, self.treatment_feature: 1}
        x_control = {**x, self.treatment_feature: 0}
        return self.model.predict_one(x_treated) - self.model.predict_one(x_control)

learn_one(x, treatment, outcome, propensity=None)

Process one observation and update the outcome model.

Parameters:

Name Type Description Default
x dict

Feature dictionary for this observation.

required
treatment int

Treatment indicator (0 = control, 1 = treated).

required
outcome float

Observed outcome.

required
propensity float or None

Not used by S-Learner; included for API compatibility.

None
Source code in onlinecml/metalearners/s_learner.py
def learn_one(
    self,
    x: dict,
    treatment: int,
    outcome: float,
    propensity: float | None = None,
) -> None:
    """Process one observation and update the outcome model.

    Parameters
    ----------
    x : dict
        Feature dictionary for this observation.
    treatment : int
        Treatment indicator (0 = control, 1 = treated).
    outcome : float
        Observed outcome.
    propensity : float or None
        Not used by S-Learner; included for API compatibility.
    """
    # Step 1: predict CATE before updating the model (predict-then-learn)
    x_treated = {**x, self.treatment_feature: 1}
    x_control = {**x, self.treatment_feature: 0}
    cate = self.model.predict_one(x_treated) - self.model.predict_one(x_control)

    # Step 2: update running ATE
    self._ate_stats.update(cate)
    self._n_seen += 1

    # Step 3: update outcome model with the actual treatment assignment
    x_augmented = {**x, self.treatment_feature: treatment}
    self.model.learn_one(x_augmented, outcome)

predict_one(x)

Predict the CATE for a single unit.

Parameters:

Name Type Description Default
x dict

Feature dictionary for the unit.

required

Returns:

Type Description
float

Estimated CATE: model(x, W=1) - model(x, W=0).

Source code in onlinecml/metalearners/s_learner.py
def predict_one(self, x: dict) -> float:
    """Predict the CATE for a single unit.

    Parameters
    ----------
    x : dict
        Feature dictionary for the unit.

    Returns
    -------
    float
        Estimated CATE: ``model(x, W=1) - model(x, W=0)``.
    """
    x_treated = {**x, self.treatment_feature: 1}
    x_control = {**x, self.treatment_feature: 0}
    return self.model.predict_one(x_treated) - self.model.predict_one(x_control)

onlinecml.metalearners.t_learner.OnlineTLearner

Bases: BaseOnlineEstimator

Online T-Learner for CATE estimation via two separate outcome models.

Trains one model on treated units and one on control units. CATE is estimated as the difference in predictions from the two models.

Parameters:

Name Type Description Default
treated_model Regressor or None

Model for treated units E[Y | X, W=1]. Defaults to LinearRegression().

None
control_model Regressor or None

Model for control units E[Y | X, W=0]. Defaults to LinearRegression(). Must be a different object from treated_model.

None
Notes

Predict-then-learn: CATE is predicted from both models before either model is updated. This avoids look-ahead bias.

IPW correction: If propensity is passed to learn_one, the sample weight 1/p (treated) or 1/(1-p) (control) is passed as w to the model's learn_one call. This corrects for treatment imbalance. If the River model does not support sample weights, the weight is silently ignored.

Single-arm cold start: If only one treatment arm has been seen, the other model returns 0 (River default for untrained regression). A UserWarning is emitted on predict_one when either model has seen no data.

Examples:

>>> from onlinecml.datasets import LinearCausalStream
>>> from river.linear_model import LinearRegression
>>> model = OnlineTLearner(LinearRegression(), LinearRegression())
>>> for x, w, y, _ in LinearCausalStream(n=200, seed=42):
...     model.learn_one(x, w, y)
>>> isinstance(model.predict_one({"x0": 0.5}), float)
True
Source code in onlinecml/metalearners/t_learner.py
class OnlineTLearner(BaseOnlineEstimator):
    """Online T-Learner for CATE estimation via two separate outcome models.

    Trains one model on treated units and one on control units. CATE is
    estimated as the difference in predictions from the two models.

    Parameters
    ----------
    treated_model : river.base.Regressor or None
        Model for treated units ``E[Y | X, W=1]``. Defaults to
        ``LinearRegression()``.
    control_model : river.base.Regressor or None
        Model for control units ``E[Y | X, W=0]``. Defaults to
        ``LinearRegression()``. Must be a different object from
        ``treated_model``.

    Notes
    -----
    **Predict-then-learn:** CATE is predicted from both models *before*
    either model is updated. This avoids look-ahead bias.

    **IPW correction:** If ``propensity`` is passed to ``learn_one``,
    the sample weight ``1/p`` (treated) or ``1/(1-p)`` (control) is
    passed as ``w`` to the model's ``learn_one`` call. This corrects
    for treatment imbalance. If the River model does not support sample
    weights, the weight is silently ignored.

    **Single-arm cold start:** If only one treatment arm has been seen,
    the other model returns 0 (River default for untrained regression).
    A ``UserWarning`` is emitted on ``predict_one`` when either model
    has seen no data.

    Examples
    --------
    >>> from onlinecml.datasets import LinearCausalStream
    >>> from river.linear_model import LinearRegression
    >>> model = OnlineTLearner(LinearRegression(), LinearRegression())
    >>> for x, w, y, _ in LinearCausalStream(n=200, seed=42):
    ...     model.learn_one(x, w, y)
    >>> isinstance(model.predict_one({"x0": 0.5}), float)
    True
    """

    def __init__(
        self,
        treated_model=None,
        control_model=None,
    ) -> None:
        self.treated_model = treated_model if treated_model is not None else LinearRegression()
        self.control_model = control_model if control_model is not None else LinearRegression()
        # Non-constructor state
        self._n_seen: int = 0
        self._ate_stats: RunningStats = RunningStats()
        self._n_treated: int = 0
        self._n_control: int = 0
        self._warned_treated: bool = False
        self._warned_control: bool = False

    def learn_one(
        self,
        x: dict,
        treatment: int,
        outcome: float,
        propensity: float | None = None,
    ) -> None:
        """Process one observation and update the appropriate arm model.

        Parameters
        ----------
        x : dict
            Feature dictionary for this observation.
        treatment : int
            Treatment indicator (0 = control, 1 = treated).
        outcome : float
            Observed outcome.
        propensity : float or None
            If provided, computes IPW weight ``1/p`` (treated) or
            ``1/(1-p)`` (control) and passes it as sample weight ``w``
            to the model's ``learn_one`` call.
        """
        # Step 1: predict CATE before updating either model (predict-then-learn)
        cate = self.treated_model.predict_one(x) - self.control_model.predict_one(x)

        # Step 2: update running ATE
        self._ate_stats.update(cate)
        self._n_seen += 1

        # Step 3: compute sample weight from propensity if provided
        if propensity is not None:
            p = max(1e-6, min(1.0 - 1e-6, propensity))
            w = 1.0 / p if treatment == 1 else 1.0 / (1.0 - p)
        else:
            w = 1.0

        # Step 4: update the appropriate arm model
        if treatment == 1:
            self._n_treated += 1
            try:
                self.treated_model.learn_one(x, outcome, w=w)
            except TypeError:
                self.treated_model.learn_one(x, outcome)
        else:
            self._n_control += 1
            try:
                self.control_model.learn_one(x, outcome, w=w)
            except TypeError:
                self.control_model.learn_one(x, outcome)

    def predict_one(self, x: dict) -> float:
        """Predict the CATE for a single unit.

        Parameters
        ----------
        x : dict
            Feature dictionary for the unit.

        Returns
        -------
        float
            Estimated CATE: ``treated_model(x) - control_model(x)``.

        Warns
        -----
        UserWarning
            If either arm model has not seen any data yet, the prediction
            from that model defaults to 0 (River's untrained regressor
            default), which may bias the CATE estimate.
        """
        if self._n_treated == 0 and not self._warned_treated:
            warnings.warn(
                "treated_model has not seen any data yet; CATE estimate may be biased.",
                UserWarning,
                stacklevel=2,
            )
            self._warned_treated = True
        if self._n_control == 0 and not self._warned_control:
            warnings.warn(
                "control_model has not seen any data yet; CATE estimate may be biased.",
                UserWarning,
                stacklevel=2,
            )
            self._warned_control = True
        return self.treated_model.predict_one(x) - self.control_model.predict_one(x)

learn_one(x, treatment, outcome, propensity=None)

Process one observation and update the appropriate arm model.

Parameters:

Name Type Description Default
x dict

Feature dictionary for this observation.

required
treatment int

Treatment indicator (0 = control, 1 = treated).

required
outcome float

Observed outcome.

required
propensity float or None

If provided, computes IPW weight 1/p (treated) or 1/(1-p) (control) and passes it as sample weight w to the model's learn_one call.

None
Source code in onlinecml/metalearners/t_learner.py
def learn_one(
    self,
    x: dict,
    treatment: int,
    outcome: float,
    propensity: float | None = None,
) -> None:
    """Process one observation and update the appropriate arm model.

    Parameters
    ----------
    x : dict
        Feature dictionary for this observation.
    treatment : int
        Treatment indicator (0 = control, 1 = treated).
    outcome : float
        Observed outcome.
    propensity : float or None
        If provided, computes IPW weight ``1/p`` (treated) or
        ``1/(1-p)`` (control) and passes it as sample weight ``w``
        to the model's ``learn_one`` call.
    """
    # Step 1: predict CATE before updating either model (predict-then-learn)
    cate = self.treated_model.predict_one(x) - self.control_model.predict_one(x)

    # Step 2: update running ATE
    self._ate_stats.update(cate)
    self._n_seen += 1

    # Step 3: compute sample weight from propensity if provided
    if propensity is not None:
        p = max(1e-6, min(1.0 - 1e-6, propensity))
        w = 1.0 / p if treatment == 1 else 1.0 / (1.0 - p)
    else:
        w = 1.0

    # Step 4: update the appropriate arm model
    if treatment == 1:
        self._n_treated += 1
        try:
            self.treated_model.learn_one(x, outcome, w=w)
        except TypeError:
            self.treated_model.learn_one(x, outcome)
    else:
        self._n_control += 1
        try:
            self.control_model.learn_one(x, outcome, w=w)
        except TypeError:
            self.control_model.learn_one(x, outcome)

predict_one(x)

Predict the CATE for a single unit.

Parameters:

Name Type Description Default
x dict

Feature dictionary for the unit.

required

Returns:

Type Description
float

Estimated CATE: treated_model(x) - control_model(x).

Warns:

Type Description
UserWarning

If either arm model has not seen any data yet, the prediction from that model defaults to 0 (River's untrained regressor default), which may bias the CATE estimate.

Source code in onlinecml/metalearners/t_learner.py
def predict_one(self, x: dict) -> float:
    """Predict the CATE for a single unit.

    Parameters
    ----------
    x : dict
        Feature dictionary for the unit.

    Returns
    -------
    float
        Estimated CATE: ``treated_model(x) - control_model(x)``.

    Warns
    -----
    UserWarning
        If either arm model has not seen any data yet, the prediction
        from that model defaults to 0 (River's untrained regressor
        default), which may bias the CATE estimate.
    """
    if self._n_treated == 0 and not self._warned_treated:
        warnings.warn(
            "treated_model has not seen any data yet; CATE estimate may be biased.",
            UserWarning,
            stacklevel=2,
        )
        self._warned_treated = True
    if self._n_control == 0 and not self._warned_control:
        warnings.warn(
            "control_model has not seen any data yet; CATE estimate may be biased.",
            UserWarning,
            stacklevel=2,
        )
        self._warned_control = True
    return self.treated_model.predict_one(x) - self.control_model.predict_one(x)

onlinecml.metalearners.x_learner.OnlineXLearner

Bases: BaseOnlineEstimator

Online X-Learner for CATE estimation in unbalanced treatment groups.

Implements a three-stage pipeline adapted for the online setting:

  1. Stage 1 (T-Learner base): Train two outcome models mu1(x) = E[Y|X, W=1] and mu0(x) = E[Y|X, W=0].

  2. Stage 2 (Imputed effects): For each treated unit, impute a control potential outcome: D1 = Y - mu0(X). For each control unit: D0 = mu1(X) - Y.

  3. Stage 3 (CATE models): Train two CATE models — tau1 on treated units' imputed effects and tau0 on control units'. The final CATE is a propensity-weighted combination: CATE(x) = p(x) * tau0(x) + (1-p(x)) * tau1(x).

Parameters:

Name Type Description Default
mu1_model Regressor or None

Outcome model for treated units. Defaults to LinearRegression().

None
mu0_model Regressor or None

Outcome model for control units. Defaults to LinearRegression().

None
tau1_model Regressor or None

CATE model trained on treated units' imputed effects. Defaults to LinearRegression().

None
tau0_model Regressor or None

CATE model trained on control units' imputed effects. Defaults to LinearRegression().

None
ps_model OnlinePropensityScore or None

Propensity score model for the weighted combination. Defaults to OnlinePropensityScore(LogisticRegression()).

None
Notes

Predict-first-then-learn: All five models predict before any are updated. This approximates the cross-fitting required for stage 2.

The X-Learner is most effective when treatment groups are substantially unbalanced — it leverages information from the larger group to improve CATE estimates for the smaller group.

Examples:

>>> from onlinecml.datasets import LinearCausalStream
>>> model = OnlineXLearner()
>>> for x, w, y, _ in LinearCausalStream(n=500, seed=42):
...     model.learn_one(x, w, y)
>>> isinstance(model.predict_one({"x0": 0.5}), float)
True
Source code in onlinecml/metalearners/x_learner.py
class OnlineXLearner(BaseOnlineEstimator):
    """Online X-Learner for CATE estimation in unbalanced treatment groups.

    Implements a three-stage pipeline adapted for the online setting:

    1. **Stage 1 (T-Learner base):** Train two outcome models
       ``mu1(x) = E[Y|X, W=1]`` and ``mu0(x) = E[Y|X, W=0]``.

    2. **Stage 2 (Imputed effects):** For each treated unit, impute a
       control potential outcome: ``D1 = Y - mu0(X)``. For each control
       unit: ``D0 = mu1(X) - Y``.

    3. **Stage 3 (CATE models):** Train two CATE models — ``tau1`` on
       treated units' imputed effects and ``tau0`` on control units'. The
       final CATE is a propensity-weighted combination:
       ``CATE(x) = p(x) * tau0(x) + (1-p(x)) * tau1(x)``.

    Parameters
    ----------
    mu1_model : river.base.Regressor or None
        Outcome model for treated units. Defaults to ``LinearRegression()``.
    mu0_model : river.base.Regressor or None
        Outcome model for control units. Defaults to ``LinearRegression()``.
    tau1_model : river.base.Regressor or None
        CATE model trained on treated units' imputed effects.
        Defaults to ``LinearRegression()``.
    tau0_model : river.base.Regressor or None
        CATE model trained on control units' imputed effects.
        Defaults to ``LinearRegression()``.
    ps_model : OnlinePropensityScore or None
        Propensity score model for the weighted combination.
        Defaults to ``OnlinePropensityScore(LogisticRegression())``.

    Notes
    -----
    **Predict-first-then-learn:** All five models predict before any are
    updated. This approximates the cross-fitting required for stage 2.

    The X-Learner is most effective when treatment groups are substantially
    unbalanced — it leverages information from the larger group to improve
    CATE estimates for the smaller group.

    Examples
    --------
    >>> from onlinecml.datasets import LinearCausalStream
    >>> model = OnlineXLearner()
    >>> for x, w, y, _ in LinearCausalStream(n=500, seed=42):
    ...     model.learn_one(x, w, y)
    >>> isinstance(model.predict_one({"x0": 0.5}), float)
    True
    """

    def __init__(
        self,
        mu1_model=None,
        mu0_model=None,
        tau1_model=None,
        tau0_model=None,
        ps_model: OnlinePropensityScore | None = None,
    ) -> None:
        self.mu1_model = mu1_model if mu1_model is not None else LinearRegression()
        self.mu0_model = mu0_model if mu0_model is not None else LinearRegression()
        self.tau1_model = tau1_model if tau1_model is not None else LinearRegression()
        self.tau0_model = tau0_model if tau0_model is not None else LinearRegression()
        self.ps_model = ps_model if ps_model is not None else OnlinePropensityScore(
            LogisticRegression()
        )
        # Non-constructor state
        self._n_seen: int = 0
        self._ate_stats: RunningStats = RunningStats()

    def learn_one(
        self,
        x: dict,
        treatment: int,
        outcome: float,
        propensity: float | None = None,
    ) -> None:
        """Process one observation and update all internal models.

        Parameters
        ----------
        x : dict
            Feature dictionary for this observation.
        treatment : int
            Treatment indicator (0 = control, 1 = treated).
        outcome : float
            Observed outcome.
        propensity : float or None
            If provided, used for the weighted combination instead of
            the internal propensity model.
        """
        # Stage 1: predict all models before any updates (predict-first)
        mu1 = self.mu1_model.predict_one(x)
        mu0 = self.mu0_model.predict_one(x)

        # Stage 2: compute imputed individual treatment effect
        if treatment == 1:
            d = outcome - mu0   # treated: observed - counterfactual control
        else:
            d = mu1 - outcome   # control: counterfactual treated - observed

        # Stage 3: predict CATE models (before updating them)
        tau1 = self.tau1_model.predict_one(x)
        tau0 = self.tau0_model.predict_one(x)

        # Propensity for weighted combination
        if propensity is not None:
            p = max(1e-6, min(1.0 - 1e-6, propensity))
        else:
            p = self.ps_model.predict_one(x)

        # Weighted combination: p * tau0 + (1-p) * tau1
        cate = p * tau0 + (1.0 - p) * tau1
        self._ate_stats.update(cate)
        self._n_seen += 1

        # Update all models after pseudo-outcome computation
        if treatment == 1:
            self.mu1_model.learn_one(x, outcome)
            self.tau1_model.learn_one(x, d)
        else:
            self.mu0_model.learn_one(x, outcome)
            self.tau0_model.learn_one(x, d)

        if propensity is None:
            self.ps_model.learn_one(x, treatment)

    def predict_one(self, x: dict) -> float:
        """Predict the CATE for a single unit.

        Parameters
        ----------
        x : dict
            Feature dictionary for the unit.

        Returns
        -------
        float
            Estimated CATE: propensity-weighted combination of
            ``tau0(x)`` and ``tau1(x)``.
        """
        p = self.ps_model.predict_one(x)
        tau1 = self.tau1_model.predict_one(x)
        tau0 = self.tau0_model.predict_one(x)
        return p * tau0 + (1.0 - p) * tau1

learn_one(x, treatment, outcome, propensity=None)

Process one observation and update all internal models.

Parameters:

Name Type Description Default
x dict

Feature dictionary for this observation.

required
treatment int

Treatment indicator (0 = control, 1 = treated).

required
outcome float

Observed outcome.

required
propensity float or None

If provided, used for the weighted combination instead of the internal propensity model.

None
Source code in onlinecml/metalearners/x_learner.py
def learn_one(
    self,
    x: dict,
    treatment: int,
    outcome: float,
    propensity: float | None = None,
) -> None:
    """Process one observation and update all internal models.

    Parameters
    ----------
    x : dict
        Feature dictionary for this observation.
    treatment : int
        Treatment indicator (0 = control, 1 = treated).
    outcome : float
        Observed outcome.
    propensity : float or None
        If provided, used for the weighted combination instead of
        the internal propensity model.
    """
    # Stage 1: predict all models before any updates (predict-first)
    mu1 = self.mu1_model.predict_one(x)
    mu0 = self.mu0_model.predict_one(x)

    # Stage 2: compute imputed individual treatment effect
    if treatment == 1:
        d = outcome - mu0   # treated: observed - counterfactual control
    else:
        d = mu1 - outcome   # control: counterfactual treated - observed

    # Stage 3: predict CATE models (before updating them)
    tau1 = self.tau1_model.predict_one(x)
    tau0 = self.tau0_model.predict_one(x)

    # Propensity for weighted combination
    if propensity is not None:
        p = max(1e-6, min(1.0 - 1e-6, propensity))
    else:
        p = self.ps_model.predict_one(x)

    # Weighted combination: p * tau0 + (1-p) * tau1
    cate = p * tau0 + (1.0 - p) * tau1
    self._ate_stats.update(cate)
    self._n_seen += 1

    # Update all models after pseudo-outcome computation
    if treatment == 1:
        self.mu1_model.learn_one(x, outcome)
        self.tau1_model.learn_one(x, d)
    else:
        self.mu0_model.learn_one(x, outcome)
        self.tau0_model.learn_one(x, d)

    if propensity is None:
        self.ps_model.learn_one(x, treatment)

predict_one(x)

Predict the CATE for a single unit.

Parameters:

Name Type Description Default
x dict

Feature dictionary for the unit.

required

Returns:

Type Description
float

Estimated CATE: propensity-weighted combination of tau0(x) and tau1(x).

Source code in onlinecml/metalearners/x_learner.py
def predict_one(self, x: dict) -> float:
    """Predict the CATE for a single unit.

    Parameters
    ----------
    x : dict
        Feature dictionary for the unit.

    Returns
    -------
    float
        Estimated CATE: propensity-weighted combination of
        ``tau0(x)`` and ``tau1(x)``.
    """
    p = self.ps_model.predict_one(x)
    tau1 = self.tau1_model.predict_one(x)
    tau0 = self.tau0_model.predict_one(x)
    return p * tau0 + (1.0 - p) * tau1

onlinecml.metalearners.r_learner.OnlineRLearner

Bases: BaseOnlineEstimator

Online R-Learner for CATE via the Robinson (1988) transformation.

Estimates CATE by orthogonalizing the treatment assignment and outcome with respect to their conditional means. The residualized targets are:

.. math::

\tilde{W}_i = W_i - \hat{p}(X_i)

\tilde{Y}_i = Y_i - \hat{m}(X_i)

The CATE model is then trained on the pseudo-outcome tilde_Y / tilde_W weighted by tilde_W^2:

.. math::

\hat{\tau}(x) = \arg\min_\tau
\mathbb{E}[(\tilde{Y}_i - \tau(X_i) \tilde{W}_i)^2]

This approach is the theoretical foundation of Double Machine Learning (DML) and produces nearly oracle-rate CATE estimates when both nuisance models are consistent.

Parameters:

Name Type Description Default
ps_model OnlinePropensityScore or None

Propensity score model P(W=1|X). Defaults to OnlinePropensityScore(LogisticRegression()).

None
outcome_model Regressor or None

Outcome model E[Y|X]. Defaults to LinearRegression().

None
cate_model Regressor or None

CATE model trained on Robinson-residualized targets. Defaults to LinearRegression().

None
Notes

Predict-first-then-learn: All three models predict before any are updated. This is the natural online approximation to cross-fitting.

Residual weighting: The CATE model is updated with sample weight w = tilde_W^2 when the River model supports sample weights. When |tilde_W| is very small (near 0.05), the update is skipped to avoid noisy updates from nearly-deterministic treatment assignments.

Connection to DML: The R-Learner is equivalent to Partially Linear DML (Robinson 1988, Chernozhukov et al. 2018) when the CATE model is linear.

References

Robinson, P.M. (1988). Root-N-consistent semiparametric regression. Econometrica, 56(4), 931-954.

Nie, X. and Wager, S. (2021). Quasi-oracle estimation of heterogeneous treatment effects. Biometrika, 108(2), 299-319.

Examples:

>>> from onlinecml.datasets import LinearCausalStream
>>> from river.linear_model import LinearRegression, LogisticRegression
>>> model = OnlineRLearner(
...     ps_model=None,
...     outcome_model=LinearRegression(),
...     cate_model=LinearRegression(),
... )
>>> for x, w, y, _ in LinearCausalStream(n=500, seed=42):
...     model.learn_one(x, w, y)
>>> isinstance(model.predict_ate(), float)
True
Source code in onlinecml/metalearners/r_learner.py
class OnlineRLearner(BaseOnlineEstimator):
    """Online R-Learner for CATE via the Robinson (1988) transformation.

    Estimates CATE by orthogonalizing the treatment assignment and outcome
    with respect to their conditional means. The residualized targets are:

    .. math::

        \\tilde{W}_i = W_i - \\hat{p}(X_i)

        \\tilde{Y}_i = Y_i - \\hat{m}(X_i)

    The CATE model is then trained on the pseudo-outcome
    ``tilde_Y / tilde_W`` weighted by ``tilde_W^2``:

    .. math::

        \\hat{\\tau}(x) = \\arg\\min_\\tau
        \\mathbb{E}[(\\tilde{Y}_i - \\tau(X_i) \\tilde{W}_i)^2]

    This approach is the theoretical foundation of Double Machine Learning
    (DML) and produces nearly oracle-rate CATE estimates when both nuisance
    models are consistent.

    Parameters
    ----------
    ps_model : OnlinePropensityScore or None
        Propensity score model ``P(W=1|X)``.
        Defaults to ``OnlinePropensityScore(LogisticRegression())``.
    outcome_model : river.base.Regressor or None
        Outcome model ``E[Y|X]``. Defaults to ``LinearRegression()``.
    cate_model : river.base.Regressor or None
        CATE model trained on Robinson-residualized targets.
        Defaults to ``LinearRegression()``.

    Notes
    -----
    **Predict-first-then-learn:** All three models predict before any are
    updated. This is the natural online approximation to cross-fitting.

    **Residual weighting:** The CATE model is updated with sample weight
    ``w = tilde_W^2`` when the River model supports sample weights. When
    ``|tilde_W|`` is very small (near 0.05), the update is skipped to
    avoid noisy updates from nearly-deterministic treatment assignments.

    **Connection to DML:** The R-Learner is equivalent to Partially Linear
    DML (Robinson 1988, Chernozhukov et al. 2018) when the CATE model is
    linear.

    References
    ----------
    Robinson, P.M. (1988). Root-N-consistent semiparametric regression.
    Econometrica, 56(4), 931-954.

    Nie, X. and Wager, S. (2021). Quasi-oracle estimation of heterogeneous
    treatment effects. Biometrika, 108(2), 299-319.

    Examples
    --------
    >>> from onlinecml.datasets import LinearCausalStream
    >>> from river.linear_model import LinearRegression, LogisticRegression
    >>> model = OnlineRLearner(
    ...     ps_model=None,
    ...     outcome_model=LinearRegression(),
    ...     cate_model=LinearRegression(),
    ... )
    >>> for x, w, y, _ in LinearCausalStream(n=500, seed=42):
    ...     model.learn_one(x, w, y)
    >>> isinstance(model.predict_ate(), float)
    True
    """

    def __init__(
        self,
        ps_model: OnlinePropensityScore | None = None,
        outcome_model=None,
        cate_model=None,
        min_residual: float = 0.05,
    ) -> None:
        self.ps_model = ps_model if ps_model is not None else OnlinePropensityScore(
            LogisticRegression()
        )
        self.outcome_model = outcome_model if outcome_model is not None else LinearRegression()
        self.cate_model = cate_model if cate_model is not None else LinearRegression()
        self.min_residual = min_residual
        # Non-constructor state
        self._n_seen: int = 0
        self._ate_stats: RunningStats = RunningStats()

    def learn_one(
        self,
        x: dict,
        treatment: int,
        outcome: float,
        propensity: float | None = None,
    ) -> None:
        """Process one observation using the Robinson transformation.

        Parameters
        ----------
        x : dict
            Feature dictionary for this observation.
        treatment : int
            Treatment indicator (0 = control, 1 = treated).
        outcome : float
            Observed outcome.
        propensity : float or None
            If provided, uses this logged propensity instead of the
            internal model.
        """
        # Predict all nuisance models before any updates
        if propensity is not None:
            p_hat = max(1e-6, min(1.0 - 1e-6, propensity))
        else:
            p_hat = self.ps_model.predict_one(x)

        m_hat = self.outcome_model.predict_one(x)
        cate_hat = self.cate_model.predict_one(x)

        # Robinson residuals
        w_res = treatment - p_hat      # W_tilde
        y_res = outcome - m_hat        # Y_tilde

        # Update running ATE from current CATE estimate
        self._ate_stats.update(cate_hat)
        self._n_seen += 1

        # Update CATE model with Robinson pseudo-outcome
        # weight = w_res^2 (residual-weighted regression)
        if abs(w_res) >= self.min_residual:
            # Pseudo-outcome: y_res / w_res (Robinson's rearrangement)
            pseudo_outcome = y_res / w_res
            w_sample = w_res ** 2
            try:
                self.cate_model.learn_one(x, pseudo_outcome, w=w_sample)
            except TypeError:
                self.cate_model.learn_one(x, pseudo_outcome)

        # Update nuisance models
        if propensity is None:
            self.ps_model.learn_one(x, treatment)
        self.outcome_model.learn_one(x, outcome)

    def predict_one(self, x: dict) -> float:
        """Predict the CATE for a single unit.

        Parameters
        ----------
        x : dict
            Feature dictionary for the unit.

        Returns
        -------
        float
            Estimated CATE from the Robinson-residualized model.
        """
        return self.cate_model.predict_one(x)

learn_one(x, treatment, outcome, propensity=None)

Process one observation using the Robinson transformation.

Parameters:

Name Type Description Default
x dict

Feature dictionary for this observation.

required
treatment int

Treatment indicator (0 = control, 1 = treated).

required
outcome float

Observed outcome.

required
propensity float or None

If provided, uses this logged propensity instead of the internal model.

None
Source code in onlinecml/metalearners/r_learner.py
def learn_one(
    self,
    x: dict,
    treatment: int,
    outcome: float,
    propensity: float | None = None,
) -> None:
    """Process one observation using the Robinson transformation.

    Parameters
    ----------
    x : dict
        Feature dictionary for this observation.
    treatment : int
        Treatment indicator (0 = control, 1 = treated).
    outcome : float
        Observed outcome.
    propensity : float or None
        If provided, uses this logged propensity instead of the
        internal model.
    """
    # Predict all nuisance models before any updates
    if propensity is not None:
        p_hat = max(1e-6, min(1.0 - 1e-6, propensity))
    else:
        p_hat = self.ps_model.predict_one(x)

    m_hat = self.outcome_model.predict_one(x)
    cate_hat = self.cate_model.predict_one(x)

    # Robinson residuals
    w_res = treatment - p_hat      # W_tilde
    y_res = outcome - m_hat        # Y_tilde

    # Update running ATE from current CATE estimate
    self._ate_stats.update(cate_hat)
    self._n_seen += 1

    # Update CATE model with Robinson pseudo-outcome
    # weight = w_res^2 (residual-weighted regression)
    if abs(w_res) >= self.min_residual:
        # Pseudo-outcome: y_res / w_res (Robinson's rearrangement)
        pseudo_outcome = y_res / w_res
        w_sample = w_res ** 2
        try:
            self.cate_model.learn_one(x, pseudo_outcome, w=w_sample)
        except TypeError:
            self.cate_model.learn_one(x, pseudo_outcome)

    # Update nuisance models
    if propensity is None:
        self.ps_model.learn_one(x, treatment)
    self.outcome_model.learn_one(x, outcome)

predict_one(x)

Predict the CATE for a single unit.

Parameters:

Name Type Description Default
x dict

Feature dictionary for the unit.

required

Returns:

Type Description
float

Estimated CATE from the Robinson-residualized model.

Source code in onlinecml/metalearners/r_learner.py
def predict_one(self, x: dict) -> float:
    """Predict the CATE for a single unit.

    Parameters
    ----------
    x : dict
        Feature dictionary for the unit.

    Returns
    -------
    float
        Estimated CATE from the Robinson-residualized model.
    """
    return self.cate_model.predict_one(x)