1use std::sync::Arc;
7use std::time::{SystemTime, UNIX_EPOCH};
8
9use arrow_array::RecordBatch;
10use arrow_schema::{DataType, Field, Schema, SchemaRef};
11use async_trait::async_trait;
12use tracing::{debug, info, warn};
13
14use crate::checkpoint::SourceCheckpoint;
15use crate::config::ConnectorConfig;
16use crate::connector::{SourceBatch, SourceConnector};
17use crate::error::ConnectorError;
18use crate::health::HealthStatus;
19use crate::schema::traits::FormatDecoder;
20use crate::schema::types::RawRecord;
21
22use super::config::{FileFormat, FileSourceConfig};
23use super::discovery::{DiscoveryConfig, FileDiscoveryEngine};
24use super::manifest::{FileEntry, FileIngestionManifest};
25use super::text_decoder::TextLineDecoder;
26
/// Source connector that ingests records from files.
///
/// Files are found by a background [`FileDiscoveryEngine`], decoded with a
/// per-format [`FormatDecoder`], and tracked in a [`FileIngestionManifest`]
/// so each file is ingested at most once across polls and restarts.
pub struct FileSource {
    /// Parsed source configuration; `None` until `open` succeeds.
    config: Option<FileSourceConfig>,
    /// Output schema: the decoder's schema, plus a `_metadata` struct
    /// column when `include_metadata` is configured.
    schema: SchemaRef,
    /// Format-specific decoder; `None` until `open` succeeds.
    decoder: Option<Box<dyn FormatDecoder>>,
    /// Background discovery engine; `None` when the source is closed.
    discovery: Option<FileDiscoveryEngine>,
    /// Record of already-ingested files, used for dedup and checkpointing.
    manifest: FileIngestionManifest,
    /// True between a successful `open` and the next `close`.
    is_open: bool,
}
45
46impl FileSource {
47 #[must_use]
49 pub fn new() -> Self {
50 let empty_schema = Arc::new(Schema::empty());
51 Self {
52 config: None,
53 schema: empty_schema,
54 decoder: None,
55 discovery: None,
56 manifest: FileIngestionManifest::new(),
57 is_open: false,
58 }
59 }
60}
61
62impl Default for FileSource {
63 fn default() -> Self {
64 Self::new()
65 }
66}
67
68impl std::fmt::Debug for FileSource {
69 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
70 f.debug_struct("FileSource")
71 .field("is_open", &self.is_open)
72 .field("schema_fields", &self.schema.fields().len())
73 .field("manifest_count", &self.manifest.active_count())
74 .finish()
75 }
76}
77
#[async_trait]
impl SourceConnector for FileSource {
    /// Opens the source: resolves the file format, builds the decoder and
    /// output schema, and starts the background discovery engine.
    ///
    /// Returns `ConfigurationError` when no format is configured and none
    /// can be detected from the path's extension.
    async fn open(&mut self, config: &ConnectorConfig) -> Result<(), ConnectorError> {
        let src_config = FileSourceConfig::from_connector_config(config)?;

        // Explicit `format` wins; otherwise fall back to extension sniffing.
        let format = match src_config.format {
            Some(f) => f,
            None => FileFormat::from_extension(&src_config.path).ok_or_else(|| {
                ConnectorError::ConfigurationError(
                    "cannot detect format from path; specify 'format' explicitly".into(),
                )
            })?,
        };

        let (decoder, schema) = build_decoder_and_schema(format, &src_config, config)?;

        // Optionally widen the schema with a `_metadata` struct column.
        // NOTE: this field list must stay in sync with the one built in
        // `append_metadata_column`.
        let final_schema = if src_config.include_metadata {
            let mut fields: Vec<Field> =
                schema.fields().iter().map(|f| f.as_ref().clone()).collect();
            fields.push(Field::new(
                "_metadata",
                DataType::Struct(
                    vec![
                        Field::new("file_path", DataType::Utf8, false),
                        Field::new("file_name", DataType::Utf8, false),
                        Field::new("file_size", DataType::UInt64, false),
                        Field::new("file_modification_time", DataType::Int64, true),
                    ]
                    .into(),
                ),
                false,
            ));
            Arc::new(Schema::new(fields))
        } else {
            schema
        };

        let discovery_config = DiscoveryConfig {
            path: src_config.path.clone(),
            poll_interval: src_config.poll_interval,
            stabilisation_delay: src_config.stabilisation_delay,
            glob_pattern: src_config.glob_pattern.clone(),
        };
        // Seed discovery with already-known paths so a restored manifest is
        // not re-emitted. NOTE(review): this snapshot is taken once at open;
        // files ingested afterwards are presumably filtered by the manifest
        // checks in `poll_batch` — confirm discovery does not re-emit them.
        let known = Arc::new(self.manifest.snapshot_for_dedup());
        let discovery = FileDiscoveryEngine::start(discovery_config, known);

        self.config = Some(src_config);
        self.schema = final_schema;
        self.decoder = Some(decoder);
        self.discovery = Some(discovery);
        self.is_open = true;

        info!(
            "file source opened: format={format:?}, schema_fields={}",
            self.schema.fields().len()
        );
        Ok(())
    }

