// laminar_core/cluster/control/query.rs

//! In-engine distributed query (pull path): a coordinator fans `RemoteScan` out
//! to the peers that own part of a table or materialized view and unions their
//! Arrow batches.
3
4use std::pin::Pin;
5use std::sync::Arc;
6
7use arrow::record_batch::RecordBatch;
8use futures::Stream;
9
10use super::barrier::BARRIER_ADDR_KEY;
11use super::ClusterKv;
12use crate::cluster::discovery::NodeId;
13use crate::serialization::{BatchStreamDecoder, BatchStreamEncoder};
14
15#[allow(
16    clippy::doc_markdown,
17    clippy::default_trait_access,
18    clippy::missing_const_for_fn,
19    clippy::must_use_candidate,
20    clippy::too_many_lines,
21    missing_docs
22)]
23pub(crate) mod query_v1 {
24    tonic::include_proto!("laminar.query.v1");
25}
26
27use query_v1::query_service_client::QueryServiceClient;
28use query_v1::query_service_server::QueryService;
29use query_v1::{RemoteScanRequest, RemoteScanResponse};
30
/// Upper bound on rows carried by one streamed `RemoteScan` chunk, which in
/// turn caps the size of any single gRPC message.
const REMOTE_SCAN_CHUNK_ROWS: usize = 8_192;

/// How long dialing a peer's lazily-connected channel may take.
const REMOTE_SCAN_CONNECT_TIMEOUT: std::time::Duration = std::time::Duration::from_millis(3_000);

/// Per-message idle timeout, applied to the first response and then to each
/// chunk on its own. It is deliberately not a whole-call deadline: a large
/// result keeps streaming, while a peer that stops sending fails.
const REMOTE_SCAN_IDLE_TIMEOUT: std::time::Duration = std::time::Duration::from_millis(10_000);

/// How many times to try *opening* a peer stream before giving up. Between
/// tries the pooled channel is evicted and the peer is resolved again.
const REMOTE_SCAN_MAX_ATTEMPTS: u32 = 3;

