1use std::sync::Arc;
7use std::time::{SystemTime, UNIX_EPOCH};
8
9use arrow_array::RecordBatch;
10use arrow_schema::{DataType, Field, Schema, SchemaRef};
11use async_trait::async_trait;
12use tracing::{debug, info, warn};
13
14use crate::checkpoint::SourceCheckpoint;
15use crate::config::ConnectorConfig;
16use crate::connector::{SourceBatch, SourceConnector};
17use crate::error::ConnectorError;
18use crate::schema::traits::FormatDecoder;
19use crate::schema::types::RawRecord;
20
21use super::config::{FileFormat, FileSourceConfig};
22use super::discovery::{DiscoveryConfig, FileDiscoveryEngine};
23use super::manifest::{FileEntry, FileIngestionManifest};
24use super::text_decoder::TextLineDecoder;
25
26pub struct FileSource {
31 config: Option<FileSourceConfig>,
33 schema: SchemaRef,
35 decoder: Option<Box<dyn FormatDecoder>>,
37 discovery: Option<FileDiscoveryEngine>,
39 manifest: FileIngestionManifest,
41 is_open: bool,
43}
44
45impl FileSource {
46 #[must_use]
48 pub fn new() -> Self {
49 Self::with_registry(None)
50 }
51
52 #[must_use]
54 pub fn with_registry(_registry: Option<&prometheus::Registry>) -> Self {
55 let empty_schema = Arc::new(Schema::empty());
56 Self {
57 config: None,
58 schema: empty_schema,
59 decoder: None,
60 discovery: None,
61 manifest: FileIngestionManifest::new(),
62 is_open: false,
63 }
64 }
65}
66
67impl Default for FileSource {
68 fn default() -> Self {
69 Self::new()
70 }
71}
72
73impl std::fmt::Debug for FileSource {
74 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
75 f.debug_struct("FileSource")
76 .field("is_open", &self.is_open)
77 .field("schema_fields", &self.schema.fields().len())
78 .field("manifest_count", &self.manifest.active_count())
79 .finish()
80 }
81}
82
83#[async_trait]
84impl SourceConnector for FileSource {
85 async fn open(&mut self, config: &ConnectorConfig) -> Result<(), ConnectorError> {
86 let src_config = FileSourceConfig::from_connector_config(config)?;
87
88 if is_cloud_url(&src_config.path) {
93 return Err(ConnectorError::ConfigurationError(format!(
94 "cloud paths are not supported by the 'files' source yet: {}",
95 src_config.path
96 )));
97 }
98
99 let format = match src_config.format {
101 Some(f) => f,
102 None => FileFormat::from_extension(&src_config.path).ok_or_else(|| {
103 ConnectorError::ConfigurationError(
104 "cannot detect format from path; specify 'format' explicitly".into(),
105 )
106 })?,
107 };
108
109 let (decoder, schema) = build_decoder_and_schema(format, &src_config, config)?;
111
112 let final_schema = if src_config.include_metadata {
114 let mut fields: Vec<Field> =
115 schema.fields().iter().map(|f| f.as_ref().clone()).collect();
116 fields.push(Field::new(
117 "_metadata",
118 DataType::Struct(
119 vec![
120 Field::new("file_path", DataType::Utf8, false),
121 Field::new("file_name", DataType::Utf8, false),
122 Field::new("file_size", DataType::UInt64, false),
123 Field::new(
124 "file_modification_time",
125 DataType::Timestamp(arrow_schema::TimeUnit::Millisecond, None),
126 true,
127 ),
128 ]
129 .into(),
130 ),
131 false,
132 ));
133 Arc::new(Schema::new(fields))
134 } else {
135 schema
136 };
137
138 let discovery_config = DiscoveryConfig {
140 path: src_config.path.clone(),
141 poll_interval: src_config.poll_interval,
142 stabilisation_delay: src_config.stabilisation_delay,
143 glob_pattern: src_config.glob_pattern.clone(),
144 };
145 let known = Arc::new(self.manifest.snapshot_for_dedup());
146 let discovery = FileDiscoveryEngine::start(discovery_config, known);
147
148 self.config = Some(src_config);
149 self.schema = final_schema;
150 self.decoder = Some(decoder);
151 self.discovery = Some(discovery);
152 self.is_open = true;
153
154 info!(
155 "file source opened: format={format:?}, schema_fields={}",
156 self.schema.fields().len()
157 );
158 Ok(())
159 }
160
161 async fn poll_batch(
162 &mut self,
163 _max_records: usize,
164 ) -> Result<Option<SourceBatch>, ConnectorError> {
165 let config = self
166 .config
167 .as_ref()
168 .ok_or_else(|| ConnectorError::InvalidState {
169 expected: "open".into(),
170 actual: "closed".into(),
171 })?;
172 let discovery = self
173 .discovery
174 .as_mut()
175 .ok_or_else(|| ConnectorError::InvalidState {
176 expected: "discovery running".into(),
177 actual: "no discovery".into(),
178 })?;
179 let decoder = self
180 .decoder
181 .as_ref()
182 .ok_or_else(|| ConnectorError::InvalidState {
183 expected: "decoder ready".into(),
184 actual: "no decoder".into(),
185 })?;
186
187 let files = discovery.drain(config.max_files_per_poll);
188 if files.is_empty() {
189 return Ok(None);
190 }
191
192 let mut all_batches: Vec<RecordBatch> = Vec::new();
193
194 for file in &files {
195 if !config.allow_overwrites && self.manifest.size_changed(&file.path, file.size) {
197 warn!(
198 "file source: skipping '{}' — size changed (was {}, now {})",
199 file.path,
200 self.manifest
201 .active_entries()
202 .find(|(p, _)| *p == file.path)
203 .map(|(_, e)| e.size)
204 .unwrap_or(0),
205 file.size
206 );
207 continue;
208 }
209
210 if self.manifest.contains(&file.path) {
212 continue;
213 }
214
215 if file.size > config.max_file_bytes as u64 {
217 warn!(
218 "file source: skipping '{}' — size {} exceeds max_file_bytes {}",
219 file.path, file.size, config.max_file_bytes
220 );
221 continue;
222 }
223
224 let bytes = match read_file_bytes(&file.path).await {
226 Ok(b) => b,
227 Err(e) => {
228 warn!("file source: cannot read '{}': {e}", file.path);
229 continue;
230 }
231 };
232
233 let record = RawRecord::new(bytes);
235 match decoder.decode_batch(&[record]) {
236 Ok(batch) if batch.num_rows() > 0 => {
237 let batch = if config.include_metadata {
238 append_metadata_column(&batch, &file.path, file.size, file.modified_ms)?
239 } else {
240 batch
241 };
242 all_batches.push(batch);
243 }
244 Ok(_) => {
245 debug!("file source: empty batch from '{}'", file.path);
246 }
247 Err(e) => {
248 warn!("file source: decode error for '{}': {e}", file.path);
249 continue;
250 }
251 }
252
253 self.manifest.insert(
255 file.path.clone(),
256 FileEntry {
257 size: file.size,
258 discovered_at: file.modified_ms,
259 ingested_at: now_millis(),
260 },
261 );
262 }
263
264 if let Some(cfg) = &self.config {
266 let max_age_ms = cfg.manifest_retention_age_days * 24 * 60 * 60 * 1000;
267 self.manifest
268 .maybe_evict(cfg.manifest_retention_count, max_age_ms);
269 }
270
271 if all_batches.is_empty() {
272 return Ok(None);
273 }
274
275 let combined = if all_batches.len() == 1 {
277 all_batches.into_iter().next().unwrap()
278 } else {
279 arrow_select::concat::concat_batches(&self.schema, &all_batches)
280 .map_err(|e| ConnectorError::ReadError(format!("batch concat error: {e}")))?
281 };
282
283 Ok(Some(SourceBatch::new(combined)))
284 }
285
286 fn schema(&self) -> SchemaRef {
287 self.schema.clone()
288 }
289
290 fn checkpoint(&self) -> SourceCheckpoint {
291 let mut cp = SourceCheckpoint::new(0);
292 self.manifest.to_checkpoint(&mut cp);
293 cp
294 }
295
296 async fn restore(&mut self, checkpoint: &SourceCheckpoint) -> Result<(), ConnectorError> {
297 match FileIngestionManifest::from_checkpoint(checkpoint) {
298 Ok(manifest) => {
299 info!(
300 "file source: restored manifest with {} active entries",
301 manifest.active_count()
302 );
303 self.manifest = manifest;
304 }
305 Err(e) => {
306 warn!("file source: manifest restore failed: {e} — starting fresh");
307 self.manifest = FileIngestionManifest::new();
308 }
309 }
310 Ok(())
311 }
312
313 async fn close(&mut self) -> Result<(), ConnectorError> {
314 self.discovery = None;
315 self.decoder = None;
316 self.is_open = false;
317 info!("file source closed");
318 Ok(())
319 }
320
321 fn supports_replay(&self) -> bool {
322 true
323 }
324}
325
326fn build_decoder_and_schema(
329 format: FileFormat,
330 src_config: &FileSourceConfig,
331 connector_config: &ConnectorConfig,
332) -> Result<(Box<dyn FormatDecoder>, SchemaRef), ConnectorError> {
333 match format {
334 FileFormat::Csv => {
335 let schema = connector_config.arrow_schema().unwrap_or_else(|| {
336 Arc::new(Schema::new(vec![Field::new("value", DataType::Utf8, true)]))
337 });
338 let csv_config = crate::schema::CsvDecoderConfig {
339 delimiter: src_config.csv_delimiter,
340 has_header: src_config.csv_has_header,
341 ..crate::schema::CsvDecoderConfig::default()
342 };
343 let decoder = crate::schema::CsvDecoder::with_config(schema.clone(), csv_config);
344 Ok((Box::new(decoder), schema))
345 }
346 FileFormat::Json => {
347 let schema = connector_config.arrow_schema().unwrap_or_else(|| {
348 Arc::new(Schema::new(vec![Field::new("value", DataType::Utf8, true)]))
349 });
350 let json_config = crate::schema::JsonDecoderConfig::default();
351 let decoder = crate::schema::JsonDecoder::with_config(schema.clone(), json_config);
352 Ok((Box::new(decoder), schema))
353 }
354 FileFormat::Text => {
355 let decoder = TextLineDecoder::new();
356 let schema = decoder.output_schema();
357 Ok((Box::new(decoder), schema))
358 }
359 FileFormat::Parquet => {
360 let schema = connector_config.arrow_schema().unwrap_or_else(|| {
363 Arc::new(Schema::new(vec![Field::new("value", DataType::Utf8, true)]))
364 });
365 let decoder = crate::schema::parquet::ParquetDecoder::new(schema.clone());
366 Ok((Box::new(decoder), schema))
367 }
368 FileFormat::ArrowIpc => {
369 let schema = connector_config.arrow_schema().unwrap_or_else(|| {
373 Arc::new(Schema::new(vec![Field::new("value", DataType::Utf8, true)]))
374 });
375 let decoder = super::arrow_ipc_codec::ArrowIpcDecoder::new(schema.clone());
376 Ok((Box::new(decoder), schema))
377 }
378 }
379}
380
/// Returns `true` when `path` starts with `<scheme>://`, where the scheme
/// (matched ASCII case-insensitively) names an object store: S3, GCS or Azure.
///
/// Only the text before the first `://` is treated as the scheme.
fn is_cloud_url(path: &str) -> bool {
    const CLOUD_SCHEMES: &[&str] = &["s3", "s3a", "s3n", "gs", "gcs", "az", "abfs", "abfss"];

    let scheme = match path.find("://") {
        Some(end) => &path[..end],
        None => return false,
    };
    CLOUD_SCHEMES
        .iter()
        .any(|known| known.eq_ignore_ascii_case(scheme))
}
388
389async fn read_file_bytes(path: &str) -> Result<Vec<u8>, ConnectorError> {
390 debug_assert!(
392 !is_cloud_url(path),
393 "cloud paths must be rejected at open()"
394 );
395 tokio::fs::read(path)
396 .await
397 .map_err(|e| ConnectorError::ReadError(format!("cannot read file '{path}': {e}")))
398}
399
400fn append_metadata_column(
401 batch: &RecordBatch,
402 file_path: &str,
403 file_size: u64,
404 modified_ms: u64,
405) -> Result<RecordBatch, ConnectorError> {
406 use arrow_array::{ArrayRef, StringArray, StructArray, UInt64Array};
407
408 let n = batch.num_rows();
409 let file_name = std::path::Path::new(file_path)
410 .file_name()
411 .and_then(|n| n.to_str())
412 .unwrap_or(file_path);
413
414 let path_array: ArrayRef = Arc::new(StringArray::from(vec![file_path; n]));
415 let name_array: ArrayRef = Arc::new(StringArray::from(vec![file_name; n]));
416 let size_array: ArrayRef = Arc::new(UInt64Array::from(vec![file_size; n]));
417 #[allow(clippy::cast_possible_wrap)]
418 let mod_array: ArrayRef = Arc::new(arrow_array::TimestampMillisecondArray::from(vec![
419 modified_ms as i64;
420 n
421 ]));
422
423 let fields = vec![
424 Field::new("file_path", DataType::Utf8, false),
425 Field::new("file_name", DataType::Utf8, false),
426 Field::new("file_size", DataType::UInt64, false),
427 Field::new(
428 "file_modification_time",
429 DataType::Timestamp(arrow_schema::TimeUnit::Millisecond, None),
430 true,
431 ),
432 ];
433 let struct_array = StructArray::try_new(
434 fields.into(),
435 vec![path_array, name_array, size_array, mod_array],
436 None,
437 )
438 .map_err(|e| ConnectorError::ReadError(format!("metadata struct error: {e}")))?;
439
440 let mut columns: Vec<ArrayRef> = batch.columns().to_vec();
442 columns.push(Arc::new(struct_array));
443
444 let mut fields: Vec<Field> = batch
445 .schema()
446 .fields()
447 .iter()
448 .map(|f| f.as_ref().clone())
449 .collect();
450 fields.push(Field::new(
451 "_metadata",
452 DataType::Struct(
453 vec![
454 Field::new("file_path", DataType::Utf8, false),
455 Field::new("file_name", DataType::Utf8, false),
456 Field::new("file_size", DataType::UInt64, false),
457 Field::new(
458 "file_modification_time",
459 DataType::Timestamp(arrow_schema::TimeUnit::Millisecond, None),
460 true,
461 ),
462 ]
463 .into(),
464 ),
465 false,
466 ));
467
468 RecordBatch::try_new(Arc::new(Schema::new(fields)), columns)
469 .map_err(|e| ConnectorError::ReadError(format!("metadata append error: {e}")))
470}
471
/// Milliseconds since the Unix epoch, read from the system clock.
///
/// A clock set before 1970 yields 0. A value too large for `u64` saturates
/// at `u64::MAX` rather than being silently truncated.
fn now_millis() -> u64 {
    let since_epoch = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .unwrap_or_default();
    u64::try_from(since_epoch.as_millis()).unwrap_or(u64::MAX)
}
479
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_file_source_default() {
        let source = FileSource::new();
        assert!(!source.is_open);
        assert_eq!(source.manifest.active_count(), 0);
    }

    #[tokio::test]
    async fn test_open_missing_path() {
        let mut source = FileSource::new();
        let config = ConnectorConfig::new("files");
        let result = source.open(&config).await;
        assert!(result.is_err());
    }

    #[tokio::test]
    async fn test_open_with_text_format() {
        let mut source = FileSource::new();
        let mut config = ConnectorConfig::new("files");
        config.set("path", "/tmp");
        config.set("format", "text");
        let result = source.open(&config).await;
        assert!(result.is_ok());
        assert!(source.is_open);
        assert_eq!(source.schema().field(0).name(), "line");
        source.close().await.unwrap();
    }

    #[tokio::test]
    async fn test_open_rejects_cloud_path() {
        let mut source = FileSource::new();
        let mut config = ConnectorConfig::new("files");
        config.set("path", "s3://bucket/prefix");
        config.set("format", "csv");
        let result = source.open(&config).await;
        assert!(result.is_err());
        assert!(!source.is_open);
    }

    #[tokio::test]
    async fn test_poll_batch_when_not_open() {
        let mut source = FileSource::new();
        let result = source.poll_batch(100).await;
        assert!(result.is_err());
    }

    #[tokio::test]
    async fn test_checkpoint_roundtrip() {
        let mut source = FileSource::new();
        source.manifest.insert(
            "test.csv".into(),
            FileEntry {
                size: 100,
                discovered_at: 1000,
                ingested_at: 2000,
            },
        );
        let cp = source.checkpoint();
        assert!(cp.get_offset("manifest").is_some());

        // A fresh source restored from the checkpoint must see the same entry.
        let mut restored = FileSource::new();
        restored.restore(&cp).await.unwrap();
        assert_eq!(restored.manifest.active_count(), 1);
        assert!(restored.manifest.contains("test.csv"));
    }

    #[tokio::test]
    async fn test_restore_from_checkpoint() {
        let mut source = FileSource::new();

        let mut cp = SourceCheckpoint::new(1);
        cp.set_offset(
            "manifest",
            r#"{"a.csv":{"size":100,"discovered_at":900,"ingested_at":1000}}"#,
        );

        source.restore(&cp).await.unwrap();
        assert_eq!(source.manifest.active_count(), 1);
        assert!(source.manifest.contains("a.csv"));
    }

    #[test]
    fn test_supports_replay() {
        let source = FileSource::new();
        assert!(source.supports_replay());
    }

    #[test]
    fn test_is_cloud_url() {
        assert!(is_cloud_url("s3://bucket/key.csv"));
        assert!(is_cloud_url("GS://bucket/key.csv"));
        assert!(is_cloud_url("abfss://container@account.dfs.core.windows.net/dir"));
        assert!(!is_cloud_url("/tmp/data/file.csv"));
        assert!(!is_cloud_url("file:///tmp/data/file.csv"));
        assert!(!is_cloud_url("https://example.com/file.csv"));
        assert!(!is_cloud_url("relative/path.csv"));
    }

    #[test]
    fn test_now_millis_is_after_2020() {
        assert!(now_millis() > 1_577_836_800_000);
    }
}