Skip to main content

laminardb/
http.rs

1//! HTTP API for LaminarDB server.
2
3use std::path::PathBuf;
4use std::sync::Arc;
5use std::time::Instant;
6
7use prometheus::Registry;
8
9use axum::extract::ws::{Message, WebSocket, WebSocketUpgrade};
10use axum::extract::{Path, State};
11use axum::http::StatusCode;
12use axum::response::IntoResponse;
13use axum::routing::{get, post};
14use axum::{Json, Router};
15use serde::{Deserialize, Serialize};
16use tower_http::cors::CorsLayer;
17use tracing::{info, warn};
18
19use laminar_db::LaminarDB;
20
21use crate::config::ServerConfig;
22use crate::metrics::ServerMetrics;
23use crate::reload::{self, ReloadGuard};
24use crate::server::ServerError;
25
26pub struct AppState {
27    pub db: Arc<LaminarDB>,
28    pub config_path: PathBuf,
29    /// `parking_lot::RwLock` is correct here — every reader drops the
30    /// guard before awaiting any I/O (see `reload_config`/`cluster_status`/
31    /// watcher.rs). No writer holds the lock across `.await` either.
32    pub current_config: parking_lot::RwLock<ServerConfig>,
33    pub reload_guard: ReloadGuard,
34    pub registry: Arc<Registry>,
35    pub server_metrics: ServerMetrics,
36}
37
38pub fn build_router(state: Arc<AppState>) -> Router {
39    Router::new()
40        .route("/health", get(health_check))
41        .route("/ready", get(readiness_check))
42        .route("/metrics", get(prometheus_metrics))
43        .route("/api/v1/sources", get(list_sources))
44        .route("/api/v1/sinks", get(list_sinks))
45        .route("/api/v1/streams", get(list_streams))
46        .route("/api/v1/streams/{name}", get(get_stream))
47        .route("/api/v1/checkpoint", post(trigger_checkpoint))
48        .route("/api/v1/sql", post(execute_sql))
49        .route("/api/v1/reload", post(handle_reload))
50        .route("/api/v1/cluster", get(cluster_status))
51        .route("/ws/{name}", get(ws_upgrade))
52        .layer(CorsLayer::permissive())
53        .layer(axum::middleware::from_fn(request_logging))
54        .with_state(state)
55}
56
57pub async fn serve(router: Router, bind: &str) -> Result<tokio::task::JoinHandle<()>, ServerError> {
58    let listener = tokio::net::TcpListener::bind(bind)
59        .await
60        .map_err(|e| ServerError::Http(format!("failed to bind to {bind}: {e}")))?;
61
62    let handle = tokio::spawn(async move {
63        if let Err(e) = axum::serve(listener, router).await {
64            tracing::error!("HTTP server error: {e}");
65        }
66    });
67
68    Ok(handle)
69}
70
71/// Health check response.
72#[derive(Debug, Serialize)]
73struct HealthResponse {
74    status: &'static str,
75    version: &'static str,
76    pipeline_state: &'static str,
77}
78
79#[derive(Debug, Serialize)]
80struct SourceResponse {
81    name: String,
82    #[serde(skip_serializing_if = "Option::is_none")]
83    watermark_column: Option<String>,
84}
85
86#[derive(Debug, Serialize)]
87struct StreamResponse {
88    name: String,
89    #[serde(skip_serializing_if = "Option::is_none")]
90    sql: Option<String>,
91}
92
93#[derive(Debug, Serialize)]
94struct SinkResponse {
95    name: String,
96}
97
98#[derive(Debug, Serialize)]
99struct CheckpointResponse {
100    success: bool,
101    checkpoint_id: u64,
102    epoch: u64,
103    duration_ms: u64,
104    #[serde(skip_serializing_if = "Option::is_none")]
105    error: Option<String>,
106}
107
108#[derive(Debug, Deserialize)]
109struct SqlRequest {
110    sql: String,
111}
112
113#[derive(Debug, Serialize)]
114struct SqlResponse {
115    result_type: String,
116    #[serde(skip_serializing_if = "Option::is_none")]
117    object_name: Option<String>,
118    #[serde(skip_serializing_if = "Option::is_none")]
119    rows_affected: Option<u64>,
120}
121
122#[derive(Debug, Serialize)]
123struct ErrorBody {
124    error: String,
125}
126
127fn error_response(status: StatusCode, msg: impl Into<String>) -> impl IntoResponse {
128    (status, Json(ErrorBody { error: msg.into() }))
129}
130
131async fn request_logging(
132    req: axum::http::Request<axum::body::Body>,
133    next: axum::middleware::Next,
134) -> impl IntoResponse {
135    let method = req.method().clone();
136    let uri = req.uri().clone();
137    let start = Instant::now();
138
139    let response = next.run(req).await;
140
141    let duration_ms = start.elapsed().as_millis();
142    let status = response.status();
143    info!("{method} {uri} -> {status} ({duration_ms}ms)");
144
145    response
146}
147
148async fn health_check(State(state): State<Arc<AppState>>) -> impl IntoResponse {
149    let pipeline_state = state.db.pipeline_state();
150    let status = if pipeline_state == "Stopped" {
151        StatusCode::SERVICE_UNAVAILABLE
152    } else {
153        StatusCode::OK
154    };
155
156    (
157        status,
158        Json(HealthResponse {
159            status: if status == StatusCode::OK {
160                "healthy"
161            } else {
162                "unhealthy"
163            },
164            version: env!("CARGO_PKG_VERSION"),
165            pipeline_state,
166        }),
167    )
168}
169
170async fn readiness_check(State(state): State<Arc<AppState>>) -> impl IntoResponse {
171    let pipeline_state = state.db.pipeline_state();
172    if pipeline_state == "Running" {
173        (
174            StatusCode::OK,
175            Json(HealthResponse {
176                status: "ready",
177                version: env!("CARGO_PKG_VERSION"),
178                pipeline_state,
179            }),
180        )
181            .into_response()
182    } else {
183        error_response(
184            StatusCode::SERVICE_UNAVAILABLE,
185            format!("pipeline is {pipeline_state}, not Running"),
186        )
187        .into_response()
188    }
189}
190
191async fn prometheus_metrics(State(state): State<Arc<AppState>>) -> impl IntoResponse {
192    // Update uptime gauge on each scrape — cheap, and always fresh.
193    #[allow(clippy::cast_possible_wrap)]
194    state
195        .server_metrics
196        .uptime_seconds
197        .set(state.db.uptime().as_secs() as i64);
198
199    (
200        StatusCode::OK,
201        [(
202            axum::http::header::CONTENT_TYPE,
203            "text/plain; version=0.0.4; charset=utf-8",
204        )],
205        crate::metrics::render(&state.registry),
206    )
207}
208
209async fn list_sources(State(state): State<Arc<AppState>>) -> impl IntoResponse {
210    let sources: Vec<SourceResponse> = state
211        .db
212        .sources()
213        .into_iter()
214        .map(|s| SourceResponse {
215            name: s.name,
216            watermark_column: s.watermark_column,
217        })
218        .collect();
219    Json(sources)
220}
221
222async fn list_sinks(State(state): State<Arc<AppState>>) -> impl IntoResponse {
223    let sinks: Vec<SinkResponse> = state
224        .db
225        .sinks()
226        .into_iter()
227        .map(|s| SinkResponse { name: s.name })
228        .collect();
229    Json(sinks)
230}
231
232async fn list_streams(State(state): State<Arc<AppState>>) -> impl IntoResponse {
233    let streams: Vec<StreamResponse> = state
234        .db
235        .streams()
236        .into_iter()
237        .map(|s| StreamResponse {
238            name: s.name,
239            sql: s.sql,
240        })
241        .collect();
242    Json(streams)
243}
244
245async fn get_stream(
246    State(state): State<Arc<AppState>>,
247    Path(name): Path<String>,
248) -> impl IntoResponse {
249    let streams = state.db.streams();
250    match streams.into_iter().find(|s| s.name == name) {
251        Some(s) => Json(StreamResponse {
252            name: s.name,
253            sql: s.sql,
254        })
255        .into_response(),
256        None => error_response(StatusCode::NOT_FOUND, format!("stream '{name}' not found"))
257            .into_response(),
258    }
259}
260
261async fn trigger_checkpoint(State(state): State<Arc<AppState>>) -> impl IntoResponse {
262    match state.db.checkpoint().await {
263        Ok(result) => {
264            let status = if result.success {
265                StatusCode::OK
266            } else {
267                StatusCode::INTERNAL_SERVER_ERROR
268            };
269            #[allow(clippy::cast_possible_truncation)]
270            let duration_ms = result.duration.as_millis() as u64;
271            (
272                status,
273                Json(CheckpointResponse {
274                    success: result.success,
275                    checkpoint_id: result.checkpoint_id,
276                    epoch: result.epoch,
277                    duration_ms,
278                    error: result.error,
279                }),
280            )
281                .into_response()
282        }
283        Err(e) => error_response(StatusCode::INTERNAL_SERVER_ERROR, e.to_string()).into_response(),
284    }
285}
286
287async fn execute_sql(
288    State(state): State<Arc<AppState>>,
289    Json(req): Json<SqlRequest>,
290) -> impl IntoResponse {
291    match state.db.execute(&req.sql).await {
292        Ok(result) => {
293            use laminar_db::ExecuteResult;
294            let resp = match result {
295                ExecuteResult::Ddl(info) => SqlResponse {
296                    result_type: info.statement_type,
297                    object_name: Some(info.object_name),
298                    rows_affected: None,
299                },
300                ExecuteResult::RowsAffected(n) => SqlResponse {
301                    result_type: "rows_affected".to_string(),
302                    object_name: None,
303                    rows_affected: Some(n),
304                },
305                ExecuteResult::Query(_) => SqlResponse {
306                    result_type: "query".to_string(),
307                    object_name: None,
308                    rows_affected: None,
309                },
310                ExecuteResult::Metadata(_) => SqlResponse {
311                    result_type: "metadata".to_string(),
312                    object_name: None,
313                    rows_affected: None,
314                },
315            };
316            Json(resp).into_response()
317        }
318        Err(e) => error_response(StatusCode::BAD_REQUEST, e.to_string()).into_response(),
319    }
320}
321
322async fn handle_reload(State(state): State<Arc<AppState>>) -> impl IntoResponse {
323    // Acquire concurrency guard
324    let _guard = match state.reload_guard.try_acquire() {
325        Some(g) => g,
326        None => {
327            return error_response(StatusCode::CONFLICT, "a reload is already in progress")
328                .into_response();
329        }
330    };
331
332    // Load and validate the new config
333    let new_config = match crate::config::load_config(&state.config_path) {
334        Ok(c) => c,
335        Err(e) => {
336            warn!("Reload failed: config error: {e}");
337            return error_response(StatusCode::BAD_REQUEST, e.to_string()).into_response();
338        }
339    };
340
341    // Diff against current config
342    // Tight guard scope so the `!Send` parking_lot guard doesn't cross
343    // the next `.await`.
344    let diff = {
345        let current = state.current_config.read();
346        reload::diff_configs(&current, &new_config)
347    };
348
349    if diff.is_empty() && diff.warnings.is_empty() {
350        return Json(reload::ReloadResult {
351            success: true,
352            applied: vec![],
353            failed: vec![],
354            warnings: vec!["no changes detected".to_string()],
355        })
356        .into_response();
357    }
358
359    // Apply the diff
360    let result = reload::apply_reload(&state.db, &diff).await;
361
362    // Update metrics
363    state.server_metrics.reload_total.inc();
364
365    // Update current config on success
366    if result.success {
367        let mut current = state.current_config.write();
368        *current = new_config;
369        info!(
370            "Configuration reloaded successfully ({} ops)",
371            result.applied.len()
372        );
373    } else {
374        warn!(
375            "Configuration reload partially failed: {} applied, {} failed",
376            result.applied.len(),
377            result.failed.len()
378        );
379    }
380
381    let status = if result.success {
382        StatusCode::OK
383    } else {
384        StatusCode::MULTI_STATUS
385    };
386
387    (status, Json(result)).into_response()
388}
389
390async fn cluster_status(State(state): State<Arc<AppState>>) -> impl IntoResponse {
391    let config = state.current_config.read();
392    if config.server.mode != "cluster" {
393        return error_response(
394            StatusCode::NOT_FOUND,
395            "cluster endpoint is only available when server.mode = \"cluster\"",
396        )
397        .into_response();
398    }
399
400    let node_id = config.node_id.clone().unwrap_or_default();
401    drop(config);
402
403    #[derive(Serialize)]
404    struct ClusterStatusResponse {
405        mode: &'static str,
406        node_id: String,
407        pipeline_state: &'static str,
408    }
409
410    let pipeline_state = state.db.pipeline_state();
411    Json(ClusterStatusResponse {
412        mode: "cluster",
413        node_id,
414        pipeline_state,
415    })
416    .into_response()
417}
418
419// ---------------------------------------------------------------------------
420// WebSocket stream subscriptions
421// ---------------------------------------------------------------------------
422
/// Upper bound on concurrently open WebSocket connections; `ws_upgrade`
/// rejects new upgrades once the connection gauge reaches this value.
const MAX_WS_CONNECTIONS: i64 = 10_000;
/// Period between the heartbeat frames `ws_client` sends to each client.
const WS_HEARTBEAT_INTERVAL: std::time::Duration = std::time::Duration::from_secs(15);
425
426async fn ws_upgrade(
427    State(state): State<Arc<AppState>>,
428    Path(name): Path<String>,
429    ws: WebSocketUpgrade,
430) -> impl IntoResponse {
431    if state.server_metrics.ws_connections.get() >= MAX_WS_CONNECTIONS {
432        return error_response(
433            StatusCode::SERVICE_UNAVAILABLE,
434            "too many WebSocket connections".to_string(),
435        )
436        .into_response();
437    }
438
439    if !state.db.streams().iter().any(|s| s.name == name) {
440        return error_response(StatusCode::NOT_FOUND, format!("stream '{name}' not found"))
441            .into_response();
442    }
443
444    // Each WS client gets its own broadcast subscription — fan-out is in the Sink.
445    let sub = match state.db.subscribe_raw(&name) {
446        Ok(s) => s,
447        Err(_) => {
448            return error_response(
449                StatusCode::INTERNAL_SERVER_ERROR,
450                format!("failed to subscribe to '{name}'"),
451            )
452            .into_response();
453        }
454    };
455
456    let st = Arc::clone(&state);
457    ws.on_upgrade(move |socket| async move {
458        st.server_metrics.ws_connections.inc();
459        ws_client(socket, sub, name).await;
460        st.server_metrics.ws_connections.dec();
461    })
462    .into_response()
463}
464
465async fn ws_client(
466    mut socket: WebSocket,
467    mut sub: laminar_core::streaming::Subscription<laminar_db::ArrowRecord>,
468    name: String,
469) {
470    let mut heartbeat = tokio::time::interval(WS_HEARTBEAT_INTERVAL);
471    heartbeat.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
472    let mut seq: u64 = 0;
473
474    loop {
475        tokio::select! {
476            biased;
477            result = sub.recv_async() => {
478                match result {
479                    Ok(batch) => {
480                        let json = match batch_to_json(&batch) {
481                            Ok(j) => j,
482                            Err(e) => {
483                                warn!(stream = %name, error = %e, "serialize error");
484                                continue;
485                            }
486                        };
487                        let out = serde_json::json!({
488                            "type": "data",
489                            "subscription_id": &name,
490                            "data": json,
491                            "sequence": seq,
492                        });
493                        seq += 1;
494                        if socket.send(Message::Text(out.to_string().into())).await.is_err() {
495                            break;
496                        }
497                    }
498                    Err(_) => break, // disconnected
499                }
500            }
501            _ = heartbeat.tick() => {
502                let hb = serde_json::json!({
503                    "type": "heartbeat",
504                    "server_time": chrono::Utc::now().timestamp_millis(),
505                });
506                if socket.send(Message::Text(hb.to_string().into())).await.is_err() {
507                    break;
508                }
509            }
510            msg = socket.recv() => {
511                // `data` moves into `Pong`, so the inner `if` can't fold into the guard.
512                #[allow(clippy::collapsible_match)]
513                match msg {
514                    Some(Ok(Message::Close(_))) | None => break,
515                    Some(Ok(Message::Ping(data))) => {
516                        if socket.send(Message::Pong(data)).await.is_err() { break; }
517                    }
518                    _ => {}
519                }
520            }
521        }
522    }
523}
524
525fn batch_to_json(batch: &arrow_array::RecordBatch) -> Result<serde_json::Value, String> {
526    let mut buf = Vec::new();
527    let mut writer = arrow_json::ArrayWriter::new(&mut buf);
528    writer.write(batch).map_err(|e| e.to_string())?;
529    writer.finish().map_err(|e| e.to_string())?;
530    Ok(serde_json::from_slice(&buf).unwrap_or(serde_json::Value::Array(vec![])))
531}
532
#[cfg(test)]
mod tests {
    //! Router-level tests: each test builds a fresh `AppState`, sends one
    //! request through `build_router` and inspects the response.

