1use std::sync::Arc;
7use std::time::Instant;
8
9use arrow_array::RecordBatch;
10use laminar_connectors::checkpoint::SourceCheckpoint;
11use laminar_core::checkpoint::CheckpointBarrier;
12use laminar_core::operator::Output;
13use laminar_core::tpc::TaggedOutput;
14use laminar_core::tpc::TpcConfig;
15use rustc_hash::{FxHashMap, FxHashSet};
16
17use super::callback::{PipelineCallback, SourceRegistration};
18use super::config::PipelineConfig;
19use super::tpc_runtime::TpcRuntime;
20use crate::error::DbError;
21
/// Tracks barrier-alignment state for one in-flight checkpoint.
///
/// A checkpoint is only taken once a barrier with the same `checkpoint_id`
/// has been observed from every source (`sources_aligned` vs
/// `sources_total`); until then per-source checkpoint snapshots are
/// buffered in `source_checkpoints`.
struct PendingBarrier {
    // ID of the checkpoint currently being aligned.
    checkpoint_id: u64,
    // Number of sources that must emit a barrier before alignment completes.
    sources_total: usize,
    // Indices of sources whose barrier has already arrived this round.
    sources_aligned: FxHashSet<usize>,
    // Per-source checkpoint snapshots captured as each barrier arrives.
    source_checkpoints: FxHashMap<String, SourceCheckpoint>,
    // When this alignment round started; used to detect alignment timeouts.
    started_at: Instant,
    // Whether a barrier alignment round is currently in progress.
    active: bool,
}
32
33impl PendingBarrier {
34 fn new() -> Self {
35 Self {
36 checkpoint_id: 0,
37 sources_total: 0,
38 sources_aligned: FxHashSet::default(),
39 source_checkpoints: FxHashMap::default(),
40 started_at: Instant::now(),
41 active: false,
42 }
43 }
44
45 fn reset(&mut self, checkpoint_id: u64, sources_total: usize) {
46 self.checkpoint_id = checkpoint_id;
47 self.sources_total = sources_total;
48 self.sources_aligned.clear();
49 self.source_checkpoints.clear();
50 self.started_at = Instant::now();
51 self.active = true;
52 }
53}
54
/// How long the run loop sleeps when neither shutdown nor new data is
/// signalled, so housekeeping (checkpoint injection, table polling, barrier
/// timeout checks) still runs while all sources are idle.
const IDLE_FALLBACK_TIMEOUT: std::time::Duration = std::time::Duration::from_millis(100);
58
/// Drives a thread-per-core pipeline: drains source outputs, routes events
/// and checkpoint barriers, runs SQL cycles via a [`PipelineCallback`], and
/// injects periodic or source-requested checkpoints.
pub struct TpcPipelineCoordinator {
    config: PipelineConfig,
    runtime: TpcRuntime,
    // Signalled externally to stop the run loop.
    shutdown: Arc<tokio::sync::Notify>,
    // Set by source threads when new output is available; cleared each cycle.
    has_new_data: Arc<std::sync::atomic::AtomicBool>,
    // Wakes the run loop when `has_new_data` is set.
    data_notify: Arc<tokio::sync::Notify>,
    // Source names indexed by source_idx, cached so the hot loop avoids
    // borrowing the runtime.
    source_name_cache: Vec<String>,
    // Reusable buffer for outputs drained from the runtime each cycle.
    drain_buffer: Vec<TaggedOutput>,
    // Reusable per-cycle map of source name -> filtered record batches.
    source_batches_buf: FxHashMap<String, Vec<RecordBatch>>,
    // Reusable per-cycle buffer of (source_idx, barrier) pairs.
    barriers_buf: Vec<(usize, CheckpointBarrier)>,
    // Alignment state for the checkpoint currently in flight, if any.
    pending_barrier: PendingBarrier,
    // Running count of events that arrived past the watermark.
    late_events: u64,
    // Monotonically increasing ID for coordinator-initiated checkpoints.
    next_checkpoint_id: u64,
    // When the last checkpoint was started; gates interval-based checkpoints.
    last_checkpoint: Instant,
    // Consecutive SQL-cycle failures; the pipeline aborts at a threshold.
    consecutive_sql_errors: u32,
    // Per-source "please checkpoint now" flags (only sources exposing one).
    checkpoint_request_flags: Vec<Arc<std::sync::atomic::AtomicBool>>,
}
93
94impl TpcPipelineCoordinator {
95 pub fn new(
101 sources: Vec<SourceRegistration>,
102 config: PipelineConfig,
103 tpc_config: &TpcConfig,
104 shutdown: Arc<tokio::sync::Notify>,
105 ) -> Result<Self, DbError> {
106 let mut runtime =
107 TpcRuntime::new(tpc_config).map_err(|e| DbError::Config(e.to_string()))?;
108
109 if config.delivery_guarantee
111 == laminar_connectors::connector::DeliveryGuarantee::ExactlyOnce
112 {
113 for src in &sources {
114 if !src.supports_replay {
115 return Err(DbError::Config(format!(
116 "[LDB-5031] exactly-once requires source '{}' to support replay, \
117 but supports_replay=false",
118 src.name
119 )));
120 }
121 }
122 if config.checkpoint_interval.is_none() {
123 return Err(DbError::Config(
124 "[LDB-5032] exactly-once requires checkpointing to be enabled \
125 (checkpoint_interval must be set)"
126 .into(),
127 ));
128 }
129 }
130
131 let mut checkpoint_request_flags = Vec::new();
133 for src in &sources {
134 if let Some(flag) = src.connector.checkpoint_requested() {
135 checkpoint_request_flags.push(flag);
136 }
137 }
138
139 for (idx, src) in sources.into_iter().enumerate() {
140 runtime
141 .attach_source(
142 idx,
143 src.name,
144 src.connector,
145 src.config,
146 &config,
147 src.restore_checkpoint,
148 )
149 .map_err(|e| DbError::Config(format!("failed to spawn source thread: {e}")))?;
150 }
151
152 let source_name_cache: Vec<String> =
153 runtime.source_names().iter().map(String::clone).collect();
154 let (has_new_data, data_notify) = runtime.output_signal();
155
156 Ok(Self {
157 config,
158 runtime,
159 shutdown,
160 has_new_data,
161 data_notify,
162 source_name_cache,
163 drain_buffer: Vec::with_capacity(4096),
164 source_batches_buf: FxHashMap::default(),
165 barriers_buf: Vec::new(),
166 pending_barrier: PendingBarrier::new(),
167 late_events: 0,
168 next_checkpoint_id: 1,
169 last_checkpoint: Instant::now(),
170 consecutive_sql_errors: 0,
171 checkpoint_request_flags,
172 })
173 }
174
175 #[allow(clippy::too_many_lines)]
180 pub async fn run(mut self, mut callback: Box<dyn PipelineCallback>) {
181 let batch_window = self.config.batch_window;
182 let barrier_timeout = self.config.barrier_alignment_timeout;
183 let startup_time = Instant::now();
184 let mut source_health_checked = false;
185
186 loop {
187 tokio::select! {
191 biased;
192 () = self.shutdown.notified() => break,
193 () = self.data_notify.notified() => {
194 if !batch_window.is_zero() {
199 tokio::time::sleep(batch_window).await;
200 }
201 }
202 () = tokio::time::sleep(IDLE_FALLBACK_TIMEOUT) => {}
205 }
206
207 self.has_new_data
209 .store(false, std::sync::atomic::Ordering::Release);
210
211 self.drain_buffer.clear();
213 self.runtime.poll_all_outputs(&mut self.drain_buffer);
214 if self.drain_buffer.is_empty() {
215 if !source_health_checked
218 && startup_time.elapsed() > std::time::Duration::from_secs(1)
219 {
220 source_health_checked = true;
221 let failed = self.runtime.failed_sources();
222 if !failed.is_empty() {
223 tracing::error!(
224 sources = ?failed,
225 "[LDB-5033] source threads failed to start"
226 );
227 if self.config.delivery_guarantee
228 == laminar_connectors::connector::DeliveryGuarantee::ExactlyOnce
229 {
230 tracing::error!("aborting: exactly-once requires all sources to start");
231 break;
232 }
233 }
234 }
235
236 self.maybe_inject_checkpoint(&mut *callback).await;
237 callback.poll_tables().await;
238 if self.pending_barrier.active
241 && self.pending_barrier.started_at.elapsed() > barrier_timeout
242 {
243 let missing: Vec<&str> = self
244 .source_name_cache
245 .iter()
246 .enumerate()
247 .filter(|(i, _)| !self.pending_barrier.sources_aligned.contains(i))
248 .map(|(_, name)| name.as_str())
249 .collect();
250 tracing::warn!(
251 checkpoint_id = self.pending_barrier.checkpoint_id,
252 aligned = self.pending_barrier.sources_aligned.len(),
253 total = self.pending_barrier.sources_total,
254 missing_sources = ?missing,
255 "Barrier alignment timeout — cancelling checkpoint"
256 );
257 self.pending_barrier.active = false;
258 }
259 continue;
260 }
261
262 self.source_batches_buf.clear();
266 self.barriers_buf.clear();
267
268 let mut cycle_events: u64 = 0;
269 let mut cycle_batches: u64 = 0;
270 let cycle_start = Instant::now();
271
272 for tagged in self.drain_buffer.drain(..) {
273 match tagged.output {
274 Output::Event(event) => {
275 if let Some(name) = self.source_name_cache.get(tagged.source_idx) {
276 callback.extract_watermark(name, &event.data);
277 cycle_events += event.data.num_rows() as u64;
278 if let Some(filtered) = callback.filter_late_rows(name, &event.data) {
279 if let Some(vec) = self.source_batches_buf.get_mut(name.as_str()) {
280 vec.push(filtered);
281 } else {
282 self.source_batches_buf.insert(name.clone(), vec![filtered]);
283 }
284 cycle_batches += 1;
285 }
286 }
287 }
288 Output::Barrier(barrier) => {
289 self.barriers_buf.push((tagged.source_idx, barrier));
290 }
291 Output::Watermark(_ts) => {
292 }
295 Output::CheckpointComplete(data) => {
296 tracing::debug!(
297 checkpoint_id = data.checkpoint_id,
298 operators = data.operator_states.len(),
299 "core checkpoint complete (operator states not yet persisted — \
300 cores currently run no operators)"
301 );
302 }
303 Output::LateEvent(_event) => {
304 self.late_events += 1;
305 tracing::trace!(total_late = self.late_events, "late event past watermark");
306 }
307 Output::SideOutput(_) | Output::Changelog(_) => {
308 tracing::warn!(
309 source_idx = tagged.source_idx,
310 "SideOutput/Changelog leaked past DAG boundary — dropped"
311 );
312 }
313 }
314 }
315
316 let mut barriers = std::mem::take(&mut self.barriers_buf);
319 for (source_idx, barrier) in barriers.drain(..) {
320 self.handle_barrier(source_idx, &barrier, &mut *callback)
321 .await;
322 }
323 self.barriers_buf = barriers;
325
326 if !self.source_batches_buf.is_empty() {
328 let wm = callback.current_watermark();
329 match callback.execute_cycle(&self.source_batches_buf, wm).await {
330 Ok(results) => {
331 self.consecutive_sql_errors = 0;
332 callback.push_to_streams(&results);
333 callback.write_to_sinks(&results).await;
334 }
335 Err(e) => {
336 self.consecutive_sql_errors += 1;
337 tracing::warn!(
338 error = %e,
339 consecutive = self.consecutive_sql_errors,
340 "[LDB-3020] SQL cycle error"
341 );
342 if self.consecutive_sql_errors >= 100 {
343 tracing::error!(
344 "[LDB-3021] {} consecutive SQL errors — shutting down pipeline",
345 self.consecutive_sql_errors
346 );
347 break;
348 }
349 }
350 }
351 #[allow(clippy::cast_possible_truncation)]
352 let elapsed_ns = cycle_start.elapsed().as_nanos() as u64;
354 callback.record_cycle(cycle_events, cycle_batches, elapsed_ns);
355 }
356
357 self.maybe_inject_checkpoint(&mut *callback).await;
359
360 callback.poll_tables().await;
362
363 if self.pending_barrier.active
365 && self.pending_barrier.started_at.elapsed() > barrier_timeout
366 {
367 let missing: Vec<&str> = self
368 .source_name_cache
369 .iter()
370 .enumerate()
371 .filter(|(i, _)| !self.pending_barrier.sources_aligned.contains(i))
372 .map(|(_, name)| name.as_str())
373 .collect();
374 tracing::warn!(
375 checkpoint_id = self.pending_barrier.checkpoint_id,
376 aligned = self.pending_barrier.sources_aligned.len(),
377 total = self.pending_barrier.sources_total,
378 missing_sources = ?missing,
379 "Barrier alignment timeout — cancelling checkpoint"
380 );
381 self.pending_barrier.active = false;
382 }
383 }
384
385 self.drain_buffer.clear();
387 self.runtime.poll_all_outputs(&mut self.drain_buffer);
388
389 let connectors = self.runtime.shutdown();
391 for (_name, mut connector) in connectors {
392 if let Err(e) = connector.close().await {
393 tracing::warn!(error = %e, "Error closing connector on shutdown");
394 }
395 }
396 }
397
398 async fn handle_barrier(
401 &mut self,
402 source_idx: usize,
403 barrier: &CheckpointBarrier,
404 callback: &mut dyn PipelineCallback,
405 ) {
406 if !self.pending_barrier.active {
407 self.pending_barrier
408 .reset(barrier.checkpoint_id, self.runtime.num_sources());
409 }
410
411 if self.pending_barrier.checkpoint_id != barrier.checkpoint_id {
412 tracing::debug!(
413 expected = self.pending_barrier.checkpoint_id,
414 actual = barrier.checkpoint_id,
415 source_idx,
416 "ignoring barrier with mismatched checkpoint ID"
417 );
418 return;
419 }
420
421 self.pending_barrier.sources_aligned.insert(source_idx);
422
423 if let Some(name) = self.source_name_cache.get(source_idx) {
425 let cp = self.runtime.source_checkpoint(source_idx);
426 self.pending_barrier
427 .source_checkpoints
428 .insert(name.clone(), cp);
429 }
430
431 if self.pending_barrier.sources_aligned.len() >= self.pending_barrier.sources_total {
433 let source_checkpoints: FxHashMap<String, SourceCheckpoint> =
435 self.pending_barrier.source_checkpoints.drain().collect();
436 self.pending_barrier.active = false;
437
438 let success = callback.checkpoint_with_barrier(source_checkpoints).await;
439 if !success {
440 if self.config.delivery_guarantee
441 == laminar_connectors::connector::DeliveryGuarantee::ExactlyOnce
442 {
443 tracing::error!("[LDB-6011] barrier checkpoint failed under exactly-once");
444 } else {
445 tracing::warn!("checkpoint with barrier failed");
446 }
447 }
448 }
449 }
450
451 async fn maybe_inject_checkpoint(&mut self, callback: &mut dyn PipelineCallback) {
454 if self.pending_barrier.active {
455 return; }
457
458 let source_requested = self.checkpoint_request_flags.iter().any(|flag| {
460 flag.compare_exchange(
461 true,
462 false,
463 std::sync::atomic::Ordering::AcqRel,
464 std::sync::atomic::Ordering::Relaxed,
465 )
466 .is_ok()
467 });
468
469 let is_exactly_once = self.config.delivery_guarantee
470 == laminar_connectors::connector::DeliveryGuarantee::ExactlyOnce;
471
472 let Some(interval) = self.config.checkpoint_interval else {
473 if source_requested && !is_exactly_once {
474 let offsets = self.runtime.snapshot_all_source_checkpoints();
477 let _ = callback.maybe_checkpoint(true, offsets).await;
478 }
479 return;
480 };
481
482 if !source_requested && self.last_checkpoint.elapsed() < interval {
483 return;
484 }
485
486 self.last_checkpoint = Instant::now();
487 let checkpoint_id = self.next_checkpoint_id;
488 self.next_checkpoint_id += 1;
489
490 for idx in 0..self.runtime.num_sources() {
492 self.runtime
493 .injector(idx)
494 .trigger(checkpoint_id, laminar_core::checkpoint::flags::NONE);
495 }
496
497 if !is_exactly_once {
499 let offsets = self.runtime.snapshot_all_source_checkpoints();
500 let _ = callback.maybe_checkpoint(false, offsets).await;
501 }
502 }
503}
504
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_pending_barrier_creation_and_reset() {
        let mut barrier = PendingBarrier::new();
        assert!(!barrier.active, "fresh tracker must start inactive");

        barrier.reset(1, 3);
        assert!(barrier.active);
        assert_eq!(barrier.checkpoint_id, 1);
        assert_eq!(barrier.sources_total, 3);
        assert!(barrier.sources_aligned.is_empty());

        // A reset while state is populated must clear alignment progress.
        barrier.sources_aligned.insert(0);
        barrier.reset(2, 5);
        assert_eq!(barrier.checkpoint_id, 2);
        assert!(barrier.sources_aligned.is_empty());
    }
}