use std::sync::Arc;
use std::time::{SystemTime, UNIX_EPOCH};

use arrow_array::RecordBatch;
use arrow_schema::{DataType, Field, Schema, SchemaRef};
use async_trait::async_trait;
use tracing::{debug, info, warn};

use crate::checkpoint::SourceCheckpoint;
use crate::config::ConnectorConfig;
use crate::connector::{SourceBatch, SourceConnector};
use crate::error::ConnectorError;
use crate::health::HealthStatus;
use crate::schema::traits::FormatDecoder;
use crate::schema::types::RawRecord;

use super::config::{FileFormat, FileSourceConfig};
use super::discovery::{DiscoveryConfig, FileDiscoveryEngine};
use super::manifest::{FileEntry, FileIngestionManifest};
use super::text_decoder::TextLineDecoder;

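/// Source connector that ingests files from a configured path: newly
/// discovered files are decoded into Arrow record batches and recorded in a
/// manifest so they are not re-read across polls or restarts.
///
/// A minimal usage sketch (error handling elided; the `path` and `format`
/// values are illustrative, and this loop stops at the first empty poll):
///
/// ```ignore
/// let mut source = FileSource::new();
/// let mut config = ConnectorConfig::new("files");
/// config.set("path", "/data/incoming");
/// config.set("format", "csv");
/// source.open(&config).await?;
/// while let Some(batch) = source.poll_batch(1024).await? {
///     // hand the decoded SourceBatch to the downstream pipeline
/// }
/// source.close().await?;
/// ```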
pub struct FileSource {
    /// Parsed source configuration, populated by `open`.
    config: Option<FileSourceConfig>,
    /// Output schema, including the optional `_metadata` column.
    schema: SchemaRef,
    /// Format decoder for the configured file format.
    decoder: Option<Box<dyn FormatDecoder>>,
    /// Discovery engine that finds new files under the configured path.
    discovery: Option<FileDiscoveryEngine>,
    /// Manifest of files that have already been ingested.
    manifest: FileIngestionManifest,
    /// Whether `open` has been called and the source is ready to poll.
    is_open: bool,
}

impl FileSource {
    /// Creates an unconfigured file source; call `open` before polling.
    #[must_use]
    pub fn new() -> Self {
        Self::with_registry(None)
    }

    /// Creates a file source with an optional Prometheus registry for
    /// metrics (currently unused).
    #[must_use]
    pub fn with_registry(_registry: Option<&prometheus::Registry>) -> Self {
        let empty_schema = Arc::new(Schema::empty());
        Self {
            config: None,
            schema: empty_schema,
            decoder: None,
            discovery: None,
            manifest: FileIngestionManifest::new(),
            is_open: false,
        }
    }
}

impl Default for FileSource {
    fn default() -> Self {
        Self::new()
    }
}

impl std::fmt::Debug for FileSource {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("FileSource")
            .field("is_open", &self.is_open)
            .field("schema_fields", &self.schema.fields().len())
            .field("manifest_count", &self.manifest.active_count())
            .finish()
    }
}

#[async_trait]
impl SourceConnector for FileSource {
    async fn open(&mut self, config: &ConnectorConfig) -> Result<(), ConnectorError> {
        let src_config = FileSourceConfig::from_connector_config(config)?;

        // Resolve the file format, falling back to the path extension.
        let format = match src_config.format {
            Some(f) => f,
            None => FileFormat::from_extension(&src_config.path).ok_or_else(|| {
                ConnectorError::ConfigurationError(
                    "cannot detect format from path; specify 'format' explicitly".into(),
                )
            })?,
        };

        let (decoder, schema) = build_decoder_and_schema(format, &src_config, config)?;

        // Optionally append a `_metadata` struct column describing the source file.
        let final_schema = if src_config.include_metadata {
            let mut fields: Vec<Field> =
                schema.fields().iter().map(|f| f.as_ref().clone()).collect();
            fields.push(Field::new(
                "_metadata",
                DataType::Struct(
                    vec![
                        Field::new("file_path", DataType::Utf8, false),
                        Field::new("file_name", DataType::Utf8, false),
                        Field::new("file_size", DataType::UInt64, false),
                        Field::new(
                            "file_modification_time",
                            DataType::Timestamp(arrow_schema::TimeUnit::Millisecond, None),
                            true,
                        ),
                    ]
                    .into(),
                ),
                false,
            ));
            Arc::new(Schema::new(fields))
        } else {
            schema
        };

        // Start file discovery, seeded with already-ingested files for dedup.
        let discovery_config = DiscoveryConfig {
            path: src_config.path.clone(),
            poll_interval: src_config.poll_interval,
            stabilisation_delay: src_config.stabilisation_delay,
            glob_pattern: src_config.glob_pattern.clone(),
        };
        let known = Arc::new(self.manifest.snapshot_for_dedup());
        let discovery = FileDiscoveryEngine::start(discovery_config, known);

        self.config = Some(src_config);
        self.schema = final_schema;
        self.decoder = Some(decoder);
        self.discovery = Some(discovery);
        self.is_open = true;

        info!(
            "file source opened: format={format:?}, schema_fields={}",
            self.schema.fields().len()
        );
        Ok(())
    }

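    // Each poll drains newly discovered files, skips files already in the
    // manifest (or over the size limit), decodes the rest, and returns the
    // decoded rows as a single combined batch.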
    async fn poll_batch(
        &mut self,
        _max_records: usize,
    ) -> Result<Option<SourceBatch>, ConnectorError> {
        let config = self
            .config
            .as_ref()
            .ok_or_else(|| ConnectorError::InvalidState {
                expected: "open".into(),
                actual: "closed".into(),
            })?;
        let discovery = self
            .discovery
            .as_mut()
            .ok_or_else(|| ConnectorError::InvalidState {
                expected: "discovery running".into(),
                actual: "no discovery".into(),
            })?;
        let decoder = self
            .decoder
            .as_ref()
            .ok_or_else(|| ConnectorError::InvalidState {
                expected: "decoder ready".into(),
                actual: "no decoder".into(),
            })?;

        let files = discovery.drain(config.max_files_per_poll);
        if files.is_empty() {
            return Ok(None);
        }

        let mut all_batches: Vec<RecordBatch> = Vec::new();

        for file in &files {
            // Skip files whose size changed since they were first recorded,
            // unless overwrites are allowed.
            if !config.allow_overwrites && self.manifest.size_changed(&file.path, file.size) {
                warn!(
                    "file source: skipping '{}' — size changed (was {}, now {})",
                    file.path,
                    self.manifest
                        .active_entries()
                        .find(|(p, _)| *p == file.path)
                        .map(|(_, e)| e.size)
                        .unwrap_or(0),
                    file.size
                );
                continue;
            }

            // Already ingested; nothing to do.
            if self.manifest.contains(&file.path) {
                continue;
            }

            // Enforce the per-file size limit.
            if file.size > config.max_file_bytes as u64 {
                warn!(
                    "file source: skipping '{}' — size {} exceeds max_file_bytes {}",
                    file.path, file.size, config.max_file_bytes
                );
                continue;
            }

            let bytes = match read_file_bytes(&file.path).await {
                Ok(b) => b,
                Err(e) => {
                    warn!("file source: cannot read '{}': {e}", file.path);
                    continue;
                }
            };

            // Decode the whole file as a single raw record.
            let record = RawRecord::new(bytes);
            match decoder.decode_batch(&[record]) {
                Ok(batch) if batch.num_rows() > 0 => {
                    let batch = if config.include_metadata {
                        append_metadata_column(&batch, &file.path, file.size, file.modified_ms)?
                    } else {
                        batch
                    };
                    all_batches.push(batch);
                }
                Ok(_) => {
                    debug!("file source: empty batch from '{}'", file.path);
                }
                Err(e) => {
                    warn!("file source: decode error for '{}': {e}", file.path);
                    continue;
                }
            }

            // Record the file in the manifest so it is not ingested again.
            self.manifest.insert(
                file.path.clone(),
                FileEntry {
                    size: file.size,
                    discovered_at: file.modified_ms,
                    ingested_at: now_millis(),
                },
            );
        }

        // Bound manifest growth by entry count and age.
        if let Some(cfg) = &self.config {
            let max_age_ms = cfg.manifest_retention_age_days * 24 * 60 * 60 * 1000;
            self.manifest
                .maybe_evict(cfg.manifest_retention_count, max_age_ms);
        }

        if all_batches.is_empty() {
            return Ok(None);
        }

        // Combine per-file batches into one batch for this poll.
        let combined = if all_batches.len() == 1 {
            all_batches.into_iter().next().unwrap()
        } else {
            arrow_select::concat::concat_batches(&self.schema, &all_batches)
                .map_err(|e| ConnectorError::ReadError(format!("batch concat error: {e}")))?
        };

        Ok(Some(SourceBatch::new(combined)))
    }

    fn schema(&self) -> SchemaRef {
        self.schema.clone()
    }

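    // Checkpoints carry the serialised ingestion manifest, so a restored
    // source resumes without re-reading files it has already ingested.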
    fn checkpoint(&self) -> SourceCheckpoint {
        let mut cp = SourceCheckpoint::new(0);
        self.manifest.to_checkpoint(&mut cp);
        cp
    }

    async fn restore(&mut self, checkpoint: &SourceCheckpoint) -> Result<(), ConnectorError> {
        match FileIngestionManifest::from_checkpoint(checkpoint) {
            Ok(manifest) => {
                info!(
                    "file source: restored manifest with {} active entries",
                    manifest.active_count()
                );
                self.manifest = manifest;
            }
            Err(e) => {
                warn!("file source: manifest restore failed: {e} — starting fresh");
                self.manifest = FileIngestionManifest::new();
            }
        }
        Ok(())
    }

    fn health_check(&self) -> HealthStatus {
        if self.is_open {
            HealthStatus::Healthy
        } else {
            HealthStatus::Unknown
        }
    }

    async fn close(&mut self) -> Result<(), ConnectorError> {
        self.discovery = None;
        self.decoder = None;
        self.is_open = false;
        info!("file source closed");
        Ok(())
    }

    fn supports_replay(&self) -> bool {
        true
    }
}

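/// Builds the format decoder and its output schema for the configured file
/// format, defaulting to a single nullable `value: Utf8` column when the
/// connector config does not supply a schema.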
fn build_decoder_and_schema(
    format: FileFormat,
    src_config: &FileSourceConfig,
    connector_config: &ConnectorConfig,
) -> Result<(Box<dyn FormatDecoder>, SchemaRef), ConnectorError> {
    match format {
        FileFormat::Csv => {
            let schema = connector_config.arrow_schema().unwrap_or_else(|| {
                Arc::new(Schema::new(vec![Field::new("value", DataType::Utf8, true)]))
            });
            let csv_config = crate::schema::CsvDecoderConfig {
                delimiter: src_config.csv_delimiter,
                has_header: src_config.csv_has_header,
                ..crate::schema::CsvDecoderConfig::default()
            };
            let decoder = crate::schema::CsvDecoder::with_config(schema.clone(), csv_config);
            Ok((Box::new(decoder), schema))
        }
        FileFormat::Json => {
            let schema = connector_config.arrow_schema().unwrap_or_else(|| {
                Arc::new(Schema::new(vec![Field::new("value", DataType::Utf8, true)]))
            });
            let json_config = crate::schema::JsonDecoderConfig::default();
            let decoder = crate::schema::JsonDecoder::with_config(schema.clone(), json_config);
            Ok((Box::new(decoder), schema))
        }
        FileFormat::Text => {
            let decoder = TextLineDecoder::new();
            let schema = decoder.output_schema();
            Ok((Box::new(decoder), schema))
        }
        FileFormat::Parquet => {
            let schema = connector_config.arrow_schema().unwrap_or_else(|| {
                Arc::new(Schema::new(vec![Field::new("value", DataType::Utf8, true)]))
            });
            let decoder = crate::schema::parquet::ParquetDecoder::new(schema.clone());
            Ok((Box::new(decoder), schema))
        }
        FileFormat::ArrowIpc => {
            let schema = connector_config.arrow_schema().unwrap_or_else(|| {
                Arc::new(Schema::new(vec![Field::new("value", DataType::Utf8, true)]))
            });
            let decoder = super::arrow_ipc_codec::ArrowIpcDecoder::new(schema.clone());
            Ok((Box::new(decoder), schema))
        }
    }
}

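/// Reads a file fully into memory. Cloud object-store URLs are rejected for
/// now; only local filesystem paths are supported by `poll_batch`.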
async fn read_file_bytes(path: &str) -> Result<Vec<u8>, ConnectorError> {
    if path.starts_with("s3://")
        || path.starts_with("gs://")
        || path.starts_with("az://")
        || path.starts_with("abfs://")
        || path.starts_with("abfss://")
    {
        Err(ConnectorError::ConfigurationError(
            "cloud file reading not yet implemented in poll_batch; use local paths".into(),
        ))
    } else {
        tokio::fs::read(path)
            .await
            .map_err(|e| ConnectorError::ReadError(format!("cannot read file '{path}': {e}")))
    }
}

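/// Appends a `_metadata` struct column (file path, name, size, and
/// modification time) to a decoded batch, repeating the same values on every
/// row of that file's batch.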
fn append_metadata_column(
    batch: &RecordBatch,
    file_path: &str,
    file_size: u64,
    modified_ms: u64,
) -> Result<RecordBatch, ConnectorError> {
    use arrow_array::{ArrayRef, StringArray, StructArray, UInt64Array};

    let n = batch.num_rows();
    let file_name = std::path::Path::new(file_path)
        .file_name()
        .and_then(|n| n.to_str())
        .unwrap_or(file_path);

    // Build constant-valued arrays, one entry per row in the batch.
    let path_array: ArrayRef = Arc::new(StringArray::from(vec![file_path; n]));
    let name_array: ArrayRef = Arc::new(StringArray::from(vec![file_name; n]));
    let size_array: ArrayRef = Arc::new(UInt64Array::from(vec![file_size; n]));
    #[allow(clippy::cast_possible_wrap)]
    let mod_array: ArrayRef = Arc::new(arrow_array::TimestampMillisecondArray::from(vec![
        modified_ms as i64;
        n
    ]));

    let fields = vec![
        Field::new("file_path", DataType::Utf8, false),
        Field::new("file_name", DataType::Utf8, false),
        Field::new("file_size", DataType::UInt64, false),
        Field::new(
            "file_modification_time",
            DataType::Timestamp(arrow_schema::TimeUnit::Millisecond, None),
            true,
        ),
    ];
    let struct_array = StructArray::try_new(
        fields.into(),
        vec![path_array, name_array, size_array, mod_array],
        None,
    )
    .map_err(|e| ConnectorError::ReadError(format!("metadata struct error: {e}")))?;

    // Extend the batch's columns and schema with the `_metadata` struct.
    let mut columns: Vec<ArrayRef> = batch.columns().to_vec();
    columns.push(Arc::new(struct_array));

    let mut fields: Vec<Field> = batch
        .schema()
        .fields()
        .iter()
        .map(|f| f.as_ref().clone())
        .collect();
    fields.push(Field::new(
        "_metadata",
        DataType::Struct(
            vec![
                Field::new("file_path", DataType::Utf8, false),
                Field::new("file_name", DataType::Utf8, false),
                Field::new("file_size", DataType::UInt64, false),
                Field::new(
                    "file_modification_time",
                    DataType::Timestamp(arrow_schema::TimeUnit::Millisecond, None),
                    true,
                ),
            ]
            .into(),
        ),
        false,
    ));

    RecordBatch::try_new(Arc::new(Schema::new(fields)), columns)
        .map_err(|e| ConnectorError::ReadError(format!("metadata append error: {e}")))
}

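/// Current wall-clock time as milliseconds since the Unix epoch.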
#[allow(clippy::cast_possible_truncation)]
fn now_millis() -> u64 {
    SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .unwrap_or_default()
        .as_millis() as u64
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_file_source_default() {
        let source = FileSource::new();
        assert!(!source.is_open);
        assert_eq!(source.manifest.active_count(), 0);
    }

    #[tokio::test]
    async fn test_open_missing_path() {
        let mut source = FileSource::new();
        let config = ConnectorConfig::new("files");
        let result = source.open(&config).await;
        assert!(result.is_err());
    }

    #[tokio::test]
    async fn test_open_with_text_format() {
        let mut source = FileSource::new();
        let mut config = ConnectorConfig::new("files");
        config.set("path", "/tmp");
        config.set("format", "text");
        let result = source.open(&config).await;
        assert!(result.is_ok());
        assert!(source.is_open);
        assert_eq!(source.schema().field(0).name(), "line");
        source.close().await.unwrap();
    }

    #[tokio::test]
    async fn test_poll_batch_when_not_open() {
        let mut source = FileSource::new();
        let result = source.poll_batch(100).await;
        assert!(result.is_err());
    }

    #[test]
    fn test_checkpoint_roundtrip() {
        let mut source = FileSource::new();
        source.manifest.insert(
            "test.csv".into(),
            FileEntry {
                size: 100,
                discovered_at: 1000,
                ingested_at: 2000,
            },
        );
        let cp = source.checkpoint();
        assert!(cp.get_offset("manifest").is_some());
    }

    #[tokio::test]
    async fn test_restore_from_checkpoint() {
        let mut source = FileSource::new();

        let mut cp = SourceCheckpoint::new(1);
        cp.set_offset(
            "manifest",
            r#"{"a.csv":{"size":100,"discovered_at":900,"ingested_at":1000}}"#,
        );

        source.restore(&cp).await.unwrap();
        assert_eq!(source.manifest.active_count(), 1);
        assert!(source.manifest.contains("a.csv"));
    }

    #[test]
    fn test_health_check() {
        let source = FileSource::new();
        assert!(matches!(source.health_check(), HealthStatus::Unknown));
    }

    #[test]
    fn test_supports_replay() {
        let source = FileSource::new();
        assert!(source.supports_replay());
    }
}