Skip to main content

laminar_core/cluster/control/
tls.rs

1//! Process-wide mutual TLS for the cluster control plane (barrier, query,
2//! shuffle). One identity per node/process, installed once at startup.
3
4use std::sync::OnceLock;
5
6use tonic::transport::{Certificate, ClientTlsConfig, Endpoint, Identity, ServerTlsConfig};
7
8/// TLS material shared by every control-plane server and client in this process.
9pub struct ClusterTls {
10    server: ServerTlsConfig,
11    client: ClientTlsConfig,
12}
13
14impl ClusterTls {
15    /// Build mTLS configs from PEM: this node's `cert`+`key`, the `ca` that signed
16    /// every peer cert, and the `server_name` SAN peers are verified against.
17    #[must_use]
18    pub fn from_pem(cert: &[u8], key: &[u8], ca: &[u8], server_name: &str) -> Self {
19        let identity = Identity::from_pem(cert, key);
20        let ca = Certificate::from_pem(ca);
21        let server = ServerTlsConfig::new()
22            .identity(identity.clone())
23            .client_ca_root(ca.clone());
24        let client = ClientTlsConfig::new()
25            .ca_certificate(ca)
26            .identity(identity)
27            .domain_name(server_name.to_string());
28        Self { server, client }
29    }
30}
31
32static CLUSTER_TLS: OnceLock<ClusterTls> = OnceLock::new();
33
34/// Install the process-wide control-plane TLS. Call once at startup before any
35/// control-plane server or client is created; later calls are ignored.
36pub fn set_cluster_tls(tls: ClusterTls) {
37    let _ = CLUSTER_TLS.set(tls);
38}
39
40/// Server config for the shared control-plane / shuffle listeners.
41pub(crate) fn server_tls() -> Option<&'static ServerTlsConfig> {
42    CLUSTER_TLS.get().map(|t| &t.server)
43}
44
45/// Client endpoint for `host_port`, using control-plane TLS + `https` when
46/// installed, plaintext `http` otherwise.
47pub(crate) fn client_endpoint(host_port: &str) -> Result<Endpoint, String> {
48    let tls = CLUSTER_TLS.get();
49    let scheme = if tls.is_some() { "https" } else { "http" };
50    let endpoint =
51        Endpoint::from_shared(format!("{scheme}://{host_port}")).map_err(|e| e.to_string())?;
52    match tls {
53        Some(t) => endpoint
54            .tls_config(t.client.clone())
55            .map_err(|e| e.to_string()),
56        None => Ok(endpoint),
57    }
58}