
Module stream_join


§Stream-Stream Join Operators

Implementation of time-bounded joins between two event streams.

Stream-stream joins match events from two streams on a join key and a time bound. Two events match if they share a key and their timestamps fall within the configured time bound.
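The matching rule can be illustrated with a small predicate. This is a minimal sketch, not the operator's code: the `Event` struct and `is_match` function are hypothetical, timestamps are assumed to be milliseconds, and the window is assumed to be symmetric (absolute timestamp difference no greater than the bound). A directional window, as in the SQL form of the join, would compare the signed difference instead.

```rust
use std::time::Duration;

/// Hypothetical event shape, used only for this illustration.
#[derive(Debug, Clone, PartialEq)]
struct Event {
    key: String,
    timestamp_ms: i64,
}

/// Returns true when both events share a key and their timestamps
/// differ by at most `time_bound` (symmetric window; an assumption).
fn is_match(left: &Event, right: &Event, time_bound: Duration) -> bool {
    // abs_diff avoids overflow when subtracting signed timestamps.
    let diff_ms = left.timestamp_ms.abs_diff(right.timestamp_ms);
    left.key == right.key && u128::from(diff_ms) <= time_bound.as_millis()
}
```

With a one-hour bound, an order at `t = 1_000` ms and a payment at `t = 3_601_000` ms for the same key match under this rule, because the difference equals the bound exactly; one millisecond later they would not.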

§Join Types

  • Inner: Emit only matched pairs
  • Left: Emit all left events, each with its right match if one exists
  • Right: Emit all right events, each with its left match if one exists
  • Full: Emit all events from both sides, paired where matches exist
  • Left Semi: Emit left rows that have at least one match (left columns only)
  • Left Anti: Emit left rows with no match within the time bound (left columns only)
  • Right Semi/Anti: Mirrors of the left variants
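The left-side variants above can be compared with a small sketch of what is emitted for a single left event once its set of matches is final. `JoinKind`, `Output`, and `emit_for_left` are hypothetical names for this illustration and are not the module's `JoinType` API; the right-side variants would mirror this logic. In a streaming setting, the unmatched and anti results can presumably only be produced once no further matches can arrive, which this sketch does not model.

```rust
/// Hypothetical stand-in for the left-side join variants.
#[derive(Debug, Clone, Copy, PartialEq)]
enum JoinKind {
    Inner,
    Left,
    LeftSemi,
    LeftAnti,
}

/// What a single left event contributes to the join output.
#[derive(Debug, PartialEq)]
enum Output<'a> {
    /// A matched (left, right) pair.
    Pair(&'a str, &'a str),
    /// A left row with no right match (outer join).
    LeftUnmatched(&'a str),
    /// Left columns only (semi and anti joins).
    LeftColumns(&'a str),
}

/// Returns the rows emitted for `left`, given all right rows that
/// matched it within the time bound.
fn emit_for_left<'a>(kind: JoinKind, left: &'a str, matches: &[&'a str]) -> Vec<Output<'a>> {
    match kind {
        // Outer join keeps a left row even when nothing matched.
        JoinKind::Left if matches.is_empty() => vec![Output::LeftUnmatched(left)],
        // Otherwise inner and left joins emit one pair per match.
        JoinKind::Inner | JoinKind::Left => {
            matches.iter().map(|&r| Output::Pair(left, r)).collect()
        }
        // Semi join emits the left row once if any match exists.
        JoinKind::LeftSemi if !matches.is_empty() => vec![Output::LeftColumns(left)],
        // Anti join emits the left row only if no match exists.
        JoinKind::LeftAnti if matches.is_empty() => vec![Output::LeftColumns(left)],
        JoinKind::LeftSemi | JoinKind::LeftAnti => Vec::new(),
    }
}
```

Note that a semi join emits the left row once regardless of how many right rows matched, whereas an inner join emits one pair per match.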

§Example

use laminar_core::operator::stream_join::{
    StreamJoinOperator, JoinType, StreamJoinConfig, JoinRowEncoding,
};
use std::time::Duration;

// Basic join (backward compatible)
let operator = StreamJoinOperator::new(
    "order_id".to_string(),  // left key column
    "order_id".to_string(),  // right key column
    Duration::from_secs(3600), // 1 hour time bound
    JoinType::Inner,
);

// Optimized join with CPU-friendly encoding
let config = StreamJoinConfig::builder()
    .left_key_column("order_id")
    .right_key_column("order_id")
    .time_bound(Duration::from_secs(3600))
    .join_type(JoinType::Inner)
    .row_encoding(JoinRowEncoding::CpuFriendly)  // 30-50% faster for memory-resident state
    .asymmetric_compaction(true)                  // Skip compaction on finished sides
    .per_key_tracking(true)                       // Aggressive cleanup for sparse keys
    .build()
    .unwrap();
let optimized_operator = StreamJoinOperator::from_config(config);

§SQL Syntax

SELECT o.*, p.status
FROM orders o
JOIN payments p
    ON o.order_id = p.order_id
    AND p.ts BETWEEN o.ts AND o.ts + INTERVAL '1' HOUR;

-- Session variables for optimization
SET streaming_join_row_encoding = 'cpu_friendly';
SET streaming_join_asymmetric_compaction = true;

§State Management

Events are stored in state with keys formatted as:

  • sjl:<key_hash>:<timestamp>:<event_id> for left events
  • sjr:<key_hash>:<timestamp>:<event_id> for right events

State is cleaned up automatically once the watermark passes event_timestamp + time_bound.
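The key layout and the cleanup condition can be sketched as follows. This is an illustration under stated assumptions rather than the operator's actual encoding: `Side`, `state_key`, and `is_expired` are hypothetical helpers, the hash function (`DefaultHasher`) and its 16-digit hex rendering are assumptions, timestamps are assumed to be milliseconds, and "passes" is read as strictly greater than.

```rust
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};

/// Hypothetical side marker; the module exposes its own `JoinSide`.
enum Side {
    Left,
    Right,
}

/// Builds a key of the form `<prefix>:<key_hash>:<timestamp>:<event_id>`.
/// The hash function and hex width are assumptions for illustration.
fn state_key(side: Side, join_key: &str, timestamp_ms: i64, event_id: u64) -> String {
    let mut hasher = DefaultHasher::new();
    join_key.hash(&mut hasher);
    let prefix = match side {
        Side::Left => "sjl",
        Side::Right => "sjr",
    };
    format!("{}:{:016x}:{}:{}", prefix, hasher.finish(), timestamp_ms, event_id)
}

/// An event's state can be dropped once the watermark has passed
/// `event_timestamp + time_bound`.
fn is_expired(event_timestamp_ms: i64, time_bound_ms: i64, watermark_ms: i64) -> bool {
    // saturating_add guards against overflow for extreme timestamps.
    watermark_ms > event_timestamp_ms.saturating_add(time_bound_ms)
}
```

Hashing the join key gives all events for one key and side a common key prefix, which would let a prefix scan find candidate matches; whether the real operator relies on this is not stated in the source.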

§Optimizations

  • CPU-Friendly Encoding: Inlines primitive values for faster access (30-50% faster for memory-resident state)
  • Asymmetric Compaction: Skips compaction on finished/idle sides
  • Per-Key Tracking: Aggressive cleanup for sparse key patterns
  • Build-Side Pruning: Early pruning based on probe-side watermark

Structs§

ArchivedJoinRow
An archived JoinRow
JoinMetrics
Metrics for tracking join operations.
JoinRow
A stored join row containing serialized event data.
JoinRowResolver
The resolver for an archived JoinRow
KeyMetadata
Per-key metadata for cleanup tracking.
SideStats
Per-side statistics for asymmetric optimization.
StreamJoinConfig
Configuration for stream-stream joins.
StreamJoinConfigBuilder
Builder for StreamJoinConfig.
StreamJoinOperator
Stream-stream join operator.

Enums§

JoinRowEncoding
Row encoding strategy for join state.
JoinSide
Identifies which side of the join an event came from.
JoinType
Join type for stream-stream joins.