1#![allow(clippy::disallowed_types)] mod ai;
6#[cfg(feature = "cluster-unstable")]
7mod cluster;
8#[cfg(feature = "cluster-unstable")]
9mod cluster_config;
10mod config;
11mod http;
12mod metrics;
13mod pgwire;
14mod reload;
15mod server;
16mod watcher;
17
// Install jemalloc as the process-wide allocator, but only when the
// `jemalloc` cargo feature is enabled and the target environment is not MSVC.
// When this `cfg` does not match, this static is compiled out entirely.
#[cfg(all(feature = "jemalloc", not(target_env = "msvc")))]
#[global_allocator]
static GLOBAL: tikv_jemallocator::Jemalloc = tikv_jemallocator::Jemalloc;
24
// Install mimalloc as the process-wide allocator, but only when the
// `mimalloc` cargo feature is enabled and the target environment is MSVC.
// The `target_env` condition is the exact opposite of the jemalloc static's,
// so at most one `GLOBAL` definition is ever compiled in.
#[cfg(all(feature = "mimalloc", target_env = "msvc"))]
#[global_allocator]
static GLOBAL: mimalloc::MiMalloc = mimalloc::MiMalloc;
28
29use std::path::PathBuf;
30
31use anyhow::Result;
32use clap::Parser;
33use tracing::info;
34use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt};
35
// Command-line arguments for the server binary, parsed by clap's derive API.
//
// NOTE: plain `//` comments are used here on purpose. clap's derive turns
// `///` doc comments on the struct and its fields into help text, so adding
// doc comments would change what `--help` prints.
#[derive(Parser, Debug)]
#[command(author, version, about = "LaminarDB streaming database server")]
struct Args {
    // Path of the configuration file handed to `config::load_config`.
    // Available as a short and a long flag; defaults to "laminardb.toml".
    #[arg(short, long, default_value = "laminardb.toml")]
    config: String,
    // Log level substituted into the per-crate filter directives in `main`.
    // Only used when no filter can be built from the environment.
    #[arg(long, default_value = "info")]
    log_level: String,
    // When present, replaces `config.server.bind` from the config file.
    #[arg(long)]
    admin_bind: Option<String>,
    // When present, replaces `config.server.pgwire_bind`; an empty string
    // stores `None` instead of an address.
    #[arg(long)]
    pgwire_bind: Option<String>,
    // When set, `main` runs checkpoint validation and returns instead of
    // starting the server.
    #[arg(long)]
    validate_checkpoints: bool,
}
52
53#[tokio::main]
54async fn main() -> Result<()> {
55 let args = Args::parse();
56
57 tracing_subscriber::registry()
58 .with(
59 tracing_subscriber::EnvFilter::try_from_default_env().unwrap_or_else(|_| {
60 format!(
61 "laminar_server={l},laminar_db={l},laminar_core={l},\
62 laminar_sql={l},laminar_connectors={l},laminar_storage={l}",
63 l = args.log_level
64 )
65 .into()
66 }),
67 )
68 .with(tracing_subscriber::fmt::layer())
69 .init();
70
71 info!("Starting LaminarDB server");
72 info!("Version: {}", env!("CARGO_PKG_VERSION"));
73 info!("Config file: {}", args.config);
74
75 let config_path = PathBuf::from(&args.config);
76 let mut config = config::load_config(&config_path)?;
77
78 if let Some(bind) = args.admin_bind {
79 config.server.bind = bind;
80 }
81 if let Some(pg) = args.pgwire_bind {
82 config.server.pgwire_bind = if pg.is_empty() { None } else { Some(pg) };
84 }
85
86 if args.validate_checkpoints {
87 return validate_checkpoints_and_exit(&config).await;
88 }
89
90 let handle = server::run_server(config, config_path).await?;
91 handle.wait_for_shutdown().await?;
92
93 Ok(())
94}
95
96async fn validate_checkpoints_and_exit(config: &config::ServerConfig) -> Result<()> {
97 let store = build_checkpoint_store(config);
98 let Some(store) = store else {
99 info!("No checkpoint configuration found — nothing to validate");
100 return Ok(());
101 };
102
103 info!("Validating checkpoints...");
104 let report = store
105 .recover_latest_validated()
106 .await
107 .map_err(|e| anyhow::anyhow!("validation failed: {e}"))?;
108
109 info!(
110 "Examined {} checkpoint(s) in {:?}",
111 report.examined, report.elapsed
112 );
113 for (id, reason) in &report.skipped {
114 info!(" INVALID checkpoint {id}: {reason}");
115 }
116 match report.chosen_id {
117 Some(id) => info!(" VALID checkpoint {id} selected for recovery"),
118 None if report.examined == 0 => info!(" No checkpoints found (fresh start)"),
119 None => info!(" WARNING: No valid checkpoint found — recovery would start fresh"),
120 }
121
122 let orphans = store
124 .cleanup_orphans()
125 .await
126 .map_err(|e| anyhow::anyhow!("orphan cleanup failed: {e}"))?;
127 if orphans > 0 {
128 info!("Cleaned up {orphans} orphaned state file(s)");
129 }
130
131 Ok(())
132}
133
134fn build_checkpoint_store(
135 config: &config::ServerConfig,
136) -> Option<Box<dyn laminar_storage::checkpoint_store::CheckpointStore>> {
137 let cp = &config.checkpoint;
138 let url = &cp.url;
139
140 let obj_store = match laminar_storage::object_store_builder::build_object_store(
141 url,
142 &cp.storage,
143 ) {
144 Ok(s) => s,
145 Err(e) => {
146 tracing::error!(url = %url, error = %e, "failed to build object store for checkpoint validation");
147 return None;
148 }
149 };
150
151 let vnode_count = u16::try_from(config.state.vnode_capacity()).unwrap_or(u16::MAX);
152
153 if url.starts_with("file://") {
155 let path = url.strip_prefix("file://").unwrap_or(url);
156 Some(Box::new(
157 laminar_storage::checkpoint_store::FileSystemCheckpointStore::new(
158 std::path::Path::new(path),
159 3,
160 )
161 .with_vnode_count(vnode_count),
162 ))
163 } else {
164 let prefix = url
166 .split("://")
167 .nth(1)
168 .and_then(|rest| rest.split_once('/').map(|(_, p)| format!("{p}/")))
169 .unwrap_or_default();
170 Some(Box::new(
171 laminar_storage::checkpoint_store::ObjectStoreCheckpointStore::new(
172 obj_store, prefix, 3,
173 )
174 .with_vnode_count(vnode_count),
175 ))
176 }
177}