Skip to content

Annotated Sequence Diagrams with Complexity Analysis

This document contains sequence diagrams showing the data flow through PySuricata's streaming algorithms, annotated with time and space complexity for each operation.

Pandas Data Processing Flow

sequenceDiagram
    participant User
    participant Profile
    participant Adapter as PandasAdapter
    participant Accumulator as NumericAccumulator
    participant KMV
    participant Reservoir as ReservoirSampler
    participant Extreme as ExtremeTracker
    participant MG as MisraGries

    User->>Profile: profile(data, config)
    Note over Profile: O(1) - Configuration setup

    Profile->>Adapter: process_chunk(chunk)
    Note over Adapter: O(n) - Iterate through rows

    loop For each numeric column
        Adapter->>Accumulator: update(values)
        Note over Accumulator: O(n) - Process n values

        Accumulator->>KMV: add(value)
        Note over KMV: O(log k) - Binary search insert<br/>Space: O(k) bounded

        Accumulator->>Reservoir: add(value)
        Note over Reservoir: O(1) - Constant time<br/>Space: O(s) bounded

        Accumulator->>Extreme: update(values, indices)
        Note over Extreme: O(n log k) - Process n values<br/>Space: O(k) bounded

        Accumulator->>MG: add(value)
        Note over MG: O(1) - Constant time<br/>Space: O(k) bounded
    end

    Accumulator->>Profile: finalize()
    Note over Accumulator: O(k log k) - Extract results<br/>Space: O(k) bounded

    Profile->>User: Report
    Note over Profile: O(1) - Return results

Polars Data Processing Flow

sequenceDiagram
    participant User
    participant Profile
    participant Adapter as PolarsAdapter
    participant Accumulator as NumericAccumulator
    participant KMV
    participant Reservoir as ReservoirSampler
    participant Extreme as ExtremeTracker
    participant MG as MisraGries

    User->>Profile: profile(data, config)
    Note over Profile: O(1) - Configuration setup

    Profile->>Adapter: process_chunk(chunk)
    Note over Adapter: O(n) - Iterate through rows

    loop For each numeric column
        Adapter->>Accumulator: update(values)
        Note over Accumulator: O(n) - Process n values

        Accumulator->>KMV: add(value)
        Note over KMV: O(log k) - Binary search insert<br/>Space: O(k) bounded

        Accumulator->>Reservoir: add(value)
        Note over Reservoir: O(1) - Constant time<br/>Space: O(s) bounded

        Accumulator->>Extreme: update(values, indices)
        Note over Extreme: O(n log k) - Process n values<br/>Space: O(k) bounded

        Accumulator->>MG: add(value)
        Note over MG: O(1) - Constant time<br/>Space: O(k) bounded
    end

    Accumulator->>Profile: finalize()
    Note over Accumulator: O(k log k) - Extract results<br/>Space: O(k) bounded

    Profile->>User: Report
    Note over Profile: O(1) - Return results

KMV Sketch Memory Optimization

sequenceDiagram
    participant KMV
    participant ExactCounter as _exact_counter
    participant Values as _values
    participant Hash as _u64

    Note over KMV: Before Fix: O(n) memory growth<br/>After Fix: O(k) bounded memory

    KMV->>ExactCounter: add(value)
    Note over ExactCounter: O(1) - Dict lookup<br/>Space: O(min(n, max_exact_tracking))

    alt Exact mode (count < max_exact_tracking)
        ExactCounter->>ExactCounter: increment counter
        Note over ExactCounter: O(1) - Dict update
    else Transition to approximation mode
        ExactCounter->>Values: convert to hashes
        Note over Values: O(k) - Convert exact values<br/>Space: O(k) bounded

        ExactCounter->>ExactCounter: clear()
        Note over ExactCounter: O(1) - Free memory

        Values->>Values: sort()
        Note over Values: O(k log k) - Sort hashes
    end

    alt Approximation mode
        KMV->>Hash: _u64(value)
        Note over Hash: O(1) - Hash computation

        Hash->>Values: insert if smaller
        Note over Values: O(log k) - Binary search insert<br/>Space: O(k) bounded
    end

    KMV->>Values: estimate()
    Note over Values: O(1) - Direct calculation<br/>Space: O(k) bounded

ExtremeTracker Memory Optimization

