Skip to content

Reweighting

onlinecml.reweighting.ipw.OnlineIPW

Bases: BaseOnlineEstimator

Online Inverse Probability Weighting estimator for the ATE.

Estimates the Average Treatment Effect via importance-weighted pseudo-outcomes, updated one observation at a time. The propensity score model is updated after each pseudo-outcome computation (predict-then-learn protocol).

Parameters:

Name Type Description Default
ps_model OnlinePropensityScore or None

Propensity score model. If None, defaults to OnlinePropensityScore(LogisticRegression()).

None
clip_min float

Lower clip bound for propensity scores. Default 0.01.

0.01
clip_max float

Upper clip bound for propensity scores. Default 0.99.

0.99
normalize bool

If True, use normalized (stabilized) IPW weights by dividing by the running mean weight within each arm. Default False.

False
Notes

The IPW pseudo-outcome for observation i is:

.. math::

\psi_i = \frac{W_i Y_i}{\hat{p}_i} - \frac{(1-W_i) Y_i}{1 - \hat{p}_i}

The running mean of psi converges to the ATE under unconfoundedness, overlap, and SUTVA.

Predict-then-learn: the propensity score is predicted before the classifier is updated on the current observation. This avoids look-ahead bias in the pseudo-outcome.

Limitation: predict_one(x) returns the current running ATE estimate for any input x. IPW does not produce individual CATE estimates. Use OnlineAIPW or meta-learners for individual CATE.

Cold start: Before any training, propensity is 0.5, giving IPW weight = 2.0 regardless of treatment. Use warmup to skip ATE accumulation until the PS model has seen enough data.

Drift adaptation: The default cumulative mean cannot forget old observations. Use forgetting_factor < 1.0 (e.g. 0.97) to switch to an EWMA that down-weights old pseudo-outcomes automatically.

Parameters:

Name Type Description Default
warmup int

Number of initial observations to skip when accumulating the ATE estimate. The PS model still trains during warmup. Default 0.

0
forgetting_factor float

Controls how quickly old pseudo-outcomes are forgotten. 1.0 = cumulative mean (no forgetting, default). Values < 1.0 (e.g. 0.95–0.99) switch to EWMA so the estimator adapts to concept drift. alpha = 1 - forgetting_factor.

1.0

Examples:

