Skip to main content

laminar_db/
show_commands.rs

1//! SHOW and DESCRIBE command builders for `LaminarDB`.
2//!
3//! Reopens `impl LaminarDB` to keep the main `db.rs` focused on dispatch.
4
5use std::sync::Arc;
6
7use arrow::array::{BooleanArray, RecordBatch, StringArray, UInt64Array};
8use arrow::datatypes::{DataType, Field, Schema};
9
10use crate::db::LaminarDB;
11use crate::error::DbError;
12
13impl LaminarDB {
14    /// Build a SHOW CHECKPOINT STATUS metadata result. Public so the server's
15    /// `GET /api/v1/cluster/checkpoints` endpoint can surface it.
16    ///
17    /// # Errors
18    ///
19    /// Returns [`DbError::Checkpoint`] if the metadata batch cannot be
20    /// assembled from the latest checkpoint.
21    pub async fn build_show_checkpoint_status(&self) -> Result<RecordBatch, DbError> {
22        let store = self.checkpoint_store();
23        let (latest, list) = match &store {
24            Some(s) => {
25                let latest = s.load_latest().await.map_err(|e| {
26                    DbError::Checkpoint(format!("failed to load latest checkpoint: {e}"))
27                })?;
28                let list = s
29                    .list()
30                    .await
31                    .map_err(|e| DbError::Checkpoint(format!("failed to list checkpoints: {e}")))?;
32                (latest, list)
33            }
34            None => (None, vec![]),
35        };
36
37        // Build single-row result with checkpoint metadata.
38        let (cp_id, epoch, ts_ms, sources, sinks, total_checkpoints) = if let Some(ref m) = latest {
39            (
40                m.checkpoint_id,
41                m.epoch,
42                m.timestamp_ms,
43                m.source_names.join(", "),
44                m.sink_names.join(", "),
45                list.len() as u64,
46            )
47        } else {
48            (0, 0, 0, String::new(), String::new(), 0)
49        };
50
51        let schema = Arc::new(Schema::new(vec![
52            Field::new("checkpoint_id", DataType::UInt64, false),
53            Field::new("epoch", DataType::UInt64, false),
54            Field::new("timestamp_ms", DataType::UInt64, false),
55            Field::new("sources", DataType::Utf8, false),
56            Field::new("sinks", DataType::Utf8, false),
57            Field::new("total_checkpoints", DataType::UInt64, false),
58        ]));
59
60        RecordBatch::try_new(
61            schema,
62            vec![
63                Arc::new(UInt64Array::from(vec![cp_id])),
64                Arc::new(UInt64Array::from(vec![epoch])),
65                Arc::new(UInt64Array::from(vec![ts_ms])),
66                Arc::new(StringArray::from(vec![sources.as_str()])),
67                Arc::new(StringArray::from(vec![sinks.as_str()])),
68                Arc::new(UInt64Array::from(vec![total_checkpoints])),
69            ],
70        )
71        .map_err(|e| DbError::Checkpoint(format!("failed to build checkpoint status batch: {e}")))
72    }
73
74    /// Build a SHOW MATERIALIZED VIEWS metadata result.
75    pub(crate) fn build_show_materialized_views(&self) -> RecordBatch {
76        let registry = self.mv_registry.lock();
77        let mut names = Vec::new();
78        let mut sqls = Vec::new();
79        let mut states = Vec::new();
80        for view in registry.views() {
81            let info = crate::handle::MaterializedViewInfo::from(view);
82            names.push(info.name);
83            sqls.push(info.sql);
84            states.push(info.state);
85        }
86        let names_ref: Vec<&str> = names.iter().map(String::as_str).collect();
87        let sqls_ref: Vec<&str> = sqls.iter().map(String::as_str).collect();
88        let states_ref: Vec<&str> = states.iter().map(String::as_str).collect();
89        let schema = Arc::new(Schema::new(vec![
90            Field::new("view_name", DataType::Utf8, false),
91            Field::new("sql", DataType::Utf8, false),
92            Field::new("state", DataType::Utf8, false),
93        ]));
94        RecordBatch::try_new(
95            schema,
96            vec![
97                Arc::new(StringArray::from(names_ref)),
98                Arc::new(StringArray::from(sqls_ref)),
99                Arc::new(StringArray::from(states_ref)),
100            ],
101        )
102        .expect("show materialized views: schema matches columns")
103    }
104
105    /// Build a SHOW SOURCES metadata result with connector metadata.
106    pub(crate) fn build_show_sources(&self) -> RecordBatch {
107        let sources = self.sources();
108        let mgr = self.connector_manager.lock();
109        let regs = mgr.sources();
110
111        let mut names = Vec::with_capacity(sources.len());
112        let mut connectors: Vec<Option<&str>> = Vec::with_capacity(sources.len());
113        let mut formats: Vec<Option<&str>> = Vec::with_capacity(sources.len());
114        let mut watermarks: Vec<Option<&str>> = Vec::with_capacity(sources.len());
115
116        for s in &sources {
117            names.push(s.name.as_str());
118            if let Some(reg) = regs.get(&s.name) {
119                connectors.push(reg.connector_type.as_deref());
120                formats.push(reg.format.as_deref());
121            } else {
122                connectors.push(None);
123                formats.push(None);
124            }
125            watermarks.push(s.watermark_column.as_deref());
126        }
127
128        let schema = Arc::new(Schema::new(vec![
129            Field::new("source_name", DataType::Utf8, false),
130            Field::new("connector", DataType::Utf8, true),
131            Field::new("format", DataType::Utf8, true),
132            Field::new("watermark_column", DataType::Utf8, true),
133        ]));
134        RecordBatch::try_new(
135            schema,
136            vec![
137                Arc::new(StringArray::from(names)),
138                Arc::new(StringArray::from(connectors)),
139                Arc::new(StringArray::from(formats)),
140                Arc::new(StringArray::from(watermarks)),
141            ],
142        )
143        .expect("show sources: schema matches columns")
144    }
145
146    /// Build a SHOW SINKS metadata result with connector metadata.
147    pub(crate) fn build_show_sinks(&self) -> RecordBatch {
148        let sinks = self.sinks();
149        let mgr = self.connector_manager.lock();
150        let regs = mgr.sinks();
151
152        let mut names = Vec::with_capacity(sinks.len());
153        let mut inputs: Vec<Option<String>> = Vec::with_capacity(sinks.len());
154        let mut connectors: Vec<Option<&str>> = Vec::with_capacity(sinks.len());
155        let mut formats: Vec<Option<&str>> = Vec::with_capacity(sinks.len());
156
157        for s in &sinks {
158            names.push(s.name.as_str());
159            // Input comes from catalog (always registered), connector metadata from ConnectorManager
160            let catalog_input = self.catalog.get_sink_input(&s.name);
161            if let Some(reg) = regs.get(&s.name) {
162                inputs.push(Some(reg.input.clone()));
163                connectors.push(reg.connector_type.as_deref());
164                formats.push(reg.format.as_deref());
165            } else {
166                inputs.push(catalog_input);
167                connectors.push(None);
168                formats.push(None);
169            }
170        }
171
172        let schema = Arc::new(Schema::new(vec![
173            Field::new("sink_name", DataType::Utf8, false),
174            Field::new("input", DataType::Utf8, true),
175            Field::new("connector", DataType::Utf8, true),
176            Field::new("format", DataType::Utf8, true),
177        ]));
178        RecordBatch::try_new(
179            schema,
180            vec![
181                Arc::new(StringArray::from(names)),
182                Arc::new(StringArray::from(inputs)),
183                Arc::new(StringArray::from(connectors)),
184                Arc::new(StringArray::from(formats)),
185            ],
186        )
187        .expect("show sinks: schema matches columns")
188    }
189
190    /// Build a SHOW QUERIES metadata result.
191    pub(crate) fn build_show_queries(&self) -> RecordBatch {
192        let queries = self.queries();
193        let ids: Vec<u64> = queries.iter().map(|q| q.id).collect();
194        let sqls: Vec<&str> = queries.iter().map(|q| q.sql.as_str()).collect();
195        let actives: Vec<bool> = queries.iter().map(|q| q.active).collect();
196        let schema = Arc::new(Schema::new(vec![
197            Field::new("query_id", DataType::UInt64, false),
198            Field::new("sql", DataType::Utf8, false),
199            Field::new("active", DataType::Boolean, false),
200        ]));
201        RecordBatch::try_new(
202            schema,
203            vec![
204                Arc::new(UInt64Array::from(ids)),
205                Arc::new(StringArray::from(sqls)),
206                Arc::new(BooleanArray::from(actives)),
207            ],
208        )
209        .expect("show queries: schema matches columns")
210    }
211
212    /// Build a SHOW STREAMS metadata result with SQL definitions.
213    pub(crate) fn build_show_streams(&self) -> RecordBatch {
214        let streams = self.catalog.list_streams();
215        let mgr = self.connector_manager.lock();
216        let regs = mgr.streams();
217
218        let mut names = Vec::with_capacity(streams.len());
219        let mut sqls: Vec<Option<&str>> = Vec::with_capacity(streams.len());
220
221        for name in &streams {
222            names.push(name.as_str());
223            sqls.push(regs.get(name.as_str()).map(|r| r.query_sql.as_str()));
224        }
225
226        let schema = Arc::new(Schema::new(vec![
227            Field::new("stream_name", DataType::Utf8, false),
228            Field::new("sql", DataType::Utf8, true),
229        ]));
230        RecordBatch::try_new(
231            schema,
232            vec![
233                Arc::new(StringArray::from(names)),
234                Arc::new(StringArray::from(sqls)),
235            ],
236        )
237        .expect("show streams: schema matches columns")
238    }
239
240    /// Build a SHOW CREATE SOURCE result.
241    pub(crate) fn build_show_create_source(&self, name: &str) -> Result<RecordBatch, DbError> {
242        if self.catalog.get_source(name).is_none() {
243            return Err(DbError::SourceNotFound(name.to_string()));
244        }
245        let mgr = self.connector_manager.lock();
246        let ddl = mgr
247            .get_ddl(name)
248            .ok_or_else(|| DbError::InvalidOperation(format!("No stored DDL for source '{name}'")))?
249            .to_string();
250        drop(mgr);
251
252        let schema = Arc::new(Schema::new(vec![Field::new(
253            "create_statement",
254            DataType::Utf8,
255            false,
256        )]));
257        Ok(RecordBatch::try_new(
258            schema,
259            vec![Arc::new(StringArray::from(vec![ddl.as_str()]))],
260        )
261        .expect("show create source: schema matches columns"))
262    }
263
264    /// Build a SHOW CREATE SINK result.
265    pub(crate) fn build_show_create_sink(&self, name: &str) -> Result<RecordBatch, DbError> {
266        if self.catalog.get_sink_input(name).is_none() {
267            return Err(DbError::SinkNotFound(name.to_string()));
268        }
269        let mgr = self.connector_manager.lock();
270        let ddl = mgr
271            .get_ddl(name)
272            .ok_or_else(|| DbError::InvalidOperation(format!("No stored DDL for sink '{name}'")))?
273            .to_string();
274        drop(mgr);
275
276        let schema = Arc::new(Schema::new(vec![Field::new(
277            "create_statement",
278            DataType::Utf8,
279            false,
280        )]));
281        Ok(RecordBatch::try_new(
282            schema,
283            vec![Arc::new(StringArray::from(vec![ddl.as_str()]))],
284        )
285        .expect("show create sink: schema matches columns"))
286    }
287
288    /// Build a SHOW TABLES metadata result.
289    pub(crate) fn build_show_tables(&self) -> RecordBatch {
290        let ts = self.table_store.read();
291        let mut names = Vec::new();
292        let mut pks = Vec::new();
293        let mut row_counts = Vec::new();
294        let mut connectors = Vec::new();
295
296        for name in ts.table_names() {
297            let pk = ts.primary_key(&name).unwrap_or("").to_string();
298            let count = ts.table_row_count(&name) as u64;
299            let conn = ts.connector(&name).unwrap_or("").to_string();
300
301            names.push(name);
302            pks.push(pk);
303            row_counts.push(count);
304            connectors.push(conn);
305        }
306
307        let names_ref: Vec<&str> = names.iter().map(String::as_str).collect();
308        let pks_ref: Vec<&str> = pks.iter().map(String::as_str).collect();
309        let connectors_ref: Vec<&str> = connectors.iter().map(String::as_str).collect();
310
311        let schema = Arc::new(Schema::new(vec![
312            Field::new("name", DataType::Utf8, false),
313            Field::new("primary_key", DataType::Utf8, false),
314            Field::new("row_count", DataType::UInt64, false),
315            Field::new("connector", DataType::Utf8, false),
316        ]));
317
318        RecordBatch::try_new(
319            schema,
320            vec![
321                Arc::new(StringArray::from(names_ref)),
322                Arc::new(StringArray::from(pks_ref)),
323                Arc::new(UInt64Array::from(row_counts)),
324                Arc::new(StringArray::from(connectors_ref)),
325            ],
326        )
327        .expect("show tables: schema matches columns")
328    }
329
330    /// Build a DESCRIBE result.
331    pub(crate) fn build_describe(&self, name: &str) -> Result<RecordBatch, DbError> {
332        // Try sources first
333        let schema = if let Some(s) = self.catalog.describe_source(name) {
334            s
335        // Then reference/dimension tables
336        } else if let Some(s) = self.table_store.read().table_schema(name) {
337            s
338        // Then materialized views
339        } else if let Some(s) = self
340            .mv_registry
341            .lock()
342            .get(name)
343            .map(|mv| mv.schema.clone())
344        {
345            s
346        // Then check sinks (no schema, but confirm existence)
347        } else if self.catalog.list_sinks().contains(&name.to_string()) {
348            return Err(DbError::InvalidOperation(
349                "DESCRIBE is not supported for sinks. Use SHOW SINKS for details.".to_string(),
350            ));
351        // Then streams (no stored schema yet)
352        } else if self.catalog.list_streams().contains(&name.to_string()) {
353            return Err(DbError::InvalidOperation(format!(
354                "Stream '{name}' exists but schema is only available after pipeline start"
355            )));
356        } else {
357            return Err(DbError::TableNotFound(name.to_string()));
358        };
359
360        schema_to_describe_batch(&schema)
361    }
362}
363
364/// Convert an Arrow schema to a DESCRIBE result `RecordBatch`.
365pub(crate) fn schema_to_describe_batch(schema: &Schema) -> Result<RecordBatch, DbError> {
366    let col_names: Vec<String> = schema.fields().iter().map(|f| f.name().clone()).collect();
367    let col_types: Vec<String> = schema
368        .fields()
369        .iter()
370        .map(|f| format!("{}", f.data_type()))
371        .collect();
372    let col_nullable: Vec<bool> = schema.fields().iter().map(|f| f.is_nullable()).collect();
373
374    let names_ref: Vec<&str> = col_names.iter().map(String::as_str).collect();
375    let types_ref: Vec<&str> = col_types.iter().map(String::as_str).collect();
376
377    let result_schema = Arc::new(Schema::new(vec![
378        Field::new("column_name", DataType::Utf8, false),
379        Field::new("data_type", DataType::Utf8, false),
380        Field::new("nullable", DataType::Boolean, false),
381    ]));
382
383    RecordBatch::try_new(
384        result_schema,
385        vec![
386            Arc::new(StringArray::from(names_ref)),
387            Arc::new(StringArray::from(types_ref)),
388            Arc::new(BooleanArray::from(col_nullable)),
389        ],
390    )
391    .map_err(|e| DbError::InvalidOperation(format!("describe metadata: {e}")))
392}