laminar_core/cluster/testing.rs

//! MiniCluster — in-process harness for cluster-control integration
//! tests.
//!
//! Spawns N chitchat instances on loopback UDP, each wrapped in a
//! [`GossipDiscovery`](crate::cluster::discovery::GossipDiscovery) +
//! [`ClusterController`](crate::cluster::control::ClusterController) pair.
//! Shared by the integration test matrix in
//! `tests/cluster_integration.rs`; designed to be reusable from
//! downstream crates (laminar-db, laminar-server) as a dev-dependency.
//!
//! ## Scope
//!
//! - Real chitchat gossip; `gossip_interval` tuned down to 50 ms to
//!   keep convergence under a second.
//! - Loopback UDP only; each node binds to `127.0.0.1:0` (ephemeral)
//!   and records the assigned port before handing off to chitchat.
//! - No LaminarDB engine — just the cluster primitives. A full engine
//!   harness is a follow-up once the checkpoint flow consumes the
//!   controller.
//!
//! ## Not production API
//!
//! Gated on `cluster-unstable`. The APIs here are test helpers;
//! expect churn.
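//!
//! ## Example
//!
//! A minimal sketch of a three-node test (assumes a tokio test runtime
//! and the `cluster-unstable` feature enabled):
//!
//! ```ignore
//! let cluster = MiniCluster::spawn(3).await;
//! cluster
//!     .wait_for_convergence(Duration::from_secs(5))
//!     .await
//!     .expect("all nodes see each other");
//! // ... exercise `cluster.nodes[i].controller` here ...
//! cluster.shutdown().await;
//! ```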

use std::net::{SocketAddr, UdpSocket};
use std::sync::Arc;
use std::time::{Duration, Instant};

use async_trait::async_trait;
use chitchat::transport::{Socket, Transport, UdpTransport};
use object_store::{
    path::Path as OsPath, CopyOptions, GetOptions, GetResult, ListResult, MultipartUpload,
    ObjectMeta, ObjectStore, PutMultipartOptions, PutOptions, PutPayload, PutResult,
};
use parking_lot::Mutex;
use rustc_hash::FxHashSet;
use tokio::sync::watch;

use super::control::{AssignmentSnapshotStore, ChitchatKv, ClusterController, ClusterKv};
use super::discovery::{
    Discovery, GossipDiscovery, GossipDiscoveryConfig, NodeId, NodeInfo, NodeMetadata, NodeState,
};

/// Shared per-cluster partition rules. Each (src, dst) pair in the
/// set causes sends from `src` to `dst` to be silently dropped. The
/// rules are bidirectional when set via [`Self::partition`]; one-way
/// partitions can be constructed by adding a single pair via
/// [`Self::drop_pair`].
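///
/// # Example
///
/// A minimal sketch against two placeholder addresses:
///
/// ```ignore
/// let rules = Arc::new(NetworkRules::new());
/// let a: SocketAddr = "127.0.0.1:7001".parse().unwrap();
/// let b: SocketAddr = "127.0.0.1:7002".parse().unwrap();
/// rules.partition(&[a], &[b]); // drops traffic in both directions
/// assert!(rules.is_dropped(a, b) && rules.is_dropped(b, a));
/// rules.heal(); // full connectivity restored
/// assert!(!rules.is_dropped(a, b));
/// ```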
pub struct NetworkRules {
    dropped: Mutex<FxHashSet<(SocketAddr, SocketAddr)>>,
}

impl std::fmt::Debug for NetworkRules {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("NetworkRules")
            .field("drop_count", &self.dropped.lock().len())
            .finish()
    }
}

impl NetworkRules {
    /// Fresh rules with no partitions active.
    #[must_use]
    pub fn new() -> Self {
        Self {
            dropped: Mutex::new(FxHashSet::default()),
        }
    }

    /// Partition the cluster so no traffic flows between `side_a` and
    /// `side_b` in either direction. Within each side, traffic is
    /// unaffected.
    pub fn partition(&self, side_a: &[SocketAddr], side_b: &[SocketAddr]) {
        let mut set = self.dropped.lock();
        for a in side_a {
            for b in side_b {
                set.insert((*a, *b));
                set.insert((*b, *a));
            }
        }
    }

    /// Add a one-way drop rule for a specific (src, dst) pair.
    pub fn drop_pair(&self, src: SocketAddr, dst: SocketAddr) {
        self.dropped.lock().insert((src, dst));
    }

    /// Clear every active partition; full connectivity restored.
    pub fn heal(&self) {
        self.dropped.lock().clear();
    }

    /// True if the (src, dst) pair is currently partitioned.
    #[must_use]
    pub fn is_dropped(&self, src: SocketAddr, dst: SocketAddr) -> bool {
        self.dropped.lock().contains(&(src, dst))
    }
}

impl Default for NetworkRules {
    fn default() -> Self {
        Self::new()
    }
}

/// Fault mode for [`FaultyObjectStore`], flippable at runtime.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ObjectStoreFault {
    /// Pass through to the underlying store.
    None,
    /// Every write returns `Error::Generic`.
    FailWrites,
    /// Every read returns `Error::NotFound`.
    FailReads,
    /// Both reads and writes fail.
    FailAll,
}

impl ObjectStoreFault {
    fn fails_writes(self) -> bool {
        matches!(self, Self::FailWrites | Self::FailAll)
    }
    fn fails_reads(self) -> bool {
        matches!(self, Self::FailReads | Self::FailAll)
    }
}

/// Object store middleware with runtime-flippable fault injection.
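///
/// # Example
///
/// A minimal sketch over `object_store`'s in-memory backend:
///
/// ```ignore
/// let store = Arc::new(FaultyObjectStore::new(Arc::new(
///     object_store::memory::InMemory::new(),
/// )));
/// store.set_fault(ObjectStoreFault::FailWrites);
/// // put_opts / copy_opts now fail with `Error::Generic`; reads still pass through.
/// store.set_fault(ObjectStoreFault::None); // back to plain delegation
/// ```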
pub struct FaultyObjectStore {
    inner: Arc<dyn ObjectStore>,
    fault: Mutex<ObjectStoreFault>,
}

impl std::fmt::Debug for FaultyObjectStore {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("FaultyObjectStore")
            .field("fault", &self.fault())
            .finish_non_exhaustive()
    }
}

impl std::fmt::Display for FaultyObjectStore {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(f, "FaultyObjectStore({:?})", self.fault())
    }
}

impl FaultyObjectStore {
    /// Wrap `inner` with fault injection disabled.
    #[must_use]
    pub fn new(inner: Arc<dyn ObjectStore>) -> Self {
        Self {
            inner,
            fault: Mutex::new(ObjectStoreFault::None),
        }
    }

    /// Current fault mode.
    #[must_use]
    pub fn fault(&self) -> ObjectStoreFault {
        *self.fault.lock()
    }

    /// Switch fault mode. Takes effect on the next operation.
    pub fn set_fault(&self, mode: ObjectStoreFault) {
        *self.fault.lock() = mode;
    }

    fn check_write(&self) -> object_store::Result<()> {
        if self.fault().fails_writes() {
            return Err(object_store::Error::Generic {
                store: "FaultyObjectStore",
                source: "injected write failure".into(),
            });
        }
        Ok(())
    }

    fn check_read(&self, path: &OsPath) -> object_store::Result<()> {
        if self.fault().fails_reads() {
            return Err(object_store::Error::NotFound {
                path: path.to_string(),
                source: "injected read failure".into(),
            });
        }
        Ok(())
    }
}

#[async_trait]
impl ObjectStore for FaultyObjectStore {
    async fn put_opts(
        &self,
        location: &OsPath,
        payload: PutPayload,
        opts: PutOptions,
    ) -> object_store::Result<PutResult> {
        self.check_write()?;
        self.inner.put_opts(location, payload, opts).await
    }

    async fn put_multipart_opts(
        &self,
        location: &OsPath,
        opts: PutMultipartOptions,
    ) -> object_store::Result<Box<dyn MultipartUpload>> {
        self.check_write()?;
        self.inner.put_multipart_opts(location, opts).await
    }

    async fn get_opts(
        &self,
        location: &OsPath,
        options: GetOptions,
    ) -> object_store::Result<GetResult> {
        self.check_read(location)?;
        self.inner.get_opts(location, options).await
    }

    fn delete_stream(
        &self,
        locations: futures::stream::BoxStream<'static, object_store::Result<OsPath>>,
    ) -> futures::stream::BoxStream<'static, object_store::Result<OsPath>> {
        // Under FailWrites, replace every incoming location with an
        // injected error; otherwise delegate.
        if self.fault().fails_writes() {
            use futures::StreamExt;
            locations
                .map(|_| {
                    Err(object_store::Error::Generic {
                        store: "FaultyObjectStore",
                        source: "injected write failure (delete_stream)".into(),
                    })
                })
                .boxed()
        } else {
            self.inner.delete_stream(locations)
        }
    }

    fn list(
        &self,
        prefix: Option<&OsPath>,
    ) -> futures::stream::BoxStream<'static, object_store::Result<ObjectMeta>> {
        self.inner.list(prefix)
    }

    async fn list_with_delimiter(
        &self,
        prefix: Option<&OsPath>,
    ) -> object_store::Result<ListResult> {
        self.inner.list_with_delimiter(prefix).await
    }

    async fn copy_opts(
        &self,
        from: &OsPath,
        to: &OsPath,
        options: CopyOptions,
    ) -> object_store::Result<()> {
        self.check_write()?;
        self.inner.copy_opts(from, to, options).await
    }
}

/// Chitchat transport that delegates to a real [`UdpTransport`] but
/// consults a shared [`NetworkRules`] before each send. Packets
/// destined for a partitioned peer are silently dropped.
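///
/// A minimal usage sketch (pairs with `GossipDiscovery::start_with_transport`;
/// `discovery` here is a not-yet-started [`GossipDiscovery`]):
///
/// ```ignore
/// let rules = Arc::new(NetworkRules::new());
/// let transport = PartitionableTransport::new(Arc::clone(&rules));
/// discovery.start_with_transport(&transport).await?;
/// // Later: rules.partition(..) to cut links, rules.heal() to restore them.
/// ```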
pub struct PartitionableTransport {
    rules: Arc<NetworkRules>,
    inner: UdpTransport,
}

impl PartitionableTransport {
    /// Wrap the default [`UdpTransport`] with the given rule set.
    #[must_use]
    pub fn new(rules: Arc<NetworkRules>) -> Self {
        Self {
            rules,
            inner: UdpTransport,
        }
    }
}

#[async_trait]
impl Transport for PartitionableTransport {
    async fn open(&self, listen_addr: SocketAddr) -> anyhow::Result<Box<dyn Socket>> {
        let socket = self.inner.open(listen_addr).await?;
        Ok(Box::new(PartitionableSocket {
            my_addr: listen_addr,
            rules: Arc::clone(&self.rules),
            inner: socket,
        }))
    }
}

struct PartitionableSocket {
    my_addr: SocketAddr,
    rules: Arc<NetworkRules>,
    inner: Box<dyn Socket>,
}

#[async_trait]
impl Socket for PartitionableSocket {
    async fn send(&mut self, to: SocketAddr, msg: chitchat::ChitchatMessage) -> anyhow::Result<()> {
        if self.rules.is_dropped(self.my_addr, to) {
            // Silently drop — simulates a network partition. Chitchat's
            // phi-accrual failure detector observes the silence and
            // eventually marks the peer as suspected.
            return Ok(());
        }
        self.inner.send(to, msg).await
    }

    async fn recv(&mut self) -> anyhow::Result<(SocketAddr, chitchat::ChitchatMessage)> {
        self.inner.recv().await
    }
}

/// Returns a free loopback UDP port. Binds and drops a socket to
/// discover an ephemeral port; the race with concurrent binders is
/// accepted for test use.
fn grab_port() -> u16 {
    let sock = UdpSocket::bind("127.0.0.1:0").expect("bind 127.0.0.1:0");
    let port = sock.local_addr().expect("local_addr").port();
    drop(sock);
    port
}

/// One instance of the mini-cluster: a gossip discovery + controller.
pub struct NodeHandle {
    /// This node's identity.
    pub instance_id: NodeId,
    /// Gossip address this node listens on.
    pub gossip_addr: String,
    /// The cluster control facade. `Arc`-shared so tests can observe
    /// leader status while the harness retains ownership.
    pub controller: Arc<ClusterController>,
    /// Underlying discovery. Kept to drive shutdown.
    discovery: GossipDiscovery,
}

impl NodeHandle {
    /// Gracefully stop this node. Before shutting down the gossip
    /// socket, publishes `state = Left` via chitchat so peers see the
    /// status change within the next gossip round (~100 ms), rather
    /// than waiting for phi-accrual to flag the missing heartbeats.
    ///
    /// This matches the contract other production gossip stacks
    /// (Serf, Akka Cluster) provide — a clean leave is a protocol
    /// primitive, not a timing hack. Tests that want to simulate a
    /// crash (no clean leave) should use [`Self::crash`] instead.
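    ///
    /// A minimal sketch of prompt-failover teardown in a test:
    ///
    /// ```ignore
    /// let node = cluster.nodes.remove(0);
    /// node.kill().await; // peers observe `Left` within a gossip round or two
    /// // ...assert that a surviving node's controller takes over...
    /// ```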
    pub async fn kill(mut self) {
        let left = NodeInfo {
            state: NodeState::Left,
            ..current_info(&self)
        };
        let _ = self.discovery.announce(left).await;
        // Brief pause so the Left announcement propagates via gossip
        // before the socket closes.
        tokio::time::sleep(Duration::from_millis(150)).await;
        let _ = self.discovery.stop().await;
    }

    /// Simulate a crash: shut down this node *without* announcing a
    /// `Left` state. Peers rely on phi-accrual to eventually detect the
    /// failure, which in the default chitchat config can take tens of
    /// seconds — exactly the behavior a real crash produces. Use
    /// [`Self::kill`] for scenarios that expect prompt failover.
    ///
    /// Unlike a plain `drop`, this calls `discovery.stop().await` to
    /// shut down the chitchat background task and release the UDP
    /// socket. Skipping that previously leaked tasks and ports across
    /// crash-based tests; the *protocol-visible* behavior matches a
    /// real crash either way, because `stop` does not emit any `Left`
    /// announcement (that's `kill`'s job).
    pub async fn crash(mut self) {
        let _ = self.discovery.stop().await;
    }
}

fn current_info(node: &NodeHandle) -> NodeInfo {
    NodeInfo {
        id: node.instance_id,
        name: format!("minicluster-n{}", node.instance_id.0),
        rpc_address: String::new(),
        raft_address: String::new(),
        state: NodeState::Active,
        metadata: NodeMetadata {
            cores: 1,
            ..NodeMetadata::default()
        },
        last_heartbeat_ms: 0,
    }
}

/// Builder + wrapper for a set of in-process cluster nodes.
pub struct MiniCluster {
    /// The nodes in ID order: `nodes[0]` has the lowest
    /// `instance_id` and is the leader under normal operation.
    pub nodes: Vec<NodeHandle>,
    /// Shared network rules. `None` for plain [`Self::spawn`]
    /// clusters (default UDP transport); `Some` when built via
    /// [`Self::spawn_partitionable`].
    pub rules: Option<Arc<NetworkRules>>,
    /// Shared assignment snapshot store handed to every node's
    /// controller (so a snapshot written by one node is visible to
    /// all). `None` unless built via [`Self::spawn_with_snapshot`].
    pub snapshot: Option<Arc<AssignmentSnapshotStore>>,
}

impl MiniCluster {
    /// Spin up `n` nodes with the default UDP transport. See
    /// [`Self::spawn_partitionable`] for a variant that allows
    /// simulating network partitions.
    ///
    /// # Panics
    /// Panics if any chitchat instance fails to start (a UDP bind
    /// failure, which usually indicates port contention).
    pub async fn spawn(n: usize) -> Self {
        Self::spawn_inner(n, None, None).await
    }

    /// Spin up `n` nodes with a [`PartitionableTransport`] wrapping
    /// the default UDP transport. The returned cluster carries
    /// [`Self::rules`] so tests can call
    /// [`NetworkRules::partition`] / [`NetworkRules::heal`] during
    /// the test.
    ///
    /// # Panics
    /// Same as [`Self::spawn`].
    pub async fn spawn_partitionable(n: usize) -> Self {
        let rules = Arc::new(NetworkRules::new());
        Self::spawn_inner(n, Some(rules), None).await
    }

    /// Spin up `n` nodes sharing the given
    /// [`AssignmentSnapshotStore`]. Each node's controller is
    /// constructed with the same store, so a snapshot written by
    /// any node is visible to every other node and survives across
    /// a full cluster restart (provided the object store does).
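    ///
    /// A minimal sketch (construction of the snapshot store itself is
    /// elided — any store backed by a shared object store works):
    ///
    /// ```ignore
    /// let snapshot: Arc<AssignmentSnapshotStore> = /* built over a shared object store */;
    /// let cluster = MiniCluster::spawn_with_snapshot(3, Arc::clone(&snapshot)).await;
    /// // Any node's controller now reads and writes the same snapshot store.
    /// ```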
    pub async fn spawn_with_snapshot(n: usize, snapshot: Arc<AssignmentSnapshotStore>) -> Self {
        Self::spawn_inner(n, None, Some(snapshot)).await
    }

    /// Join one new node with the given `instance_id` into an
    /// already-running cluster. Seeds from every currently-running
    /// node for discovery. Useful for testing rejoin-after-kill and
    /// elastic scale-up scenarios.
    ///
    /// # Panics
    /// Panics if the cluster is empty (nothing to seed from) or if
    /// chitchat fails to bind a loopback port.
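    ///
    /// A minimal sketch of a kill-then-rejoin cycle:
    ///
    /// ```ignore
    /// let node = cluster.nodes.remove(0);
    /// let id = node.instance_id;
    /// node.kill().await;
    /// cluster.join_node(id).await; // rejoins under the same identity
    /// ```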
    pub async fn join_node(&mut self, instance_id: NodeId) {
        assert!(!self.nodes.is_empty(), "cannot join empty cluster");
        // Seed from every currently-present node so the rejoiner has
        // multiple contact points regardless of which one answers
        // first.
        let seeds: Vec<String> = self.nodes.iter().map(|n| n.gossip_addr.clone()).collect();
        let port = grab_port();
        let gossip_addr = format!("127.0.0.1:{port}");

        let local_node = NodeInfo {
            id: instance_id,
            name: format!("minicluster-rejoin-{}", instance_id.0),
            rpc_address: String::new(),
            raft_address: String::new(),
            state: NodeState::Active,
            metadata: NodeMetadata {
                cores: 1,
                ..NodeMetadata::default()
            },
            last_heartbeat_ms: 0,
        };

        let cfg = GossipDiscoveryConfig {
            gossip_address: gossip_addr.clone(),
            seed_nodes: seeds,
            gossip_interval: Duration::from_millis(50),
            phi_threshold: 3.0,
            dead_node_grace_period: Duration::from_secs(1),
            cluster_id: "minicluster".to_string(),
            node_id: instance_id,
            local_node,
        };
        let mut discovery = GossipDiscovery::new(cfg);
        match &self.rules {
            Some(rules) => {
                let transport = PartitionableTransport::new(Arc::clone(rules));
                discovery
                    .start_with_transport(&transport)
                    .await
                    .expect("partitionable chitchat start on rejoin");
            }
            None => discovery.start().await.expect("chitchat start on rejoin"),
        }

        let handle = discovery
            .chitchat_handle()
            .expect("chitchat handle available after start");
        let kv: Arc<dyn ClusterKv> = Arc::new(ChitchatKv::from_handle(handle));
        let members_rx = discovery.membership_watch();
        let controller = Arc::new(ClusterController::new(
            instance_id,
            kv,
            self.snapshot.clone(),
            members_rx,
        ));

        self.nodes.push(NodeHandle {
            instance_id,
            gossip_addr,
            controller,
            discovery,
        });
    }

    async fn spawn_inner(
        n: usize,
        rules: Option<Arc<NetworkRules>>,
        snapshot: Option<Arc<AssignmentSnapshotStore>>,
    ) -> Self {
        assert!(n >= 1, "MiniCluster needs at least one node");

        let ports: Vec<u16> = (0..n).map(|_| grab_port()).collect();
        let seed = format!("127.0.0.1:{}", ports[0]);
        let transport = rules
            .as_ref()
            .map(|r| PartitionableTransport::new(Arc::clone(r)));

        let mut nodes = Vec::with_capacity(n);
        for (idx, port) in ports.iter().enumerate() {
            let instance_id = NodeId((idx as u64) + 1); // skip UNASSIGNED=0
            let gossip_addr = format!("127.0.0.1:{port}");

            let local_node = NodeInfo {
                id: instance_id,
                name: format!("minicluster-n{idx}"),
                rpc_address: String::new(),
                raft_address: String::new(),
                state: NodeState::Active,
                metadata: NodeMetadata {
                    cores: 1,
                    ..NodeMetadata::default()
                },
                last_heartbeat_ms: 0,
            };

            let seeds = if idx == 0 {
                Vec::new()
            } else {
                vec![seed.clone()]
            };
            // Aggressive timings: tests trade false-positive risk for
            // fast failover feedback. Production configs use the
            // chitchat defaults (phi=8.0, grace≈3s).
            let cfg = GossipDiscoveryConfig {
                gossip_address: gossip_addr.clone(),
                seed_nodes: seeds,
                gossip_interval: Duration::from_millis(50),
                phi_threshold: 3.0,
                dead_node_grace_period: Duration::from_secs(1),
                cluster_id: "minicluster".to_string(),
                node_id: instance_id,
                local_node,
            };
            let mut discovery = GossipDiscovery::new(cfg);
            match &transport {
                Some(t) => discovery
                    .start_with_transport(t)
                    .await
                    .expect("partitionable chitchat start"),
                None => discovery.start().await.expect("chitchat start on loopback"),
            }

            let handle = discovery
                .chitchat_handle()
                .expect("chitchat handle available after start");
            let kv: Arc<dyn ClusterKv> = Arc::new(ChitchatKv::from_handle(handle));
            let members_rx: watch::Receiver<Vec<NodeInfo>> = discovery.membership_watch();
            let controller = Arc::new(ClusterController::new(
                instance_id,
                kv,
                snapshot.clone(),
                members_rx,
            ));

            nodes.push(NodeHandle {
                instance_id,
                gossip_addr,
                controller,
                discovery,
            });
        }
        Self {
            nodes,
            rules,
            snapshot,
        }
    }

    /// Parse and collect node gossip addresses — useful for building
    /// partition rules via [`NetworkRules::partition`].
    ///
    /// # Panics
    /// Panics if any node's `gossip_addr` doesn't parse as a
    /// `SocketAddr` (only possible if the `MiniCluster` was constructed
    /// with malformed input).
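    ///
    /// A minimal sketch of a 2-vs-1 split on a partitionable cluster:
    ///
    /// ```ignore
    /// let cluster = MiniCluster::spawn_partitionable(3).await;
    /// let addrs = cluster.addrs();
    /// let rules = cluster.rules.as_ref().expect("partitionable cluster");
    /// rules.partition(&addrs[..2], &addrs[2..]);
    /// // ...assert on minority/majority behavior, then:
    /// rules.heal();
    /// ```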
    #[must_use]
    pub fn addrs(&self) -> Vec<SocketAddr> {
        self.nodes
            .iter()
            .map(|n| n.gossip_addr.parse().expect("valid gossip_addr"))
            .collect()
    }

    /// Wait until every node sees every other node as a peer, or
    /// until `deadline` passes.
    ///
    /// # Errors
    /// Returns `Err(String)` on timeout, describing which nodes
    /// haven't converged yet, or if a discovery `peers()` call fails.
    pub async fn wait_for_convergence(&self, deadline: Duration) -> Result<(), String> {
        let start = Instant::now();
        loop {
            let mut all_converged = true;
            let mut missing_summary = Vec::new();
            for node in &self.nodes {
                let peers = node
                    .discovery
                    .peers()
                    .await
                    .map_err(|e| format!("peers() failed on {}: {e}", node.instance_id.0))?;
                let expected = self.nodes.len() - 1;
                if peers.len() < expected {
                    all_converged = false;
                    missing_summary.push(format!(
                        "node {} sees {} peers (expected {})",
                        node.instance_id.0,
                        peers.len(),
                        expected
                    ));
                }
            }
            if all_converged {
                return Ok(());
            }
            if start.elapsed() >= deadline {
                return Err(format!(
                    "convergence timeout after {:?}: {}",
                    deadline,
                    missing_summary.join("; "),
                ));
            }
            tokio::time::sleep(Duration::from_millis(100)).await;
        }
    }

    /// Shut every node down cleanly.
    pub async fn shutdown(mut self) {
        for node in self.nodes.drain(..) {
            node.kill().await;
        }
    }
}