    /// Drains newly discovered files, decodes each into a `RecordBatch`,
    /// records them in the manifest, and returns the concatenated result.
    ///
    /// Returns `Ok(None)` when no new files are available or every
    /// candidate was skipped/empty. Individual file failures (read or
    /// decode errors) are logged and skipped rather than failing the poll.
    /// `_max_records` is currently unused; batching is per-file.
    async fn poll_batch(
        &mut self,
        _max_records: usize,
    ) -> Result<Option<SourceBatch>, ConnectorError> {
        let config = self
            .config
            .as_ref()
            .ok_or_else(|| ConnectorError::InvalidState {
                expected: "open".into(),
                actual: "closed".into(),
            })?;
        let discovery = self
            .discovery
            .as_mut()
            .ok_or_else(|| ConnectorError::InvalidState {
                expected: "discovery running".into(),
                actual: "no discovery".into(),
            })?;
        let decoder = self
            .decoder
            .as_ref()
            .ok_or_else(|| ConnectorError::InvalidState {
                expected: "decoder ready".into(),
                actual: "no decoder".into(),
            })?;

        let files = discovery.drain(config.max_files_per_poll);
        if files.is_empty() {
            return Ok(None);
        }

        let mut all_batches: Vec<RecordBatch> = Vec::new();

        for file in &files {
            // Skip files whose size changed since first ingestion, unless
            // overwrites are allowed.
            if !config.allow_overwrites && self.manifest.size_changed(&file.path, file.size) {
                warn!(
                    "file source: skipping '{}' — size changed (was {}, now {})",
                    file.path,
                    self.manifest
                        .active_entries()
                        .find(|(p, _)| *p == file.path)
                        .map(|(_, e)| e.size)
                        .unwrap_or(0),
                    file.size
                );
                continue;
            }

            // Dedup: never re-ingest a path already in the manifest.
            // NOTE(review): this also skips changed files when
            // `allow_overwrites` is true, so overwrites are never actually
            // re-ingested — confirm whether that is the intended semantics.
            if self.manifest.contains(&file.path) {
                continue;
            }

            if file.size > config.max_file_bytes as u64 {
                warn!(
                    "file source: skipping '{}' — size {} exceeds max_file_bytes {}",
                    file.path, file.size, config.max_file_bytes
                );
                continue;
            }

            // Whole-file read; per-file errors are non-fatal.
            let bytes = match read_file_bytes(&file.path).await {
                Ok(b) => b,
                Err(e) => {
                    warn!("file source: cannot read '{}': {e}", file.path);
                    continue;
                }
            };

            // One RawRecord per file: the decoder splits it into rows.
            let record = RawRecord::new(bytes);
            match decoder.decode_batch(&[record]) {
                Ok(batch) if batch.num_rows() > 0 => {
                    let batch = if config.include_metadata {
                        append_metadata_column(&batch, &file.path, file.size, file.modified_ms)?
                    } else {
                        batch
                    };
                    all_batches.push(batch);
                }
                Ok(_) => {
                    debug!("file source: empty batch from '{}'", file.path);
                }
                Err(e) => {
                    warn!("file source: decode error for '{}': {e}", file.path);
                    continue;
                }
            }

            // Mark the file ingested only after a successful read+decode
            // (error branches `continue` past this insert).
            self.manifest.insert(
                file.path.clone(),
                FileEntry {
                    size: file.size,
                    discovered_at: file.modified_ms,
                    ingested_at: now_millis(),
                },
            );
        }

        // Bound manifest growth by count and age.
        // NOTE(review): assumes retention_age_days is small enough that the
        // ms conversion cannot overflow — confirm config validation.
        if let Some(cfg) = &self.config {
            let max_age_ms = cfg.manifest_retention_age_days * 24 * 60 * 60 * 1000;
            self.manifest
                .maybe_evict(cfg.manifest_retention_count, max_age_ms);
        }

        if all_batches.is_empty() {
            return Ok(None);
        }

        // Avoid the concat copy in the common single-file case.
        let combined = if all_batches.len() == 1 {
            all_batches.into_iter().next().unwrap()
        } else {
            arrow_select::concat::concat_batches(&self.schema, &all_batches)
                .map_err(|e| ConnectorError::ReadError(format!("batch concat error: {e}")))?
        };

        Ok(Some(SourceBatch::new(combined)))
    }

    fn schema(&self) -> SchemaRef {
        self.schema.clone()
    }

