Automated batch file ingestion for a Landing Zone. Moves files from
per-source inboxes into governed feed=/date= partitions, records a
SHA-256-verified audit trail, enforces retention, and sends email alerts for
anything that needs operator attention.
- Landing Zone Folder Model
- Quick Start
- Configuration Reference
- Delivery Patterns
- Audit Manifest
- Retention
- Notifications
- Process Lock
- Project Structure
- Python Notes — @dataclass
## Landing Zone Folder Model

```
/zone=landing/
    source=<source_name>/
        inbox/                 ← files are delivered here
        config.yaml            ← source-specific rules
        manifest.json          ← JSONL audit log for this source
        feed=<feed_name>/
            date=<YYYY-MM-DD>/
                <files>        ← governed, read-only after import
```
## Quick Start

### First-time setup (Windows)

```bat
setup.bat
```

This creates a .venv virtual environment and installs the single dependency (pyyaml).

### Running the process

```bat
run.bat
```

Point Windows Task Scheduler at run.bat. The process acquires a lock file on startup so overlapping scheduled runs are detected and aborted automatically.
### Linux / macOS

```sh
python3.11 -m venv .venv
.venv/bin/pip install -r requirements.txt
.venv/bin/python ingest.py
```

## Configuration Reference

### Global settings (config/settings.yaml)

```yaml
landing_zone_root: "C:/data/zone=landing"  # Root of the landing zone
log_dir: "C:/logs/file-manager"            # Daily rotating log files
lock_file: "C:/temp/file-manager.lock"

smtp:
  host: "smtp.company.com"
  port: 25
  from_address: "me@company.com"
  # username: ""   # Uncomment if the relay requires authentication
  # password: ""
```

The process discovers every source=* folder under landing_zone_root and loads its config.yaml automatically — no code changes are needed to add or remove a source.
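Discovery can be sketched as a scan for source= prefixed folders (illustrative only; the real loader in src/config_loader.py also parses and validates the YAML into dataclasses):

```python
from pathlib import Path

def discover_sources(landing_zone_root: str) -> dict[str, Path]:
    """Map each source name to its config.yaml path (sketch)."""
    sources = {}
    for folder in sorted(Path(landing_zone_root).glob("source=*")):
        config_path = folder / "config.yaml"
        if folder.is_dir() and config_path.exists():
            # Folder name "source=sales" -> source name "sales"
            name = folder.name.split("=", 1)[1]
            sources[name] = config_path
    return sources
```

Folders without a config.yaml are skipped, so a source can be staged before it goes live.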
### Source settings (source=<source_name>/config.yaml)

```yaml
delivery_pattern: root_file  # root_file | feed_subfolder | feed_date_subfolder

feeds:
  - name: orders
    glob: "orders_*.csv"     # glob required for root_file; ignored otherwise

retention_days: 90

notifications:
  recipients:
    - ops@company.com
  # Optional: override global SMTP for this source only
  # smtp_host: smtp.company.com
  # from_address: source-ingest@company.com
```

See examples/ for annotated configs for all three delivery patterns.
## Delivery Patterns

### Pattern 1: root_file

Files are delivered directly into inbox/. The feed name is determined by matching the filename against the glob patterns defined in feeds. The batch date is extracted from the filename (YYYY-MM-DD or YYYYMMDD); if no date is found, the file's last-modified timestamp is used.

```
inbox/
    orders_20240115_v1.csv    → feed=orders, date=2024-01-15
    returns_2024-01-15.csv    → feed=returns, date=2024-01-15
```

Glob matching uses Python's fnmatch module. Patterns are case-sensitive and evaluated in the order they appear in feeds; the first match wins.
### Pattern 2: feed_subfolder

Files are delivered into inbox/<feed_name>/ subdirectories. The folder name must exactly match a configured feed name. The batch date is still extracted from the filename or mtime.

```
inbox/
    transactions/
        txn_20240115.csv         → feed=transactions, date=2024-01-15
    settlements/
        settle_2024-01-15.csv    → feed=settlements, date=2024-01-15
```
### Pattern 3: feed_date_subfolder

Files are delivered into inbox/<feed_name>/<date>/ subdirectories. Both the feed name and the batch date come from the path. Date folders must be in YYYY-MM-DD or YYYYMMDD format; folders with non-conforming names are left in the inbox and reported as leftovers.

```
inbox/
    prices/
        2024-01-15/
            prices_20240115_eod.csv    → feed=prices, date=2024-01-15
        20240116/
            prices_20240116_eod.csv    → feed=prices, date=2024-01-16
```
### Leftovers

A leftover is any file or folder in the inbox that could not be attributed to a configured feed or date. Leftovers are never moved; they are listed in the post-run notification email for operator review. Common causes:

| Situation | Pattern |
|---|---|
| Filename matches no feed glob | root_file |
| Subfolder name not in feeds list | feed_subfolder, feed_date_subfolder |
| Date folder has unrecognised format | feed_date_subfolder |
| Unexpected file at inbox root | feed_subfolder, feed_date_subfolder |
## Audit Manifest

Every source has an append-only manifest.json file (JSONL — one JSON object per line) in the source folder. Records are never deleted or modified; retention deletions are recorded as additional entries.

### Import record — written on every successful file move

```json
{
  "event": "import",
  "timestamp": "2024-01-15T10:30:00.123456+00:00",
  "source": "sales",
  "feed": "orders",
  "batch_date": "2024-01-15",
  "filename": "orders_20240115_v1.csv",
  "original_path": "/zone=landing/source=sales/inbox/orders_20240115_v1.csv",
  "final_path": "/zone=landing/source=sales/feed=orders/date=2024-01-15/orders_20240115_v1.csv",
  "file_size": 1234567,
  "file_hash": "sha256:a3f5..."
}
```

### Error record — written when a file cannot be processed
```json
{
  "event": "error",
  "timestamp": "...",
  "source": "sales",
  "feed": "orders",
  "batch_date": "2024-01-15",
  "filename": "orders_20240115_v1.csv",
  "original_path": "...",
  "error_type": "duplicate",
  "message": "File hash sha256:... already exists in the manifest.",
  "file_hash": "sha256:a3f5..."
}
```

Error types: duplicate, existing_target, read_error, mkdir_error, move_error.
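Duplicate detection keys on the file hash. A chunked helper producing the manifest's sha256:<hex> format could look like this (a sketch; the real implementation lives in src/hasher.py):

```python
import hashlib
from pathlib import Path

def hash_file(path: Path, chunk_size: int = 1 << 20) -> str:
    """Stream the file in 1 MiB chunks so large batches are never fully in memory."""
    digest = hashlib.sha256()
    with path.open("rb") as f:
        for chunk in iter(lambda: f.read(chunk_size), b""):
            digest.update(chunk)
    return f"sha256:{digest.hexdigest()}"
```

A file whose hash already appears in the manifest is rejected with the duplicate error type.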
### Retention deletion record — written when an expired partition is removed

```json
{
  "event": "retention_delete",
  "timestamp": "...",
  "source": "sales",
  "feed": "orders",
  "batch_date": "2024-01-15",
  "deleted_path": "/zone=landing/source=sales/feed=orders/date=2024-01-15",
  "files_deleted": 5,
  "bytes_deleted": 6234567
}
```

### Querying the manifest
Because each line is valid JSON, the manifest is easy to filter with standard tools. Examples using Python:

```python
import hashlib
import json
from collections import Counter
from pathlib import Path

records = [
    json.loads(line)
    for line in Path("source=sales/manifest.json").read_text().splitlines()
    if line.strip()
]

# Count imported files by feed
counts = Counter(r["feed"] for r in records if r["event"] == "import")

# Total bytes imported
total = sum(r["file_size"] for r in records if r["event"] == "import")

# Verify a specific file has not changed: re-hash it and compare
record = next(r for r in records if r["filename"] == "orders_20240115_v1.csv")
digest = hashlib.sha256(Path(record["final_path"]).read_bytes()).hexdigest()
assert record["file_hash"] == f"sha256:{digest}", "file changed since import"
```

## Retention

Retention is configured per source with retention_days. On each run, after all inbox files have been processed, the process walks every feed=/date= partition in the source folder and removes any partition whose batch date is older than retention_days calendar days from today.

- Entire date= folders are removed, including all files inside.
- If the feed= folder becomes empty after cleanup, it is also removed.
- Each removed partition gets a retention_delete record in the manifest, so the deletion is auditable even though the files are gone.
- Errors during deletion are logged but do not stop the run.
## Notifications

One email is sent per source at the end of each run, but only when there are leftovers or errors. Clean runs produce no email.

The email contains:
- A run summary (files imported, error count, leftover count)
- A table of processing errors with reason for each file
- A list of leftover paths in the inbox
Recipients and optional SMTP overrides are set in the source config.yaml
under notifications. The global SMTP relay is used by default.
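A minimal build-and-send pair using the standard library might look like this (a sketch: the function names and subject wording are invented, and src/notifier.py formats the full error table):

```python
import smtplib
from email.message import EmailMessage

def build_run_report(from_address: str, recipients: list[str],
                     source: str, body: str) -> EmailMessage:
    """Assemble the plain-text per-source summary email."""
    msg = EmailMessage()
    msg["Subject"] = f"[ingest] source={source}: errors or leftovers"
    msg["From"] = from_address
    msg["To"] = ", ".join(recipients)
    msg.set_content(body)
    return msg

def send_run_report(msg: EmailMessage, host: str, port: int = 25) -> None:
    """Hand the message to the configured SMTP relay."""
    with smtplib.SMTP(host, port) as conn:
        conn.send_message(msg)
```

Separating building from sending lets the message body be unit-tested without an SMTP server.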
## Process Lock

The lock file (lock_file in global config) prevents two instances from running simultaneously if the scheduler fires while a previous run is still active.
On startup the process tries to create the lock file exclusively (atomic
O_EXCL create). If the file already exists, the PID inside is checked:
- Process still running → new instance exits immediately with a log message.
- Process no longer running (stale lock from a crash) → lock file is removed and the new instance continues normally.
The lock file is always removed in a finally block so a clean exit never
leaves a stale lock behind.
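The acquire/release cycle can be sketched with os.O_EXCL (simplified; the stale-PID takeover described above is omitted here):

```python
import os

def acquire_lock(lock_path: str) -> bool:
    """Atomically create the lock file holding our PID; False if it already exists."""
    try:
        fd = os.open(lock_path, os.O_CREAT | os.O_EXCL | os.O_WRONLY)
    except FileExistsError:
        return False
    with os.fdopen(fd, "w") as f:
        f.write(str(os.getpid()))
    return True

def release_lock(lock_path: str) -> None:
    """Remove the lock; tolerate it already being gone."""
    try:
        os.remove(lock_path)
    except FileNotFoundError:
        pass
```

A run wraps its work in try/finally and calls release_lock in the finally block, matching the clean-exit guarantee above.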
## Project Structure

```
ingest.py               Entry point
requirements.txt        Python dependencies (pyyaml only)
setup.bat               Windows: creates .venv and installs dependencies
run.bat                 Windows: activates .venv and runs ingest.py
config/
    settings.yaml       Global configuration
src/
    config_loader.py    Load and validate YAML configs; config dataclasses
    lock.py             Process lock file management
    hasher.py           SHA-256 file hashing
    date_extractor.py   Extract batch date from filename or mtime
    feed_matcher.py     Glob-based feed name matching (pattern 1)
    file_ops.py         Atomic file move (same-FS and cross-FS)
    manifest.py         Append-only JSONL manifest; duplicate hash cache
    scanner.py          Inbox walker for all three delivery patterns
    retention.py        Expired partition removal
    notifier.py         SMTP notification email builder and sender
    processor.py        Per-source orchestration (ties all modules together)
examples/
    source_config_pattern1.yaml    Annotated config for root_file pattern
    source_config_pattern2.yaml    Annotated config for feed_subfolder pattern
    source_config_pattern3.yaml    Annotated config for feed_date_subfolder pattern
```
## Python Notes — @dataclass

Several modules in this project use the @dataclass decorator from Python's standard library dataclasses module (Python 3.7+).

A dataclass is a regular Python class that has its __init__, __repr__, and __eq__ methods generated automatically from a set of annotated field declarations. Instead of writing:

```python
class FeedConfig:
    def __init__(self, name: str, glob: str | None = None):
        self.name = name
        self.glob = glob

    def __repr__(self):
        return f"FeedConfig(name={self.name!r}, glob={self.glob!r})"
```

you write:

```python
from dataclasses import dataclass
from typing import Optional

@dataclass
class FeedConfig:
    name: str
    glob: Optional[str] = None
```

Both produce the same constructor and repr; the dataclass version additionally generates a field-by-field __eq__.
| Option | Why not used here |
|---|---|
| Plain class with manual __init__ | More boilerplate; fields are described twice (signature + body) |
| collections.namedtuple / typing.NamedTuple | Immutable; awkward to add default values; no mutation after construction |
| typing.TypedDict | Describes a dictionary shape, not an object; no attribute access or methods |
| Third-party attrs / pydantic | Would add a dependency; stdlib dataclasses are sufficient here |

@dataclass is the idiomatic, dependency-free choice for simple data-holder classes in modern Python.
Further reading:

- dataclasses module — Python 3.11 documentation
- PEP 557 — Data Classes (the original design document)
- dataclasses.field() — controlling default values and more
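One wrinkle worth knowing: for mutable defaults, such as a feed list on a source config, dataclasses.field(default_factory=...) is required, because dataclasses rejects a bare mutable default (a shared list) with a ValueError at class-definition time. A hypothetical SourceConfig illustrating this:

```python
from dataclasses import dataclass, field

@dataclass
class SourceConfig:  # hypothetical class, for illustration only
    delivery_pattern: str
    retention_days: int = 90
    feeds: list[str] = field(default_factory=list)  # fresh list per instance

a = SourceConfig("root_file")
b = SourceConfig("root_file")
a.feeds.append("orders")
# b.feeds is unaffected: each instance received its own list
```

Writing feeds: list[str] = [] instead would either raise at class definition or, in classes without this guard, silently share one list across all instances.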