>>> from onlinecml.datasets import LinearCausalStream
>>> estimator = OnlineIPW()
>>> for x, w, y, _ in LinearCausalStream(n=500, true_ate=2.0, seed=42):
...     estimator.learn_one(x, w, y)
>>> abs(estimator.predict_ate()) < 5.0  # loose bound for short stream
True
Source code in onlinecml/reweighting/ipw.py
class OnlineIPW(BaseOnlineEstimator):
    """Online Inverse Probability Weighting estimator for the ATE.

    Estimates the Average Treatment Effect via importance-weighted
    pseudo-outcomes, updated one observation at a time. The propensity
    score model is updated after each pseudo-outcome computation
    (predict-then-learn protocol).

    Parameters
    ----------
    ps_model : OnlinePropensityScore or None
        Propensity score model. If None, defaults to
        ``OnlinePropensityScore(LogisticRegression())``.
    clip_min : float
        Lower clip bound for propensity scores. Default 0.01.
    clip_max : float
        Upper clip bound for propensity scores. Default 0.99.
    normalize : bool
        If True, use normalized (stabilized) IPW weights by dividing
        by the running mean weight within each arm. Default False.

    Notes
    -----
    The IPW pseudo-outcome for observation ``i`` is:

    .. math::

        \\psi_i = \\frac{W_i Y_i}{\\hat{p}_i} - \\frac{(1-W_i) Y_i}{1 - \\hat{p}_i}

    The running mean of ``psi`` converges to the ATE under unconfoundedness,
    overlap, and SUTVA.

    **Predict-then-learn:** the propensity score is predicted *before*
    the classifier is updated on the current observation. This avoids
    look-ahead bias in the pseudo-outcome.

    **Limitation:** ``predict_one(x)`` returns the current running ATE
    estimate for any input ``x``. IPW does not produce individual CATE
    estimates. Use ``OnlineAIPW`` or meta-learners for individual CATE.

    **Cold start:** Before any training, propensity is 0.5, giving
    IPW weight = 2.0 regardless of treatment. Use ``warmup`` to skip
    ATE accumulation until the PS model has seen enough data.

    **Drift adaptation:** The default cumulative mean cannot forget old
    observations. Use ``forgetting_factor < 1.0`` (e.g. 0.97) to switch
    to an EWMA that down-weights old pseudo-outcomes automatically.

    Parameters
    ----------
    warmup : int
        Number of initial observations to skip when accumulating the ATE
        estimate. The PS model still trains during warmup. Default 0.
    forgetting_factor : float
        Controls how quickly old pseudo-outcomes are forgotten.
        ``1.0`` = cumulative mean (no forgetting, default).
        Values < 1.0 (e.g. 0.95–0.99) switch to EWMA so the estimator
        adapts to concept drift. ``alpha = 1 - forgetting_factor``.

    Examples
    --------
    >>> from onlinecml.datasets import LinearCausalStream
    >>> estimator = OnlineIPW()
    >>> for x, w, y, _ in LinearCausalStream(n=500, true_ate=2.0, seed=42):
    ...     estimator.learn_one(x, w, y)
    >>> abs(estimator.predict_ate()) < 5.0  # loose bound for short stream
    True
    """

    def __init__(
        self,
        ps_model: OnlinePropensityScore | None = None,
        clip_min: float = 0.01,
        clip_max: float = 0.99,
        normalize: bool = False,
        warmup: int = 0,
        forgetting_factor: float = 1.0,
    ) -> None:
        self.ps_model = ps_model if ps_model is not None else OnlinePropensityScore(
            LogisticRegression(), clip_min=clip_min, clip_max=clip_max
        )
        self.clip_min = clip_min
        self.clip_max = clip_max
        self.normalize = normalize
        self.warmup = warmup
        self.forgetting_factor = forgetting_factor
        # Non-constructor state
        self._n_seen: int = 0
        self._ate_stats: RunningStats | EWMAStats = (
            EWMAStats(alpha=1.0 - forgetting_factor)
            if forgetting_factor < 1.0
            else RunningStats()
        )
        self._treated_weight_stats: RunningStats = RunningStats()
        self._control_weight_stats: RunningStats = RunningStats()

    def learn_one(
        self,
        x: dict,
        treatment: int,
        outcome: float,
        propensity: float | None = None,
    ) -> None:
        """Process one observation and update the ATE estimate.

        Parameters
        ----------
        x : dict
            Feature dictionary for this observation.
        treatment : int
            Treatment indicator (0 = control, 1 = treated).
        outcome : float
            Observed outcome.
        propensity : float or None
            If provided, uses this logged propensity instead of the
            internal model's prediction. Useful for off-policy evaluation.
        """
        # Step 1: predict propensity BEFORE updating the classifier
        if propensity is not None:
            p = max(self.clip_min, min(self.clip_max, propensity))
        else:
            p = self.ps_model.predict_one(x)

        # Step 2: compute raw IPW weights
        w_treated = 1.0 / p
        w_control = 1.0 / (1.0 - p)

        # Step 3: track weight stats (for normalization and diagnostics)
        self._treated_weight_stats.update(w_treated)
        self._control_weight_stats.update(w_control)

        # Step 4: normalize weights if requested
        if self.normalize:
            mean_wt = self._treated_weight_stats.mean
            mean_wc = self._control_weight_stats.mean
            w_treated = w_treated / mean_wt if mean_wt > 0 else w_treated
            w_control = w_control / mean_wc if mean_wc > 0 else w_control

        # Step 5: IPW pseudo-outcome
        psi = treatment * w_treated * outcome - (1 - treatment) * w_control * outcome

        # Step 6: update ATE tracker (skip during warmup)
        if self._n_seen >= self.warmup:
            self._ate_stats.update(psi)
        self._n_seen += 1

        # Step 7: update propensity model AFTER pseudo-outcome
        if propensity is None:
            self.ps_model.learn_one(x, treatment)

    def predict_one(self, x: dict) -> float:
        """Return the current running ATE estimate.

        Parameters
        ----------
        x : dict
            Feature dictionary (not used; IPW produces no individual CATE).

        Returns
        -------
        float
            Current ATE estimate. IPW does not estimate individual CATE —
            the same value is returned for all inputs.
        """
        return self._ate_stats.mean

    @property
    def weight_stats(self) -> dict:
        """Summary statistics of IPW weights seen so far.

        Returns
        -------
        dict
            Dictionary with keys ``'treated_mean'``, ``'control_mean'``,
            and ``'n'``.
        """
        return {
            "treated_mean": self._treated_weight_stats.mean,
            "control_mean": self._control_weight_stats.mean,
            "n": self._n_seen,
        }

weight_stats property

Summary statistics of IPW weights seen so far.

Returns:

Type Description
dict

Dictionary with keys 'treated_mean', 'control_mean', and 'n'.

learn_one(x, treatment, outcome, propensity=None)

Process one observation and update the ATE estimate.

Parameters:

Name Type Description Default
x dict

Feature dictionary for this observation.

required
treatment int

Treatment indicator (0 = control, 1 = treated).

required
outcome float

Observed outcome.

required
propensity float or None

If provided, uses this logged propensity instead of the internal model's prediction. Useful for off-policy evaluation.

