§Stream-Stream Join Operators
Implementation of time-bounded joins between two event streams.
Stream-stream joins match events from two streams based on a join key and a time bound. Events are matched if they share a key and their timestamps fall within the specified time window.
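The matching rule can be sketched as a small predicate. This is only an illustration: the `Event` struct and `is_match` function are hypothetical names, not part of this module, and the sketch assumes a symmetric bound (the two timestamps differ by at most the time bound in either direction).

```rust
use std::time::Duration;

/// Hypothetical, simplified event used only for this sketch.
#[derive(Debug, Clone, PartialEq)]
struct Event {
    key: String,
    timestamp_ms: i64,
}

/// Returns true when both events share a key and their timestamps are
/// at most `time_bound` apart (symmetric bound, assumed for illustration).
fn is_match(left: &Event, right: &Event, time_bound: Duration) -> bool {
    // abs_diff avoids overflow when subtracting signed timestamps.
    let gap_ms = left.timestamp_ms.abs_diff(right.timestamp_ms) as u128;
    left.key == right.key && gap_ms <= time_bound.as_millis()
}
```

With a 2-second bound, events at 1,000 ms and 3,000 ms with the same key would match under this assumption; with a 1-second bound they would not.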
§Join Types
- Inner: Only emit matched pairs
- Left: Emit all left events, with right match if exists
- Right: Emit all right events, with left match if exists
- Full: Emit all events, with matches where they exist
- Left Semi: Emit left rows that have at least one match (left columns only)
- Left Anti: Emit left rows with no match within the time bound (left columns only)
- Right Semi/Anti: Mirror of left variants
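To make the differences between the left-side variants concrete, here is a minimal batch-style sketch. It is not how the operator is implemented: `SketchJoin`, `Ev`, `Row`, and `join_left_side` are hypothetical names, the time bound is assumed to be symmetric, and a real streaming operator would presumably have to wait for the watermark before emitting unmatched (outer or anti) rows.

```rust
/// Hypothetical enum for this sketch; the crate's `JoinType` may be shaped differently.
#[derive(Debug, Clone, Copy, PartialEq)]
enum SketchJoin {
    Inner,
    Left,
    LeftSemi,
    LeftAnti,
}

/// (join key, timestamp in ms)
type Ev = (u32, i64);
/// One output row: the left event plus an optional right event.
type Row = (Ev, Option<Ev>);

fn join_left_side(left: &[Ev], right: &[Ev], bound_ms: i64, kind: SketchJoin) -> Vec<Row> {
    let mut out = Vec::new();
    for &l in left {
        // All right events with the same key inside the time bound.
        let matched: Vec<Ev> = right
            .iter()
            .copied()
            .filter(|r| r.0 == l.0 && (r.1 - l.1).abs() <= bound_ms)
            .collect();
        match kind {
            // One row per matched pair; unmatched left events are dropped.
            SketchJoin::Inner => out.extend(matched.iter().map(|&r| (l, Some(r)))),
            // Like Inner, but an unmatched left event still produces a row.
            SketchJoin::Left => {
                if matched.is_empty() {
                    out.push((l, None));
                } else {
                    out.extend(matched.iter().map(|&r| (l, Some(r))));
                }
            }
            // Left columns only, at most once, when any match exists.
            SketchJoin::LeftSemi => {
                if !matched.is_empty() {
                    out.push((l, None));
                }
            }
            // Left columns only, when no match exists.
            SketchJoin::LeftAnti => {
                if matched.is_empty() {
                    out.push((l, None));
                }
            }
        }
    }
    out
}
```

The right-side variants would mirror this with the roles of `left` and `right` swapped.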
§Example
use laminar_core::operator::stream_join::{
    StreamJoinOperator, JoinType, JoinSide, StreamJoinConfig, JoinRowEncoding,
};
use std::time::Duration;

// Basic join (backward compatible)
let operator = StreamJoinOperator::new(
    "order_id".to_string(),    // left key column
    "order_id".to_string(),    // right key column
    Duration::from_secs(3600), // 1 hour time bound
    JoinType::Inner,
);

// Optimized join with CPU-friendly encoding
let config = StreamJoinConfig::builder()
    .left_key_column("order_id")
    .right_key_column("order_id")
    .time_bound(Duration::from_secs(3600))
    .join_type(JoinType::Inner)
    .row_encoding(JoinRowEncoding::CpuFriendly) // 30-50% faster for memory-resident state
    .asymmetric_compaction(true) // Skip compaction on finished sides
    .per_key_tracking(true) // Aggressive cleanup for sparse keys
    .build()
    .unwrap();
let optimized_operator = StreamJoinOperator::from_config(config);
§SQL Syntax
SELECT o.*, p.status
FROM orders o
JOIN payments p
ON o.order_id = p.order_id
AND p.ts BETWEEN o.ts AND o.ts + INTERVAL '1' HOUR;
-- Session variables for optimization
SET streaming_join_row_encoding = 'cpu_friendly';
SET streaming_join_asymmetric_compaction = true;
§State Management
Events are stored in state with keys formatted as:
- sjl:<key_hash>:<timestamp>:<event_id> for left events
- sjr:<key_hash>:<timestamp>:<event_id> for right events
State is automatically cleaned up once the watermark passes event_timestamp + time_bound.
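The key layout and the cleanup condition can be sketched as follows. The function names `state_key` and `is_expired` are hypothetical, and several details are assumptions of this sketch rather than documented behavior: the hash is rendered as lowercase hex, the timestamp and event id as plain decimals, timestamps are in milliseconds, and "passes" is read as a strict greater-than comparison.

```rust
/// Builds a state key in the `sjl:` / `sjr:` layout.
/// The concrete field encodings here are assumptions of this sketch.
fn state_key(is_left: bool, key_hash: u64, timestamp_ms: i64, event_id: u64) -> String {
    let prefix = if is_left { "sjl" } else { "sjr" };
    format!("{prefix}:{key_hash:x}:{timestamp_ms}:{event_id}")
}

/// An event's state can be dropped once the watermark has moved past
/// `event_timestamp + time_bound` (strict comparison assumed).
fn is_expired(event_timestamp_ms: i64, time_bound_ms: i64, watermark_ms: i64) -> bool {
    watermark_ms > event_timestamp_ms + time_bound_ms
}
```

Under these assumptions, an event at 1,000 ms with a 500 ms bound is still retained at watermark 1,500 and becomes eligible for cleanup at watermark 1,501.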
§Optimizations
- CPU-Friendly Encoding: Inlines primitive values for faster access (30-50% improvement)
- Asymmetric Compaction: Skips compaction on finished/idle sides
- Per-Key Tracking: Aggressive cleanup for sparse key patterns
- Build-Side Pruning: Early pruning based on probe-side watermark
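As a rough illustration of build-side pruning, the sketch below keeps buffered rows in a timestamp-ordered map and drops everything that can no longer match once the probe-side watermark has advanced. `BuildSide` and `prune` are hypothetical names, and the use of a `BTreeMap` with millisecond timestamps is an assumption for this example, not a description of the operator's actual state layout.

```rust
use std::collections::BTreeMap;

/// Hypothetical build-side buffer keyed by event timestamp (ms).
struct BuildSide {
    rows: BTreeMap<i64, Vec<String>>,
}

impl BuildSide {
    /// Removes every row whose `timestamp + bound_ms` lies strictly before
    /// the probe-side watermark and returns how many rows were dropped.
    fn prune(&mut self, probe_watermark_ms: i64, bound_ms: i64) -> usize {
        let cutoff = probe_watermark_ms - bound_ms;
        // split_off returns the entries with timestamp >= cutoff (still joinable);
        // what remains in self.rows is strictly older than the cutoff.
        let keep = self.rows.split_off(&cutoff);
        let removed: usize = self.rows.values().map(Vec::len).sum();
        self.rows = keep;
        removed
    }
}
```

Because the map is ordered by timestamp, pruning touches only the expired prefix instead of scanning every buffered row.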
Structs§
- ArchivedJoinRow - An archived JoinRow
- JoinMetrics - Metrics for tracking join operations.
- JoinRow - A stored join row containing serialized event data.
- JoinRowResolver - The resolver for an archived JoinRow
- KeyMetadata - Per-key metadata for cleanup tracking.
- SideStats - Per-side statistics for asymmetric optimization.
- StreamJoinConfig - Configuration for stream-stream joins.
- StreamJoinConfigBuilder - Builder for StreamJoinConfig.
- StreamJoinOperator - Stream-stream join operator.
Enums§
- JoinRowEncoding - Row encoding strategy for join state.
- JoinSide - Identifies which side of the join an event came from.
- JoinType - Join type for stream-stream joins.