1use std::sync::Arc;
6
7use arrow::array::{BooleanArray, RecordBatch, StringArray, UInt64Array};
8use arrow::datatypes::{DataType, Field, Schema};
9
10use crate::db::LaminarDB;
11use crate::error::DbError;
12
13impl LaminarDB {
14 pub async fn build_show_checkpoint_status(&self) -> Result<RecordBatch, DbError> {
22 let store = self.checkpoint_store();
23 let (latest, list) = match &store {
24 Some(s) => {
25 let latest = s.load_latest().await.map_err(|e| {
26 DbError::Checkpoint(format!("failed to load latest checkpoint: {e}"))
27 })?;
28 let list = s
29 .list()
30 .await
31 .map_err(|e| DbError::Checkpoint(format!("failed to list checkpoints: {e}")))?;
32 (latest, list)
33 }
34 None => (None, vec![]),
35 };
36
37 let (cp_id, epoch, ts_ms, sources, sinks, total_checkpoints) = if let Some(ref m) = latest {
39 (
40 m.checkpoint_id,
41 m.epoch,
42 m.timestamp_ms,
43 m.source_names.join(", "),
44 m.sink_names.join(", "),
45 list.len() as u64,
46 )
47 } else {
48 (0, 0, 0, String::new(), String::new(), 0)
49 };
50
51 let schema = Arc::new(Schema::new(vec![
52 Field::new("checkpoint_id", DataType::UInt64, false),
53 Field::new("epoch", DataType::UInt64, false),
54 Field::new("timestamp_ms", DataType::UInt64, false),
55 Field::new("sources", DataType::Utf8, false),
56 Field::new("sinks", DataType::Utf8, false),
57 Field::new("total_checkpoints", DataType::UInt64, false),
58 ]));
59
60 RecordBatch::try_new(
61 schema,
62 vec![
63 Arc::new(UInt64Array::from(vec![cp_id])),
64 Arc::new(UInt64Array::from(vec![epoch])),
65 Arc::new(UInt64Array::from(vec![ts_ms])),
66 Arc::new(StringArray::from(vec![sources.as_str()])),
67 Arc::new(StringArray::from(vec![sinks.as_str()])),
68 Arc::new(UInt64Array::from(vec![total_checkpoints])),
69 ],
70 )
71 .map_err(|e| DbError::Checkpoint(format!("failed to build checkpoint status batch: {e}")))
72 }
73
74 pub(crate) fn build_show_materialized_views(&self) -> RecordBatch {
76 let registry = self.mv_registry.lock();
77 let mut names = Vec::new();
78 let mut sqls = Vec::new();
79 let mut states = Vec::new();
80 for view in registry.views() {
81 let info = crate::handle::MaterializedViewInfo::from(view);
82 names.push(info.name);
83 sqls.push(info.sql);
84 states.push(info.state);
85 }
86 let names_ref: Vec<&str> = names.iter().map(String::as_str).collect();
87 let sqls_ref: Vec<&str> = sqls.iter().map(String::as_str).collect();
88 let states_ref: Vec<&str> = states.iter().map(String::as_str).collect();
89 let schema = Arc::new(Schema::new(vec![
90 Field::new("view_name", DataType::Utf8, false),
91 Field::new("sql", DataType::Utf8, false),
92 Field::new("state", DataType::Utf8, false),
93 ]));
94 RecordBatch::try_new(
95 schema,
96 vec![
97 Arc::new(StringArray::from(names_ref)),
98 Arc::new(StringArray::from(sqls_ref)),
99 Arc::new(StringArray::from(states_ref)),
100 ],
101 )
102 .expect("show materialized views: schema matches columns")
103 }
104
105 pub(crate) fn build_show_sources(&self) -> RecordBatch {
107 let sources = self.sources();
108 let mgr = self.connector_manager.lock();
109 let regs = mgr.sources();
110
111 let mut names = Vec::with_capacity(sources.len());
112 let mut connectors: Vec<Option<&str>> = Vec::with_capacity(sources.len());
113 let mut formats: Vec<Option<&str>> = Vec::with_capacity(sources.len());
114 let mut watermarks: Vec<Option<&str>> = Vec::with_capacity(sources.len());
115
116 for s in &sources {
117 names.push(s.name.as_str());
118 if let Some(reg) = regs.get(&s.name) {
119 connectors.push(reg.connector_type.as_deref());
120 formats.push(reg.format.as_deref());
121 } else {
122 connectors.push(None);
123 formats.push(None);
124 }
125 watermarks.push(s.watermark_column.as_deref());
126 }
127
128 let schema = Arc::new(Schema::new(vec![
129 Field::new("source_name", DataType::Utf8, false),
130 Field::new("connector", DataType::Utf8, true),
131 Field::new("format", DataType::Utf8, true),
132 Field::new("watermark_column", DataType::Utf8, true),
133 ]));
134 RecordBatch::try_new(
135 schema,
136 vec![
137 Arc::new(StringArray::from(names)),
138 Arc::new(StringArray::from(connectors)),
139 Arc::new(StringArray::from(formats)),
140 Arc::new(StringArray::from(watermarks)),
141 ],
142 )
143 .expect("show sources: schema matches columns")
144 }
145
146 pub(crate) fn build_show_sinks(&self) -> RecordBatch {
148 let sinks = self.sinks();
149 let mgr = self.connector_manager.lock();
150 let regs = mgr.sinks();
151
152 let mut names = Vec::with_capacity(sinks.len());
153 let mut inputs: Vec<Option<String>> = Vec::with_capacity(sinks.len());
154 let mut connectors: Vec<Option<&str>> = Vec::with_capacity(sinks.len());
155 let mut formats: Vec<Option<&str>> = Vec::with_capacity(sinks.len());
156
157 for s in &sinks {
158 names.push(s.name.as_str());
159 let catalog_input = self.catalog.get_sink_input(&s.name);
161 if let Some(reg) = regs.get(&s.name) {
162 inputs.push(Some(reg.input.clone()));
163 connectors.push(reg.connector_type.as_deref());
164 formats.push(reg.format.as_deref());
165 } else {
166 inputs.push(catalog_input);
167 connectors.push(None);
168 formats.push(None);
169 }
170 }
171
172 let schema = Arc::new(Schema::new(vec![
173 Field::new("sink_name", DataType::Utf8, false),
174 Field::new("input", DataType::Utf8, true),
175 Field::new("connector", DataType::Utf8, true),
176 Field::new("format", DataType::Utf8, true),
177 ]));
178 RecordBatch::try_new(
179 schema,
180 vec![
181 Arc::new(StringArray::from(names)),
182 Arc::new(StringArray::from(inputs)),
183 Arc::new(StringArray::from(connectors)),
184 Arc::new(StringArray::from(formats)),
185 ],
186 )
187 .expect("show sinks: schema matches columns")
188 }
189
190 pub(crate) fn build_show_queries(&self) -> RecordBatch {
192 let queries = self.queries();
193 let ids: Vec<u64> = queries.iter().map(|q| q.id).collect();
194 let sqls: Vec<&str> = queries.iter().map(|q| q.sql.as_str()).collect();
195 let actives: Vec<bool> = queries.iter().map(|q| q.active).collect();
196 let schema = Arc::new(Schema::new(vec![
197 Field::new("query_id", DataType::UInt64, false),
198 Field::new("sql", DataType::Utf8, false),
199 Field::new("active", DataType::Boolean, false),
200 ]));
201 RecordBatch::try_new(
202 schema,
203 vec![
204 Arc::new(UInt64Array::from(ids)),
205 Arc::new(StringArray::from(sqls)),
206 Arc::new(BooleanArray::from(actives)),
207 ],
208 )
209 .expect("show queries: schema matches columns")
210 }
211
212 pub(crate) fn build_show_streams(&self) -> RecordBatch {
214 let streams = self.catalog.list_streams();
215 let mgr = self.connector_manager.lock();
216 let regs = mgr.streams();
217
218 let mut names = Vec::with_capacity(streams.len());
219 let mut sqls: Vec<Option<&str>> = Vec::with_capacity(streams.len());
220
221 for name in &streams {
222 names.push(name.as_str());
223 sqls.push(regs.get(name.as_str()).map(|r| r.query_sql.as_str()));
224 }
225
226 let schema = Arc::new(Schema::new(vec![
227 Field::new("stream_name", DataType::Utf8, false),
228 Field::new("sql", DataType::Utf8, true),
229 ]));
230 RecordBatch::try_new(
231 schema,
232 vec![
233 Arc::new(StringArray::from(names)),
234 Arc::new(StringArray::from(sqls)),
235 ],
236 )
237 .expect("show streams: schema matches columns")
238 }
239
240 pub(crate) fn build_show_create_source(&self, name: &str) -> Result<RecordBatch, DbError> {
242 if self.catalog.get_source(name).is_none() {
243 return Err(DbError::SourceNotFound(name.to_string()));
244 }
245 let mgr = self.connector_manager.lock();
246 let ddl = mgr
247 .get_ddl(name)
248 .ok_or_else(|| DbError::InvalidOperation(format!("No stored DDL for source '{name}'")))?
249 .to_string();
250 drop(mgr);
251
252 let schema = Arc::new(Schema::new(vec![Field::new(
253 "create_statement",
254 DataType::Utf8,
255 false,
256 )]));
257 Ok(RecordBatch::try_new(
258 schema,
259 vec![Arc::new(StringArray::from(vec![ddl.as_str()]))],
260 )
261 .expect("show create source: schema matches columns"))
262 }
263
264 pub(crate) fn build_show_create_sink(&self, name: &str) -> Result<RecordBatch, DbError> {
266 if self.catalog.get_sink_input(name).is_none() {
267 return Err(DbError::SinkNotFound(name.to_string()));
268 }
269 let mgr = self.connector_manager.lock();
270 let ddl = mgr
271 .get_ddl(name)
272 .ok_or_else(|| DbError::InvalidOperation(format!("No stored DDL for sink '{name}'")))?
273 .to_string();
274 drop(mgr);
275
276 let schema = Arc::new(Schema::new(vec![Field::new(
277 "create_statement",
278 DataType::Utf8,
279 false,
280 )]));
281 Ok(RecordBatch::try_new(
282 schema,
283 vec![Arc::new(StringArray::from(vec![ddl.as_str()]))],
284 )
285 .expect("show create sink: schema matches columns"))
286 }
287
288 pub(crate) fn build_show_tables(&self) -> RecordBatch {
290 let ts = self.table_store.read();
291 let mut names = Vec::new();
292 let mut pks = Vec::new();
293 let mut row_counts = Vec::new();
294 let mut connectors = Vec::new();
295
296 for name in ts.table_names() {
297 let pk = ts.primary_key(&name).unwrap_or("").to_string();
298 let count = ts.table_row_count(&name) as u64;
299 let conn = ts.connector(&name).unwrap_or("").to_string();
300
301 names.push(name);
302 pks.push(pk);
303 row_counts.push(count);
304 connectors.push(conn);
305 }
306
307 let names_ref: Vec<&str> = names.iter().map(String::as_str).collect();
308 let pks_ref: Vec<&str> = pks.iter().map(String::as_str).collect();
309 let connectors_ref: Vec<&str> = connectors.iter().map(String::as_str).collect();
310
311 let schema = Arc::new(Schema::new(vec![
312 Field::new("name", DataType::Utf8, false),
313 Field::new("primary_key", DataType::Utf8, false),
314 Field::new("row_count", DataType::UInt64, false),
315 Field::new("connector", DataType::Utf8, false),
316 ]));
317
318 RecordBatch::try_new(
319 schema,
320 vec![
321 Arc::new(StringArray::from(names_ref)),
322 Arc::new(StringArray::from(pks_ref)),
323 Arc::new(UInt64Array::from(row_counts)),
324 Arc::new(StringArray::from(connectors_ref)),
325 ],
326 )
327 .expect("show tables: schema matches columns")
328 }
329
330 pub(crate) fn build_describe(&self, name: &str) -> Result<RecordBatch, DbError> {
332 let schema = if let Some(s) = self.catalog.describe_source(name) {
334 s
335 } else if let Some(s) = self.table_store.read().table_schema(name) {
337 s
338 } else if let Some(s) = self
340 .mv_registry
341 .lock()
342 .get(name)
343 .map(|mv| mv.schema.clone())
344 {
345 s
346 } else if self.catalog.list_sinks().contains(&name.to_string()) {
348 return Err(DbError::InvalidOperation(
349 "DESCRIBE is not supported for sinks. Use SHOW SINKS for details.".to_string(),
350 ));
351 } else if self.catalog.list_streams().contains(&name.to_string()) {
353 return Err(DbError::InvalidOperation(format!(
354 "Stream '{name}' exists but schema is only available after pipeline start"
355 )));
356 } else {
357 return Err(DbError::TableNotFound(name.to_string()));
358 };
359
360 schema_to_describe_batch(&schema)
361 }
362}
363
364pub(crate) fn schema_to_describe_batch(schema: &Schema) -> Result<RecordBatch, DbError> {
366 let col_names: Vec<String> = schema.fields().iter().map(|f| f.name().clone()).collect();
367 let col_types: Vec<String> = schema
368 .fields()
369 .iter()
370 .map(|f| format!("{}", f.data_type()))
371 .collect();
372 let col_nullable: Vec<bool> = schema.fields().iter().map(|f| f.is_nullable()).collect();
373
374 let names_ref: Vec<&str> = col_names.iter().map(String::as_str).collect();
375 let types_ref: Vec<&str> = col_types.iter().map(String::as_str).collect();
376
377 let result_schema = Arc::new(Schema::new(vec![
378 Field::new("column_name", DataType::Utf8, false),
379 Field::new("data_type", DataType::Utf8, false),
380 Field::new("nullable", DataType::Boolean, false),
381 ]));
382
383 RecordBatch::try_new(
384 result_schema,
385 vec![
386 Arc::new(StringArray::from(names_ref)),
387 Arc::new(StringArray::from(types_ref)),
388 Arc::new(BooleanArray::from(col_nullable)),
389 ],
390 )
391 .map_err(|e| DbError::InvalidOperation(format!("describe metadata: {e}")))
392}