1use std::pin::Pin;
5use std::sync::Arc;
6
7use arrow::record_batch::RecordBatch;
8use futures::Stream;
9
10use super::barrier::BARRIER_ADDR_KEY;
11use super::ClusterKv;
12use crate::cluster::discovery::NodeId;
13use crate::serialization::{BatchStreamDecoder, BatchStreamEncoder};
14
// gRPC message, client and server types for the `laminar.query.v1` protobuf
// package, pulled in with `tonic::include_proto!` (i.e. code generated at build
// time, not hand-written). Lints are relaxed because generated code does not
// follow this crate's documentation and style rules.
#[allow(
    clippy::doc_markdown,
    clippy::default_trait_access,
    clippy::missing_const_for_fn,
    clippy::must_use_candidate,
    clippy::too_many_lines,
    missing_docs
)]
pub(crate) mod query_v1 {
    tonic::include_proto!("laminar.query.v1");
}
26
27use query_v1::query_service_client::QueryServiceClient;
28use query_v1::query_service_server::QueryService;
29use query_v1::{RemoteScanRequest, RemoteScanResponse};
30
/// Upper bound on the number of rows the server packs into one `RemoteScanResponse`.
const REMOTE_SCAN_CHUNK_ROWS: usize = 8 * 1024;

/// Connect timeout configured on every peer channel endpoint.
const REMOTE_SCAN_CONNECT_TIMEOUT: std::time::Duration = std::time::Duration::from_millis(3_000);

/// How long the client waits for the scan stream to open, and then for each
/// subsequent message, before treating the peer as stalled.
const REMOTE_SCAN_IDLE_TIMEOUT: std::time::Duration = std::time::Duration::from_millis(10_000);

/// Total number of attempts (the first one included) to open a scan stream.
const REMOTE_SCAN_MAX_ATTEMPTS: u32 = 3;
/// Fixed pause between consecutive attempts to open a scan stream.
const REMOTE_SCAN_RETRY_BACKOFF: std::time::Duration = std::time::Duration::from_millis(100);
46
/// Shared slot holding this node's [`RemoteQueryHandler`], if one has been installed.
///
/// While the slot is `None`, the gRPC server answers scans with `Unavailable`.
pub type QueryHandlerSlot = Arc<parking_lot::RwLock<Option<Arc<dyn RemoteQueryHandler>>>>;

/// Per-peer cache of gRPC channels used by the remote-scan client.
///
/// A channel is inserted the first time a peer is contacted and evicted after RPC
/// failures or timeouts, so the next attempt re-reads the peer's address.
pub type QueryClientPool =
    Arc<parking_lot::Mutex<rustc_hash::FxHashMap<NodeId, tonic::transport::Channel>>>;
55
/// Local executor for scans that peers request through the `QueryService` gRPC API.
///
/// An implementation is installed in a [`QueryHandlerSlot`]. The server calls it once
/// per `RemoteScan` request and streams the returned batch back in row chunks.
#[async_trait::async_trait]
pub trait RemoteQueryHandler: Send + Sync + 'static {
    /// Scans `table_name` and returns the whole result as one batch.
    ///
    /// * `projection` — column indices to keep, or `None` for every column. An empty
    ///   projection sent over the wire arrives here as `None`.
    ///   NOTE(review): indices look relative to the table's schema — confirm with callers.
    /// * `filter_sql` — optional filter expression as SQL text, forwarded verbatim from
    ///   the caller (`None` when the caller sent none).
    ///
    /// # Errors
    /// The `Err` message is returned to the remote caller as a gRPC `Internal` status.
    async fn remote_scan(
        &self,
        table_name: &str,
        projection: Option<Vec<usize>>,
        filter_sql: Option<String>,
    ) -> Result<RecordBatch, String>;
}
72
73pub(crate) struct QueryServiceImpl {
75 handler: QueryHandlerSlot,
76}
77
78impl QueryServiceImpl {
79 pub(crate) fn new(handler: QueryHandlerSlot) -> Self {
80 Self { handler }
81 }
82}
83
/// Boxed server-side response stream for `RemoteScan`: one `RemoteScanResponse`
/// per encoded row chunk, or a `Status` describing an encoding failure.
type RemoteScanStream =
    Pin<Box<dyn Stream<Item = Result<RemoteScanResponse, tonic::Status>> + Send>>;