/// Constant pause before the next establishment attempt.
const REMOTE_SCAN_RETRY_BACKOFF: std::time::Duration = std::time::Duration::from_millis(100);
46
47/// Handler slot read per request, so registration and server start are
48/// order-independent.
49pub type QueryHandlerSlot = Arc<parking_lot::RwLock<Option<Arc<dyn RemoteQueryHandler>>>>;
50
51/// Pooled per-peer channels for `RemoteScan`, reused across queries and evicted
52/// on RPC failure so a restarted peer is re-resolved.
53pub type QueryClientPool =
54    Arc<parking_lot::Mutex<rustc_hash::FxHashMap<NodeId, tonic::transport::Channel>>>;
55
56/// Node-local provider of table / materialized-view snapshots for remote scans.
57#[async_trait::async_trait]
58pub trait RemoteQueryHandler: Send + Sync + 'static {
59    /// This node's locally-held rows for `table_name`, optionally projected and
60    /// optionally filtered by `filter_sql` before serialization.
61    ///
62    /// # Errors
63    /// Unknown table or invalid projection. A `filter_sql` that fails to compile
64    /// is skipped (the coordinator re-applies it), not an error.
65    async fn remote_scan(
66        &self,
67        table_name: &str,
68        projection: Option<Vec<usize>>,
69        filter_sql: Option<String>,
70    ) -> Result<RecordBatch, String>;
71}
72
73/// Tonic `QueryService` implementation delegating to a [`RemoteQueryHandler`].
74pub(crate) struct QueryServiceImpl {
75    handler: QueryHandlerSlot,
76}
77
78impl QueryServiceImpl {
79    pub(crate) fn new(handler: QueryHandlerSlot) -> Self {
80        Self { handler }
81    }
82}
83
84type RemoteScanStream =
85    Pin<Box<dyn Stream<Item = Result<RemoteScanResponse, tonic::Status>> + Send>>;
86
87#[tonic::async_trait]
88impl QueryService for QueryServiceImpl {
89    type RemoteScanStream = RemoteScanStream;
90
91    async fn remote_scan(
92        &self,
93        request: tonic::Request<RemoteScanRequest>,
94    ) -> Result<tonic::Response<Self::RemoteScanStream>, tonic::Status> {
95        let handler = self
96            .handler
97            .read()
98            .clone()
99            .ok_or_else(|| tonic::Status::unavailable("no query handler registered"))?;
100
101        let req = request.into_inner();
102        let projection = (!req.projection.is_empty())
103            .then(|| req.projection.iter().map(|&p| p as usize).collect());
104        let filter_sql = (!req.filter_sql.is_empty()).then(|| req.filter_sql.clone());
105
106        let batch = handler
107            .remote_scan(&req.table_name, projection, filter_sql)
108            .await
109            .map_err(tonic::Status::internal)?;
110
111        // Stream the slice in row-bounded chunks as one Arrow IPC stream (schema
112        // in the first chunk only); a zero-row slice is one valid empty chunk.
113        let total = batch.num_rows();
114        let ranges: Vec<(usize, usize)> = if total == 0 {
115            vec![(0, 0)]
116        } else {
117            (0..total)
118                .step_by(REMOTE_SCAN_CHUNK_ROWS)
119                .map(|off| (off, REMOTE_SCAN_CHUNK_ROWS.min(total - off)))
120                .collect()
121        };
122        let encoder = BatchStreamEncoder::new(&batch.schema())
123            .map_err(|e| tonic::Status::internal(format!("arrow ipc schema encode: {e}")))?;
124        let stream = futures::stream::unfold(
125            (encoder, batch, ranges.into_iter().peekable()),
126            |(mut encoder, batch, mut ranges)| async move {
127                let (off, len) = ranges.next()?;
128                // IIFE so the `&mut encoder`/`&mut ranges` borrows end before the
129                // state tuple is moved back out below.
130                let produced = (|| {
131                    let mut bytes = encoder.encode(&batch.slice(off, len))?;
132                    // The end-of-stream marker rides the final chunk.
133                    if ranges.peek().is_none() {
134                        bytes.extend_from_slice(&encoder.finish()?);
135                    }
136                    Ok::<_, arrow_schema::ArrowError>(bytes)
137                })();
138                let item = produced
139                    .map(|arrow_ipc| RemoteScanResponse { arrow_ipc })
140                    .map_err(|e| tonic::Status::internal(format!("arrow ipc encode: {e}")));
141                Some((item, (encoder, batch, ranges)))
142            },
143        );
144        Ok(tonic::Response::new(Box::pin(stream)))
145    }
146}
147
148/// `QueryService` server reading the registered handler out of `slot` per request.
149pub(crate) fn query_service_server(
150    slot: QueryHandlerSlot,
151) -> query_v1::query_service_server::QueryServiceServer<QueryServiceImpl> {
152    query_v1::query_service_server::QueryServiceServer::new(QueryServiceImpl::new(slot))
153}
154
155/// Pooled channel to `peer`, connecting lazily. `Ok(None)` = no published
156/// address (down / not started / pruned) → the caller skips the peer.
157async fn connect(
158    pool: &QueryClientPool,
159    kv: &Arc<dyn ClusterKv>,
160    peer: NodeId,
161) -> Result<Option<tonic::transport::Channel>, String> {
162    if let Some(chan) = pool.lock().get(&peer).cloned() {
163        return Ok(Some(chan));
164    }
165    let Some(addr) = kv.read_from(peer, BARRIER_ADDR_KEY).await else {
166        return Ok(None);
167    };
168    let channel = super::tls::client_endpoint(&addr)?
169        .connect_timeout(REMOTE_SCAN_CONNECT_TIMEOUT)
170        .connect_lazy();
171    Ok(Some(pool.lock().entry(peer).or_insert(channel).clone()))
172}
173
174/// Lazily-streamed Arrow [`RecordBatch`] chunks from a peer's `RemoteScan`; a
175/// decode or mid-stream transport error surfaces as an `Err` item.
176pub type RemoteBatchStream = Pin<Box<dyn Stream<Item = Result<RecordBatch, String>> + Send>>;
177
178/// Scan `table_name` on `peer` as a lazy Arrow-IPC stream. `Ok(None)` means the
179/// peer is unreachable (no published address) and should be skipped, not failed.
180///
181/// # Errors
182/// Transport/timeout while establishing; post-stream failures surface as `Err` items.
183pub async fn remote_scan_client(
184    pool: &QueryClientPool,
185    kv: &Arc<dyn ClusterKv>,
186    peer: NodeId,
187    table_name: &str,
188    projection: Option<Vec<usize>>,
189    filter_sql: Option<String>,
190) -> Result<Option<RemoteBatchStream>, String> {
191    let projection = projection
192        .unwrap_or_default()
193        .into_iter()
194        .map(|p| u32::try_from(p).map_err(|_| format!("projection index {p} out of range")))
195        .collect::<Result<Vec<u32>, String>>()?;
196
197    let request = RemoteScanRequest {
198        table_name: table_name.to_string(),
199        projection,
200        filter_sql: filter_sql.unwrap_or_default(),
201    };
202
203    // Establish the stream, retrying transient connect failures. Each retry
204    // evicts the pooled channel so `connect` re-resolves a possibly-restarted
205    // peer. The idle timeout bounds time-to-first-response, not the whole call,
206    // so a large multi-chunk result is free to take its time once streaming.
207    let mut attempt = 0u32;
208    let stream = loop {
209        attempt += 1;
210        let Some(channel) = connect(pool, kv, peer).await? else {
211            return Ok(None); // peer has no address (down/pruned) — skip it
212        };
213        let mut client = QueryServiceClient::new(channel);
214        match tokio::time::timeout(
215            REMOTE_SCAN_IDLE_TIMEOUT,
216            client.remote_scan(request.clone()),
217        )
218        .await
219        {
220            Ok(Ok(resp)) => break resp.into_inner(),
221            Ok(Err(status)) => {
222                pool.lock().remove(&peer);
223                // Only `Unavailable` (e.g. a restarting peer) is worth retrying.
224                if status.code() == tonic::Code::Unavailable && attempt < REMOTE_SCAN_MAX_ATTEMPTS {
225                    tokio::time::sleep(REMOTE_SCAN_RETRY_BACKOFF).await;
226                    continue;
227                }
228                return Err(format!("remote_scan to peer {} failed: {status}", peer.0));
229            }
230            Err(_) => {
231                pool.lock().remove(&peer);
232                if attempt < REMOTE_SCAN_MAX_ATTEMPTS {
233                    tokio::time::sleep(REMOTE_SCAN_RETRY_BACKOFF).await;
234                    continue;
235                }
236                return Err(format!(
237                    "remote_scan to peer {} timed out opening stream",
238                    peer.0
239                ));
240            }
241        }
242    };
243
244    // One decoder for the whole response (a chunk may complete 0+ batches, so
245    // buffer the surplus). Idle timeout bounds the inter-chunk gap, not total time.
246    let pool = Arc::clone(pool);
247    let decoder = BatchStreamDecoder::new();
248    let out = futures::stream::unfold(
249        Some((
250            stream,
251            decoder,
252            std::collections::VecDeque::<RecordBatch>::new(),
253        )),
254        move |state| {
255            let pool = Arc::clone(&pool);
256            async move {
257                let (mut stream, mut decoder, mut pending) = state?;
258                loop {
259                    if let Some(batch) = pending.pop_front() {
260                        return Some((Ok(batch), Some((stream, decoder, pending))));
261                    }
262                    match tokio::time::timeout(REMOTE_SCAN_IDLE_TIMEOUT, stream.message()).await {
263                        Ok(Ok(Some(resp))) => match decoder.decode_chunk(resp.arrow_ipc) {
264                            Ok(batches) => pending.extend(batches),
265                            Err(e) => return Some((Err(e.to_string()), None)),
266                        },
267                        Ok(Ok(None)) => return None,
268                        Ok(Err(status)) => {
269                            pool.lock().remove(&peer);
270                            return Some((Err(status.to_string()), None));
271                        }
272                        Err(_) => {
273                            pool.lock().remove(&peer);
274                            return Some((
275                                Err(format!("remote_scan to peer {} stalled mid-stream", peer.0)),
276                                None,
277                            ));
278                        }
279                    }
280                }
281            }
282        },
283    );
284    Ok(Some(Box::pin(out)))
285}
286
#[cfg(test)]
mod tests {
    use super::*;
    use crate::cluster::control::barrier::InMemoryKv;
    use arrow::array::Int32Array;
    use arrow_schema::{DataType, Field, Schema};
    use futures::StreamExt;

