Skip to main content

laminar_connectors/
generator.rs

1//! Synthetic data generator source — no external infrastructure.
2//!
3//! Emits a deterministic sequence at a configured rate: row `i` is
4//! always `(seq = i, ts_ms = i * 1000 / rows_per_second, value = "v{i}")`,
5//! so output is a pure function of the offset. That makes the source
6//! fully replayable (exactly-once capable) and lets harnesses verify
7//! sink completeness by recomputing the expected rows — its primary
8//! consumer is the cluster soak test, but it works anywhere a
9//! self-driving source is needed (demos, benchmarks).
10
11use std::sync::Arc;
12use std::time::Instant;
13
14use arrow_array::{Int64Array, RecordBatch, StringArray};
15use arrow_schema::{DataType, Field, Schema, SchemaRef};
16use async_trait::async_trait;
17
18use crate::checkpoint::SourceCheckpoint;
19use crate::config::{ConfigKeySpec, ConnectorConfig, ConnectorInfo};
20use crate::connector::{SourceBatch, SourceConnector};
21use crate::error::ConnectorError;
22use crate::registry::ConnectorRegistry;
23
24/// Deterministic rate-limited source. See module docs.
25pub struct GeneratorSource {
26    schema: SchemaRef,
27    rows_per_second: u64,
28    batch_max: usize,
29    max_rows: Option<u64>,
30    /// Next sequence number to emit (== rows emitted so far across
31    /// restarts; restored from the checkpoint).
32    next_seq: u64,
33    /// Rate-limit anchor: emission is allowed up to
34    /// `anchor_seq + elapsed_since(anchor) * rows_per_second`.
35    /// Re-anchored on open/restore so a restart doesn't burst or stall.
36    anchor: Option<(Instant, u64)>,
37}
38
39impl GeneratorSource {
40    fn generator_schema() -> SchemaRef {
41        Arc::new(Schema::new(vec![
42            Field::new("seq", DataType::Int64, false),
43            Field::new("ts_ms", DataType::Int64, false),
44            Field::new("value", DataType::Utf8, false),
45        ]))
46    }
47
48    /// Build rows `[start, start + n)` — a pure function of `start`,
49    /// which is what makes the source replayable.
50    // Cast lints: seq is a monotonic generator counter and rates are
51    // operator-supplied config — both far below the 2^52/2^63 edges.
52    #[allow(
53        clippy::cast_possible_wrap,
54        clippy::cast_sign_loss,
55        clippy::cast_possible_truncation,
56        clippy::cast_precision_loss
57    )]
58    fn build_batch(&self, start: u64, n: usize) -> Result<RecordBatch, ConnectorError> {
59        let seqs: Vec<i64> = (0..n as u64).map(|i| (start + i) as i64).collect();
60        let ts: Vec<i64> = seqs
61            .iter()
62            .map(|&s| {
63                (s as u64)
64                    .saturating_mul(1000)
65                    .wrapping_div(self.rows_per_second) as i64
66            })
67            .collect();
68        let values: Vec<String> = seqs.iter().map(|s| format!("v{s}")).collect();
69        RecordBatch::try_new(
70            Arc::clone(&self.schema),
71            vec![
72                Arc::new(Int64Array::from(seqs)),
73                Arc::new(Int64Array::from(ts)),
74                Arc::new(StringArray::from(values)),
75            ],
76        )
77        .map_err(|e| ConnectorError::ReadError(e.to_string()))
78    }
79}
80
81impl Default for GeneratorSource {
82    fn default() -> Self {
83        Self {
84            schema: Self::generator_schema(),
85            rows_per_second: 1000,
86            batch_max: 1024,
87            max_rows: None,
88            next_seq: 0,
89            anchor: None,
90        }
91    }
92}
93
94#[async_trait]
95impl SourceConnector for GeneratorSource {
96    async fn open(&mut self, config: &ConnectorConfig) -> Result<(), ConnectorError> {
97        if let Some(rps) = config.get_parsed::<u64>("rows.per.second")? {
98            if rps == 0 {
99                return Err(ConnectorError::ConfigurationError(
100                    "rows.per.second must be > 0".into(),
101                ));
102            }
103            self.rows_per_second = rps;
104        }
105        if let Some(n) = config.get_parsed::<usize>("batch.max.size")? {
106            self.batch_max = n.max(1);
107        }
108        self.max_rows = config.get_parsed::<u64>("max.rows")?;
109        self.anchor = None;
110        Ok(())
111    }
112
113    // Cast lints: see build_batch.
114    #[allow(
115        clippy::cast_possible_truncation,
116        clippy::cast_sign_loss,
117        clippy::cast_precision_loss
118    )]
119    async fn poll_batch(
120        &mut self,
121        max_records: usize,
122    ) -> Result<Option<SourceBatch>, ConnectorError> {
123        let (anchored_at, anchor_seq) = *self
124            .anchor
125            .get_or_insert_with(|| (Instant::now(), self.next_seq));
126        let mut allowed = anchor_seq.saturating_add(
127            (anchored_at.elapsed().as_secs_f64() * self.rows_per_second as f64) as u64,
128        );
129        if let Some(max) = self.max_rows {
130            allowed = allowed.min(max);
131        }
132        let pending = allowed.saturating_sub(self.next_seq);
133        let n = (pending as usize).min(max_records).min(self.batch_max);
134        if n == 0 {
135            return Ok(None);
136        }
137        let batch = self.build_batch(self.next_seq, n)?;
138        self.next_seq += n as u64;
139        Ok(Some(SourceBatch::new(batch)))
140    }
141
142    fn schema(&self) -> SchemaRef {
143        Arc::clone(&self.schema)
144    }
145
146    fn checkpoint(&self) -> SourceCheckpoint {
147        let mut cp = SourceCheckpoint::new(0);
148        cp.set_offset("seq", self.next_seq.to_string());
149        cp
150    }
151
152    async fn restore(&mut self, checkpoint: &SourceCheckpoint) -> Result<(), ConnectorError> {
153        if let Some(seq) = checkpoint.get_offset("seq") {
154            self.next_seq = seq.parse().map_err(|e| {
155                ConnectorError::ConfigurationError(format!("bad generator offset '{seq}': {e}"))
156            })?;
157        }
158        // Re-anchor so the rate limit resumes from here rather than
159        // bursting to "catch up" with pre-restart wall-clock time.
160        self.anchor = None;
161        Ok(())
162    }
163
164    async fn close(&mut self) -> Result<(), ConnectorError> {
165        Ok(())
166    }
167
168    /// Output is a pure function of the offset — replay is exact.
169    fn supports_replay(&self) -> bool {
170        true
171    }
172}
173
174/// Registers the generator source so
175/// `CREATE SOURCE ... WITH (connector = 'generator')` resolves.
176pub fn register_generator_source(registry: &ConnectorRegistry) {
177    let info = ConnectorInfo {
178        name: "generator".to_string(),
179        display_name: "Synthetic Data Generator".to_string(),
180        version: env!("CARGO_PKG_VERSION").to_string(),
181        is_source: true,
182        is_sink: false,
183        config_keys: vec![
184            ConfigKeySpec::optional("rows.per.second", "Emission rate", "1000"),
185            ConfigKeySpec::optional("batch.max.size", "Max rows per batch", "1024"),
186            ConfigKeySpec::optional(
187                "max.rows",
188                "Stop after this many rows (unbounded if unset)",
189                "",
190            ),
191        ],
192    };
193    registry.register_source(
194        "generator",
195        info,
196        Arc::new(|_registry: Option<&prometheus::Registry>| Box::new(GeneratorSource::default())),
197    );
198}
199
#[cfg(test)]
mod tests {
    use super::*;
    use std::time::Duration;