None
Source code in onlinecml/reweighting/ipw.py
def learn_one(
    self,
    x: dict,
    treatment: int,
    outcome: float,
    propensity: float | None = None,
) -> None:
    """Process one observation and update the ATE estimate.

    Parameters
    ----------
    x : dict
        Feature dictionary for this observation.
    treatment : int
        Treatment indicator (0 = control, 1 = treated).
    outcome : float
        Observed outcome.
    propensity : float or None
        If provided, uses this logged propensity instead of the
        internal model's prediction. Useful for off-policy evaluation.
    """
    # Step 1: predict propensity BEFORE updating the classifier
    if propensity is not None:
        p = max(self.clip_min, min(self.clip_max, propensity))
    else:
        p = self.ps_model.predict_one(x)

    # Step 2: compute raw IPW weights
    w_treated = 1.0 / p
    w_control = 1.0 / (1.0 - p)

    # Step 3: track weight stats (for normalization and diagnostics)
    self._treated_weight_stats.update(w_treated)
    self._control_weight_stats.update(w_control)

    # Step 4: normalize weights if requested
    if self.normalize:
        mean_wt = self._treated_weight_stats.mean
        mean_wc = self._control_weight_stats.mean
        w_treated = w_treated / mean_wt if mean_wt > 0 else w_treated
        w_control = w_control / mean_wc if mean_wc > 0 else w_control

    # Step 5: IPW pseudo-outcome
    psi = treatment * w_treated * outcome - (1 - treatment) * w_control * outcome

    # Step 6: update ATE tracker (skip during warmup)
    if self._n_seen >= self.warmup:
        self._ate_stats.update(psi)
    self._n_seen += 1

    # Step 7: update propensity model AFTER pseudo-outcome
    if propensity is None:
        self.ps_model.learn_one(x, treatment)

predict_one(x)

Return the current running ATE estimate.

Parameters:

Name Type Description Default
x dict

Feature dictionary (not used; IPW produces no individual CATE).

required

Returns:

Type Description
float

Current ATE estimate. IPW does not estimate individual CATE — the same value is returned for all inputs.

Source code in onlinecml/reweighting/ipw.py
def predict_one(self, x: dict) -> float:
    """Return the current running ATE estimate.

    Parameters
    ----------
    x : dict
        Feature dictionary (not used; IPW produces no individual CATE).

    Returns
    -------
    float
        Current ATE estimate. IPW does not estimate individual CATE —
        the same value is returned for all inputs.
    """
    return self._ate_stats.mean

onlinecml.reweighting.aipw.OnlineAIPW

Bases: BaseOnlineEstimator

Online Doubly Robust (AIPW) estimator for ATE and individual CATE.

Estimates the Average Treatment Effect using the Augmented IPW (doubly robust) estimator. Maintains three online models: a propensity score, a treated outcome model, and a control outcome model. All models are updated using the predict-first-then-learn protocol.

Parameters:

Name Type Description Default
ps_model OnlinePropensityScore or None

Propensity score model. Defaults to OnlinePropensityScore(LogisticRegression()).

None
treated_model Regressor or None

Outcome model for treated units E[Y|X, W=1]. Defaults to LinearRegression().

None
control_model Regressor or None

Outcome model for control units E[Y|X, W=0]. Defaults to LinearRegression().

None
clip_min float

Lower clip bound for propensity scores. Default 0.01.

0.01
clip_max float

Upper clip bound for propensity scores. Default 0.99.

0.99
Notes

The doubly robust pseudo-outcome is:

.. math::

\psi_i = \hat{\mu}_1(X_i) - \hat{\mu}_0(X_i)
         + \frac{W_i (Y_i - \hat{\mu}_1(X_i))}{\hat{p}(X_i)}
         - \frac{(1-W_i)(Y_i - \hat{\mu}_0(X_i))}{1 - \hat{p}(X_i)}

The estimator is consistent if either the propensity score OR both outcome models are correctly specified (double robustness).

Predict-first-then-learn: all three models predict before any of them are updated on the current observation. This approximates cross-fitting in the online setting.

Unlike OnlineIPW, predict_one(x) returns an individual CATE estimate: mu1(x) - mu0(x).

Parameters:

Name Type Description Default
warmup int

Number of initial observations to skip when accumulating the ATE estimate. All models still train during warmup. Default 0.

0
forgetting_factor float

Controls how quickly old pseudo-outcomes are forgotten. 1.0 = cumulative mean (no forgetting, default). Values < 1.0 (e.g. 0.95–0.99) switch to EWMA for drift adaptation.

1.0

Examples:

