laminar_connectors/
generator.rs1use std::sync::Arc;
12use std::time::Instant;
13
14use arrow_array::{Int64Array, RecordBatch, StringArray};
15use arrow_schema::{DataType, Field, Schema, SchemaRef};
16use async_trait::async_trait;
17
18use crate::checkpoint::SourceCheckpoint;
19use crate::config::{ConfigKeySpec, ConnectorConfig, ConnectorInfo};
20use crate::connector::{SourceBatch, SourceConnector};
21use crate::error::ConnectorError;
22use crate::registry::ConnectorRegistry;
23
24pub struct GeneratorSource {
26 schema: SchemaRef,
27 rows_per_second: u64,
28 batch_max: usize,
29 max_rows: Option<u64>,
30 next_seq: u64,
33 anchor: Option<(Instant, u64)>,
37}
38
39impl GeneratorSource {
40 fn generator_schema() -> SchemaRef {
41 Arc::new(Schema::new(vec![
42 Field::new("seq", DataType::Int64, false),
43 Field::new("ts_ms", DataType::Int64, false),
44 Field::new("value", DataType::Utf8, false),
45 ]))
46 }
47
48 #[allow(
53 clippy::cast_possible_wrap,
54 clippy::cast_sign_loss,
55 clippy::cast_possible_truncation,
56 clippy::cast_precision_loss
57 )]
58 fn build_batch(&self, start: u64, n: usize) -> Result<RecordBatch, ConnectorError> {
59 let seqs: Vec<i64> = (0..n as u64).map(|i| (start + i) as i64).collect();
60 let ts: Vec<i64> = seqs
61 .iter()
62 .map(|&s| {
63 (s as u64)
64 .saturating_mul(1000)
65 .wrapping_div(self.rows_per_second) as i64
66 })
67 .collect();
68 let values: Vec<String> = seqs.iter().map(|s| format!("v{s}")).collect();
69 RecordBatch::try_new(
70 Arc::clone(&self.schema),
71 vec![
72 Arc::new(Int64Array::from(seqs)),
73 Arc::new(Int64Array::from(ts)),
74 Arc::new(StringArray::from(values)),
75 ],
76 )
77 .map_err(|e| ConnectorError::ReadError(e.to_string()))
78 }
79}
80
81impl Default for GeneratorSource {
82 fn default() -> Self {
83 Self {
84 schema: Self::generator_schema(),
85 rows_per_second: 1000,
86 batch_max: 1024,
87 max_rows: None,
88 next_seq: 0,
89 anchor: None,
90 }
91 }
92}
93
94#[async_trait]
95impl SourceConnector for GeneratorSource {
96 async fn open(&mut self, config: &ConnectorConfig) -> Result<(), ConnectorError> {
97 if let Some(rps) = config.get_parsed::<u64>("rows.per.second")? {
98 if rps == 0 {
99 return Err(ConnectorError::ConfigurationError(
100 "rows.per.second must be > 0".into(),
101 ));
102 }
103 self.rows_per_second = rps;
104 }
105 if let Some(n) = config.get_parsed::<usize>("batch.max.size")? {
106 self.batch_max = n.max(1);
107 }
108 self.max_rows = config.get_parsed::<u64>("max.rows")?;
109 self.anchor = None;
110 Ok(())
111 }
112
113 #[allow(
115 clippy::cast_possible_truncation,
116 clippy::cast_sign_loss,
117 clippy::cast_precision_loss
118 )]
119 async fn poll_batch(
120 &mut self,
121 max_records: usize,
122 ) -> Result<Option<SourceBatch>, ConnectorError> {
123 let (anchored_at, anchor_seq) = *self
124 .anchor
125 .get_or_insert_with(|| (Instant::now(), self.next_seq));
126 let mut allowed = anchor_seq.saturating_add(
127 (anchored_at.elapsed().as_secs_f64() * self.rows_per_second as f64) as u64,
128 );
129 if let Some(max) = self.max_rows {
130 allowed = allowed.min(max);
131 }
132 let pending = allowed.saturating_sub(self.next_seq);
133 let n = (pending as usize).min(max_records).min(self.batch_max);
134 if n == 0 {
135 return Ok(None);
136 }
137 let batch = self.build_batch(self.next_seq, n)?;
138 self.next_seq += n as u64;
139 Ok(Some(SourceBatch::new(batch)))
140 }
141
142 fn schema(&self) -> SchemaRef {
143 Arc::clone(&self.schema)
144 }
145
146 fn checkpoint(&self) -> SourceCheckpoint {
147 let mut cp = SourceCheckpoint::new(0);
148 cp.set_offset("seq", self.next_seq.to_string());
149 cp
150 }
151
152 async fn restore(&mut self, checkpoint: &SourceCheckpoint) -> Result<(), ConnectorError> {
153 if let Some(seq) = checkpoint.get_offset("seq") {
154 self.next_seq = seq.parse().map_err(|e| {
155 ConnectorError::ConfigurationError(format!("bad generator offset '{seq}': {e}"))
156 })?;
157 }
158 self.anchor = None;
161 Ok(())
162 }
163
164 async fn close(&mut self) -> Result<(), ConnectorError> {
165 Ok(())
166 }
167
168 fn supports_replay(&self) -> bool {
170 true
171 }
172}
173
174pub fn register_generator_source(registry: &ConnectorRegistry) {
177 let info = ConnectorInfo {
178 name: "generator".to_string(),
179 display_name: "Synthetic Data Generator".to_string(),
180 version: env!("CARGO_PKG_VERSION").to_string(),
181 is_source: true,
182 is_sink: false,
183 config_keys: vec![
184 ConfigKeySpec::optional("rows.per.second", "Emission rate", "1000"),
185 ConfigKeySpec::optional("batch.max.size", "Max rows per batch", "1024"),
186 ConfigKeySpec::optional(
187 "max.rows",
188 "Stop after this many rows (unbounded if unset)",
189 "",
190 ),
191 ],
192 };
193 registry.register_source(
194 "generator",
195 info,
196 Arc::new(|_registry: Option<&prometheus::Registry>| Box::new(GeneratorSource::default())),
197 );
198}
199
#[cfg(test)]
mod tests {
    use super::*;
    use arrow_array::Array;

