Skip to main content

laminar_sql/datafusion/
bridge.rs

//! Channel-based push-to-pull bridge for `DataFusion` integration.
//!
//! This module provides the [`StreamBridge`], which connects `LaminarDB`'s
//! push-based event processing model (where the Reactor pushes events)
//! with `DataFusion`'s pull-based query execution (where consumers pull
//! `RecordBatch` instances).
//!
//! # Architecture
//!
//! ```text
//! ┌─────────────────┐         ┌──────────────────┐
//! │  LaminarDB      │         │   DataFusion     │
//! │  Reactor (push) │──send──▶│  Query (pull)    │
//! │                 │         │                  │
//! └─────────────────┘         └──────────────────┘
//!         │                           ▲
//!         │                           │
//!         └───────── channel ─────────┘
//! ```
20
21use std::pin::Pin;
22use std::sync::Arc;
23use std::task::{Context, Poll};
24
25use arrow_array::RecordBatch;
26use arrow_schema::SchemaRef;
27use datafusion::physical_plan::RecordBatchStream;
28use datafusion_common::DataFusionError;
29use futures::Stream;
30use tokio::sync::mpsc;
31
/// Number of batches buffered by [`StreamBridge::with_default_capacity`]
/// before `send` has to wait (or `try_send` reports `Full`).
const DEFAULT_CHANNEL_CAPACITY: usize = 1_024;
34
35/// A bridge that connects push-based data producers with pull-based consumers.
36///
37/// The bridge creates a channel pair: a sender for pushing `RecordBatch`
38/// instances from the producer side, and a stream for pulling batches
39/// from the consumer side.
40///
41/// # Usage
42///
43/// ```rust,ignore
44/// let schema = Arc::new(Schema::new(vec![...]));
45/// let bridge = StreamBridge::new(schema, 100);
46/// let sender = bridge.sender();
47///
48/// // Producer side (`LaminarDB` Reactor)
49/// sender.send(batch).await?;
50///
51/// // Consumer side (`DataFusion` query)
52/// let stream = bridge.into_stream();
53/// while let Some(batch) = stream.next().await { ... }
54/// ```
55#[derive(Debug)]
56pub struct StreamBridge {
57    /// Schema of the record batches flowing through the bridge
58    schema: SchemaRef,
59    /// Sender side of the channel
60    sender: BridgeSender,
61    /// Receiver side of the channel
62    receiver: Option<mpsc::Receiver<Result<RecordBatch, DataFusionError>>>,
63}
64
65impl StreamBridge {
66    /// Creates a new bridge with the given schema and channel capacity.
67    ///
68    /// # Arguments
69    ///
70    /// * `schema` - Schema of `RecordBatch` instances that will flow through
71    /// * `capacity` - Maximum number of batches that can be buffered
72    #[must_use]
73    pub fn new(schema: SchemaRef, capacity: usize) -> Self {
74        let (tx, rx) = mpsc::channel(capacity);
75        Self {
76            schema,
77            sender: BridgeSender { tx },
78            receiver: Some(rx),
79        }
80    }
81
82    /// Creates a new bridge with default capacity.
83    #[must_use]
84    pub fn with_default_capacity(schema: SchemaRef) -> Self {
85        Self::new(schema, DEFAULT_CHANNEL_CAPACITY)
86    }
87
88    /// Returns the schema for this bridge.
89    #[must_use]
90    pub fn schema(&self) -> SchemaRef {
91        Arc::clone(&self.schema)
92    }
93
94    /// Returns a cloneable sender for pushing batches into the bridge.
95    ///
96    /// Multiple senders can be created by cloning the returned sender.
97    #[must_use]
98    pub fn sender(&self) -> BridgeSender {
99        self.sender.clone()
100    }
101
102    /// Converts this bridge into a `RecordBatchStream` for `DataFusion`.
103    ///
104    /// This consumes the bridge, taking ownership of the receiver.
105    /// After calling this, you can still use senders obtained from `sender()`.
106    ///
107    /// # Panics
108    ///
109    /// Panics if called more than once (the receiver can only be taken once).
110    #[must_use]
111    pub fn into_stream(mut self) -> BridgeStream {
112        BridgeStream {
113            schema: self.schema,
114            receiver: self.receiver.take().expect("receiver already taken"),
115        }
116    }
117
118    /// Creates a stream without consuming the bridge.
119    ///
120    /// This takes ownership of the receiver, so subsequent calls will return `None`.
121    #[must_use]
122    pub fn take_stream(&mut self) -> Option<BridgeStream> {
123        self.receiver.take().map(|receiver| BridgeStream {
124            schema: Arc::clone(&self.schema),
125            receiver,
126        })
127    }
128}
129
130/// A cloneable sender for pushing `RecordBatch` instances into a bridge.
131///
132/// Multiple producers can share senders by cloning this type.
133#[derive(Debug, Clone)]
134pub struct BridgeSender {
135    tx: mpsc::Sender<Result<RecordBatch, DataFusionError>>,
136}
137
138impl BridgeSender {
139    /// Sends a batch to the bridge.
140    ///
141    /// # Errors
142    ///
143    /// Returns an error if the receiver has been dropped.
144    pub async fn send(&self, batch: RecordBatch) -> Result<(), BridgeSendError> {
145        self.tx
146            .send(Ok(batch))
147            .await
148            .map_err(|_| BridgeSendError::ReceiverDropped)
149    }
150
151    /// Sends an error to the bridge.
152    ///
153    /// This allows the producer to signal errors to the consumer.
154    ///
155    /// # Errors
156    ///
157    /// Returns an error if the receiver has been dropped.
158    pub async fn send_error(&self, error: DataFusionError) -> Result<(), BridgeSendError> {
159        self.tx
160            .send(Err(error))
161            .await
162            .map_err(|_| BridgeSendError::ReceiverDropped)
163    }
164
165    /// Attempts to send a batch without waiting.
166    ///
167    /// # Errors
168    ///
169    /// Returns an error if the channel is full or the receiver is dropped.
170    pub fn try_send(&self, batch: RecordBatch) -> Result<(), BridgeTrySendError> {
171        self.tx.try_send(Ok(batch)).map_err(|e| match e {
172            mpsc::error::TrySendError::Full(_) => BridgeTrySendError::Full,
173            mpsc::error::TrySendError::Closed(_) => BridgeTrySendError::ReceiverDropped,
174        })
175    }
176
177    /// Returns true if the receiver has been dropped.
178    #[must_use]
179    pub fn is_closed(&self) -> bool {
180        self.tx.is_closed()
181    }
182}
183
184/// A stream that pulls `RecordBatch` instances from the bridge.
185///
186/// This implements both `Stream` and `DataFusion`'s `RecordBatchStream`
187/// so it can be used directly in `DataFusion` query execution.
188pub struct BridgeStream {
189    schema: SchemaRef,
190    receiver: mpsc::Receiver<Result<RecordBatch, DataFusionError>>,
191}
192
193impl std::fmt::Debug for BridgeStream {
194    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
195        f.debug_struct("BridgeStream")
196            .field("schema", &self.schema)
197            .finish_non_exhaustive()
198    }
199}
200
201impl Stream for BridgeStream {
202    type Item = Result<RecordBatch, DataFusionError>;
203
204    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
205        Pin::new(&mut self.receiver).poll_recv(cx)
206    }
207}
208
209impl RecordBatchStream for BridgeStream {
210    fn schema(&self) -> SchemaRef {
211        Arc::clone(&self.schema)
212    }
213}
214
215/// Error when sending a batch to the bridge.
216#[derive(Debug, thiserror::Error)]
217pub enum BridgeSendError {
218    /// The receiver has been dropped.
219    #[error("bridge receiver has been dropped")]
220    ReceiverDropped,
221}
222
223/// Error when trying to send a batch without blocking.
224#[derive(Debug, thiserror::Error)]
225pub enum BridgeTrySendError {
226    /// The channel is full.
227    #[error("bridge channel is full")]
228    Full,
229    /// The receiver has been dropped.
230    #[error("bridge receiver has been dropped")]
231    ReceiverDropped,
232}
233
#[cfg(test)]
mod tests {
    use super::*;
    use arrow_array::{Array, Int64Array};
    use arrow_schema::{DataType, Field, Schema};
    use futures::StreamExt;