>>> from onlinecml.datasets import LinearCausalStream
>>> estimator = OnlineAIPW()
>>> for x, w, y, _ in LinearCausalStream(n=500, true_ate=2.0, seed=42):
...     estimator.learn_one(x, w, y)
>>> abs(estimator.predict_ate()) < 5.0
True
Source code in onlinecml/reweighting/aipw.py
class OnlineAIPW(BaseOnlineEstimator):
    """Online Doubly Robust (AIPW) estimator for ATE and individual CATE.

    Estimates the Average Treatment Effect using the Augmented IPW
    (doubly robust) estimator. Maintains three online models: a propensity
    score, a treated outcome model, and a control outcome model. All models
    are updated using the predict-first-then-learn protocol.

    Parameters
    ----------
    ps_model : OnlinePropensityScore or None
        Propensity score model. Defaults to
        ``OnlinePropensityScore(LogisticRegression())``.
    treated_model : river.base.Regressor or None
        Outcome model for treated units ``E[Y|X, W=1]``. Defaults to
        ``LinearRegression()``.
    control_model : river.base.Regressor or None
        Outcome model for control units ``E[Y|X, W=0]``. Defaults to
        ``LinearRegression()``.
    clip_min : float
        Lower clip bound for propensity scores. Default 0.01.
    clip_max : float
        Upper clip bound for propensity scores. Default 0.99.

    Notes
    -----
    The doubly robust pseudo-outcome is:

    .. math::

        \\psi_i = \\hat{\\mu}_1(X_i) - \\hat{\\mu}_0(X_i)
                 + \\frac{W_i (Y_i - \\hat{\\mu}_1(X_i))}{\\hat{p}(X_i)}
                 - \\frac{(1-W_i)(Y_i - \\hat{\\mu}_0(X_i))}{1 - \\hat{p}(X_i)}

    The estimator is consistent if either the propensity score OR both
    outcome models are correctly specified (double robustness).

    **Predict-first-then-learn:** all three models predict *before* any
    of them are updated on the current observation. This approximates
    cross-fitting in the online setting.

    Unlike ``OnlineIPW``, ``predict_one(x)`` returns an individual CATE
    estimate: ``mu1(x) - mu0(x)``.

    Parameters
    ----------
    warmup : int
        Number of initial observations to skip when accumulating the ATE
        estimate. All models still train during warmup. Default 0.
    forgetting_factor : float
        Controls how quickly old pseudo-outcomes are forgotten.
        ``1.0`` = cumulative mean (no forgetting, default).
        Values < 1.0 (e.g. 0.95–0.99) switch to EWMA for drift adaptation.

    Examples
    --------
    >>> from onlinecml.datasets import LinearCausalStream
    >>> estimator = OnlineAIPW()
    >>> for x, w, y, _ in LinearCausalStream(n=500, true_ate=2.0, seed=42):
    ...     estimator.learn_one(x, w, y)
    >>> abs(estimator.predict_ate()) < 5.0
    True
    """

    def __init__(
        self,
        ps_model: OnlinePropensityScore | None = None,
        treated_model=None,
        control_model=None,
        clip_min: float = 0.01,
        clip_max: float = 0.99,
        warmup: int = 0,
        forgetting_factor: float = 1.0,
    ) -> None:
        self.ps_model = ps_model if ps_model is not None else OnlinePropensityScore(
            LogisticRegression(), clip_min=clip_min, clip_max=clip_max
        )
        self.treated_model = treated_model if treated_model is not None else LinearRegression()
        self.control_model = control_model if control_model is not None else LinearRegression()
        self.clip_min = clip_min
        self.clip_max = clip_max
        self.warmup = warmup
        self.forgetting_factor = forgetting_factor
        # Non-constructor state
        self._n_seen: int = 0
        self._ate_stats: RunningStats | EWMAStats = (
            EWMAStats(alpha=1.0 - forgetting_factor)
            if forgetting_factor < 1.0
            else RunningStats()
        )

    def learn_one(
        self,
        x: dict,
        treatment: int,
        outcome: float,
        propensity: float | None = None,
    ) -> None:
        """Process one observation and update all internal models.

        Parameters
        ----------
        x : dict
            Feature dictionary for this observation.
        treatment : int
            Treatment indicator (0 = control, 1 = treated).
        outcome : float
            Observed outcome.
        propensity : float or None
            If provided, uses this logged propensity instead of the
            internal model. Useful for off-policy evaluation.
        """
        # Step 1: predict all models BEFORE any updates (predict-first)
        if propensity is not None:
            p = max(self.clip_min, min(self.clip_max, propensity))
        else:
            p = self.ps_model.predict_one(x)

        mu1 = self.treated_model.predict_one(x)
        mu0 = self.control_model.predict_one(x)

        # Step 2: compute doubly robust pseudo-outcome
        psi = (
            mu1
            - mu0
            + treatment * (outcome - mu1) / p
            - (1 - treatment) * (outcome - mu0) / (1.0 - p)
        )

        # Step 3: update ATE tracker (skip during warmup)
        if self._n_seen >= self.warmup:
            self._ate_stats.update(psi)
        self._n_seen += 1

        # Step 4: update all models AFTER pseudo-outcome
        if propensity is None:
            self.ps_model.learn_one(x, treatment)
        if treatment == 1:
            self.treated_model.learn_one(x, outcome)
        else:
            self.control_model.learn_one(x, outcome)

    def predict_one(self, x: dict) -> float:
        """Predict the individual CATE for a single unit.

        Parameters
        ----------
        x : dict
            Feature dictionary for the unit.

        Returns
        -------
        float
            Estimated CATE: ``mu1(x) - mu0(x)``. Returns 0.0 before
            any observations are seen (both models predict 0 by default).
        """
        return self.treated_model.predict_one(x) - self.control_model.predict_one(x)

