Streaming Statistics Algorithms
Deep dive into the streaming algorithms that power PySuricata's memory-efficient statistics computation.
Overview
Streaming algorithms compute statistics in single-pass, constant-memory mode, enabling analysis of datasets larger than RAM.
Key Algorithms
- Welford's algorithm: Online mean and variance
- Pébay's formulas: Parallel merging of moments
- Higher moments: Skewness and kurtosis extension
- Numerical stability: Avoiding catastrophic cancellation
Welford's Online Algorithm
The Problem
Naive variance formula:
Issues: - Requires two passes (one for \(\sum x_i\), one for \(\sum x_i^2\)) - Catastrophic cancellation if \(\sum x_i^2 \approx (\sum x_i)^2 / n\) - Poor numerical stability
Welford's Solution
State variables: - \(n\): count - \(\mu\): running mean - \(M_2\): sum of squared deviations from current mean
Update formulas:
Finalize:
Derivation
Starting from the definition:
After adding \(x_{n+1}\):
Expand using the mean update:
After algebraic manipulation (see Welford 1962):
Key insight: Update uses both old and new mean, providing numerical stability.
Pseudocode
def welford_update(n, mean, M2, x):
"""Update running moments with new value x"""
n_new = n + 1
delta = x - mean
mean_new = mean + delta / n_new
delta2 = x - mean_new
M2_new = M2 + delta * delta2
return n_new, mean_new, M2_new
def welford_finalize(n, mean, M2):
"""Compute final statistics"""
if n < 2:
return mean, None
variance = M2 / (n - 1)
return mean, variance
Properties
- Single-pass: Only one scan through data
- Constant memory: O(1) space (3 numbers)
- Numerically stable: No catastrophic cancellation
- Exact: Same result as two-pass (up to FP rounding)
- Online: Can process streaming data
Pébay's Parallel Merge
The Problem
How to combine partial results from multiple chunks/threads?
Given: - State A: \((n_a, \mu_a, M_{2a})\) - State B: \((n_b, \mu_b, M_{2b})\)
Want: Combined state \((n, \mu, M_2)\) equivalent to processing all data together.
Pébay's Solution
Combined state:
Derivation
The combined mean is the weighted average:
For \(M_2\), use the identity:
Applying to both groups and summing:
Pseudocode
def pebay_merge(n_a, mean_a, M2_a, n_b, mean_b, M2_b):
"""Merge two partial states"""
n = n_a + n_b
if n == 0:
return 0, 0.0, 0.0
delta = mean_b - mean_a
mean = mean_a + delta * n_b / n
M2 = M2_a + M2_b + delta**2 * n_a * n_b / n
return n, mean, M2
Properties
- Associative: Order of merging doesn't matter
- Commutative: A ∪ B = B ∪ A
- Exact: Same result as single-pass over concatenated data
- Parallel: Enables multi-threading, distributed computation
- Fast: O(1) time to merge two states
Higher Moments Extension
Third and Fourth Moments
State variables: - \(n\), \(\mu\), \(M_2\), \(M_3\), \(M_4\)
Where: - \(M_3 = \sum (x_i - \mu)^3\) - \(M_4 = \sum (x_i - \mu)^4\)
Online Update
def moments_update(n, mean, M2, M3, M4, x):
"""Update all four moments"""
n_new = n + 1
delta = x - mean
delta_n = delta / n_new
delta_n2 = delta_n * delta_n
term1 = delta * delta_n * n
mean_new = mean + delta_n
M4_new = M4 + term1 * delta_n2 * (n_new*n_new - 3*n_new + 3) + 6*delta_n2*M2 - 4*delta_n*M3
M3_new = M3 + term1 * delta_n * (n_new - 2) - 3*delta_n*M2
M2_new = M2 + term1
return n_new, mean_new, M2_new, M3_new, M4_new
Pébay Merge for Higher Moments
def pebay_merge_moments(n_a, mean_a, M2_a, M3_a, M4_a,
n_b, mean_b, M2_b, M3_b, M4_b):
"""Merge higher moments"""
n = n_a + n_b
if n == 0:
return 0, 0.0, 0.0, 0.0, 0.0
delta = mean_b - mean_a
delta2 = delta * delta
delta3 = delta2 * delta
delta4 = delta3 * delta
mean = mean_a + delta * n_b / n
M2 = M2_a + M2_b + delta2 * n_a * n_b / n
M3 = M3_a + M3_b + \
delta3 * n_a * n_b * (n_a - n_b) / (n * n) + \
3 * delta * (n_a * M2_b - n_b * M2_a) / n
M4 = M4_a + M4_b + \
delta4 * n_a * n_b * (n_a*n_a - n_a*n_b + n_b*n_b) / (n * n * n) + \
6 * delta2 * (n_a*n_a * M2_b + n_b*n_b * M2_a) / (n * n) + \
4 * delta * (n_a * M3_b - n_b * M3_a) / n
return n, mean, M2, M3, M4
Computing Skewness and Kurtosis
def compute_shape(n, M2, M3, M4):
"""Compute skewness and excess kurtosis"""
if n < 3:
return None, None
variance = M2 / (n - 1)
if variance == 0:
return None, None
# Skewness (g1)
g1 = (n / ((n-1) * (n-2))) * (M3 / n) / (variance ** 1.5)
if n < 4:
return g1, None
# Excess kurtosis (g2)
g2 = ((n * (n+1)) / ((n-1) * (n-2) * (n-3))) * (M4 / n) / (variance ** 2) - \
(3 * (n-1) ** 2) / ((n-2) * (n-3))
return g1, g2
Numerical Stability Analysis
Why Naive Formula Fails
Consider \(x_i \approx 10^9 + \epsilon_i\) where \(|\epsilon_i| \ll 10^9\).
Naive formula:
Subtraction loses precision (catastrophic cancellation).
Welford's formula:
Works with deviations \(x_i - \mu\), which are \(O(\epsilon)\), avoiding large intermediate values.
Condition Number
For variance computation, Welford's algorithm has condition number \(\kappa \approx 1\), while naive formula has \(\kappa \approx \frac{\bar{x}^2}{\sigma^2}\) (can be huge).
Parallelization
MapReduce Pattern
# Map phase: compute partial moments per chunk
def map_chunk(chunk):
n, mean, M2, M3, M4 = 0, 0.0, 0.0, 0.0, 0.0
for x in chunk:
n, mean, M2, M3, M4 = moments_update(n, mean, M2, M3, M4, x)
return n, mean, M2, M3, M4
# Reduce phase: merge all partial results
def reduce_moments(states):
result = (0, 0.0, 0.0, 0.0, 0.0)
for state in states:
result = pebay_merge_moments(*result, *state)
return result
# Usage
partial_states = [map_chunk(chunk) for chunk in chunks]
final_state = reduce_moments(partial_states)
Multi-threading
from concurrent.futures import ThreadPoolExecutor
def parallel_moments(data, n_threads=4):
chunks = np.array_split(data, n_threads)
with ThreadPoolExecutor(max_workers=n_threads) as executor:
states = list(executor.map(map_chunk, chunks))
return reduce_moments(states)
Implementation in PySuricata
StreamingMoments Class
class StreamingMoments:
def __init__(self):
self.n = 0
self.mean = 0.0
self.M2 = 0.0
self.M3 = 0.0
self.M4 = 0.0
def update(self, values: np.ndarray):
"""Update with array of values"""
for x in values:
if not np.isfinite(x):
continue
self.n, self.mean, self.M2, self.M3, self.M4 = \
moments_update(self.n, self.mean, self.M2, self.M3, self.M4, x)
def merge(self, other: 'StreamingMoments'):
"""Merge with another moments object"""
self.n, self.mean, self.M2, self.M3, self.M4 = \
pebay_merge_moments(
self.n, self.mean, self.M2, self.M3, self.M4,
other.n, other.mean, other.M2, other.M3, other.M4
)
def finalize(self):
"""Compute final statistics"""
if self.n < 2:
return {"mean": self.mean, "variance": None, "skewness": None, "kurtosis": None}
variance = self.M2 / (self.n - 1)
std = math.sqrt(variance)
skewness, kurtosis = compute_shape(self.n, self.M2, self.M3, self.M4)
return {
"count": self.n,
"mean": self.mean,
"variance": variance,
"std": std,
"skewness": skewness,
"kurtosis": kurtosis
}
Validation
Test Properties
def test_welford_equivalence():
"""Verify Welford = two-pass"""
data = np.random.randn(10000)
# Welford
n, mean, M2 = 0, 0.0, 0.0
for x in data:
n, mean, M2 = welford_update(n, mean, M2, x)
var_welford = M2 / (n - 1)
# Two-pass
mean_twopass = np.mean(data)
var_twopass = np.var(data, ddof=1)
assert np.isclose(mean, mean_twopass)
assert np.isclose(var_welford, var_twopass)
def test_pebay_merge():
"""Verify merge = concatenate"""
data_a = np.random.randn(5000)
data_b = np.random.randn(3000)
# Separate
state_a = compute_moments(data_a)
state_b = compute_moments(data_b)
merged = pebay_merge_moments(*state_a, *state_b)
# Combined
data_combined = np.concatenate([data_a, data_b])
combined = compute_moments(data_combined)
assert np.allclose(merged, combined)
References
-
Welford, B.P. (1962), "Note on a Method for Calculating Corrected Sums of Squares and Products", Technometrics, 4(3): 419–420.
-
Pébay, P. (2008), "Formulas for Robust, One-Pass Parallel Computation of Covariances and Arbitrary-Order Statistical Moments", Sandia Report SAND2008-6212.
-
Chan, T.F., Golub, G.H., LeVeque, R.J. (1983), "Algorithms for Computing the Sample Variance: Analysis and Recommendations", The American Statistician, 37(3): 242–247.
-
West, D.H.D. (1979), "Updating Mean and Variance Estimates: An Improved Method", Communications of the ACM, 22(9): 532–535.
-
Wikipedia: Algorithms for calculating variance - Link
See Also
- Numeric Analysis - Application of these algorithms
- Sketch Algorithms - Other streaming algorithms
- Performance Tips - Optimization strategies
Last updated: 2025-10-12