    /// Schema with a single non-nullable `Int64` column named `id`.
    fn test_schema() -> SchemaRef {
        Arc::new(Schema::new(vec![Field::new("id", DataType::Int64, false)]))
    }

    /// Builds a batch for `schema` whose only column holds `values`.
    fn test_batch(schema: &SchemaRef, values: Vec<i64>) -> RecordBatch {
        let array = Arc::new(Int64Array::from(values));
        RecordBatch::try_new(Arc::clone(schema), vec![array]).unwrap()
    }

    /// Reads the first `id` value of a batch built by `test_batch`.
    fn first_id(batch: &RecordBatch) -> i64 {
        batch
            .column(0)
            .as_any()
            .downcast_ref::<Int64Array>()
            .expect("test batches have an Int64 column")
            .value(0)
    }

    #[tokio::test]
    async fn test_bridge_send_receive() {
        let schema = test_schema();
        let bridge = StreamBridge::new(Arc::clone(&schema), 10);
        let sender = bridge.sender();
        let mut stream = bridge.into_stream();

        // Send a batch
        let batch = test_batch(&schema, vec![1, 2, 3]);
        sender.send(batch.clone()).await.unwrap();
        drop(sender); // Last sender gone: the stream ends after draining

        // The batch arrives unchanged
        let received = stream.next().await.unwrap().unwrap();
        assert_eq!(received.num_rows(), 3);
        assert_eq!(received, batch);

        // Stream should end
        assert!(stream.next().await.is_none());
    }

