1use std::path::PathBuf;
4use std::sync::Arc;
5use std::time::Instant;
6
7use prometheus::Registry;
8
9use axum::extract::ws::{Message, WebSocket, WebSocketUpgrade};
10use axum::extract::{Path, State};
11use axum::http::StatusCode;
12use axum::response::IntoResponse;
13use axum::routing::{get, post};
14use axum::{Json, Router};
15use serde::{Deserialize, Serialize};
16use tower_http::cors::CorsLayer;
17use tracing::{info, warn};
18
19use laminar_db::LaminarDB;
20
21use crate::config::ServerConfig;
22use crate::metrics::ServerMetrics;
23use crate::reload::{self, ReloadGuard};
24use crate::server::ServerError;
25
/// Shared state handed to every HTTP and WebSocket handler in this module.
///
/// It is wrapped in an `Arc` and attached to the router by `build_router`.
pub struct AppState {
    /// Database handle the handlers query and control.
    pub db: Arc<LaminarDB>,
    /// Path of the configuration file that the reload handler re-reads.
    pub config_path: PathBuf,
    /// Configuration currently in effect; the reload handler overwrites it only
    /// after a fully successful reload.
    pub current_config: parking_lot::RwLock<ServerConfig>,
    /// Guard the reload handler tries to acquire; when `try_acquire` yields
    /// `None` the handler answers `409 Conflict`.
    pub reload_guard: ReloadGuard,
    /// Prometheus registry rendered by the `/metrics` endpoint.
    pub registry: Arc<Registry>,
    /// Server-level metrics touched by the handlers (uptime gauge, reload
    /// counter, WebSocket connection gauge).
    pub server_metrics: ServerMetrics,
}
37
38pub fn build_router(state: Arc<AppState>) -> Router {
39 Router::new()
40 .route("/health", get(health_check))
41 .route("/ready", get(readiness_check))
42 .route("/metrics", get(prometheus_metrics))
43 .route("/api/v1/sources", get(list_sources))
44 .route("/api/v1/sinks", get(list_sinks))
45 .route("/api/v1/streams", get(list_streams))
46 .route("/api/v1/streams/{name}", get(get_stream))
47 .route("/api/v1/checkpoint", post(trigger_checkpoint))
48 .route("/api/v1/sql", post(execute_sql))
49 .route("/api/v1/reload", post(handle_reload))
50 .route("/api/v1/cluster", get(cluster_status))
51 .route("/ws/{name}", get(ws_upgrade))
52 .layer(CorsLayer::permissive())
53 .layer(axum::middleware::from_fn(request_logging))
54 .with_state(state)
55}
56
57pub async fn serve(router: Router, bind: &str) -> Result<tokio::task::JoinHandle<()>, ServerError> {
58 let listener = tokio::net::TcpListener::bind(bind)
59 .await
60 .map_err(|e| ServerError::Http(format!("failed to bind to {bind}: {e}")))?;
61
62 let handle = tokio::spawn(async move {
63 if let Err(e) = axum::serve(listener, router).await {
64 tracing::error!("HTTP server error: {e}");
65 }
66 });
67
68 Ok(handle)
69}
70
/// JSON body returned by the `/health` endpoint and by `/ready` on success.
#[derive(Debug, Serialize)]
struct HealthResponse {
    /// `"healthy"` or `"unhealthy"` from `/health`; `"ready"` from `/ready`.
    status: &'static str,
    /// Crate version, taken from `CARGO_PKG_VERSION` at compile time.
    version: &'static str,
    /// Pipeline state string as reported by the database.
    pipeline_state: &'static str,
}
78
/// One entry of the `/api/v1/sources` listing.
#[derive(Debug, Serialize)]
struct SourceResponse {
    /// Source name.
    name: String,
    /// Watermark column of the source; left out of the JSON when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    watermark_column: Option<String>,
}
85
/// Stream description returned by `/api/v1/streams` and `/api/v1/streams/{name}`.
#[derive(Debug, Serialize)]
struct StreamResponse {
    /// Stream name.
    name: String,
    /// SQL text recorded for the stream; left out of the JSON when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    sql: Option<String>,
}
92
/// One entry of the `/api/v1/sinks` listing.
#[derive(Debug, Serialize)]
struct SinkResponse {
    /// Sink name.
    name: String,
}
97
/// JSON body returned by `POST /api/v1/checkpoint` when the database produced
/// a checkpoint result (successful or not).
#[derive(Debug, Serialize)]
struct CheckpointResponse {
    /// Whether the checkpoint result reported success.
    success: bool,
    /// Checkpoint identifier copied from the checkpoint result.
    checkpoint_id: u64,
    /// Epoch copied from the checkpoint result.
    epoch: u64,
    /// Checkpoint duration in milliseconds.
    duration_ms: u64,
    /// Error text from the checkpoint result; left out of the JSON when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    error: Option<String>,
}
107
/// JSON request body accepted by `POST /api/v1/sql`.
#[derive(Debug, Deserialize)]
struct SqlRequest {
    /// SQL text handed to the database for execution.
    sql: String,
}
112
/// JSON body returned by `POST /api/v1/sql` on success.
#[derive(Debug, Serialize)]
struct SqlResponse {
    /// The statement type for DDL results, otherwise one of `"rows_affected"`,
    /// `"query"` or `"metadata"`.
    result_type: String,
    /// Name of the object a DDL statement acted on; omitted for other results.
    #[serde(skip_serializing_if = "Option::is_none")]
    object_name: Option<String>,
    /// Row count for `rows_affected` results; omitted for other results.
    #[serde(skip_serializing_if = "Option::is_none")]
    rows_affected: Option<u64>,
}
121
/// JSON error body (`{"error": "..."}`) produced by `error_response`.
#[derive(Debug, Serialize)]
struct ErrorBody {
    /// Human-readable error message.
    error: String,
}
126
127fn error_response(status: StatusCode, msg: impl Into<String>) -> impl IntoResponse {
128 (status, Json(ErrorBody { error: msg.into() }))
129}
130
131async fn request_logging(
132 req: axum::http::Request<axum::body::Body>,
133 next: axum::middleware::Next,
134) -> impl IntoResponse {
135 let method = req.method().clone();
136 let uri = req.uri().clone();
137 let start = Instant::now();
138
139 let response = next.run(req).await;
140
141 let duration_ms = start.elapsed().as_millis();
142 let status = response.status();
143 info!("{method} {uri} -> {status} ({duration_ms}ms)");
144
145 response
146}
147
148async fn health_check(State(state): State<Arc<AppState>>) -> impl IntoResponse {
149 let pipeline_state = state.db.pipeline_state();
150 let status = if pipeline_state == "Stopped" {
151 StatusCode::SERVICE_UNAVAILABLE
152 } else {
153 StatusCode::OK
154 };
155
156 (
157 status,
158 Json(HealthResponse {
159 status: if status == StatusCode::OK {
160 "healthy"
161 } else {
162 "unhealthy"
163 },
164 version: env!("CARGO_PKG_VERSION"),
165 pipeline_state,
166 }),
167 )
168}
169
170async fn readiness_check(State(state): State<Arc<AppState>>) -> impl IntoResponse {
171 let pipeline_state = state.db.pipeline_state();
172 if pipeline_state == "Running" {
173 (
174 StatusCode::OK,
175 Json(HealthResponse {
176 status: "ready",
177 version: env!("CARGO_PKG_VERSION"),
178 pipeline_state,
179 }),
180 )
181 .into_response()
182 } else {
183 error_response(
184 StatusCode::SERVICE_UNAVAILABLE,
185 format!("pipeline is {pipeline_state}, not Running"),
186 )
187 .into_response()
188 }
189}
190
191async fn prometheus_metrics(State(state): State<Arc<AppState>>) -> impl IntoResponse {
192 #[allow(clippy::cast_possible_wrap)]
194 state
195 .server_metrics
196 .uptime_seconds
197 .set(state.db.uptime().as_secs() as i64);
198
199 (
200 StatusCode::OK,
201 [(
202 axum::http::header::CONTENT_TYPE,
203 "text/plain; version=0.0.4; charset=utf-8",
204 )],
205 crate::metrics::render(&state.registry),
206 )
207}
208
209async fn list_sources(State(state): State<Arc<AppState>>) -> impl IntoResponse {
210 let sources: Vec<SourceResponse> = state
211 .db
212 .sources()
213 .into_iter()
214 .map(|s| SourceResponse {
215 name: s.name,
216 watermark_column: s.watermark_column,
217 })
218 .collect();
219 Json(sources)
220}
221
222async fn list_sinks(State(state): State<Arc<AppState>>) -> impl IntoResponse {
223 let sinks: Vec<SinkResponse> = state
224 .db
225 .sinks()
226 .into_iter()
227 .map(|s| SinkResponse { name: s.name })
228 .collect();
229 Json(sinks)
230}
231
232async fn list_streams(State(state): State<Arc<AppState>>) -> impl IntoResponse {
233 let streams: Vec<StreamResponse> = state
234 .db
235 .streams()
236 .into_iter()
237 .map(|s| StreamResponse {
238 name: s.name,
239 sql: s.sql,
240 })
241 .collect();
242 Json(streams)
243}
244
245async fn get_stream(
246 State(state): State<Arc<AppState>>,
247 Path(name): Path<String>,
248) -> impl IntoResponse {
249 let streams = state.db.streams();
250 match streams.into_iter().find(|s| s.name == name) {
251 Some(s) => Json(StreamResponse {
252 name: s.name,
253 sql: s.sql,
254 })
255 .into_response(),
256 None => error_response(StatusCode::NOT_FOUND, format!("stream '{name}' not found"))
257 .into_response(),
258 }
259}
260
261async fn trigger_checkpoint(State(state): State<Arc<AppState>>) -> impl IntoResponse {
262 match state.db.checkpoint().await {
263 Ok(result) => {
264 let status = if result.success {
265 StatusCode::OK
266 } else {
267 StatusCode::INTERNAL_SERVER_ERROR
268 };
269 #[allow(clippy::cast_possible_truncation)]
270 let duration_ms = result.duration.as_millis() as u64;
271 (
272 status,
273 Json(CheckpointResponse {
274 success: result.success,
275 checkpoint_id: result.checkpoint_id,
276 epoch: result.epoch,
277 duration_ms,
278 error: result.error,
279 }),
280 )
281 .into_response()
282 }
283 Err(e) => error_response(StatusCode::INTERNAL_SERVER_ERROR, e.to_string()).into_response(),
284 }
285}
286
287async fn execute_sql(
288 State(state): State<Arc<AppState>>,
289 Json(req): Json<SqlRequest>,
290) -> impl IntoResponse {
291 match state.db.execute(&req.sql).await {
292 Ok(result) => {
293 use laminar_db::ExecuteResult;
294 let resp = match result {
295 ExecuteResult::Ddl(info) => SqlResponse {
296 result_type: info.statement_type,
297 object_name: Some(info.object_name),
298 rows_affected: None,
299 },
300 ExecuteResult::RowsAffected(n) => SqlResponse {
301 result_type: "rows_affected".to_string(),
302 object_name: None,
303 rows_affected: Some(n),
304 },
305 ExecuteResult::Query(_) => SqlResponse {
306 result_type: "query".to_string(),
307 object_name: None,
308 rows_affected: None,
309 },
310 ExecuteResult::Metadata(_) => SqlResponse {
311 result_type: "metadata".to_string(),
312 object_name: None,
313 rows_affected: None,
314 },
315 };
316 Json(resp).into_response()
317 }
318 Err(e) => error_response(StatusCode::BAD_REQUEST, e.to_string()).into_response(),
319 }
320}
321
322async fn handle_reload(State(state): State<Arc<AppState>>) -> impl IntoResponse {
323 let _guard = match state.reload_guard.try_acquire() {
325 Some(g) => g,
326 None => {
327 return error_response(StatusCode::CONFLICT, "a reload is already in progress")
328 .into_response();
329 }
330 };
331
332 let new_config = match crate::config::load_config(&state.config_path) {
334 Ok(c) => c,
335 Err(e) => {
336 warn!("Reload failed: config error: {e}");
337 return error_response(StatusCode::BAD_REQUEST, e.to_string()).into_response();
338 }
339 };
340
341 let diff = {
345 let current = state.current_config.read();
346 reload::diff_configs(¤t, &new_config)
347 };
348
349 if diff.is_empty() && diff.warnings.is_empty() {
350 return Json(reload::ReloadResult {
351 success: true,
352 applied: vec![],
353 failed: vec![],
354 warnings: vec!["no changes detected".to_string()],
355 })
356 .into_response();
357 }
358
359 let result = reload::apply_reload(&state.db, &diff).await;
361
362 state.server_metrics.reload_total.inc();
364
365 if result.success {
367 let mut current = state.current_config.write();
368 *current = new_config;
369 info!(
370 "Configuration reloaded successfully ({} ops)",
371 result.applied.len()
372 );
373 } else {
374 warn!(
375 "Configuration reload partially failed: {} applied, {} failed",
376 result.applied.len(),
377 result.failed.len()
378 );
379 }
380
381 let status = if result.success {
382 StatusCode::OK
383 } else {
384 StatusCode::MULTI_STATUS
385 };
386
387 (status, Json(result)).into_response()
388}
389
390async fn cluster_status(State(state): State<Arc<AppState>>) -> impl IntoResponse {
391 let config = state.current_config.read();
392 if config.server.mode != "cluster" {
393 return error_response(
394 StatusCode::NOT_FOUND,
395 "cluster endpoint is only available when server.mode = \"cluster\"",
396 )
397 .into_response();
398 }
399
400 let node_id = config.node_id.clone().unwrap_or_default();
401 drop(config);
402
403 #[derive(Serialize)]
404 struct ClusterStatusResponse {
405 mode: &'static str,
406 node_id: String,
407 pipeline_state: &'static str,
408 }
409
410 let pipeline_state = state.db.pipeline_state();
411 Json(ClusterStatusResponse {
412 mode: "cluster",
413 node_id,
414 pipeline_state,
415 })
416 .into_response()
417}
418
/// Connection cap for the WebSocket endpoint: `ws_upgrade` answers `503` once
/// the `ws_connections` gauge has reached this value.
const MAX_WS_CONNECTIONS: i64 = 10_000;
/// Period of the heartbeat timer that `ws_client` uses to send heartbeat
/// messages to a connected client.
const WS_HEARTBEAT_INTERVAL: std::time::Duration = std::time::Duration::from_secs(15);
425
426async fn ws_upgrade(
427 State(state): State<Arc<AppState>>,
428 Path(name): Path<String>,
429 ws: WebSocketUpgrade,
430) -> impl IntoResponse {
431 if state.server_metrics.ws_connections.get() >= MAX_WS_CONNECTIONS {
432 return error_response(
433 StatusCode::SERVICE_UNAVAILABLE,
434 "too many WebSocket connections".to_string(),
435 )
436 .into_response();
437 }
438
439 if !state.db.streams().iter().any(|s| s.name == name) {
440 return error_response(StatusCode::NOT_FOUND, format!("stream '{name}' not found"))
441 .into_response();
442 }
443
444 let sub = match state.db.subscribe_raw(&name) {
446 Ok(s) => s,
447 Err(_) => {
448 return error_response(
449 StatusCode::INTERNAL_SERVER_ERROR,
450 format!("failed to subscribe to '{name}'"),
451 )
452 .into_response();
453 }
454 };
455
456 let st = Arc::clone(&state);
457 ws.on_upgrade(move |socket| async move {
458 st.server_metrics.ws_connections.inc();
459 ws_client(socket, sub, name).await;
460 st.server_metrics.ws_connections.dec();
461 })
462 .into_response()
463}
464
465async fn ws_client(
466 mut socket: WebSocket,
467 mut sub: laminar_core::streaming::Subscription<laminar_db::ArrowRecord>,
468 name: String,
469) {
470 let mut heartbeat = tokio::time::interval(WS_HEARTBEAT_INTERVAL);
471 heartbeat.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
472 let mut seq: u64 = 0;
473
474 loop {
475 tokio::select! {
476 biased;
477 result = sub.recv_async() => {
478 match result {
479 Ok(batch) => {
480 let json = match batch_to_json(&batch) {
481 Ok(j) => j,
482 Err(e) => {
483 warn!(stream = %name, error = %e, "serialize error");
484 continue;
485 }
486 };
487 let out = serde_json::json!({
488 "type": "data",
489 "subscription_id": &name,
490 "data": json,
491 "sequence": seq,
492 });
493 seq += 1;
494 if socket.send(Message::Text(out.to_string().into())).await.is_err() {
495 break;
496 }
497 }
498 Err(_) => break, }
500 }
501 _ = heartbeat.tick() => {
502 let hb = serde_json::json!({
503 "type": "heartbeat",
504 "server_time": chrono::Utc::now().timestamp_millis(),
505 });
506 if socket.send(Message::Text(hb.to_string().into())).await.is_err() {
507 break;
508 }
509 }
510 msg = socket.recv() => {
511 #[allow(clippy::collapsible_match)]
513 match msg {
514 Some(Ok(Message::Close(_))) | None => break,
515 Some(Ok(Message::Ping(data))) => {
516 if socket.send(Message::Pong(data)).await.is_err() { break; }
517 }
518 _ => {}
519 }
520 }
521 }
522 }
523}
524
525fn batch_to_json(batch: &arrow_array::RecordBatch) -> Result<serde_json::Value, String> {
526 let mut buf = Vec::new();
527 let mut writer = arrow_json::ArrayWriter::new(&mut buf);
528 writer.write(batch).map_err(|e| e.to_string())?;
529 writer.finish().map_err(|e| e.to_string())?;
530 Ok(serde_json::from_slice(&buf).unwrap_or(serde_json::Value::Array(vec![])))
531}
532
/// Router-level tests: each test builds a fresh router over an in-process
/// database and drives it with a single request via `oneshot`.
#[cfg(test)]
mod tests {
    use super::*;
    use axum::body::Body;
    use axum::http::Request;
    use tower::ServiceExt;

