
Config-driven data pipeline for union dues

production

79 bargaining units, and every one submits dues in a different shape — wide or long, Excel or CSV, columns renamed, identifiers that don't line up, fees buried in other fields — with no canonical format and no pipeline to reconcile any of it.

The problem

Every unit's dues arrived as its own snowflake file: metadata rows before the header, multi-sheet workbooks, wide vs. long layouts, and SIN / employee-ID / name identifiers each with their own quirks (a trailing .0 on IDs, SINs with spaces or dashes). With no standard process, turning one into rows a CRM would accept meant ~30 minutes of hand-massaging per file — the member-to-record match rate sat around 50%, and the ad-hoc approach kept minting duplicate records.
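To make the identifier quirks concrete, here is a minimal sketch of the kind of normalization they call for. The function names are hypothetical, and the nine-digit rule assumes SIN means the Canadian Social Insurance Number; the production rules may differ.

```python
import re
from typing import Optional


def normalize_employee_id(raw: object) -> Optional[str]:
    """Trim whitespace and drop the trailing '.0' left behind when a
    spreadsheet column of integer IDs is read as floats."""
    if raw is None:
        return None
    text = str(raw).strip()
    if not text or text.lower() == "nan":
        return None
    # "10452.0" -> "10452"; values such as "A-17" pass through unchanged
    return re.sub(r"\.0+$", "", text)


def normalize_sin(raw: object) -> Optional[str]:
    """Keep digits only; return None unless exactly nine remain
    (assumed SIN length)."""
    text = normalize_employee_id(raw)  # removes a float-style ".0" first
    if text is None:
        return None
    digits = re.sub(r"\D", "", text)
    return digits if len(digits) == 9 else None
```

Returning None rather than a best guess keeps a malformed identifier from matching the wrong member later on.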

Why it mattered

Dues are how the union funds itself; a row that doesn't reconcile is a payment that can't post and a member whose standing is wrong on the books. Doing it by hand didn't scale past a handful of units, and every month's new files reset the treadmill. It had to become a pipeline, not a chore.

What I built

I built a registry-driven ingestion pipeline (~40,000 lines, ~1,200 tests). Each unit is a Pydantic-validated YAML config that declares its column mappings, layout, and a chain of transforms drawn from a registry of 19 composable steps; the runner validates the whole chain against the registry before it touches a row. Every file takes the same path: ingest → map to a canonical schema → transform → reconcile each row to a member (three-pass: SIN, then unit/job assignment, then payment history) → reconcile against already-uploaded records so only new or changed dues post → fail-closed validation that splits clean rows from a typed exceptions file. Every run, checkpoint, and transform is written to a SQLite audit trail — row deltas, timing, a config snapshot, and a data hash per checkpoint — so any number the pipeline produces is traceable back to the file it came from. Units email their files each period, so a run kicks off on demand rather than on a clock.
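The last stage of that path, fail-closed validation, can be sketched as follows. This is an illustration rather than the production code: the issue types, field names, and checks are assumptions. The point it shows is that a row reaches the clean output only if every check passes, and an unexpected error in a check also sends the row to the exceptions side.

```python
from dataclasses import dataclass
from decimal import Decimal, InvalidOperation
from enum import Enum
from typing import Dict, List, Optional, Tuple


class IssueType(str, Enum):  # hypothetical issue categories
    UNMATCHED_MEMBER = "unmatched_member"
    MISSING_PERIOD = "missing_period"
    BAD_AMOUNT = "bad_amount"
    CHECK_ERROR = "check_error"


@dataclass
class RowIssue:
    row_number: int
    issue: IssueType
    detail: str


def check_row(row: Dict[str, object]) -> Optional[Tuple[IssueType, str]]:
    """Return the first problem found, or None if the row is clean."""
    if not row.get("member_id"):
        return IssueType.UNMATCHED_MEMBER, "no member matched"
    if not row.get("period"):
        return IssueType.MISSING_PERIOD, "period is empty"
    try:
        amount = Decimal(str(row.get("amount")))
    except InvalidOperation:
        return IssueType.BAD_AMOUNT, f"not a number: {row.get('amount')!r}"
    if not amount.is_finite():
        return IssueType.BAD_AMOUNT, "amount is NaN or infinite"
    return None


def split_rows(rows: List[Dict[str, object]]):
    """Split rows into (clean, issues). Fail closed: any exception raised
    by a check is recorded as an issue instead of letting the row through."""
    clean: List[Dict[str, object]] = []
    issues: List[RowIssue] = []
    for number, row in enumerate(rows, start=1):
        try:
            problem = check_row(row)
        except Exception as exc:
            problem = (IssueType.CHECK_ERROR, repr(exc))
        if problem is None:
            clean.append(row)
        else:
            issues.append(RowIssue(number, problem[0], problem[1]))
    return clean, issues
```

Typing each exception means the issues file can be sorted and triaged by cause instead of read line by line.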

  1. Ingest

    Read each unit's file as-is (CSV or Excel, multi-sheet, metadata rows), driven by a Pydantic-validated YAML config.

  2. Transform

    A registry of 19 composable transforms folds every layout into one canonical schema; the runner validates the declared chain before running it.

  3. Reconcile

    Match each row to a member in three passes (SIN, then unit/job, then payment history), then against already-uploaded records so only new or changed dues post.

  4. Validate, split & audit

    Fail-closed checks split clean rows from a typed issues file; every run, checkpoint, and transform is logged to a SQLite audit trail.

One registry-driven path for 79 units — every dues file becomes reconciled, audited, upload-ready data, or a typed exception.
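The three-pass member match in the reconcile step could look roughly like this. The pass order (SIN, then unit/job, then payment history) comes from the description above; the lookup keys, field names, and the rule that a key must point at exactly one member are assumptions made for the sketch.

```python
from typing import Dict, Iterable, Optional, Set, Tuple

Index = Dict[tuple, str]


def build_index(pairs: Iterable[Tuple[tuple, str]]) -> Index:
    """Map key -> member_id, keeping only keys that identify one member.
    Ambiguous keys are dropped so a later pass (or a human) decides."""
    seen: Dict[tuple, Set[str]] = {}
    for key, member_id in pairs:
        seen.setdefault(key, set()).add(member_id)
    return {key: next(iter(ids)) for key, ids in seen.items() if len(ids) == 1}


def match_member(row: dict, by_sin: Index, by_job: Index,
                 by_history: Index) -> Tuple[Optional[str], str]:
    """Try each pass in order; return (member_id, pass_name)."""
    passes = [
        ("sin", by_sin, (row.get("sin"),)),
        ("unit_job", by_job, (row.get("unit"), row.get("employee_id"))),
        ("payment_history", by_history, (row.get("unit"), row.get("name"))),
    ]
    for label, index, key in passes:
        if None in key:  # the row lacks a field this pass needs
            continue
        member_id = index.get(key)
        if member_id is not None:
            return member_id, label
    return None, "unmatched"
```

Recording which pass produced the match makes weaker matches easy to audit separately from exact SIN hits.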

Key decisions

A transform registry + per-unit config over 79 bespoke scripts. Transforms are registered by name and composed declaratively in each unit's YAML; onboarding a unit is a validated config, not a one-off script that rots and drifts out of sync.
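A stripped-down version of the registry idea, for illustration only: transforms register under a name, and a runner refuses to start if the declared chain names anything it does not know. The transform names here are invented, and rows are plain dicts rather than the DataFrames a pandas pipeline would pass around.

```python
from typing import Callable, Dict, List

Row = Dict[str, object]
Transform = Callable[[List[Row]], List[Row]]

REGISTRY: Dict[str, Transform] = {}


def register(name: str) -> Callable[[Transform], Transform]:
    """Decorator that adds a transform to the registry under `name`."""
    def decorator(fn: Transform) -> Transform:
        if name in REGISTRY:
            raise ValueError(f"transform {name!r} already registered")
        REGISTRY[name] = fn
        return fn
    return decorator


@register("strip_whitespace")  # hypothetical transform
def strip_whitespace(rows: List[Row]) -> List[Row]:
    return [{k: v.strip() if isinstance(v, str) else v for k, v in r.items()}
            for r in rows]


@register("drop_zero_amounts")  # hypothetical transform
def drop_zero_amounts(rows: List[Row]) -> List[Row]:
    return [r for r in rows if r.get("amount")]


def run_chain(chain: List[str], rows: List[Row]) -> List[Row]:
    """Validate every name in the chain first, then apply them in order."""
    unknown = [name for name in chain if name not in REGISTRY]
    if unknown:
        raise KeyError(f"unknown transforms: {unknown}")
    for name in chain:
        rows = REGISTRY[name](rows)
    return rows
```

With this shape, a unit's config only needs the list of names, for example `["strip_whitespace", "drop_zero_amounts"]`, and a typo in that list fails before any row is read.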

One canonical schema, validated at the edge. Every unit normalizes into the same shape, and configs are Pydantic-validated (and the transform chain checked against the registry) before execution — so a bad config fails fast instead of corrupting output.
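As a sketch of what validating at the edge means here, the model below checks a unit config before anything runs. The field names and the set of known transforms are assumptions, not the real schema; the dict stands in for whatever parsing the unit's YAML file would produce.

```python
from typing import Dict, List, Literal

from pydantic import BaseModel, ValidationError

# Hypothetical transform names; the real registry holds 19 steps.
KNOWN_TRANSFORMS = {"strip_whitespace", "wide_to_long", "drop_zero_amounts"}


class UnitConfig(BaseModel):
    unit: str
    layout: Literal["wide", "long"]
    header_row: int = 0            # rows of metadata to skip before the header
    column_map: Dict[str, str]     # source column -> canonical column
    transforms: List[str]


def load_config(raw: dict) -> UnitConfig:
    """Validate field types with Pydantic, then check the transform chain."""
    config = UnitConfig(**raw)     # raises ValidationError on a bad shape
    unknown = [t for t in config.transforms if t not in KNOWN_TRANSFORMS]
    if unknown:
        raise ValueError(f"{config.unit}: unknown transforms {unknown}")
    return config


example = {
    "unit": "U07",
    "layout": "wide",
    "header_row": 3,
    "column_map": {"Emp #": "employee_id", "Dues": "amount"},
    "transforms": ["strip_whitespace", "wide_to_long"],
}
config = load_config(example)
```

A config with `layout: diagonal` or a misspelled transform name is rejected at load time, so the failure shows up as a config error rather than as bad rows downstream.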

Idempotent, reconcile-first loads. The pipeline compares source against already-uploaded records (with a $0.01 amount tolerance) and only posts new or changed dues, so re-running a unit is always safe and never double-charges a member.
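The reconcile-first comparison reduces to a small diff. In this sketch the key of (member, period) is an assumption, as is treating a difference of exactly $0.01 as unchanged; only the $0.01 tolerance itself comes from the pipeline.

```python
from decimal import Decimal
from typing import Dict, List, Tuple

TOLERANCE = Decimal("0.01")
Key = Tuple[str, str]  # (member_id, period): assumed identity of a dues row


def diff_against_uploaded(
    source: Dict[Key, Decimal], uploaded: Dict[Key, Decimal]
) -> List[Tuple[Key, Decimal, str]]:
    """Return only the rows that need posting, tagged 'new' or 'changed'."""
    to_post = []
    for key, amount in source.items():
        existing = uploaded.get(key)
        if existing is None:
            to_post.append((key, amount, "new"))
        elif abs(amount - existing) > TOLERANCE:
            to_post.append((key, amount, "changed"))
        # otherwise the amounts agree within tolerance: nothing to post
    return to_post
```

Running the same file twice yields an empty list the second time, which is what makes a re-run safe.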

Outcome

  • ~30 minutes a file, by hand → ~3 minutes on the pipeline — a standard, repeatable path where there was none.
  • All 79 units now funnel through one pipeline into a single canonical schema — no bespoke holdouts; onboarding a new unit is a validated YAML config, not a script.
  • Killed the duplicate records the old ad-hoc process created — every reconciled member updates cleanly in the CRM, and the match rate lifted off its inherited ~50% floor.
  • ~40,000-line module, 19 composable transforms, ~1,200 tests — the canonical schema is the single source of truth for code and fixtures.
  • Every run is auditable — a SQLite trail logs per-transform row deltas, timing, config snapshots, and per-checkpoint data hashes.
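For a sense of what one audit record holds, here is a minimal sketch using only the standard library. The table and column names are invented; the logged fields (row counts in and out, timing, a config snapshot, and a hash of the data) follow the list above.

```python
import hashlib
import json
import sqlite3
import time
from typing import Callable, Dict, List

Rows = List[Dict[str, object]]


def data_hash(rows: Rows) -> str:
    """Stable SHA-256 over the rows (keys sorted so dict order is irrelevant)."""
    payload = json.dumps(rows, sort_keys=True, default=str).encode("utf-8")
    return hashlib.sha256(payload).hexdigest()


def open_audit(path: str = ":memory:") -> sqlite3.Connection:
    conn = sqlite3.connect(path)
    conn.execute(
        """CREATE TABLE IF NOT EXISTS transform_log (
               run_id TEXT, step TEXT, rows_in INTEGER, rows_out INTEGER,
               seconds REAL, data_hash TEXT, config_snapshot TEXT)"""
    )
    return conn


def run_logged(conn: sqlite3.Connection, run_id: str, step: str,
               fn: Callable[[Rows], Rows], rows: Rows, config: dict) -> Rows:
    """Apply one transform and record its row delta, timing, and output hash."""
    started = time.perf_counter()
    out = fn(rows)
    conn.execute(
        "INSERT INTO transform_log VALUES (?, ?, ?, ?, ?, ?, ?)",
        (run_id, step, len(rows), len(out), time.perf_counter() - started,
         data_hash(out), json.dumps(config, sort_keys=True)),
    )
    conn.commit()
    return out
```

Because the hash covers the data after each step, two runs over the same file with the same config can be compared step by step to find where they diverge.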

What a production version needs

All 79 units now run through the pipeline; the remaining work is hardening, not coverage:

  • Move the master data off local CSVs — reconciliation currently reads flat member/jobs/dues exports; a proper database source would remove a brittle step.
  • Extend the contracts to incoming files — configs are validated, but a schema check on each source file would catch upstream format drift before ingestion, not during.
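One possible shape for the source-file check in the second item, sketched for CSV input only. It is not built yet, so everything here is an assumption: the function name, the use of the config's column map as the contract, and the header-row parameter.

```python
import csv
import io
from typing import Dict, List


def missing_source_columns(text: str, header_row: int,
                           column_map: Dict[str, str]) -> List[str]:
    """Return the source columns the config expects but the file lacks.

    `header_row` is the zero-based index of the header line, which lets the
    check skip any metadata rows that sit above it.
    """
    reader = csv.reader(io.StringIO(text))
    for index, row in enumerate(reader):
        if index == header_row:
            present = {cell.strip() for cell in row}
            return sorted(col for col in column_map if col not in present)
    return sorted(column_map)  # file ended before reaching the header row


sample = (
    "Local 12 dues report\n"
    "Generated 2024-05-31\n"
    "Emp #,Name,Dues\n"
    "10452,Lee,42.10\n"
)
expected = {"Emp #": "employee_id", "Name": "name", "Dues": "amount"}
```

A non-empty result would stop the run before ingestion and name the column that drifted, for instance when a unit renames "Dues" to "Dues Amt".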

Pipeline at a glance

Top-down flow: a unit file (CSV, Excel, or multi-sheet) is ingested via a Pydantic-validated config, transformed by a 19-step registry into a canonical schema, reconciled three ways (SIN, then unit/job, then payment history), then fail-closed validation splits upload-ready rows from a typed issues file; the transform, reconcile, and validate steps all write to a SQLite audit trail.
One registry-driven path per unit — every dues file becomes clean, audited, upload-ready data, or a typed exception with a reason.

Stack

Python, pandas, Pydantic, SQLite, pytest