    use super::*;
    use axum::body::Body;
    use axum::http::Request;
    use tower::ServiceExt;

    /// Builds an `AppState` with a fresh database, a fresh metrics registry
    /// and an empty configuration. `config_path` is what the reload endpoint
    /// will try to load.
    fn test_state_with_config_path(config_path: PathBuf) -> Arc<AppState> {
        let registry = Arc::new(crate::metrics::build_registry([
            ("instance".into(), "test".into()),
            ("pipeline".into(), "test".into()),
        ]));
        let engine_metrics = Arc::new(laminar_db::EngineMetrics::new(&registry));
        let db = Arc::new(LaminarDB::open().unwrap());
        db.set_engine_metrics(engine_metrics);
        let server_metrics = crate::metrics::ServerMetrics::new(&registry);
        Arc::new(AppState {
            db,
            config_path,
            current_config: parking_lot::RwLock::new(crate::config::ServerConfig {
                server: crate::config::ServerSection::default(),
                state: laminar_core::state::StateBackendConfig::default(),
                checkpoint: crate::config::CheckpointSection::default(),
                sources: vec![],
                lookups: vec![],
                pipelines: vec![],
                sinks: vec![],
                discovery: None,
                coordination: None,
                node_id: None,
                sql: None,
                ai: Default::default(),
                models: Default::default(),
            }),
            reload_guard: ReloadGuard::new(),
            registry,
            server_metrics,
        })
    }