    /// Builds an `AppState` over a fresh database, with default configuration
    /// sections and `config_path` as the file the reload endpoint will read.
    fn state_with_config_path(config_path: PathBuf) -> Arc<AppState> {
        let registry = Arc::new(crate::metrics::build_registry([
            ("instance".into(), "test".into()),
            ("pipeline".into(), "test".into()),
        ]));
        let engine_metrics = Arc::new(laminar_db::EngineMetrics::new(&registry));
        let db = Arc::new(LaminarDB::open().unwrap());
        db.set_engine_metrics(engine_metrics);
        let server_metrics = crate::metrics::ServerMetrics::new(&registry);
        Arc::new(AppState {
            db,
            config_path,
            current_config: parking_lot::RwLock::new(crate::config::ServerConfig {
                server: crate::config::ServerSection::default(),
                state: laminar_core::state::StateBackendConfig::default(),
                checkpoint: crate::config::CheckpointSection::default(),
                sources: vec![],
                lookups: vec![],
                pipelines: vec![],
                sinks: vec![],
                discovery: None,
                coordination: None,
                node_id: None,
                sql: None,
                ai: Default::default(),
                models: Default::default(),
            }),
            reload_guard: ReloadGuard::new(),
            registry,
            server_metrics,
        })
    }

    /// Default test state; its config path points at a file that is not
    /// expected to exist.
    fn test_state() -> Arc<AppState> {
        state_with_config_path(PathBuf::from("test.toml"))
    }