86
87#[tonic::async_trait]
88impl QueryService for QueryServiceImpl {
89 type RemoteScanStream = RemoteScanStream;
90
91 async fn remote_scan(
92 &self,
93 request: tonic::Request<RemoteScanRequest>,
94 ) -> Result<tonic::Response<Self::RemoteScanStream>, tonic::Status> {
95 let handler = self
96 .handler
97 .read()
98 .clone()
99 .ok_or_else(|| tonic::Status::unavailable("no query handler registered"))?;
100
101 let req = request.into_inner();
102 let projection = (!req.projection.is_empty())
103 .then(|| req.projection.iter().map(|&p| p as usize).collect());
104 let filter_sql = (!req.filter_sql.is_empty()).then(|| req.filter_sql.clone());
105
106 let batch = handler
107 .remote_scan(&req.table_name, projection, filter_sql)
108 .await
109 .map_err(tonic::Status::internal)?;
110
111 let total = batch.num_rows();
114 let ranges: Vec<(usize, usize)> = if total == 0 {
115 vec![(0, 0)]
116 } else {
117 (0..total)
118 .step_by(REMOTE_SCAN_CHUNK_ROWS)
119 .map(|off| (off, REMOTE_SCAN_CHUNK_ROWS.min(total - off)))
120 .collect()
121 };
122 let encoder = BatchStreamEncoder::new(&batch.schema())
123 .map_err(|e| tonic::Status::internal(format!("arrow ipc schema encode: {e}")))?;
124 let stream = futures::stream::unfold(
125 (encoder, batch, ranges.into_iter().peekable()),
126 |(mut encoder, batch, mut ranges)| async move {
127 let (off, len) = ranges.next()?;
128 let produced = (|| {
131 let mut bytes = encoder.encode(&batch.slice(off, len))?;
132 if ranges.peek().is_none() {
134 bytes.extend_from_slice(&encoder.finish()?);
135 }
136 Ok::<_, arrow_schema::ArrowError>(bytes)
137 })();
138 let item = produced
139 .map(|arrow_ipc| RemoteScanResponse { arrow_ipc })
140 .map_err(|e| tonic::Status::internal(format!("arrow ipc encode: {e}")));
141 Some((item, (encoder, batch, ranges)))
142 },
143 );
144 Ok(tonic::Response::new(Box::pin(stream)))
145 }
146}
147
148pub(crate) fn query_service_server(
150 slot: QueryHandlerSlot,
151) -> query_v1::query_service_server::QueryServiceServer<QueryServiceImpl> {
152 query_v1::query_service_server::QueryServiceServer::new(QueryServiceImpl::new(slot))
153}
154
155async fn connect(
158 pool: &QueryClientPool,
159 kv: &Arc<dyn ClusterKv>,
160 peer: NodeId,
161) -> Result<Option<tonic::transport::Channel>, String> {
162 if let Some(chan) = pool.lock().get(&peer).cloned() {
163 return Ok(Some(chan));
164 }
165 let Some(addr) = kv.read_from(peer, BARRIER_ADDR_KEY).await else {
166 return Ok(None);
167 };
168 let channel = super::tls::client_endpoint(&addr)?
169 .connect_timeout(REMOTE_SCAN_CONNECT_TIMEOUT)
170 .connect_lazy();
171 Ok(Some(pool.lock().entry(peer).or_insert(channel).clone()))
172}
173
/// Client-side stream of decoded record batches produced by a remote scan.
/// Failures are reported as `Err(String)` items.
pub type RemoteBatchStream = Pin<Box<dyn Stream<Item = Result<RecordBatch, String>> + Send>>;
177
178pub async fn remote_scan_client(
184 pool: &QueryClientPool,
185 kv: &Arc<dyn ClusterKv>,
186 peer: NodeId,
187 table_name: &str,
188 projection: Option<Vec<usize>>,
189 filter_sql: Option<String>,
190) -> Result<Option<RemoteBatchStream>, String> {
191 let projection = projection
192 .unwrap_or_default()
193 .into_iter()
194 .map(|p| u32::try_from(p).map_err(|_| format!("projection index {p} out of range")))
195 .collect::<Result<Vec<u32>, String>>()?;
196
197 let request = RemoteScanRequest {
198 table_name: table_name.to_string(),
199 projection,
200 filter_sql: filter_sql.unwrap_or_default(),
201 };
202
203 let mut attempt = 0u32;
208 let stream = loop {
209 attempt += 1;
210 let Some(channel) = connect(pool, kv, peer).await? else {
211 return Ok(None); };
213 let mut client = QueryServiceClient::new(channel);
214 match tokio::time::timeout(
215 REMOTE_SCAN_IDLE_TIMEOUT,
216 client.remote_scan(request.clone()),
217 )
218 .await
219 {
220 Ok(Ok(resp)) => break resp.into_inner(),
221 Ok(Err(status)) => {
222 pool.lock().remove(&peer);
223 if status.code() == tonic::Code::Unavailable && attempt < REMOTE_SCAN_MAX_ATTEMPTS {
225 tokio::time::sleep(REMOTE_SCAN_RETRY_BACKOFF).await;
226 continue;
227 }
228 return Err(format!("remote_scan to peer {} failed: {status}", peer.0));
229 }
230 Err(_) => {
231 pool.lock().remove(&peer);
232 if attempt < REMOTE_SCAN_MAX_ATTEMPTS {
233 tokio::time::sleep(REMOTE_SCAN_RETRY_BACKOFF).await;
234 continue;
235 }
236 return Err(format!(
237 "remote_scan to peer {} timed out opening stream",
238 peer.0
239 ));
240 }
241 }
242 };
243
244 let pool = Arc::clone(pool);
247 let decoder = BatchStreamDecoder::new();
248 let out = futures::stream::unfold(
249 Some((
250 stream,
251 decoder,
252 std::collections::VecDeque::<RecordBatch>::new(),
253 )),
254 move |state| {
255 let pool = Arc::clone(&pool);
256 async move {
257 let (mut stream, mut decoder, mut pending) = state?;
258 loop {
259 if let Some(batch) = pending.pop_front() {
260 return Some((Ok(batch), Some((stream, decoder, pending))));
261 }
262 match tokio::time::timeout(REMOTE_SCAN_IDLE_TIMEOUT, stream.message()).await {
263 Ok(Ok(Some(resp))) => match decoder.decode_chunk(resp.arrow_ipc) {
264 Ok(batches) => pending.extend(batches),
265 Err(e) => return Some((Err(e.to_string()), None)),
266 },
267 Ok(Ok(None)) => return None,
268 Ok(Err(status)) => {
269 pool.lock().remove(&peer);
270 return Some((Err(status.to_string()), None));
271 }
272 Err(_) => {
273 pool.lock().remove(&peer);
274 return Some((
275 Err(format!("remote_scan to peer {} stalled mid-stream", peer.0)),
276 None,
277 ));
278 }
279 }
280 }
281 }
282 },
283 );
284 Ok(Some(Box::pin(out)))
285}
286
#[cfg(test)]
mod tests {
    use super::*;
    use crate::cluster::control::barrier::InMemoryKv;
    use arrow::array::Int32Array;
    use arrow_schema::{DataType, Field, Schema};
    use futures::StreamExt;

