Function collapse_changelog 

pub fn collapse_changelog(
    batch: &RecordBatch,
    merge_key: &[String],
) -> Result<RecordBatch, ConnectorError>

Collapse a concatenated changelog epoch batch into a key-unique batch carrying a _op column of U (upsert) or D (delete), one row per merge_key.

Two input encodings are detected automatically:

  • Z-set (the __weight column is present): identical full rows are consolidated by summing their weights, net-zero rows are dropped, then the survivors are grouped by merge_key. A key with a net-positive (live) row becomes a U carrying that value; a key with only net-negative rows becomes a D. The __weight column is stripped from the output.
  • CDC (no __weight): the last-arriving row per merge_key wins (row order is arrival order). Its op is normalized to D for deletes (_op ∈ {D, U-}) and to U for everything else. A batch with neither a __weight nor an _op column is treated as all-upsert.
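The two collapse rules can be sketched in plain Rust. This is a minimal illustration under stated assumptions, not the function's implementation: it replaces Arrow RecordBatches with a hypothetical `Row` type (a single string key standing in for the merge_key columns, plus an opaque value), returns a `String` error instead of ConnectorError, and emits keys in first-appearance order, which the documentation does not specify.

```rust
use std::collections::HashMap;

/// Hypothetical row: one merge-key value plus an opaque payload.
/// (The real function works on Arrow RecordBatch columns.)
#[derive(Clone, Debug, PartialEq)]
pub struct Row {
    pub key: String,
    pub value: String,
}

/// Output op, mirroring the `_op` values U (upsert) and D (delete).
#[derive(Clone, Copy, Debug, PartialEq)]
pub enum Op {
    U,
    D,
}

/// Z-set path: sum weights of identical full rows, drop net-zero rows,
/// then emit one U (a single live row) or D (only net-negative rows) per key.
pub fn collapse_zset(rows: &[(Row, i64)]) -> Result<Vec<(Row, Op)>, String> {
    // Consolidate identical (key, value) rows; `order` keeps first-seen order.
    let mut order: Vec<Row> = Vec::new();
    let mut weights: HashMap<(String, String), i64> = HashMap::new();
    for (row, w) in rows {
        let id = (row.key.clone(), row.value.clone());
        *weights.entry(id).or_insert_with(|| {
            order.push(row.clone());
            0
        }) += *w;
    }

    // Group surviving (net-nonzero) rows by merge key.
    struct Group {
        live: Vec<Row>,
        dead: Option<Row>,
    }
    let mut key_order: Vec<String> = Vec::new();
    let mut groups: HashMap<String, Group> = HashMap::new();
    for row in order {
        let w = weights[&(row.key.clone(), row.value.clone())];
        if w == 0 {
            continue; // insert and retract cancelled out
        }
        let g = groups.entry(row.key.clone()).or_insert_with(|| {
            key_order.push(row.key.clone());
            Group { live: Vec::new(), dead: None }
        });
        if w > 0 {
            g.live.push(row);
        } else if g.dead.is_none() {
            g.dead = Some(row);
        }
    }

    let mut out = Vec::new();
    for key in key_order {
        let g = groups.remove(&key).expect("every ordered key has a group");
        match g.live.len() {
            // Only net-negative rows survived: the key was deleted.
            0 => out.push((g.dead.expect("a group without live rows has a dead row"), Op::D)),
            1 => out.push((g.live.into_iter().next().unwrap(), Op::U)),
            n => return Err(format!("{n} live rows for key {key:?}: misdeclared merge key")),
        }
    }
    Ok(out)
}

/// CDC path: the last-arriving row per key wins; `_op` D or U- becomes D,
/// anything else (including a missing `_op`, modeled as None) becomes U.
pub fn collapse_cdc(rows: &[(Row, Option<String>)]) -> Vec<(Row, Op)> {
    let mut key_order: Vec<String> = Vec::new();
    let mut last: HashMap<String, (Row, Op)> = HashMap::new();
    for (row, op) in rows {
        let op = match op.as_deref() {
            Some("D") | Some("U-") => Op::D,
            _ => Op::U,
        };
        if !last.contains_key(&row.key) {
            key_order.push(row.key.clone());
        }
        // Later rows overwrite earlier ones: arrival order decides.
        last.insert(row.key.clone(), (row.clone(), op));
    }
    key_order
        .into_iter()
        .map(|k| last.remove(&k).expect("key was inserted"))
        .collect()
}
```

With this sketch, retracting `(a, 1)` and inserting `(a, 2)` yields a single U for `a` carrying `2`, inserting and then retracting the same row for `b` cancels to nothing, and a lone retraction for `c` yields a D.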

The existing key-by-key MERGE (_op ∈ {U, D}) consumes the output unchanged, and because the output contains at most one row per merge key, the writer never sees a cardinality violation.
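A toy model of why one row per key matters is to apply a collapsed batch to an in-memory table keyed by merge key. The `apply_merge` function and the `HashMap` table below are hypothetical stand-ins for the sink's MERGE, assumed here to upsert on U and delete on D; the duplicate-key guard imitates the error many SQL engines raise when several source rows match the same target row.

```rust
use std::collections::{HashMap, HashSet};

/// Output op, mirroring the `_op` values U (upsert) and D (delete).
#[derive(Clone, Copy, Debug, PartialEq)]
pub enum Op {
    U,
    D,
}

/// Hypothetical stand-in for the sink's key-by-key MERGE over a table
/// keyed by merge key: U upserts the value, D deletes the key.
pub fn apply_merge(
    table: &mut HashMap<String, String>,
    batch: &[(String, String, Op)],
) -> Result<(), String> {
    // Reject a batch with two rows for one key before touching the table,
    // as a SQL MERGE would when several source rows match one target row.
    let mut seen: HashSet<&String> = HashSet::new();
    for (key, _, _) in batch {
        if !seen.insert(key) {
            return Err(format!("cardinality violation on key {key:?}"));
        }
    }
    // With at most one row per key, row order cannot change the result.
    for (key, value, op) in batch {
        match op {
            Op::U => {
                table.insert(key.clone(), value.clone());
            }
            Op::D => {
                table.remove(key);
            }
        }
    }
    Ok(())
}
```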

Errors

  • ConnectorError::ConfigurationError if merge_key is empty, names a column absent from the batch, or is not unique over the collapsed output (more than one live row for a single key, which indicates a misdeclared merge key).
  • ConnectorError::Internal if an Arrow row-conversion or take fails, or the __weight column is not Int64.
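The up-front error conditions can be sketched as schema checks. `ConnectorError` below is a hypothetical mirror of the two documented variants, `ColType` is an assumed stand-in for Arrow's data types, and the schema is modeled as `(name, type)` pairs; the "more than one live row per key" check is not shown here because it can only run after collapsing.

```rust
/// Hypothetical mirror of the two documented error variants; the real
/// `ConnectorError` may carry different payloads.
#[derive(Debug, PartialEq)]
pub enum ConnectorError {
    ConfigurationError(String),
    Internal(String),
}

/// Assumed, trimmed-down column type tag standing in for Arrow's `DataType`.
#[derive(Debug, PartialEq)]
pub enum ColType {
    Int64,
    Utf8,
    Other,
}

/// Checks run before collapsing: a non-empty merge key whose columns all
/// exist, and an Int64 `__weight` column when one is present.
pub fn validate(schema: &[(&str, ColType)], merge_key: &[String]) -> Result<(), ConnectorError> {
    if merge_key.is_empty() {
        return Err(ConnectorError::ConfigurationError("merge_key is empty".to_string()));
    }
    for k in merge_key {
        if !schema.iter().any(|(name, _)| *name == k.as_str()) {
            return Err(ConnectorError::ConfigurationError(format!(
                "merge_key column {k:?} is absent from the batch"
            )));
        }
    }
    if let Some((_, ty)) = schema.iter().find(|(name, _)| *name == "__weight") {
        if *ty != ColType::Int64 {
            return Err(ConnectorError::Internal(format!(
                "__weight must be Int64, found {ty:?}"
            )));
        }
    }
    Ok(())
}
```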