    /// Body-less `GET` request for `uri`.
    fn get_request(uri: &str) -> Request<Body> {
        Request::builder().uri(uri).body(Body::empty()).unwrap()
    }

    /// Body-less `POST` request for `uri`.
    fn post_request(uri: &str) -> Request<Body> {
        Request::builder()
            .method("POST")
            .uri(uri)
            .body(Body::empty())
            .unwrap()
    }

    /// `POST /api/v1/sql` request carrying `sql` as its JSON payload.
    fn sql_request(sql: &str) -> Request<Body> {
        Request::builder()
            .method("POST")
            .uri("/api/v1/sql")
            .header("content-type", "application/json")
            .body(Body::from(serde_json::json!({ "sql": sql }).to_string()))
            .unwrap()
    }

    /// Collects the full response body.
    async fn body_bytes(resp: axum::response::Response) -> axum::body::Bytes {
        axum::body::to_bytes(resp.into_body(), usize::MAX)
            .await
            .unwrap()
    }

    /// Collects the response body and parses it as JSON.
    async fn body_json(resp: axum::response::Response) -> serde_json::Value {
        serde_json::from_slice(&body_bytes(resp).await).unwrap()
    }

    /// Collects the response body as UTF-8 text.
    async fn body_text(resp: axum::response::Response) -> String {
        String::from_utf8(body_bytes(resp).await.to_vec()).unwrap()
    }