    // Polling with `max_records = 0` installs the pacing anchor (that happens
    // before the row budget is checked) but can never emit rows. Priming with
    // it keeps these tests deterministic. A priming poll with a non-zero
    // budget could emit a few rows on a slow or preempted runner, which would
    // shift the sequence numbers the assertions rely on.

    #[tokio::test]
    async fn deterministic_and_replayable_across_restore() {
        let mut config = ConnectorConfig::new("generator");
        config.set("rows.per.second", "1000000");

        let mut a = GeneratorSource::default();
        a.open(&config).await.unwrap();
        assert!(a.poll_batch(0).await.unwrap().is_none(), "priming poll emits nothing");
        std::thread::sleep(std::time::Duration::from_millis(10));
        let first = a.poll_batch(8).await.unwrap().expect("rows");
        assert_eq!(first.num_rows(), 8);

        let cp = a.checkpoint();
        let mut b = GeneratorSource::default();
        b.open(&config).await.unwrap();
        b.restore(&cp).await.unwrap();
        assert!(b.poll_batch(0).await.unwrap().is_none(), "priming poll emits nothing");
        std::thread::sleep(std::time::Duration::from_millis(10));

        let from_a = a.poll_batch(4).await.unwrap().expect("rows").records;
        let from_b = b.poll_batch(4).await.unwrap().expect("rows").records;
        assert_eq!(from_a, from_b, "replay from offset must be byte-identical");
    }

    #[tokio::test]
    async fn rate_limit_and_max_rows_bound_emission() {
        let mut config = ConnectorConfig::new("generator");
        config.set("rows.per.second", "1000");
        config.set("max.rows", "3");

        let mut g = GeneratorSource::default();
        g.open(&config).await.unwrap();
        assert!(g.poll_batch(0).await.unwrap().is_none(), "priming poll emits nothing");
        std::thread::sleep(std::time::Duration::from_millis(50));

        let batch = g.poll_batch(100).await.unwrap().expect("rows");
        assert_eq!(batch.num_rows(), 3, "max.rows caps emission");
        assert!(g.poll_batch(100).await.unwrap().is_none(), "exhausted");
    }

    #[test]
    fn batch_columns_are_pure_functions_of_seq() {
        let mut g = GeneratorSource::default();
        g.rows_per_second = 3;
        let batch = g.build_batch(5, 3).unwrap();
        assert_eq!(batch.num_rows(), 3);

        let seq = batch
            .column(0)
            .as_any()
            .downcast_ref::<Int64Array>()
            .expect("seq is Int64");
        let ts = batch
            .column(1)
            .as_any()
            .downcast_ref::<Int64Array>()
            .expect("ts_ms is Int64");
        let value = batch
            .column(2)
            .as_any()
            .downcast_ref::<StringArray>()
            .expect("value is Utf8");

        let seqs: Vec<i64> = (0..3).map(|i| seq.value(i)).collect();
        let ts_ms: Vec<i64> = (0..3).map(|i| ts.value(i)).collect();
        let values: Vec<&str> = (0..3).map(|i| value.value(i)).collect();

        assert_eq!(seqs, vec![5, 6, 7]);
        // ts_ms = seq * 1000 / rows_per_second, using integer division.
        assert_eq!(ts_ms, vec![1666, 2000, 2333]);
        assert_eq!(values, vec!["v5", "v6", "v7"]);
    }
}