    #[tokio::test]
    async fn test_bridge_multiple_batches() {
        let schema = test_schema();
        let bridge = StreamBridge::new(Arc::clone(&schema), 10);
        let sender = bridge.sender();
        let mut stream = bridge.into_stream();

        // Send multiple batches from a single producer
        for i in 0..5 {
            let batch = test_batch(&schema, vec![i64::from(i)]);
            sender.send(batch).await.unwrap();
        }
        drop(sender);

        // All batches arrive, in send order
        let mut seen = Vec::new();
        while let Some(result) = stream.next().await {
            seen.push(first_id(&result.unwrap()));
        }
        assert_eq!(seen, vec![0, 1, 2, 3, 4]);
    }

    #[tokio::test]
    async fn test_bridge_sender_clone() {
        let schema = test_schema();
        let bridge = StreamBridge::new(Arc::clone(&schema), 10);
        let sender1 = bridge.sender();
        let sender2 = sender1.clone();
        let mut stream = bridge.into_stream();

        // Send from both senders
        sender1.send(test_batch(&schema, vec![1])).await.unwrap();
        sender2.send(test_batch(&schema, vec![2])).await.unwrap();
        drop(sender1);
        drop(sender2);

        let mut count = 0;
        while let Some(result) = stream.next().await {
            result.unwrap();
            count += 1;
        }
        assert_eq!(count, 2);
    }

    #[tokio::test]
    async fn test_bridge_send_error() {
        let schema = test_schema();
        let bridge = StreamBridge::new(Arc::clone(&schema), 10);
        let sender = bridge.sender();
        let mut stream = bridge.into_stream();

        // Send an error
        sender
            .send_error(DataFusionError::Plan("test error".to_string()))
            .await
            .unwrap();
        drop(sender);

        // The consumer sees the exact error, then end-of-stream
        let result = stream.next().await.unwrap();
        assert!(matches!(result, Err(DataFusionError::Plan(ref msg)) if msg == "test error"));
        assert!(stream.next().await.is_none());
    }