    /// Default state; its config path ("test.toml") does not exist on disk.
    fn test_state() -> Arc<AppState> {
        test_state_with_config_path(PathBuf::from("test.toml"))
    }

    /// GET request with an empty body.
    fn get_request(uri: &str) -> Request<Body> {
        Request::builder().uri(uri).body(Body::empty()).unwrap()
    }

    /// POST request with an empty body.
    fn post_request(uri: &str) -> Request<Body> {
        Request::builder()
            .method("POST")
            .uri(uri)
            .body(Body::empty())
            .unwrap()
    }

    /// POST to the SQL endpoint carrying `sql` as a JSON body.
    fn sql_request(sql: &str) -> Request<Body> {
        let payload = serde_json::json!({ "sql": sql });
        Request::builder()
            .method("POST")
            .uri("/api/v1/sql")
            .header("content-type", "application/json")
            .body(Body::from(serde_json::to_string(&payload).unwrap()))
            .unwrap()
    }

    /// Sends `req` through a router built on `state`.
    async fn send(state: Arc<AppState>, req: Request<Body>) -> axum::response::Response {
        build_router(state).oneshot(req).await.unwrap()
    }

    /// Reads the whole response body.
    async fn body_bytes(resp: axum::response::Response) -> axum::body::Bytes {
        axum::body::to_bytes(resp.into_body(), usize::MAX)
            .await
            .unwrap()
    }