    /// `(projection, filter_sql)` as observed by a [`RecordingHandler`].
    type SeenArgs = Arc<parking_lot::Mutex<Option<(Option<Vec<usize>>, Option<String>)>>>;

    /// Apply `projection` to `batch` the way a real handler would.
    fn apply_projection(
        batch: &RecordBatch,
        projection: Option<Vec<usize>>,
    ) -> Result<RecordBatch, String> {
        match projection {
            None => Ok(batch.clone()),
            Some(cols) => batch.project(&cols).map_err(|e| e.to_string()),
        }
    }

    /// Serves one fixed batch for every table, honoring the projection.
    struct StaticHandler(RecordBatch);

    #[async_trait::async_trait]
    impl RemoteQueryHandler for StaticHandler {
        async fn remote_scan(
            &self,
            _table: &str,
            projection: Option<Vec<usize>>,
            _filter_sql: Option<String>,
        ) -> Result<RecordBatch, String> {
            apply_projection(&self.0, projection)
        }
    }

    /// Like [`StaticHandler`], but remembers the projection and `filter_sql`
    /// it was called with so a test can check what crossed the wire.
    struct RecordingHandler {
        batch: RecordBatch,
        seen: SeenArgs,
    }

    #[async_trait::async_trait]
    impl RemoteQueryHandler for RecordingHandler {
        async fn remote_scan(
            &self,
            _table: &str,
            projection: Option<Vec<usize>>,
            filter_sql: Option<String>,
        ) -> Result<RecordBatch, String> {
            *self.seen.lock() = Some((projection.clone(), filter_sql));
            apply_projection(&self.batch, projection)
        }
    }