    /// Handler that always returns the same batch. It applies the requested projection
    /// and ignores the table name and filter.
    struct StaticHandler(RecordBatch);

    #[async_trait::async_trait]
    impl RemoteQueryHandler for StaticHandler {
        async fn remote_scan(
            &self,
            _table: &str,
            projection: Option<Vec<usize>>,
            _filter_sql: Option<String>,
        ) -> Result<RecordBatch, String> {
            match projection {
                Some(p) => self.0.project(&p).map_err(|e| e.to_string()),
                None => Ok(self.0.clone()),
            }
        }
    }

    /// Last `(projection, filter_sql)` pair received by a [`RecordingHandler`].
    type SeenArgs = Arc<parking_lot::Mutex<Option<(Option<Vec<usize>>, Option<String>)>>>;
    /// Like [`StaticHandler`], but records the arguments the server decoded from the
    /// request, so tests can check what actually crossed the wire.
    struct RecordingHandler {
        batch: RecordBatch,
        seen: SeenArgs,
    }

    #[async_trait::async_trait]
    impl RemoteQueryHandler for RecordingHandler {
        async fn remote_scan(
            &self,
            _table: &str,
            projection: Option<Vec<usize>>,
            filter_sql: Option<String>,
        ) -> Result<RecordBatch, String> {
            *self.seen.lock() = Some((projection.clone(), filter_sql));
            match projection {
                Some(p) => self.batch.project(&p).map_err(|e| e.to_string()),
                None => Ok(self.batch.clone()),
            }
        }
    }