    /// Reads the response body and parses it as JSON.
    async fn body_json(resp: axum::response::Response) -> serde_json::Value {
        serde_json::from_slice(&body_bytes(resp).await).unwrap()
    }

    /// Reads the response body as UTF-8 text.
    async fn body_text(resp: axum::response::Response) -> String {
        String::from_utf8(body_bytes(resp).await.to_vec()).unwrap()
    }

    #[tokio::test]
    async fn test_health_check() {
        let resp = send(test_state(), get_request("/health")).await;
        assert_eq!(resp.status(), StatusCode::OK);

        let json = body_json(resp).await;
        assert_eq!(json["status"], "healthy");
        assert!(json["version"].is_string());
    }

    #[tokio::test]
    async fn test_readiness_not_running() {
        let resp = send(test_state(), get_request("/ready")).await;
        // Pipeline is in Created state, not Running
        assert_eq!(resp.status(), StatusCode::SERVICE_UNAVAILABLE);
    }

    #[tokio::test]
    async fn test_metrics() {
        let resp = send(test_state(), get_request("/metrics")).await;
        assert_eq!(resp.status(), StatusCode::OK);

        let ct = resp
            .headers()
            .get("content-type")
            .unwrap()
            .to_str()
            .unwrap();
        assert!(
            ct.contains("text/plain"),
            "expected text/plain content-type, got: {ct}"
        );

        let text = body_text(resp).await;
        assert!(
            text.contains("laminardb_events_ingested_total"),
            "missing events_ingested_total"
        );
        assert!(
            text.contains("laminardb_cycles_total"),
            "missing cycles_total"
        );
        assert!(
            text.contains("laminardb_checkpoints_completed_total"),
            "missing checkpoints_completed_total"
        );
    }