    /// Serve `handler` on an ephemeral local port. Returns an empty client pool
    /// and a kv in which `peer` resolves to that port.
    async fn serve_handler(
        peer: NodeId,
        handler: Arc<dyn RemoteQueryHandler>,
    ) -> (QueryClientPool, Arc<dyn ClusterKv>) {
        let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
        let bound = listener.local_addr().unwrap();
        let slot: QueryHandlerSlot = Arc::new(parking_lot::RwLock::new(Some(handler)));
        tokio::spawn(async move {
            let mut server = tonic::transport::Server::builder();
            // Same as production: use control-plane TLS when it is installed.
            if let Some(tls) = crate::cluster::control::tls::server_tls() {
                server = server.tls_config(tls.clone()).unwrap();
            }
            let incoming = tokio_stream::wrappers::TcpListenerStream::new(listener);
            let _ = server
                .add_service(query_service_server(slot))
                .serve_with_incoming(incoming)
                .await;
        });
        let kv = InMemoryKv::new(NodeId(0));
        kv.seed(peer, BARRIER_ADDR_KEY, bound.to_string());
        let pool: QueryClientPool =
            Arc::new(parking_lot::Mutex::new(rustc_hash::FxHashMap::default()));
        (pool, Arc::new(kv))
    }

    /// [`serve_handler`] with a [`StaticHandler`] over `batch`.
    async fn serve(peer: NodeId, batch: RecordBatch) -> (QueryClientPool, Arc<dyn ClusterKv>) {
        serve_handler(peer, Arc::new(StaticHandler(batch))).await
    }

    /// One non-nullable Int32 column `n` holding `values`.
    fn int_batch(values: Vec<i32>) -> RecordBatch {
        let field = Field::new("n", DataType::Int32, false);
        let column: Arc<dyn arrow::array::Array> = Arc::new(Int32Array::from(values));
        RecordBatch::try_new(Arc::new(Schema::new(vec![field])), vec![column]).unwrap()
    }

    /// Drain `stream`, panicking on any `Err` item.
    async fn collect_chunks(stream: RemoteBatchStream) -> Vec<RecordBatch> {
        let items: Vec<Result<RecordBatch, String>> = stream.collect().await;
        items
            .into_iter()
            .map(|item| item.expect("remote scan chunk decode failed"))
            .collect()
    }

    /// Scan table "mv" on `peer` and return every batch it streamed back.
    async fn scan_all(
        pool: &QueryClientPool,
        kv: &Arc<dyn ClusterKv>,
        peer: NodeId,
        projection: Option<Vec<usize>>,
        filter_sql: Option<String>,
    ) -> Vec<RecordBatch> {
        let stream = remote_scan_client(pool, kv, peer, "mv", projection, filter_sql)
            .await
            .unwrap()
            .expect("peer resolvable");
        collect_chunks(stream).await
    }

    /// Concatenate `batches` and return the values of their first Int32 column.
    fn int_values(batches: &[RecordBatch]) -> Vec<i32> {
        let merged = arrow::compute::concat_batches(&batches[0].schema(), batches).unwrap();
        merged
            .column(0)
            .as_any()
            .downcast_ref::<Int32Array>()
            .unwrap()
            .values()
            .to_vec()
    }

