Skip to main content

laminardb/
main.rs

1//! LaminarDB standalone server binary.
2
3#![allow(clippy::disallowed_types)] // cold path: server startup and config only
4
5mod ai;
6#[cfg(feature = "cluster-unstable")]
7mod cluster;
8#[cfg(feature = "cluster-unstable")]
9mod cluster_config;
10mod config;
11mod http;
12mod metrics;
13mod pgwire;
14mod reload;
15mod server;
16mod watcher;
17
18// Platform-dependent allocator selection:
19// - Unix / non-MSVC: jemalloc (excellent fragmentation control, NUMA-aware)
20// - Windows MSVC: mimalloc (only high-perf allocator supporting MSVC)
21#[cfg(all(feature = "jemalloc", not(target_env = "msvc")))]
22#[global_allocator]
23static GLOBAL: tikv_jemallocator::Jemalloc = tikv_jemallocator::Jemalloc;
24
25#[cfg(all(feature = "mimalloc", target_env = "msvc"))]
26#[global_allocator]
27static GLOBAL: mimalloc::MiMalloc = mimalloc::MiMalloc;
28
29use std::path::PathBuf;
30
31use anyhow::Result;
32use clap::Parser;
33use tracing::info;
34use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt};
35
36#[derive(Parser, Debug)]
37#[command(author, version, about = "LaminarDB streaming database server")]
38struct Args {
39    #[arg(short, long, default_value = "laminardb.toml")]
40    config: String,
41    #[arg(long, default_value = "info")]
42    log_level: String,
43    #[arg(long)]
44    admin_bind: Option<String>,
45    /// Postgres wire bind address (e.g. `127.0.0.1:5433`). Wildcard binds rejected.
46    #[arg(long)]
47    pgwire_bind: Option<String>,
48    /// Validate checkpoints and exit without starting the server.
49    #[arg(long)]
50    validate_checkpoints: bool,
51}
52
53#[tokio::main]
54async fn main() -> Result<()> {
55    let args = Args::parse();
56
57    tracing_subscriber::registry()
58        .with(
59            tracing_subscriber::EnvFilter::try_from_default_env().unwrap_or_else(|_| {
60                format!(
61                    "laminar_server={l},laminar_db={l},laminar_core={l},\
62                     laminar_sql={l},laminar_connectors={l},laminar_storage={l}",
63                    l = args.log_level
64                )
65                .into()
66            }),
67        )
68        .with(tracing_subscriber::fmt::layer())
69        .init();
70
71    info!("Starting LaminarDB server");
72    info!("Version: {}", env!("CARGO_PKG_VERSION"));
73    info!("Config file: {}", args.config);
74
75    let config_path = PathBuf::from(&args.config);
76    let mut config = config::load_config(&config_path)?;
77
78    if let Some(bind) = args.admin_bind {
79        config.server.bind = bind;
80    }
81    if let Some(pg) = args.pgwire_bind {
82        // Empty string disables; a value overrides config.
83        config.server.pgwire_bind = if pg.is_empty() { None } else { Some(pg) };
84    }
85
86    if args.validate_checkpoints {
87        return validate_checkpoints_and_exit(&config).await;
88    }
89
90    let handle = server::run_server(config, config_path).await?;
91    handle.wait_for_shutdown().await?;
92
93    Ok(())
94}
95
96async fn validate_checkpoints_and_exit(config: &config::ServerConfig) -> Result<()> {
97    let store = build_checkpoint_store(config);
98    let Some(store) = store else {
99        info!("No checkpoint configuration found — nothing to validate");
100        return Ok(());
101    };
102
103    info!("Validating checkpoints...");
104    let report = store
105        .recover_latest_validated()
106        .await
107        .map_err(|e| anyhow::anyhow!("validation failed: {e}"))?;
108
109    info!(
110        "Examined {} checkpoint(s) in {:?}",
111        report.examined, report.elapsed
112    );
113    for (id, reason) in &report.skipped {
114        info!("  INVALID checkpoint {id}: {reason}");
115    }
116    match report.chosen_id {
117        Some(id) => info!("  VALID checkpoint {id} selected for recovery"),
118        None if report.examined == 0 => info!("  No checkpoints found (fresh start)"),
119        None => info!("  WARNING: No valid checkpoint found — recovery would start fresh"),
120    }
121
122    // Also run orphan detection
123    let orphans = store
124        .cleanup_orphans()
125        .await
126        .map_err(|e| anyhow::anyhow!("orphan cleanup failed: {e}"))?;
127    if orphans > 0 {
128        info!("Cleaned up {orphans} orphaned state file(s)");
129    }
130
131    Ok(())
132}
133
134fn build_checkpoint_store(
135    config: &config::ServerConfig,
136) -> Option<Box<dyn laminar_storage::checkpoint_store::CheckpointStore>> {
137    let cp = &config.checkpoint;
138    let url = &cp.url;
139
140    let obj_store = match laminar_storage::object_store_builder::build_object_store(
141        url,
142        &cp.storage,
143    ) {
144        Ok(s) => s,
145        Err(e) => {
146            tracing::error!(url = %url, error = %e, "failed to build object store for checkpoint validation");
147            return None;
148        }
149    };
150
151    let vnode_count = u16::try_from(config.state.vnode_capacity()).unwrap_or(u16::MAX);
152
153    // file:// URLs use the local FS path directly; cloud URLs need a prefix.
154    if url.starts_with("file://") {
155        let path = url.strip_prefix("file://").unwrap_or(url);
156        Some(Box::new(
157            laminar_storage::checkpoint_store::FileSystemCheckpointStore::new(
158                std::path::Path::new(path),
159                3,
160            )
161            .with_vnode_count(vnode_count),
162        ))
163    } else {
164        // Cloud URL: extract prefix from URL path (bucket is handled by object_store).
165        let prefix = url
166            .split("://")
167            .nth(1)
168            .and_then(|rest| rest.split_once('/').map(|(_, p)| format!("{p}/")))
169            .unwrap_or_default();
170        Some(Box::new(
171            laminar_storage::checkpoint_store::ObjectStoreCheckpointStore::new(
172                obj_store, prefix, 3,
173            )
174            .with_vnode_count(vnode_count),
175        ))
176    }
177}