sequenceDiagram
    participant Extreme as ExtremeTracker
    participant MinHeap as _min_heap
    participant MaxHeap as _max_heap
    participant Heapq

    Note over Extreme: Before Fix: O(k × chunks) temporary growth<br/>After Fix: O(k) constant space

    Extreme->>Extreme: update(values, indices)
    Note over Extreme: O(n log k) - Process n values

    loop For each value
        Extreme->>MinHeap: _add_to_min_heap(index, value)
        Note over MinHeap: O(log k) - Heap insert<br/>Space: O(k) bounded

        Extreme->>MaxHeap: _add_to_max_heap(index, value)
        Note over MaxHeap: O(log k) - Heap insert<br/>Space: O(k) bounded
    end

    alt Min heap not full
        MinHeap->>Heapq: heappush(value, index)
        Note over Heapq: O(log k) - Heap insert
    else Min heap full
        MinHeap->>MinHeap: find largest value
        Note over MinHeap: O(k) - Linear search

        alt New value smaller
            MinHeap->>MinHeap: replace largest
            Note over MinHeap: O(k) - Find and replace
            MinHeap->>Heapq: heapify()
            Note over Heapq: O(k) - Restore heap property
        end
    end

    alt Max heap not full
        MaxHeap->>Heapq: heappush(-value, index)
        Note over Heapq: O(log k) - Heap insert (negated)
    else Max heap full
        MaxHeap->>MaxHeap: find smallest original value
        Note over MaxHeap: O(k) - Linear search

        alt New value larger
            MaxHeap->>MaxHeap: replace smallest
            Note over MaxHeap: O(k) - Find and replace
            MaxHeap->>Heapq: heapify()
            Note over Heapq: O(k) - Restore heap property
        end
    end

    Extreme->>Extreme: get_extremes()
    Note over Extreme: O(k log k) - Extract and sort<br/>Space: O(k) bounded

Chunk Metadata Optimization

sequenceDiagram
    participant Accumulator as NumericAccumulator
    participant Config as NumericConfig
    participant Boundaries as _chunk_boundaries
    participant Missing as _chunk_missing
    participant Counter as _chunk_count

    Note over Accumulator: Before Fix: O(num_chunks) unbounded growth<br/>After Fix: O(min(num_chunks, max_chunks)) bounded

    Accumulator->>Config: check enable_chunk_metadata
    Note over Config: O(1) - Configuration check

    alt Chunk metadata enabled
        Accumulator->>Counter: check _chunk_count < max_chunks
        Note over Counter: O(1) - Counter check

        alt Under limit
            Accumulator->>Boundaries: append(cumulative_rows)
            Note over Boundaries: O(1) - List append<br/>Space: O(chunk_count)

            Accumulator->>Missing: append(missing_count)
            Note over Missing: O(1) - List append<br/>Space: O(chunk_count)

            Accumulator->>Counter: increment()
            Note over Counter: O(1) - Counter increment
        else Over limit
            Accumulator->>Config: disable chunk metadata
            Note over Config: O(1) - Switch to summary mode

            Accumulator->>Boundaries: stop tracking
            Note over Boundaries: O(1) - Stop appending<br/>Space: O(max_chunks) bounded
        end
    else Chunk metadata disabled
        Accumulator->>Accumulator: skip tracking
        Note over Accumulator: O(1) - No memory usage<br/>Space: O(1) constant
    end

    Accumulator->>Accumulator: finalize()
    Note over Accumulator: O(chunk_count) - Process metadata<br/>Space: O(min(chunk_count, max_chunks)) bounded

Memory Monitoring Integration

sequenceDiagram
    participant User
    participant Tracemalloc
    participant Psutil
    participant Process
    participant Profile
    participant Accumulator

    User->>Tracemalloc: start()
    Note over Tracemalloc: O(1) - Start memory tracking

    User->>Psutil: Process(os.getpid())
    Note over Psutil: O(1) - Get process handle

    User->>Process: memory_info().rss
    Note over Process: O(1) - Get initial memory

    User->>Profile: profile(data, config)
    Note over Profile: O(n) - Process data

    loop During processing
        Profile->>Accumulator: update(chunk)
        Note over Accumulator: O(n) - Process chunk

        User->>Process: memory_info().rss
        Note over Process: O(1) - Monitor memory

        User->>Tracemalloc: get_traced_memory()
        Note over Tracemalloc: O(1) - Get traced memory
    end

    Profile->>User: Report
    Note over Profile: O(1) - Return results

    User->>Process: memory_info().rss
    Note over Process: O(1) - Get final memory

    User->>Tracemalloc: get_traced_memory()
    Note over Tracemalloc: O(1) - Get peak memory

    User->>Tracemalloc: stop()
    Note over Tracemalloc: O(1) - Stop tracking

Complexity Summary

Time Complexity

  • Per Element: O(1) for basic operations, O(log k) for heap operations
  • Per Chunk: O(n) where n is chunk size
  • Total: O(N) where N is total dataset size

Space Complexity

  • KMV: O(k) bounded (was O(n) unbounded)
  • ExtremeTracker: O(k) bounded (was O(k × chunks) temporary)
  • Chunk Metadata: O(min(num_chunks, max_chunks)) bounded (was O(num_chunks) unbounded)
  • Total: O(k + s + c) where k=sketch_size, s=sample_size, c=max_chunks

Memory Efficiency

  • Before Fixes: O(n) growth for low-cardinality columns
  • After Fixes: O(1) constant growth
  • Memory per Row: <1KB (typically <0.1KB)
  • Peak Memory: <200MB for 1M rows

The memory leak fixes successfully transform PySuricata from a memory-intensive system to a truly streaming system with bounded memory usage.