    // A result larger than one chunk arrives as several batches that the
    // caller reassembles in stream order.
    #[tokio::test]
    async fn remote_scan_reassembles_chunks_in_order() {
        let peer = NodeId(7);
        let upper = i32::try_from(REMOTE_SCAN_CHUNK_ROWS).unwrap() + 100;
        let expected: Vec<i32> = (0..upper).collect();
        let (pool, kv) = serve(peer, int_batch(expected.clone())).await;

        let batches = scan_all(&pool, &kv, peer, None, None).await;
        assert!(batches.len() > 1);
        assert_eq!(int_values(&batches), expected);
    }

    // An empty local slice is a valid, schema-only result rather than a failure.
    #[tokio::test]
    async fn remote_scan_empty_slice_is_not_an_error() {
        let peer = NodeId(7);
        let (pool, kv) = serve(peer, int_batch(vec![])).await;

        let batches = scan_all(&pool, &kv, peer, None, None).await;
        assert_eq!(batches.iter().map(RecordBatch::num_rows).sum::<usize>(), 0);
    }

    // The projection and filter_sql reach the serving handler over the wire,
    // and the projected result streams back.
    #[tokio::test]
    async fn remote_scan_forwards_projection_and_filter() {
        let peer = NodeId(7);
        let seen: SeenArgs = Arc::new(parking_lot::Mutex::new(None));
        let recorder = RecordingHandler {
            batch: int_batch(vec![10, 20, 30]),
            seen: Arc::clone(&seen),
        };
        let (pool, kv) = serve_handler(peer, Arc::new(recorder)).await;

        let batches = scan_all(
            &pool,
            &kv,
            peer,
            Some(vec![0]),
            Some("(\"n\" > 1)".into()),
        )
        .await;
        assert_eq!(int_values(&batches), vec![10, 20, 30]);

        let (proj, filter) = seen.lock().clone().expect("handler was called");
        assert_eq!(proj, Some(vec![0]));
        assert_eq!(filter.as_deref(), Some("(\"n\" > 1)"));
    }

    // End-to-end mTLS. One node serves and dials with a cert chained to a
    // shared CA, so a successful scan shows both directions verified: the
    // server requires a client cert, and the client checks the server cert
    // and its SAN.
    //
    // Ignored because it installs the process-global cluster TLS, which would
    // switch the plaintext tests above to TLS. Run it on its own:
    //   cargo test -p laminar-core --features cluster -- --ignored
    #[tokio::test]
    #[ignore = "installs process-global cluster TLS; run with --ignored"]
    async fn remote_scan_over_mtls() {
        const SAN: &str = "laminar-cluster";

        // A throwaway CA plus one leaf cert valid for both server and client
        // auth, whose SAN is the name the client verifies.
        let ca_key = rcgen::KeyPair::generate().unwrap();
        let mut ca_params = rcgen::CertificateParams::new(vec!["laminar-test-ca".into()]).unwrap();
        ca_params.is_ca = rcgen::IsCa::Ca(rcgen::BasicConstraints::Unconstrained);
        let ca_cert = ca_params.self_signed(&ca_key).unwrap();

        let leaf_key = rcgen::KeyPair::generate().unwrap();
        let mut leaf_params = rcgen::CertificateParams::new(vec![SAN.into()]).unwrap();
        leaf_params.extended_key_usages = vec![
            rcgen::ExtendedKeyUsagePurpose::ServerAuth,
            rcgen::ExtendedKeyUsagePurpose::ClientAuth,
        ];
        let leaf_cert = leaf_params.signed_by(&leaf_key, &ca_cert, &ca_key).unwrap();

        let tls = crate::cluster::control::ClusterTls::from_pem(
            leaf_cert.pem().as_bytes(),
            leaf_key.serialize_pem().as_bytes(),
            ca_cert.pem().as_bytes(),
            SAN,
        );
        crate::cluster::control::set_cluster_tls(tls);

        let peer = NodeId(7);
        let (pool, kv) = serve(peer, int_batch(vec![1, 2, 3])).await;
        let batches = scan_all(&pool, &kv, peer, None, None).await;
        assert_eq!(int_values(&batches), vec![1, 2, 3]);
    }
}