diff --git a/AI/configs/config.json b/AI/configs/config.json index 215b2b74..36e05aff 100644 --- a/AI/configs/config.json +++ b/AI/configs/config.json @@ -6,4 +6,14 @@ "dbname": "neondb", "port": 5432 } + + + , + "report_DB": { + "host": "ep-jolly-waterfall-ads25fir-pooler.c-2.us-east-1.aws.neon.tech", + "user": "neondb_owner", + "password": "npg_lo0rC9aOyFkw", + "dbname": "neondb", + "port": 5432 + } } diff --git a/AI/finder/__init__.py b/AI/finder/__init__.py new file mode 100644 index 00000000..817d9492 --- /dev/null +++ b/AI/finder/__init__.py @@ -0,0 +1,3 @@ +# AI/finder/__init__.py +from .main import run_finder +__all__ = ["run_finder"] diff --git a/AI/finder/jobs/.gitkeep b/AI/finder/jobs/.gitkeep deleted file mode 100644 index e69de29b..00000000 diff --git a/AI/finder/modules/.gitkeep b/AI/finder/modules/.gitkeep deleted file mode 100644 index e69de29b..00000000 diff --git a/AI/libs/core/pipeline.py b/AI/libs/core/pipeline.py index fdf166b4..053da095 100644 --- a/AI/libs/core/pipeline.py +++ b/AI/libs/core/pipeline.py @@ -12,9 +12,10 @@ # --- 모듈 import --- from finder.main import run_finder -from transform.modules.main import run_transform -from libs.utils.data.fetch_ohlcv import fetch_ohlcv +from AI.transformer.main import run_transformer +from AI.libs.utils.fetch_ohlcv import fetch_ohlcv from xai.run_xai import run_xai +from AI.libs.utils.get_db_conn import get_db_conn # --------------------------------- def run_weekly_finder() -> List[str]: @@ -27,13 +28,21 @@ def run_weekly_finder() -> List[str]: print(f"--- [PIPELINE-STEP 1] Finder 모듈 실행 완료 ---") return top_tickers -def run_signal_transform(tickers: List[str], config: Dict) -> pd.DataFrame: +def run_signal_transformer(tickers: List[str], config: Dict) -> pd.DataFrame: """ - 종목 리스트를 받아 Transform 모듈을 실행하고, 신호(결정 로그)를 반환합니다. + 종목 리스트를 받아 Transformer 모듈을 실행하고, 신호(결정 로그)를 반환합니다. """ - print("--- [PIPELINE-STEP 2] Transform 모듈 실행 시작 ---") + try: + with open(os.path.join(project_root, 'configs', 'config.json'), 'r') as f: + config = json.load(f) + except FileNotFoundError: + print("Config file not found") + except json.JSONDecodeError: + print("Invalid JSON format in config file") + db_config = (config or {}).get("db", {}) # ★ db 섹션만 추출 + print("--- [PIPELINE-STEP 2] Transformer 모듈 실행 시작 ---") - # --- 실제 Transform 모듈 호출 --- + # --- 실제 Transformer 모듈 호출 --- end_date = datetime.now() start_date = end_date - timedelta(days=600) all_ohlcv_df = [] @@ -42,7 +51,7 @@ def run_signal_transform(tickers: List[str], config: Dict) -> pd.DataFrame: ticker=ticker, start=start_date.strftime('%Y-%m-%d'), end=end_date.strftime('%Y-%m-%d'), - config=config + config=db_config ) ohlcv_df['ticker'] = ticker all_ohlcv_df.append(ohlcv_df) @@ -51,14 +60,14 @@ def run_signal_transform(tickers: List[str], config: Dict) -> pd.DataFrame: return pd.DataFrame() raw_data = pd.concat(all_ohlcv_df, ignore_index=True) finder_df = pd.DataFrame(tickers, columns=['ticker']) - transform_result = run_transform( + transformer_result = run_transformer( finder_df=finder_df, seq_len=60, pred_h=1, raw_data=raw_data, config=config ) - logs_df = transform_result.get("logs", pd.DataFrame()) + logs_df = transformer_result.get("logs", pd.DataFrame()) # --- 임시 결정 로그 데이터 (주석 처리) --- # data = { @@ -76,7 +85,7 @@ def run_signal_transform(tickers: List[str], config: Dict) -> pd.DataFrame: # } # logs_df = pd.DataFrame(data) - print(f"--- [PIPELINE-STEP 2] Transform 모듈 실행 완료 ---") + print(f"--- [PIPELINE-STEP 2] Transformer 모듈 실행 완료 ---") return logs_df def run_xai_report(decision_log: pd.DataFrame) -> List[str]: @@ -113,12 +122,30 @@ def run_xai_report(decision_log: pd.DataFrame) -> List[str]: print(f"--- [PIPELINE-STEP 3] XAI 리포트 생성 완료 ---") return reports +def save_reports_to_db(reports: List[str], config: Dict): + """ + 생성된 XAI 리포트를 데이터베이스에 저장합니다. + """ + db_config = config.get("report_DB", {}) + conn = get_db_conn(db_config) + cursor = conn.cursor() + insert_query = """ + INSERT INTO xai_reports (report_text, created_at) + VALUES (%s, %s); + """ + for report in reports: + cursor.execute(insert_query, (report, datetime.now())) + conn.commit() + cursor.close() + conn.close() + print(f"--- {len(reports)}개의 XAI 리포트가 데이터베이스에 저장되었습니다. ---") + # --- 전체 파이프라인 실행 --- def run_pipeline(): """ - 전체 파이프라인(Finder -> Transform -> XAI)을 실행합니다. + 전체 파이프라인(Finder -> Transformer -> XAI)을 실행합니다. """ - config = None + config : Dict = {} try: with open(os.path.join(project_root, 'configs', 'config.json'), 'r') as f: config = json.load(f) @@ -128,16 +155,20 @@ def run_pipeline(): if not top_tickers: print("Finder에서 종목을 찾지 못해 파이프라인을 중단합니다.") return None - decision_log = run_signal_transform(top_tickers, config) + decision_log = run_signal_transformer(top_tickers, config) if decision_log.empty: - print("Transform에서 신호를 생성하지 못해 파이프라인을 중단합니다.") + print("Transformer에서 신호를 생성하지 못해 파이프라인을 중단합니다.") return None xai_reports = run_xai_report(decision_log) + + save_reports_to_db(xai_reports, config) + return xai_reports + # --- 테스트를 위한 실행 코드 --- if __name__ == "__main__": - print(">>> 파이프라인 (Finder -> Transform -> XAI) 테스트를 시작합니다.") + print(">>> 파이프라인 (Finder -> Transformer -> XAI) 테스트를 시작합니다.") final_reports = run_pipeline() print("\n>>> 최종 반환 결과 (XAI Reports):") if final_reports: diff --git a/AI/libs/utils/data/fetch_ohlcv.py b/AI/libs/utils/fetch_ohlcv.py similarity index 74% rename from AI/libs/utils/data/fetch_ohlcv.py rename to AI/libs/utils/fetch_ohlcv.py index 5eda2b7e..5b4f1742 100644 --- a/AI/libs/utils/data/fetch_ohlcv.py +++ b/AI/libs/utils/fetch_ohlcv.py @@ -2,16 +2,7 @@ import pandas as pd # DB 접속 커넥션 생성 -def get_db_conn(config: dict): - """config에서 DB 접속 정보 가져와 psycopg2 Connection 생성""" - conn = psycopg2.connect( - host=config["db"]["host"], - user=config["db"]["user"], - password=config["db"]["password"], - dbname=config["db"]["dbname"], - port=config["db"].get("port", 5432), - ) - return conn +from AI.libs.utils.get_db_conn import get_db_conn # OHLCV 데이터 불러오기 def fetch_ohlcv( diff --git a/AI/libs/utils/get_db_conn.py b/AI/libs/utils/get_db_conn.py new file mode 100644 index 00000000..ad930d63 --- /dev/null +++ b/AI/libs/utils/get_db_conn.py @@ -0,0 +1,12 @@ +import psycopg2 + +def get_db_conn(config: dict): + """config에서 DB 접속 정보 가져와 psycopg2 Connection 생성""" + conn = psycopg2.connect( + host=config["host"], + user=config["user"], + password=config["password"], + dbname=config["dbname"], + port=config.get("port", 5432), + ) + return conn \ No newline at end of file diff --git a/AI/transform/.gitkeep b/AI/transform/.gitkeep deleted file mode 100644 index e69de29b..00000000 diff --git a/AI/transform/artifacts/.gitkeep b/AI/transform/artifacts/.gitkeep deleted file mode 100644 index e69de29b..00000000 diff --git a/AI/transform/datasets/.gitkeep b/AI/transform/datasets/.gitkeep deleted file mode 100644 index e69de29b..00000000 diff --git a/AI/transform/jobs/.gitkeep b/AI/transform/jobs/.gitkeep deleted file mode 100644 index e69de29b..00000000 diff --git a/AI/transform/models/.gitkeep b/AI/transform/models/.gitkeep deleted file mode 100644 index e69de29b..00000000 diff --git a/AI/transform/modules/.gitkeep b/AI/transform/modules/.gitkeep deleted file mode 100644 index e69de29b..00000000 diff --git a/AI/transformer/__init__.py b/AI/transformer/__init__.py new file mode 100644 index 00000000..a7eb2341 --- /dev/null +++ b/AI/transformer/__init__.py @@ -0,0 +1,3 @@ +# AI/finder/__init__.py +from .main import run_transformer +__all__ = ["run_transformer"] diff --git a/AI/finder/.gitkeep b/AI/transformer/datasets/.gitkeep similarity index 100% rename from AI/finder/.gitkeep rename to AI/transformer/datasets/.gitkeep diff --git a/AI/transform/modules/main.py b/AI/transformer/main.py similarity index 83% rename from AI/transform/modules/main.py rename to AI/transformer/main.py index eed8d0d1..ea7dde9a 100644 --- a/AI/transform/modules/main.py +++ b/AI/transformer/main.py @@ -1,273 +1,289 @@ -# transform/modules/transform.py -from __future__ import annotations -from typing import Dict, List, Optional, Tuple - -import numpy as np -import pandas as pd -from sklearn.preprocessing import MinMaxScaler -from tensorflow.keras import layers, Model -import tensorflow as tf - -# from AI.libs.utils.io import _log -_log = print # TODO: 추후 io._log 구현 시 복구 -from .models import build_transformer_classifier # <-- 모델 분리됨 - -# ===== 공개 상수 ===== -FEATURES: List[str] = [ - "RSI", - "MACD", - "Bollinger_Bands_upper", - "Bollinger_Bands_lower", - "ATR", - "OBV", - "Stochastic", # %K - "MFI", - "MA_5", - "MA_20", - "MA_50", - "MA_200", - "CLOSE_RAW", # 마지막에 추가 (스케일 제외, 로그용) -] - -CLASS_NAMES = ["BUY", "HOLD", "SELL"] - -# ====== 기술지표 유틸 ====== -def _ema(s: pd.Series, span: int) -> pd.Series: - return s.ewm(span=span, adjust=False).mean() - -def _rsi_wilder(close: pd.Series, period: int = 14) -> pd.Series: - delta = close.diff() - gain = delta.clip(lower=0.0) - loss = -delta.clip(upper=0.0) - avg_gain = gain.ewm(alpha=1/period, adjust=False).mean() - avg_loss = loss.ewm(alpha=1/period, adjust=False).mean() - rs = avg_gain / avg_loss.replace(0, np.nan) - return 100.0 - (100.0 / (1.0 + rs)) - -def _macd_line(close: pd.Series, fast: int = 12, slow: int = 26) -> pd.Series: - return _ema(close, fast) - _ema(close, slow) - -def _true_range(high: pd.Series, low: pd.Series, close: pd.Series) -> pd.Series: - prev_close = close.shift(1) - tr1 = high - low - tr2 = (high - prev_close).abs() - tr3 = (low - prev_close).abs() - return pd.concat([tr1, tr2, tr3], axis=1).max(axis=1) - -def _atr_wilder(high: pd.Series, low: pd.Series, close: pd.Series, period: int = 14) -> pd.Series: - tr = _true_range(high, low, close) - return tr.ewm(alpha=1/period, adjust=False).mean() - -def _obv(close: pd.Series, volume: pd.Series) -> pd.Series: - sign = np.sign(close.diff().fillna(0.0)) - sign[sign == 0] = 0.0 - return (sign * volume).fillna(0.0).cumsum() - -def _stochastic_k(high: pd.Series, low: pd.Series, close: pd.Series, period: int = 14) -> pd.Series: - ll = low.rolling(period).min() - hh = high.rolling(period).max() - denom = (hh - ll).replace(0, np.nan) - return (close - ll) / denom * 100.0 - -def _mfi(high: pd.Series, low: pd.Series, close: pd.Series, volume: pd.Series, period: int = 14) -> pd.Series: - tp = (high + low + close) / 3.0 - rmf = tp * volume - delta_tp = tp.diff() - pos_mf = rmf.where(delta_tp > 0, 0.0) - neg_mf = rmf.where(delta_tp < 0, 0.0).abs() - pos_sum = pos_mf.rolling(period).sum() - neg_sum = neg_mf.rolling(period).sum().replace(0, np.nan) - mr = pos_sum / neg_sum - return 100.0 - (100.0 / (1.0 + mr)) - -# ====== 피처 빌더 ====== -def build_features(df: pd.DataFrame) -> pd.DataFrame: - cols = {c.lower(): c for c in df.columns} - need = ["open", "high", "low", "close", "volume"] - mapping = {} - for k in need: - if k in cols: - mapping[cols[k]] = k - if mapping: - df = df.rename(columns=mapping) - - O = df["open"].astype(float).squeeze() - H = df["high"].astype(float).squeeze() - L = df["low"].astype(float).squeeze() - C = df["close"].astype(float).squeeze() - V = df["volume"].astype(float).squeeze() - - feats = pd.DataFrame(index=df.index) - feats["RSI"] = _rsi_wilder(C, period=14) - feats["MACD"] = _macd_line(C, fast=12, slow=26) - - ma20 = C.rolling(20).mean() - std20 = C.rolling(20).std(ddof=0) - feats["Bollinger_Bands_upper"] = ma20 + 2.0 * std20 - feats["Bollinger_Bands_lower"] = ma20 - 2.0 * std20 - feats["ATR"] = _atr_wilder(H, L, C, period=14) - feats["OBV"] = _obv(C, V) - feats["Stochastic"] = _stochastic_k(H, L, C, period=14) - feats["MFI"] = _mfi(H, L, C, V, period=14) - feats["MA_5"] = C.rolling(5).mean() - feats["MA_20"] = ma20 - feats["MA_50"] = C.rolling(50).mean() - feats["MA_200"] = C.rolling(200).mean() - feats["CLOSE_RAW"] = C - return feats.dropna() - -# ====== 모델 로드 ====== -def _load_or_build_model(seq_len: int, n_features: int, model_path: Optional[str]) -> Model: - model = build_transformer_classifier(seq_len, n_features) - if model_path: - try: - model.load_weights(model_path) - _log(f"[TRANSFORM] Transformer weights loaded: {model_path}") - except Exception as e: - _log(f"[TRANSFORM][WARN] 모델 가중치 로드 실패 → 랜덤 초기화: {e}") - else: - _log("[TRANSFORM][WARN] model_path 미지정 → 랜덤 초기화로 진행") - return model - -# ====== 시퀀스/스케일링 ====== -def _make_sequence(feats: pd.DataFrame, use_cols: List[str], seq_len: int) -> Optional[np.ndarray]: - if len(feats) < seq_len: - return None - X = feats[use_cols].iloc[-seq_len:].copy() - return X.values.astype("float32") - -def _scale_per_ticker(seq_arr: np.ndarray) -> Tuple[np.ndarray, MinMaxScaler]: - scaler = MinMaxScaler(feature_range=(0, 1), clip=True) - X_scaled = scaler.fit_transform(seq_arr) - return X_scaled.astype("float32"), scaler - -# ====== 메인 엔트리포인트 ====== -def run_transform( - *, - finder_df: pd.DataFrame, - seq_len: int, - pred_h: int, - raw_data: pd.DataFrame, - run_date: Optional[str] = None, - config: Optional[dict] = None, - interval: str = "1d", -) -> Dict[str, pd.DataFrame]: - tickers = finder_df["ticker"].astype(str).tolist() - if raw_data is None or raw_data.empty: - _log("[TRANSFORM] raw_data empty -> empty logs") - return {"logs": pd.DataFrame(columns=[ - "ticker","date","action","price","weight", - "feature1","feature2","feature3","prob1","prob2","prob3" - ])} - - df = raw_data.copy() - ts_col = "ts_local" if "ts_local" in df.columns else ("date" if "date" in df.columns else None) - if ts_col is None: - raise ValueError("raw_data에 'ts_local' 또는 'date' 컬럼이 필요합니다.") - df[ts_col] = pd.to_datetime(df[ts_col]) - df = df.rename(columns={c: c.lower() for c in df.columns}) - df = df[df["ticker"].astype(str).isin(tickers)] - if df.empty: - _log("[TRANSFORM] 대상 종목 데이터 없음") - return {"logs": pd.DataFrame(columns=[ - "ticker","date","action","price","weight", - "feature1","feature2","feature3","prob1","prob2","prob3" - ])} - - if run_date is None: - end_dt = pd.Timestamp.now(tz="Asia/Seoul").normalize() + pd.Timedelta(days=1) - pd.Timedelta(microseconds=1) - else: - end_dt = pd.to_datetime(run_date).tz_localize("Asia/Seoul", nonexistent="shift_forward").normalize() - end_dt = end_dt + pd.Timedelta(days=1) - pd.Timedelta(microseconds=1) - - if df[ts_col].dt.tz is not None: - end_cut = end_dt.tz_convert(df[ts_col].dt.tz) - else: - end_cut = end_dt.tz_localize(None) - - df = df[df[ts_col] <= end_cut] - df = df.sort_values(["ticker", ts_col]).reset_index(drop=True) - - model_feats = [f for f in FEATURES if f != "CLOSE_RAW"] - n_features = len(model_feats) - - model_path = None - if config and "transform" in config and "model_path" in config["transform"]: - model_path = str(config["transform"]["model_path"]) - - model = _load_or_build_model(seq_len=seq_len, n_features=n_features, model_path=model_path) - - rows: List[dict] = [] - for t, g in df.groupby("ticker", sort=False): - try: - if g.empty: - continue - g = g.rename(columns={ts_col: "date"}).set_index("date") - ohlcv = g[["open", "high", "low", "close", "volume"]].copy() - - feats = build_features(ohlcv) - if feats.empty: - _log(f"[TRANSFORM] {t} features empty -> skip") - continue - - X_seq = _make_sequence(feats, model_feats, seq_len) - if X_seq is None: - _log(f"[TRANSFORM] {t} 부족한 길이(seq_len={seq_len}) -> skip") - continue - - X_scaled, _ = _scale_per_ticker(X_seq) - X_scaled = np.expand_dims(X_scaled, axis=0) - - try: - probs = model.predict(X_scaled, verbose=0)[0] - probs = np.clip(probs.astype(float), 1e-6, 1.0) - probs = probs / probs.sum() - buy_p, hold_p, sell_p = float(probs[0]), float(probs[1]), float(probs[2]) - action = CLASS_NAMES[int(np.argmax(probs))] - except Exception as e: - _log(f"[TRANSFORM][WARN] 모델 예측 실패({t}) → 룰기반 fallback: {e}") - recent = feats.iloc[-1] - rsi = float(recent["RSI"]) - macd = float(recent["MACD"]) - if rsi < 30 and macd > 0: - action = "BUY"; buy_p, hold_p, sell_p = 0.65, 0.30, 0.05 - elif rsi > 70 and macd < 0: - action = "SELL"; buy_p, hold_p, sell_p = 0.05, 0.30, 0.65 - else: - action = "HOLD"; buy_p, hold_p, sell_p = 0.33, 0.34, 0.33 - - p_max = max(buy_p, hold_p, sell_p) - confidence = float(np.clip((p_max - 1/3) * 1.5, 0.0, 1.0)) - ret = 0.0 - if len(feats) > 2: - c_now = float(feats["CLOSE_RAW"].iloc[-1]) - c_prev = float(feats["CLOSE_RAW"].iloc[-2]) - if c_prev: - ret = (c_now / c_prev) - 1.0 - weight = float(np.clip(0.05 + confidence * 0.20 + abs(ret) * 0.05, 0.05, 0.30)) - recent = feats.iloc[-1] - close_price = float(recent["CLOSE_RAW"]) - - rows.append({ - "ticker": str(t), - "date": feats.index[-1].strftime("%Y-%m-%d"), - "action": action, - "price": close_price, - "weight": weight, - "feature1": float(recent["RSI"]), - "feature2": float(recent["MACD"]), - "feature3": float(recent["ATR"]), - "prob1": float(buy_p), - "prob2": float(hold_p), - "prob3": float(sell_p), - }) - except Exception as e: - _log(f"[TRANSFORM][ERROR] {t}: {e}") - continue - - logs_df = pd.DataFrame(rows, columns=[ - "ticker","date","action","price","weight", - "feature1","feature2","feature3","prob1","prob2","prob3" - ]) - return {"logs": logs_df} - +# transformer/modules/transformer.py +from __future__ import annotations +from typing import Dict, List, Optional, Tuple + +import numpy as np +import pandas as pd +from sklearn.preprocessing import MinMaxScaler +import tensorflow as tf +from tensorflow.keras import layers, Model + +# from AI.libs.utils.io import _log +_log = print # TODO: 추후 io._log 구현 시 복구 +from .models import build_transformer_classifier # <-- 모델 분리됨 + +# ===== 공개 상수 ===== +FEATURES: List[str] = [ + "RSI", + "MACD", + "Bollinger_Bands_upper", + "Bollinger_Bands_lower", + "ATR", + "OBV", + "Stochastic", # %K + "MFI", + "MA_5", + "MA_20", + "MA_50", + "MA_200", + "CLOSE_RAW", # 마지막에 추가 (스케일 제외, 로그용) +] + +CLASS_NAMES = ["BUY", "HOLD", "SELL"] + +# ====== 기술지표 유틸 ====== +def _ema(s: pd.Series, span: int) -> pd.Series: + return s.ewm(span=span, adjust=False).mean() + +def _rsi_wilder(close: pd.Series, period: int = 14) -> pd.Series: + delta = close.diff() + gain = delta.clip(lower=0.0) + loss = -delta.clip(upper=0.0) + avg_gain = gain.ewm(alpha=1/period, adjust=False).mean() + avg_loss = loss.ewm(alpha=1/period, adjust=False).mean() + rs = avg_gain / avg_loss.replace(0, np.nan) + return 100.0 - (100.0 / (1.0 + rs)) + +def _macd_line(close: pd.Series, fast: int = 12, slow: int = 26) -> pd.Series: + return _ema(close, fast) - _ema(close, slow) + +def _true_range(high: pd.Series, low: pd.Series, close: pd.Series) -> pd.Series: + prev_close = close.shift(1) + tr1 = high - low + tr2 = (high - prev_close).abs() + tr3 = (low - prev_close).abs() + return pd.concat([tr1, tr2, tr3], axis=1).max(axis=1) + +def _atr_wilder(high: pd.Series, low: pd.Series, close: pd.Series, period: int = 14) -> pd.Series: + tr = _true_range(high, low, close) + return tr.ewm(alpha=1/period, adjust=False).mean() + +def _obv(close: pd.Series, volume: pd.Series) -> pd.Series: + # 시리즈 보장 + 결측 처리 + close = pd.Series(close) + volume = pd.Series(volume).fillna(0) + + # 전일 대비 가격 변화 + diff = close.diff() + + # 방향: 상승=1, 하락=-1, 보합=0 (NaN 안전 비교) + direction = np.where(diff.gt(0), 1, np.where(diff.lt(0), -1, 0)) + direction = pd.Series(direction, index=close.index) + + # OBV = 방향 * 거래량 누적합 + obv_series = (direction * volume).cumsum().fillna(0) + obv_series.name = "OBV" + return obv_series + +def _stochastic_k(high: pd.Series, low: pd.Series, close: pd.Series, period: int = 14) -> pd.Series: + ll = low.rolling(period).min() + hh = high.rolling(period).max() + denom = (hh - ll).replace(0, np.nan) + return (close - ll) / denom * 100.0 + +def _mfi(high: pd.Series, low: pd.Series, close: pd.Series, volume: pd.Series, period: int = 14) -> pd.Series: + tp = ((high + low + close) / 3.0).astype(float) + rmf = (tp * volume.astype(float)).astype(float) + + delta_tp: pd.Series = tp.diff().astype(float) + + pos_mf = rmf.where(delta_tp.gt(0), 0.0) + neg_mf = rmf.where(delta_tp.lt(0), 0.0).abs() + + pos_sum = pos_mf.rolling(period).sum() + neg_sum = neg_mf.rolling(period).sum().replace(0, np.nan) + mr = pos_sum / neg_sum + return 100.0 - (100.0 / (1.0 + mr)) + + +# ====== 피처 빌더 ====== +def build_features(df: pd.DataFrame) -> pd.DataFrame: + cols = {c.lower(): c for c in df.columns} + need = ["open", "high", "low", "close", "volume"] + mapping = {} + for k in need: + if k in cols: + mapping[cols[k]] = k + if mapping: + df = df.rename(columns=mapping) + + O = df["open"].astype(float) + H = df["high"].astype(float) + L = df["low"].astype(float) + C = df["close"].astype(float) + V = df["volume"].astype(float) + + feats = pd.DataFrame(index=df.index) + feats["RSI"] = _rsi_wilder(C, period=14) + feats["MACD"] = _macd_line(C, fast=12, slow=26) + + ma20 = C.rolling(20).mean() + std20 = C.rolling(20).std(ddof=0) + feats["Bollinger_Bands_upper"] = ma20 + 2.0 * std20 + feats["Bollinger_Bands_lower"] = ma20 - 2.0 * std20 + feats["ATR"] = _atr_wilder(H, L, C, period=14) + feats["OBV"] = _obv(C, V) + feats["Stochastic"] = _stochastic_k(H, L, C, period=14) + feats["MFI"] = _mfi(H, L, C, V, period=14) + feats["MA_5"] = C.rolling(5).mean() + feats["MA_20"] = ma20 + feats["MA_50"] = C.rolling(50).mean() + feats["MA_200"] = C.rolling(200).mean() + feats["CLOSE_RAW"] = C + return feats.dropna() + +# ====== 모델 로드 ====== +def _load_or_build_model(seq_len: int, n_features: int, model_path: Optional[str]) -> Model: + model = build_transformer_classifier(seq_len, n_features) + if model_path: + try: + model.load_weights(model_path) + _log(f"[TRANSFORMER] Transformer weights loaded: {model_path}") + except Exception as e: + _log(f"[TRANSFORMER][WARN] 모델 가중치 로드 실패 → 랜덤 초기화: {e}") + else: + _log("[TRANSFORMER][WARN] model_path 미지정 → 랜덤 초기화로 진행") + return model + +# ====== 시퀀스/스케일링 ====== +def _make_sequence(feats: pd.DataFrame, use_cols: List[str], seq_len: int) -> Optional[np.ndarray]: + if len(feats) < seq_len: + return None + X = feats[use_cols].iloc[-seq_len:].copy() + return X.values.astype("float32") + +def _scale_per_ticker(seq_arr: np.ndarray) -> Tuple[np.ndarray, MinMaxScaler]: + scaler = MinMaxScaler(feature_range=(0, 1), clip=True) + X_scaled = scaler.fit_transform(seq_arr) + return X_scaled.astype("float32"), scaler + +# ====== 메인 엔트리포인트 ====== +def run_transformer( + *, + finder_df: pd.DataFrame, + seq_len: int, + pred_h: int, + raw_data: pd.DataFrame, + run_date: Optional[str] = None, + config: Optional[dict] = None, + interval: str = "1d", +) -> Dict[str, pd.DataFrame]: + tickers = finder_df["ticker"].astype(str).tolist() + if raw_data is None or raw_data.empty: + _log("[TRANSFORMER] raw_data empty -> empty logs") + return {"logs": pd.DataFrame(columns=[ + "ticker","date","action","price","weight", + "feature1","feature2","feature3","prob1","prob2","prob3" + ])} + + df = raw_data.copy() + ts_col = "ts_local" if "ts_local" in df.columns else ("date" if "date" in df.columns else None) + if ts_col is None: + raise ValueError("raw_data에 'ts_local' 또는 'date' 컬럼이 필요합니다.") + df[ts_col] = pd.to_datetime(df[ts_col]) + df = df.rename(columns={c: c.lower() for c in df.columns}) + df = df[df["ticker"].astype(str).isin(tickers)] + if df.empty: + _log("[TRANSFORMER] 대상 종목 데이터 없음") + return {"logs": pd.DataFrame(columns=[ + "ticker","date","action","price","weight", + "feature1","feature2","feature3","prob1","prob2","prob3" + ])} + + if run_date is None: + end_dt = pd.Timestamp.now(tz="Asia/Seoul").normalize() + pd.Timedelta(days=1) - pd.Timedelta(microseconds=1) + else: + end_dt = pd.to_datetime(run_date).tz_localize("Asia/Seoul", nonexistent="shift_forward").normalize() + end_dt = end_dt + pd.Timedelta(days=1) - pd.Timedelta(microseconds=1) + + if df[ts_col].dt.tz is not None: + end_cut = end_dt.tz_convert(df[ts_col].dt.tz) + else: + end_cut = end_dt.tz_localize(None) + + df = df[df[ts_col] <= end_cut] + df = df.sort_values(["ticker", ts_col]).reset_index(drop=True) + + model_feats = [f for f in FEATURES if f != "CLOSE_RAW"] + n_features = len(model_feats) + + model_path = None + if config and "transformer" in config and "model_path" in config["transformer"]: + model_path = str(config["transformer"]["model_path"]) + + model = _load_or_build_model(seq_len=seq_len, n_features=n_features, model_path=model_path) + + rows: List[dict] = [] + for t, g in df.groupby("ticker", sort=False): + try: + if g.empty: + continue + g = g.rename(columns={ts_col: "date"}).set_index("date") + ohlcv = g[["open", "high", "low", "close", "volume"]].copy() + + feats = build_features(ohlcv) + if feats.empty: + _log(f"[TRANSFORMER] {t} features empty -> skip") + continue + + X_seq = _make_sequence(feats, model_feats, seq_len) + if X_seq is None: + _log(f"[TRANSFORMER] {t} 부족한 길이(seq_len={seq_len}) -> skip") + continue + + X_scaled, _ = _scale_per_ticker(X_seq) + X_scaled = np.expand_dims(X_scaled, axis=0) + + try: + probs = model.predict(X_scaled, verbose=0)[0] + probs = np.clip(probs.astype(float), 1e-6, 1.0) + probs = probs / probs.sum() + buy_p, hold_p, sell_p = float(probs[0]), float(probs[1]), float(probs[2]) + action = CLASS_NAMES[int(np.argmax(probs))] + except Exception as e: + _log(f"[TRANSFORMER][WARN] 모델 예측 실패({t}) → 룰기반 fallback: {e}") + recent = feats.iloc[-1] + rsi = float(recent["RSI"]) + macd = float(recent["MACD"]) + if rsi < 30 and macd > 0: + action = "BUY"; buy_p, hold_p, sell_p = 0.65, 0.30, 0.05 + elif rsi > 70 and macd < 0: + action = "SELL"; buy_p, hold_p, sell_p = 0.05, 0.30, 0.65 + else: + action = "HOLD"; buy_p, hold_p, sell_p = 0.33, 0.34, 0.33 + + p_max = max(buy_p, hold_p, sell_p) + confidence = float(np.clip((p_max - 1/3) * 1.5, 0.0, 1.0)) + ret = 0.0 + if len(feats) > 2: + c_now = float(feats["CLOSE_RAW"].iloc[-1]) + c_prev = float(feats["CLOSE_RAW"].iloc[-2]) + if c_prev: + ret = (c_now / c_prev) - 1.0 + weight = float(np.clip(0.05 + confidence * 0.20 + abs(ret) * 0.05, 0.05, 0.30)) + recent = feats.iloc[-1] + close_price = float(recent["CLOSE_RAW"]) + + rows.append({ + "ticker": str(t), + "date": feats.index[-1].strftime("%Y-%m-%d"), + "action": action, + "price": close_price, + "weight": weight, + "feature1": float(recent["RSI"]), + "feature2": float(recent["MACD"]), + "feature3": float(recent["ATR"]), + "prob1": float(buy_p), + "prob2": float(hold_p), + "prob3": float(sell_p), + }) + except Exception as e: + _log(f"[TRANSFORMER][ERROR] {t}: {e}") + continue + + logs_df = pd.DataFrame(rows, columns=[ + "ticker","date","action","price","weight", + "feature1","feature2","feature3","prob1","prob2","prob3" + ]) + return {"logs": logs_df} + diff --git a/AI/transformer/models/__init__.py b/AI/transformer/models/__init__.py new file mode 100644 index 00000000..75684179 --- /dev/null +++ b/AI/transformer/models/__init__.py @@ -0,0 +1,3 @@ +# AI/finder/__init__.py +from .models import build_transformer_classifier +__all__ = ["build_transformer_classifier"] diff --git a/AI/transform/modules/models.py b/AI/transformer/models/models.py similarity index 97% rename from AI/transform/modules/models.py rename to AI/transformer/models/models.py index ebc3e60b..b844cb51 100644 --- a/AI/transform/modules/models.py +++ b/AI/transformer/models/models.py @@ -1,44 +1,44 @@ -import numpy as np -import tensorflow as tf -from tensorflow.keras import layers, Model - -# 위치 인코딩 -def positional_encoding(maxlen: int, d_model: int) -> tf.Tensor: - angles = np.arange(maxlen)[:, None] / np.power( - 10000, (2 * (np.arange(d_model)[None, :] // 2)) / d_model - ) - pos_encoding = np.zeros((maxlen, d_model)) - pos_encoding[:, 0::2] = np.sin(angles[:, 0::2]) - pos_encoding[:, 1::2] = np.cos(angles[:, 1::2]) - return tf.constant(pos_encoding, dtype=tf.float32) - -# Transformer 분류기 -def build_transformer_classifier(seq_len: int, n_features: int, - d_model: int = 64, num_heads: int = 4, - ff_dim: int = 128, num_layers: int = 2, - dropout: float = 0.1) -> Model: - inp = layers.Input(shape=(seq_len, n_features), name="inputs") - - # 입력 projection - x = layers.Dense(d_model)(inp) - x = x + positional_encoding(seq_len, d_model) - - for _ in range(num_layers): - # MHA 블록 - attn_out = layers.MultiHeadAttention( - num_heads=num_heads, key_dim=d_model // num_heads, dropout=dropout - )(x, x, training=False) - x = layers.LayerNormalization(epsilon=1e-5)(x + attn_out) - - # FFN 블록 - ffn = layers.Dense(ff_dim, activation="gelu")(x) - ffn = layers.Dropout(dropout)(ffn, training=False) - ffn = layers.Dense(d_model)(ffn) - x = layers.LayerNormalization(epsilon=1e-5)(x + ffn) - - # 풀링 + 출력 - x = layers.GlobalAveragePooling1D()(x) - x = layers.Dropout(dropout)(x, training=False) - out = layers.Dense(3, activation="softmax", name="probs")(x) - - return Model(inp, out, name="transformer_classifier") +import numpy as np +import tensorflow as tf +from tensorflow.keras import layers, Model + +# 위치 인코딩 +def positional_encoding(maxlen: int, d_model: int) -> tf.Tensor: + angles = np.arange(maxlen)[:, None] / np.power( + 10000, (2 * (np.arange(d_model)[None, :] // 2)) / d_model + ) + pos_encoding = np.zeros((maxlen, d_model)) + pos_encoding[:, 0::2] = np.sin(angles[:, 0::2]) + pos_encoding[:, 1::2] = np.cos(angles[:, 1::2]) + return tf.constant(pos_encoding, dtype=tf.float32) + +# Transformer 분류기 +def build_transformer_classifier(seq_len: int, n_features: int, + d_model: int = 64, num_heads: int = 4, + ff_dim: int = 128, num_layers: int = 2, + dropout: float = 0.1) -> Model: + inp = layers.Input(shape=(seq_len, n_features), name="inputs") + + # 입력 projection + x = layers.Dense(d_model)(inp) + x = x + positional_encoding(seq_len, d_model) + + for _ in range(num_layers): + # MHA 블록 + attn_out = layers.MultiHeadAttention( + num_heads=num_heads, key_dim=d_model // num_heads, dropout=dropout + )(x, x, training=False) + x = layers.LayerNormalization(epsilon=1e-5)(x + attn_out) + + # FFN 블록 + ffn = layers.Dense(ff_dim, activation="gelu")(x) + ffn = layers.Dropout(dropout)(ffn, training=False) + ffn = layers.Dense(d_model)(ffn) + x = layers.LayerNormalization(epsilon=1e-5)(x + ffn) + + # 풀링 + 출력 + x = layers.GlobalAveragePooling1D()(x) + x = layers.Dropout(dropout)(x, training=False) + out = layers.Dense(3, activation="softmax", name="probs")(x) + + return Model(inp, out, name="transformer_classifier")