
Matching

onlinecml.matching.online_matching.OnlineMatching

Bases: BaseOnlineEstimator

Online K-nearest-neighbor matching estimator for CATE.

Maintains separate sliding-window buffers for treated and control units. For each new observation, finds the K nearest neighbors in the opposite treatment arm and computes a matched CATE estimate by averaging their outcomes. No propensity weighting is applied; the propensity argument of learn_one is ignored.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| k | int | Number of nearest neighbors to match. | 1 |
| buffer_size | int | Maximum number of units to retain in each arm's buffer. When a buffer is full, its oldest unit is dropped (FIFO). | 200 |
| distance_fn | callable or None | Distance function f(x1, x2) -> float. If None, euclidean_distance is used. | None |
Notes

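The buffers implement Sliding Window Nearest Neighbor (SWINN) matching: once an arm's buffer holds buffer_size units, each new unit in that arm evicts the oldest one. This provides implicit adaptation to concept drift, at the cost of a smaller pool of candidate matches than the full history would offer. Early in the stream, before the buffers fill, matches are drawn from only a few candidates and are correspondingly poorer. The neighbor search is an exact linear scan over the buffer.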

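The per-observation CATE estimate is

\hat{\tau}_i = (2W_i - 1) \left( Y_i - \frac{1}{|\mathcal{N}(i)|} \sum_{j \in \mathcal{N}(i)} Y_j \right)

where W_i ∈ {0, 1} is the treatment indicator of unit i and N(i) is the set of its K nearest neighbors in the opposite arm's buffer (fewer than K if that buffer holds fewer units). For a treated unit this is its outcome minus the neighbor mean; for a control unit the sign is reversed.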

Predict-then-add: each observation's CATE estimate is computed from the current buffers before the observation itself is added, so a unit is never matched to itself.
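The predict-then-add loop can be sketched in standalone Python. This is a minimal illustration, not the library's implementation: the class name KNNMatchSketch and its attributes are hypothetical, the per-observation estimates are kept in a plain list rather than the library's RunningStats, deque(maxlen=...) stands in for the explicit popleft() eviction, and treatment is assumed to be 0 or 1.

```python
from __future__ import annotations

import heapq
import math
from collections import deque


def euclidean(a: dict, b: dict) -> float:
    # Union of keys; a feature missing from one dict counts as 0.
    keys = set(a) | set(b)
    return math.sqrt(sum((a.get(k, 0.0) - b.get(k, 0.0)) ** 2 for k in keys))


class KNNMatchSketch:
    """Hypothetical stand-in for the matching logic of OnlineMatching.learn_one."""

    def __init__(self, k: int = 1, buffer_size: int = 200) -> None:
        self.k = k
        # One FIFO window per arm; maxlen evicts the oldest (features, outcome) pair.
        self.buffers = {0: deque(maxlen=buffer_size), 1: deque(maxlen=buffer_size)}
        self.cates: list[float] = []

    def learn_one(self, x: dict, treatment: int, outcome: float) -> None:
        opposite = self.buffers[1 - treatment]
        if opposite:
            # Exact K-nearest search: linear scan over the opposite arm.
            nearest = heapq.nsmallest(
                self.k, opposite, key=lambda item: euclidean(x, item[0])
            )
            neighbor_mean = sum(y for _, y in nearest) / len(nearest)
            sign = 1.0 if treatment == 1 else -1.0
            self.cates.append(sign * (outcome - neighbor_mean))
        # Add only after matching, so a unit is never its own neighbor.
        self.buffers[treatment].append((x, outcome))

    def ate(self) -> float:
        # Mean of the per-observation estimates (0.0 if none yet).
        return sum(self.cates) / len(self.cates) if self.cates else 0.0


m = KNNMatchSketch(k=2, buffer_size=3)
m.learn_one({"a": 0.0}, 0, 1.0)  # no treated units yet: no estimate
m.learn_one({"a": 1.0}, 0, 2.0)  # still no treated units
m.learn_one({"a": 0.5}, 1, 4.0)  # neighbors have Y = 1 and 2: estimate 4 - 1.5 = 2.5
m.learn_one({"a": 0.4}, 0, 1.0)  # one treated neighbor with Y = 4: estimate 4 - 1 = 3
print(m.cates, m.ate())  # [2.5, 3.0] 2.75
```

Because the opposite buffer can hold fewer than k units, the neighbor mean divides by the number of neighbors actually found, as the source does with len(neighbor_outcomes).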

Examples:

>>> from onlinecml.datasets import LinearCausalStream
>>> matcher = OnlineMatching(k=3, buffer_size=100)
>>> for x, w, y, _ in LinearCausalStream(n=500, seed=42):
...     matcher.learn_one(x, w, y)
>>> isinstance(matcher.predict_ate(), float)
True
Source code in onlinecml/matching/online_matching.py
class OnlineMatching(BaseOnlineEstimator):
    """Online K-nearest-neighbor matching estimator for CATE.

    Maintains separate sliding-window buffers for treated and control
    units. For each new observation, finds the K nearest neighbors in
    the opposite treatment arm and computes a matched CATE estimate by
    averaging their outcomes (no propensity weighting is applied).

    Parameters
    ----------
    k : int
        Number of nearest neighbors to match. Default 1.
    buffer_size : int
        Maximum number of units to retain in each arm's buffer.
        Older units are dropped when the buffer is full (FIFO).
        Default 200.
    distance_fn : callable or None
        Distance function ``f(x1, x2) -> float``. Defaults to
        ``euclidean_distance``.

    Notes
    -----
    The buffer implements Sliding Window Nearest Neighbor (SWINN) matching.
    With finite ``buffer_size``, older observations may be dropped. This
    provides implicit adaptation to concept drift at the cost of match
    quality early in the stream.

    The per-observation CATE estimate is:

    .. math::

        \\hat{\\tau}_i = Y_i - \\frac{1}{K} \\sum_{j \\in \\mathcal{N}(i)} Y_j

    where ``N(i)`` is the K nearest neighbors in the opposite arm.

    **Predict-then-match:** The CATE estimate is computed from the current
    buffer *before* the new observation is added.

    Examples
    --------
    >>> from onlinecml.datasets import LinearCausalStream
    >>> matcher = OnlineMatching(k=3, buffer_size=100)
    >>> for x, w, y, _ in LinearCausalStream(n=500, seed=42):
    ...     matcher.learn_one(x, w, y)
    >>> isinstance(matcher.predict_ate(), float)
    True
    """

    def __init__(
        self,
        k: int = 1,
        buffer_size: int = 200,
        distance_fn: DistanceFn | None = None,
    ) -> None:
        self.k = k
        self.buffer_size = buffer_size
        self.distance_fn = distance_fn if distance_fn is not None else euclidean_distance
        # Non-constructor state
        self._n_seen: int = 0
        self._ate_stats: RunningStats = RunningStats()
        self._treated_buffer: deque = deque()   # deque of (features, outcome) tuples
        self._control_buffer: deque = deque()

    def _find_knn(self, x: dict, buffer: deque, k: int) -> list[float]:
        """Find K nearest neighbor outcomes in a buffer.

        Parameters
        ----------
        x : dict
            Query feature dictionary.
        buffer : deque
            Buffer of ``(features, outcome)`` tuples to search.
        k : int
            Number of neighbors to return.

        Returns
        -------
        list of float
            Outcomes of the K nearest neighbors. Returns an empty list
            if the buffer is empty.
        """
        if not buffer:
            return []
        distances = [(self.distance_fn(x, bx), by) for bx, by in buffer]
        k_nearest = heapq.nsmallest(k, distances, key=lambda t: t[0])
        return [y for _, y in k_nearest]

    def learn_one(
        self,
        x: dict,
        treatment: int,
        outcome: float,
        propensity: float | None = None,
    ) -> None:
        """Process one observation and update the matched CATE estimate.

        Parameters
        ----------
        x : dict
            Feature dictionary for this observation.
        treatment : int
            Treatment indicator (0 = control, 1 = treated).
        outcome : float
            Observed outcome.
        propensity : float or None
            Not used; included for API compatibility.
        """
        # Match against the opposite arm's buffer (predict-then-add)
        if treatment == 1:
            neighbor_outcomes = self._find_knn(x, self._control_buffer, self.k)
        else:
            neighbor_outcomes = self._find_knn(x, self._treated_buffer, self.k)

        if neighbor_outcomes:
            neighbor_mean = sum(neighbor_outcomes) / len(neighbor_outcomes)
            # Treated: cate = Y - neighbor_mean; Control: cate = neighbor_mean - Y
            cate = outcome - neighbor_mean if treatment == 1 else neighbor_mean - outcome
            self._ate_stats.update(cate)

        self._n_seen += 1

        # Add current obs to the appropriate buffer
        if treatment == 1:
            self._treated_buffer.append((x, outcome))
            if len(self._treated_buffer) > self.buffer_size:
                self._treated_buffer.popleft()
        else:
            self._control_buffer.append((x, outcome))
            if len(self._control_buffer) > self.buffer_size:
                self._control_buffer.popleft()

    def predict_one(self, x: dict) -> float:
        """Predict the CATE for a single unit via nearest-neighbor matching.

        Finds the K nearest treated and control neighbors in the buffer,
        and returns the difference in their mean outcomes.

        Parameters
        ----------
        x : dict
            Feature dictionary for the unit.

        Returns
        -------
        float
            Estimated CATE. Returns 0.0 if either buffer is empty.
        """
        treated_nn = self._find_knn(x, self._treated_buffer, self.k)
        control_nn = self._find_knn(x, self._control_buffer, self.k)
        if not treated_nn or not control_nn:
            return 0.0
        return sum(treated_nn) / len(treated_nn) - sum(control_nn) / len(control_nn)

learn_one(x, treatment, outcome, propensity=None)

Process one observation and update the matched CATE estimate.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| x | dict | Feature dictionary for this observation. | required |
| treatment | int | Treatment indicator (0 = control, 1 = treated). | required |
| outcome | float | Observed outcome. | required |
| propensity | float or None | Not used; included for API compatibility. | None |

predict_one(x)

Predict the CATE for a single unit via nearest-neighbor matching.

Finds the K nearest neighbors in the treated buffer and, separately, in the control buffer, and returns the difference between their mean outcomes (treated minus control).
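The query-time estimate uses the same neighbor averaging on both arms. The sketch below takes plain lists of (features, outcome) pairs for the two arms and mirrors the 0.0 fallback documented for predict_one; knn_mean and predict_cate are hypothetical helper names, not library functions.

```python
from __future__ import annotations

import heapq
import math


def euclidean(a: dict, b: dict) -> float:
    keys = set(a) | set(b)
    return math.sqrt(sum((a.get(k, 0.0) - b.get(k, 0.0)) ** 2 for k in keys))


# knn_mean and predict_cate are hypothetical helper names.
def knn_mean(x: dict, buffer: list, k: int) -> float | None:
    # Mean outcome of the k closest (features, outcome) pairs; None if the buffer is empty.
    if not buffer:
        return None
    nearest = heapq.nsmallest(k, buffer, key=lambda item: euclidean(x, item[0]))
    return sum(y for _, y in nearest) / len(nearest)


def predict_cate(x: dict, treated: list, control: list, k: int = 1) -> float:
    mu_treated = knn_mean(x, treated, k)
    mu_control = knn_mean(x, control, k)
    if mu_treated is None or mu_control is None:
        return 0.0  # same fallback as predict_one when an arm is empty
    return mu_treated - mu_control


treated = [({"a": 0.0}, 5.0), ({"a": 2.0}, 9.0)]
control = [({"a": 0.1}, 1.0), ({"a": 3.0}, 4.0)]
print(predict_cate({"a": 0.0}, treated, control, k=1))  # 5.0 - 1.0 = 4.0
print(predict_cate({"a": 0.0}, treated, control, k=2))  # 7.0 - 2.5 = 4.5
print(predict_cate({"a": 0.0}, [], control))  # 0.0
```

A returned 0.0 cannot be told apart from a genuine zero effect, so a caller that needs the distinction should check that both arms have data first.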

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| x | dict | Feature dictionary for the unit. | required |

Returns:

| Type | Description |
| --- | --- |
| float | Estimated CATE. Returns 0.0 if either buffer is empty. |


onlinecml.matching.caliper_matching.OnlineCaliperMatching

Bases: BaseOnlineEstimator

Online matching with a maximum distance threshold (caliper).

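Extends nearest-neighbor matching by rejecting matches beyond a maximum distance (the caliper). Each observation is matched to its single nearest neighbor in the opposite arm. If that neighbor is farther than caliper, or the opposite buffer is still empty, the observation does not contribute to the ATE estimate and is counted as unmatched; it is still added to its own arm's buffer. The proportion of matched observations is reported as common_support_rate.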

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| caliper | float | Maximum allowable distance for a match. An observation whose nearest neighbor in the opposite arm is farther than caliper is counted as unmatched. | 1.0 |
| buffer_size | int | Maximum number of units to retain in each arm's buffer. | 200 |
| distance_fn | callable or None | Distance function f(x1, x2) -> float. If None, euclidean_distance is used. | None |
Notes

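The common_support_rate property returns the proportion of observations that were matched within the caliper (distance ≤ caliper). Observations that arrive while the opposite buffer is still empty also count as unmatched, so the rate is depressed early in the stream. A persistently high unmatched rate can signal limited overlap between the arms (a possible positivity violation), but it can also mean the caliper is too tight for the scale of the features.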
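The counting behind common_support_rate can be sketched as follows. This standalone illustration uses a hypothetical class name and is not the library code; it reproduces the documented rules (single nearest neighbor, a caliper test with ≤, and the observation counted before the opposite buffer is checked) and shows how an empty opposite buffer at the start of the stream lowers the rate.

```python
from __future__ import annotations

import math
from collections import deque


def euclidean(a: dict, b: dict) -> float:
    keys = set(a) | set(b)
    return math.sqrt(sum((a.get(k, 0.0) - b.get(k, 0.0)) ** 2 for k in keys))


class CaliperSketch:
    """Hypothetical stand-in for OnlineCaliperMatching's counting logic."""

    def __init__(self, caliper: float = 1.0, buffer_size: int = 200) -> None:
        self.caliper = caliper
        self.buffers = {0: deque(maxlen=buffer_size), 1: deque(maxlen=buffer_size)}
        self.n_seen = 0
        self.n_matched = 0
        self.cates: list[float] = []

    def learn_one(self, x: dict, treatment: int, outcome: float) -> None:
        opposite = self.buffers[1 - treatment]
        self.n_seen += 1  # counted even when the opposite arm is still empty
        if opposite:
            # Single nearest neighbor; min() keeps the first of tied candidates.
            x_nn, y_nn = min(opposite, key=lambda item: euclidean(x, item[0]))
            if euclidean(x, x_nn) <= self.caliper:
                sign = 1.0 if treatment == 1 else -1.0
                self.cates.append(sign * (outcome - y_nn))
                self.n_matched += 1
        # Unmatched units are still added to their own arm's buffer.
        self.buffers[treatment].append((x, outcome))

    @property
    def common_support_rate(self) -> float:
        return self.n_matched / self.n_seen if self.n_seen else 0.0


m = CaliperSketch(caliper=1.0)
m.learn_one({"a": 0.0}, 0, 1.0)  # opposite arm empty: unmatched
m.learn_one({"a": 0.5}, 1, 3.0)  # distance 0.5 <= 1.0: estimate 3 - 1 = 2
m.learn_one({"a": 5.0}, 1, 9.0)  # distance 5.0 > 1.0: unmatched
print(m.cates, m.common_support_rate)  # [2.0], rate 1/3
```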

Examples:

>>> from onlinecml.datasets import LinearCausalStream
>>> matcher = OnlineCaliperMatching(caliper=2.0, buffer_size=100)
>>> for x, w, y, _ in LinearCausalStream(n=300, seed=42):
...     matcher.learn_one(x, w, y)
>>> isinstance(matcher.common_support_rate, float)
True
Source code in onlinecml/matching/caliper_matching.py
class OnlineCaliperMatching(BaseOnlineEstimator):
    """Online matching with a maximum distance threshold (caliper).

    Extends nearest-neighbor matching by rejecting matches that exceed
    a maximum distance threshold. Units that cannot be matched within
    the caliper are excluded from the ATE estimate and counted as
    unmatched. Reports the proportion of units in common support.

    Parameters
    ----------
    caliper : float
        Maximum allowable distance for a match. Observations whose
        nearest neighbor is farther than ``caliper`` are counted as
        unmatched. Default 1.0.
    buffer_size : int
        Maximum number of units to retain in each arm's buffer. Default 200.
    distance_fn : callable or None
        Distance function ``f(x1, x2) -> float``.
        Defaults to ``euclidean_distance``.

    Notes
    -----
    The ``common_support_rate`` property returns the proportion of
    observations that were successfully matched (distance ≤ caliper).
    A high unmatched rate indicates a positivity violation.

    Examples
    --------
    >>> from onlinecml.datasets import LinearCausalStream
    >>> matcher = OnlineCaliperMatching(caliper=2.0, buffer_size=100)
    >>> for x, w, y, _ in LinearCausalStream(n=300, seed=42):
    ...     matcher.learn_one(x, w, y)
    >>> isinstance(matcher.common_support_rate, float)
    True
    """

    def __init__(
        self,
        caliper: float = 1.0,
        buffer_size: int = 200,
        distance_fn: DistanceFn | None = None,
    ) -> None:
        self.caliper = caliper
        self.buffer_size = buffer_size
        self.distance_fn = distance_fn if distance_fn is not None else euclidean_distance
        # Non-constructor state
        self._n_seen: int = 0
        self._n_matched: int = 0
        self._ate_stats: RunningStats = RunningStats()
        self._treated_buffer: deque = deque()
        self._control_buffer: deque = deque()

    def _find_nearest(self, x: dict, buffer: deque) -> tuple[float, float] | None:
        """Find the nearest neighbor in a buffer, return (distance, outcome).

        Parameters
        ----------
        x : dict
            Query feature dictionary.
        buffer : deque
            Buffer of ``(features, outcome)`` tuples.

        Returns
        -------
        tuple of (float, float) or None
            ``(distance, outcome)`` of the nearest neighbor, or None if
            the buffer is empty.
        """
        if not buffer:
            return None
        best_dist = float("inf")
        best_y = 0.0
        for bx, by in buffer:
            d = self.distance_fn(x, bx)
            if d < best_dist:
                best_dist = d
                best_y = by
        return (best_dist, best_y)

    def learn_one(
        self,
        x: dict,
        treatment: int,
        outcome: float,
        propensity: float | None = None,
    ) -> None:
        """Process one observation with caliper-constrained matching.

        Parameters
        ----------
        x : dict
            Feature dictionary for this observation.
        treatment : int
            Treatment indicator (0 = control, 1 = treated).
        outcome : float
            Observed outcome.
        propensity : float or None
            Not used; included for API compatibility.
        """
        opposite_buffer = self._control_buffer if treatment == 1 else self._treated_buffer
        result = self._find_nearest(x, opposite_buffer)
        self._n_seen += 1

        if result is not None:
            dist, neighbor_y = result
            if dist <= self.caliper:
                cate = outcome - neighbor_y if treatment == 1 else neighbor_y - outcome
                self._ate_stats.update(cate)
                self._n_matched += 1

        # Add to appropriate buffer
        if treatment == 1:
            self._treated_buffer.append((x, outcome))
            if len(self._treated_buffer) > self.buffer_size:
                self._treated_buffer.popleft()
        else:
            self._control_buffer.append((x, outcome))
            if len(self._control_buffer) > self.buffer_size:
                self._control_buffer.popleft()

    def predict_one(self, x: dict) -> float:
        """Predict CATE for a single unit via caliper-constrained matching.

        Parameters
        ----------
        x : dict
            Feature dictionary for the unit.

        Returns
        -------
        float
            Estimated CATE. Returns 0.0 if either buffer is empty or if
            the nearest neighbor in either arm is farther than the caliper.
        """
        res_t = self._find_nearest(x, self._treated_buffer)
        res_c = self._find_nearest(x, self._control_buffer)
        if res_t is None or res_c is None:
            return 0.0
        dist_t, y_t = res_t
        dist_c, y_c = res_c
        if dist_t > self.caliper or dist_c > self.caliper:
            return 0.0
        return y_t - y_c

    @property
    def common_support_rate(self) -> float:
        """Proportion of observations successfully matched within the caliper.

        Returns
        -------
        float
            Value in [0, 1]. Returns 0.0 before any observations are seen.
        """
        if self._n_seen == 0:
            return 0.0
        return self._n_matched / self._n_seen

common_support_rate property

Proportion of observations successfully matched within the caliper.

Returns:

| Type | Description |
| --- | --- |
| float | Value in [0, 1]. Returns 0.0 before any observations are seen. |

learn_one(x, treatment, outcome, propensity=None)

Process one observation with caliper-constrained matching.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| x | dict | Feature dictionary for this observation. | required |
| treatment | int | Treatment indicator (0 = control, 1 = treated). | required |
| outcome | float | Observed outcome. | required |
| propensity | float or None | Not used; included for API compatibility. | None |

predict_one(x)

Predict CATE for a single unit via caliper-constrained matching.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| x | dict | Feature dictionary for the unit. | required |

Returns:

| Type | Description |
| --- | --- |
| float | Estimated CATE. Returns 0.0 if either buffer is empty or if the nearest neighbor in either arm is farther than the caliper. |


onlinecml.matching.kernel_matching.OnlineKernelMatching

Bases: BaseOnlineEstimator

Online kernel-weighted matching for CATE estimation.

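Instead of selecting K discrete neighbors, uses every unit in the opposite arm's buffer, weighted by a kernel of its distance to the query unit. For a treated unit i, the CATE estimate is

\hat{\tau}_i = Y_i - \frac{\sum_j K_h(d(x_i, x_j)) \, Y_j}{\sum_j K_h(d(x_i, x_j))}

where j runs over the control buffer, d is distance_fn and K_h is kernel_fn evaluated at bandwidth h. For a control unit the roles are swapped: the estimate is the kernel-weighted mean outcome of the treated buffer minus Y_i.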
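A minimal sketch of the kernel-weighted mean in this formula follows. The Gaussian form exp(-(d/h)^2 / 2) is an assumption, since the library's default kernel is not shown on this page, and the function names are hypothetical. As in the source, the mean is None when the buffer is empty or every weight is zero; a Gaussian weight reaches zero through floating-point underflow for units very far from the query.

```python
from __future__ import annotations

import math


def euclidean(a: dict, b: dict) -> float:
    keys = set(a) | set(b)
    return math.sqrt(sum((a.get(k, 0.0) - b.get(k, 0.0)) ** 2 for k in keys))


def gaussian_kernel(distance: float, bandwidth: float) -> float:
    # Assumed kernel form; any f(distance, bandwidth) -> float could serve as kernel_fn.
    return math.exp(-0.5 * (distance / bandwidth) ** 2)


def kernel_weighted_mean(
    x: dict, buffer: list, bandwidth: float, kernel=gaussian_kernel
) -> float | None:
    total_weight = 0.0
    weighted_sum = 0.0
    for bx, by in buffer:
        w = kernel(euclidean(x, bx), bandwidth)
        total_weight += w
        weighted_sum += w * by
    if total_weight <= 0.0:
        return None  # empty buffer, or every weight is zero
    return weighted_sum / total_weight


control = [({"a": 0.0}, 1.0), ({"a": 1.0}, 3.0)]
x_i, y_i = {"a": 0.0}, 4.0  # a treated unit
mu = kernel_weighted_mean(x_i, control, bandwidth=1.0)
print(y_i - mu)  # about 2.245
print(kernel_weighted_mean(x_i, control, bandwidth=0.1))  # close to 1.0: nearest unit dominates
print(kernel_weighted_mean({"a": 100.0}, control, bandwidth=1.0))  # None: weights underflow to 0.0
```

The small-bandwidth call shows the "sharper matching" behavior: as h shrinks, the estimate approaches one-nearest-neighbor matching.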

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| bandwidth | float | Kernel bandwidth. Smaller values concentrate weight on the closest units (sharper matching). | 1.0 |
| buffer_size | int | Maximum number of units to retain per arm. | 200 |
| distance_fn | callable or None | Distance function f(x1, x2) -> float. If None, euclidean_distance is used. | None |
| kernel_fn | callable or None | Kernel function f(distance, bandwidth) -> float. If None, a Gaussian kernel is used. | None |

Examples:

>>> from onlinecml.datasets import LinearCausalStream
>>> matcher = OnlineKernelMatching(bandwidth=1.5, buffer_size=100)
>>> for x, w, y, _ in LinearCausalStream(n=300, seed=42):
...     matcher.learn_one(x, w, y)
>>> isinstance(matcher.predict_ate(), float)
True
Source code in onlinecml/matching/kernel_matching.py
class OnlineKernelMatching(BaseOnlineEstimator):
    """Online kernel-weighted matching for CATE estimation.

    Instead of selecting K discrete neighbors, uses all units in the
    opposite arm's buffer with weights determined by a kernel function.
    The CATE for a treated unit is:

    .. math::

        \\hat{\\tau}_i = Y_i - \\frac{\\sum_j K(d(x_i, x_j)) Y_j}{\\sum_j K(d(x_i, x_j))}

    Parameters
    ----------
    bandwidth : float
        Bandwidth of the kernel. Smaller values → sharper matching.
        Default 1.0.
    buffer_size : int
        Maximum number of units to retain per arm. Default 200.
    distance_fn : callable or None
        Distance function ``f(x1, x2) -> float``.
        Defaults to ``euclidean_distance``.
    kernel_fn : callable or None
        Kernel function ``f(distance, bandwidth) -> float``.
        Defaults to Gaussian kernel.

    Examples
    --------
    >>> from onlinecml.datasets import LinearCausalStream
    >>> matcher = OnlineKernelMatching(bandwidth=1.5, buffer_size=100)
    >>> for x, w, y, _ in LinearCausalStream(n=300, seed=42):
    ...     matcher.learn_one(x, w, y)
    >>> isinstance(matcher.predict_ate(), float)
    True
    """

    def __init__(
        self,
        bandwidth: float = 1.0,
        buffer_size: int = 200,
        distance_fn: DistanceFn | None = None,
        kernel_fn: Callable[[float, float], float] | None = None,
    ) -> None:
        self.bandwidth = bandwidth
        self.buffer_size = buffer_size
        self.distance_fn = distance_fn if distance_fn is not None else euclidean_distance
        self.kernel_fn = kernel_fn if kernel_fn is not None else _gaussian_kernel
        # Non-constructor state
        self._n_seen: int = 0
        self._ate_stats: RunningStats = RunningStats()
        self._treated_buffer: deque = deque()
        self._control_buffer: deque = deque()

    def _kernel_weighted_mean(self, x: dict, buffer: deque) -> float | None:
        """Compute the kernel-weighted mean outcome from a buffer.

        Parameters
        ----------
        x : dict
            Query feature dictionary.
        buffer : deque
            Buffer of ``(features, outcome)`` tuples.

        Returns
        -------
        float or None
            Kernel-weighted mean outcome, or None if buffer is empty
            or all weights are zero.
        """
        if not buffer:
            return None
        total_weight = 0.0
        weighted_sum = 0.0
        for bx, by in buffer:
            d = self.distance_fn(x, bx)
            w = self.kernel_fn(d, self.bandwidth)
            total_weight += w
            weighted_sum += w * by
        if total_weight <= 0.0:
            return None
        return weighted_sum / total_weight

    def learn_one(
        self,
        x: dict,
        treatment: int,
        outcome: float,
        propensity: float | None = None,
    ) -> None:
        """Process one observation and update the CATE estimate.

        Parameters
        ----------
        x : dict
            Feature dictionary for this observation.
        treatment : int
            Treatment indicator (0 = control, 1 = treated).
        outcome : float
            Observed outcome.
        propensity : float or None
            Not used; included for API compatibility.
        """
        opposite_buffer = self._control_buffer if treatment == 1 else self._treated_buffer
        neighbor_mean = self._kernel_weighted_mean(x, opposite_buffer)
        self._n_seen += 1

        if neighbor_mean is not None:
            cate = outcome - neighbor_mean if treatment == 1 else neighbor_mean - outcome
            self._ate_stats.update(cate)

        if treatment == 1:
            self._treated_buffer.append((x, outcome))
            if len(self._treated_buffer) > self.buffer_size:
                self._treated_buffer.popleft()
        else:
            self._control_buffer.append((x, outcome))
            if len(self._control_buffer) > self.buffer_size:
                self._control_buffer.popleft()

    def predict_one(self, x: dict) -> float:
        """Predict the CATE for a single unit via kernel-weighted matching.

        Parameters
        ----------
        x : dict
            Feature dictionary for the unit.

        Returns
        -------
        float
            Estimated CATE. Returns 0.0 if either buffer is empty or if
            all kernel weights in either buffer are zero.
        """
        y_t = self._kernel_weighted_mean(x, self._treated_buffer)
        y_c = self._kernel_weighted_mean(x, self._control_buffer)
        if y_t is None or y_c is None:
            return 0.0
        return y_t - y_c

learn_one(x, treatment, outcome, propensity=None)

Process one observation and update the CATE estimate.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| x | dict | Feature dictionary for this observation. | required |
| treatment | int | Treatment indicator (0 = control, 1 = treated). | required |
| outcome | float | Observed outcome. | required |
| propensity | float or None | Not used; included for API compatibility. | None |

predict_one(x)

Predict the CATE for a single unit via kernel-weighted matching.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| x | dict | Feature dictionary for the unit. | required |

Returns:

| Type | Description |
| --- | --- |
| float | Estimated CATE. Returns 0.0 if either buffer is empty or if all kernel weights in either buffer are zero. |


Distance Functions

onlinecml.matching.distance.euclidean_distance(x, y)

Compute the Euclidean distance between two feature dicts.

All features present in either dict are used; a feature missing from one dict is treated as 0.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| x | dict | First feature dictionary. | required |
| y | dict | Second feature dictionary. | required |

Returns:

| Type | Description |
| --- | --- |
| float | Euclidean distance. |

Source code in onlinecml/matching/distance.py
def euclidean_distance(x: dict, y: dict) -> float:
    """Compute the Euclidean distance between two feature dicts.

    All features present in either dict are used; a feature missing
    from one dict is treated as 0.

    Parameters
    ----------
    x : dict
        First feature dictionary.
    y : dict
        Second feature dictionary.

    Returns
    -------
    float
        Euclidean distance.
    """
    keys = set(x) | set(y)
    return math.sqrt(sum((x.get(k, 0.0) - y.get(k, 0.0)) ** 2 for k in keys))
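Because the key set is the union of both dicts, a feature absent from one dict still contributes its full value to the distance. The short check below re-declares the function shown above so that it runs on its own.

```python
import math


def euclidean_distance(x: dict, y: dict) -> float:
    # Same body as the library function shown above.
    keys = set(x) | set(y)
    return math.sqrt(sum((x.get(k, 0.0) - y.get(k, 0.0)) ** 2 for k in keys))


# Disjoint keys: "a" is 0 in the second dict and "b" is 0 in the first.
print(euclidean_distance({"a": 3.0}, {"b": 4.0}))  # 5.0
# Sparse dicts: an omitted feature behaves exactly like an explicit 0.0.
print(euclidean_distance({"a": 1.0}, {"a": 1.0, "b": 0.0}))  # 0.0
```

Features are not rescaled, so one measured in large units (income in dollars, say) dominates the distance. The caliper of OnlineCaliperMatching and the bandwidth of OnlineKernelMatching are compared against this raw distance.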

onlinecml.matching.distance.ps_distance(p_x, p_y)

Compute the absolute difference in propensity scores.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| p_x | float | Propensity score for unit x. | required |
| p_y | float | Propensity score for unit y. | required |

Returns:

| Type | Description |
| --- | --- |
| float | Absolute propensity-score distance, abs(p_x - p_y). |

Source code in onlinecml/matching/distance.py
def ps_distance(p_x: float, p_y: float) -> float:
    """Compute the absolute difference in propensity scores.

    Parameters
    ----------
    p_x : float
        Propensity score for unit x.
    p_y : float
        Propensity score for unit y.

    Returns
    -------
    float
        Absolute propensity score distance ``|p_x - p_y|``.
    """
    return abs(p_x - p_y)
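ps_distance takes two scores rather than two feature dicts, so it cannot be passed directly as distance_fn, which the estimators call as f(x1, x2). One way to use it, sketched under the assumption that each unit's propensity is estimated separately and stored in its feature dict, is a small adapter. The key name "_ps" and the function ps_feature_distance are hypothetical, not library conventions.

```python
PS_KEY = "_ps"  # hypothetical key holding a pre-computed propensity score


def ps_distance(p_x: float, p_y: float) -> float:
    # Same body as the library function shown above.
    return abs(p_x - p_y)


def ps_feature_distance(x1: dict, x2: dict) -> float:
    # Hypothetical adapter with the f(x1, x2) -> float signature that distance_fn expects.
    # Only the stored scores are compared; the other features are ignored.
    return ps_distance(x1[PS_KEY], x2[PS_KEY])


u = {"age": 30.0, PS_KEY: 0.30}
v = {"age": 65.0, PS_KEY: 0.45}
print(ps_feature_distance(u, v))  # approximately 0.15
```

Such an adapter could be supplied as distance_fn=ps_feature_distance. Since learn_one ignores its propensity argument, storing the score in x is the only route for it to reach the distance function; the stored key would also enter euclidean_distance if that function were applied to the same dicts.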

onlinecml.matching.distance.mahalanobis_distance(x, y, cov_inv=None)

Compute the Mahalanobis distance between two feature dicts.

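If no inverse covariance matrix is provided, falls back to the plain Euclidean distance (the identity matrix is used in place of the inverse covariance). Entries missing from cov_inv are treated as 0, and a negative quadratic form, which can occur when cov_inv is not positive semidefinite, is clamped to 0 before the square root.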

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| x | dict | First feature dictionary. | required |
| y | dict | Second feature dictionary. | required |
| cov_inv | dict or None | Inverse covariance matrix as a nested dict {feature: {feature: value}}; entries missing from it are treated as 0. If None, the identity matrix is used (equivalent to Euclidean distance). | None |

Returns:

| Type | Description |
| --- | --- |
| float | Mahalanobis distance. |

Source code in onlinecml/matching/distance.py
def mahalanobis_distance(x: dict, y: dict, cov_inv: dict | None = None) -> float:
    """Compute the Mahalanobis distance between two feature dicts.

    If no inverse covariance matrix is provided, falls back to the plain
    Euclidean distance (identity inverse covariance).

    Parameters
    ----------
    x : dict
        First feature dictionary.
    y : dict
        Second feature dictionary.
    cov_inv : dict or None
        Inverse covariance matrix as a nested dict
        ``{feature: {feature: value}}``. If None, uses identity matrix
        (equivalent to Euclidean distance).

    Returns
    -------
    float
        Mahalanobis distance.
    """
    keys = sorted(set(x) | set(y))
    diff = [x.get(k, 0.0) - y.get(k, 0.0) for k in keys]
    if cov_inv is None:
        return math.sqrt(sum(d * d for d in diff))
    # d^T Sigma^{-1} d
    result = 0.0
    for i, ki in enumerate(keys):
        row = cov_inv.get(ki, {})
        for j, kj in enumerate(keys):
            result += diff[i] * row.get(kj, 0.0) * diff[j]
    return math.sqrt(max(0.0, result))
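A common way to fill cov_inv is from per-feature variances, which turns the Mahalanobis distance into a Euclidean distance on standardized features. The helper diagonal_cov_inv below is hypothetical and the variances are illustrative; the distance function is re-declared from the source above so the example runs on its own.

```python
from __future__ import annotations

import math


def mahalanobis_distance(x: dict, y: dict, cov_inv: dict | None = None) -> float:
    # Same body as the library function shown above.
    keys = sorted(set(x) | set(y))
    diff = [x.get(k, 0.0) - y.get(k, 0.0) for k in keys]
    if cov_inv is None:
        return math.sqrt(sum(d * d for d in diff))
    result = 0.0
    for i, ki in enumerate(keys):
        row = cov_inv.get(ki, {})
        for j, kj in enumerate(keys):
            result += diff[i] * row.get(kj, 0.0) * diff[j]
    return math.sqrt(max(0.0, result))


def diagonal_cov_inv(variances: dict) -> dict:
    # Hypothetical helper: inverse of a diagonal covariance as {feature: {feature: 1/var}}.
    return {k: {k: 1.0 / v} for k, v in variances.items()}


a = {"age": 30.0, "income": 50_000.0}
b = {"age": 60.0, "income": 50_100.0}
print(mahalanobis_distance(a, b))  # about 104.4: the raw income gap dominates
cov_inv = diagonal_cov_inv({"age": 100.0, "income": 1e8})  # std devs 10 and 10,000
print(mahalanobis_distance(a, b, cov_inv))  # about 3.0: the age gap is 3 std devs
```

A full (non-diagonal) cov_inv also lets correlated features share weight through its off-diagonal entries.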

onlinecml.matching.distance.combined_distance(x, y, p_x, p_y, ps_weight=0.5)

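Compute a weighted combination of the Euclidean feature distance and the propensity-score distance: (1 - ps_weight) * euclidean_distance(x, y) + ps_weight * ps_distance(p_x, p_y).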

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| x | dict | First feature dictionary. | required |
| y | dict | Second feature dictionary. | required |
| p_x | float | Propensity score for unit x. | required |
| p_y | float | Propensity score for unit y. | required |
| ps_weight | float | Weight on the PS distance component, between 0 and 1. | 0.5 |

Returns:

| Type | Description |
| --- | --- |
| float | Combined distance. |

Source code in onlinecml/matching/distance.py
def combined_distance(
    x: dict,
    y: dict,
    p_x: float,
    p_y: float,
    ps_weight: float = 0.5,
) -> float:
    """Compute a weighted combination of Euclidean and PS distance.

    Parameters
    ----------
    x : dict
        First feature dictionary.
    y : dict
        Second feature dictionary.
    p_x : float
        Propensity score for unit x.
    p_y : float
        Propensity score for unit y.
    ps_weight : float
        Weight on the PS distance component (0 to 1). Default 0.5.

    Returns
    -------
    float
        Combined distance.
    """
    return (1.0 - ps_weight) * euclidean_distance(x, y) + ps_weight * ps_distance(p_x, p_y)
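The two components live on different scales: a gap between two propensity scores in [0, 1] is at most 1, while the Euclidean term is unbounded, so ps_weight is not the share of the distance contributed by the propensity score. The check below re-declares the three functions from this page and uses illustrative numbers.

```python
import math


def euclidean_distance(x: dict, y: dict) -> float:
    keys = set(x) | set(y)
    return math.sqrt(sum((x.get(k, 0.0) - y.get(k, 0.0)) ** 2 for k in keys))


def ps_distance(p_x: float, p_y: float) -> float:
    return abs(p_x - p_y)


def combined_distance(x, y, p_x, p_y, ps_weight=0.5):
    return (1.0 - ps_weight) * euclidean_distance(x, y) + ps_weight * ps_distance(p_x, p_y)


x1, x2 = {"a": 0.0, "b": 0.0}, {"a": 3.0, "b": 4.0}  # Euclidean distance 5.0
p1, p2 = 0.2, 0.8  # propensity gap 0.6
for w in (0.5, 0.9):
    feature_part = (1.0 - w) * euclidean_distance(x1, x2)
    ps_part = w * ps_distance(p1, p2)
    total = combined_distance(x1, x2, p1, p2, w)
    # Prints 0.5 2.5 0.3 2.8, then 0.9 0.5 0.54 1.04
    print(w, round(feature_part, 3), round(ps_part, 3), round(total, 3))
```

Even at ps_weight=0.5 the feature term is more than eight times the propensity term here; standardizing the features before matching brings the two terms closer in scale.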