    #[tokio::test]
    async fn test_health_check() {
        let app = build_router(test_state());

        let resp = app.oneshot(get_request("/health")).await.unwrap();
        assert_eq!(resp.status(), StatusCode::OK);

        let json = body_json(resp).await;
        assert_eq!(json["status"], "healthy");
        assert!(json["version"].is_string());
    }

    #[tokio::test]
    async fn test_readiness_not_running() {
        let app = build_router(test_state());

        let resp = app.oneshot(get_request("/ready")).await.unwrap();
        assert_eq!(resp.status(), StatusCode::SERVICE_UNAVAILABLE);
    }

    #[tokio::test]
    async fn test_metrics() {
        let app = build_router(test_state());

        let resp = app.oneshot(get_request("/metrics")).await.unwrap();
        assert_eq!(resp.status(), StatusCode::OK);

        let ct = resp
            .headers()
            .get("content-type")
            .unwrap()
            .to_str()
            .unwrap()
            .to_owned();
        assert!(
            ct.contains("text/plain"),
            "expected text/plain content-type, got: {ct}"
        );

        let text = body_text(resp).await;
        assert!(
            text.contains("laminardb_events_ingested_total"),
            "missing events_ingested_total"
        );
        assert!(
            text.contains("laminardb_cycles_total"),
            "missing cycles_total"
        );
        assert!(
            text.contains("laminardb_checkpoints_completed_total"),
            "missing checkpoints_completed_total"
        );
    }