    /// Starts a `QueryService` server for `handler` on an ephemeral localhost port and
    /// returns an empty client pool plus a KV that maps `peer` to that port.
    async fn serve_handler(
        peer: NodeId,
        handler: Arc<dyn RemoteQueryHandler>,
    ) -> (QueryClientPool, Arc<dyn ClusterKv>) {
        let slot: QueryHandlerSlot = Arc::new(parking_lot::RwLock::new(Some(handler)));
        // Binding before spawning means the address is valid as soon as we return.
        let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
        let addr = listener.local_addr().unwrap();
        tokio::spawn(async move {
            let incoming = tokio_stream::wrappers::TcpListenerStream::new(listener);
            let mut builder = tonic::transport::Server::builder();
            // Use the process-wide cluster TLS config when one is installed
            // (only the mTLS test does this); otherwise serve plaintext.
            if let Some(tls) = crate::cluster::control::tls::server_tls() {
                builder = builder.tls_config(tls.clone()).unwrap();
            }
            let _ = builder
                .add_service(query_service_server(slot))
                .serve_with_incoming(incoming)
                .await;
        });
        let kv = InMemoryKv::new(NodeId(0));
        kv.seed(peer, BARRIER_ADDR_KEY, addr.to_string());
        let pool: QueryClientPool =
            Arc::new(parking_lot::Mutex::new(rustc_hash::FxHashMap::default()));
        (pool, Arc::new(kv))
    }

    /// [`serve_handler`] with a [`StaticHandler`] returning `batch`.
    async fn serve(peer: NodeId, batch: RecordBatch) -> (QueryClientPool, Arc<dyn ClusterKv>) {
        serve_handler(peer, Arc::new(StaticHandler(batch))).await
    }

    /// Single non-nullable `Int32` column named `n` holding `values`.
    fn int_batch(values: Vec<i32>) -> RecordBatch {
        let schema = Arc::new(Schema::new(vec![Field::new("n", DataType::Int32, false)]));
        RecordBatch::try_new(schema, vec![Arc::new(Int32Array::from(values))]).unwrap()
    }

    /// Drains the stream, panicking on the first error item.
    async fn collect_chunks(stream: RemoteBatchStream) -> Vec<RecordBatch> {
        stream
            .map(|item| item.expect("remote scan chunk decode failed"))
            .collect::<Vec<_>>()
            .await
    }

    /// A result larger than one chunk arrives as several batches whose
    /// concatenation equals the original rows, in order.
    #[tokio::test]
    async fn remote_scan_reassembles_chunks_in_order() {
        let count = i32::try_from(REMOTE_SCAN_CHUNK_ROWS).unwrap() + 100;
        let values: Vec<i32> = (0..count).collect();
        let peer = NodeId(7);
        let (pool, kv) = serve(peer, int_batch(values.clone())).await;

        let stream = remote_scan_client(&pool, &kv, peer, "mv", None, None)
            .await
            .unwrap()
            .expect("peer resolvable");
        let batches = collect_chunks(stream).await;
        assert!(batches.len() > 1);
        let got = arrow::compute::concat_batches(&batches[0].schema(), &batches).unwrap();
        let col = got.column(0).as_any().downcast_ref::<Int32Array>().unwrap();
        assert_eq!(col.values(), values.as_slice());
    }

