Skip to main content

laminar_sql/parser/
lookup_table.rs

//! Parser for CREATE/DROP LOOKUP TABLE DDL statements.
//!
//! Lookup tables are dimension/reference tables used in enrichment joins.
//! They can be backed by external connectors (PostgreSQL CDC, Redis, etc.)
//! with configurable caching and predicate pushdown strategies.
6
7#[allow(clippy::disallowed_types)] // cold path: SQL parsing
8use std::collections::HashMap;
9
10use sqlparser::ast::{ColumnDef, ObjectName};
11use sqlparser::keywords::Keyword;
12use sqlparser::parser::Parser;
13use sqlparser::tokenizer::Token;
14
15use super::tokenizer::{expect_custom_keyword, parse_with_options};
16use super::ParseError;
17
/// Parsed form of a `CREATE LOOKUP TABLE` statement.
///
/// This is the raw syntactic result: the `WITH` options are kept as
/// untyped strings and are only interpreted by `validate_properties`.
#[derive(Debug, Clone, PartialEq)]
pub struct CreateLookupTableStatement {
    /// Table name as written in the statement (may be qualified).
    pub name: ObjectName,
    /// Column definitions, in declaration order.
    pub columns: Vec<ColumnDef>,
    /// Column names listed in the table-level `PRIMARY KEY (...)` clause,
    /// in the order written. Empty when no such clause was given.
    pub primary_key: Vec<String>,
    /// Raw key/value pairs from the `WITH (...)` clause.
    pub with_options: HashMap<String, String>,
    /// Whether `OR REPLACE` was specified.
    pub or_replace: bool,
    /// Whether `IF NOT EXISTS` was specified.
    pub if_not_exists: bool,
}
34
/// Validated, typed lookup table properties extracted from the WITH clause.
///
/// Produced by `validate_properties`; each field corresponds to one
/// option key of the raw `WITH` map.
#[derive(Debug, Clone, PartialEq)]
pub struct LookupTableProperties {
    /// Connector type (from the required `connector` option).
    pub connector: ConnectorType,
    /// Connection string (from `connection`), copied verbatim; `None` if absent.
    pub connection: Option<String>,
    /// Lookup strategy (from `strategy`); the enum default when absent.
    pub strategy: LookupStrategy,
    /// In-memory cache size (from `cache.memory`); `None` if absent.
    pub cache_memory: Option<ByteSize>,
    /// On-disk cache size (from `cache.disk`); `None` if absent.
    pub cache_disk: Option<ByteSize>,
    /// Cache TTL in seconds (from `cache.ttl`); `None` if absent.
    pub cache_ttl: Option<u64>,
    /// Predicate pushdown mode (from `pushdown`); the enum default when absent.
    pub pushdown_mode: PushdownMode,
}
53
54/// Connector type for lookup tables.
55#[derive(Debug, Clone, PartialEq, Eq)]
56pub enum ConnectorType {
57    /// PostgreSQL CDC connector.
58    PostgresCdc,
59    /// MySQL CDC connector.
60    MysqlCdc,
61    /// Redis connector.
62    Redis,
63    /// S3 Parquet connector.
64    S3Parquet,
65    /// Static in-memory data.
66    Static,
67    /// Custom connector type.
68    Custom(String),
69}
70
71impl ConnectorType {
72    /// Parse a connector type from a string.
73    ///
74    /// # Errors
75    ///
76    /// Returns `ParseError` if the connector type is empty.
77    pub fn parse(s: &str) -> Result<Self, ParseError> {
78        if s.is_empty() {
79            return Err(ParseError::ValidationError(
80                "connector type cannot be empty".to_string(),
81            ));
82        }
83        Ok(match s.to_lowercase().as_str() {
84            "postgres-cdc" | "postgres_cdc" | "postgresql" => Self::PostgresCdc,
85            "mysql-cdc" | "mysql_cdc" | "mysql" => Self::MysqlCdc,
86            "redis" => Self::Redis,
87            "s3-parquet" | "s3_parquet" | "s3" => Self::S3Parquet,
88            "static" | "memory" => Self::Static,
89            other => Self::Custom(other.to_string()),
90        })
91    }
92}
93
94/// Lookup strategy for how table data is distributed/cached.
95#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
96pub enum LookupStrategy {
97    /// Full table replicated on every node.
98    #[default]
99    Replicated,
100    /// Table partitioned across nodes by key.
101    Partitioned,
102    /// Rows fetched on demand (no pre-loading).
103    OnDemand,
104}
105
106impl LookupStrategy {
107    /// Parse a lookup strategy from a string.
108    ///
109    /// # Errors
110    ///
111    /// Returns `ParseError` if the strategy is unknown.
112    pub fn parse(s: &str) -> Result<Self, ParseError> {
113        match s.to_lowercase().as_str() {
114            "replicated" | "full" => Ok(Self::Replicated),
115            "partitioned" | "sharded" => Ok(Self::Partitioned),
116            "on-demand" | "on_demand" | "lazy" => Ok(Self::OnDemand),
117            other => Err(ParseError::ValidationError(format!(
118                "unknown lookup strategy: '{other}' \
119                 (expected: replicated, partitioned, on-demand)"
120            ))),
121        }
122    }
123}
124
125/// Predicate pushdown mode.
126#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
127pub enum PushdownMode {
128    /// Automatically decide based on connector capabilities.
129    #[default]
130    Auto,
131    /// Always push predicates to the source.
132    Enabled,
133    /// Never push predicates to the source.
134    Disabled,
135}
136
137impl PushdownMode {
138    /// Parse a pushdown mode from a string.
139    ///
140    /// # Errors
141    ///
142    /// Returns `ParseError` if the mode is unknown.
143    pub fn parse(s: &str) -> Result<Self, ParseError> {
144        match s.to_lowercase().as_str() {
145            "auto" => Ok(Self::Auto),
146            "enabled" | "true" | "on" => Ok(Self::Enabled),
147            "disabled" | "false" | "off" => Ok(Self::Disabled),
148            other => Err(ParseError::ValidationError(format!(
149                "unknown pushdown mode: '{other}' (expected: auto, enabled, disabled)"
150            ))),
151        }
152    }
153}
154
155/// A parsed byte size (e.g., "512mb", "1gb", "10kb").
156#[derive(Debug, Clone, Copy, PartialEq, Eq)]
157pub struct ByteSize(pub u64);
158
159impl ByteSize {
160    /// Parse a human-readable byte size string.
161    ///
162    /// Supports suffixes: b, kb, mb, gb, tb (case-insensitive).
163    ///
164    /// # Errors
165    ///
166    /// Returns `ParseError` if the string cannot be parsed.
167    pub fn parse(s: &str) -> Result<Self, ParseError> {
168        let s = s.trim().to_lowercase();
169        let (num_str, multiplier) = if let Some(n) = s.strip_suffix("tb") {
170            (n, 1024 * 1024 * 1024 * 1024)
171        } else if let Some(n) = s.strip_suffix("gb") {
172            (n, 1024 * 1024 * 1024)
173        } else if let Some(n) = s.strip_suffix("mb") {
174            (n, 1024 * 1024)
175        } else if let Some(n) = s.strip_suffix("kb") {
176            (n, 1024)
177        } else if let Some(n) = s.strip_suffix('b') {
178            (n, 1)
179        } else {
180            // Assume bytes if no suffix
181            (s.as_str(), 1)
182        };
183
184        let num: u64 = num_str
185            .trim()
186            .parse()
187            .map_err(|_| ParseError::ValidationError(format!("invalid byte size: '{s}'")))?;
188
189        Ok(Self(num * multiplier))
190    }
191
192    /// Returns the size in bytes.
193    #[must_use]
194    pub fn as_bytes(&self) -> u64 {
195        self.0
196    }
197}
198
199/// Parse a CREATE LOOKUP TABLE statement.
200///
201/// Syntax:
202/// ```sql
203/// CREATE [OR REPLACE] LOOKUP TABLE [IF NOT EXISTS] <name> (
204///   <col> <type> [NOT NULL],
205///   ...
206///   PRIMARY KEY (<col>, ...)
207/// ) WITH (
208///   'connector' = 'postgres-cdc',
209///   'connection' = 'postgresql://...',
210///   ...
211/// );
212/// ```
213///
214/// # Errors
215///
216/// Returns `ParseError` if the statement syntax is invalid.
217pub fn parse_create_lookup_table(
218    parser: &mut Parser,
219) -> Result<CreateLookupTableStatement, ParseError> {
220    // CREATE already consumed by the router; consume it here for standalone parsing
221    parser
222        .expect_keyword(Keyword::CREATE)
223        .map_err(ParseError::SqlParseError)?;
224
225    let or_replace = parser.parse_keywords(&[Keyword::OR, Keyword::REPLACE]);
226
227    expect_custom_keyword(parser, "LOOKUP")?;
228
229    parser
230        .expect_keyword(Keyword::TABLE)
231        .map_err(ParseError::SqlParseError)?;
232
233    let if_not_exists = parser.parse_keywords(&[Keyword::IF, Keyword::NOT, Keyword::EXISTS]);
234
235    let name = parser
236        .parse_object_name(false)
237        .map_err(ParseError::SqlParseError)?;
238
239    // Parse column definitions: ( col1 TYPE, col2 TYPE, ..., PRIMARY KEY (col1, ...) )
240    parser
241        .expect_token(&Token::LParen)
242        .map_err(ParseError::SqlParseError)?;
243
244    let mut columns = Vec::new();
245    let mut primary_key = Vec::new();
246
247    loop {
248        // Check for PRIMARY KEY clause
249        if parser.parse_keywords(&[Keyword::PRIMARY, Keyword::KEY]) {
250            parser
251                .expect_token(&Token::LParen)
252                .map_err(ParseError::SqlParseError)?;
253
254            loop {
255                let ident = parser
256                    .parse_identifier()
257                    .map_err(ParseError::SqlParseError)?;
258                primary_key.push(ident.value);
259
260                if !parser.consume_token(&Token::Comma) {
261                    break;
262                }
263            }
264
265            parser
266                .expect_token(&Token::RParen)
267                .map_err(ParseError::SqlParseError)?;
268
269            // Consume optional trailing comma after PRIMARY KEY clause
270            let _ = parser.consume_token(&Token::Comma);
271        } else if parser.consume_token(&Token::RParen) {
272            // End of column definitions
273            break;
274        } else {
275            // Parse column definition
276            let col = parser
277                .parse_column_def()
278                .map_err(ParseError::SqlParseError)?;
279            columns.push(col);
280
281            // Comma or closing paren
282            if !parser.consume_token(&Token::Comma) {
283                parser
284                    .expect_token(&Token::RParen)
285                    .map_err(ParseError::SqlParseError)?;
286                break;
287            }
288        }
289    }
290
291    if columns.is_empty() {
292        return Err(ParseError::StreamingError(
293            "LOOKUP TABLE must have at least one column".to_string(),
294        ));
295    }
296
297    // Parse WITH clause
298    let with_options = parse_with_options(parser)?;
299    if with_options.is_empty() {
300        return Err(ParseError::StreamingError(
301            "LOOKUP TABLE requires a WITH clause".to_string(),
302        ));
303    }
304
305    Ok(CreateLookupTableStatement {
306        name,
307        columns,
308        primary_key,
309        with_options,
310        or_replace,
311        if_not_exists,
312    })
313}
314
315/// Parse a DROP LOOKUP TABLE statement.
316///
317/// Syntax: `DROP LOOKUP TABLE [IF EXISTS] <name>`
318///
319/// # Errors
320///
321/// Returns `ParseError` if the statement syntax is invalid.
322pub fn parse_drop_lookup_table(parser: &mut Parser) -> Result<(ObjectName, bool), ParseError> {
323    parser
324        .expect_keyword(Keyword::DROP)
325        .map_err(ParseError::SqlParseError)?;
326
327    expect_custom_keyword(parser, "LOOKUP")?;
328
329    parser
330        .expect_keyword(Keyword::TABLE)
331        .map_err(ParseError::SqlParseError)?;
332
333    let if_exists = parser.parse_keywords(&[Keyword::IF, Keyword::EXISTS]);
334
335    let name = parser
336        .parse_object_name(false)
337        .map_err(ParseError::SqlParseError)?;
338
339    Ok((name, if_exists))
340}
341
342/// Validate and extract typed properties from raw WITH options.
343///
344/// # Errors
345///
346/// Returns `ParseError` if required properties are missing or invalid.
347pub fn validate_properties<S: ::std::hash::BuildHasher>(
348    options: &HashMap<String, String, S>,
349) -> Result<LookupTableProperties, ParseError> {
350    let connector_str = options.get("connector").ok_or_else(|| {
351        ParseError::ValidationError("missing required property: 'connector'".to_string())
352    })?;
353    let connector = ConnectorType::parse(connector_str)?;
354
355    let connection = options.get("connection").cloned();
356
357    let strategy = match options.get("strategy") {
358        Some(s) => LookupStrategy::parse(s)?,
359        None => LookupStrategy::default(),
360    };
361
362    let cache_memory = options
363        .get("cache.memory")
364        .map(|s| ByteSize::parse(s))
365        .transpose()?;
366
367    let cache_disk = options
368        .get("cache.disk")
369        .map(|s| ByteSize::parse(s))
370        .transpose()?;
371
372    let cache_ttl = options
373        .get("cache.ttl")
374        .map(|s| {
375            s.parse::<u64>()
376                .map_err(|_| ParseError::ValidationError(format!("invalid cache.ttl: '{s}'")))
377        })
378        .transpose()?;
379
380    let pushdown_mode = match options.get("pushdown") {
381        Some(s) => PushdownMode::parse(s)?,
382        None => PushdownMode::default(),
383    };
384
385    Ok(LookupTableProperties {
386        connector,
387        connection,
388        strategy,
389        cache_memory,
390        cache_disk,
391        cache_ttl,
392        pushdown_mode,
393    })
394}
395
/// Unit tests for lookup table DDL parsing and property validation.
///
/// Statement-level tests go through `StreamingParser::parse_sql`; the
/// value parsers and `validate_properties` are exercised directly.
#[cfg(test)]
mod tests {
    use super::*;
    use crate::parser::StreamingParser;
    use crate::parser::StreamingStatement;

