Config-driven data pipeline for union dues
production
79 bargaining units, each submitting dues in a different shape (wide or long, Excel or CSV, renamed columns, identifiers that don't line up, fees buried in other fields), with no canonical format and no pipeline to reconcile any of it.
The problem
Every unit's dues arrived as its own snowflake file: metadata rows before the header, multi-sheet workbooks, wide vs. long layouts, and SIN, employee-ID, and name identifiers, each with its own quirks (a trailing .0 on IDs, SINs with spaces or dashes). With no standard process, turning one file into rows a CRM would accept took ~30 minutes of hand-massaging. The member-to-record match rate sat around 50%, and the ad-hoc approach kept minting duplicate records.
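Identifier quirks like these are usually absorbed by a small normalization step that runs before any matching. Below is a minimal sketch. It assumes a SIN is nine digits once separators are removed, and that employee IDs sometimes arrive as floats because a spreadsheet read them as numbers. Both helper names are hypothetical and are not taken from the pipeline.

```python
import re
from typing import Optional

# Hypothetical helpers; assumes a valid SIN is exactly 9 digits after cleanup.
_SIN_SEPARATORS = re.compile(r"[\s\-]")


def normalize_sin(raw: object) -> Optional[str]:
    """Remove spaces/dashes and a spreadsheet '.0'; return 9 digits or None."""
    if raw is None:
        return None
    text = _SIN_SEPARATORS.sub("", str(raw).strip())
    if text.endswith(".0"):
        text = text[:-2]
    return text if re.fullmatch(r"\d{9}", text) else None


def normalize_employee_id(raw: object) -> Optional[str]:
    """Drop the trailing '.0' that appears when an integer ID is read as a float."""
    if raw is None:
        return None
    text = str(raw).strip()
    if text.endswith(".0") and text[:-2].isdigit():
        text = text[:-2]
    return text or None
```

Returning `None` instead of guessing lets a malformed SIN fall through to weaker matching passes or to the exceptions file.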
Why it mattered
Dues are how the union funds itself; a row that doesn't reconcile is a payment that can't post and a member whose standing is wrong on the books. Doing it by hand didn't scale past a handful of units, and every month's new files reset the treadmill. It had to become a pipeline, not a chore.
What I built
I built a registry-driven ingestion pipeline (~40,000 lines, ~1,200 tests). Each unit is a Pydantic-validated YAML config that declares its column mappings, its layout, and a chain of transforms drawn from a registry of 19 composable steps; the runner validates the whole chain against the registry before it touches a row. Every file takes the same path: ingest → map to a canonical schema → transform → match each row to a member (three passes: SIN, then unit/job assignment, then payment history) → reconcile against already-uploaded records so only new or changed dues post → fail-closed validation that separates clean rows from failures, which go to a typed exceptions file.
Every run, checkpoint, and transform is written to a SQLite audit trail (row deltas, timing, a config snapshot, and a data hash per checkpoint), so any number the pipeline produces can be traced back to the file it came from. Units email their files each period, so runs are started on demand rather than on a schedule.
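An audit trail of this kind could record checkpoints roughly as in the following minimal sketch, which uses the standard-library `sqlite3`, `hashlib`, and `json` modules. The table layout, the column names, `run_step`, and the choice of SHA-256 over a sorted-key JSON dump as the data hash are all assumptions made for illustration. None of them is the pipeline's actual schema.

```python
import hashlib
import json
import sqlite3
import time
from typing import Any, Callable

Rows = list[dict[str, Any]]

# Hypothetical audit table: one row per transform checkpoint within a run.
SCHEMA = """
CREATE TABLE IF NOT EXISTS checkpoints (
    run_id      TEXT NOT NULL,
    step        TEXT NOT NULL,
    rows_in     INTEGER NOT NULL,
    rows_out    INTEGER NOT NULL,
    seconds     REAL NOT NULL,
    config_json TEXT NOT NULL,
    data_hash   TEXT NOT NULL
)
"""


def data_hash(rows: Rows) -> str:
    """SHA-256 over a deterministic JSON dump (sorted keys; other types via str)."""
    payload = json.dumps(rows, sort_keys=True, default=str).encode("utf-8")
    return hashlib.sha256(payload).hexdigest()


def run_step(conn: sqlite3.Connection, run_id: str, step: str,
             fn: Callable[[Rows], Rows], rows: Rows, config: dict) -> Rows:
    """Apply one transform; log row counts in/out, timing, config, and output hash."""
    start = time.perf_counter()
    out = fn(rows)
    elapsed = time.perf_counter() - start
    conn.execute(
        "INSERT INTO checkpoints VALUES (?, ?, ?, ?, ?, ?, ?)",
        (run_id, step, len(rows), len(out), elapsed,
         json.dumps(config, sort_keys=True, default=str), data_hash(out)),
    )
    conn.commit()
    return out
```

The row delta for a step is `rows_in - rows_out`. Because each hash is computed over the step's output, a later rerun of the same input and config can be checked checkpoint by checkpoint.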
Step 1: Ingest
Read each unit's file as-is (CSV or Excel, multi-sheet, metadata rows), driven by a Pydantic-validated YAML config.
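For CSV input, one simple way to skip the metadata rows above the real header is to scan for the first row that contains every column the unit's config expects. Below is a minimal sketch using the standard-library `csv` module. `read_with_header_scan` and the `expected` column set are hypothetical stand-ins for what a unit's YAML would declare. Excel and multi-sheet workbooks would need their own reader.

```python
import csv
import io
from typing import Iterable


def read_with_header_scan(lines: Iterable[str], expected: set[str]) -> list[dict[str, str]]:
    """Skip metadata rows above the header, then return data rows as dicts.

    The header is the first row whose cells include every name in `expected`;
    blank spacer rows after it are dropped.
    """
    reader = csv.reader(lines)
    header = None
    for row in reader:
        cells = [cell.strip() for cell in row]
        if expected.issubset(cells):
            header = cells
            break
    if header is None:
        raise ValueError(f"no header row containing {sorted(expected)}")
    records = []
    for row in reader:
        if not any(cell.strip() for cell in row):
            continue  # blank or all-empty row
        records.append(dict(zip(header, (cell.strip() for cell in row))))
    return records


if __name__ == "__main__":
    sample = io.StringIO(
        "Dues report,March\nGenerated by payroll\n\n"
        "SIN,Name,Amount\n123 456 789,Ada,12.50\n,,\n"
    )
    print(read_with_header_scan(sample, {"SIN", "Amount"}))
```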
Step 2: Transform
A registry of 19 composable transforms folds every layout into one canonical schema; the runner validates the declared chain before running it.
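A registry like this would likely need a wide-to-long fold (an unpivot): the unit sends one column per pay period, and the canonical schema wants one row per member per period. Below is a minimal sketch. The canonical field names `member_key`, `period`, and `amount`, and the `fold_wide` helper itself, are illustrative assumptions rather than the pipeline's actual schema. In a fail-closed pipeline a bad cell would be routed to the exceptions file. Here it simply raises.

```python
from decimal import Decimal, InvalidOperation


def fold_wide(rows: list[dict[str, str]], id_column: str,
              period_columns: list[str]) -> list[dict[str, object]]:
    """Unpivot one-column-per-period rows into one row per member per period.

    Blank cells are skipped (nothing reported for that period); "$" and
    thousands separators are stripped before parsing the amount as a Decimal.
    """
    out: list[dict[str, object]] = []
    for row in rows:
        for period in period_columns:
            cell = (row.get(period) or "").strip()
            if not cell:
                continue
            try:
                amount = Decimal(cell.replace("$", "").replace(",", ""))
            except InvalidOperation:
                raise ValueError(
                    f"bad amount {cell!r} for {row.get(id_column)!r} in {period}"
                ) from None
            out.append({"member_key": row[id_column], "period": period, "amount": amount})
    return out
```

The sketch parses amounts with `Decimal` instead of `float`, so cent values compare exactly in later reconciliation.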
Step 3: Reconcile
Match each row to a member in three passes (SIN, then unit/job, then payment history), then reconcile against already-uploaded records so only new or changed dues post.
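The three-pass match works as a cascade: try the strongest key first, and fall back to weaker evidence only for rows that are still unmatched. Below is a minimal sketch. Several details are assumptions made for illustration and are not the pipeline's actual matching logic: the index shapes, the field names (`sin`, `unit`, `employee_id`, `name`), the keys each pass uses, and the rule that an ambiguous key counts as no match.

```python
from typing import Optional

# Hypothetical master-data indexes: normalized key -> candidate member ids.
Index = dict[str, list[str]]


def _key(*parts: object) -> Optional[str]:
    """Lower-cased, pipe-joined key, or None if any part is missing or blank."""
    cleaned = [str(p).strip().lower() for p in parts if p is not None and str(p).strip()]
    return "|".join(cleaned) if len(cleaned) == len(parts) else None


def _unique(candidates: Optional[list[str]]) -> Optional[str]:
    """Accept a key only if it resolves to exactly one member; ambiguity is no match."""
    if candidates and len(set(candidates)) == 1:
        return candidates[0]
    return None


def match_member(row: dict, by_sin: Index, by_unit_job: Index,
                 by_history: Index) -> tuple[Optional[str], Optional[str]]:
    """Try SIN, then unit + employee ID, then unit + name from payment history.

    Returns (member_id, pass_name), or (None, None) when no pass yields a unique match.
    """
    passes = (
        ("sin", by_sin, _key(row.get("sin"))),
        ("unit_job", by_unit_job, _key(row.get("unit"), row.get("employee_id"))),
        ("payment_history", by_history, _key(row.get("unit"), row.get("name"))),
    )
    for pass_name, index, key in passes:
        if key is None:
            continue
        member_id = _unique(index.get(key))
        if member_id is not None:
            return member_id, pass_name
    return None, None
```

Recording which pass produced the match gives the audit trail a cheap signal of how much each fallback is carrying.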
Step 4: Validate, split & audit
Fail-closed checks separate clean rows from failures, which go to a typed exceptions file; every run, checkpoint, and transform is logged to a SQLite audit trail.
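"Fail-closed" here means a row reaches the clean output only if every check passes. Anything else lands in the exceptions file with a typed reason, including a row whose check itself raised an error. Below is a minimal sketch. The issue types and the two checks are hypothetical examples, not the pipeline's actual rule set.

```python
from dataclasses import dataclass
from decimal import Decimal, InvalidOperation
from enum import Enum
from typing import Optional


class IssueType(Enum):
    UNMATCHED_MEMBER = "unmatched_member"
    BAD_AMOUNT = "bad_amount"
    CHECK_ERROR = "check_error"


@dataclass(frozen=True)
class Issue:
    row: object
    kind: IssueType
    detail: str


def _check(row: dict) -> Optional[Issue]:
    """Return an Issue for the first failed check, or None if the row is clean."""
    if not row.get("member_id"):
        return Issue(row, IssueType.UNMATCHED_MEMBER, "no member match")
    try:
        amount = Decimal(str(row.get("amount")))
    except InvalidOperation:
        return Issue(row, IssueType.BAD_AMOUNT, f"unparseable amount {row.get('amount')!r}")
    if not amount.is_finite():
        return Issue(row, IssueType.BAD_AMOUNT, f"non-finite amount {amount}")
    return None


def split_rows(rows: list) -> tuple[list, list[Issue]]:
    """Clean rows go to the load; everything else becomes a typed Issue."""
    clean: list = []
    issues: list[Issue] = []
    for row in rows:
        try:
            issue = _check(row)
        except Exception as exc:  # fail closed: a check that errors rejects the row
            issue = Issue(row, IssueType.CHECK_ERROR, repr(exc))
        if issue is None:
            clean.append(row)
        else:
            issues.append(issue)
    return clean, issues
```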
Key decisions
A transform registry + per-unit config over 79 bespoke scripts. Transforms are registered by name and composed declaratively in each unit's YAML; onboarding a unit is a validated config, not a one-off script that rots and drifts out of sync.
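Registering transforms by name and composing them from config could look like the following minimal sketch. A decorator fills a module-level registry, and a unit's declared chain is resolved and applied in order. The chain here is a plain list of dicts standing in for the parsed YAML. The step names and the `{"step": ..., "params": ...}` shape are assumptions, not the pipeline's actual config format.

```python
from typing import Callable

Rows = list[dict]
Transform = Callable[..., Rows]
REGISTRY: dict[str, Transform] = {}


def register(name: str) -> Callable[[Transform], Transform]:
    """Decorator: add a transform to the registry under a unique name."""
    def wrap(fn: Transform) -> Transform:
        if name in REGISTRY:
            raise ValueError(f"duplicate transform name {name!r}")
        REGISTRY[name] = fn
        return fn
    return wrap


@register("rename_columns")
def rename_columns(rows: Rows, mapping: dict[str, str]) -> Rows:
    """Rename source columns to canonical names; unmapped columns pass through."""
    return [{mapping.get(k, k): v for k, v in row.items()} for row in rows]


@register("drop_blank")
def drop_blank(rows: Rows, column: str) -> Rows:
    """Drop rows whose value in `column` is missing or whitespace."""
    return [row for row in rows if str(row.get(column) or "").strip()]


def run_chain(rows: Rows, chain: list[dict]) -> Rows:
    """Apply each declared step in order, passing its params as keyword args."""
    for step in chain:
        rows = REGISTRY[step["step"]](rows, **step.get("params", {}))
    return rows
```

Adding a unit then means writing a new chain of existing step names, not new code. A new step is written once and becomes available to every unit.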
One canonical schema, validated at the edge. Every unit normalizes into the same shape, and configs are Pydantic-validated (and the transform chain checked against the registry) before execution — so a bad config fails fast instead of corrupting output.
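Validating at the edge means a unit's config is rejected before any file is read. The pipeline does this with Pydantic. The sketch below instead uses only the standard library, so it stays self-contained. It collects every problem, including unknown steps and missing step parameters checked against a registry table, and raises them together. `KNOWN_STEPS`, `LAYOUTS`, `UnitConfig`, and the field names are hypothetical.

```python
from dataclasses import dataclass, field

# Hypothetical registry metadata: step name -> required parameter names.
KNOWN_STEPS: dict[str, set[str]] = {
    "rename_columns": {"mapping"},
    "fold_wide": {"id_column", "period_columns"},
    "drop_blank": {"column"},
}
LAYOUTS = {"long", "wide"}


class ConfigError(ValueError):
    """Raised with every problem found, so a bad config fails before any rows run."""


@dataclass(frozen=True)
class UnitConfig:
    unit_id: str
    layout: str
    chain: list[dict] = field(default_factory=list)


def load_unit_config(raw: dict) -> UnitConfig:
    """Validate a parsed YAML mapping and return a typed config, or raise."""
    errors = []
    unit_id = raw.get("unit_id")
    if not isinstance(unit_id, str) or not unit_id.strip():
        errors.append("unit_id: required non-empty string")
    layout = raw.get("layout")
    if layout not in LAYOUTS:
        errors.append(f"layout: expected one of {sorted(LAYOUTS)}, got {layout!r}")
    chain = raw.get("chain", [])
    if not isinstance(chain, list):
        errors.append("chain: must be a list")
        chain = []
    for i, step in enumerate(chain):
        name = step.get("step") if isinstance(step, dict) else None
        if name not in KNOWN_STEPS:
            errors.append(f"chain[{i}]: unknown step {name!r}")
            continue
        missing = KNOWN_STEPS[name] - set(step.get("params", {}))
        if missing:
            errors.append(f"chain[{i}] ({name}): missing params {sorted(missing)}")
    if errors:
        raise ConfigError("; ".join(errors))
    return UnitConfig(unit_id=unit_id, layout=layout, chain=chain)
```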
Idempotent, reconcile-first loads. The pipeline compares source rows against already-uploaded records (with a $0.01 amount tolerance) and posts only new or changed dues, so re-running a unit is always safe and never double-charges a member.
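A reconcile-first load reduces to a keyed diff. Compare each source row with what was already uploaded for the same key, and post it only if it is new or its amount moved by at least the tolerance. Below is a minimal sketch. It assumes rows are keyed by `(member_id, period)` and that amounts are `Decimal`. It also assumes that differences under one cent count as unchanged; whether the real boundary is inclusive is not stated. The key choice and `rows_to_post` are illustrative.

```python
from decimal import Decimal

TOLERANCE = Decimal("0.01")


def rows_to_post(source: list[dict], uploaded: list[dict]) -> tuple[list[dict], list[dict]]:
    """Split source rows into (new, changed); unchanged rows are skipped.

    A row is "changed" when its amount differs from the uploaded amount for the
    same (member_id, period) by one cent or more.
    """
    existing = {(r["member_id"], r["period"]): r["amount"] for r in uploaded}
    new: list[dict] = []
    changed: list[dict] = []
    for row in source:
        key = (row["member_id"], row["period"])
        if key not in existing:
            new.append(row)
        elif abs(row["amount"] - existing[key]) >= TOLERANCE:
            changed.append(row)
    return new, changed
```

Idempotency follows directly. Once a batch has been posted, diffing the same source against the updated uploads yields nothing to post.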
Outcome
- ~30 minutes a file, by hand → ~3 minutes on the pipeline — a standard, repeatable path where there was none.
- All 79 units now funnel through one pipeline into a single canonical schema — no bespoke holdouts; onboarding a new unit is a validated YAML config, not a script.
- Eliminated the duplicate records the old ad-hoc process created: every reconciled member now updates cleanly in the CRM, and the match rate climbed from its inherited ~50% baseline.
- ~40,000-line module, 19 composable transforms, ~1,200 tests — the canonical schema is the single source of truth for code and fixtures.
- Every run is auditable — a SQLite trail logs per-transform row deltas, timing, config snapshots, and per-checkpoint data hashes.
What a production version needs
All 79 units now run through the pipeline; the remaining work is hardening, not coverage:
- Move the master data off local CSVs — reconciliation currently reads flat member/jobs/dues exports; a proper database source would remove a brittle step.
- Extend the contracts to incoming files — configs are validated, but a schema check on each source file would catch upstream format drift before ingestion, not during.
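One lightweight form for that per-file contract is a header check run before ingestion. It would report missing required columns and unexpected new ones, so upstream drift surfaces as a clear error rather than a half-mapped file. Below is a minimal sketch of the proposed check. `FileContract`, `DriftReport`, and `check_header` are hypothetical names.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class FileContract:
    required: frozenset[str]
    optional: frozenset[str] = frozenset()


@dataclass(frozen=True)
class DriftReport:
    missing: list[str]
    unexpected: list[str]

    @property
    def ok(self) -> bool:
        # Missing required columns are fatal; unexpected ones are reported only.
        return not self.missing


def check_header(header: list[str], contract: FileContract) -> DriftReport:
    """Compare a source file's header row against the unit's declared contract."""
    seen = {h.strip() for h in header if h.strip()}
    known = contract.required | contract.optional
    return DriftReport(
        missing=sorted(contract.required - seen),
        unexpected=sorted(seen - known),
    )
```

Treating unexpected columns as warnings, not failures, is a design choice. It tolerates a unit adding a harmless column while still flagging renames, which appear as one missing and one unexpected column.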