    /// A zero-row result decodes cleanly and yields no rows.
    #[tokio::test]
    async fn remote_scan_empty_slice_is_not_an_error() {
        let peer = NodeId(7);
        let (pool, kv) = serve(peer, int_batch(vec![])).await;

        let stream = remote_scan_client(&pool, &kv, peer, "mv", None, None)
            .await
            .unwrap()
            .expect("peer resolvable");
        let batches = collect_chunks(stream).await;
        let total: usize = batches.iter().map(RecordBatch::num_rows).sum();
        assert_eq!(total, 0);
    }

    /// Projection and filter round-trip through the wire encoding unchanged.
    #[tokio::test]
    async fn remote_scan_forwards_projection_and_filter() {
        let seen: SeenArgs = Arc::new(parking_lot::Mutex::new(None));
        let handler = Arc::new(RecordingHandler {
            batch: int_batch(vec![10, 20, 30]),
            seen: Arc::clone(&seen),
        });
        let peer = NodeId(7);
        let (pool, kv) = serve_handler(peer, handler).await;

        let stream = remote_scan_client(
            &pool,
            &kv,
            peer,
            "mv",
            Some(vec![0]),
            Some("(\"n\" > 1)".into()),
        )
        .await
        .unwrap()
        .expect("peer resolvable");
        let batches = collect_chunks(stream).await;
        let got = arrow::compute::concat_batches(&batches[0].schema(), &batches).unwrap();
        let col = got.column(0).as_any().downcast_ref::<Int32Array>().unwrap();
        assert_eq!(col.values(), &[10, 20, 30]);

        let (proj, filter) = seen.lock().clone().expect("handler was called");
        assert_eq!(proj, Some(vec![0]));
        assert_eq!(filter.as_deref(), Some("(\"n\" > 1)"));
    }

    /// End-to-end scan with mutual TLS. A throwaway CA signs one leaf certificate
    /// (valid for both server and client auth under `SAN`), which is installed as the
    /// process-global cluster TLS config. Ignored by default because that global state
    /// would leak into other tests in the same process.
    #[tokio::test]
    #[ignore = "installs process-global cluster TLS; run with --ignored"]
    async fn remote_scan_over_mtls() {
        const SAN: &str = "laminar-cluster";

        // Self-signed test CA.
        let mut ca_params = rcgen::CertificateParams::new(vec!["laminar-test-ca".into()]).unwrap();
        ca_params.is_ca = rcgen::IsCa::Ca(rcgen::BasicConstraints::Unconstrained);
        let ca_key = rcgen::KeyPair::generate().unwrap();
        let ca_cert = ca_params.self_signed(&ca_key).unwrap();

        // One leaf used by both ends of the connection.
        let mut leaf = rcgen::CertificateParams::new(vec![SAN.into()]).unwrap();
        leaf.extended_key_usages = vec![
            rcgen::ExtendedKeyUsagePurpose::ServerAuth,
            rcgen::ExtendedKeyUsagePurpose::ClientAuth,
        ];
        let leaf_key = rcgen::KeyPair::generate().unwrap();
        let leaf_cert = leaf.signed_by(&leaf_key, &ca_cert, &ca_key).unwrap();

        crate::cluster::control::set_cluster_tls(crate::cluster::control::ClusterTls::from_pem(
            leaf_cert.pem().as_bytes(),
            leaf_key.serialize_pem().as_bytes(),
            ca_cert.pem().as_bytes(),
            SAN,
        ));

        let peer = NodeId(7);
        let (pool, kv) =
            serve_handler(peer, Arc::new(StaticHandler(int_batch(vec![1, 2, 3])))).await;
        let stream = remote_scan_client(&pool, &kv, peer, "mv", None, None)
            .await
            .unwrap()
            .expect("peer resolvable");
        let batches = collect_chunks(stream).await;
        let got = arrow::compute::concat_batches(&batches[0].schema(), &batches).unwrap();
        let col = got.column(0).as_any().downcast_ref::<Int32Array>().unwrap();
        assert_eq!(col.values(), &[1, 2, 3]);
    }
}