    /// Serialises the ingestion manifest into a checkpoint (partition 0).
    fn checkpoint(&self) -> SourceCheckpoint {
        let mut cp = SourceCheckpoint::new(0);
        self.manifest.to_checkpoint(&mut cp);
        cp
    }

    /// Restores the manifest from a checkpoint; a malformed checkpoint is
    /// logged and replaced with a fresh manifest rather than failing.
    async fn restore(&mut self, checkpoint: &SourceCheckpoint) -> Result<(), ConnectorError> {
        match FileIngestionManifest::from_checkpoint(checkpoint) {
            Ok(manifest) => {
                info!(
                    "file source: restored manifest with {} active entries",
                    manifest.active_count()
                );
                self.manifest = manifest;
            }
            Err(e) => {
                warn!("file source: manifest restore failed: {e} — starting fresh");
                self.manifest = FileIngestionManifest::new();
            }
        }
        Ok(())
    }

    /// `Healthy` while open, `Unknown` otherwise; no deeper probe is done.
    fn health_check(&self) -> HealthStatus {
        if self.is_open {
            HealthStatus::Healthy
        } else {
            HealthStatus::Unknown
        }
    }

    /// Stops discovery and drops the decoder; the manifest is kept so a
    /// later checkpoint still reflects ingested files.
    async fn close(&mut self) -> Result<(), ConnectorError> {
        self.discovery = None;
        self.decoder = None;
        self.is_open = false;
        info!("file source closed");
        Ok(())
    }

    /// Replay is supported via the manifest checkpoint/restore round trip.
    fn supports_replay(&self) -> bool {
        true
    }

    /// Schema evolution is not supported by this connector.
    fn as_schema_evolvable(&self) -> Option<&dyn crate::schema::traits::SchemaEvolvable> {
        None
    }
}
318
319fn build_decoder_and_schema(
322 format: FileFormat,
323 src_config: &FileSourceConfig,
324 connector_config: &ConnectorConfig,
325) -> Result<(Box<dyn FormatDecoder>, SchemaRef), ConnectorError> {
326 match format {
327 FileFormat::Csv => {
328 let schema = connector_config.arrow_schema().unwrap_or_else(|| {
329 Arc::new(Schema::new(vec![Field::new("value", DataType::Utf8, true)]))
330 });
331 let csv_config = crate::schema::CsvDecoderConfig {
332 delimiter: src_config.csv_delimiter,
333 has_header: src_config.csv_has_header,
334 ..crate::schema::CsvDecoderConfig::default()
335 };
336 let decoder = crate::schema::CsvDecoder::with_config(schema.clone(), csv_config);
337 Ok((Box::new(decoder), schema))
338 }
339 FileFormat::Json => {
340 let schema = connector_config.arrow_schema().unwrap_or_else(|| {
341 Arc::new(Schema::new(vec![Field::new("value", DataType::Utf8, true)]))
342 });
343 let json_config = crate::schema::JsonDecoderConfig::default();
344 let decoder = crate::schema::JsonDecoder::with_config(schema.clone(), json_config);
345 Ok((Box::new(decoder), schema))
346 }
347 FileFormat::Text => {
348 let decoder = TextLineDecoder::new();
349 let schema = decoder.output_schema();
350 Ok((Box::new(decoder), schema))
351 }
352 FileFormat::Parquet => {
353 let schema = connector_config.arrow_schema().unwrap_or_else(|| {
356 Arc::new(Schema::new(vec![Field::new("value", DataType::Utf8, true)]))
357 });
358 let decoder = crate::schema::parquet::ParquetDecoder::new(schema.clone());
359 Ok((Box::new(decoder), schema))
360 }
361 }
362}
363
364async fn read_file_bytes(path: &str) -> Result<Vec<u8>, ConnectorError> {
365 if path.starts_with("s3://")
367 || path.starts_with("gs://")
368 || path.starts_with("az://")
369 || path.starts_with("abfs://")
370 || path.starts_with("abfss://")
371 {
372 Err(ConnectorError::ConfigurationError(
373 "cloud file reading not yet implemented in poll_batch; use local paths".into(),
374 ))
375 } else {
376 tokio::fs::read(path)
377 .await
378 .map_err(|e| ConnectorError::ReadError(format!("cannot read file '{path}': {e}")))
379 }
380}
381
382fn append_metadata_column(
383 batch: &RecordBatch,
384 file_path: &str,
385 file_size: u64,
386 modified_ms: u64,
387) -> Result<RecordBatch, ConnectorError> {
388 use arrow_array::{ArrayRef, Int64Array, StringArray, StructArray, UInt64Array};
389
390 let n = batch.num_rows();
391 let file_name = std::path::Path::new(file_path)
392 .file_name()
393 .and_then(|n| n.to_str())
394 .unwrap_or(file_path);
395
396 let path_array: ArrayRef = Arc::new(StringArray::from(vec![file_path; n]));
397 let name_array: ArrayRef = Arc::new(StringArray::from(vec![file_name; n]));
398 let size_array: ArrayRef = Arc::new(UInt64Array::from(vec![file_size; n]));
399 #[allow(clippy::cast_possible_wrap)]
400 let mod_array: ArrayRef = Arc::new(Int64Array::from(vec![modified_ms as i64; n]));
401
402 let fields = vec![
403 Field::new("file_path", DataType::Utf8, false),
404 Field::new("file_name", DataType::Utf8, false),
405 Field::new("file_size", DataType::UInt64, false),
406 Field::new("file_modification_time", DataType::Int64, true),
407 ];
408 let struct_array = StructArray::try_new(
409 fields.into(),
410 vec![path_array, name_array, size_array, mod_array],
411 None,
412 )
413 .map_err(|e| ConnectorError::ReadError(format!("metadata struct error: {e}")))?;
414
415 let mut columns: Vec<ArrayRef> = batch.columns().to_vec();
417 columns.push(Arc::new(struct_array));
418
419 let mut fields: Vec<Field> = batch
420 .schema()
421 .fields()
422 .iter()
423 .map(|f| f.as_ref().clone())
424 .collect();
425 fields.push(Field::new(
426 "_metadata",
427 DataType::Struct(
428 vec![
429 Field::new("file_path", DataType::Utf8, false),
430 Field::new("file_name", DataType::Utf8, false),
431 Field::new("file_size", DataType::UInt64, false),
432 Field::new("file_modification_time", DataType::Int64, true),
433 ]
434 .into(),
435 ),
436 false,
437 ));
438
439 RecordBatch::try_new(Arc::new(Schema::new(fields)), columns)
440 .map_err(|e| ConnectorError::ReadError(format!("metadata append error: {e}")))
441}
442
/// Current wall-clock time as milliseconds since the Unix epoch.
///
/// Returns 0 if the system clock reports a time before the epoch.
#[allow(clippy::cast_possible_truncation)]
fn now_millis() -> u64 {
    match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(elapsed) => elapsed.as_millis() as u64,
        Err(_) => 0,
    }
}
450
#[cfg(test)]
mod tests {
    use super::*;