    #[tokio::test]
    async fn test_list_sources_empty() {
        let resp = send(test_state(), get_request("/api/v1/sources")).await;
        assert_eq!(resp.status(), StatusCode::OK);

        let json = body_json(resp).await;
        assert!(json.as_array().unwrap().is_empty());
    }

    #[tokio::test]
    async fn test_list_sinks_empty() {
        let resp = send(test_state(), get_request("/api/v1/sinks")).await;
        assert_eq!(resp.status(), StatusCode::OK);
    }

    #[tokio::test]
    async fn test_list_streams_empty() {
        let resp = send(test_state(), get_request("/api/v1/streams")).await;
        assert_eq!(resp.status(), StatusCode::OK);
    }

    #[tokio::test]
    async fn test_get_stream_not_found() {
        let resp = send(test_state(), get_request("/api/v1/streams/nonexistent")).await;
        assert_eq!(resp.status(), StatusCode::NOT_FOUND);
    }

    #[tokio::test]
    async fn test_execute_sql_create_source() {
        let req = sql_request("CREATE SOURCE test_src (id BIGINT, name VARCHAR)");
        let resp = send(test_state(), req).await;
        assert_eq!(resp.status(), StatusCode::OK);

        let json = body_json(resp).await;
        assert_eq!(json["result_type"], "CREATE SOURCE");
    }