    /// Parses `sql` and returns the single resulting statement.
    ///
    /// Panics if parsing fails or if the number of statements is not exactly one.
    fn parse_one(sql: &str) -> StreamingStatement {
        let stmts = StreamingParser::parse_sql(sql).unwrap();
        assert_eq!(stmts.len(), 1, "Expected exactly 1 statement");
        stmts.into_iter().next().unwrap()
    }

    /// A minimal statement yields the name, columns, primary key and raw options.
    #[test]
    fn test_parse_basic_create_lookup_table() {
        let stmt = parse_one(
            "CREATE LOOKUP TABLE instruments (
                symbol VARCHAR NOT NULL,
                name VARCHAR,
                PRIMARY KEY (symbol)
            ) WITH (
                'connector' = 'postgres-cdc',
                'connection' = 'postgresql://localhost/db'
            )",
        );
        match stmt {
            StreamingStatement::CreateLookupTable(lt) => {
                assert_eq!(lt.name.to_string(), "instruments");
                assert_eq!(lt.columns.len(), 2);
                assert_eq!(lt.primary_key, vec!["symbol"]);
                assert!(!lt.or_replace);
                assert!(!lt.if_not_exists);
                assert_eq!(
                    lt.with_options.get("connector"),
                    Some(&"postgres-cdc".to_string())
                );
            }
            _ => panic!("Expected CreateLookupTable, got {stmt:?}"),
        }
    }

    /// Both `OR REPLACE` and `IF NOT EXISTS` are recorded when present.
    #[test]
    fn test_parse_or_replace_and_if_not_exists() {
        let stmt = parse_one(
            "CREATE OR REPLACE LOOKUP TABLE IF NOT EXISTS dims (
                id INT,
                PRIMARY KEY (id)
            ) WITH (
                'connector' = 'static'
            )",
        );
        match stmt {
            StreamingStatement::CreateLookupTable(lt) => {
                assert!(lt.or_replace);
                assert!(lt.if_not_exists);
            }
            _ => panic!("Expected CreateLookupTable, got {stmt:?}"),
        }
    }

    /// A composite primary key keeps its column order and is not counted as a column.
    #[test]
    fn test_parse_with_primary_key() {
        let stmt = parse_one(
            "CREATE LOOKUP TABLE t (
                a INT,
                b VARCHAR,
                c FLOAT,
                PRIMARY KEY (a, b)
            ) WITH ('connector' = 'static')",
        );
        match stmt {
            StreamingStatement::CreateLookupTable(lt) => {
                assert_eq!(lt.primary_key, vec!["a", "b"]);
                assert_eq!(lt.columns.len(), 3);
            }
            _ => panic!("Expected CreateLookupTable, got {stmt:?}"),
        }
    }

    /// Parsed WITH options round-trip through `validate_properties` into typed values.
    #[test]
    fn test_parse_with_clause_properties() {
        let stmt = parse_one(
            "CREATE LOOKUP TABLE t (
                id INT,
                PRIMARY KEY (id)
            ) WITH (
                'connector' = 'postgres-cdc',
                'connection' = 'postgresql://localhost/db',
                'strategy' = 'replicated',
                'cache.memory' = '512mb',
                'pushdown' = 'auto'
            )",
        );
        match stmt {
            StreamingStatement::CreateLookupTable(lt) => {
                let props = validate_properties(&lt.with_options).unwrap();
                assert_eq!(props.connector, ConnectorType::PostgresCdc);
                assert_eq!(
                    props.connection.as_deref(),
                    Some("postgresql://localhost/db")
                );
                assert_eq!(props.strategy, LookupStrategy::Replicated);
                assert_eq!(props.cache_memory, Some(ByteSize(512 * 1024 * 1024)));
                assert_eq!(props.pushdown_mode, PushdownMode::Auto);
            }
            _ => panic!("Expected CreateLookupTable, got {stmt:?}"),
        }
    }

    /// Plain `DROP LOOKUP TABLE` reports `if_exists == false`.
    #[test]
    fn test_parse_drop_lookup_table() {
        let stmt = parse_one("DROP LOOKUP TABLE instruments");
        match stmt {
            StreamingStatement::DropLookupTable { name, if_exists } => {
                assert_eq!(name.to_string(), "instruments");
                assert!(!if_exists);
            }
            _ => panic!("Expected DropLookupTable, got {stmt:?}"),
        }
    }

    /// `DROP LOOKUP TABLE IF EXISTS` reports `if_exists == true`.
    #[test]
    fn test_parse_drop_lookup_table_if_exists() {
        let stmt = parse_one("DROP LOOKUP TABLE IF EXISTS instruments");
        match stmt {
            StreamingStatement::DropLookupTable { name, if_exists } => {
                assert_eq!(name.to_string(), "instruments");
                assert!(if_exists);
            }
            _ => panic!("Expected DropLookupTable, got {stmt:?}"),
        }
    }

    /// Each supported suffix, and the suffix-less form, maps to the expected byte count.
    #[test]
    fn test_byte_size_parsing() {
        assert_eq!(
            ByteSize::parse("512mb").unwrap(),
            ByteSize(512 * 1024 * 1024)
        );
        assert_eq!(
            ByteSize::parse("1gb").unwrap(),
            ByteSize(1024 * 1024 * 1024)
        );
        assert_eq!(ByteSize::parse("10kb").unwrap(), ByteSize(10 * 1024));
        assert_eq!(ByteSize::parse("100b").unwrap(), ByteSize(100));
        assert_eq!(ByteSize::parse("1024").unwrap(), ByteSize(1024));
        assert_eq!(
            ByteSize::parse("2tb").unwrap(),
            ByteSize(2 * 1024 * 1024 * 1024 * 1024)
        );
    }

    /// Built-in connector names map to their variants; unknown names become `Custom`.
    #[test]
    fn test_connector_type_parsing() {
        assert_eq!(
            ConnectorType::parse("postgres-cdc").unwrap(),
            ConnectorType::PostgresCdc
        );
        assert_eq!(
            ConnectorType::parse("mysql-cdc").unwrap(),
            ConnectorType::MysqlCdc
        );
        assert_eq!(ConnectorType::parse("redis").unwrap(), ConnectorType::Redis);
        assert_eq!(
            ConnectorType::parse("s3-parquet").unwrap(),
            ConnectorType::S3Parquet
        );
        assert_eq!(
            ConnectorType::parse("static").unwrap(),
            ConnectorType::Static
        );
        assert_eq!(
            ConnectorType::parse("custom-src").unwrap(),
            ConnectorType::Custom("custom-src".to_string())
        );
    }

    /// An empty column list is rejected.
    #[test]
    fn test_error_missing_columns() {
        let result =
            StreamingParser::parse_sql("CREATE LOOKUP TABLE t () WITH ('connector' = 'static')");
        assert!(result.is_err());
    }

    /// A statement without a WITH clause is rejected.
    #[test]
    fn test_error_missing_with_clause() {
        let result = StreamingParser::parse_sql("CREATE LOOKUP TABLE t (id INT, PRIMARY KEY (id))");
        assert!(result.is_err());
    }

    /// An unrecognized `strategy` value makes validation fail.
    #[test]
    fn test_error_invalid_property() {
        let mut options = HashMap::new();
        options.insert("connector".to_string(), "postgres-cdc".to_string());
        options.insert("strategy".to_string(), "invalid-strategy".to_string());
        let result = validate_properties(&options);
        assert!(result.is_err());
    }
}