    /// Polls until the generator yields a non-empty batch, failing after
    /// roughly one second.
    ///
    /// Rows are never dropped. An anchoring poll can itself emit rows if
    /// time passes inside the call (1 µs is a whole row at 1M rows/s), so
    /// discarding its result would silently advance the source and make
    /// offset-based comparisons flaky.
    async fn poll_until_rows(src: &mut GeneratorSource, max_records: usize) -> SourceBatch {
        for _ in 0..1000 {
            if let Some(batch) = src.poll_batch(max_records).await.unwrap() {
                return batch;
            }
            // The rate limiter reads `std::time::Instant`, so real
            // wall-clock time has to pass; a blocking sleep is fine here.
            std::thread::sleep(Duration::from_millis(1));
        }
        panic!("generator produced no rows within ~1s");
    }

    #[tokio::test]
    async fn deterministic_and_replayable_across_restore() {
        let mut config = ConnectorConfig::new("generator");
        config.set("rows.per.second", "1000000"); // effectively unthrottled
        let mut a = GeneratorSource::default();
        a.open(&config).await.unwrap();
        // Anchor `a`, then give it a large token budget (~10k rows). Rows
        // from the anchoring poll are irrelevant: the checkpoint below is
        // taken after them.
        let _ = a.poll_batch(8).await.unwrap();
        std::thread::sleep(Duration::from_millis(10));
        let first = a.poll_batch(8).await.unwrap().expect("rows");
        assert_eq!(first.num_rows(), 8);

        // Restore a fresh instance from `a`'s checkpoint. `b`'s first poll
        // both anchors its rate limit and may emit rows, so keep whatever
        // it yields. Then ask `a` (which has ample budget) for exactly that
        // many rows from the same offset.
        let cp = a.checkpoint();
        let mut b = GeneratorSource::default();
        b.open(&config).await.unwrap();
        b.restore(&cp).await.unwrap();
        let from_b = poll_until_rows(&mut b, 4).await;
        let from_a = a
            .poll_batch(from_b.num_rows())
            .await
            .unwrap()
            .expect("rows");
        assert_eq!(from_a.num_rows(), from_b.num_rows());
        assert_eq!(
            from_a.records, from_b.records,
            "replay from offset must be byte-identical"
        );
    }