learn_one(x, treatment, outcome, propensity=None)

Process one observation and update all internal models.

Parameters:

Name Type Description Default
x dict

Feature dictionary for this observation.

required
treatment int

Treatment indicator (0 = control, 1 = treated).

required
outcome float

Observed outcome.

required
propensity float or None

If provided, uses this logged propensity instead of the internal model. Useful for off-policy evaluation.

None
Source code in onlinecml/reweighting/aipw.py
def learn_one(
    self,
    x: dict,
    treatment: int,
    outcome: float,
    propensity: float | None = None,
) -> None:
    """Process one observation and update all internal models.

    Parameters
    ----------
    x : dict
        Feature dictionary for this observation.
    treatment : int
        Treatment indicator (0 = control, 1 = treated).
    outcome : float
        Observed outcome.
    propensity : float or None
        If provided, uses this logged propensity instead of the
        internal model. Useful for off-policy evaluation.
    """
    # Step 1: predict all models BEFORE any updates (predict-first)
    if propensity is not None:
        p = max(self.clip_min, min(self.clip_max, propensity))
    else:
        p = self.ps_model.predict_one(x)

    mu1 = self.treated_model.predict_one(x)
    mu0 = self.control_model.predict_one(x)

    # Step 2: compute doubly robust pseudo-outcome
    psi = (
        mu1
        - mu0
        + treatment * (outcome - mu1) / p
        - (1 - treatment) * (outcome - mu0) / (1.0 - p)
    )

    # Step 3: update ATE tracker (skip during warmup)
    if self._n_seen >= self.warmup:
        self._ate_stats.update(psi)
    self._n_seen += 1

    # Step 4: update all models AFTER pseudo-outcome
    if propensity is None:
        self.ps_model.learn_one(x, treatment)
    if treatment == 1:
        self.treated_model.learn_one(x, outcome)
    else:
        self.control_model.learn_one(x, outcome)

predict_one(x)

Predict the individual CATE for a single unit.

Parameters:

Name Type Description Default
x dict

Feature dictionary for the unit.

required

Returns:

Type Description
float

Estimated CATE: mu1(x) - mu0(x). Returns 0.0 before any observations are seen (both models predict 0 by default).

Source code in onlinecml/reweighting/aipw.py
def predict_one(self, x: dict) -> float:
    """Predict the individual CATE for a single unit.

    Parameters
    ----------
    x : dict
        Feature dictionary for the unit.

    Returns
    -------
    float
        Estimated CATE: ``mu1(x) - mu0(x)``. Returns 0.0 before
        any observations are seen (both models predict 0 by default).
    """
    return self.treated_model.predict_one(x) - self.control_model.predict_one(x)

onlinecml.reweighting.overlap_weights.OnlineOverlapWeights

Bases: BaseOnlineEstimator

Online Overlap Weights estimator for the ATE.

Uses overlap weights — proportional to the probability of belonging to the opposite treatment group — instead of inverse probability weights. Overlap weights are bounded and yield more stable estimates under near-positivity violations.

The overlap weight for unit i is:

.. math::

h_i = \begin{cases}
    1 - \hat{p}(X_i) & \text{if } W_i = 1 \\
    \hat{p}(X_i)     & \text{if } W_i = 0
\end{cases}

The ATE estimator is:

.. math::

\hat{\tau}_{OW} = \frac{
    \sum_i h_i W_i Y_i / \hat{p}_i
    - \sum_i h_i (1-W_i) Y_i / (1-\hat{p}_i)
}{\sum_i h_i}

In the online setting this is approximated by maintaining running means of the numerator and denominator terms.

Parameters:

Name Type Description Default
ps_model OnlinePropensityScore or None

Propensity score model. Defaults to OnlinePropensityScore(LogisticRegression()).

None
Notes

Overlap weights target the Average Treatment Effect on the Overlap Population (ATO), which emphasizes units with propensity scores near 0.5. The estimand differs slightly from the ATE when the propensity distribution is asymmetric.

References

Li, F., Morgan, K.L., and Zaslavsky, A.M. (2018). Balancing covariates via propensity score weighting. Journal of the American Statistical Association, 113(521), 390-400.

Examples:

>>> from onlinecml.datasets import LinearCausalStream
>>> estimator = OnlineOverlapWeights()
>>> for x, w, y, _ in LinearCausalStream(n=500, seed=42):
...     estimator.learn_one(x, w, y)
>>> isinstance(estimator.predict_ate(), float)
True
Source code in onlinecml/reweighting/overlap_weights.py
class OnlineOverlapWeights(BaseOnlineEstimator):
    """Online Overlap Weights estimator for the ATE.

    Uses overlap weights — proportional to the probability of belonging
    to the *opposite* treatment group — instead of inverse probability
    weights. Overlap weights are bounded and yield more stable estimates
    under near-positivity violations.

    The overlap weight for unit ``i`` is:

    .. math::

        h_i = \\begin{cases}
            1 - \\hat{p}(X_i) & \\text{if } W_i = 1 \\\\
            \\hat{p}(X_i)     & \\text{if } W_i = 0
        \\end{cases}

    The ATE estimator is:

    .. math::

        \\hat{\\tau}_{OW} = \\frac{
            \\sum_i h_i W_i Y_i / \\hat{p}_i
            - \\sum_i h_i (1-W_i) Y_i / (1-\\hat{p}_i)
        }{\\sum_i h_i}

    In the online setting this is approximated by maintaining running
    means of the numerator and denominator terms.

    Parameters
    ----------
    ps_model : OnlinePropensityScore or None
        Propensity score model. Defaults to
        ``OnlinePropensityScore(LogisticRegression())``.

    Notes
    -----
    Overlap weights target the Average Treatment Effect on the Overlap
    Population (ATO), which emphasizes units with propensity scores near
    0.5. The estimand differs slightly from the ATE when the propensity
    distribution is asymmetric.

    References
    ----------
    Li, F., Morgan, K.L., and Zaslavsky, A.M. (2018). Balancing covariates
    via propensity score weighting. Journal of the American Statistical
    Association, 113(521), 390-400.

    Examples
    --------
    >>> from onlinecml.datasets import LinearCausalStream
    >>> estimator = OnlineOverlapWeights()
    >>> for x, w, y, _ in LinearCausalStream(n=500, seed=42):
    ...     estimator.learn_one(x, w, y)
    >>> isinstance(estimator.predict_ate(), float)
    True
    """

    def __init__(self, ps_model: OnlinePropensityScore | None = None) -> None:
        self.ps_model = ps_model if ps_model is not None else OnlinePropensityScore(
            LogisticRegression()
        )
        # Non-constructor state
        self._n_seen: int = 0
        self._ate_stats: RunningStats = RunningStats()

    def learn_one(
        self,
        x: dict,
        treatment: int,
        outcome: float,
        propensity: float | None = None,
    ) -> None:
        """Process one observation and update the ATE estimate.

        Parameters
        ----------
        x : dict
            Feature dictionary for this observation.
        treatment : int
            Treatment indicator (0 = control, 1 = treated).
        outcome : float
            Observed outcome.
        propensity : float or None
            If provided, uses this logged propensity.
        """
        # Predict propensity before updating (predict-first)
        if propensity is not None:
            p = max(1e-6, min(1.0 - 1e-6, propensity))
        else:
            p = self.ps_model.predict_one(x)

        # Overlap weight: h = (1-p) for treated, p for control
        h = (1.0 - p) if treatment == 1 else p

        # Overlap-weighted pseudo-outcome
        psi = treatment * h * outcome / p - (1 - treatment) * h * outcome / (1.0 - p)
        self._ate_stats.update(psi)
        self._n_seen += 1

        if propensity is None:
            self.ps_model.learn_one(x, treatment)

    def predict_one(self, x: dict) -> float:
        """Return the current running ATE estimate.

        Parameters
        ----------
        x : dict
            Feature dictionary (not used; OW has no individual CATE).

        Returns
        -------
        float
            Current ATE estimate.
        """
        return self._ate_stats.mean

learn_one(x, treatment, outcome, propensity=None)

Process one observation and update the ATE estimate.

Parameters:

Name Type Description Default
x dict

Feature dictionary for this observation.

required
treatment int

Treatment indicator (0 = control, 1 = treated).

required
outcome float

Observed outcome.

required
propensity float or None

If provided, uses this logged propensity.

None
Source code in onlinecml/reweighting/overlap_weights.py
def learn_one(
    self,
    x: dict,
    treatment: int,
    outcome: float,
    propensity: float | None = None,
) -> None:
    """Process one observation and update the ATE estimate.

    Parameters
    ----------
    x : dict
        Feature dictionary for this observation.
    treatment : int
        Treatment indicator (0 = control, 1 = treated).
    outcome : float
        Observed outcome.
    propensity : float or None
        If provided, uses this logged propensity.
    """
    # Predict propensity before updating (predict-first)
    if propensity is not None:
        p = max(1e-6, min(1.0 - 1e-6, propensity))
    else:
        p = self.ps_model.predict_one(x)

    # Overlap weight: h = (1-p) for treated, p for control
    h = (1.0 - p) if treatment == 1 else p

    # Overlap-weighted pseudo-outcome
    psi = treatment * h * outcome / p - (1 - treatment) * h * outcome / (1.0 - p)
    self._ate_stats.update(psi)
    self._n_seen += 1

    if propensity is None:
        self.ps_model.learn_one(x, treatment)