    #[tokio::test]
    async fn test_bridge_try_send() {
        let schema = test_schema();
        let bridge = StreamBridge::new(Arc::clone(&schema), 2);
        let sender = bridge.sender();
        // Keep the stream alive to prevent channel close
        let mut stream = bridge.into_stream();

        // Fill the channel
        sender.try_send(test_batch(&schema, vec![1])).unwrap();
        sender.try_send(test_batch(&schema, vec![2])).unwrap();

        // Should fail when full
        let result = sender.try_send(test_batch(&schema, vec![3]));
        assert!(matches!(result, Err(BridgeTrySendError::Full)));

        // Draining one batch frees a slot again
        stream.next().await.unwrap().unwrap();
        sender.try_send(test_batch(&schema, vec![3])).unwrap();
    }

    #[test]
    fn test_bridge_try_send_receiver_dropped() {
        let schema = test_schema();
        let bridge = StreamBridge::new(Arc::clone(&schema), 2);
        let sender = bridge.sender();
        drop(bridge.into_stream());

        let result = sender.try_send(test_batch(&schema, vec![1]));
        assert!(matches!(result, Err(BridgeTrySendError::ReceiverDropped)));
    }

    #[tokio::test]
    async fn test_bridge_receiver_dropped() {
        let schema = test_schema();
        let bridge = StreamBridge::new(Arc::clone(&schema), 10);
        let sender = bridge.sender();
        let stream = bridge.into_stream();
        assert!(!sender.is_closed());
        drop(stream);

        // Should detect closed channel
        assert!(sender.is_closed());

        let result = sender.send(test_batch(&schema, vec![1])).await;
        assert!(matches!(result, Err(BridgeSendError::ReceiverDropped)));
    }

    #[tokio::test]
    async fn test_bridge_take_stream() {
        let schema = test_schema();
        let mut bridge = StreamBridge::new(Arc::clone(&schema), 10);
        let mut stream = bridge.take_stream().expect("first take yields a stream");
        assert!(bridge.take_stream().is_none());

        let sender = bridge.sender();
        sender.send(test_batch(&schema, vec![7])).await.unwrap();
        let received = stream.next().await.unwrap().unwrap();
        assert_eq!(first_id(&received), 7);

        // The bridge still owns a sender, so the stream ends only after both
        // the bridge and every obtained sender have been dropped.
        drop(sender);
        drop(bridge);
        assert!(stream.next().await.is_none());
    }

    #[test]
    #[should_panic(expected = "receiver already taken")]
    fn test_bridge_into_stream_after_take_stream_panics() {
        let mut bridge = StreamBridge::new(test_schema(), 10);
        let _stream = bridge.take_stream();
        let _ = bridge.into_stream();
    }

    #[test]
    #[should_panic]
    fn test_bridge_zero_capacity_panics() {
        let _ = StreamBridge::new(test_schema(), 0);
    }

    #[test]
    fn test_bridge_with_default_capacity() {
        let schema = test_schema();
        let bridge = StreamBridge::with_default_capacity(Arc::clone(&schema));
        assert_eq!(bridge.schema(), schema);

        let sender = bridge.sender();
        let _stream = bridge.into_stream();
        let batch = test_batch(&schema, vec![1]);
        for _ in 0..DEFAULT_CHANNEL_CAPACITY {
            sender.try_send(batch.clone()).unwrap();
        }
        assert!(matches!(sender.try_send(batch), Err(BridgeTrySendError::Full)));
    }

    #[test]
    fn test_bridge_stream_schema() {
        let schema = test_schema();
        let bridge = StreamBridge::new(Arc::clone(&schema), 10);
        let stream = bridge.into_stream();

        assert_eq!(RecordBatchStream::schema(&stream), schema);
    }
}