    #[tokio::test]
    async fn test_execute_sql_invalid() {
        let resp = send(test_state(), sql_request("NOT VALID SQL AT ALL BLAH")).await;
        assert_eq!(resp.status(), StatusCode::BAD_REQUEST);
    }

    #[tokio::test]
    async fn test_reload_invalid_config_path() {
        // test_state has config_path = "test.toml" which doesn't exist → 400
        let resp = send(test_state(), post_request("/api/v1/reload")).await;
        assert_eq!(resp.status(), StatusCode::BAD_REQUEST);
    }

    #[tokio::test]
    async fn test_reload_concurrent_returns_conflict() {
        let state = test_state();
        // Hold the guard before making the request
        let _guard = state.reload_guard.try_acquire().unwrap();

        let resp = send(state, post_request("/api/v1/reload")).await;
        assert_eq!(resp.status(), StatusCode::CONFLICT);
    }

    #[tokio::test]
    async fn test_reload_with_valid_config() {
        use std::io::Write;

        // Create a real temp config file; `tmpfile` must stay alive until the
        // request has been served, since dropping it removes the file.
        let mut tmpfile = tempfile::NamedTempFile::new().unwrap();
        writeln!(tmpfile, "[server]").unwrap();
        let state = test_state_with_config_path(tmpfile.path().to_path_buf());

        let resp = send(state, post_request("/api/v1/reload")).await;
        assert_eq!(resp.status(), StatusCode::OK);

        let json = body_json(resp).await;
        assert_eq!(json["success"], true);
    }

    #[tokio::test]
    async fn test_metrics_contains_help_and_type() {
        let resp = send(test_state(), get_request("/metrics")).await;
        let text = body_text(resp).await;
        // Prometheus text format includes HELP and TYPE annotations
        assert!(text.contains("# HELP"), "missing # HELP annotation");
        assert!(text.contains("# TYPE"), "missing # TYPE annotation");
    }
}