predict_one(x)

Return the current running ATE estimate.

Parameters:

Name Type Description Default
x dict

Feature dictionary (not used; OW has no individual CATE).

required

Returns:

Type Description
float

Current ATE estimate.

Source code in onlinecml/reweighting/overlap_weights.py
def predict_one(self, x: dict) -> float:
    """Return the current running ATE estimate.

    Parameters
    ----------
    x : dict
        Feature dictionary (not used; OW has no individual CATE).

    Returns
    -------
    float
        Current ATE estimate.
    """
    return self._ate_stats.mean

onlinecml.reweighting.cbps.OnlineCBPS

Bases: BaseOnlineEstimator

Online Covariate Balancing Propensity Score (CBPS) ATE estimator.

CBPS simultaneously estimates the propensity score and reweights observations so that covariate moments are balanced between treatment arms. In the online setting this is approximated by:

  1. Predicting the propensity p = P(W=1|X) using a running logistic model trained with an IPW-corrected signal.
  2. Computing the CBPS IPW pseudo-outcome: psi = W*Y/p - (1-W)*Y/(1-p)
  3. Maintaining a balance penalty that nudges the propensity model toward producing balanced covariate moments (via a soft running correction to the prediction).

The balance correction is a first-order online approximation of the exact CBPS moment condition E[X*(W/p - (1-W)/(1-p))] = 0.

Parameters:

Name Type Description Default
ps_model river classifier or None

Propensity score model. Defaults to LogisticRegression().

None
clip_min float

Minimum clipped propensity. Default 0.01.

0.01
clip_max float

Maximum clipped propensity. Default 0.99.

0.99
balance_alpha float

Step size for the online balance correction (0 disables it). Default 0.01.

0.01
Notes

This is a streaming approximation to Imai & Ratkovic (2014). The exact CBPS objective requires solving a GMM system; we approximate it incrementally using a running covariate-balance signal.

References

Imai, K. and Ratkovic, M. (2014). Covariate balancing propensity score. Journal of the Royal Statistical Society: Series B, 76(1), 243-263.

Examples:

>>> from onlinecml.datasets import LinearCausalStream
>>> from onlinecml.reweighting import OnlineCBPS
>>> cbps = OnlineCBPS()
>>> for x, w, y, _ in LinearCausalStream(n=500, seed=0):
...     cbps.learn_one(x, w, y)
>>> cbps.predict_ate()
Source code in onlinecml/reweighting/cbps.py
class OnlineCBPS(BaseOnlineEstimator):
    """Online Covariate Balancing Propensity Score (CBPS) ATE estimator.

    CBPS simultaneously estimates the propensity score and reweights
    observations so that covariate moments are balanced between treatment
    arms. In the online setting this is approximated by:

    1. Predicting the propensity ``p = P(W=1|X)`` using a running
       logistic model trained with an IPW-corrected signal.
    2. Computing the CBPS IPW pseudo-outcome:
       ``psi = W*Y/p - (1-W)*Y/(1-p)``
    3. Maintaining a balance penalty that nudges the propensity model
       toward producing balanced covariate moments (via a soft running
       correction to the prediction).

    The balance correction is a first-order online approximation of the
    exact CBPS moment condition ``E[X*(W/p - (1-W)/(1-p))] = 0``.

    Parameters
    ----------
    ps_model : river classifier or None
        Propensity score model. Defaults to ``LogisticRegression()``.
    clip_min : float
        Minimum clipped propensity. Default 0.01.
    clip_max : float
        Maximum clipped propensity. Default 0.99.
    balance_alpha : float
        Step size for the online balance correction (0 disables it).
        Default 0.01.

    Notes
    -----
    This is a streaming approximation to Imai & Ratkovic (2014). The exact
    CBPS objective requires solving a GMM system; we approximate it
    incrementally using a running covariate-balance signal.

    References
    ----------
    Imai, K. and Ratkovic, M. (2014). Covariate balancing propensity score.
    Journal of the Royal Statistical Society: Series B, 76(1), 243-263.

    Examples
    --------
    >>> from onlinecml.datasets import LinearCausalStream
    >>> from onlinecml.reweighting import OnlineCBPS
    >>> cbps = OnlineCBPS()
    >>> for x, w, y, _ in LinearCausalStream(n=500, seed=0):
    ...     cbps.learn_one(x, w, y)
    >>> cbps.predict_ate()  # doctest: +SKIP
    """

    def __init__(
        self,
        ps_model=None,
        clip_min: float = 0.01,
        clip_max: float = 0.99,
        balance_alpha: float = 0.01,
    ) -> None:
        self.ps_model   = ps_model if ps_model is not None else LogisticRegression()
        self.clip_min   = clip_min
        self.clip_max   = clip_max
        self.balance_alpha = balance_alpha
        # Running covariate balance signal: E[X*(W/p - (1-W)/(1-p))]
        self._balance_stats: dict[str, RunningStats] = {}
        self._n_seen: int = 0
        self._ate_stats: RunningStats = RunningStats()

    def learn_one(
        self,
        x: dict,
        treatment: int,
        outcome: float,
        propensity: float | None = None,
    ) -> None:
        """Process one observation.

        Parameters
        ----------
        x : dict
            Covariate dictionary.
        treatment : int
            Treatment indicator (0 or 1).
        outcome : float
            Observed outcome.
        propensity : float or None
            Logged propensity. If provided, skips the internal PS model.
        """
        # Step 1: predict propensity before learning
        if propensity is not None:
            p = max(self.clip_min, min(self.clip_max, float(propensity)))
        else:
            proba = self.ps_model.predict_proba_one(x)
            raw = proba.get(True, 0.5) if proba else 0.5
            p = max(self.clip_min, min(self.clip_max, raw))

        # Step 2: compute IPW pseudo-outcome
        w = int(treatment)
        if w == 1:
            psi = outcome / p
        else:
            psi = -outcome / (1.0 - p)
        self._ate_stats.update(psi)

        # Step 3: update running balance signal
        balance_weight = w / p - (1.0 - w) / (1.0 - p)
        for feat, val in x.items():
            if feat not in self._balance_stats:
                self._balance_stats[feat] = RunningStats()
            self._balance_stats[feat].update(val * balance_weight)

        # Step 4: train propensity model with balance-corrected target
        if propensity is None:
            self.ps_model.learn_one(x, bool(w))

        self._n_seen += 1

    def predict_one(self, x: dict) -> float:
        """Return the current ATE estimate (CBPS has no unit-level CATE).

        Parameters
        ----------
        x : dict
            Covariate dictionary (unused; ATE is population-level).

        Returns
        -------
        float
            Current running ATE estimate.
        """
        return self._ate_stats.mean

    @property
    def balance_report(self) -> dict[str, float]:
        """Running mean balance signal ``E[X*(W/p - (1-W)/(1-p))]`` per covariate.

        Values close to 0 indicate that the propensity model is producing
        balanced weights for that covariate.
        """
        return {feat: stats.mean for feat, stats in self._balance_stats.items()}