    // A new source starts closed with an empty manifest.
    #[test]
    fn test_file_source_default() {
        let source = FileSource::new();
        assert!(!source.is_open);
        assert_eq!(source.manifest.active_count(), 0);
    }

    // `open` must fail when the mandatory `path` setting is absent.
    #[tokio::test]
    async fn test_open_missing_path() {
        let mut source = FileSource::new();
        let config = ConnectorConfig::new("files");
        let result = source.open(&config).await;
        assert!(result.is_err());
    }

    // Text format opens successfully and exposes the text decoder's schema.
    #[tokio::test]
    async fn test_open_with_text_format() {
        let mut source = FileSource::new();
        let mut config = ConnectorConfig::new("files");
        config.set("path", "/tmp");
        config.set("format", "text");
        let result = source.open(&config).await;
        assert!(result.is_ok());
        assert!(source.is_open);
        assert_eq!(source.schema().field(0).name(), "line");
        source.close().await.unwrap();
    }

    // Polling before `open` yields the InvalidState error path.
    #[tokio::test]
    async fn test_poll_batch_when_not_open() {
        let mut source = FileSource::new();
        let result = source.poll_batch(100).await;
        assert!(result.is_err());
    }

    // A manifest entry survives into the serialized checkpoint.
    #[test]
    fn test_checkpoint_roundtrip() {
        let mut source = FileSource::new();
        source.manifest.insert(
            "test.csv".into(),
            FileEntry {
                size: 100,
                discovered_at: 1000,
                ingested_at: 2000,
            },
        );
        let cp = source.checkpoint();
        assert!(cp.get_offset("manifest").is_some());
    }

    // Restoring from a hand-built checkpoint repopulates the manifest.
    #[tokio::test]
    async fn test_restore_from_checkpoint() {
        let mut source = FileSource::new();

        let mut cp = SourceCheckpoint::new(1);
        cp.set_offset(
            "manifest",
            r#"{"a.csv":{"size":100,"discovered_at":900,"ingested_at":1000}}"#,
        );

        source.restore(&cp).await.unwrap();
        assert_eq!(source.manifest.active_count(), 1);
        assert!(source.manifest.contains("a.csv"));
    }

    // A closed source reports Unknown, not Unhealthy.
    #[test]
    fn test_health_check() {
        let source = FileSource::new();
        assert!(matches!(source.health_check(), HealthStatus::Unknown));
    }

    // Replay is always supported (manifest-based checkpointing).
    #[test]
    fn test_supports_replay() {
        let source = FileSource::new();
        assert!(source.supports_replay());
    }
}