以 Binance 歷史資料為來源的資料平台,主流程已全面採用:
遠端 ZIP -> 記憶體解壓 CSV -> 直接寫 Parquet -> Postgres ingestion_manifest- 不落地
.zip/.csv中間檔案 - 增量依據為 PostgreSQL(不是本地檔案存在與否)
舊流程已封存:
backups/legacy_pipeline_2026-02-10/
flowchart LR
A[data.binance.vision]
B[binance-ingest CLI]
C[data_pipeline/pipeline.py]
D[data_pipeline/binance.py]
E[data_pipeline/transform.py]
F[data_pipeline/manifest.py]
G[Parquet Data Lake]
H[(PostgreSQL ingestion_manifest)]
I[DuckDB / Polars / Pandas]
A --> B
B --> C
C --> D
C --> E
C --> F
E --> G
F --> H
G --> I
sequenceDiagram
autonumber
participant CLI as binance-ingest
participant P as DirectZipToParquetPipeline
participant S as data_pipeline/binance.py
participant T as data_pipeline/transform.py
participant M as data_pipeline/manifest.py
participant L as Parquet Lake
CLI->>P: 建立 IngestConfig 並 run()
P->>S: fetch_symbols_with_availability() + filter_symbols()
P->>M: fetch_done_dates() 規劃增量
loop 每個待處理 partition
P->>S: build_job_key() 取得 ZIP URL
P->>P: 下載 ZIP bytes
P->>T: stream_zip_bytes_to_parquet()
T->>L: 寫入 parquet
P->>M: upsert_rows(done/failed)
end
P-->>CLI: 回傳 planned/done/failed/skipped
完整模組說明:
docs/MODULES.mddocs/CALL_SEQUENCE.mddocs/RUNBOOK.md
D:\Data
├─ pyproject.toml # 套件定義(可 pip install)
├─ configs
│ ├─ fetch_plan.standard.json # 標準化任務計畫(建議主入口)
│ └─ ingest.sample.json # 單任務入口參數檔範例(JSON)
├─ data_pipeline
│ ├─ cli
│ │ ├─ ingest.py # 套件命令:binance-ingest
│ │ ├─ fetch_plan.py # 套件命令:binance-fetch-plan
│ │ └─ db_bootstrap.py # 套件命令:binance-db-bootstrap
│ ├─ db.py # DB 建立與 manifest bootstrap
│ ├─ types.py # 常數、型別、規則
│ ├─ binance.py # 交易對探索、URL 組裝
│ ├─ transform.py # ZIP->DataFrame->Parquet(記憶體)
│ ├─ manifest.py # ingestion_manifest IO
│ └─ pipeline.py # 主流程協調與並行
├─ scripts
│ ├─ binance_zip_to_parquet_manifest.py # 主入口
│ ├─ run_fetch_plan.py # 標準化多任務入口
│ ├─ bootstrap_manifest_db.py # DB bootstrap 相容入口
│ └─ windows_bootstrap_data_stack.ps1 # Windows 初始化腳本
├─ docs
│ ├─ MODULES.md
│ └─ CALL_SEQUENCE.md
├─ data_lake
│ └─ silver
└─ backups
└─ legacy_pipeline_2026-02-10
- Python 3.12+
duckdb,pandas,pyarrow,psycopg[binary],tqdm- PostgreSQL(至少可連
market_data) - 建議執行環境:Windows(PowerShell)
先安裝(替換成你的 repo URL):
pip install "git+https://github.com/<your-org>/<your-repo>.git"安裝後可直接使用:
binance-db-bootstrapbinance-ingestbinance-fetch-plan
若你在 repo 內本地執行,原本 scripts/*.py 仍可用(現在是相容 wrapper)。
Windows(含 PostgreSQL + Python stack + DuckDB):
run_windows_bootstrap_data_stack.bat -InstallPostgresDuckDB 安裝位置原則:
- 入口程式在哪個 Python 環境執行,就在同一個環境安裝
duckdb
binance-ingest --help
binance-fetch-plan --help
binance-db-bootstrap --help使用單一 plan 管理全部任務(固定 source 與 pg_dsn):
binance-fetch-plan \
--plan-file ./configs/fetch_plan.standard.json常用覆蓋:
binance-fetch-plan \
--plan-file ./configs/fetch_plan.standard.json \
--only um_klines_1m,cm_klines_1m \
--date-end 2021-12-31管理規則:
- 任務不可覆蓋
source與pg_dsn(避免每次腳本建立新命名空間) - 每次執行會產生 run report:
runs/*.json
先複製並編輯範例檔:
configs/ingest.sample.jsonconfigs/ingest.option.eohsummary.sample.jsonconfigs/ingest.option.bvolindex.sample.json
用參數檔啟動:
binance-ingest \
--config-file ./configs/ingest.sample.jsonCLI 會覆蓋參數檔同名欄位(例如臨時改日期):
binance-ingest \
--config-file ./configs/ingest.sample.json \
--date-end 2024-01-31 \
--workers 2記憶體控制(預設):
memory_limit_gb=50chunk_rows=200000(CSV 分塊轉 parquet)worker_memory_budget_gb=8(用於自動限制 worker 併發)
tickers 規則:
["*"]或["ALL"]代表全標的(含um)[]或不填時會套用預設過濾(um/spot預設*USDT,cm預設全標的)
全歷史全標的範例參數檔:
configs/ingest.full.um.metrics.all.jsonconfigs/ingest.full.cm.metrics.all.jsonconfigs/ingest.full.um.klines.1m.all.jsonconfigs/ingest.full.cm.klines.1m.all.jsonconfigs/ingest.full.option.eohsummary.all.jsonconfigs/ingest.full.option.bvolindex.all.json- 任務清單:
configs/ingest.full_history_all.tasks.json
Windows 一鍵全量更新(改為跑標準化 plan):
run_full_update_all.bat
fundingRate(um,只跑 monthly):
binance-ingest \
--parquet-root ./data_lake/silver \
--pg-dsn "dbname=market_data user=lobi" \
--asset-class um \
--data-type fundingRate \
--date-start 2024-01-01 \
--date-end 2024-04-15 \
--tickers BTCUSDT \
--workers 2klines(um,monthly 歷史 + 當月 daily):
binance-ingest \
--parquet-root ./data_lake/silver \
--pg-dsn "dbname=market_data user=lobi" \
--asset-class um \
--data-type klines \
--data-frequency 1h \
--date-start 2024-03-01 \
--date-end 2024-04-02 \
--tickers BTCUSDToption(daily):
binance-ingest \
--parquet-root ./data_lake/silver \
--pg-dsn "dbname=market_data user=lobi" \
--asset-class option \
--data-type EOHSummary \
--date-start 2023-06-01 \
--date-end 2023-06-03 \
--tickers BTCUSDT強制重跑(忽略 done 狀態):
binance-ingest \
--parquet-root ./data_lake/silver \
--pg-dsn "dbname=market_data user=lobi" \
--asset-class um \
--data-type fundingRate \
--date-start 2024-01-01 \
--date-end 2024-04-15 \
--tickers BTCUSDT \
--update-existing- 來源真相:
ingestion_manifest中status='done' - 預設:已完成 partition 直接跳過
HTTP 404/410:寫入status='unavailable'(視為終態,下次不再 fetch)--update-existing:強制重抓重寫- 失敗:寫入
status='failed'與錯誤原因(notes) - 規劃時會套用 symbol 可用日期窗(futures/spot 用
onboardDate;option 用 S3 實際檔案 min/max 日期)
預設 symbol 過濾:
cm:預設保留全符號(例如BTCUSD_PERP)- 其他資產類:預設
*USDT - 指定
--tickers時以指定清單為準
fundingRate:monthly onlymetrics,bookDepth,bookTicker,liquidationSnapshot,trades:daily onlyklines,indexPriceKlines,markPriceKlines,premiumIndexKlines:monthly 歷史 + 當月 dailyoption(BVOLIndex,EOHSummary):daily only
注意:
- monthly 僅處理「已完成月份」
- 例如要包含
2024-03的 monthly 檔,--date-end至少要到2024-04-01
若 market_data 還不存在,可用套件命令一次完成:
binance-db-bootstrap \
--pg-dsn "dbname=market_data user=postgres host=localhost port=5432 password=..." \
--create-db若 DB 已存在,只需建立/確認 ingestion_manifest table:
binance-db-bootstrap \
--pg-dsn "dbname=market_data user=postgres host=localhost port=5432 password=..."Windows 建議用 TCP DSN:
dbname=market_data user=postgres host=localhost port=5432 password=...
- 記錄每個 partition 的狀態、筆數、時間範圍、檔案路徑與 hash
- 作為增量判斷與重跑依據
- 不存完整歷史行情(完整歷史在 Parquet)
主鍵:
(source, asset_class, data_type, symbol, date, period, interval)
最新批次結果:
SELECT source, asset_class, data_type, symbol, date, period, interval, status, row_count, updated_at
FROM ingestion_manifest
ORDER BY updated_at DESC
LIMIT 50;查看失敗 partition:
SELECT source, asset_class, data_type, symbol, date, period, interval, notes, updated_at
FROM ingestion_manifest
WHERE status = 'failed'
ORDER BY updated_at DESC
LIMIT 100;查看某商品增量覆蓋情況:
SELECT date, period, interval, status, row_count
FROM ingestion_manifest
WHERE source='binance' AND asset_class='um' AND data_type='klines' AND symbol='BTCUSDT'
ORDER BY date;DuckDB 不管理你的資料生命週期,這個專案目前的資料真相仍是:
- Parquet 檔案(實際行情資料)
- PostgreSQL
ingestion_manifest(增量與狀態)
DuckDB 主要用來直接查 Parquet:
duckdbSELECT COUNT(*)
FROM read_parquet('D:/Data/data_lake/silver/exchange=binance/market=um/dataset=klines/symbol=BTCUSDT/dt=*/*.parquet');Parquet 路徑:
data_lake/silver/exchange=binance/market=<um|spot|cm|option>/dataset=<...>/symbol=<SYMBOL>/dt=YYYY-MM-DD/<file>.parquet
資料欄位:
- 原始欄位 + 補充欄位:
source,asset_class,data_type,symbol,period,interval,partition_date - SQL 友善別名欄位:
exchange,market,dataset,dt - CSV 讀取會自動偵測 header/no-header,避免首列資料遺失
- 以固定排程每日跑同一組命令,靠 manifest 做增量
- 定期檢查
status='failed' - 監控資料量與磁碟空間(只應成長於 Parquet)
- 把
--workers依網路/CPU/I/O 調整,不要盲目拉高
舊版檔案僅保留回溯,不再是正式流程:
backups/legacy_pipeline_2026-02-10/data_dumper.pybackups/legacy_pipeline_2026-02-10/scripts/csv_to_parquet_manifest.py
若要推到 https://github.com/Garylitw/binance-data-dumper.git:
git remote remove origin 2>/dev/null || true
git remote add origin https://github.com/Garylitw/binance-data-dumper.git
git add .
git commit -m "chore: package pipeline and add windows bootstrap"
git branch -M main
git push -u origin main