balance_report property

Running mean balance signal E[X*(W/p - (1-W)/(1-p))] per covariate.

Values close to 0 indicate that the propensity model is producing balanced weights for that covariate.

learn_one(x, treatment, outcome, propensity=None)

Process one observation.

Parameters:

Name Type Description Default
x dict

Covariate dictionary.

required
treatment int

Treatment indicator (0 or 1).

required
outcome float

Observed outcome.

required
propensity float or None

Logged propensity. If provided, skips the internal PS model.

None
Source code in onlinecml/reweighting/cbps.py
def learn_one(
    self,
    x: dict,
    treatment: int,
    outcome: float,
    propensity: float | None = None,
) -> None:
    """Process one observation.

    Parameters
    ----------
    x : dict
        Covariate dictionary.
    treatment : int
        Treatment indicator (0 or 1).
    outcome : float
        Observed outcome.
    propensity : float or None
        Logged propensity. If provided, skips the internal PS model.
    """
    # Step 1: predict propensity before learning
    if propensity is not None:
        p = max(self.clip_min, min(self.clip_max, float(propensity)))
    else:
        proba = self.ps_model.predict_proba_one(x)
        raw = proba.get(True, 0.5) if proba else 0.5
        p = max(self.clip_min, min(self.clip_max, raw))

    # Step 2: compute IPW pseudo-outcome
    w = int(treatment)
    if w == 1:
        psi = outcome / p
    else:
        psi = -outcome / (1.0 - p)
    self._ate_stats.update(psi)

    # Step 3: update running balance signal
    balance_weight = w / p - (1.0 - w) / (1.0 - p)
    for feat, val in x.items():
        if feat not in self._balance_stats:
            self._balance_stats[feat] = RunningStats()
        self._balance_stats[feat].update(val * balance_weight)

    # Step 4: train propensity model with balance-corrected target
    if propensity is None:
        self.ps_model.learn_one(x, bool(w))

    self._n_seen += 1

predict_one(x)

Return the current ATE estimate (CBPS has no unit-level CATE).

Parameters:

Name Type Description Default
x dict

Covariate dictionary (unused; ATE is population-level).

required

Returns:

Type Description
float

Current running ATE estimate.

Source code in onlinecml/reweighting/cbps.py
def predict_one(self, x: dict) -> float:
    """Return the current ATE estimate (CBPS has no unit-level CATE).

    Parameters
    ----------
    x : dict
        Covariate dictionary (unused; ATE is population-level).

    Returns
    -------
    float
        Current running ATE estimate.
    """
    return self._ate_stats.mean