    #[tokio::test]
    async fn test_list_sources_empty() {
        let app = build_router(test_state());

        let resp = app.oneshot(get_request("/api/v1/sources")).await.unwrap();
        assert_eq!(resp.status(), StatusCode::OK);

        let json = body_json(resp).await;
        assert!(json.as_array().unwrap().is_empty());
    }

    #[tokio::test]
    async fn test_list_sinks_empty() {
        let app = build_router(test_state());

        let resp = app.oneshot(get_request("/api/v1/sinks")).await.unwrap();
        assert_eq!(resp.status(), StatusCode::OK);
    }

    #[tokio::test]
    async fn test_list_streams_empty() {
        let app = build_router(test_state());

        let resp = app.oneshot(get_request("/api/v1/streams")).await.unwrap();
        assert_eq!(resp.status(), StatusCode::OK);
    }

    #[tokio::test]
    async fn test_get_stream_not_found() {
        let app = build_router(test_state());

        let resp = app
            .oneshot(get_request("/api/v1/streams/nonexistent"))
            .await
            .unwrap();
        assert_eq!(resp.status(), StatusCode::NOT_FOUND);
    }

    #[tokio::test]
    async fn test_execute_sql_create_source() {
        let app = build_router(test_state());

        let req = sql_request("CREATE SOURCE test_src (id BIGINT, name VARCHAR)");
        let resp = app.oneshot(req).await.unwrap();
        assert_eq!(resp.status(), StatusCode::OK);

        let json = body_json(resp).await;
        assert_eq!(json["result_type"], "CREATE SOURCE");
    }

