diff --git a/.github/workflows/train.yml b/.github/workflows/train.yml
new file mode 100644
index 00000000..bb823735
--- /dev/null
+++ b/.github/workflows/train.yml
@@ -0,0 +1,79 @@
+name: Weekly Model Training Pipeline
+
+on:
+  # Runs automatically every Tuesday at 2 AM KST
+  schedule:
+    - cron: '0 17 * * 1'
+  # Can also be triggered manually from the GitHub Actions tab
+  workflow_dispatch:
+    inputs:
+      skip_extract:
+        description: 'Skip DB extraction (when the parquet files already exist)'
+        required: false
+        default: false
+        type: boolean
+      skip_upload:
+        description: 'Skip Kaggle upload (when the data has not changed)'
+        required: false
+        default: false
+        type: boolean
+
+jobs:
+  train:
+    runs-on: ubuntu-latest
+    timeout-minutes: 780  # up to 13 hours
+
+    steps:
+      - name: Checkout repository
+        uses: actions/checkout@v4
+
+      - name: Setup Python
+        uses: actions/setup-python@v5
+        with:
+          python-version: '3.10'
+
+      - name: Install dependencies
+        run: |
+          pip install \
+            kaggle \
+            pandas \
+            pyarrow \
+            psycopg2-binary \
+            paramiko \
+            sshtunnel \
+            scp
+
+      - name: Setup Kaggle credentials
+        run: |
+          mkdir -p ~/.kaggle
+          jq -n \
+            --arg user "${{ secrets.KAGGLE_USERNAME }}" \
+            --arg key "${{ secrets.KAGGLE_KEY }}" \
+            '{"username": $user, "key": $key}' \
+            > ~/.kaggle/kaggle.json
+          chmod 600 ~/.kaggle/kaggle.json
+
+      - name: Run weekly training pipeline
+        env:
+          SSH_HOST: ${{ secrets.SSH_HOST }}
+          SSH_USER: ${{ secrets.SSH_USER }}
+          SSH_PRIVATE_KEY: ${{ secrets.SSH_PRIVATE_KEY }}
+          SSH_PORT: ${{ secrets.SSH_PORT }}
+          DB_HOST: ${{ secrets.DB_HOST }}
+          DB_PORT: ${{ secrets.DB_PORT }}
+          DB_USER: ${{ secrets.DB_USER }}
+          DB_PASSWORD: ${{ secrets.DB_PASSWORD }}
+          DB_NAME: ${{ secrets.DB_NAME }}
+          KAGGLE_USERNAME: ${{ secrets.KAGGLE_USERNAME }}
+          KAGGLE_KEY: ${{ secrets.KAGGLE_KEY }}
+          SERVER_WEIGHTS_PATH: ${{ secrets.SERVER_WEIGHTS_PATH }}
+        run: |
+          SKIP_EXTRACT=""
+          SKIP_UPLOAD=""
+          if [ "${{ github.event.inputs.skip_extract }}" == "true" ]; then
+            SKIP_EXTRACT="--skip-extract"
+          fi
+          if [ "${{ github.event.inputs.skip_upload }}" == "true" ]; then
+            SKIP_UPLOAD="--skip-upload"
+          fi
+          python AI/pipelines/weekly_routine.py $SKIP_EXTRACT $SKIP_UPLOAD
diff --git a/.gitignore b/.gitignore
index a2ad2264..9bfe2ab9 100644
--- a/.gitignore
+++ b/.gitignore
@@ -34,6 +34,7 @@ AI/data/weights/
 AI/.venv/
 
 AI/data/weights/tcn/
+AI/data/kaggle_data/
 AI/data/weights/itransformer/*
 !AI/data/weights/itransformer/.gitkeep
 
@@ -42,6 +43,16 @@ AI/config/trading.local.json
 AI/tests/out/
 AI/docs/
 
+SISC_*.md
+test_*.py
+AI/data/kaggle_data/
+*.parquet
+*.pt
+*.keras
+*.pkl
+# ===== Kaggle API keys =====
+kaggle.json
+.kaggle/
 
 # ===== Backend =====
 backend/src/main/java/org/sejongisc/backend/stock/TestController.java
diff --git a/AI/modules/signal/core/artifact_paths.py b/AI/modules/signal/core/artifact_paths.py
index 8c97e4b6..e8febd12 100644
--- a/AI/modules/signal/core/artifact_paths.py
+++ b/AI/modules/signal/core/artifact_paths.py
@@ -100,13 +100,14 @@ def resolve_model_artifacts(
             metadata_path=str(resolved_model_dir / "metadata.json"),
         )
 
+    # [fix] PatchTST: align with the actual saved file names and add scaler_path
     if normalized_model == "patchtst":
-        resolved_model_dir = _resolve_absolute(model_dir) if model_dir else (root_dir / "patchtst")
+        resolved_model_dir = _resolve_absolute(model_dir) if model_dir else (root_dir / "PatchTST")
         return ModelArtifactPaths(
             root_dir=str(root_dir),
             model_dir=str(resolved_model_dir),
-            model_path=str(resolved_model_dir / "PatchTST_best.pt"),
-            scaler_path=None,
+            model_path=str(resolved_model_dir / "patchtst_model.pt"),  # PatchTST_best.pt → patchtst_model.pt
+            scaler_path=str(resolved_model_dir / 
"patchtst_scaler.pkl"), # 추가 metadata_path=None, ) diff --git a/AI/modules/signal/models/PatchTST/train.py b/AI/modules/signal/models/PatchTST/train.py index a02d868f..e560a1cb 100644 --- a/AI/modules/signal/models/PatchTST/train.py +++ b/AI/modules/signal/models/PatchTST/train.py @@ -22,25 +22,8 @@ import torch.nn as nn import torch.optim as optim from torch.utils.data import DataLoader, TensorDataset -<<<<<<< HEAD from sklearn.preprocessing import MinMaxScaler from tqdm import tqdm -======= -from .architecture import PatchTST_Model -from AI.config import load_trading_config -from AI.modules.signal.core.artifact_paths import resolve_model_artifacts - - -def _default_model_save_path() -> str: - try: - trading_config = load_trading_config() - return resolve_model_artifacts( - model_name="patchtst", - config_weights_dir=trading_config.model.weights_dir, - ).model_path - except Exception: - return resolve_model_artifacts(model_name="patchtst").model_path ->>>>>>> 969fb59bb447edc8ffb66545ba0fdc1a4d190e79 # 경로 설정 (다른 import보다 먼저) current_dir = os.path.dirname(os.path.abspath(__file__)) @@ -59,7 +42,6 @@ def _default_model_save_path() -> str: # CONFIG # ───────────────────────────────────────────────────────────────────────────── CONFIG = { -<<<<<<< HEAD 'start_date' : '2015-01-01', 'end_date' : '2023-12-31', # 미래 데이터 차단 (Look-ahead bias 방지) 'seq_len' : 120, @@ -88,18 +70,6 @@ def _default_model_save_path() -> str: 'weights_dir' : 'AI/data/weights/PatchTST', 'model_name' : 'patchtst_model.pt', 'scaler_name' : 'patchtst_scaler.pkl', -======= - 'seq_len': 120, - 'input_features': 7, - 'batch_size': 32, - 'learning_rate': 0.0001, - 'epochs': 100, - 'patience': 10, - 'model_save_path': _default_model_save_path() -<<<<<<< HEAD ->>>>>>> e47fa9e ([AI] [FEAT] 볼륨 마운트를 통한 가중치 저장) -======= ->>>>>>> 969fb59bb447edc8ffb66545ba0fdc1a4d190e79 } # ───────────────────────────────────────────────────────────────────────────── @@ -244,9 +214,15 @@ def train(): # ── [수정] Train/Val 분리 먼저 → 그 다음 스케일링 ────────────────────── # 기존: 전체 스케일링 → 분리 (데이터 누수 발생) # 수정: 티커 기준으로 분리 → train만 fit → val은 transform - tickers = full_df['ticker'].unique() - n_val = max(1, int(len(tickers) * 0.2)) - val_tickers = tickers[-n_val:] # 마지막 20% 티커를 val로 (시간 순서 보존) + tickers = full_df['ticker'].unique() + + # 최소 2개 이상 있어야 train/val 분리 가능 + if len(tickers) < 2: + raise ValueError(f"학습에 필요한 ticker가 부족합니다. (현재: {len(tickers)}개, 최소 2개 필요)") + + # val 비율 20%, 단 train이 최소 1개는 남도록 상한 보정 + n_val = max(1, min(int(len(tickers) * 0.2), len(tickers) - 1)) + val_tickers = tickers[-n_val:] # 마지막 20% 티커를 val로 (시간 순서 보존) train_tickers = tickers[:-n_val] train_df = full_df[full_df['ticker'].isin(train_tickers)].copy() @@ -337,7 +313,7 @@ def train(): if avg_val < best_val_loss: best_val_loss = avg_val patience_counter = 0 - # [수정] config + state_dict 같이 저장 (load 시 구조 재현 가능) + # config + state_dict 같이 저장 (load 시 구조 재현 가능) torch.save({ 'config' : CONFIG, 'state_dict': model.state_dict() @@ -350,7 +326,6 @@ def train(): print(f"\n>> Early Stopping at epoch {epoch+1}") break -<<<<<<< HEAD # 7. 
Save the scaler
     with open(scaler_path, 'wb') as f:
         pickle.dump(scaler, f)
@@ -362,25 +337,3 @@ def train():
 
 if __name__ == '__main__':
     train()
-=======
-def run_training(X_train, y_train, X_val, y_val):
-    """
-    외부에서 호출 가능한 학습 진입점
-    X: [Samples, Seq_Len, Features] numpy array
-    y: [Samples] numpy array (0 or 1)
-    """
-    device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
-
-    # Tensor 변환
-    train_data = TensorDataset(torch.FloatTensor(X_train), torch.FloatTensor(y_train))
-    val_data = TensorDataset(torch.FloatTensor(X_val), torch.FloatTensor(y_val))
-
-    train_loader = DataLoader(train_data, batch_size=CONFIG['batch_size'], shuffle=True)
-    val_loader = DataLoader(val_data, batch_size=CONFIG['batch_size'], shuffle=False)
-
-    trained_model = train_model(train_loader, val_loader, device)
-    return trained_model
-<<<<<<< HEAD
->>>>>>> e47fa9e ([AI] [FEAT] 볼륨 마운트를 통한 가중치 저장)
-=======
->>>>>>> 969fb59bb447edc8ffb66545ba0fdc1a4d190e79
diff --git a/AI/modules/signal/models/PatchTST/train_kaggle.py b/AI/modules/signal/models/PatchTST/train_kaggle.py
new file mode 100644
index 00000000..d218d9ad
--- /dev/null
+++ b/AI/modules/signal/models/PatchTST/train_kaggle.py
@@ -0,0 +1,327 @@
+# AI/modules/signal/models/PatchTST/train_kaggle.py
+"""
+PatchTST training script - Kaggle/GitHub Actions version
+-----------------------------------------------
+[Differences from train.py]
+- No DB connection (SISCDataLoader is not used)
+- Loads directly from parquet files
+- Used by the GitHub Actions automation pipeline
+
+[Runtime environments]
+- Kaggle notebooks (GPU training)
+- GitHub Actions (automation)
+
+[train.py is kept as-is]
+- Used when training locally or on the server with a DB connection
+- No impact on teammates' parts
+-----------------------------------------------
+"""
+import os
+import sys
+import pickle
+import numpy as np
+import pandas as pd
+import torch
+import torch.nn as nn
+import torch.optim as optim
+from torch.utils.data import DataLoader, TensorDataset
+from sklearn.preprocessing import MinMaxScaler
+from tqdm import tqdm
+
+# ─────────────────────────────────────────────────────────────────────────────
+# Path setup
+# Kaggle: /kaggle/input/datasets/jihyeongkimm/sisc-ai-trading-dataset
+# GitHub Actions: ./kaggle_data
+# ─────────────────────────────────────────────────────────────────────────────
+current_dir = os.path.dirname(os.path.abspath(__file__))
+project_root = os.path.abspath(os.path.join(current_dir, "../../../../.."))
+if project_root not in sys.path:
+    sys.path.append(project_root)
+
+from AI.modules.signal.models.PatchTST.architecture import PatchTST_Model
+from AI.modules.features.legacy.technical_features import (
+    add_technical_indicators,
+    add_multi_timeframe_features
+)
+
+# ─────────────────────────────────────────────────────────────────────────────
+# CONFIG
+# ─────────────────────────────────────────────────────────────────────────────
+CONFIG = {
+    # parquet file path
+    # Kaggle 환경이면 /kaggle/input/datasets/... 
로 바꿔서 쓰면 됨 + 'parquet_dir' : os.environ.get( + 'PARQUET_DIR', + '/kaggle/input/datasets/jihyeongkimm/sisc-ai-trading-dataset' + ), + + 'start_date' : '2015-01-01', + 'end_date' : '2023-12-31', + 'seq_len' : 120, + 'horizons' : [1, 3, 5, 7], + + # 모델 구조 + 'patch_len' : 16, + 'stride' : 8, + 'd_model' : 128, + 'n_heads' : 4, + 'e_layers' : 3, + 'd_ff' : 256, + 'dropout' : 0.1, + + # 학습 + 'batch_size' : 256, + 'learning_rate' : 0.0001, + 'epochs' : 50, + 'patience' : 10, + + # 저장 경로 + # Kaggle: /kaggle/working/ + # GitHub Actions: AI/data/weights/PatchTST/ + 'weights_dir' : os.environ.get('WEIGHTS_DIR', '/kaggle/working'), + 'model_name' : 'patchtst_model.pt', + 'scaler_name' : 'patchtst_scaler.pkl', +} + +# ───────────────────────────────────────────────────────────────────────────── +# 피처 정의 (train.py, wrapper.py와 동일한 순서 유지) +# ───────────────────────────────────────────────────────────────────────────── +FEATURE_COLUMNS = [ + # 일봉 (11개) + 'log_return', + 'ma5_ratio', 'ma20_ratio', 'ma60_ratio', + 'rsi', 'bb_position', 'macd_ratio', + 'open_ratio', 'high_ratio', 'low_ratio', + 'vol_change', + # 주봉 (4개) + 'week_ma20_ratio', 'week_rsi', 'week_bb_pos', 'week_vol_change', + # 월봉 (2개) + 'month_ma12_ratio', 'month_rsi', +] + +HORIZONS = [1, 3, 5, 7] + + +# ───────────────────────────────────────────────────────────────────────────── +# 시퀀스 생성 (train.py와 동일) +# ───────────────────────────────────────────────────────────────────────────── +def build_sequences(full_df: pd.DataFrame, scaler: MinMaxScaler, fit_scaler: bool = True): + seq_len = CONFIG['seq_len'] + horizons = CONFIG['horizons'] + max_horizon = max(horizons) + + available = [c for c in FEATURE_COLUMNS if c in full_df.columns] + missing = set(FEATURE_COLUMNS) - set(available) + if missing: + print(f"[경고] 누락된 피처: {missing}") + + full_df = full_df.dropna(subset=available).copy() + + if fit_scaler: + full_df[available] = scaler.fit_transform(full_df[available]) + else: + full_df[available] = scaler.transform(full_df[available]) + + X_list, y_list = [], [] + + for ticker in tqdm(full_df['ticker'].unique(), desc="시퀀스 생성"): + sub = full_df[full_df['ticker'] == ticker].sort_values('date') + + if len(sub) < seq_len + max_horizon: + continue + + feat_vals = sub[available].values + raw_closes = sub['close'].values + + num_samples = len(sub) - seq_len - max_horizon + 1 + if num_samples <= 0: + continue + + for i in range(num_samples): + window = feat_vals[i : i + seq_len] + curr_price = raw_closes[i + seq_len - 1] + labels = [] + for h in horizons: + future_price = raw_closes[i + seq_len + h - 1] + labels.append(1 if future_price > curr_price else 0) + + X_list.append(window) + y_list.append(labels) + + X = np.array(X_list, dtype=np.float32) + y = np.array(y_list, dtype=np.float32) + print(f">> 시퀀스 완료: X={X.shape}, y={y.shape}") + return X, y + + +# ───────────────────────────────────────────────────────────────────────────── +# [핵심 변경] 데이터 로드 +# train.py: SISCDataLoader → DB 연결 필요 +# train_kaggle.py: parquet 직접 읽기 → DB 연결 불필요 +# ───────────────────────────────────────────────────────────────────────────── +def load_and_preprocess(): + parquet_path = os.path.join(CONFIG['parquet_dir'], 'price_data.parquet') + print(f">> parquet 로드 중: {parquet_path}") + + raw_df = pd.read_parquet(parquet_path) + raw_df['date'] = pd.to_datetime(raw_df['date']) + + # 날짜 필터링 + raw_df = raw_df[ + (raw_df['date'] >= CONFIG['start_date']) & + (raw_df['date'] <= CONFIG['end_date']) + ].copy() + + print(f">> 로드 완료: {len(raw_df):,}행, {raw_df['ticker'].nunique()}개 종목") + + # 
피처 계산 + print(">> 피처 계산 중 (일봉 + 주봉/월봉)...") + processed = [] + fail_count = 0 + fail_limit = 20 + + for ticker in tqdm(raw_df['ticker'].unique(), desc="피처 계산"): + df_t = raw_df[raw_df['ticker'] == ticker].copy() + try: + df_t = add_technical_indicators(df_t) + df_t = add_multi_timeframe_features(df_t) + processed.append(df_t) + except Exception as e: + fail_count += 1 + print(f"\n[경고] {ticker} 피처 계산 실패 ({fail_count}/{fail_limit}): {e}") + if fail_count >= fail_limit: + raise RuntimeError(f"피처 계산 실패가 {fail_limit}개를 초과했습니다.") + + if not processed: + raise ValueError("전처리된 데이터가 없습니다.") + + full_df = pd.concat(processed).reset_index(drop=True) + print(f">> 피처 계산 완료: {len(full_df):,}행 (실패: {fail_count}개)") + return full_df + + +# ───────────────────────────────────────────────────────────────────────────── +# 학습 메인 (train.py와 동일) +# ───────────────────────────────────────────────────────────────────────────── +def train(): + print("=" * 50) + print(" PatchTST 학습 시작 (Kaggle/Actions 버전)") + print(f" 데이터: {CONFIG['parquet_dir']}") + print(f" 피처: {len(FEATURE_COLUMNS)}개") + print(f" horizon: {CONFIG['horizons']}일") + print("=" * 50) + + # 1. 데이터 로드 + full_df = load_and_preprocess() + + # 2. Train/Val 분리 + tickers = full_df['ticker'].unique() + n_val = max(1, int(len(tickers) * 0.2)) + val_tickers = tickers[-n_val:] + train_tickers = tickers[:-n_val] + + train_df = full_df[full_df['ticker'].isin(train_tickers)].copy() + val_df = full_df[full_df['ticker'].isin(val_tickers)].copy() + print(f"\n>> Train 티커: {len(train_tickers)}개, Val 티커: {len(val_tickers)}개") + + # 3. 시퀀스 생성 + scaler = MinMaxScaler() + X_train, y_train = build_sequences(train_df, scaler, fit_scaler=True) + X_val, y_val = build_sequences(val_df, scaler, fit_scaler=False) + print(f"\n>> Train: {X_train.shape}, Val: {X_val.shape}") + + # 4. DataLoader + train_loader = DataLoader( + TensorDataset(torch.FloatTensor(X_train), torch.FloatTensor(y_train)), + batch_size=CONFIG['batch_size'], shuffle=True + ) + val_loader = DataLoader( + TensorDataset(torch.FloatTensor(X_val), torch.FloatTensor(y_val)), + batch_size=CONFIG['batch_size'], shuffle=False + ) + + # 5. 모델 + device = torch.device('cuda' if torch.cuda.is_available() else 'cpu') + print(f">> Device: {device}\n") + + model = PatchTST_Model( + seq_len = CONFIG['seq_len'], + enc_in = len(FEATURE_COLUMNS), + patch_len = CONFIG['patch_len'], + stride = CONFIG['stride'], + d_model = CONFIG['d_model'], + n_heads = CONFIG['n_heads'], + e_layers = CONFIG['e_layers'], + d_ff = CONFIG['d_ff'], + dropout = CONFIG['dropout'], + n_outputs = len(CONFIG['horizons']) + ).to(device) + + # 6. 손실함수 & 옵티마이저 + criterion = nn.BCEWithLogitsLoss() + optimizer = optim.AdamW(model.parameters(), lr=CONFIG['learning_rate']) + + # 7. 
저장 경로 + save_dir = CONFIG['weights_dir'] + os.makedirs(save_dir, exist_ok=True) + model_path = os.path.join(save_dir, CONFIG['model_name']) + scaler_path = os.path.join(save_dir, CONFIG['scaler_name']) + + best_val_loss = float('inf') + patience_counter = 0 + + print(f">> 학습 시작 (epochs={CONFIG['epochs']}, patience={CONFIG['patience']})\n") + + for epoch in range(CONFIG['epochs']): + + # Training + model.train() + train_loss = 0.0 + for X_b, y_b in train_loader: + X_b, y_b = X_b.to(device), y_b.to(device) + optimizer.zero_grad() + loss = criterion(model(X_b), y_b) + loss.backward() + optimizer.step() + train_loss += loss.item() + avg_train = train_loss / len(train_loader) + + # Validation + model.eval() + val_loss = 0.0 + with torch.no_grad(): + for X_v, y_v in val_loader: + X_v, y_v = X_v.to(device), y_v.to(device) + val_loss += criterion(model(X_v), y_v).item() + avg_val = val_loss / len(val_loader) + + print(f"Epoch [{epoch+1:3d}/{CONFIG['epochs']}] " + f"Train: {avg_train:.4f} | Val: {avg_val:.4f}", end="") + + # Early Stopping & 저장 + if avg_val < best_val_loss: + best_val_loss = avg_val + patience_counter = 0 + torch.save({ + 'config' : CONFIG, + 'state_dict': model.state_dict() + }, model_path) + print(" ✓ saved") + else: + patience_counter += 1 + print(f" ({patience_counter}/{CONFIG['patience']})") + if patience_counter >= CONFIG['patience']: + print(f"\n>> Early Stopping at epoch {epoch+1}") + break + + # 8. 스케일러 저장 + with open(scaler_path, 'wb') as f: + pickle.dump(scaler, f) + + print(f"\n>> 완료") + print(f" 모델 : {model_path}") + print(f" 스케일러: {scaler_path}") + + +if __name__ == '__main__': + train() diff --git a/AI/modules/signal/models/PatchTST/wrapper.py b/AI/modules/signal/models/PatchTST/wrapper.py index a589f3a7..da249885 100644 --- a/AI/modules/signal/models/PatchTST/wrapper.py +++ b/AI/modules/signal/models/PatchTST/wrapper.py @@ -1,4 +1,3 @@ -<<<<<<< HEAD # AI/modules/signal/models/PatchTST/wrapper.py """ PatchTST Wrapper @@ -24,21 +23,10 @@ project_root = os.path.abspath(os.path.join(current_dir, "../../../../..")) if project_root not in sys.path: sys.path.append(project_root) -======= -import os -from typing import Any, Dict, Optional - -import numpy as np -import pandas as pd -import torch -import torch.nn as nn -import torch.optim as optim ->>>>>>> 969fb59bb447edc8ffb66545ba0fdc1a4d190e79 from AI.modules.signal.core.base_model import BaseSignalModel from AI.modules.signal.models.PatchTST.architecture import PatchTST_Model -<<<<<<< HEAD # ───────────────────────────────────────────────────────────────────────────── # [수정] train.py와 완전히 동일한 순서로 정의 # 순서가 다르면 스케일러 통계가 잘못 적용되어 예측값이 틀림 @@ -78,22 +66,11 @@ def __init__(self, config: Dict[str, Any]): self.model = None self.scaler = None self.seq_len = config.get('seq_len', 120) -======= - -class PatchTSTWrapper(BaseSignalModel): - def __init__(self, config: Dict[str, Any]): - super().__init__(config) - self.device = torch.device("cuda" if torch.cuda.is_available() else "cpu") - self.seq_len = int(config.get("seq_len", 120)) - self.features = list(config.get("feature_columns") or config.get("features") or []) - self.model = None ->>>>>>> 969fb59bb447edc8ffb66545ba0fdc1a4d190e79 # ── 1. 
build() ─────────────────────────────────────────────────────────── def build(self, input_shape: tuple): seq_len, num_features = input_shape self.model = PatchTST_Model( -<<<<<<< HEAD seq_len = seq_len, enc_in = num_features, patch_len = self.config.get('patch_len', 16), @@ -188,7 +165,13 @@ def load(self, filepath: str, scaler_path: str = None): self.build((self.seq_len, len(FEATURE_COLUMNS))) # 가중치 로드 - self.model.load_state_dict(checkpoint['state_dict']) + state_dict = checkpoint['state_dict'] + + # DataParallel로 학습된 경우 module. 접두사 제거 + if all(k.startswith('module.') for k in state_dict.keys()): + state_dict = {k[len('module.'):]: v for k, v in state_dict.items()} + + self.model.load_state_dict(state_dict) self.model.eval() print(f"[PatchTST] 모델 로드: {filepath}") @@ -204,108 +187,7 @@ def load(self, filepath: str, scaler_path: str = None): def _default_output(self) -> Dict[str, float]: """오류 시 중립값 반환 (파이프라인 중단 방지)""" return {f"patchtst_{h}d": 0.5 for h in HORIZONS} -======= - seq_len=seq_len, - enc_in=num_features, - patch_len=self.config.get("patch_len", 16), - stride=self.config.get("stride", 8), - d_model=self.config.get("d_model", 128), - dropout=self.config.get("dropout", 0.1), - ).to(self.device) - - def train( - self, - X_train: np.ndarray, - y_train: np.ndarray, - X_val: Optional[np.ndarray] = None, - y_val: Optional[np.ndarray] = None, - **kwargs, - ): - if self.model is None: - self.build(X_train.shape[1:]) - - criterion = nn.BCEWithLogitsLoss() - optimizer = optim.AdamW(self.model.parameters(), lr=self.config.get("lr", 1e-4)) - epochs = int(self.config.get("epochs", 50)) - batch_size = int(self.config.get("batch_size", 32)) - - X_tensor = torch.from_numpy(X_train).float().to(self.device) - y_tensor = torch.from_numpy(y_train).float().view(-1, 1).to(self.device) - - self.model.train() - for epoch in range(epochs): - permutation = torch.randperm(X_tensor.size(0), device=self.device) - epoch_loss = 0.0 - - for i in range(0, X_tensor.size(0), batch_size): - indices = permutation[i : i + batch_size] - batch_x, batch_y = X_tensor[indices], y_tensor[indices] - - optimizer.zero_grad() - output = self.model(batch_x) - loss = criterion(output, batch_y) - loss.backward() - optimizer.step() - epoch_loss += loss.item() - - if (epoch + 1) % 10 == 0: - print(f"Epoch [{epoch + 1}/{epochs}] Loss: {epoch_loss:.4f}") - - def predict(self, X_input: np.ndarray) -> np.ndarray: - if self.model is None: - raise ValueError("Model not initialized. 
Call build() or load() first.") - - array_x = np.asarray(X_input, dtype=np.float32) - if array_x.ndim == 2: - array_x = np.expand_dims(array_x, axis=0) - - self.model.eval() - with torch.no_grad(): - X_tensor = torch.from_numpy(array_x).float().to(self.device) - logits = self.model(X_tensor) - probs = torch.sigmoid(logits).cpu().numpy() - - return probs - - def get_signals(self, df: pd.DataFrame, ticker_id: int = 0, sector_id: int = 0) -> Dict[str, float]: - if df is None or df.empty: - raise ValueError("Input dataframe is empty.") - - if not self.features: - numeric_columns = [col for col in df.columns if pd.api.types.is_numeric_dtype(df[col])] - self.features = numeric_columns[: int(self.config.get("enc_in", 7))] - - if not self.features: - raise ValueError("No features configured for PatchTST inference.") - - missing_features = [col for col in self.features if col not in df.columns] - if missing_features: - raise ValueError("Missing required PatchTST features: " + ", ".join(missing_features)) - - if len(df) < self.seq_len: - raise ValueError( - f"Insufficient rows for PatchTST inference: required {self.seq_len}, got {len(df)}" - ) - - window = df[self.features].iloc[-self.seq_len :].to_numpy(dtype=np.float32) - probs = self.predict(np.expand_dims(window, axis=0)).reshape(-1) - score = float(probs[0]) if probs.size else 0.5 - return {"patchtst_1d": score} - - def save(self, filepath: str): - if self.model is None: - raise ValueError("No PatchTST model to save.") - save_dir = os.path.dirname(filepath) - if save_dir: - os.makedirs(save_dir, exist_ok=True) - torch.save(self.model.state_dict(), filepath) - print(f"PatchTST saved to {filepath}") - - def load(self, filepath: str): - if self.model is None: - self.build((self.config.get("seq_len", 120), self.config.get("enc_in", 7))) - - self.model.load_state_dict(torch.load(filepath, map_location=self.device)) - self.model.eval() - print(f"PatchTST loaded from {filepath}") ->>>>>>> 969fb59bb447edc8ffb66545ba0fdc1a4d190e79 + + def get_signals(self, df, **kwargs): + """BaseSignalModel 추상 메서드 구현 - predict()로 위임""" + return self.predict(df) \ No newline at end of file diff --git a/AI/modules/signal/models/TCN/train_kaggle.py b/AI/modules/signal/models/TCN/train_kaggle.py new file mode 100644 index 00000000..258fbb8b --- /dev/null +++ b/AI/modules/signal/models/TCN/train_kaggle.py @@ -0,0 +1,328 @@ +# AI/modules/signal/models/TCN/train_kaggle.py +""" +TCN 학습 스크립트 - Kaggle/GitHub Actions 버전 +----------------------------------------------- +[train.py와의 차이점] +- DB 연결 없음 (get_standard_training_data 사용 안 함) +- parquet 파일에서 직접 로드 후 피처 계산 +- GitHub Actions 자동화 파이프라인에서 사용 + +[train.py는 그대로 유지] +- 로컬/서버에서 DB 연결로 학습할 때 사용 +----------------------------------------------- +""" +import argparse +import copy +import json +import os +import warnings +warnings.filterwarnings('ignore') +import pickle +import sys +from typing import Dict, List, Tuple + +import numpy as np +import pandas as pd +import torch +import torch.nn as nn +from sklearn.preprocessing import StandardScaler +from torch.utils.data import DataLoader as TorchDataLoader +from torch.utils.data import TensorDataset +from tqdm import tqdm + +# ───────────────────────────────────────────────────────────────────────────── +# 경로 설정 +# ───────────────────────────────────────────────────────────────────────────── +current_dir = os.path.dirname(os.path.abspath(__file__)) +project_root = os.path.abspath(os.path.join(current_dir, "../../../../..")) +if project_root not in sys.path: + 
sys.path.append(project_root) + +from AI.modules.signal.models.TCN.architecture import TCNClassifier +from AI.modules.features.legacy.technical_features import ( + add_technical_indicators, + add_multi_timeframe_features +) + +# ───────────────────────────────────────────────────────────────────────────── +# 피처 정의 (train.py와 동일) +# ───────────────────────────────────────────────────────────────────────────── +FEATURE_COLUMNS = [ + "log_return", + "open_ratio", + "high_ratio", + "low_ratio", + "vol_change", + "ma5_ratio", + "ma20_ratio", + "ma60_ratio", + "rsi", + "macd_ratio", + "bb_position", +] + +HORIZONS = [1, 3, 5, 7] + + +# ───────────────────────────────────────────────────────────────────────────── +# 시퀀스 생성 (train.py와 동일) +# ───────────────────────────────────────────────────────────────────────────── +def build_sequences( + df: pd.DataFrame, + seq_len: int, + feature_cols: List[str], + horizons: List[int], +) -> Tuple[np.ndarray, np.ndarray]: + features = [] + labels = [] + max_horizon = max(horizons) + + for _, sub_df in df.groupby("ticker"): + sub_df = sub_df.sort_values("date").copy() + sub_df = sub_df.dropna(subset=["close"]) + + if len(sub_df) < seq_len + max_horizon: + continue + + feature_values = sub_df[feature_cols].to_numpy(dtype=np.float32) + closes = sub_df["close"].to_numpy(dtype=np.float32) + + for start in range(len(sub_df) - seq_len - max_horizon + 1): + end = start + seq_len + current_close = closes[end - 1] + target = [ + 1.0 if closes[end + h - 1] > current_close else 0.0 + for h in horizons + ] + features.append(feature_values[start:end]) + labels.append(target) + + if not features: + return ( + np.empty((0, seq_len, len(feature_cols)), dtype=np.float32), + np.empty((0, len(horizons)), dtype=np.float32) + ) + + return np.array(features, dtype=np.float32), np.array(labels, dtype=np.float32) + + +# ───────────────────────────────────────────────────────────────────────────── +# [핵심 변경] 데이터 로드 +# train.py: get_standard_training_data() → DB 연결 필요 +# train_kaggle.py: pd.read_parquet() → DB 연결 불필요 +# ───────────────────────────────────────────────────────────────────────────── +def load_and_preprocess(parquet_dir: str, start_date: str, end_date: str) -> pd.DataFrame: + parquet_path = os.path.join(parquet_dir, 'price_data.parquet') + print(f">> parquet 로드 중: {parquet_path}") + + raw_df = pd.read_parquet(parquet_path) + raw_df['date'] = pd.to_datetime(raw_df['date']) + + raw_df = raw_df[ + (raw_df['date'] >= start_date) & + (raw_df['date'] <= end_date) + ].copy() + + print(f">> 로드 완료: {len(raw_df):,}행, {raw_df['ticker'].nunique()}개 종목") + + # 피처 계산 + print(">> 피처 계산 중...") + processed = [] + fail_count = 0 + + for ticker in tqdm(raw_df['ticker'].unique(), desc="피처 계산"): + df_t = raw_df[raw_df['ticker'] == ticker].copy() + try: + df_t = add_technical_indicators(df_t) + processed.append(df_t) + except Exception as e: + fail_count += 1 + print(f"\n[경고] {ticker} 피처 계산 실패 ({fail_count}/20): {e}") + if fail_count >= 20: + raise RuntimeError("피처 계산 실패가 20개를 초과했습니다.") + + if not processed: + raise ValueError("전처리된 데이터가 없습니다. 날짜 범위나 parquet 파일을 확인하세요.") + + full_df = pd.concat(processed).reset_index(drop=True) + print(f">> 피처 계산 완료: {len(full_df):,}행 (실패: {fail_count}개)") + return full_df + + +def train_model(args: argparse.Namespace): + print("=" * 50) + print(" TCN 학습 시작 (Kaggle/Actions 버전)") + print("=" * 50) + + # 1. 데이터 로드 + raw_df = load_and_preprocess(args.parquet_dir, args.start_date, args.end_date) + + # 2. 
Train/Val 날짜 기준 분리 + dates = raw_df['date'].sort_values().unique() + split_date_idx = int(len(dates) * 0.9) + split_date = dates[split_date_idx] + + # split_date 미만을 train으로 → val이 비어지는 경계 케이스 방지 + train_df = raw_df[raw_df['date'] < split_date].copy() + val_df = raw_df[raw_df['date'] >= split_date].copy() + print(f">> Train: ~{split_date} 미만, Val: {split_date}~") + + # 3. 스케일링 (train만 fit) + scaler = StandardScaler() + scaler.fit(train_df[FEATURE_COLUMNS]) + train_df[FEATURE_COLUMNS] = scaler.transform(train_df[FEATURE_COLUMNS]) + val_df[FEATURE_COLUMNS] = scaler.transform(val_df[FEATURE_COLUMNS]) + + # 4. 시퀀스 생성 + X_train, y_train = build_sequences(train_df, args.seq_len, FEATURE_COLUMNS, HORIZONS) + X_val, y_val = build_sequences(val_df, args.seq_len, FEATURE_COLUMNS, HORIZONS) + + if len(X_train) == 0 or len(X_val) == 0: + raise ValueError("시퀀스 생성 결과가 비어있습니다.") + + print(f">> Train: {X_train.shape}, Val: {X_val.shape}") + + train_loader = TorchDataLoader( + TensorDataset(torch.from_numpy(X_train), torch.from_numpy(y_train)), + batch_size=args.batch_size, shuffle=True + ) + val_loader = TorchDataLoader( + TensorDataset(torch.from_numpy(X_val), torch.from_numpy(y_val)), + batch_size=args.batch_size, shuffle=False + ) + + # 5. 모델 + device = torch.device("cuda" if torch.cuda.is_available() else "cpu") + print(f">> Device: {device}") + + model = TCNClassifier( + input_size = len(FEATURE_COLUMNS), + output_size = len(HORIZONS), + num_channels = args.channels, + kernel_size = args.kernel_size, + dropout = args.dropout, + ).to(device) + + criterion = nn.BCEWithLogitsLoss() + optimizer = torch.optim.Adam(model.parameters(), lr=args.learning_rate,weight_decay=1e-4)#L2규제 추가 + + best_val_loss = float("inf") + best_state = None + + patience = 7 # 성능 개선이 없어도 기다려줄 에포크 횟수 + counter = 0 + # 6. 학습 루프 + print(f">> 학습 시작 (epochs={args.epochs})\n") + for epoch in range(args.epochs): + model.train() + train_loss = 0.0 + for batch_x, batch_y in train_loader: + batch_x, batch_y = batch_x.to(device), batch_y.to(device) + optimizer.zero_grad() + loss = criterion(model(batch_x), batch_y) + loss.backward() + optimizer.step() + train_loss += loss.item() * batch_x.size(0) + + model.eval() + val_loss = 0.0 + with torch.no_grad(): + for batch_x, batch_y in val_loader: + batch_x, batch_y = batch_x.to(device), batch_y.to(device) + val_loss += criterion(model(batch_x), batch_y).item() * batch_x.size(0) + + train_loss /= len(X_train) + val_loss /= len(X_val) + print(f"Epoch [{epoch+1:3d}/{args.epochs}] Train: {train_loss:.4f} | Val: {val_loss:.4f}", end="") + + if val_loss < best_val_loss: + best_val_loss = val_loss + best_state = copy.deepcopy(model.state_dict()) + counter = 0 # 성능이 개선되었으므로 카운터 초기화 + print(" ✓ saved") + else: + counter += 1 # 성능 개선이 없으므로 카운터 증가 + print(f" (Patience: {counter}/{patience})") + + if counter >= patience: + print(f"\n>> [Early Stopping] {patience}번의 에포크 동안 개선이 없어 학습을 중단합니다.") + break + + # 7. 
저장 + os.makedirs(args.output_dir, exist_ok=True) + model_path = os.path.join(args.output_dir, "model.pt") + scaler_path = os.path.join(args.output_dir, "scaler.pkl") + metadata_path = os.path.join(args.output_dir, "metadata.json") + + torch.save(best_state or model.state_dict(), model_path) + + with open(scaler_path, "wb") as f: + pickle.dump(scaler, f) + + metadata = { + "feature_columns": FEATURE_COLUMNS, + "horizons" : HORIZONS, + "seq_len" : args.seq_len, + "kernel_size" : args.kernel_size, + "dropout" : args.dropout, + "channels" : args.channels, + "model_path" : model_path, + "scaler_path" : scaler_path, + } + with open(metadata_path, "w", encoding="utf-8") as f: + json.dump(metadata, f, ensure_ascii=False, indent=2) + + print("\n>> 완료") + print(f" 모델 : {model_path}") + print(f" 스케일러: {scaler_path}") + print(f" 메타데이터: {metadata_path}") + + +def _find_kaggle_dataset_path() -> str: + """Kaggle 입력 데이터셋 경로를 자동으로 탐색""" + base = "/kaggle/input" + if os.path.exists(base): + for entry in os.listdir(base): + full = os.path.join(base, entry) + if os.path.isdir(full) and any(f.endswith(".parquet") for f in os.listdir(full)): + return full + return os.environ.get("PARQUET_DIR", base) + + +def parse_args() -> argparse.Namespace: + parser = argparse.ArgumentParser(description="Train TCN signal model (Kaggle/Actions 버전)") + parser.add_argument("--parquet-dir", default=os.environ.get("PARQUET_DIR", _find_kaggle_dataset_path())) + parser.add_argument("--start-date", default="2015-01-01") + parser.add_argument("--end-date", default="2023-12-31") + parser.add_argument("--seq-len", type=int, default=60) + parser.add_argument("--epochs", type=int, default=20) + parser.add_argument("--batch-size", type=int, default=64) + parser.add_argument("--learning-rate", type=float, default=1e-3) + parser.add_argument("--kernel-size", type=int, default=3) + parser.add_argument("--dropout", type=float, default=0.2) + parser.add_argument("--channels", type=int, nargs="+", default=[32, 64, 64]) + parser.add_argument("--output-dir", default=os.environ.get('WEIGHTS_DIR', '/kaggle/working/tcn')) + return parser.parse_args() + + +if __name__ == "__main__": + train_model(parse_args()) + + +def train(): + """노트북에서 module.train()으로 호출하기 위한 래퍼""" + import argparse + args = argparse.Namespace( + parquet_dir = '/kaggle/input/sisc-ai-trading-dataset', + start_date = "2015-01-01", + end_date = "2023-12-31", + seq_len = 30, + epochs = 50, # 20 → 50 + batch_size = 64, + learning_rate = 1e-4, # 1e-3 → 1e-4 (학습률 낮춤) + kernel_size = 3, + dropout = 0.5, + channels = [64, 128, 128], # [32,64,64] → 더 크게 + output_dir = "/kaggle/working" +) + train_model(args) \ No newline at end of file diff --git a/AI/modules/signal/models/TCN/wrapper.py b/AI/modules/signal/models/TCN/wrapper.py index bedf845b..554a5554 100644 --- a/AI/modules/signal/models/TCN/wrapper.py +++ b/AI/modules/signal/models/TCN/wrapper.py @@ -48,7 +48,7 @@ def __init__(self, config: Dict[str, Any]): self.dropout = float(config.get("dropout", 0.2)) self.scaler = None self.metadata = {} - self.is_loaded = False #중복로딩 방지 플래그 + self.is_loaded = False artifact_paths = resolve_model_artifacts( model_name="tcn", @@ -62,7 +62,6 @@ def __init__(self, config: Dict[str, Any]): self.metadata_path = os.path.abspath(config.get("metadata_path", artifact_paths.metadata_path)) def build(self, input_shape: tuple): - # 학습 메타데이터 기준 shape로 TCN 본체를 복원합니다. 
self.model = TCNClassifier( input_size=input_shape[1], output_size=len(self.horizons), @@ -79,7 +78,6 @@ def train( y_val: Optional[np.ndarray] = None, **kwargs, ): - # wrapper 단독 테스트용 학습 루프입니다. 실제 대규모 학습은 train.py 사용을 기준으로 둡니다. if self.model is None: self.build(X_train.shape[1:]) @@ -115,10 +113,9 @@ def train( print(f"Epoch [{epoch + 1}/{epochs}] Loss: {epoch_loss:.4f}") def _load_artifacts(self): - # metadata -> scaler -> model 순서로 읽어 추론에 필요한 상태를 복원합니다. - if self.is_loaded: - return # 이미 로드된 상태라면 중복 로딩을 방지합니다. + return + if self.metadata_path and os.path.exists(self.metadata_path): with open(self.metadata_path, "r", encoding="utf-8") as f: self.metadata = json.load(f) @@ -138,6 +135,11 @@ def _load_artifacts(self): if self.model is not None and os.path.exists(self.model_path): state_dict = torch.load(self.model_path, map_location=self.device) + + # [수정] Kaggle DataParallel 학습 시 생기는 'module.' 접두사 제거 + if all(k.startswith("module.") for k in state_dict.keys()): + state_dict = {k[len("module."):]: v for k, v in state_dict.items()} + self.model.load_state_dict(state_dict) self.model.eval() @@ -151,20 +153,16 @@ def load_scaler(self, filepath: str): self.scaler_path = os.path.abspath(filepath) def _prepare_dataframe(self, df: pd.DataFrame) -> pd.DataFrame: - # 서비스 파이프라인이 넘겨준 원본 df에서 TCN용 기술지표를 생성합니다. if df is None or df.empty: raise ValueError("Input dataframe is empty.") - prepared = get_standard_training_data(df.copy()) missing = [col for col in self.feature_columns if col not in prepared.columns] if missing: raise ValueError(f"Missing required TCN feature columns: {missing}") - prepared = prepared.replace([np.inf, -np.inf], np.nan).fillna(0) return prepared def _prepare_input_tensor(self, df: pd.DataFrame) -> torch.Tensor: - # 최근 seq_len 구간만 잘라서 학습 때와 동일한 feature 순서/스케일로 맞춥니다. prepared = self._prepare_dataframe(df) feature_frame = prepared[self.feature_columns] @@ -181,7 +179,6 @@ def _prepare_input_tensor(self, df: pd.DataFrame) -> torch.Tensor: return torch.from_numpy(batch).float().to(self.device) def predict(self, X_input: Union[pd.DataFrame, np.ndarray]) -> Dict[str, float]: - # DataFrame 입력이 기본 경로이며, 테스트 편의를 위해 ndarray도 허용합니다. self._load_artifacts() if self.model is None: @@ -200,7 +197,6 @@ def predict(self, X_input: Union[pd.DataFrame, np.ndarray]) -> Dict[str, float]: logits = self.model(tensor_x) probs = torch.sigmoid(logits).cpu().numpy().flatten() - # 포트폴리오 파이프라인이 바로 읽을 수 있도록 horizon별 dict 형태로 반환합니다. return { f"tcn_{horizon}d": float(prob) for horizon, prob in zip(self.horizons, probs) @@ -208,13 +204,8 @@ def predict(self, X_input: Union[pd.DataFrame, np.ndarray]) -> Dict[str, float]: def get_signals(self, df: pd.DataFrame, ticker_id: int = 0, sector_id: int = 0) -> Dict[str, float]: return self.predict(df) - + def predict_batch(self, ticker_data_map: Dict[str, pd.DataFrame]) -> Dict[str, Dict[str, float]]: - """ - [Batch 추론] 여러 종목의 DataFrame을 받아 한 번의 GPU 연산으로 결과를 반환합니다. - 입력: {"AAPL": df_aapl, "MSFT": df_msft, ...} - 출력: {"AAPL": {"tcn_1d": 0.55, ...}, "MSFT": {"tcn_1d": 0.61, ...}} - """ self._load_artifacts() if self.model is None: @@ -223,69 +214,55 @@ def predict_batch(self, ticker_data_map: Dict[str, pd.DataFrame]) -> Dict[str, D valid_tickers = [] tensor_list = [] - # 1. 
딕셔너리로 받은 종목별 데이터를 순회하며 전처리 및 윈도우 추출 for ticker, df in ticker_data_map.items(): try: - # dataset_builder의 다형성을 활용하여 전처리 (에러 발생 종목은 스킵) prepared = get_standard_training_data(df.copy()) feature_frame = prepared[self.feature_columns] if len(feature_frame) < self.seq_len: - continue # 시퀀스 길이가 부족한 신규 상장 종목 등은 건너뜁니다. + continue latest_window = feature_frame.tail(self.seq_len).to_numpy(dtype=np.float32) - - # [참고] Global Scaler를 가정하고 일괄 적용합니다. if self.scaler is not None: latest_window = self.scaler.transform(latest_window).astype(np.float32) tensor_list.append(latest_window) valid_tickers.append(ticker) - + except Exception as e: - # 특정 종목 데이터 불량 시 전체 파이프라인이 멈추지 않도록 예외 처리 - print(f"⚠️ [{ticker}] 전처리 실패로 배치 추론에서 제외됨: {e}") + print(f"[{ticker}] 전처리 실패로 배치 추론에서 제외됨: {e}") if not tensor_list: - return {} # 유효한 종목이 없으면 빈 결과 반환 + return {} - # 2. 리스트에 모인 2D 배열들을 3D 텐서로 조립 [Batch, Seq, Features] batch_array = np.stack(tensor_list, axis=0) batch_tensor = torch.from_numpy(batch_array).float().to(self.device) - # 3. GPU 병렬 추론 (단 1번의 호출로 전체 종목 예측) self.model.eval() with torch.no_grad(): logits = self.model(batch_tensor) - probs = torch.sigmoid(logits).cpu().numpy() # 형태: [Batch, Horizons] + probs = torch.sigmoid(logits).cpu().numpy() - # 4. 결과를 포트폴리오 모듈이 인식할 수 있도록 딕셔너리로 매핑 results = {} for i, ticker in enumerate(valid_tickers): results[ticker] = { f"tcn_{horizon}d": float(probs[i, j]) for j, horizon in enumerate(self.horizons) } - + return results def save(self, filepath: str): - # 수동 저장이 필요한 경우 wrapper에서도 state_dict 저장이 가능합니다. if self.model is None: raise ValueError("No TCN model to save.") os.makedirs(os.path.dirname(filepath), exist_ok=True) torch.save(self.model.state_dict(), filepath) def load(self, filepath: str): - """ - 외부 경로의 가중치를 불러옵니다. - 가중치가 위치한 동일 폴더 내의 scaler 및 metadata를 읽어오도록 경로를 동기화합니다. - """ self.model_path = filepath target_dir = os.path.dirname(filepath) self.weights_dir = target_dir self.scaler_path = os.path.join(target_dir, "scaler.pkl") self.metadata_path = os.path.join(target_dir, "metadata.json") - - self.is_loaded = False # 새 경로로 로드할 때는 중복 로딩 방지 플래그를 초기화합니다. 
+ self.is_loaded = False self._load_artifacts() diff --git a/AI/modules/signal/models/itransformer/train_kaggle.py b/AI/modules/signal/models/itransformer/train_kaggle.py new file mode 100644 index 00000000..d2527952 --- /dev/null +++ b/AI/modules/signal/models/itransformer/train_kaggle.py @@ -0,0 +1,364 @@ +# AI/modules/signal/models/itransformer/train_kaggle.py +""" +iTransformer Kaggle 학습 스크립트 +----------------------------------------------- +- DB 연결 없이 parquet 파일로 학습 +- train.py의 로직을 그대로 유지하되 DataLoader → parquet 로드로 교체 +- Kaggle 데이터셋: jihyeongkimm/sisc-ai-trading-dataset +- 저장: /kaggle/working/multi_horizon_model.keras + /kaggle/working/multi_horizon_scaler.pkl + /kaggle/working/metadata.json +----------------------------------------------- +""" +import os + +def _find_kaggle_parquet_dir() -> str: + """Kaggle parquet 데이터셋 경로 자동 탐색""" + import glob as _glob + matches = _glob.glob("/kaggle/input/**/price_data.parquet", recursive=True) + if matches: + return os.path.dirname(matches[0]) + return os.environ.get("PARQUET_DIR", "/kaggle/input") + +import sys +import json +import pickle +import warnings +warnings.filterwarnings("ignore") +os.environ["TF_CPP_MIN_LOG_LEVEL"] = "3" +os.environ["TF_ENABLE_ONEDNN_OPTS"] = "0" + +import numpy as np +import pandas as pd +import tensorflow as tf +from sklearn.preprocessing import StandardScaler + +# Kaggle 경로 설정 +def _find_kaggle_dataset_path() -> str: + """Kaggle 입력 데이터셋 경로 자동 탐색""" + base = "/kaggle/input" + if os.path.exists(base): + for root, dirs, files in os.walk(base): + if any(f.endswith(".parquet") for f in files): + return root + return os.environ.get("PARQUET_DIR", base) + +KAGGLE_DATA_DIR = _find_kaggle_dataset_path() +OUTPUT_DIR = "/kaggle/working" + +# ───────────────────────────────────────────────────────────────────────────── +# CONFIG +# ───────────────────────────────────────────────────────────────────────────── +CONFIG = { + "lookback" : 60, + "horizons" : [1, 3, 5, 7], + "train_end_date" : "2023-12-31", + "epochs" : 50, + "batch_size" : 32, + "learning_rate" : 1e-4, + "head_size" : 128, + "num_heads" : 4, + "ff_dim" : 256, + "num_blocks" : 4, + "mlp_units" : [128, 64], + "dropout" : 0.2, + "mlp_dropout" : 0.2, + "test_size" : 0.2, + "model_name" : "multi_horizon_model.keras", # artifact_paths.py, wrapper.py와 일치 + "scaler_name" : "multi_horizon_scaler.pkl", # artifact_paths.py, wrapper.py와 일치 + "metadata_name" : "metadata.json", # artifact_paths.py, wrapper.py와 일치 +} + +# iTransformer 피처 - 거시경제 + 상관관계 중심 +FEATURE_COLUMNS = [ + "us10y", + "us10y_chg", + "yield_spread", + "vix_close", + "vix_change_rate", + "dxy_close", + "dxy_chg", + "credit_spread_hy", + "wti_price", + "gold_price", + "nh_nl_index", + "ma200_pct", + "correlation_spike", + "recent_loss_ema", + "ret_1d", + "intraday_vol", + "log_return", + "surprise_cpi", +] + +HORIZONS = CONFIG["horizons"] + + +# ───────────────────────────────────────────────────────────────────────────── +# 데이터 로드 +# ───────────────────────────────────────────────────────────────────────────── +def load_parquet_data() -> pd.DataFrame: + """parquet에서 price_data + macroeconomic_indicators 로드 후 병합""" + print(">> parquet 데이터 로드 중...") + + price_path = os.path.join(KAGGLE_DATA_DIR, "price_data.parquet") + macro_path = os.path.join(KAGGLE_DATA_DIR, "macroeconomic_indicators.parquet") + + price_df = pd.read_parquet(price_path) + price_df["date"] = pd.to_datetime(price_df["date"]) + + # 기본 피처 계산 + price_df = price_df.sort_values(["ticker", "date"]).reset_index(drop=True) + price_df["log_return"] = 
price_df.groupby("ticker")["close"].transform( + lambda x: np.log(x / x.shift(1)) + ) + price_df["ret_1d"] = price_df.groupby("ticker")["close"].transform( + lambda x: x.pct_change() + ) + price_df["intraday_vol"] = (price_df["high"] - price_df["low"]) / price_df["close"] + price_df["ma200"] = price_df.groupby("ticker")["close"].transform( + lambda x: x.rolling(200, min_periods=1).mean() + ) + price_df["ma200_pct"] = (price_df["close"] - price_df["ma200"]) / price_df["ma200"] + + # EMA 최근 손실 (단순 근사) + price_df["recent_loss_ema"] = price_df.groupby("ticker")["log_return"].transform( + lambda x: x.clip(upper=0).ewm(span=20).mean().abs() + ) + + # 거시경제 데이터 병합 + macro_df = pd.read_parquet(macro_path) + macro_df["date"] = pd.to_datetime(macro_df["date"]) + + # 필요한 거시 컬럼만 선택 (있는 것만) + macro_cols = ["date", "us10y", "yield_spread", "vix_close", "dxy_close", + "credit_spread_hy", "wti_price", "gold_price", + "nh_nl_index", "correlation_spike", "surprise_cpi"] + available_macro = [c for c in macro_cols if c in macro_df.columns] + macro_df = macro_df[available_macro].drop_duplicates("date") + + # 변화율 계산 + if "us10y" in macro_df.columns: + macro_df["us10y_chg"] = macro_df["us10y"].diff() + if "dxy_close" in macro_df.columns: + macro_df["dxy_chg"] = macro_df["dxy_close"].pct_change() + if "vix_close" in macro_df.columns: + macro_df["vix_change_rate"] = macro_df["vix_close"].pct_change() + + df = pd.merge(price_df, macro_df, on="date", how="left") + df = df.sort_values(["ticker", "date"]).reset_index(drop=True) + + # macro 컬럼만 티커별로 ffill (전역 ffill 시 티커 간 누수 발생) + macro_cols = [c for c in macro_df.columns if c != "date"] + df[macro_cols] = df.groupby("ticker")[macro_cols].transform(lambda x: x.ffill()) + df = df.fillna(0) + + # 학습 기간 필터 + df = df[df["date"] <= CONFIG["train_end_date"]] + + print(f">> 로드 완료: {len(df):,}행, {df['ticker'].nunique()}개 종목") + return df + + +# ───────────────────────────────────────────────────────────────────────────── +# 시퀀스 생성 +# ───────────────────────────────────────────────────────────────────────────── +def build_sequences( + df: pd.DataFrame, + scaler: StandardScaler, + fit_scaler: bool = False, +) -> tuple: + lookback = CONFIG["lookback"] + horizons = CONFIG["horizons"] + max_horizon = max(horizons) + + # 사용 가능한 피처만 추출 + available_feats = [f for f in FEATURE_COLUMNS if f in df.columns] + if len(available_feats) < 8: + raise ValueError(f"피처가 너무 적습니다: {available_feats}") + + # 스케일러는 루프 밖에서 전체 데이터로 한 번만 fit + # 루프 안에서 fit하면 마지막 티커 통계만 남아 스케일 불일치 발생 + all_feat_vals = df[available_feats].values.astype(np.float32) + if fit_scaler: + scaler.fit(all_feat_vals) + + X_list, y_list = [], [] + + for ticker, group in df.groupby("ticker"): + group = group.sort_values("date").reset_index(drop=True) + if len(group) < lookback + max_horizon + 10: + continue + + feat_vals = scaler.transform(group[available_feats].values.astype(np.float32)) + close_vals = group["close"].values + + for i in range(lookback, len(group) - max_horizon): + X_list.append(feat_vals[i - lookback : i]) + labels = [] + for h in horizons: + future_ret = (close_vals[i + h] - close_vals[i]) / close_vals[i] + labels.append(1.0 if future_ret > 0 else 0.0) + y_list.append(labels) + + if not X_list: + raise ValueError("시퀀스 생성 실패 - 데이터 부족") + + X = np.array(X_list, dtype=np.float32) + y = np.array(y_list, dtype=np.float32) + print(f">> 시퀀스: X={X.shape}, y={y.shape}, 피처={available_feats}") + return X, y, available_feats + + +# ───────────────────────────────────────────────────────────────────────────── +# 모델 구성 +# 
───────────────────────────────────────────────────────────────────────────── +def build_model(seq_len: int, n_features: int, n_outputs: int) -> tf.keras.Model: + """iTransformer: 변수(feature) 축을 토큰으로 취급하는 Transformer""" + from tensorflow.keras import layers + + # 입력 + seq_input = tf.keras.Input(shape=(seq_len, n_features), name="sequence_input") + + # Transpose: [batch, seq, feat] → [batch, feat, seq] + x = layers.Permute((2, 1), name="transpose_in")(seq_input) + + # Transformer Encoder 블록 + for block_idx in range(CONFIG["num_blocks"]): + name = f"block{block_idx}" + attn_in = layers.LayerNormalization(epsilon=1e-6, name=f"{name}_ln1")(x) + attn_out = layers.MultiHeadAttention( + num_heads=CONFIG["num_heads"], + key_dim=CONFIG["head_size"] // CONFIG["num_heads"], + dropout=CONFIG["dropout"], + name=f"{name}_mha", + )(attn_in, attn_in) + attn_out = layers.Dropout(CONFIG["dropout"])(attn_out) + x = layers.Add(name=f"{name}_attn_add")([x, attn_out]) + + ffn_in = layers.LayerNormalization(epsilon=1e-6, name=f"{name}_ln2")(x) + ffn = layers.Dense(CONFIG["ff_dim"], activation="gelu", name=f"{name}_ffn1")(ffn_in) + ffn = layers.Dropout(CONFIG["dropout"])(ffn) + ffn = layers.Dense(seq_len, name=f"{name}_ffn2")(ffn) + ffn = layers.Dropout(CONFIG["dropout"])(ffn) + x = layers.Add(name=f"{name}_ffn_add")([x, ffn]) + + # Transpose back: [batch, feat, seq] → [batch, seq, feat] + x = layers.Permute((2, 1), name="transpose_out")(x) + + # Global Average Pooling + x = layers.GlobalAveragePooling1D()(x) + + # MLP Head + for units in CONFIG["mlp_units"]: + x = layers.Dense(units, activation="relu")(x) + x = layers.Dropout(CONFIG["mlp_dropout"])(x) + + output = layers.Dense(n_outputs, activation="sigmoid", name="output")(x) + + model = tf.keras.Model(inputs=seq_input, outputs=output) + model.compile( + optimizer=tf.keras.optimizers.Adam(learning_rate=CONFIG["learning_rate"]), + loss="binary_crossentropy", + metrics=["accuracy"], + ) + return model + + +# ───────────────────────────────────────────────────────────────────────────── +# 학습 메인 +# ───────────────────────────────────────────────────────────────────────────── +def train(): + print("=" * 56) + print(" iTransformer Kaggle 학습 시작") + print(f" Horizons: {CONFIG['horizons']}일") + print(f" Lookback: {CONFIG['lookback']}일") + print("=" * 56) + + # 1. 데이터 로드 + df = load_parquet_data() + + # 2. Train / Val 분리 (티커 기준, 시간 순서 보존) + tickers = df["ticker"].unique() + n_val = max(1, int(len(tickers) * CONFIG["test_size"])) + val_tickers = tickers[-n_val:] + train_tickers = tickers[:-n_val] + + train_df = df[df["ticker"].isin(train_tickers)].copy() + val_df = df[df["ticker"].isin(val_tickers)].copy() + print(f"\n>> Train: {len(train_tickers)}개 종목 | Val: {len(val_tickers)}개 종목") + + # 3. 시퀀스 생성 (train만 scaler fit) + scaler = StandardScaler() + X_train, y_train, feat_cols = build_sequences(train_df, scaler, fit_scaler=True) + X_val, y_val, _ = build_sequences(val_df, scaler, fit_scaler=False) + + # 4. 모델 구성 + n_features = X_train.shape[2] + n_outputs = len(HORIZONS) + model = build_model(CONFIG["lookback"], n_features, n_outputs) + model.summary() + + # 5. 학습 + callbacks = [ + tf.keras.callbacks.EarlyStopping( + monitor="val_loss", patience=10, restore_best_weights=True + ), + tf.keras.callbacks.ReduceLROnPlateau( + monitor="val_loss", factor=0.5, patience=5, verbose=1 + ), + ] + + history = model.fit( + X_train, y_train, + validation_data=(X_val, y_val), + epochs=CONFIG["epochs"], + batch_size=CONFIG["batch_size"], + callbacks=callbacks, + verbose=1, + ) + + # 6. 
Save outputs
+    os.makedirs(OUTPUT_DIR, exist_ok=True)
+    model_path    = os.path.join(OUTPUT_DIR, CONFIG["model_name"])
+    scaler_path   = os.path.join(OUTPUT_DIR, CONFIG["scaler_name"])
+    metadata_path = os.path.join(OUTPUT_DIR, CONFIG["metadata_name"])
+
+    model.save(model_path)
+    with open(scaler_path, "wb") as f:
+        pickle.dump(scaler, f)
+
+    best_val_loss = min(history.history["val_loss"])
+    best_val_acc  = max(history.history.get("val_accuracy", [0]))
+
+    metadata = {
+        "model_name"     : "itransformer",
+        "seq_len"        : CONFIG["lookback"],
+        "feature_names"  : feat_cols,
+        "feature_columns": feat_cols,
+        "horizons"       : HORIZONS,
+        "head_size"      : CONFIG["head_size"],
+        "num_heads"      : CONFIG["num_heads"],
+        "ff_dim"         : CONFIG["ff_dim"],
+        "num_blocks"     : CONFIG["num_blocks"],
+        "mlp_units"      : CONFIG["mlp_units"],
+        "dropout"        : CONFIG["dropout"],
+        "mlp_dropout"    : CONFIG["mlp_dropout"],
+        "best_val_loss"  : round(best_val_loss, 4),
+        "best_val_acc"   : round(best_val_acc, 4),
+        "n_train_samples": int(len(X_train)),
+        "n_val_samples"  : int(len(X_val)),
+    }
+    with open(metadata_path, "w", encoding="utf-8") as f:
+        json.dump(metadata, f, ensure_ascii=False, indent=2)
+
+    print("\nTraining complete!")
+    print(f"  Model    : {model_path}")
+    print(f"  Scaler   : {scaler_path}")
+    print(f"  Metadata : {metadata_path}")
+    print(f"  Best val_loss: {best_val_loss:.4f}")
+    print(f"  Best val_acc : {best_val_acc:.4f}")
+
+
+if __name__ == "__main__":
+    train()
diff --git a/AI/modules/signal/models/transformer/train_kaggle.py b/AI/modules/signal/models/transformer/train_kaggle.py
new file mode 100644
index 00000000..ab772a65
--- /dev/null
+++ b/AI/modules/signal/models/transformer/train_kaggle.py
@@ -0,0 +1,303 @@
+# AI/modules/signal/models/transformer/train_kaggle.py
+"""
+Transformer training script - Kaggle/GitHub Actions version
+-----------------------------------------------
+[Differences from train.py]
+- No DB connection (DataLoader.load_data_from_db is not used)
+- Loads directly from parquet files
+- Used by the GitHub Actions automation pipeline
+
+[train.py is kept as-is]
+- Used when training locally or on the server with a DB connection
+-----------------------------------------------
+"""
+import os
+import sys
+import pickle
+import warnings
+import logging
+import numpy as np
+import pandas as pd
+from sklearn.model_selection import train_test_split
+
+# Suppress noisy logs
+os.environ['TF_CPP_MIN_LOG_LEVEL'] = '3'
+os.environ['TF_ENABLE_ONEDNN_OPTS'] = '0'
+warnings.filterwarnings('ignore')
+logging.getLogger('tensorflow').setLevel(logging.ERROR)
+
+import tensorflow as tf
+tf.get_logger().setLevel('ERROR')
+from tensorflow.keras.callbacks import ModelCheckpoint, EarlyStopping, ReduceLROnPlateau
+
+# ─────────────────────────────────────────────────────────────────────────────
+# GPU setup
+# ─────────────────────────────────────────────────────────────────────────────
+print("TensorFlow version:", tf.__version__)
+print("GPU devices:", tf.config.list_physical_devices('GPU'))
+
+gpus = tf.config.list_physical_devices('GPU')
+if gpus:
+    try:
+        for gpu in gpus:
+            tf.config.experimental.set_memory_growth(gpu, True)
+        print(f"🚀 Using {len(gpus)} GPU(s)")
+    except RuntimeError as e:
+        print(e)
+else:
+    print("🐢 Using CPU")
+
+# ─────────────────────────────────────────────────────────────────────────────
+# Path setup
+# ─────────────────────────────────────────────────────────────────────────────
+current_dir = os.path.dirname(os.path.abspath(__file__))
+project_root = os.path.abspath(os.path.join(current_dir, "../../../../.."))
+if project_root not in sys.path:
+    sys.path.append(project_root)
+
+from AI.modules.signal.models.transformer.architecture import build_transformer_model
+from 
sklearn.preprocessing import StandardScaler +from AI.modules.features.legacy.technical_features import ( + add_technical_indicators, + add_multi_timeframe_features +) + +# ───────────────────────────────────────────────────────────────────────────── +# CONFIG +# ───────────────────────────────────────────────────────────────────────────── +CONFIG = { + # parquet 경로 (환경변수로 주입 가능) + 'parquet_dir' : os.environ.get( + 'PARQUET_DIR', + '/kaggle/input/datasets/jihyeongkimm/sisc-ai-trading-dataset' + ), + 'start_date' : '2015-01-01', + 'end_date' : '2023-12-31', + 'seq_len' : 60, + 'batch_size' : 32, + 'epochs' : 50, + + # 저장 경로 (환경변수로 주입 가능) + 'weights_dir' : os.environ.get('WEIGHTS_DIR', '/kaggle/working'), + 'model_name' : 'multi_horizon_model.keras', + 'scaler_name' : 'multi_horizon_scaler.pkl', +} + +TRANSFORMER_TRAIN_FEATURES = [ + "log_return", + "open_ratio", + "high_ratio", + "low_ratio", + "vol_change", + "ma5_ratio", + "ma20_ratio", + "ma60_ratio", + "rsi", + "macd_ratio", + "bb_position", + "week_ma20_ratio", + "week_rsi", + "week_bb_pos", + "week_vol_change", + "month_ma12_ratio", + "month_rsi", +] + + +# ───────────────────────────────────────────────────────────────────────────── +# [핵심 변경] 데이터 로드 +# train.py: DataLoader.load_data_from_db() → DB 연결 필요 +# train_kaggle.py: pd.read_parquet() → DB 연결 불필요 +# ───────────────────────────────────────────────────────────────────────────── +def load_and_preprocess(): + parquet_path = os.path.join(CONFIG['parquet_dir'], 'price_data.parquet') + print(f">> parquet 로드 중: {parquet_path}") + + raw_df = pd.read_parquet(parquet_path) + raw_df['date'] = pd.to_datetime(raw_df['date']) + + raw_df = raw_df[ + (raw_df['date'] >= CONFIG['start_date']) & + (raw_df['date'] <= CONFIG['end_date']) + ].copy() + + print(f">> 로드 완료: {len(raw_df):,}행, {raw_df['ticker'].nunique()}개 종목") + + # 피처 계산 + print(">> 피처 계산 중...") + processed = [] + fail_count = 0 + + for ticker in raw_df['ticker'].unique(): + df_t = raw_df[raw_df['ticker'] == ticker].copy() + try: + df_t = add_technical_indicators(df_t) + df_t = add_multi_timeframe_features(df_t) + processed.append(df_t) + except Exception as e: + fail_count += 1 + if fail_count >= 20: + raise RuntimeError("피처 계산 실패가 20개를 초과했습니다.") + + full_df = pd.concat(processed).reset_index(drop=True) + print(f">> 피처 계산 완료: {len(full_df):,}행 (실패: {fail_count}개)") + return full_df + + +def build_sequences_transformer(full_df, scaler, fit_scaler=True): + """Transformer용 시퀀스 생성 (DB 없이 직접 구현)""" + seq_len = CONFIG['seq_len'] + horizons = [1, 3, 5, 7] + max_h = max(horizons) + available = [c for c in TRANSFORMER_TRAIN_FEATURES if c in full_df.columns] + + # ticker/sector ID 매핑 + tickers = sorted(full_df['ticker'].unique()) + sectors = sorted(full_df['sector'].unique() if 'sector' in full_df.columns else ['Unknown']) + ticker_to_id = {t: i for i, t in enumerate(tickers)} + sector_to_id = {s: i for i, s in enumerate(sectors)} + + full_df = full_df.dropna(subset=available).copy() + if fit_scaler: + full_df[available] = scaler.fit_transform(full_df[available]) + else: + full_df[available] = scaler.transform(full_df[available]) + + X_ts, X_tick, X_sec, y_list = [], [], [], [] + + for ticker, group in full_df.groupby('ticker'): + group = group.sort_values('date').reset_index(drop=True) + sector = group['sector'].iloc[0] if 'sector' in group.columns else 'Unknown' + tick_id = ticker_to_id.get(ticker, 0) + sec_id = sector_to_id.get(sector, 0) + + if len(group) < seq_len + max_h: + continue + + feat_vals = group[available].values + closes = 
group['close'].values
+
+        for i in range(len(group) - seq_len - max_h + 1):
+            window = feat_vals[i:i + seq_len]
+            curr = closes[i + seq_len - 1]
+            labels = [1.0 if closes[i + seq_len + h - 1] > curr else 0.0 for h in horizons]
+            X_ts.append(window)
+            X_tick.append(tick_id)
+            X_sec.append(sec_id)
+            y_list.append(labels)
+
+    return (np.array(X_ts, dtype=np.float32),
+            np.array(X_tick, dtype=np.int32),
+            np.array(X_sec, dtype=np.int32),
+            np.array(y_list, dtype=np.float32),
+            len(tickers), len(sectors))
+
+
+def train_single_pipeline():
+    print("=" * 50)
+    print(" Transformer 학습 시작 (Kaggle/Actions 버전)")
+    print("=" * 50)
+
+    # 1. 데이터 로드
+    full_df = load_and_preprocess()
+
+    # stock_info parquet에서 sector 정보 병합
+    stock_info_path = os.path.join(CONFIG['parquet_dir'], 'stock_info.parquet')
+    if os.path.exists(stock_info_path):
+        stock_info = pd.read_parquet(stock_info_path)[['ticker', 'sector']]
+        full_df = full_df.merge(stock_info, on='ticker', how='left')
+        full_df['sector'] = full_df['sector'].fillna('Unknown')
+        print(f">> sector 병합 완료")
+
+    horizons = [1, 3, 5, 7]
+    n_outputs = len(horizons)
+
+    # 2. Train/Val 분리 (ticker 기준)
+    tickers = full_df['ticker'].unique()
+
+    # 최소 2개 이상 있어야 train/val 분리 가능 (PatchTST train.py와 동일한 가드)
+    if len(tickers) < 2:
+        raise ValueError(f"학습에 필요한 ticker가 부족합니다. (현재: {len(tickers)}개, 최소 2개 필요)")
+
+    # val 비율 20%, 단 train이 최소 1개는 남도록 상한 보정
+    n_val = max(1, min(int(len(tickers) * 0.2), len(tickers) - 1))
+    val_tickers = tickers[-n_val:]
+    train_tickers = tickers[:-n_val]
+
+    train_df = full_df[full_df['ticker'].isin(train_tickers)].copy()
+    val_df = full_df[full_df['ticker'].isin(val_tickers)].copy()
+    print(f">> Train: {len(train_tickers)}개, Val: {len(val_tickers)}개 종목")
+
+    # 3. 시퀀스 생성
+    scaler = StandardScaler()
+    X_ts_train, X_tick_train, X_sec_train, y_train, n_tickers, n_sectors = build_sequences_transformer(train_df, scaler, fit_scaler=True)
+    X_ts_val, X_tick_val, X_sec_val, y_val, _, _ = build_sequences_transformer(val_df, scaler, fit_scaler=False)
+
+    print(f"\n>> 시퀀스 생성 완료: {X_ts_train.shape}")
+    print(f">> horizon: {horizons}")
+
+    # 4. 모델 빌드
+    model = build_transformer_model(
+        input_shape=(X_ts_train.shape[1], X_ts_train.shape[2]),
+        n_tickers=n_tickers,
+        n_sectors=n_sectors,
+        n_outputs=n_outputs
+    )
+
+    model.compile(
+        optimizer=tf.keras.optimizers.Adam(learning_rate=0.0001),
+        loss="binary_crossentropy",
+        metrics=["accuracy"]
+    )
+
+    # 5. 저장 경로
+    save_dir = CONFIG['weights_dir']
+    os.makedirs(save_dir, exist_ok=True)
+    model_path = os.path.join(save_dir, CONFIG['model_name'])
+    scaler_path = os.path.join(save_dir, CONFIG['scaler_name'])
+
+    # 6. 콜백
+    callbacks = [
+        ModelCheckpoint(
+            filepath=model_path,
+            monitor='val_loss',
+            save_best_only=True,
+            verbose=1
+        ),
+        EarlyStopping(
+            monitor='val_loss',
+            patience=10,
+            restore_best_weights=True
+        ),
+        ReduceLROnPlateau(
+            monitor='val_loss',
+            factor=0.5,
+            patience=5,
+            min_lr=1e-6,
+            verbose=1
+        ),
+    ]
+
+    # 7. 학습
+    print(f">> 학습 시작 (epochs={CONFIG['epochs']})")
+    model.fit(
+        x=[X_ts_train, X_tick_train, X_sec_train],
+        y=y_train,
+        validation_data=([X_ts_val, X_tick_val, X_sec_val], y_val),
+        epochs=CONFIG['epochs'],
+        batch_size=CONFIG['batch_size'],
+        shuffle=True,
+        callbacks=callbacks,
+        verbose=2
+    )
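Editorial note, not part of the patch: unlike the iTransformer script earlier in this diff, train_kaggle.py writes only the model and scaler and no metadata.json, so the feature order and horizons cannot be recovered from the artifacts alone. A minimal sketch of an optional helper that could be called right after model.fit(); save_training_metadata is a hypothetical name, while save_dir, horizons, CONFIG, TRANSFORMER_TRAIN_FEATURES, X_ts_train and X_ts_val are the names already in scope here:

import json
import os

def save_training_metadata(save_dir, feature_columns, horizons, seq_len, n_train, n_val):
    """Write a small metadata.json next to the weights (feature order, horizons, sample counts)."""
    metadata = {
        "model_name": "transformer",
        "seq_len": int(seq_len),
        "feature_columns": list(feature_columns),  # build_sequences_transformer may use a subset if columns are missing
        "horizons": list(horizons),
        "n_train_samples": int(n_train),
        "n_val_samples": int(n_val),
    }
    with open(os.path.join(save_dir, "metadata.json"), "w", encoding="utf-8") as f:
        json.dump(metadata, f, ensure_ascii=False, indent=2)

# e.g. save_training_metadata(save_dir, TRANSFORMER_TRAIN_FEATURES, horizons,
#                             CONFIG['seq_len'], len(X_ts_train), len(X_ts_val))

+
+    # 8.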
스케일러 저장 + with open(scaler_path, "wb") as f: + pickle.dump(scaler, f) + + print(f"\n>> 완료") + print(f" 모델 : {model_path}") + print(f" 스케일러: {scaler_path}") + + +if __name__ == "__main__": + train_single_pipeline() + + +def train(): + """노트북에서 module.train()으로 호출하기 위한 래퍼""" + train_single_pipeline() diff --git a/AI/pipelines/weekly_routine.py b/AI/pipelines/weekly_routine.py new file mode 100644 index 00000000..fecba3e3 --- /dev/null +++ b/AI/pipelines/weekly_routine.py @@ -0,0 +1,172 @@ +# AI/pipelines/weekly_routine.py +""" +[주간 자동화 파이프라인] +- 매주 화요일 새벽 2시 자동 실행 (GitHub Actions) +- 또는 수동 실행 가능 + +[daily_routine.py와의 차이] +- daily_routine.py: 매일 → 매매 신호 생성 + 주문 실행 +- weekly_routine.py: 매주 → 최신 데이터로 모델 재학습 + 가중치 배포 + +[실행 순서] +1. DB 최신 데이터 추출 → parquet +2. Kaggle 데이터셋 업데이트 +3. Kaggle 노트북 학습 트리거 (모델별 순차 실행) +4. 학습 완료 후 가중치 다운로드 +5. 서버에 가중치 배포 + +[실행 방법] + # 로컬 (Termius 터널 켜둔 상태) + python AI/pipelines/weekly_routine.py + + # GitHub Actions에서 자동 실행 + .github/workflows/train.yml 참고 +""" + +import os +import sys +import argparse +import subprocess +import traceback +from datetime import datetime + +# ───────────────────────────────────────────────────────────────────────────── +# 경로 설정 +# ───────────────────────────────────────────────────────────────────────────── +current_dir = os.path.dirname(os.path.abspath(__file__)) +project_root = os.path.abspath(os.path.join(current_dir, "../..")) +if project_root not in sys.path: + sys.path.append(project_root) + +SCRIPTS_DIR = os.path.join(project_root, "AI/scripts") + + +def log(msg: str): + """타임스탬프 포함 로그""" + now = datetime.now().strftime("%Y-%m-%d %H:%M:%S") + print(f"[{now}] {msg}") + + +def run_script(script_name: str, desc: str) -> bool: + """ + AI/scripts/ 하위 스크립트 실행 + 성공하면 True, 실패하면 False 반환 + """ + script_path = os.path.join(SCRIPTS_DIR, script_name) + log(f">> [{desc}] 시작...") + + result = subprocess.run( + [sys.executable, script_path], + cwd=project_root, + ) + + if result.returncode == 0: + log(f">> [{desc}] 완료! ✅") + return True + else: + log(f">> [{desc}] 실패! ❌") + return False + + +def run_weekly_pipeline(skip_extract: bool = False, skip_upload: bool = False): + """ + 주간 학습 파이프라인 메인 + + skip_extract: True면 DB 추출 스킵 (parquet 이미 있을 때) + skip_upload: True면 Kaggle 업로드 스킵 (데이터 변경 없을 때) + """ + start_time = datetime.now() + + log("=" * 50) + log(" 주간 학습 파이프라인 시작") + log(f" skip_extract: {skip_extract}") + log(f" skip_upload: {skip_upload}") + log("=" * 50) + + # ───────────────────────────────────────────────────── + # STEP 1. DB 추출 → parquet + # 서버 DB에서 최신 데이터를 parquet으로 추출 + # 로컬: Termius 터널 필요 + # Actions: paramiko SSH 터널 자동 오픈 + # ───────────────────────────────────────────────────── + if not skip_extract: + success = run_script("extract_to_parquet.py", "DB 추출") + if not success: + log("❌ DB 추출 실패. 파이프라인 중단.") + return False + else: + log(">> [DB 추출] 스킵") + + # ───────────────────────────────────────────────────── + # STEP 2. Kaggle 데이터셋 업데이트 + # 최신 parquet + GitHub 최신 코드 Kaggle에 업로드 + # ───────────────────────────────────────────────────── + if not skip_upload: + success = run_script("upload_to_kaggle.py", "Kaggle 업로드") + if not success: + log("❌ Kaggle 업로드 실패. 파이프라인 중단.") + return False + else: + log(">> [Kaggle 업로드] 스킵") + + # ───────────────────────────────────────────────────── + # STEP 3. 
Kaggle 노트북 학습 트리거 + 완료 대기 + # PatchTST → Transformer 순서로 순차 실행 + # (iTransformer, TCN 머지 후 추가 예정) + # ───────────────────────────────────────────────────── + success = run_script("trigger_training.py", "Kaggle 학습 트리거") + if not success: + log("❌ 학습 트리거 실패. 파이프라인 중단.") + return False + + # ───────────────────────────────────────────────────── + # STEP 4. 가중치 다운로드 + # Kaggle Output → 로컬 AI/data/weights/ + # ───────────────────────────────────────────────────── + success = run_script("download_weights.py", "가중치 다운로드") + if not success: + log("❌ 가중치 다운로드 실패. 파이프라인 중단.") + return False + + # ───────────────────────────────────────────────────── + # STEP 5. 서버 배포 + # SCP로 운영 서버 AI/data/weights/ 에 가중치 덮어씌움 + # ───────────────────────────────────────────────────── + success = run_script("deploy_to_server.py", "서버 배포") + if not success: + log("❌ 서버 배포 실패.") + return False + + # ───────────────────────────────────────────────────── + # 완료 + # ───────────────────────────────────────────────────── + elapsed = datetime.now() - start_time + hours = int(elapsed.total_seconds() // 3600) + minutes = int((elapsed.total_seconds() % 3600) // 60) + + log("=" * 50) + log("✅ 주간 학습 파이프라인 완료!") + log(f" 총 소요 시간: {hours}시간 {minutes}분") + log(" → 새 가중치로 daily_routine.py 동작 가능") + log("=" * 50) + return True + + +if __name__ == "__main__": + parser = argparse.ArgumentParser(description="주간 모델 학습 파이프라인") + parser.add_argument( + "--skip-extract", + action="store_true", + help="DB 추출 스킵 (parquet 이미 있을 때)" + ) + parser.add_argument( + "--skip-upload", + action="store_true", + help="Kaggle 업로드 스킵 (데이터 변경 없을 때)" + ) + args = parser.parse_args() + + run_weekly_pipeline( + skip_extract = args.skip_extract, + skip_upload = args.skip_upload, + ) diff --git a/AI/scripts/deploy_to_server.py b/AI/scripts/deploy_to_server.py new file mode 100644 index 00000000..9b1ea7a4 --- /dev/null +++ b/AI/scripts/deploy_to_server.py @@ -0,0 +1,165 @@ +# AI/scripts/deploy_to_server.py +""" +[목적] + 다운로드된 가중치 파일을 운영 서버에 배포 + 로컬 AI/data/weights/ → 서버 AI/data/weights/ + +[실행 방법] + python AI/scripts/deploy_to_server.py + +[GitHub Actions에서] + download_weights.py 완료 후 자동 실행 + SSH_HOST, SSH_USER, SSH_PRIVATE_KEY 환경변수 필요 + +[전제 조건] + - paramiko, scp 설치 필요 + pip install paramiko scp +""" +import os +import sys +import io + +import paramiko +from scp import SCPClient + +# ───────────────────────────────────────────────────────────────────────────── +# 서버 접속 정보 (GitHub Secrets → 환경변수로 주입) +# ───────────────────────────────────────────────────────────────────────────── +SSH_HOST = os.environ.get("SSH_HOST") +SSH_USER = os.environ.get("SSH_USER") +SSH_KEY_STR = os.environ.get("SSH_PRIVATE_KEY") # 키 내용 (파일 경로 아님) +SSH_PORT = int(os.environ.get("SSH_PORT", 22)) + +# ───────────────────────────────────────────────────────────────────────────── +# 경로 설정 +# ───────────────────────────────────────────────────────────────────────────── +current_dir = os.path.dirname(os.path.abspath(__file__)) +project_root = os.path.abspath(os.path.join(current_dir, "../..")) + +SERVER_WEIGHTS_PATH = os.environ.get( + "SERVER_WEIGHTS_PATH", + "/app/AI/data/weights" +) + +# ───────────────────────────────────────────────────────────────────────────── +# 배포할 모델 목록 (4개 전체) +# ───────────────────────────────────────────────────────────────────────────── +MODELS = [ + { + "name" : "PatchTST", + "local_dir" : os.path.join(project_root, "AI/data/weights/PatchTST"), + "remote_dir": f"{SERVER_WEIGHTS_PATH}/PatchTST", + "files" : ["patchtst_model.pt", "patchtst_scaler.pkl"], + }, + { + 
"name" : "Transformer", + "local_dir" : os.path.join(project_root, "AI/data/weights/transformer/prod"), + "remote_dir": f"{SERVER_WEIGHTS_PATH}/transformer/prod", + "files" : ["multi_horizon_model_prod.keras", "multi_horizon_scaler_prod.pkl"], + }, + { + "name" : "iTransformer", + "local_dir" : os.path.join(project_root, "AI/data/weights/itransformer"), + "remote_dir": f"{SERVER_WEIGHTS_PATH}/itransformer", + "files" : ["multi_horizon_model.keras", "multi_horizon_scaler.pkl", "metadata.json"], + }, + { + "name" : "TCN", + "local_dir" : os.path.join(project_root, "AI/data/weights/tcn"), + "remote_dir": f"{SERVER_WEIGHTS_PATH}/tcn", + "files" : ["model.pt", "scaler.pkl", "metadata.json"], + }, +] + + +def create_ssh_client() -> paramiko.SSHClient: + """SSH 연결 생성 (키 문자열로 직접 연결)""" + ssh = paramiko.SSHClient() + ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy()) + for key_class in [paramiko.Ed25519Key, paramiko.RSAKey, paramiko.ECDSAKey]: + try: + private_key = key_class.from_private_key(io.StringIO(SSH_KEY_STR)) + break + except Exception: + continue + else: + raise ValueError("SSH 키 타입을 인식할 수 없습니다.") + ssh.connect( + hostname = SSH_HOST, + port = SSH_PORT, + username = SSH_USER, + pkey = private_key, + timeout = 30, + ) + return ssh + + +def deploy_model(ssh: paramiko.SSHClient, scp: SCPClient, model: dict) -> bool: + """모델 가중치를 서버에 배포""" + print(f"\n>> [{model['name']}] 배포 중...") + + ssh.exec_command(f"mkdir -p {model['remote_dir']}") + + for fname in model['files']: + local_path = os.path.join(model['local_dir'], fname) + + if not os.path.exists(local_path): + print(f" [경고] 파일 없음 (스킵): {local_path}") + continue + + remote_path = f"{model['remote_dir']}/{fname}" + size = os.path.getsize(local_path) / (1024 * 1024) + + print(f" 전송 중: {fname} ({size:.1f} MB)...") + scp.put(local_path, remote_path) + print(f" 전송 완료: {remote_path}") + + print(f" [{model['name']}] 배포 완료! ✅") + return True + + +# ───────────────────────────────────────────────────────────────────────────── +# 환경변수 검증 +# ───────────────────────────────────────────────────────────────────────────── +print("=" * 50) +print(">> deploy_to_server.py 시작") +print("=" * 50) + +if not all([SSH_HOST, SSH_USER, SSH_KEY_STR]): + print("❌ SSH 접속 정보 없음!") + print(" SSH_HOST, SSH_USER, SSH_PRIVATE_KEY 환경변수를 설정하세요.") + sys.exit(1) + +print(f">> 서버: {SSH_USER}@{SSH_HOST}:{SSH_PORT}") +print(f">> 배포 경로: {SERVER_WEIGHTS_PATH}") + +# ───────────────────────────────────────────────────────────────────────────── +# SSH 연결 + 배포 +# ───────────────────────────────────────────────────────────────────────────── +try: + print("\n>> SSH 연결 중...") + ssh = create_ssh_client() + scp = SCPClient(ssh.get_transport()) + print(">> SSH 연결 성공! ✅") + + failed = [] + for model in MODELS: + success = deploy_model(ssh, scp, model) + if not success: + failed.append(model['name']) + + scp.close() + ssh.close() + + print("\n" + "=" * 50) + if failed: + print(f">> 실패한 모델: {failed}") + sys.exit(1) + else: + print(">> 전체 배포 완료! 
✅") + print(">> 서버가 새 가중치로 업데이트됐습니다.") + print("=" * 50) + +except Exception as e: + print(f"❌ 배포 실패: {e}") + sys.exit(1) diff --git a/AI/scripts/download_weights.py b/AI/scripts/download_weights.py new file mode 100644 index 00000000..4d96a185 --- /dev/null +++ b/AI/scripts/download_weights.py @@ -0,0 +1,194 @@ +# AI/scripts/download_weights.py +""" +[목적] + Kaggle 노트북 Output에서 학습된 가중치 파일만 선택적 다운로드 + kaggle kernels output CLI 대신 Kaggle API 직접 사용 (안정적 다운로드) +""" +import os +import shutil +import sys +import requests +import json + +# ───────────────────────────────────────────────────────────────────────────── +# 경로 설정 +# ───────────────────────────────────────────────────────────────────────────── +current_dir = os.path.dirname(os.path.abspath(__file__)) +project_root = os.path.abspath(os.path.join(current_dir, "../..")) + +KAGGLE_USERNAME = os.environ.get("KAGGLE_USERNAME", "jihyeongkimm") +KAGGLE_KEY = os.environ.get("KAGGLE_KEY", "") + +# kaggle.json에서 읽기 (로컬 환경) +if not KAGGLE_KEY: + kaggle_json = os.path.expanduser("~/.kaggle/kaggle.json") + if os.path.exists(kaggle_json): + with open(kaggle_json) as f: + creds = json.load(f) + KAGGLE_USERNAME = creds.get("username", KAGGLE_USERNAME) + KAGGLE_KEY = creds.get("key", "") + +MODELS = [ + { + "name" : "PatchTST", + "slug" : f"{KAGGLE_USERNAME}/patchtst-training", + "dst_dir" : os.path.join(project_root, "AI/data/weights/PatchTST"), + "keep_files": ["patchtst_model.pt", "patchtst_scaler.pkl"], + }, + { + "name" : "Transformer", + "slug" : f"{KAGGLE_USERNAME}/transformer-training", + "dst_dir" : os.path.join(project_root, "AI/data/weights/transformer/prod"), + "keep_files": ["multi_horizon_model.keras", "multi_horizon_scaler.pkl"], + }, + { + "name" : "iTransformer", + "slug" : f"{KAGGLE_USERNAME}/itransformer-training", + "dst_dir" : os.path.join(project_root, "AI/data/weights/itransformer"), + "keep_files": ["multi_horizon_model.keras", "multi_horizon_scaler.pkl", "metadata.json"], + }, + { + "name" : "TCN", + "slug" : f"{KAGGLE_USERNAME}/tcn-training", + "dst_dir" : os.path.join(project_root, "AI/data/weights/tcn"), + "keep_files": ["model.pt", "scaler.pkl", "metadata.json"], + }, +] + + +def list_output_files(slug: str) -> list: + """Kaggle API로 노트북 output 파일 목록 조회""" + owner, kernel = slug.split("/") + url = f"https://www.kaggle.com/api/v1/kernels/output/{owner}/{kernel}?page_token=START" + resp = requests.get(url, auth=(KAGGLE_USERNAME, KAGGLE_KEY)) + if resp.status_code != 200: + print(f" [오류] 파일 목록 조회 실패: {resp.status_code}") + return [] + data = resp.json() + files = data.get("files", []) + return files + + +def download_file(slug: str, file_name: str, dst_path: str) -> bool: + """Kaggle API로 특정 파일 다운로드 (스트리밍)""" + owner, kernel = slug.split("/") + url = f"https://www.kaggle.com/api/v1/kernels/output/{owner}/{kernel}?fileName={file_name}" + + with requests.get(url, auth=(KAGGLE_USERNAME, KAGGLE_KEY), stream=True) as resp: + if resp.status_code != 200: + print(f" [오류] 다운로드 실패: {file_name} ({resp.status_code})") + return False + + total = int(resp.headers.get("content-length", 0)) + downloaded = 0 + with open(dst_path, "wb") as f: + for chunk in resp.iter_content(chunk_size=8192): + f.write(chunk) + downloaded += len(chunk) + + actual = os.path.getsize(dst_path) + if total > 0 and actual != total: + print(f" [경고] {file_name} 크기 불일치: 예상 {total}, 실제 {actual}") + return False + return True + + +def download_weights(model: dict) -> bool: + print(f"\n>> [{model['name']}] 가중치 다운로드 중...") + print(f" 소스: {model['slug']}") + print(f" 저장: 
{model['dst_dir']}") + print(f" 대상: {model['keep_files']}") + + os.makedirs(model['dst_dir'], exist_ok=True) + + # 파일 목록 조회 + all_files = list_output_files(model['slug']) + if not all_files: + print(f" [{model['name']}] 파일 목록 없음 (CLI 폴백)") + # CLI 폴백 + import subprocess, tempfile + with tempfile.TemporaryDirectory() as tmp_dir: + result = subprocess.run( + ["kaggle", "kernels", "output", model['slug'], "-p", tmp_dir, "-o"], + capture_output=True, text=True + ) + if result.returncode != 0: + print(f" [CLI 오류] {result.stderr.strip()}") + if result.stdout.strip(): + print(f" [CLI 출력] {result.stdout.strip()}") + file_map = {} + for root, dirs, files in os.walk(tmp_dir): + for f in files: + if f not in file_map: + file_map[f] = os.path.join(root, f) + copied = [] + for fname in model['keep_files']: + if fname in file_map: + src = file_map[fname] + if os.path.getsize(src) < 100: + continue + dst = os.path.join(model['dst_dir'], fname) + shutil.copy2(src, dst) + size = os.path.getsize(dst) / (1024 * 1024) + copied.append(f"{fname} ({size:.1f} MB)") + if copied: + print(f" [{model['name']}] 다운로드 완료!") + for f in copied: print(f" - {f}") + return True + return False + + # 파일명 → URL 매핑 + file_map = {} + for f in all_files: + name = f.get("name", "").split("/")[-1] # 경로에서 파일명만 추출 + if name and name not in file_map: + file_map[name] = f.get("name", name) + + copied = [] + missing = [] + for fname in model['keep_files']: + if fname in file_map: + dst = os.path.join(model['dst_dir'], fname) + print(f" 다운로드 중: {fname}...") + success = download_file(model['slug'], file_map[fname], dst) + if success: + size = os.path.getsize(dst) / (1024 * 1024) + copied.append(f"{fname} ({size:.1f} MB)") + else: + missing.append(fname) + else: + missing.append(fname) + + if missing: + print(f" [{model['name']}] 경고: 파일 없음 -> {missing}") + if not copied: + print(f" [{model['name']}] 실패: 가중치 파일 없음") + return False + + print(f" [{model['name']}] 다운로드 완료!") + for f in copied: + print(f" - {f}") + return True + + +print("=" * 50) +print(">> download_weights.py 시작") +print("=" * 50) + +if not KAGGLE_KEY: + print(">> Kaggle API 키 없음. kaggle.json 확인 필요") + sys.exit(1) + +failed = [] +for model in MODELS: + success = download_weights(model) + if not success: + failed.append(model['name']) + +print("\n" + "=" * 50) +if failed: + print(f">> 실패한 모델: {failed}") + sys.exit(1) +else: + print(">> 전체 가중치 다운로드 완료!") +print("=" * 50) diff --git a/AI/scripts/extract_to_parquet.py b/AI/scripts/extract_to_parquet.py new file mode 100644 index 00000000..45c2b268 --- /dev/null +++ b/AI/scripts/extract_to_parquet.py @@ -0,0 +1,249 @@ +# AI/scripts/extract_to_parquet.py +""" +[목적] + 운영 서버 DB에서 학습 데이터를 추출하여 parquet 파일로 저장 + +[두 가지 실행 환경] + 1. 로컬 (Termius 터널 켜둔 상태) + → SSH_PRIVATE_KEY 환경변수 없으면 자동으로 로컬 터널 모드 + → localhost:15432 직접 접속 + + 2. 
GitHub Actions (자동화)
+     → SSH_PRIVATE_KEY 환경변수 있으면 paramiko로 터널 자동 오픈
+     → Termius 불필요
+
+[실행 방법]
+  # 로컬 (Termius 켜둔 상태)
+  python AI/scripts/extract_to_parquet.py
+
+  # GitHub Actions (환경변수 자동 주입)
+  python AI/scripts/extract_to_parquet.py
+"""
+import os
+import io
+import time
+import sys
+import pandas as pd
+import psycopg2
+
+# python-dotenv는 로컬 전용 의존성일 수 있으므로 선택적 import
+# (train.yml의 pip install 목록에 python-dotenv가 없어 Actions에서 ImportError가 나는 것을 방지)
+try:
+    from dotenv import load_dotenv
+except ImportError:
+    def load_dotenv(*_args, **_kwargs):
+        return False
+
+# ─────────────────────────────────────────────────────────────────────────────
+# 경로 설정
+# ─────────────────────────────────────────────────────────────────────────────
+current_dir = os.path.dirname(os.path.abspath(__file__))
+project_root = os.path.abspath(os.path.join(current_dir, "../.."))
+
+# .env 로드
+load_dotenv(os.path.join(project_root, ".env"))
+
+OUTPUT_DIR = os.path.join(project_root, "AI/data/kaggle_data")
+os.makedirs(OUTPUT_DIR, exist_ok=True)
+
+# ─────────────────────────────────────────────────────────────────────────────
+# 접속 설정
+# 로컬: 환경변수 없으면 localhost:15432 (Termius 터널)
+# Actions: 환경변수로 SSH 정보 주입 → paramiko 터널 자동 오픈
+# ─────────────────────────────────────────────────────────────────────────────
+SSH_HOST = os.environ.get("SSH_HOST")
+SSH_USER = os.environ.get("SSH_USER")
+SSH_KEY_STR = os.environ.get("SSH_PRIVATE_KEY")
+SSH_PORT = int(os.environ.get("SSH_PORT", 22))
+
+DB_HOST = os.environ.get("DB_HOST", "127.0.0.1")
+DB_PORT = int(os.environ.get("DB_PORT", 5432))
+DB_USER = os.environ.get("DB_USER", "postgres")
+DB_PASSWORD = os.environ.get("DB_PASSWORD", "")
+DB_NAME = os.environ.get("DB_NAME", "sisc_db")
+
+# GitHub Actions 여부: SSH 환경변수 3개 모두 있으면 Actions 모드
+IS_ACTIONS = all([SSH_HOST, SSH_USER, SSH_KEY_STR])
+
+
+# ─────────────────────────────────────────────────────────────────────────────
+# SSH 터널 오픈
+# 로컬: Termius가 이미 15432 열어두니까 그냥 패스
+# Actions: paramiko + sshtunnel 로 코드에서 직접 터널 생성
+# ─────────────────────────────────────────────────────────────────────────────
+tunnel = None
+LOCAL_PORT = None
+
+
+def open_tunnel() -> int:
+    global tunnel, LOCAL_PORT
+
+    if not IS_ACTIONS:
+        # 로컬 모드: Termius 터널이 이미 열려있다고 가정
+        print(">> [로컬 모드] Termius 터널 사용 (127.0.0.1:15432)")
+        LOCAL_PORT = 15432
+        return LOCAL_PORT
+
+    print(">> [Actions 모드] paramiko SSH 터널 오픈 중...")
+    try:
+        from sshtunnel import SSHTunnelForwarder
+        import paramiko
+
+        for key_class in [paramiko.Ed25519Key, paramiko.RSAKey, paramiko.ECDSAKey]:
+            try:
+                private_key = key_class.from_private_key(io.StringIO(SSH_KEY_STR))
+                break
+            except Exception:
+                continue
+        else:
+            raise ValueError("SSH 키 타입을 인식할 수 없습니다.")
+
+        tunnel = SSHTunnelForwarder(
+            (SSH_HOST, SSH_PORT),
+            ssh_username        = SSH_USER,
+            ssh_pkey            = private_key,
+            remote_bind_address = (DB_HOST, DB_PORT),
+            local_bind_address  = ('127.0.0.1', 0),  # 0 = 빈 포트 자동 배정
+        )
+        tunnel.start()
+        LOCAL_PORT = tunnel.local_bind_port
+        print(f">> SSH 터널 오픈 완료! 
(127.0.0.1:{LOCAL_PORT} → {DB_HOST}:{DB_PORT})") + return LOCAL_PORT + + except Exception as e: + print(f"❌ SSH 터널 오픈 실패: {e}") + sys.exit(1) + + +def close_tunnel(): + global tunnel + if tunnel: + tunnel.stop() + print(">> SSH 터널 닫힘") + + +# ───────────────────────────────────────────────────────────────────────────── +# DB 연결 (매번 새 연결 생성 - Neon 연결 끊김 방지) +# ───────────────────────────────────────────────────────────────────────────── +def get_conn(): + return psycopg2.connect( + host = "127.0.0.1", + port = LOCAL_PORT, + user = DB_USER, + password = DB_PASSWORD, + dbname = DB_NAME, + connect_timeout = 30, + ) + + +def read_sql_safe(query: str, desc: str = "") -> pd.DataFrame: + """연결 끊김 시 최대 3회 재시도""" + for attempt in range(1, 4): + conn = None + try: + conn = get_conn() + df = pd.read_sql(query, conn) + return df + except Exception as e: + print(f" [시도 {attempt}/3] 실패: {e}") + time.sleep(3) + finally: + if conn is not None: + try: + conn.close() + except Exception: + pass + raise RuntimeError(f"'{desc}' 쿼리 3회 모두 실패") + + +# ───────────────────────────────────────────────────────────────────────────── +# 메인 +# ───────────────────────────────────────────────────────────────────────────── +def main(): + print("=" * 50) + print(">> extract_to_parquet.py 시작") + print(f">> 실행 환경: {'GitHub Actions' if IS_ACTIONS else '로컬 (Termius)'}") + print("=" * 50) + + open_tunnel() + + try: + # 1. price_data (연도별 청크) + print("\n>> [1/6] price_data 추출 중 (연도별 분할)...") + chunks = [] + for year in range(2015, 2024): + print(f" {year}년 읽는 중...") + query = f""" + SELECT ticker, date, open, high, low, close, volume, per, pbr + FROM price_data + WHERE date BETWEEN '{year}-01-01' AND '{year}-12-31' + ORDER BY ticker, date + """ + df_chunk = read_sql_safe(query, f"price_data {year}") + print(f" {year}년 완료: {len(df_chunk):,}행") + chunks.append(df_chunk) + time.sleep(1) + + df_price = pd.concat(chunks, ignore_index=True) + df_price.to_parquet(os.path.join(OUTPUT_DIR, "price_data.parquet"), index=False) + print(f" >> 전체 완료: {len(df_price):,}행") + + # 2. stock_info + print("\n>> [2/6] stock_info 추출 중...") + df = read_sql_safe( + "SELECT ticker, sector, industry FROM stock_info", + "stock_info" + ) + df.to_parquet(os.path.join(OUTPUT_DIR, "stock_info.parquet"), index=False) + print(f" 완료: {len(df):,}행") + + # 3. macroeconomic_indicators + print("\n>> [3/6] macroeconomic_indicators 추출 중...") + df = read_sql_safe(""" + SELECT date, cpi, gdp, interest_rate, unemployment_rate, + us10y, us2y, yield_spread, vix_close, dxy_close, + wti_price, gold_price, credit_spread_hy, + core_cpi, pce, core_pce + FROM macroeconomic_indicators + ORDER BY date + """, "macroeconomic_indicators") + df.to_parquet(os.path.join(OUTPUT_DIR, "macroeconomic_indicators.parquet"), index=False) + print(f" 완료: {len(df):,}행") + + # 4. company_fundamentals + print("\n>> [4/6] company_fundamentals 추출 중...") + df = read_sql_safe(""" + SELECT ticker, date, revenue, net_income, total_assets, + total_liabilities, equity, eps, roe, debt_ratio, + operating_cash_flow + FROM company_fundamentals + ORDER BY ticker, date + """, "company_fundamentals") + df.to_parquet(os.path.join(OUTPUT_DIR, "company_fundamentals.parquet"), index=False) + print(f" 완료: {len(df):,}행") + + # 5. 
market_breadth + print("\n>> [5/6] market_breadth 추출 중...") + df = read_sql_safe(""" + SELECT date, nh_nl_index, ma200_pct + FROM market_breadth + ORDER BY date + """, "market_breadth") + df.to_parquet(os.path.join(OUTPUT_DIR, "market_breadth.parquet"), index=False) + print(f" 완료: {len(df):,}행") + + # 6. sector_returns + print("\n>> [6/6] sector_returns 추출 중...") + df = read_sql_safe(""" + SELECT date, sector, etf_ticker, return, close + FROM sector_returns + ORDER BY date, sector + """, "sector_returns") + df.to_parquet(os.path.join(OUTPUT_DIR, "sector_returns.parquet"), index=False) + print(f" 완료: {len(df):,}행") + + print("\n" + "=" * 50) + print(">> 전체 추출 완료!") + print(f">> 저장 위치: {OUTPUT_DIR}") + print("=" * 50) + + finally: + close_tunnel() + + +if __name__ == "__main__": + main() + diff --git a/AI/scripts/trigger_training.py b/AI/scripts/trigger_training.py new file mode 100644 index 00000000..87b26683 --- /dev/null +++ b/AI/scripts/trigger_training.py @@ -0,0 +1,159 @@ +# AI/scripts/trigger_training.py +""" +[목적] + Kaggle 노트북 4개를 순서대로 학습 트리거 + +[실행 방법] + python AI/scripts/trigger_training.py + python AI/scripts/trigger_training.py --start-from iTransformer + +[전제 조건] + - Kaggle 노트북이 미리 만들어져 있어야 함 + - kaggle_notebooks/ 폴더 및 push 불필요 + - Kaggle 노트북은 Kaggle 웹에서 직접 관리 + +[GitHub Actions에서] + upload_to_kaggle.py 실행 후 자동 실행 +""" +import os +import subprocess +import sys +import time +import argparse + +KAGGLE_USERNAME = os.environ.get("KAGGLE_USERNAME", "jihyeongkimm") + +NOTEBOOKS = [ + {"name": "PatchTST", "slug": f"{KAGGLE_USERNAME}/patchtst-training"}, + {"name": "Transformer", "slug": f"{KAGGLE_USERNAME}/transformer-training"}, + {"name": "iTransformer", "slug": f"{KAGGLE_USERNAME}/itransformer-training"}, + {"name": "TCN", "slug": f"{KAGGLE_USERNAME}/tcn-training"}, +] + + +def trigger_notebook(notebook: dict) -> bool: + """Kaggle API로 노트북 실행 트리거 (push 없이)""" + print(f"\n>> [{notebook['name']}] 학습 트리거 중...") + print(f" 슬러그: {notebook['slug']}") + + result = subprocess.run( + ["kaggle", "kernels", "pull", notebook['slug'], "-p", "/tmp/kaggle_trigger"], + capture_output=True, text=True + ) + + # pull 실패해도 무관 - run trigger만 필요 + result = subprocess.run( + ["kaggle", "kernels", "push", "-p", "/tmp/kaggle_trigger"], + capture_output=True, text=True + ) + + # push 없이 API로 직접 트리거 + import json + try: + import requests + kaggle_json = os.path.expanduser("~/.kaggle/kaggle.json") + with open(kaggle_json) as f: + creds = json.load(f) + username = creds.get("username", KAGGLE_USERNAME) + key = creds.get("key", "") + + owner, kernel = notebook['slug'].split("/") + url = f"https://www.kaggle.com/api/v1/kernels/{owner}/{kernel}/run" + resp = requests.post(url, auth=(username, key)) + + if resp.status_code in [200, 201, 202]: + print(f" [{notebook['name']}] 트리거 성공!") + print(f" 확인: https://www.kaggle.com/code/{notebook['slug']}") + return True + else: + print(f" [{notebook['name']}] API 트리거 실패: {resp.status_code}") + print(f" {resp.text[:200]}") + return False + except Exception as e: + print(f" [{notebook['name']}] 트리거 오류: {e}") + return False + + +def wait_for_notebook(notebook: dict, timeout_hours: int = 12) -> bool: + """노트북 완료까지 대기 (polling)""" + print(f"\n>> [{notebook['name']}] 완료 대기 중...") + slug = notebook['slug'] + max_checks = timeout_hours * 12 + check_count = 0 + + while check_count < max_checks: + result = subprocess.run( + ["kaggle", "kernels", "status", slug], + capture_output=True, text=True + ) + output = result.stdout.lower() + + if "complete" in output: + print(f" 
[{notebook['name']}] 학습 완료!") + return True + elif "error" in output or "failed" in output: + print(f" [{notebook['name']}] 학습 실패!") + print(result.stdout) + return False + elif "running" in output or "queued" in output: + check_count += 1 + elapsed = check_count * 5 + print(f" [{notebook['name']}] 학습 중... ({elapsed}분 경과)") + else: + check_count += 1 + print(f" [{notebook['name']}] 상태: {result.stdout.strip()}") + + time.sleep(300) + + print(f" [{notebook['name']}] 타임아웃 ({timeout_hours}시간 초과)") + return False + + +# ───────────────────────────────────────────────────────────────────────────── +# 메인 +# ───────────────────────────────────────────────────────────────────────────── +parser = argparse.ArgumentParser() +parser.add_argument("--start-from", type=str, default=None, + help="특정 모델부터 시작 (PatchTST/Transformer/iTransformer/TCN)") +args, _ = parser.parse_known_args() + +start_idx = 0 +if args.start_from: + names = [n['name'] for n in NOTEBOOKS] + if args.start_from in names: + start_idx = names.index(args.start_from) + print(f">> [{args.start_from}]부터 시작합니다.") + else: + print(f">> [경고] 모델명 '{args.start_from}' 없음. 처음부터 시작합니다.") + +notebooks_to_run = NOTEBOOKS[start_idx:] + +print("=" * 50) +print(">> trigger_training.py 시작") +print(f">> 학습 대상: {[n['name'] for n in notebooks_to_run]}") +print("=" * 50) + +failed = [] + +for notebook in notebooks_to_run: + success = trigger_notebook(notebook) + if not success: + failed.append(notebook['name']) + print(f"\n>> [{notebook['name']}] 실패. 다음 모델로 넘어갑니다.") + continue + + success = wait_for_notebook(notebook) + if not success: + failed.append(notebook['name']) + print(f"\n>> [{notebook['name']}] 학습 실패. 다음 모델로 넘어갑니다.") + + time.sleep(60) + +print("\n" + "=" * 50) +if failed: + print(f">> 실패한 모델: {failed}") + print(f">> 재시작: python AI/scripts/trigger_training.py --start-from {failed[0]}") + sys.exit(1) +else: + print(">> 전체 학습 완료!") +print("=" * 50) diff --git a/AI/scripts/upload_to_kaggle.py b/AI/scripts/upload_to_kaggle.py new file mode 100644 index 00000000..c84511b1 --- /dev/null +++ b/AI/scripts/upload_to_kaggle.py @@ -0,0 +1,79 @@ +# AI/scripts/upload_to_kaggle.py +""" +[목적] + Kaggle 데이터셋 버전 업데이트 + (extract_to_parquet.py 실행 후 kaggle_data/ 에 parquet 파일이 있어야 함) + +[실행 방법] + python AI/scripts/upload_to_kaggle.py + +[GitHub Actions에서] + extract_to_parquet.py 완료 후 자동 실행 + env로 KAGGLE_USERNAME, KAGGLE_KEY 주입 +""" +import os +import subprocess +import sys + +# ───────────────────────────────────────────────────────────────────────────── +# 경로 설정 +# ───────────────────────────────────────────────────────────────────────────── +current_dir = os.path.dirname(os.path.abspath(__file__)) +project_root = os.path.abspath(os.path.join(current_dir, "../..")) + +OUTPUT_DIR = os.path.join(project_root, "AI/data/kaggle_data") +KAGGLE_USERNAME = os.environ.get("KAGGLE_USERNAME", "jihyeongkimm") +DATASET_SLUG = "sisc-ai-trading-dataset" + +# ───────────────────────────────────────────────────────────────────────────── +# 업로드 전 parquet 파일 존재 여부 확인 +# ───────────────────────────────────────────────────────────────────────────── +print("=" * 50) +print(">> upload_to_kaggle.py 시작") +print("=" * 50) + +if not os.path.exists(OUTPUT_DIR): + print(f"❌ kaggle_data 폴더 없음: {OUTPUT_DIR}") + print(" extract_to_parquet.py를 먼저 실행하세요.") + sys.exit(1) + +parquet_files = [f for f in os.listdir(OUTPUT_DIR) if f.endswith(".parquet")] +if not parquet_files: + print(f"❌ parquet 파일 없음: {OUTPUT_DIR}") + print(" extract_to_parquet.py를 먼저 실행하세요.") + sys.exit(1) + +print(f">> 업로드할 parquet 파일: 
{len(parquet_files)}개") +for f in parquet_files: + fpath = os.path.join(OUTPUT_DIR, f) + size = os.path.getsize(fpath) / (1024 * 1024) + print(f" - {f} ({size:.1f} MB)") + +# ───────────────────────────────────────────────────────────────────────────── +# Kaggle 데이터셋 버전 업데이트 +# ───────────────────────────────────────────────────────────────────────────── +print(f"\n>> Kaggle 데이터셋 업로드 중...") +print(f" 대상: {KAGGLE_USERNAME}/{DATASET_SLUG}") + +result = subprocess.run( + [ + "kaggle", "datasets", "version", + "-p", ".", + "-m", "Auto update: latest data", + ], + capture_output=True, + text=True, + cwd=OUTPUT_DIR # kaggle_data 폴더 안에서 실행 (Windows 경로 슬래시 버그 방지) +) + +if result.returncode == 0: + print(" 업로드 완료! ✅") + print(result.stdout) +else: + print(" [오류] 업로드 실패! ❌") + print(result.stderr) + sys.exit(1) + +print("=" * 50) +print(">> upload_to_kaggle.py 완료") +print("=" * 50)
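One operational note on the upload step above: the Kaggle CLI's `kaggle datasets version -p .` only works when the target folder contains a dataset-metadata.json identifying the dataset to update. On a fresh GitHub Actions runner, AI/data/kaggle_data/ holds only the parquet files written by extract_to_parquet.py, so the upload step may fail until that file exists. A minimal sketch of a guard that could run before the subprocess.run call in upload_to_kaggle.py; ensure_dataset_metadata is a hypothetical helper, and the title value is an assumption, while KAGGLE_USERNAME and DATASET_SLUG are the names already defined in that script:

import json
import os

def ensure_dataset_metadata(output_dir: str, username: str, slug: str) -> None:
    """Write a minimal dataset-metadata.json if the folder does not already have one."""
    meta_path = os.path.join(output_dir, "dataset-metadata.json")
    if os.path.exists(meta_path):
        return
    metadata = {
        "title": slug,               # display title (assumed to match the slug)
        "id": f"{username}/{slug}",  # identifies the dataset that `datasets version` updates
    }
    with open(meta_path, "w", encoding="utf-8") as f:
        json.dump(metadata, f, indent=2)

# usage, before invoking the kaggle CLI:
#   ensure_dataset_metadata(OUTPUT_DIR, KAGGLE_USERNAME, DATASET_SLUG)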