    #[tokio::test]
    async fn rate_limit_and_max_rows_bound_emission() {
        let mut config = ConnectorConfig::new("generator");
        config.set("rows.per.second", "1000");
        config.set("max.rows", "3");
        let mut g = GeneratorSource::default();
        g.open(&config).await.unwrap();
        // The anchoring poll may emit a row if >= 1 ms passes inside it;
        // count it instead of discarding it.
        let mut total = g.poll_batch(100).await.unwrap().map_or(0, |b| b.num_rows());
        std::thread::sleep(Duration::from_millis(50)); // ~50 tokens, well above max.rows
        total += g.poll_batch(100).await.unwrap().map_or(0, |b| b.num_rows());
        assert_eq!(total, 3, "max.rows caps emission");
        assert!(g.poll_batch(100).await.unwrap().is_none(), "exhausted");
    }

    #[tokio::test]
    async fn first_poll_emits_nothing_before_tokens_accrue() {
        let mut config = ConnectorConfig::new("generator");
        config.set("rows.per.second", "1");
        let mut g = GeneratorSource::default();
        g.open(&config).await.unwrap();
        // At 1 row/s a row only becomes due a full second after anchoring.
        assert!(g.poll_batch(100).await.unwrap().is_none());
    }

    #[tokio::test]
    async fn zero_rate_is_rejected() {
        let mut config = ConnectorConfig::new("generator");
        config.set("rows.per.second", "0");
        let mut g = GeneratorSource::default();
        assert!(g.open(&config).await.is_err());
    }

    #[tokio::test]
    async fn restore_rejects_malformed_offset() {
        let mut cp = SourceCheckpoint::new(0);
        cp.set_offset("seq", "not-a-number".to_string());
        let mut g = GeneratorSource::default();
        assert!(g.restore(&cp).await.is_err());
    }

    #[test]
    fn rows_follow_the_documented_formula() {
        let mut g = GeneratorSource::default();
        g.rows_per_second = 3;
        let batch = g.build_batch(5, 2).unwrap();
        let expected = RecordBatch::try_new(
            GeneratorSource::generator_schema(),
            vec![
                Arc::new(Int64Array::from(vec![5_i64, 6])),
                // ts_ms = seq * 1000 / rows_per_second (integer division).
                Arc::new(Int64Array::from(vec![1666_i64, 2000])),
                Arc::new(StringArray::from(vec!["v5", "v6"])),
            ],
        )
        .unwrap();
        assert_eq!(batch, expected);
    }
}