    #[tokio::test]
    async fn test_execute_sql_invalid() {
        let app = build_router(test_state());

        let resp = app
            .oneshot(sql_request("NOT VALID SQL AT ALL BLAH"))
            .await
            .unwrap();
        assert_eq!(resp.status(), StatusCode::BAD_REQUEST);
    }

    #[tokio::test]
    async fn test_reload_invalid_config_path() {
        // The default test state points at a config file that cannot be loaded.
        let app = build_router(test_state());

        let resp = app.oneshot(post_request("/api/v1/reload")).await.unwrap();
        assert_eq!(resp.status(), StatusCode::BAD_REQUEST);
    }

    #[tokio::test]
    async fn test_reload_concurrent_returns_conflict() {
        let state = test_state();
        // Hold the reload guard so the handler's own `try_acquire` fails.
        let _guard = state.reload_guard.try_acquire().unwrap();

        let app = build_router(Arc::clone(&state));
        let resp = app.oneshot(post_request("/api/v1/reload")).await.unwrap();
        assert_eq!(resp.status(), StatusCode::CONFLICT);
    }

    #[tokio::test]
    async fn test_reload_with_valid_config() {
        use std::io::Write;

        // `tmpfile` must outlive the request so the path stays readable.
        let mut tmpfile = tempfile::NamedTempFile::new().unwrap();
        writeln!(tmpfile, "[server]").unwrap();
        let state = state_with_config_path(tmpfile.path().to_path_buf());

        let app = build_router(state);
        let resp = app.oneshot(post_request("/api/v1/reload")).await.unwrap();
        assert_eq!(resp.status(), StatusCode::OK);

        let json = body_json(resp).await;
        assert_eq!(json["success"], true);
    }

    #[tokio::test]
    async fn test_metrics_contains_help_and_type() {
        let app = build_router(test_state());

        let resp = app.oneshot(get_request("/metrics")).await.unwrap();
        let text = body_text(resp).await;
        assert!(text.contains("# HELP"), "missing # HELP annotation");
        assert!(text.contains("# TYPE"), "missing # TYPE annotation");
    }
}