diff --git a/AI/modules/data_collector/__init__.py b/AI/modules/data_collector/__init__.py index 2bb18233..49193b2c 100644 --- a/AI/modules/data_collector/__init__.py +++ b/AI/modules/data_collector/__init__.py @@ -5,7 +5,6 @@ from .company_fundamentals_data import FundamentalsDataCollector from .macro_data import MacroDataCollector from .crypto_data import CryptoDataCollector -from .index_data import IndexDataCollector from .event_data import EventDataCollector from .market_breadth_data import MarketBreadthCollector from .market_breadth_stats import MarketBreadthStatsCollector @@ -16,8 +15,7 @@ "StockInfoCollector", "FundamentalsDataCollector", "MacroDataCollector", - "CryptoDataCollector" - "IndexDataCollector", + "CryptoDataCollector", "EventDataCollector", "MarketBreadthCollector", "MarketBreadthStatsCollector", diff --git a/AI/modules/data_collector/company_fundamentals_data.py b/AI/modules/data_collector/company_fundamentals_data.py index 9b875769..ebd48d98 100644 --- a/AI/modules/data_collector/company_fundamentals_data.py +++ b/AI/modules/data_collector/company_fundamentals_data.py @@ -19,7 +19,8 @@ class FundamentalsDataCollector: """ 기업의 재무제표(손익계산서, 대차대조표, 현금흐름표)를 수집하고 - 주요 퀀트 투자 지표(PER, PBR, ROE, 이자보상배율 등)를 계산하여 DB에 저장하는 클래스 + 주요 퀀트 투자 지표(ROE, 부채비율, 이자보상배율 등)를 계산하여 DB에 저장하는 클래스 + per과 pbr은 MarketDataCollector에서 계산하여 저장됨(중복 방지) """ def __init__(self, db_name: str = "db"): @@ -46,7 +47,7 @@ def fetch_and_calculate_metrics(self, ticker: str): bal_df = stock.quarterly_balance_sheet.T cash_df = stock.quarterly_cashflow.T except Exception as e: - # print(f" [{ticker}] yfinance 데이터 로드 실패: {e}") + print(f"[ERROR][{ticker}] yfinance 데이터 로드 실패: {e}") return pd.DataFrame() if fin_df.empty or bal_df.empty: @@ -61,17 +62,6 @@ def fetch_and_calculate_metrics(self, ticker: str): merged_df = fin_df.join(bal_df, lsuffix='_fin', rsuffix='_bal', how='inner') merged_df = merged_df.join(cash_df, rsuffix='_cash', how='left') - # 4. 주가 데이터 로드 (PER/PBR 계산용) - if not merged_df.empty: - start_date = merged_df.index.min() - timedelta(days=5) - end_date = merged_df.index.max() + timedelta(days=5) - try: - hist_df = stock.history(start=start_date, end=end_date) - except: - hist_df = pd.DataFrame() - else: - hist_df = pd.DataFrame() - processed_data = [] for date_idx, row in merged_df.iterrows(): @@ -87,7 +77,7 @@ def fetch_and_calculate_metrics(self, ticker: str): operating_cash_flow = self.get_safe_value(row, ['Operating Cash Flow', 'Total Cash From Operating Activities']) shares_issued = self.get_safe_value(row, ['Share Issued', 'Ordinary Shares Number']) - # --- [추가] 이자보상배율 계산을 위한 항목 --- + # --- 이자보상배율 계산을 위한 항목 --- op_income = self.get_safe_value(row, ['Operating Income', 'EBIT']) int_expense = self.get_safe_value(row, ['Interest Expense', 'Interest Expense Non Operating']) @@ -103,38 +93,13 @@ def fetch_and_calculate_metrics(self, ticker: str): if total_liabilities is not None and equity is not None and equity != 0: debt_ratio = total_liabilities / equity - # 3. [신규] 이자보상배율 (Interest Coverage) = 영업이익 / |이자비용| + # 3. 이자보상배율 (Interest Coverage) = 영업이익 / |이자비용| interest_coverage = None if op_income is not None and int_expense is not None: abs_int = abs(int_expense) if abs_int > 0: interest_coverage = op_income / abs_int - # 4. 주가 기반 지표 (PER, PBR) - close_price = None - if not hist_df.empty: - try: - target_ts = pd.Timestamp(date_val) - if target_ts in hist_df.index: - close_price = float(hist_df.loc[target_ts]['Close']) - else: - idx = hist_df.index.get_indexer([target_ts], method='pad') - if idx[0] != -1: - close_price = float(hist_df.iloc[idx[0]]['Close']) - except: - close_price = None - - # PER - per = None - if close_price is not None and eps is not None and eps != 0: - per = close_price / eps - - # PBR - pbr = None - if close_price is not None and equity is not None and shares_issued is not None and shares_issued != 0: - bps = equity / shares_issued - pbr = close_price / bps - processed_data.append(( str(ticker), date_val, @@ -143,12 +108,11 @@ def fetch_and_calculate_metrics(self, ticker: str): total_assets, total_liabilities, equity, + shares_issued, eps, - per, - pbr, roe, debt_ratio, - interest_coverage, # [추가됨] + interest_coverage, operating_cash_flow )) @@ -170,7 +134,7 @@ def save_to_db(self, ticker: str, data: List[tuple]): insert_query = """ INSERT INTO public.financial_statements ( ticker, date, revenue, net_income, total_assets, - total_liabilities, equity, eps, per, pbr, roe, debt_ratio, + total_liabilities, equity, shares_issued, eps, roe, debt_ratio, interest_coverage, operating_cash_flow ) VALUES %s @@ -181,9 +145,8 @@ def save_to_db(self, ticker: str, data: List[tuple]): total_assets = EXCLUDED.total_assets, total_liabilities = EXCLUDED.total_liabilities, equity = EXCLUDED.equity, + shares_issued = EXCLUDED.shares_issued, eps = EXCLUDED.eps, - per = EXCLUDED.per, - pbr = EXCLUDED.pbr, roe = EXCLUDED.roe, debt_ratio = EXCLUDED.debt_ratio, interest_coverage = EXCLUDED.interest_coverage, diff --git a/AI/modules/data_collector/index_data.py b/AI/modules/data_collector/index_data.py deleted file mode 100644 index bac8fed5..00000000 --- a/AI/modules/data_collector/index_data.py +++ /dev/null @@ -1,166 +0,0 @@ -#AI/modules/data_collector/index_data.py -import sys -import os -import yfinance as yf -import pandas as pd -from datetime import datetime, timedelta -from psycopg2.extras import execute_values - -# 프로젝트 루트 경로 설정 -current_dir = os.path.dirname(os.path.abspath(__file__)) -project_root = os.path.abspath(os.path.join(current_dir, "../../..")) -if project_root not in sys.path: - sys.path.append(project_root) - -from AI.libs.database.connection import get_db_conn - -class IndexDataCollector: - """ - 주요 주식 시장 지수(Benchmark Index)를 수집하여 price_data 테이블에 저장하는 클래스 - - S&P 500, NASDAQ, KOSPI, KOSDAQ 등 - - 개별 종목과 동일하게 price_data에 저장되지만, 티커명이 '^'로 시작함 - """ - - def __init__(self, db_name: str = "db"): - self.db_name = db_name - # 관리할 주요 지수 목록 (필요에 따라 추가) - self.INDICES = { - '^GSPC': 'S&P 500', - '^IXIC': 'NASDAQ Composite', - '^DJI': 'Dow Jones Industrial Average', - '^KS11': 'KOSPI Composite', - '^KQ11': 'KOSDAQ Composite', - '^RUT': 'Russell 2000' - } - self.FIXED_START_DATE = "2010-01-01" - - def get_start_date(self, ticker: str, repair_mode: bool) -> str: - """DB에서 마지막 수집일을 조회""" - if repair_mode: - return self.FIXED_START_DATE - - conn = get_db_conn(self.db_name) - cursor = conn.cursor() - try: - cursor.execute("SELECT MAX(date) FROM public.price_data WHERE ticker = %s", (ticker,)) - last_date = cursor.fetchone()[0] - if last_date: - return (last_date + timedelta(days=1)).strftime("%Y-%m-%d") - return self.FIXED_START_DATE - except Exception: - return self.FIXED_START_DATE - finally: - cursor.close() - conn.close() - - def fetch_index_data(self, ticker: str, start_date: str) -> pd.DataFrame: - """yfinance를 통해 지수 데이터 수집""" - try: - # 지수는 수정주가(Adj Close) 개념이 명확하지 않으나 통일성을 위해 수집 - df = yf.download(ticker, start=start_date, progress=False, auto_adjust=False, threads=False) - if df.empty: - return pd.DataFrame() - - if isinstance(df.columns, pd.MultiIndex): - df.columns = df.columns.get_level_values(0) - - # 거래대금 계산 (지수는 Volume이 0이거나 없는 경우가 많음) - if 'Close' in df.columns and 'Volume' in df.columns: - # 지수의 'Volume'은 거래량이 아닌 경우가 많아(계약수 등) 단순 참고용 - df['Amount'] = df['Close'] * df['Volume'] - else: - df['Amount'] = 0 - - return df - except Exception as e: - print(f" [Error] 지수 {ticker} 수집 실패: {e}") - return pd.DataFrame() - - def save_to_db(self, ticker: str, df: pd.DataFrame): - """DB 저장 (market_data와 동일한 price_data 테이블 사용)""" - conn = get_db_conn(self.db_name) - cursor = conn.cursor() - - insert_query = """ - INSERT INTO public.price_data ( - date, ticker, open, high, low, close, volume, adjusted_close, amount - ) - VALUES %s - ON CONFLICT (date, ticker) DO UPDATE SET - open = EXCLUDED.open, - high = EXCLUDED.high, - low = EXCLUDED.low, - close = EXCLUDED.close, - volume = EXCLUDED.volume, - adjusted_close = EXCLUDED.adjusted_close; - """ - - try: - data_to_insert = [] - has_adj = 'Adj Close' in df.columns - - for index, row in df.iterrows(): - try: - date_val = index.date() - - def get_val(col): - val = row.get(col, 0) - if hasattr(val, 'iloc'): return float(val.iloc[0]) - return float(val) if pd.notnull(val) else 0 - - open_val = get_val('Open') - high_val = get_val('High') - low_val = get_val('Low') - close_val = get_val('Close') - vol_val = int(get_val('Volume')) - adj_close_val = get_val('Adj Close') if has_adj else close_val - amount_val = get_val('Amount') - - data_to_insert.append(( - date_val, ticker, open_val, high_val, low_val, - close_val, vol_val, adj_close_val, amount_val - )) - except: - continue - - if data_to_insert: - execute_values(cursor, insert_query, data_to_insert) - conn.commit() - print(f" [{self.INDICES[ticker]}] {len(data_to_insert)}일치 저장 완료.") - else: - print(f" [{self.INDICES[ticker]}] 최신 데이터임.") - - except Exception as e: - conn.rollback() - print(f" [Error] DB 저장 오류: {e}") - finally: - cursor.close() - conn.close() - - def run(self, repair_mode: bool = False): - """전체 지수 업데이트 실행""" - mode_msg = "[Repair Mode]" if repair_mode else "[Update Mode]" - print(f"[IndexCollector] {mode_msg} 주요 시장 지수 업데이트 시작...") - - today = datetime.now().strftime("%Y-%m-%d") - - for ticker, name in self.INDICES.items(): - start_date = self.get_start_date(ticker, repair_mode) - - if not repair_mode and start_date > today: - print(f" [{name}] 이미 최신입니다.") - continue - - print(f" [{name}({ticker})] 수집 시작 ({start_date} ~ )...") - df = self.fetch_index_data(ticker, start_date) - if not df.empty: - self.save_to_db(ticker, df) - -if __name__ == "__main__": - import argparse - parser = argparse.ArgumentParser() - parser.add_argument("--repair", action="store_true", help="전체 기간 재수집") - args = parser.parse_args() - - collector = IndexDataCollector() - collector.run(repair_mode=args.repair) \ No newline at end of file diff --git a/AI/modules/data_collector/macro_data.py b/AI/modules/data_collector/macro_data.py index 44e982ef..0ccbf1a8 100644 --- a/AI/modules/data_collector/macro_data.py +++ b/AI/modules/data_collector/macro_data.py @@ -1,4 +1,4 @@ -#AI/modules/data_collector/macro_data.py +# AI/modules/data_collector/macro_data.py import sys import os import pandas as pd @@ -102,7 +102,6 @@ def fetch_yahoo_data(self, start_date): data = yf.download(ticker, start=start_date, progress=False) if not data.empty: # 'Close' 컬럼만 추출하여 이름 변경 - # yfinance 버전 이슈로 인해 MultiIndex 컬럼일 수 있음 처리 if isinstance(data.columns, pd.MultiIndex): price_series = data['Close'][ticker] else: @@ -172,21 +171,33 @@ def save_to_db(self, combined_df): try: data_to_insert = [] + for date_idx, row in combined_df.iterrows(): - # NaN 값을 None(SQL NULL)으로 변환 - row = row.where(pd.notnull(row), None) - + # [수정] Numpy 타입을 Python Native Type으로 변환하는 헬퍼 함수 + def to_py(val): + # NaN 또는 None 체크 + if pd.isna(val) or val is None: + return None + # Numpy 숫자 타입이면 .item()으로 변환 + if hasattr(val, 'item'): + return val.item() + return float(val) + data_to_insert.append(( date_idx.date(), - row.get('cpi'), row.get('gdp'), row.get('ppi'), row.get('jolt'), row.get('cci'), - row.get('interest_rate'), row.get('trade_balance'), - row.get('core_cpi'), row.get('real_gdp'), row.get('unemployment_rate'), row.get('consumer_sentiment'), - row.get('ff_targetrate_upper'), row.get('ff_targetrate_lower'), - row.get('pce'), row.get('core_pce'), - row.get('tradebalance_goods'), row.get('trade_import'), row.get('trade_export'), - row.get('us10y'), row.get('us2y'), row.get('yield_spread'), - row.get('vix_close'), row.get('dxy_close'), - row.get('wti_price'), row.get('gold_price'), row.get('credit_spread_hy') + to_py(row.get('cpi')), to_py(row.get('gdp')), to_py(row.get('ppi')), + to_py(row.get('jolt')), to_py(row.get('cci')), + to_py(row.get('interest_rate')), to_py(row.get('trade_balance')), + to_py(row.get('core_cpi')), to_py(row.get('real_gdp')), + to_py(row.get('unemployment_rate')), to_py(row.get('consumer_sentiment')), + to_py(row.get('ff_targetrate_upper')), to_py(row.get('ff_targetrate_lower')), + to_py(row.get('pce')), to_py(row.get('core_pce')), + to_py(row.get('tradebalance_goods')), to_py(row.get('trade_import')), + to_py(row.get('trade_export')), + to_py(row.get('us10y')), to_py(row.get('us2y')), to_py(row.get('yield_spread')), + to_py(row.get('vix_close')), to_py(row.get('dxy_close')), + to_py(row.get('wti_price')), to_py(row.get('gold_price')), + to_py(row.get('credit_spread_hy')) )) if data_to_insert: diff --git a/AI/modules/data_collector/market_breadth_data.py b/AI/modules/data_collector/market_breadth_data.py index 7e9acced..a70e73d1 100644 --- a/AI/modules/data_collector/market_breadth_data.py +++ b/AI/modules/data_collector/market_breadth_data.py @@ -1,4 +1,4 @@ -#AI/modules/data_collector/market_breadth_data.py +# AI/modules/data_collector/market_breadth_data.py import sys import os import yfinance as yf @@ -16,19 +16,20 @@ class MarketBreadthCollector: """ - [시장 폭 및 섹터 데이터 수집기] - - GICS 11개 섹터 + SPY(시장 전체) 수집 - - 매핑되지 않는 섹터의 '보험(Fallback)'으로 SPY를 사용 + [시장 폭, 섹터 및 벤치마크 지수 수집기] + - GICS 섹터 ETF, SPY(시장 전체), 주요 국가 지수(Index)를 수집하여 + - public.sector_returns 테이블에 통합 저장합니다. + - 거래량(Volume) 없이 종가와 수익률만 저장하므로 노이즈가 없습니다. """ def __init__(self, db_name: str = "db"): self.db_name = db_name - # [핵심] 섹터 매핑 정의 (표준화) - # Key: DB에 저장될 표준 섹터명 - # Value: 대표 ETF 티커 + # [핵심] 섹터 및 벤치마크 매핑 정의 + # Key: DB에 저장될 표준 명칭 (sector 컬럼) + # Value: 야후 파이낸스 티커 (etf_ticker 컬럼) self.SECTOR_ETF_MAP = { - # 1. GICS 11개 표준 섹터 + # 1. 미국 GICS 11개 표준 섹터 (ETF) 'Technology': 'XLK', 'Financial Services': 'XLF', 'Healthcare': 'XLV', @@ -41,57 +42,65 @@ def __init__(self, db_name: str = "db"): 'Real Estate': 'XLRE', 'Communication Services': 'XLC', - # 2. [추가] 매핑 실패 시 사용할 '시장 전체' (Fallback) - 'Market': 'SPY' + # 2. 시장 전체 (Tradable Proxy) fallback 용 지수 + 'Market': 'SPY', + + # 3. [통합] 주요 시장 벤치마크 지수 (Index) + # 지수는 Volume 데이터가 부정확하므로 sector_returns 테이블(수익률/종가)에 넣는 것이 최적입니다. + 'S&P 500': '^GSPC', + 'NASDAQ': '^IXIC', + 'Dow Jones': '^DJI', + 'Russell 2000': '^RUT', + 'KOSPI': '^KS11', + 'KOSDAQ': '^KQ11', + 'VIX': '^VIX' # VIX도 여기서 함께 추적 } - # [보완] yfinance의 변칙적인 섹터명을 표준명으로 연결하는 사전 - # (나중에 전처리나 stock_info 수집 시 활용 가능하지만, - # 여기서는 '어떤 ETF 데이터를 어디에 매핑할지'가 중요하므로 참고용 주석) - # 예: 'Information Technology' -> 'Technology' - # 'Materials' -> 'Basic Materials' - def fetch_and_save_sector_returns(self, days_back: int = 365): - # ... (이전 코드와 로직 동일) ... - # self.SECTOR_ETF_MAP에 'Market': 'SPY'가 추가되었으므로 - # 자동으로 SPY 데이터도 'Market'이라는 섹터명으로 저장됩니다. - - print("[Breadth] 섹터 ETF(+SPY) 데이터 수집 시작...") + print("[Breadth] 섹터 ETF 및 벤치마크 지수 데이터 수집 시작...") + # 수집 안전성을 위해 10일 더 여유있게 조회 start_date = (datetime.now() - timedelta(days=days_back + 10)).strftime('%Y-%m-%d') tickers = list(self.SECTOR_ETF_MAP.values()) + + # 티커 -> 섹터명 역매핑 (저장 시 사용) ticker_to_sector = {v: k for k, v in self.SECTOR_ETF_MAP.items()} try: - # 1. 데이터 다운로드 + # 1. 데이터 다운로드 (일괄 다운로드) + # auto_adjust=True: 수정 주가 반영 data = yf.download(tickers, start=start_date, progress=False, auto_adjust=True, threads=True) if data.empty: print(" >> 수집된 데이터가 없습니다.") return - # MultiIndex 처리 (yfinance 버전에 따른 안전장치) + # 2. 종가(Close) 데이터 추출 로직 + # yfinance 버전에 따라 컬럼 구조가 다를 수 있어 방어 코드 적용 + closes = pd.DataFrame() + if isinstance(data.columns, pd.MultiIndex): - # 'Close' 레벨이 있으면 가져오고, 없으면 전체가 Close라고 가정 시도 - try: + # Case A: (Price, Ticker) 구조 또는 (Ticker, Price) 구조 + # 'Close' 레벨이 존재하는지 확인 + if 'Close' in data.columns.get_level_values(0): closes = data.xs('Close', axis=1, level=0) - except KeyError: - # columns 구조가 (Ticker, PriceType)이 아니라 (PriceType, Ticker) 일수도 있음 - # yfinance 최근 버전은 (Price, Ticker) 구조임. - # 여기서는 간단히 'Adj Close'나 'Close'를 찾아서 처리 - if 'Close' in data.columns.get_level_values(0): - closes = data.xs('Close', axis=1, level=0) - else: - # 구조를 알 수 없을 때 - print(" [Warning] 데이터 컬럼 구조 인식 불가. Skip.") - return + elif 'Close' in data.columns.get_level_values(1): + closes = data.xs('Close', axis=1, level=1) + else: + # 'Close'가 없으면 최후의 수단으로 그냥 data 사용 + print(" [Error] 데이터 프레임 구조에서 Close 컬럼을 찾을 수 없습니다.") + return else: - closes = data['Close'] + # MultiIndex가 아닌 경우 (단일 티커 요청 시 등) + if 'Close' in data.columns: + closes = data['Close'] + else: + closes = data # 전체가 종가라고 가정 - # 2. 수익률 계산 + # 3. 수익률 계산 (전일 대비 변동률) returns = closes.pct_change() - # 3. DB 저장 + # 4. DB 저장 conn = get_db_conn(self.db_name) cursor = conn.cursor() @@ -107,54 +116,66 @@ def fetch_and_save_sector_returns(self, days_back: int = 365): data_to_insert = [] + # DataFrame 순회하며 데이터 리스트 생성 for date_idx, row in returns.iterrows(): date_val = date_idx.date() - for etf_ticker in tickers: - if etf_ticker not in row: continue - ret_val = row[etf_ticker] - if pd.isna(ret_val): continue + for ticker in tickers: + # 해당 날짜/티커에 데이터가 없으면 스킵 + if ticker not in row or pd.isna(row[ticker]): + continue - # 종가 (closes 데이터프레임에서 가져옴) - if etf_ticker in closes.columns: - close_val = closes.loc[date_idx, etf_ticker] - else: - close_val = 0.0 + ret_val = row[ticker] + + # 종가 가져오기 (closes DF에서) + close_val = 0.0 + if ticker in closes.columns: + val = closes.loc[date_idx, ticker] + if pd.notna(val): + close_val = float(val) - sector_name = ticker_to_sector.get(etf_ticker, 'Unknown') + sector_name = ticker_to_sector.get(ticker, 'Unknown') data_to_insert.append(( date_val, sector_name, - etf_ticker, + ticker, float(ret_val), float(close_val) )) if data_to_insert: + # 대량 Insert 시 배치 처리 batch_size = 1000 for i in range(0, len(data_to_insert), batch_size): execute_values(cursor, insert_query, data_to_insert[i:i+batch_size]) + conn.commit() - print(f" >> 섹터(+Market) 수익률 {len(data_to_insert)}건 저장 완료.") + print(f" >> 섹터/SPY/지수 데이터 {len(data_to_insert)}건 저장 완료.") + else: + print(" >> 저장할 유효한 데이터가 없습니다.") except Exception as e: conn.rollback() - print(f" [Error] 섹터 데이터 처리 중 오류: {e}") + print(f" [Error] 섹터/지수 데이터 처리 중 오류: {e}") finally: if 'cursor' in locals(): cursor.close() if 'conn' in locals(): conn.close() def run(self, repair_mode: bool = False): - # Repair 모드면 2010년부터, 아니면 최근 2년치 - days = 365 * 15 if repair_mode else 365 * 2 + """ + 실행 진입점 + - repair_mode=True: 2010년부터 전체 재수집 + - repair_mode=False: 최근 2년치만 업데이트 + """ + days = 365 * 16 if repair_mode else 365 * 2 self.fetch_and_save_sector_returns(days_back=days) if __name__ == "__main__": import argparse parser = argparse.ArgumentParser() - parser.add_argument("--repair", action="store_true") - parser.add_argument("--db", default="db") + parser.add_argument("--repair", action="store_true", help="전체 기간 재수집") + parser.add_argument("--db", default="db", help="DB 이름") args = parser.parse_args() collector = MarketBreadthCollector(db_name=args.db) diff --git a/AI/modules/data_collector/market_data.py b/AI/modules/data_collector/market_data.py index b1d7277c..27e9cd88 100644 --- a/AI/modules/data_collector/market_data.py +++ b/AI/modules/data_collector/market_data.py @@ -1,8 +1,9 @@ -#AI/modules/data_collector/market_data.py +# AI/modules/data_collector/market_data.py import sys import os import yfinance as yf import pandas as pd +import numpy as np from datetime import datetime, timedelta from typing import List from psycopg2.extras import execute_values @@ -21,6 +22,7 @@ class MarketDataCollector: - yfinance를 사용하여 데이터 수집 - 수리(Repair) 모드 지원 (누락 데이터 복구) - 거래대금(Amount) 자동 계산 + - 일별 PER/PBR 고속 계산 (Vectorized) """ def __init__(self, db_name: str = "db"): @@ -57,26 +59,79 @@ def get_start_date(self, ticker: str, repair_mode: bool) -> str: def fetch_ohlcv(self, ticker: str, start_date: str) -> pd.DataFrame: """ yfinance에서 OHLCV 데이터를 다운로드하고 전처리합니다. + (중복 호출 제거 및 벡터 연산 적용) """ try: + # 1. 데이터 다운로드 # auto_adjust=False: Adj Close 별도 확보 df = yf.download(ticker, start=start_date, progress=False, auto_adjust=False, threads=False) if df.empty: return pd.DataFrame() - # MultiIndex 컬럼 평탄화 (yfinance 버전 이슈 대응) + # 2. MultiIndex 컬럼 평탄화 (yfinance 버전 이슈 대응) if isinstance(df.columns, pd.MultiIndex): df.columns = df.columns.get_level_values(0) - # 거래대금(Amount) 계산: 종가 * 거래량 (근사치) - # yfinance는 원화/달러 여부에 따라 단위가 다르지만, raw value 유지 + # 3. 거래대금(Amount) 계산: 종가 * 거래량 (근사치) if 'Close' in df.columns and 'Volume' in df.columns: df['Amount'] = df['Close'] * df['Volume'] else: df['Amount'] = 0 + # 4. PER/PBR 계산을 위한 재무 데이터 조회 (최근 1건) + # 매일 변하는 주가에 고정된 최근 실적(EPS, BPS)을 적용합니다. + conn = get_db_conn(self.db_name) + cursor = conn.cursor() + + equity, shares_issued, eps = None, None, None + try: + # 가장 최근 확정 실적 조회 + query = """ + SELECT equity, shares_issued, eps + FROM public.financial_statements + WHERE ticker = %s + ORDER BY date DESC LIMIT 1 + """ + cursor.execute(query, (ticker,)) + result = cursor.fetchone() + + if result: + equity, shares_issued, eps = result + + # [수정] Decimal 타입을 float로 변환 (나눗셈 에러 방지) + if equity is not None: equity = float(equity) + if shares_issued is not None: shares_issued = float(shares_issued) + if eps is not None: eps = float(eps) + + except Exception as e: + print(f" [Warning] 재무 데이터 조회 실패 ({ticker}): {e}") + finally: + cursor.close() + conn.close() + + # 5. 벡터 연산으로 PER/PBR 일괄 계산 (반복문 제거로 속도 향상) + + # PER 계산 (EPS가 유효할 때만 계산) + if eps and eps != 0: + # df['Close']는 전체 열을 의미하므로 한 번에 계산됩니다. + df['per'] = df['Close'] / eps + else: + df['per'] = None # 데이터가 없으면 컬럼을 비워둠 + + # PBR 계산 (자본/주식수가 유효할 때만 계산) + if equity and shares_issued and shares_issued != 0: + bps = equity / shares_issued + df['pbr'] = df['Close'] / bps + else: + df['pbr'] = None + + # 무한대(inf)나 NaN 값 정리 (DB 저장 에러 방지) + df.replace([np.inf, -np.inf], None, inplace=True) + df = df.where(pd.notnull(df), None) + return df + except Exception as e: print(f" [Error] {ticker} 다운로드 중 에러: {e}") return pd.DataFrame() @@ -88,9 +143,10 @@ def save_to_db(self, ticker: str, df: pd.DataFrame): conn = get_db_conn(self.db_name) cursor = conn.cursor() + # [수정됨] 쿼리 컬럼 개수와 VALUES 매핑 일치 insert_query = """ INSERT INTO public.price_data ( - date, ticker, open, high, low, close, volume, adjusted_close, amount + date, ticker, open, high, low, close, volume, adjusted_close, amount, per, pbr ) VALUES %s ON CONFLICT (date, ticker) DO UPDATE SET @@ -100,37 +156,48 @@ def save_to_db(self, ticker: str, df: pd.DataFrame): close = EXCLUDED.close, volume = EXCLUDED.volume, adjusted_close = EXCLUDED.adjusted_close, - amount = EXCLUDED.amount; + amount = EXCLUDED.amount, + per = EXCLUDED.per, + pbr = EXCLUDED.pbr; """ - # repair_mode에서도 기존 값이 수정될 수 있도록 DO UPDATE로 변경하거나, - # 원본 보존을 원하면 DO NOTHING을 유지할 수 있음. - # 여기서는 최신 데이터로 갱신하는 DO UPDATE 전략 채택. try: data_to_insert = [] has_adj = 'Adj Close' in df.columns + # DataFrame에 해당 컬럼이 있는지 확인 + has_per = 'per' in df.columns + has_pbr = 'pbr' in df.columns for index, row in df.iterrows(): date_val = index.date() - # 안전한 값 추출 (Series일 경우 첫 번째 값 사용) - def get_val(col): - val = row.get(col, 0) + # 안전한 값 추출 헬퍼 함수 + def get_val(col, default=0): + val = row.get(col, default) + # Pandas Series 객체인 경우 처리 if hasattr(val, 'iloc'): - return float(val.iloc[0]) - return float(val) if pd.notnull(val) else 0 - - open_val = get_val('Open') - high_val = get_val('High') - low_val = get_val('Low') - close_val = get_val('Close') - vol_val = int(get_val('Volume')) - amount_val = get_val('Amount') - + val = val.iloc[0] + # None 체크 + if val is None or pd.isna(val): + return None + return float(val) + + open_val = get_val('Open') or 0 + high_val = get_val('High') or 0 + low_val = get_val('Low') or 0 + close_val = get_val('Close') or 0 + vol_val = int(get_val('Volume') or 0) + amount_val = get_val('Amount') or 0 adj_close_val = get_val('Adj Close') if has_adj else close_val + + # [중요] 쿼리에 맞춰 per, pbr 값 추가 + per_val = get_val('per', None) if has_per else None + pbr_val = get_val('pbr', None) if has_pbr else None + # 튜플 순서: date, ticker, open, high, low, close, volume, adj_close, amount, per, pbr data_to_insert.append(( - date_val, ticker, open_val, high_val, low_val, close_val, vol_val, adj_close_val, amount_val + date_val, ticker, open_val, high_val, low_val, close_val, + vol_val, adj_close_val, amount_val, per_val, pbr_val )) if data_to_insert: diff --git a/AI/modules/data_collector/run.py b/AI/modules/data_collector/run.py index 1e87b687..9da4c086 100644 --- a/AI/modules/data_collector/run.py +++ b/AI/modules/data_collector/run.py @@ -23,7 +23,6 @@ from AI.modules.data_collector.company_fundamentals_data import FundamentalsDataCollector from AI.modules.data_collector.macro_data import MacroDataCollector from AI.modules.data_collector.crypto_data import CryptoDataCollector -from AI.modules.data_collector.index_data import IndexDataCollector from AI.modules.data_collector.event_data import EventDataCollector from AI.modules.data_collector.market_breadth_data import MarketBreadthCollector from AI.modules.data_collector.market_breadth_stats import MarketBreadthStatsCollector @@ -59,7 +58,6 @@ def main(): # 모듈별 스킵 옵션 parser.add_argument("--skip-price", action="store_true", help="주식 시세 수집 Skip") - parser.add_argument("--skip-index", action="store_true", help="시장 지수 수집 Skip") parser.add_argument("--skip-info", action="store_true", help="주식 정보 수집 Skip") parser.add_argument("--skip-fund", action="store_true", help="재무제표 수집 Skip") parser.add_argument("--skip-macro", action="store_true", help="거시경제 수집 Skip") @@ -85,7 +83,6 @@ def main(): # 1. Stats Only if args.stats_only: args.skip_price = True - args.skip_index = True args.skip_info = True args.skip_fund = True args.skip_macro = True @@ -98,7 +95,6 @@ def main(): # 2. Event Only if args.event_only: args.skip_price = True - args.skip_index = True args.skip_info = True args.skip_fund = True args.skip_macro = True @@ -111,7 +107,6 @@ def main(): # 3. Market Breadth Only (ETF Sector Returns) if args.market_breadth_only: args.skip_price = True - args.skip_index = True args.skip_info = True args.skip_fund = True args.skip_macro = True @@ -153,57 +148,48 @@ def main(): collector.run(lookback_days=365*5 if args.repair else 365*2) except Exception as e: print(f"[Error] Macro Data 수집 중단: {e}") - - # (2) 시장 지수 (Index) - if not args.skip_index: - try: - print("\n>>> [Step 2] 시장 지수(Index) 업데이트") - collector = IndexDataCollector(db_name=args.db) - collector.run(repair_mode=args.repair) - except Exception as e: - print(f"[Error] Index Data 수집 중단: {e}") - # (3) 주가 데이터 (Stocks OHLCV) + # (2) 주가 데이터 (Stocks OHLCV) if not args.skip_price and stock_tickers: try: - print("\n>>> [Step 3] 개별 주식 시세(OHLCV) 업데이트") + print("\n>>> [Step 2] 개별 주식 시세(OHLCV) 업데이트") collector = MarketDataCollector(db_name=args.db) collector.update_tickers(stock_tickers, repair_mode=args.repair) except Exception as e: print(f"[Error] Market Data 수집 중단: {e}") - # (4) 암호화폐 데이터 (Crypto) + # (3) 암호화폐 데이터 (Crypto) if not args.skip_crypto: try: - print("\n>>> [Step 4] 암호화폐(Crypto) 업데이트") + print("\n>>> [Step 3] 암호화폐(Crypto) 업데이트") target_crypto = ["BTC-USD", "ETH-USD"] collector = CryptoDataCollector(db_name=args.db) collector.update_tickers(target_crypto, repair_mode=args.repair) except Exception as e: print(f"[Error] Crypto Data 수집 중단: {e}") - # (5) 재무제표 (Fundamentals) + # (4) 재무제표 (Fundamentals) if not args.skip_fund and stock_tickers: try: - print("\n>>> [Step 5] 기업 재무제표(Fundamentals) 업데이트") + print("\n>>> [Step 4] 기업 재무제표(Fundamentals) 업데이트") collector = FundamentalsDataCollector(db_name=args.db) collector.update_tickers(stock_tickers) except Exception as e: print(f"[Error] Fundamentals 수집 중단: {e}") - # (6) 주식 기본 정보 (Stock Info) + # (5) 주식 기본 정보 (Stock Info) if not args.skip_info and stock_tickers: try: - print("\n>>> [Step 6] 주식 정보(Stock Info) 업데이트") + print("\n>>> [Step 5] 주식 정보(Stock Info) 업데이트") collector = StockInfoCollector(db_name=args.db) collector.update_tickers(stock_tickers) except Exception as e: print(f"[Error] Stock Info 수집 중단: {e}") - # (7) 이벤트 일정 (Earnings, Macro Events) + # (6) 이벤트 일정 (Earnings, Macro Events) if not args.skip_event: try: - print("\n>>> [Step 7] 이벤트 일정(Event Data) 업데이트") + print("\n>>> [Step 6] 이벤트 일정(Event Data) 업데이트") collector = EventDataCollector(db_name=args.db) collector.update_macro_events(force_update=args.repair) if stock_tickers: @@ -211,19 +197,19 @@ def main(): except Exception as e: print(f"[Error] Event Data 수집 중단: {e}") - # (8) 시장 폭 및 섹터 데이터 (Market Breadth - Sector Returns) + # (7) 시장 폭 및 섹터 데이터 (Market Breadth - Sector Returns) if not args.skip_breadth: try: - print("\n>>> [Step 8] 시장 폭 및 섹터 데이터(Sector Returns) 업데이트") + print("\n>>> [Step 7] 시장 폭 및 섹터 데이터(Sector Returns) 업데이트") collector = MarketBreadthCollector(db_name=args.db) collector.run(repair_mode=args.repair) except Exception as e: print(f"[Error] Sector Data 수집 중단: {e}") - # (9) 시장 통계 계산 (Market Breadth Stats - Internal Aggregation) + # (8) 시장 통계 계산 (Market Breadth Stats - Internal Aggregation) if not args.skip_stats: try: - print("\n>>> [Step 9] 시장 통계(NH-NL, MA200%) 계산 및 저장") + print("\n>>> [Step 8] 시장 통계(NH-NL, MA200%) 계산 및 저장") collector = MarketBreadthStatsCollector(db_name=args.db) collector.run(repair_mode=args.repair) except Exception as e: diff --git a/AI/modules/features/event_features.py b/AI/modules/features/event_features.py new file mode 100644 index 00000000..7db4be77 --- /dev/null +++ b/AI/modules/features/event_features.py @@ -0,0 +1,18 @@ +# AI/modules/signal/core/features/event_features.py +import pandas as pd + +def add_date_distance(df: pd.DataFrame, event_dates: pd.Series, col_name: str) -> pd.DataFrame: + """asof_date 기준 특정 이벤트 후 경과일 계산 [명세서 준수]""" + # event_dates는 각 행(날짜)별로 가장 최근의 이벤트 날짜를 가지고 있어야 함 + df[f'days_since_{col_name}'] = (df.index - pd.to_datetime(event_dates)).dt.days + # 음수(미래)는 0 처리 + df[f'days_since_{col_name}'] = df[f'days_since_{col_name}'].clip(lower=0) + return df + +def add_event_window_flags(df: pd.DataFrame, event_dates_list: list, col_name: str) -> pd.DataFrame: + """FOMC/CPI 전후 1일 여부 (True/False) [명세서 준수]""" + # 이벤트 당일 플래그 + df[f'event_window_flag_{col_name}'] = df.index.isin(event_dates_list) + # 전후 1일로 확장 (rolling max) + df[f'event_window_flag_{col_name}'] = df[f'event_window_flag_{col_name}'].rolling(window=3, center=True).max().fillna(0).astype(bool) + return df \ No newline at end of file diff --git a/AI/modules/signal/core/features.py b/AI/modules/features/legacy/technical_features.py similarity index 97% rename from AI/modules/signal/core/features.py rename to AI/modules/features/legacy/technical_features.py index 72e7969f..9ec83ddf 100644 --- a/AI/modules/signal/core/features.py +++ b/AI/modules/features/legacy/technical_features.py @@ -1,5 +1,6 @@ -# AI/modules/signal/core/features.py +# AI/modules/features/features.py """ +레거시 버전입니다. [Stationary Multi-Timeframe Features - Fixed] - 절대 가격(Price)을 Ratio로 변환합니다. - [수정] 주가 데이터를 파괴하던 clip(-10, 10) 로직을 제거했습니다. @@ -63,9 +64,6 @@ def add_technical_indicators(df: pd.DataFrame) -> pd.DataFrame: # -------------------------------------------------------- df.replace([np.inf, -np.inf], np.nan, inplace=True) df = df.fillna(0) - - # [삭제됨] df[numeric_cols].clip(-10, 10) <-- 범인 제거 완료 - return df def add_multi_timeframe_features(df: pd.DataFrame) -> pd.DataFrame: diff --git a/AI/modules/features/market_derived.py b/AI/modules/features/market_derived.py new file mode 100644 index 00000000..a8c550af --- /dev/null +++ b/AI/modules/features/market_derived.py @@ -0,0 +1,103 @@ +# AI/modules/features/market_derived.py +import pandas as pd +import numpy as np +from features.technical import compute_rsi, compute_atr, compute_macd, compute_bollinger_bands + +def add_market_changes(df: pd.DataFrame) -> pd.DataFrame: + """가격 및 거래량 기반 변화율 계산 [명세서 준수]""" + df['ret_1d'] = df['close'].pct_change() + df['log_return'] = np.log(df['close'] / df['close'].shift(1)) + # 일중변동성비율 : (High - Low) / Close + df['intraday_vol'] = (df['high'] - df['low']) / (df['close'] + 1e-9) + return df + +def add_macro_changes(df: pd.DataFrame) -> pd.DataFrame: + """VIX, 금리, 달러 변화율 계산 [명세서 준수]""" + # 원천 데이터 컬럼명(vix_close, us10y, dxy_close)이 존재한다고 가정 + if 'vix_close' in df.columns: + df['vix_change_rate'] = df['vix_close'].pct_change() + if 'us10y' in df.columns: + df['us10y_chg'] = df['us10y'].diff() # 금리는 주로 bp 단위 변화량 사용 + if 'dxy_close' in df.columns: + df['dxy_chg'] = df['dxy_close'].pct_change() + return df + +def add_relative_strength(df: pd.DataFrame, sector_col: str) -> pd.DataFrame: + """상대강도: 종목_ret - 섹터_ret [명세서 준수]""" + if sector_col in df.columns: + df[f'sector_return_rel_{sector_col}'] = df['ret_1d'] - df[sector_col] + return df + +def add_standard_technical_features(df: pd.DataFrame) -> pd.DataFrame: + """레거시 로직 + 명세서 신규 지표 통합""" + epsilon = 1e-9 + + # 1. RSI (rsi_14) + df['rsi_14'] = compute_rsi(df['close'], 14) / 100.0 + + # 2. MACD (macd, macd_signal) + df['macd'], df['macd_signal'] = compute_macd(df['close']) + # 모델 입력용 macd_ratio (Stationary) + df['macd_ratio'] = df['macd'] / (df['close'] + epsilon) + + # 3. Bollinger Bands (ub, lb) + std20 = df['close'].rolling(window=20).std() + ma20 = df['close'].rolling(window=20).mean() + df['bollinger_ub'] = ma20 + (std20 * 2) + df['bollinger_lb'] = ma20 - (std20 * 2) + # 모델 입력용 포지션 + df['bb_position'] = (df['close'] - df['bollinger_lb']) / ( (df['bollinger_ub'] - df['bollinger_lb']).replace(0, epsilon) ) + + # 4. ATR (atr_14) + df['atr_14'] = compute_atr(df['high'], df['low'], df['close'], 14) + + # 5. Moving Averages (ma_20, ma_60) + for w in [20, 60]: + df[f'ma_{w}'] = df['close'].rolling(window=w).mean() + # 모델 입력용 이격도 (Standard Key: ma_trend_score 등에 활용) + df[f'ma{w}_ratio'] = (df['close'] - df[f'ma_{w}']) / (df[f'ma_{w}'] + epsilon) + + return df + +def add_multi_timeframe_features(df: pd.DataFrame) -> pd.DataFrame: + """일봉 데이터에 주봉(Weekly) 및 월봉(Monthly) 피처 결합 [레거시 기능 복구]""" + if df.empty: return df + + # 인덱스가 날짜여야 resample 가능 + df_origin = df.copy() + if 'date' in df_origin.columns: + df_origin = df_origin.set_index('date').sort_index() + + epsilon = 1e-9 + + # --- 1. 주봉(Weekly) 계산 --- + df_weekly = df_origin.resample('W-FRI').agg({ + 'open': 'first', 'high': 'max', 'low': 'min', 'close': 'last', 'volume': 'sum' + }) + # 명세서 키 적용: week_ma20_ratio, week_rsi, week_bb_pos + w_ma20 = df_weekly['close'].rolling(window=20).mean() + df_weekly['week_ma20_ratio'] = (df_weekly['close'] - w_ma20) / (w_ma20 + epsilon) + df_weekly['week_rsi'] = compute_rsi(df_weekly['close'], 14) / 100.0 + + w_upper, w_lower = compute_bollinger_bands(df_weekly['close'], 20) + df_weekly['week_bb_pos'] = (df_weekly['close'] - w_lower) / ((w_upper - w_lower).replace(0, epsilon)) + df_weekly['week_vol_change'] = df_weekly['volume'].pct_change() + + # --- 2. 월봉(Monthly) 계산 --- + df_monthly = df_origin.resample('ME').agg({ + 'open': 'first', 'high': 'max', 'low': 'min', 'close': 'last', 'volume': 'sum' + }) + # 명세서 키 적용: month_ma12_ratio, month_rsi + m_ma12 = df_monthly['close'].rolling(window=12).mean() + df_monthly['month_ma12_ratio'] = (df_monthly['close'] - m_ma12) / (m_ma12 + epsilon) + df_monthly['month_rsi'] = compute_rsi(df_monthly['close'], 14) / 100.0 + + # --- 3. 데이터 병합 (Reindex & ffill) --- + weekly_cols = ['week_ma20_ratio', 'week_rsi', 'week_bb_pos', 'week_vol_change'] + monthly_cols = ['month_ma12_ratio', 'month_rsi'] + + # 일봉 날짜 인덱스에 맞춰 주/월봉 데이터를 앞으로 채움(ffill) + df_origin = df_origin.join(df_weekly[weekly_cols].reindex(df_origin.index, method='ffill')) + df_origin = df_origin.join(df_monthly[monthly_cols].reindex(df_origin.index, method='ffill')) + + return df_origin.reset_index() \ No newline at end of file diff --git a/AI/modules/features/processor.py b/AI/modules/features/processor.py new file mode 100644 index 00000000..92a6a4b6 --- /dev/null +++ b/AI/modules/features/processor.py @@ -0,0 +1,42 @@ +# AI/modules/features/processor.py +import pandas as pd +from .market_derived import add_standard_technical_features, add_multi_timeframe_features +from .event_features import add_event_features +from .technical import compute_correlation_spike, compute_recent_loss_ema + +class FeatureProcessor: + """ + SISC 파생 피처 레이어 통합 관리 클래스. + 기존 features.py의 기능을 포함하며 명세서의 Standard Key를 생성합니다. + """ + def __init__(self, df: pd.DataFrame): + # 원본 데이터 복사 및 정렬 + self.df = df.copy() + if 'date' in self.df.columns: + self.df['date'] = pd.to_datetime(self.df['date']) + self.df = self.df.sort_values('date') + + def execute_pipeline(self, event_info=None, sector_df=None): + """전체 파생 피처 생성 파이프라인 실행""" + + # 1. 일봉 기준 표준 기술적 지표 및 수익률 계산 (Standard Key 생성) + self.df = add_standard_technical_features(self.df) + + # 2. 주봉/월봉 멀티 타임프레임 피처 결합 (Legacy 로직 완벽 대체) + self.df = add_multi_timeframe_features(self.df) + + # 3. 이벤트 기반 피처 (IPO 경과일, 실적발표 등) + if event_info: + self.df = add_event_features(self.df, event_info) + + # 4. 데이터 정제 (Legacy 안정성 로직) + self.df = self.finalize_data() + + return self.df + + def finalize_data(self): + """무한대 값 제거 및 결측치 0 채움 (수치 안정성 확보)""" + import numpy as np + self.df.replace([np.inf, -np.inf], np.nan, inplace=True) + self.df = self.df.fillna(0) + return self.df diff --git a/AI/modules/features/technical.py b/AI/modules/features/technical.py new file mode 100644 index 00000000..01d6567d --- /dev/null +++ b/AI/modules/features/technical.py @@ -0,0 +1,61 @@ +# AI/modules/features/technical.py +import pandas as pd +import numpy as np + +def compute_z_score(series: pd.Series, window: int = 20) -> pd.Series: + """VIX Z-score (Window=20~60) [명세서 준수]""" + rolling_mean = series.rolling(window=window).mean() + rolling_std = series.rolling(window=window).std() + return (series - rolling_mean) / (rolling_std + 1e-9) + +def compute_atr_rank(high: pd.Series, low: pd.Series, close: pd.Series, window: int = 14) -> pd.Series: + """ATR 백분위수 (변동성 레벨) [명세서 준수]""" + tr = pd.concat([high - low, + abs(high - close.shift(1)), + abs(low - close.shift(1))], axis=1).max(axis=1) + atr = tr.rolling(window=window).mean() + # 최근 252일 중 현재 ATR이 어느 정도 위치인지 백분위 계산 + return atr.rolling(window=252).apply(lambda x: pd.Series(x).rank(pct=True).iloc[-1]) + +def compute_ma_trend_score(close: pd.Series) -> pd.Series: + """이평선 정배열 점수: 5 > 20 > 60일 기준 [명세서 준수]""" + ma5 = close.rolling(5).mean() + ma20 = close.rolling(20).mean() + ma60 = close.rolling(60).mean() + score = (ma5 > ma20).astype(int) + (ma20 > ma60).astype(int) + return score / 2.0 + +def compute_correlation_spike(series1: pd.Series, series2: pd.Series, window: int = 20) -> pd.Series: + """자산 간 상관계수 급등 시 1 (correlation_spike_flag) [명세서 준수]""" + rolling_corr = series1.rolling(window=window).corr(series2) + corr_mean = rolling_corr.rolling(window=60).mean() + corr_std = rolling_corr.rolling(window=60).std() + # 평균 대비 2표준편차 이상 급등 시 플래그 1 + return (rolling_corr > (corr_mean + 2 * corr_std)).astype(int) + +def compute_recent_loss_ema(y_true: pd.Series, y_pred: pd.Series, span: int = 20) -> pd.Series: + """최근 모델 예측 오차 EMA (게이팅용) [명세서 준수]""" + error = (y_true - y_pred).abs() + return error.ewm(span=span, adjust=False).mean() + +def compute_rsi(series: pd.Series, period: int = 14) -> pd.Series: + """RSI 계산 (0~100)""" + delta = series.diff() + gain = (delta.where(delta > 0, 0)).rolling(window=period).mean() + loss = (-delta.where(delta < 0, 0)).rolling(window=period).mean() + rs = gain / (loss + 1e-9) + return 100 - (100 / (1 + rs)) + +def compute_bollinger_bands(series: pd.Series, window: int = 20): + """볼린저 밴드 상단, 하단 계산""" + ma = series.rolling(window=window).mean() + std = series.rolling(window=window).std() + return ma + (std * 2), ma - (std * 2) + +def compute_macd(series: pd.Series): + """MACD 및 시그널 라인 계산""" + exp12 = series.ewm(span=12, adjust=False).mean() + exp26 = series.ewm(span=26, adjust=False).mean() + macd = exp12 - exp26 + signal = macd.ewm(span=9, adjust=False).mean() + return macd, signal \ No newline at end of file diff --git a/AI/modules/finder/TFT_Risk_Manager.py b/AI/modules/finder/TFT_Risk_Manager.py new file mode 100644 index 00000000..c91a6361 --- /dev/null +++ b/AI/modules/finder/TFT_Risk_Manager.py @@ -0,0 +1,57 @@ +# AI/modules/finder/TFT_Risk_Manager.py +import pandas as pd +import numpy as np +from ..features.processor import FeatureProcessor + +class TFTRiskManager: + """ + 2차 Finder: TFT(Temporal Fusion Transformer) 기반 리스크 관리 모듈. + 시장 및 종목의 급락 확률(crash_prob)을 계산합니다. + """ + def __init__(self, model_version="v2.1.0"): + self.model_ver = model_version + # 급락 기준 정의 (예: 1일 -5% 이하) + self.crash_threshold = -0.05 + + def predict_crash_probability(self, feature_df: pd.DataFrame): + """ + 입력된 파생 피처를 바탕으로 급락 확률 산출 [명세서 3번 준수] + """ + # 1. 핵심 입력 데이터 추출 (Primary) + primary_keys = [ + 'vix_z_score', 'us10y_chg', 'yield_spread', 'dxy_chg', + 'wti_price', 'debt_ratio', 'interest_coverage', 'atr_rank', 'ret_1d' + ] + + # 2. 보조 입력 데이터 추출 (Secondary) + secondary_keys = [ + 'credit_spread_hy', 'mkt_breadth_nh_nl', 'surprise_earnings', 'btc_close' + ] + + input_data = feature_df[primary_keys + secondary_keys] + + # 3. 리스크 스코어링 알고리즘 (TFT 모델 추론부 - 예시 로직) + # 실제 구현 시 학습된 TFT 가중치를 로드하여 연산합니다. + risk_score = self._calculate_tft_inference(input_data) + + return risk_score + + def filter_survivors(self, df: pd.DataFrame, risk_threshold=0.7): + """ + 급락 확률이 높은 종목을 제거하고 생존 종목(survivors_c2) 반환 + """ + df['crash_prob'] = self.predict_crash_probability(df) + + # 확률이 문턱값(threshold)보다 낮은 종목만 생존 + survivors_c2 = df[df['crash_prob'] < risk_threshold].copy() + + return survivors_c2, df['crash_prob'] + + def _calculate_tft_inference(self, data): + """TFT 모델의 추론 엔진 (가중치 연산 대행)""" + # 상세 주석: 거시 지표 변화율과 변동성 Z-score의 가중 합산으로 리스크 레벨 산출 + # 실제 운영 환경에서는 Keras/PyTorch 모델 객체가 호출됩니다. + mock_prob = np.clip(data['vix_z_score'] * 0.4 + data['atr_rank'] * 0.3, 0, 1) + return mock_prob + +# 주석: 본 모듈의 결과물인 survivors_c2는 다음 단계인 TCN/PatchTST의 입력 후보가 됩니다. \ No newline at end of file diff --git a/AI/modules/finder/__init__.py b/AI/modules/finder/__init__.py index 14d0d330..942d3ec9 100644 --- a/AI/modules/finder/__init__.py +++ b/AI/modules/finder/__init__.py @@ -1,12 +1,8 @@ # AI/modules/finder/__init__.py """ -[Finder 패키지] -- 유망 종목 발굴 및 리스트 관리 기능을 제공합니다. +[종목 탐색기 모듈] """ - -from ...libs.database.ticker_loader import get_target_tickers, load_all_tickers_from_db -from .evaluator import evaluate_ticker - +from .TFT_Risk_Manager import TFTRiskManager __all__ = [ - "evaluate_ticker", -] \ No newline at end of file + "TFTRiskManager", +] diff --git a/AI/modules/finder/legacy/__init__.py b/AI/modules/finder/legacy/__init__.py new file mode 100644 index 00000000..f1137047 --- /dev/null +++ b/AI/modules/finder/legacy/__init__.py @@ -0,0 +1,11 @@ +# AI/modules/finder/legacy/__init__.py +""" +[Finder 패키지] +- 유망 종목 발굴 및 리스트 관리 기능을 제공합니다. +""" + +from .evaluator import evaluate_ticker + +__all__ = [ + "evaluate_ticker", +] \ No newline at end of file diff --git a/AI/modules/finder/evaluator.py b/AI/modules/finder/legacy/evaluator.py similarity index 96% rename from AI/modules/finder/evaluator.py rename to AI/modules/finder/legacy/evaluator.py index 2618a248..ccc6ad34 100644 --- a/AI/modules/finder/evaluator.py +++ b/AI/modules/finder/legacy/evaluator.py @@ -18,7 +18,7 @@ sys.path.append(project_root) from AI.libs.database.fetcher import fetch_price_data -from AI.modules.signal.core.features import add_technical_indicators +from AI.modules.features.legacy.technical_features import add_technical_indicators def evaluate_ticker(ticker: str) -> Dict[str, float]: """ diff --git a/AI/modules/signal/core/__init__.py b/AI/modules/signal/core/__init__.py index cce34e37..09135091 100644 --- a/AI/modules/signal/core/__init__.py +++ b/AI/modules/signal/core/__init__.py @@ -1,10 +1,8 @@ # AI/modules/signal/core/__init__.py from .base_model import BaseSignalModel from .data_loader import DataLoader -from .features import add_technical_indicators __all__ = [ "BaseSignalModel", "DataLoader", - "add_technical_indicators", ] \ No newline at end of file diff --git a/AI/modules/signal/core/data_loader.py b/AI/modules/signal/core/data_loader.py index f76c5f42..d60988c7 100644 --- a/AI/modules/signal/core/data_loader.py +++ b/AI/modules/signal/core/data_loader.py @@ -29,7 +29,7 @@ fetch_news_sentiment, fetch_fundamentals ) -from AI.modules.signal.core.features import add_technical_indicators +from AI.modules.features.legacy.technical_features import add_technical_indicators class DataLoader: def __init__(self, db_name="db", lookback=60, horizons: List[int] = None): diff --git a/AI/modules/signal/models/PatchTST/architecture.py b/AI/modules/signal/models/PatchTST/architecture.py index 5ef4085d..79046dc0 100644 --- a/AI/modules/signal/models/PatchTST/architecture.py +++ b/AI/modules/signal/models/PatchTST/architecture.py @@ -1,67 +1,124 @@ -# AI/modules/signal/models/transformer/architecture.py -import tensorflow as tf -from tensorflow.keras import layers, models, Input +# AI/modules/signal/models/PatchTST/architecture.py +import torch +import torch.nn as nn -def transformer_encoder(inputs, head_size, num_heads, ff_dim, dropout=0): - # Attention and Normalization - x = layers.MultiHeadAttention( - key_dim=head_size, num_heads=num_heads, dropout=dropout - )(inputs, inputs) - x = layers.Dropout(dropout)(x) - x = layers.LayerNormalization(epsilon=1e-6)(x) - res = x + inputs +class RevIN(nn.Module): + """ + Reverse Instance Normalization: 시계열 데이터의 분포 변화(Distribution Shift) 문제를 해결 + """ + def __init__(self, num_features: int, eps=1e-5, affine=True): + super(RevIN, self).__init__() + self.num_features = num_features + self.eps = eps + self.affine = affine + if self.affine: + self._init_params() - # Feed Forward Part - x = layers.Conv1D(filters=ff_dim, kernel_size=1, activation="relu")(res) - x = layers.Dropout(dropout)(x) - x = layers.Conv1D(filters=inputs.shape[-1], kernel_size=1)(x) - x = layers.LayerNormalization(epsilon=1e-6)(x) - return x + res + def _init_params(self): + self.affine_weight = nn.Parameter(torch.ones(self.num_features)) + self.affine_bias = nn.Parameter(torch.zeros(self.num_features)) -def build_transformer_model( - input_shape, - n_tickers, - n_sectors, - head_size=256, - num_heads=4, - ff_dim=4, - num_transformer_blocks=4, - mlp_units=[128], - dropout=0.2, - mlp_dropout=0.2, - n_outputs=1 -): - # 1. 시계열 입력 - ts_input = Input(shape=input_shape, name="ts_input") - - # 2. 임베딩 입력 (Ticker, Sector) - ticker_input = Input(shape=(1,), name="ticker_input") - sector_input = Input(shape=(1,), name="sector_input") + def forward(self, x, mode: str): + if mode == 'norm': + self._get_statistics(x) + x = self._normalize(x) + elif mode == 'denorm': + x = self._denormalize(x) + return x - ticker_embedding = layers.Embedding(n_tickers + 1, 16)(ticker_input) - ticker_embedding = layers.Flatten()(ticker_embedding) - - sector_embedding = layers.Embedding(n_sectors + 1, 16)(sector_input) - sector_embedding = layers.Flatten()(sector_embedding) + def _get_statistics(self, x): + dim2reduce = tuple(range(1, x.ndim - 1)) + self.mean = torch.mean(x, dim=dim2reduce, keepdim=True).detach() + self.stdev = torch.sqrt(torch.var(x, dim=dim2reduce, keepdim=True, unbiased=False) + self.eps).detach() - # 3. Transformer Block - x = ts_input - for _ in range(num_transformer_blocks): - x = transformer_encoder(x, head_size, num_heads, ff_dim, dropout) + def _normalize(self, x): + x = x - self.mean + x = x / self.stdev + if self.affine: + x = x * self.affine_weight + self.affine_bias + return x - # 4. Global Pooling & Concat - x = layers.GlobalAveragePooling1D()(x) - x = layers.Concatenate()([x, ticker_embedding, sector_embedding]) + def _denormalize(self, x): + if self.affine: + x = (x - self.affine_bias) / (self.affine_weight + 1e-9) + x = x * self.stdev + x = x + self.mean + return x - # 5. MLP Head - for dim in mlp_units: - x = layers.Dense(dim, activation="relu")(x) - x = layers.Dropout(mlp_dropout)(x) +class PatchTST_Model(nn.Module): + """ + SISC 맞춤형 PatchTST 모델 + - 입력: [Batch, Seq_Len, Features] (예: 120일치 데이터) + - 출력: [Batch, 1] (상승 확률 Logits) + """ + def __init__(self, + seq_len=120, + enc_in=7, # Feature 개수 + patch_len=16, + stride=8, + d_model=128, + n_heads=4, + e_layers=3, + d_ff=256, + dropout=0.1): + super(PatchTST_Model, self).__init__() + + self.seq_len = seq_len + self.patch_len = patch_len + self.stride = stride + self.num_patches = int((seq_len - patch_len) / stride) + 1 - # 6. Output Layer - outputs = layers.Dense(n_outputs, activation="sigmoid", name="output")(x) + # 1. RevIN (입력 정규화) + self.revin = RevIN(enc_in) - return models.Model(inputs=[ts_input, ticker_input, sector_input], outputs=outputs) + # 2. Patching & Embedding + self.patch_embedding = nn.Linear(patch_len, d_model) + self.position_embedding = nn.Parameter(torch.randn(1, enc_in, self.num_patches, d_model)) + self.dropout = nn.Dropout(dropout) -def build_regression_model(input_shape, n_tickers, n_sectors): - return build_transformer_model(input_shape, n_tickers, n_sectors, n_outputs=1) \ No newline at end of file + # 3. Transformer Encoder Backbone (Channel Independent) + encoder_layer = nn.TransformerEncoderLayer(d_model, n_heads, d_ff, dropout, batch_first=True) + self.encoder = nn.TransformerEncoder(encoder_layer, e_layers) + + # 4. Flatten & Head (Prediction) + self.head = nn.Sequential( + nn.Flatten(start_dim=1), + nn.Linear(enc_in * self.num_patches * d_model, 256), + nn.GELU(), + nn.Dropout(dropout), + nn.Linear(256, 1) # 최종 출력: 상승 확률 Logit (Sigmoid 전) + ) + + def forward(self, x): + # x shape: [Batch, Seq_Len, Features] + + # 1. Normalization + x = self.revin(x, 'norm') # [B, S, F] + + # 2. Channel Independence handling: [B, S, F] -> [B, F, S] -> [B*F, S] + B, S, F = x.shape + x = x.permute(0, 2, 1).reshape(B * F, S) + + # 3. Patching: [B*F, S] -> [B*F, Num_Patches, Patch_Len] + x = x.unfold(dimension=1, size=self.patch_len, step=self.stride) + + # 4. Embedding: [B*F, Num_Patches, d_model] + x = self.patch_embedding(x) + + # Position Embedding 더하기 + # [B*F, N, D] 형태로 맞춤 + pos_emb = self.position_embedding.repeat(B, 1, 1, 1).reshape(B * F, self.num_patches, -1) + x = x + pos_emb + x = self.dropout(x) + + # 5. Transformer Encoder + x = self.encoder(x) # [B*F, N, D] + + # 6. Reshape back: [B, F, N, D] + x = x.reshape(B, F, self.num_patches, -1) + + # 7. Final Prediction Head + # 모든 채널과 패치 정보를 합쳐서 하나의 확률값 예측 + out = self.head(x) # [B, 1] + + return out # Logits 반환 (BCEWithLogitsLoss 사용 권장) \ No newline at end of file diff --git a/AI/modules/signal/models/PatchTST/train.py b/AI/modules/signal/models/PatchTST/train.py index b1a9b3d1..0b6831ff 100644 --- a/AI/modules/signal/models/PatchTST/train.py +++ b/AI/modules/signal/models/PatchTST/train.py @@ -1,171 +1,103 @@ -import os -import sys -import pickle +# AI/modules/signal/models/PatchTST/train.py +import torch +import torch.nn as nn +import torch.optim as optim import numpy as np -import pandas as pd -from sklearn.model_selection import train_test_split -import tensorflow as tf - -# [추가] 필수 콜백 함수 임포트 -from tensorflow.keras.callbacks import ModelCheckpoint, EarlyStopping, ReduceLROnPlateau - -# -------------------------------------------------------------------------- -# 1. GPU 및 환경 설정 -# -------------------------------------------------------------------------- -print("텐서플로우 버전:", tf.__version__) -print("GPU 목록:", tf.config.list_physical_devices('GPU')) -print("\n" + "="*50) - -# OOM(메모리 부족) 방지를 위한 GPU 메모리 점진적 할당 설정 -gpus = tf.config.list_physical_devices('GPU') -if gpus: - try: - for gpu in gpus: - tf.config.experimental.set_memory_growth(gpu, True) - print(f"🚀 GPU 발견됨! ({len(gpus)}대): {gpus}") - except RuntimeError as e: - print(e) -else: - print("🐢 GPU 없음... CPU 사용합니다.") -print("="*50 + "\n") - -# 프로젝트 루트 경로 설정 -current_dir = os.path.dirname(os.path.abspath(__file__)) -project_root = os.path.abspath(os.path.join(current_dir, "../../../..")) -if project_root not in sys.path: - sys.path.append(project_root) - -from AI.modules.signal.core.data_loader import DataLoader -from AI.modules.signal.models.PatchTST.architecture import build_transformer_model +import os +from torch.utils.data import DataLoader, TensorDataset +from .architecture import PatchTST_Model -def train_single_pipeline(): - print("==================================================") - print(" [Training] Multi-Horizon Model (1, 3, 5, 7 Days)") - print("==================================================") +# 설정값 +CONFIG = { + 'seq_len': 120, + 'input_features': 7, # log_return, ma_trend_score 등 피처 개수와 일치시켜야 함 + 'batch_size': 32, + 'learning_rate': 0.0001, + 'epochs': 100, + 'patience': 10, # Early Stopping + 'model_save_path': 'AI/data/weights/PatchTST_best.pt' +} - # -------------------------------------------------------------------------- - # 2. 데이터 로드 및 기간 분리 (Data Leakage 방지) - # -------------------------------------------------------------------------- - loader = DataLoader(lookback=60) - print(">> 데이터 로딩 및 지표 생성 중...") - - # 전체 데이터를 가져온 뒤... - full_df = loader.load_data_from_db(start_date="2015-01-01") - - # [핵심] 2023년 12월 31일 이전 데이터만 학습에 사용! (미래 데이터 차단) - raw_df = full_df[full_df['date'] <= '2023-12-31'].copy() +def train_model(train_loader, val_loader, device): + """PatchTST 모델 학습 파이프라인""" - print(f">> 학습 데이터 기간: {raw_df['date'].min()} ~ {raw_df['date'].max()}") - print(f">> 총 데이터 행 수: {len(raw_df)} rows") + # 모델 초기화 + model = PatchTST_Model( + seq_len=CONFIG['seq_len'], + enc_in=CONFIG['input_features'] + ).to(device) - # -------------------------------------------------------------------------- - # 3. 데이터셋 생성 (Sequencing) - # -------------------------------------------------------------------------- - # y_class는 (N, 4) 형태: [1일뒤, 3일뒤, 5일뒤, 7일뒤] - X_ts, X_ticker, X_sector, y_class, _, info = loader.create_dataset(raw_df) + # 손실함수 & 옵티마이저 + # 이진 분류(상승/하락) 문제이므로 BCEWithLogitsLoss 사용 + criterion = nn.BCEWithLogitsLoss() + optimizer = optim.AdamW(model.parameters(), lr=CONFIG['learning_rate']) - # [디버그] 정답 분포 확인 - horizons = info.get("horizons", [1]) - n_outputs = len(horizons) # 보통 4개 + best_val_loss = float('inf') + patience_counter = 0 - print("\n" + "="*50) - print(f" 🚨 [DEBUG] Multi-Horizon 데이터 점검 ({horizons}일)") - print("="*50) - for i, h in enumerate(horizons): - col_data = y_class[:, i] - unique, counts = np.unique(col_data, return_counts=True) - dist = dict(zip(unique, counts)) - ratio = counts[1] / sum(counts) * 100 if 1 in dist else 0 - print(f" - [{h}일 뒤] 상승 비율: {ratio:.2f}% (분포: {dist})") - print("="*50 + "\n") + print(f"🚀 PatchTST 학습 시작 (Device: {device})") - # -------------------------------------------------------------------------- - # 4. 데이터 분할 (Train / Validation) - # -------------------------------------------------------------------------- - X_ts_train, X_ts_val, \ - X_tick_train, X_tick_val, \ - X_sec_train, X_sec_val, \ - y_train, y_val = train_test_split( - X_ts, X_ticker, X_sector, y_class, - test_size=0.2, shuffle=True, random_state=42 - ) - - # -------------------------------------------------------------------------- - # 5. 모델 빌드 - # -------------------------------------------------------------------------- - print(f">> 모델 빌드 중 (Outputs: {n_outputs})...") - - model = build_transformer_model( - input_shape=(X_ts.shape[1], X_ts.shape[2]), - n_tickers=info['n_tickers'], - n_sectors=info['n_sectors'], - n_outputs=n_outputs # 4개 출력 - ) - - optimizer = tf.keras.optimizers.Adam(learning_rate=0.0001) - - model.compile( - optimizer=optimizer, - loss="binary_crossentropy", - metrics=["accuracy"] - ) - - # -------------------------------------------------------------------------- - # 6. 콜백 설정 (학습 전략의 핵심) - # -------------------------------------------------------------------------- - save_dir = os.path.join(project_root, "AI/data/weights/transformer") - os.makedirs(save_dir, exist_ok=True) - model_save_path = os.path.join(save_dir, "multi_horizon_model.keras") + for epoch in range(CONFIG['epochs']): + # --- Training --- + model.train() + train_loss = 0 + for X_batch, y_batch in train_loader: + X_batch, y_batch = X_batch.to(device), y_batch.to(device) + + optimizer.zero_grad() + output = model(X_batch) + loss = criterion(output, y_batch.view(-1, 1)) + + loss.backward() + optimizer.step() + train_loss += loss.item() + + avg_train_loss = train_loss / len(train_loader) - # (1) 최고 성능 모델 저장 (전성기 캡처) - chk_point = ModelCheckpoint( - filepath=model_save_path, - monitor='val_loss', - save_best_only=True, - verbose=1 - ) - - # (2) 조기 종료 (10번 참았는데 안 좋아지면 멈춤) - early_stop = EarlyStopping( - monitor='val_loss', - patience=10, - restore_best_weights=True - ) + # --- Validation --- + model.eval() + val_loss = 0 + with torch.no_grad(): + for X_val, y_val in val_loader: + X_val, y_val = X_val.to(device), y_val.to(device) + output = model(X_val) + loss = criterion(output, y_val.view(-1, 1)) + val_loss += loss.item() + + avg_val_loss = val_loss / len(val_loader) + + print(f"Epoch [{epoch+1}/{CONFIG['epochs']}] Train Loss: {avg_train_loss:.4f} | Val Loss: {avg_val_loss:.4f}") - # (3) 학습률 조정 (5번 참았는데 안 좋아지면 더 세밀하게 학습) - reduce_lr = ReduceLROnPlateau( - monitor='val_loss', - factor=0.5, - patience=5, - min_lr=1e-6, - verbose=1 - ) + # --- Early Stopping & Save --- + if avg_val_loss < best_val_loss: + best_val_loss = avg_val_loss + patience_counter = 0 + # 저장 경로 디렉토리 생성 + os.makedirs(os.path.dirname(CONFIG['model_save_path']), exist_ok=True) + torch.save(model.state_dict(), CONFIG['model_save_path']) + print(f"✅ 모델 저장됨: {CONFIG['model_save_path']}") + else: + patience_counter += 1 + if patience_counter >= CONFIG['patience']: + print("🛑 Early Stopping 발동") + break + + return model - # -------------------------------------------------------------------------- - # 7. 학습 시작 - # -------------------------------------------------------------------------- - print(">> 학습 시작 (Epochs=50)...") - model.fit( - x=[X_ts_train, X_tick_train, X_sec_train], - y=y_train, - validation_data=([X_ts_val, X_tick_val, X_sec_val], y_val), - epochs=50, - batch_size=32, # [중요] OOM 방지를 위해 32로 설정 - shuffle=True, - callbacks=[chk_point, early_stop, reduce_lr] # 콜백 적용 - ) +def run_training(X_train, y_train, X_val, y_val): + """ + 외부에서 호출 가능한 학습 진입점 + X: [Samples, Seq_Len, Features] numpy array + y: [Samples] numpy array (0 or 1) + """ + device = torch.device('cuda' if torch.cuda.is_available() else 'cpu') - # -------------------------------------------------------------------------- - # 8. 스케일러 저장 (필수) - # -------------------------------------------------------------------------- - # 모델은 chk_point가 이미 저장했으므로, 스케일러만 따로 저장합니다. - scaler_save_path = os.path.join(save_dir, "multi_horizon_scaler.pkl") - with open(scaler_save_path, "wb") as f: - pickle.dump(info['scaler'], f) - - print(f"\n[완료] 학습 종료. 모델 및 스케일러가 저장되었습니다.") - print(f" - 모델 경로: {model_save_path}") - print(f" - 스케일러: {scaler_save_path}") - -if __name__ == "__main__": - train_single_pipeline() \ No newline at end of file + # Tensor 변환 + train_data = TensorDataset(torch.FloatTensor(X_train), torch.FloatTensor(y_train)) + val_data = TensorDataset(torch.FloatTensor(X_val), torch.FloatTensor(y_val)) + + train_loader = DataLoader(train_data, batch_size=CONFIG['batch_size'], shuffle=True) + val_loader = DataLoader(val_data, batch_size=CONFIG['batch_size'], shuffle=False) + + trained_model = train_model(train_loader, val_loader, device) + return trained_model \ No newline at end of file diff --git a/AI/modules/signal/models/PatchTST/wrapper.py b/AI/modules/signal/models/PatchTST/wrapper.py index ab35600f..ffb831d2 100644 --- a/AI/modules/signal/models/PatchTST/wrapper.py +++ b/AI/modules/signal/models/PatchTST/wrapper.py @@ -1,113 +1,59 @@ -# AI/modules/signal/models/transformer/wrapper.py -""" -[Transformer 모델 래퍼] -- BaseSignalModel 인터페이스를 구현한 실제 실행 클래스입니다. -- architecture.py에서 정의한 모델을 빌드하고, 학습/예측/저장 로직을 수행합니다. -""" - -import os +# AI/modules/signal/models/PatchTST/wrapper.py +import torch import numpy as np -import tensorflow as tf -from typing import Dict, Any, Optional +import pandas as pd from AI.modules.signal.core.base_model import BaseSignalModel -from .architecture import build_transformer_model - -class TransformerSignalModel(BaseSignalModel): - def __init__(self, config: Dict[str, Any]): - super().__init__(config) - self.model_name = "transformer_v1" +#from .architecture import build_transformer_model + +class PatchTSTWrapper(BaseSignalModel): + """ + PatchTST 모델 Wrapper: 중장기 추세 및 패턴 분석 엔진 [명세서 3번 준수] + """ + def __init__(self, model_path=None, config=None): + super().__init__(model_path, config) + self.model_name = "PatchTST_Trend_Specialist" - def build(self, input_shape: tuple): - """설정(config)에 따라 모델 아키텍처 생성""" - # 차원 검증 - if len(input_shape) != 2: - # input_shape가 (timesteps, features) 2차원이 아니라면 경고 또는 에러 - # 일부 환경에서 (None, timesteps, features)로 올 수 있으므로 유연하게 처리 - if len(input_shape) == 3 and input_shape[0] is None: - input_shape = input_shape[1:] - else: - raise ValueError(f"입력 차원은 (timesteps, features) 2차원이어야 합니다. 현재: {input_shape}") - - self.model = build_transformer_model( - input_shape=input_shape, - head_size=self.config.get("head_size", 256), - num_heads=self.config.get("num_heads", 4), - ff_dim=self.config.get("ff_dim", 4), - num_transformer_blocks=self.config.get("num_blocks", 4), - mlp_units=self.config.get("mlp_units", [128]), - dropout=self.config.get("dropout", 0.4), - mlp_dropout=self.config.get("mlp_dropout", 0.25) - ) + # 명세서에 정의된 핵심 및 보조 입력 키 설정 + self.feature_cols = [ + 'log_return', 'ma_trend_score', 'atr_14', + 'sector_return_rel', 'us10y_chg', 'dxy_chg', 'days_since_earnings' + ] + + def preprocess(self, df: pd.DataFrame): + """ + PatchTST 입력을 위한 데이터 추출 및 RevIN 정규화 준비 [명세서 4번 준수] + """ + # 1. 명세서 키 기반 피처 추출 + # PatchTST는 Long Lookback이 특징이므로 충분한 시계열 데이터 필요 + data = df[self.feature_cols].values - # 컴파일 - learning_rate = self.config.get("learning_rate", 1e-4) - self.model.compile( - loss="binary_crossentropy", - optimizer=tf.keras.optimizers.Adam(learning_rate=learning_rate), - metrics=["accuracy", "AUC"] - ) - - def train( - self, - X_train: np.ndarray, - y_train: np.ndarray, - X_val: Optional[np.ndarray] = None, - y_val: Optional[np.ndarray] = None, - **kwargs - ): - """모델 학습 수행""" - if self.model is None: - raise ValueError("모델이 빌드되지 않았습니다. build()를 먼저 호출하세요.") - - # ✅ 호출자가 주면 우선, 없으면 config, 없으면 default - epochs = int(kwargs.pop("epochs", self.config.get("epochs", 50))) - batch_size = int(kwargs.pop("batch_size", self.config.get("batch_size", 32))) - verbose = int(kwargs.pop("verbose", 1)) - - # callbacks는 pop으로 빼서 중복 전달 방지 - callbacks = kwargs.pop("callbacks", []) - - # validation_data는 (X_val, y_val)이 둘 다 있을 때만 - validation_data = (X_val, y_val) if (X_val is not None and y_val is not None) else None - - history = self.model.fit( - X_train, y_train, - validation_data=validation_data, - epochs=epochs, - batch_size=batch_size, - callbacks=callbacks, - verbose=verbose, - **kwargs - ) - return history - - - - - def predict(self, X_input: np.ndarray, **kwargs) -> np.ndarray: - """추론 수행""" - if self.model is None: - raise ValueError("모델이 없습니다. load()하거나 build() 하세요.") + # 2. 결측치 처리 (ffill 규칙 준수 및 최종 0 채우기) + data = np.nan_to_num(data, nan=0.0) + + # 3. 3D 텐서 변환 [Batch, Seq, Features] + # 명세서 권장 사항에 따라 Long Lookback(예: 120일) 적용 + seq_len = self.config.get('seq_len', 120) + if len(data) < seq_len: + return None - # Keras 모델은 (batch, time, feat) 형태를 기대하므로 차원 확인 - if len(X_input.shape) == 2: - X_input = np.expand_dims(X_input, axis=0) + x = data[-seq_len:].reshape(1, seq_len, len(self.feature_cols)) + return torch.FloatTensor(x) + + def predict(self, df: pd.DataFrame): + """ + PatchTST 시그널(signal_patch) 생성 + """ + x = self.preprocess(df) + if x is None: + return 0.0 - return self.model.predict(X_input, **kwargs) - - def save(self, filepath: str): - """모델 저장""" - if self.model is None: - print("저장할 모델이 없습니다.") - return - - os.makedirs(os.path.dirname(filepath), exist_ok=True) - self.model.save(filepath) - print(f"모델 저장 완료: {filepath}") - - def load(self, filepath: str): - """모델 로드""" - if not os.path.exists(filepath): - raise FileNotFoundError(f"모델 파일이 없습니다: {filepath}") + self.model.eval() + with torch.no_grad(): + # 모델 내부적으로 RevIN 정규화가 수행됨 + output = self.model(x) + # 예측값은 Trader 표준 스키마(prob_up)로 변환 + signal_patch = torch.sigmoid(output).item() - self.model = tf.keras.models.load_model(filepath) \ No newline at end of file + return signal_patch + +# 주석: PatchTST는 개별 시계열의 장기 패턴을 Patch 단위로 분석하여 signal_patch를 산출합니다. \ No newline at end of file diff --git a/AI/modules/signal/models/TCN/wrapper.py b/AI/modules/signal/models/TCN/wrapper.py new file mode 100644 index 00000000..759bbeb9 --- /dev/null +++ b/AI/modules/signal/models/TCN/wrapper.py @@ -0,0 +1,56 @@ +# AI/modules/signal/models/TCN/wrapper.py +import torch +import numpy as np +import pandas as pd +from AI.modules.signal.core.base_model import BaseSignalModel + +class TCNWrapper(BaseSignalModel): + """ + TCN 모델 Wrapper: 단기 패턴 및 전환점 포착 엔진 [명세서 3번 준수] + """ + def __init__(self, model_path=None, config=None): + super().__init__(config) + self.model_name = "TCN_Local_Pattern" + # 명세서에 정의된 핵심 및 보조 입력 키 정의 + self.feature_cols = [ + 'log_return', 'vol_change', 'rsi_14', 'macd_ratio', + 'bollinger_ub', 'bollinger_lb', 'days_to_earnings', + 'vix_change_rate', 'sector_return_rel' + ] + + def preprocess(self, df: pd.DataFrame): + """ + TCN 입력을 위한 데이터 추출 및 텐서 변환 [명세서 4번 데이터 검증 준수] + """ + # 1. 명세서 키 기반 피처 추출 + data = df[self.feature_cols].values + + # 2. 결측치 처리 (명세서 규칙: 텐서 생성 시점 NaN 제거) + data = np.nan_to_num(data, nan=0.0) + + # 3. 3D 텐서 변환 [Batch, Seq, Features] + # TCN은 국소 패턴을 보므로 상대적으로 짧은 Lookback 사용 (예: 30일) + seq_len = self.config.get('seq_len', 30) + if len(data) < seq_len: + return None + + x = data[-seq_len:].reshape(1, seq_len, len(self.feature_cols)) + return torch.FloatTensor(x) + + def predict(self, df: pd.DataFrame): + """ + TCN 시그널(signal_tcn) 생성 + """ + x = self.preprocess(df) + if x is None: + return 0.0 + + self.model.eval() + with torch.no_grad(): + # 모델 출력은 Trader 표준 스키마인 prob_up으로 어댑팅됨 + output = self.model(x) + signal_tcn = torch.sigmoid(output).item() + + return signal_tcn + +# 주석: TCN은 시장 전체의 흐름보다 개별 시계열의 기술적 패턴에 집중하여 signal_tcn을 산출합니다. \ No newline at end of file diff --git a/AI/modules/signal/models/__init__.py b/AI/modules/signal/models/__init__.py index d1d81325..afd91809 100644 --- a/AI/modules/signal/models/__init__.py +++ b/AI/modules/signal/models/__init__.py @@ -2,11 +2,13 @@ from typing import Dict, Any from AI.modules.signal.core.base_model import BaseSignalModel # 절대 경로로 수정하여 모듈 찾기 에러 방지 -from AI.modules.signal.models.PatchTST.wrapper import TransformerSignalModel +from AI.modules.signal.models.PatchTST.wrapper import PatchTSTWrapper +from AI.modules.signal.models.transformer.wrapper import TransformerSignalModel # 모델 레지스트리 MODEL_REGISTRY = { "transformer": TransformerSignalModel, + "patchtst": PatchTSTWrapper } def get_model(model_name: str, config: Dict[str, Any]) -> BaseSignalModel: diff --git a/AI/modules/signal/models/itransformer/wrapper.py b/AI/modules/signal/models/itransformer/wrapper.py new file mode 100644 index 00000000..ba5b0474 --- /dev/null +++ b/AI/modules/signal/models/itransformer/wrapper.py @@ -0,0 +1,53 @@ +# AI/modules/signal/models/iTransformer/wrapper.py +import torch +import numpy as np +import pandas as pd +from AI.modules.signal.core.base_model import BaseSignalModel + +class ITransformerWrapper(BaseSignalModel): + """ + iTransformer 모델 Wrapper: 다변량 상관관계 및 거시 지표 분석 엔진 [명세서 3번 준수] + """ + def __init__(self, model_path=None, config=None): + super().__init__(model_path, config) + self.model_name = "iTransformer_Correlation_Expert" + + # 명세서에 정의된 핵심 및 보조 입력 키 (거시/자산 중심) + self.feature_cols = [ + 'us10y', 'yield_spread', 'dxy_close', 'wti_price', 'gold_price', + 'btc_close', 'credit_spread_hy', 'mkt_breadth_ma200', 'surprise_cpi' + ] + + def preprocess(self, df: pd.DataFrame): + """ + iTransformer 입력을 위한 데이터 추출 및 다변량 텐서 변환 [명세서 4번 준수] + """ + # 1. 거시 및 시장 지표 중심 피처 추출 + data = df[self.feature_cols].values + + # 2. 결측치 처리 (ffill 규칙 및 최종 0 채우기) + data = np.nan_to_num(data, nan=0.0) + + # 3. 3D 텐서 변환 [Batch, Seq, Features] + seq_len = self.config.get('seq_len', 60) + if len(data) < seq_len: + return None + + x = data[-seq_len:].reshape(1, seq_len, len(self.feature_cols)) + return torch.FloatTensor(x) + + def predict(self, df: pd.DataFrame): + """ + iTransformer 시그널(signal_itrans) 생성 + """ + x = self.preprocess(df) + if x is None: + return 0.0 + + self.model.eval() + with torch.no_grad(): + output = self.model(x) + # 예측값은 상승 확률(prob_up)로 규격화 + signal_itrans = torch.sigmoid(output).item() + + return signal_itrans \ No newline at end of file diff --git a/AI/modules/signal/models/PatchTST/__init__.py b/AI/modules/signal/models/transformer/__init__.py similarity index 100% rename from AI/modules/signal/models/PatchTST/__init__.py rename to AI/modules/signal/models/transformer/__init__.py diff --git a/AI/modules/signal/models/transformer/architecture.py b/AI/modules/signal/models/transformer/architecture.py new file mode 100644 index 00000000..5ef4085d --- /dev/null +++ b/AI/modules/signal/models/transformer/architecture.py @@ -0,0 +1,67 @@ +# AI/modules/signal/models/transformer/architecture.py +import tensorflow as tf +from tensorflow.keras import layers, models, Input + +def transformer_encoder(inputs, head_size, num_heads, ff_dim, dropout=0): + # Attention and Normalization + x = layers.MultiHeadAttention( + key_dim=head_size, num_heads=num_heads, dropout=dropout + )(inputs, inputs) + x = layers.Dropout(dropout)(x) + x = layers.LayerNormalization(epsilon=1e-6)(x) + res = x + inputs + + # Feed Forward Part + x = layers.Conv1D(filters=ff_dim, kernel_size=1, activation="relu")(res) + x = layers.Dropout(dropout)(x) + x = layers.Conv1D(filters=inputs.shape[-1], kernel_size=1)(x) + x = layers.LayerNormalization(epsilon=1e-6)(x) + return x + res + +def build_transformer_model( + input_shape, + n_tickers, + n_sectors, + head_size=256, + num_heads=4, + ff_dim=4, + num_transformer_blocks=4, + mlp_units=[128], + dropout=0.2, + mlp_dropout=0.2, + n_outputs=1 +): + # 1. 시계열 입력 + ts_input = Input(shape=input_shape, name="ts_input") + + # 2. 임베딩 입력 (Ticker, Sector) + ticker_input = Input(shape=(1,), name="ticker_input") + sector_input = Input(shape=(1,), name="sector_input") + + ticker_embedding = layers.Embedding(n_tickers + 1, 16)(ticker_input) + ticker_embedding = layers.Flatten()(ticker_embedding) + + sector_embedding = layers.Embedding(n_sectors + 1, 16)(sector_input) + sector_embedding = layers.Flatten()(sector_embedding) + + # 3. Transformer Block + x = ts_input + for _ in range(num_transformer_blocks): + x = transformer_encoder(x, head_size, num_heads, ff_dim, dropout) + + # 4. Global Pooling & Concat + x = layers.GlobalAveragePooling1D()(x) + x = layers.Concatenate()([x, ticker_embedding, sector_embedding]) + + # 5. MLP Head + for dim in mlp_units: + x = layers.Dense(dim, activation="relu")(x) + x = layers.Dropout(mlp_dropout)(x) + + # 6. Output Layer + outputs = layers.Dense(n_outputs, activation="sigmoid", name="output")(x) + + return models.Model(inputs=[ts_input, ticker_input, sector_input], outputs=outputs) + +def build_regression_model(input_shape, n_tickers, n_sectors): + return build_transformer_model(input_shape, n_tickers, n_sectors, n_outputs=1) \ No newline at end of file diff --git a/AI/modules/signal/models/transformer/train.py b/AI/modules/signal/models/transformer/train.py new file mode 100644 index 00000000..d334fbcf --- /dev/null +++ b/AI/modules/signal/models/transformer/train.py @@ -0,0 +1,169 @@ +import os +import sys +import pickle +import numpy as np +import pandas as pd +from sklearn.model_selection import train_test_split +import tensorflow as tf +from tensorflow.keras.callbacks import ModelCheckpoint, EarlyStopping, ReduceLROnPlateau + +# -------------------------------------------------------------------------- +# 1. GPU 및 환경 설정 +# -------------------------------------------------------------------------- +print("텐서플로우 버전:", tf.__version__) +print("GPU 목록:", tf.config.list_physical_devices('GPU')) +print("\n" + "="*50) + +# OOM(메모리 부족) 방지를 위한 GPU 메모리 점진적 할당 설정 +gpus = tf.config.list_physical_devices('GPU') +if gpus: + try: + for gpu in gpus: + tf.config.experimental.set_memory_growth(gpu, True) + print(f"🚀 GPU 발견됨! ({len(gpus)}대): {gpus}") + except RuntimeError as e: + print(e) +else: + print("🐢 GPU 없음... CPU 사용합니다.") +print("="*50 + "\n") + +# 프로젝트 루트 경로 설정 +current_dir = os.path.dirname(os.path.abspath(__file__)) +project_root = os.path.abspath(os.path.join(current_dir, "../../../..")) +if project_root not in sys.path: + sys.path.append(project_root) + +from AI.modules.signal.core.data_loader import DataLoader +from AI.modules.signal.models.PatchTST.architecture import build_transformer_model + +def train_single_pipeline(): + print("==================================================") + print(" [Training] Multi-Horizon Model (1, 3, 5, 7 Days)") + print("==================================================") + + # -------------------------------------------------------------------------- + # 2. 데이터 로드 및 기간 분리 (Data Leakage 방지) + # -------------------------------------------------------------------------- + loader = DataLoader(lookback=60) + print(">> 데이터 로딩 및 지표 생성 중...") + + # 전체 데이터를 가져온 뒤... + full_df = loader.load_data_from_db(start_date="2015-01-01") + + # [핵심] 2023년 12월 31일 이전 데이터만 학습에 사용! (미래 데이터 차단) + raw_df = full_df[full_df['date'] <= '2023-12-31'].copy() + + print(f">> 학습 데이터 기간: {raw_df['date'].min()} ~ {raw_df['date'].max()}") + print(f">> 총 데이터 행 수: {len(raw_df)} rows") + + # -------------------------------------------------------------------------- + # 3. 데이터셋 생성 (Sequencing) + # -------------------------------------------------------------------------- + # y_class는 (N, 4) 형태: [1일뒤, 3일뒤, 5일뒤, 7일뒤] + X_ts, X_ticker, X_sector, y_class, _, info = loader.create_dataset(raw_df) + + # [디버그] 정답 분포 확인 + horizons = info.get("horizons", [1]) + n_outputs = len(horizons) # 보통 4개 + + print("\n" + "="*50) + print(f" 🚨 [DEBUG] Multi-Horizon 데이터 점검 ({horizons}일)") + print("="*50) + for i, h in enumerate(horizons): + col_data = y_class[:, i] + unique, counts = np.unique(col_data, return_counts=True) + dist = dict(zip(unique, counts)) + ratio = counts[1] / sum(counts) * 100 if 1 in dist else 0 + print(f" - [{h}일 뒤] 상승 비율: {ratio:.2f}% (분포: {dist})") + print("="*50 + "\n") + + # -------------------------------------------------------------------------- + # 4. 데이터 분할 (Train / Validation) + # -------------------------------------------------------------------------- + X_ts_train, X_ts_val, \ + X_tick_train, X_tick_val, \ + X_sec_train, X_sec_val, \ + y_train, y_val = train_test_split( + X_ts, X_ticker, X_sector, y_class, + test_size=0.2, shuffle=True, random_state=42 + ) + + # -------------------------------------------------------------------------- + # 5. 모델 빌드 + # -------------------------------------------------------------------------- + print(f">> 모델 빌드 중 (Outputs: {n_outputs})...") + + model = build_transformer_model( + input_shape=(X_ts.shape[1], X_ts.shape[2]), + n_tickers=info['n_tickers'], + n_sectors=info['n_sectors'], + n_outputs=n_outputs # 4개 출력 + ) + + optimizer = tf.keras.optimizers.Adam(learning_rate=0.0001) + + model.compile( + optimizer=optimizer, + loss="binary_crossentropy", + metrics=["accuracy"] + ) + + # -------------------------------------------------------------------------- + # 6. 콜백 설정 (학습 전략의 핵심) + # -------------------------------------------------------------------------- + save_dir = os.path.join(project_root, "AI/data/weights/transformer") + os.makedirs(save_dir, exist_ok=True) + model_save_path = os.path.join(save_dir, "multi_horizon_model.keras") + + # (1) 최고 성능 모델 저장 (전성기 캡처) + chk_point = ModelCheckpoint( + filepath=model_save_path, + monitor='val_loss', + save_best_only=True, + verbose=1 + ) + + # (2) 조기 종료 (10번 참았는데 안 좋아지면 멈춤) + early_stop = EarlyStopping( + monitor='val_loss', + patience=10, + restore_best_weights=True + ) + + # (3) 학습률 조정 (5번 참았는데 안 좋아지면 더 세밀하게 학습) + reduce_lr = ReduceLROnPlateau( + monitor='val_loss', + factor=0.5, + patience=5, + min_lr=1e-6, + verbose=1 + ) + + # -------------------------------------------------------------------------- + # 7. 학습 시작 + # -------------------------------------------------------------------------- + print(">> 학습 시작 (Epochs=50)...") + model.fit( + x=[X_ts_train, X_tick_train, X_sec_train], + y=y_train, + validation_data=([X_ts_val, X_tick_val, X_sec_val], y_val), + epochs=50, + batch_size=32, # [중요] OOM 방지를 위해 32로 설정 + shuffle=True, + callbacks=[chk_point, early_stop, reduce_lr] # 콜백 적용 + ) + + # -------------------------------------------------------------------------- + # 8. 스케일러 저장 (필수) + # -------------------------------------------------------------------------- + # 모델은 chk_point가 이미 저장했으므로, 스케일러만 따로 저장합니다. + scaler_save_path = os.path.join(save_dir, "multi_horizon_scaler.pkl") + with open(scaler_save_path, "wb") as f: + pickle.dump(info['scaler'], f) + + print(f"\n[완료] 학습 종료. 모델 및 스케일러가 저장되었습니다.") + print(f" - 모델 경로: {model_save_path}") + print(f" - 스케일러: {scaler_save_path}") + +if __name__ == "__main__": + train_single_pipeline() \ No newline at end of file diff --git a/AI/modules/signal/models/transformer/wrapper.py b/AI/modules/signal/models/transformer/wrapper.py new file mode 100644 index 00000000..a95b0ef7 --- /dev/null +++ b/AI/modules/signal/models/transformer/wrapper.py @@ -0,0 +1,113 @@ +# AI/modules/signal/models/transformer/wrapper.py +""" +[Transformer 모델 래퍼] +- BaseSignalModel 인터페이스를 구현한 실제 실행 클래스입니다. +- architecture.py에서 정의한 모델을 빌드하고, 학습/예측/저장 로직을 수행합니다. +""" + +import os +import numpy as np +import tensorflow as tf +from typing import Dict, Any, Optional +from AI.modules.signal.core.base_model import BaseSignalModel +from .architecture import build_transformer_model + +class TransformerSignalModel(BaseSignalModel): + def __init__(self, config: Dict[str, Any]): + super().__init__(config) + self.model_name = "transformer" + + def build(self, input_shape: tuple): + """설정(config)에 따라 모델 아키텍처 생성""" + # 차원 검증 + if len(input_shape) != 2: + # input_shape가 (timesteps, features) 2차원이 아니라면 경고 또는 에러 + # 일부 환경에서 (None, timesteps, features)로 올 수 있으므로 유연하게 처리 + if len(input_shape) == 3 and input_shape[0] is None: + input_shape = input_shape[1:] + else: + raise ValueError(f"입력 차원은 (timesteps, features) 2차원이어야 합니다. 현재: {input_shape}") + + self.model = build_transformer_model( + input_shape=input_shape, + head_size=self.config.get("head_size", 256), + num_heads=self.config.get("num_heads", 4), + ff_dim=self.config.get("ff_dim", 4), + num_transformer_blocks=self.config.get("num_blocks", 4), + mlp_units=self.config.get("mlp_units", [128]), + dropout=self.config.get("dropout", 0.4), + mlp_dropout=self.config.get("mlp_dropout", 0.25) + ) + + # 컴파일 + learning_rate = self.config.get("learning_rate", 1e-4) + self.model.compile( + loss="binary_crossentropy", + optimizer=tf.keras.optimizers.Adam(learning_rate=learning_rate), + metrics=["accuracy", "AUC"] + ) + + def train( + self, + X_train: np.ndarray, + y_train: np.ndarray, + X_val: Optional[np.ndarray] = None, + y_val: Optional[np.ndarray] = None, + **kwargs + ): + """모델 학습 수행""" + if self.model is None: + raise ValueError("모델이 빌드되지 않았습니다. build()를 먼저 호출하세요.") + + # ✅ 호출자가 주면 우선, 없으면 config, 없으면 default + epochs = int(kwargs.pop("epochs", self.config.get("epochs", 50))) + batch_size = int(kwargs.pop("batch_size", self.config.get("batch_size", 32))) + verbose = int(kwargs.pop("verbose", 1)) + + # callbacks는 pop으로 빼서 중복 전달 방지 + callbacks = kwargs.pop("callbacks", []) + + # validation_data는 (X_val, y_val)이 둘 다 있을 때만 + validation_data = (X_val, y_val) if (X_val is not None and y_val is not None) else None + + history = self.model.fit( + X_train, y_train, + validation_data=validation_data, + epochs=epochs, + batch_size=batch_size, + callbacks=callbacks, + verbose=verbose, + **kwargs + ) + return history + + + + + def predict(self, X_input: np.ndarray, **kwargs) -> np.ndarray: + """추론 수행""" + if self.model is None: + raise ValueError("모델이 없습니다. load()하거나 build() 하세요.") + + # Keras 모델은 (batch, time, feat) 형태를 기대하므로 차원 확인 + if len(X_input.shape) == 2: + X_input = np.expand_dims(X_input, axis=0) + + return self.model.predict(X_input, **kwargs) + + def save(self, filepath: str): + """모델 저장""" + if self.model is None: + print("저장할 모델이 없습니다.") + return + + os.makedirs(os.path.dirname(filepath), exist_ok=True) + self.model.save(filepath) + print(f"모델 저장 완료: {filepath}") + + def load(self, filepath: str): + """모델 로드""" + if not os.path.exists(filepath): + raise FileNotFoundError(f"모델 파일이 없습니다: {filepath}") + + self.model = tf.keras.models.load_model(filepath) \ No newline at end of file diff --git a/AI/modules/signal/workflows/inference.py b/AI/modules/signal/workflows/inference.py index e48ce41b..a9bcc3e7 100644 --- a/AI/modules/signal/workflows/inference.py +++ b/AI/modules/signal/workflows/inference.py @@ -25,7 +25,7 @@ from AI.libs.database.connection import get_db_conn from AI.modules.signal.core.data_loader import DataLoader -from AI.modules.signal.core.features import add_technical_indicators +from AI.modules.features.legacy.technical_features import add_technical_indicators # ───────────────────────────────────────────────────────────────────────────── # 추론 함수 @@ -34,9 +34,9 @@ def run_inference(ticker: str, model_type: str = "transformer") -> float: print(f"=== [Inference] {ticker} 예측 시작 ===") # 1. 경로 설정 및 파일 확인 - base_dir = os.path.join(project_root, "AI", "data", "weights", model_type) - model_path = os.path.join(base_dir, "universal_transformer.keras") - scaler_path = os.path.join(base_dir, "universal_scaler.pkl") + base_dir = os.path.join(project_root, "AI", "data", "weights", "prod" , model_type) + model_path = os.path.join(base_dir, "universal_transformer_prod.keras") + scaler_path = os.path.join(base_dir, "universal_scaler__prod.pkl") if not os.path.exists(model_path) or not os.path.exists(scaler_path): print(f"[Err] 학습된 모델이나 스케일러가 없습니다.") diff --git a/AI/modules/signal/workflows/optimize_thresholds.py b/AI/modules/signal/workflows/optimize_thresholds.py index da78b631..de3f8f1b 100644 --- a/AI/modules/signal/workflows/optimize_thresholds.py +++ b/AI/modules/signal/workflows/optimize_thresholds.py @@ -25,7 +25,7 @@ from AI.libs.database.connection import get_db_conn from AI.modules.signal.core.data_loader import DataLoader from AI.modules.signal.models.PatchTST.architecture import build_transformer_model -from AI.modules.signal.core.features import add_technical_indicators, add_multi_timeframe_features +from AI.modules.features.legacy.technical_features import add_technical_indicators, add_multi_timeframe_features def optimize_thresholds(): print("==================================================") diff --git a/AI/modules/signal/workflows/train_calibrator.py b/AI/modules/signal/workflows/train_calibrator.py index 0829f43c..7d91c118 100644 --- a/AI/modules/signal/workflows/train_calibrator.py +++ b/AI/modules/signal/workflows/train_calibrator.py @@ -1,4 +1,4 @@ -# AI/modules/signal/workflows/train_calibrator.py +#ㄴ AI/modules/signal/workflows/train_calibrator.py """ [Meta-Labeling / Calibration Model Training] - Stage 1: Transformer 모델이 예측한 점수(Probability)를 가져옵니다. @@ -25,7 +25,7 @@ from AI.modules.signal.core.data_loader import DataLoader from AI.modules.signal.models.PatchTST.architecture import build_transformer_model -from AI.modules.signal.core.features import add_technical_indicators, add_multi_timeframe_features +from AI.modules.features.legacy.technical_features import add_technical_indicators, add_multi_timeframe_features def train_calibrator(): print("==================================================") diff --git a/AI/modules/trader/backtest/run_backtrader_single.py b/AI/modules/trader/backtest/run_backtrader_single.py index aba5d47b..af2898e3 100644 --- a/AI/modules/trader/backtest/run_backtrader_single.py +++ b/AI/modules/trader/backtest/run_backtrader_single.py @@ -8,14 +8,27 @@ import sys import os +import warnings + +# 지저분한 경고 무시 +# [설정 1] TensorFlow oneDNN 최적화 메시지 숨기기 +os.environ['TF_ENABLE_ONEDNN_OPTS'] = '0' + +# [설정 2] TensorFlow 일반 로그(INFO, WARNING) 숨기기 (ERROR만 출력) +os.environ['TF_CPP_MIN_LOG_LEVEL'] = '2' + +# [설정 3] NumPy/Keras의 np.object 관련 FutureWarning 숨기기 +warnings.filterwarnings("ignore", category=FutureWarning) +warnings.filterwarnings("ignore", category=UserWarning, module='sklearn') +warnings.filterwarnings("ignore", category=UserWarning, module='pandas') + +import shutil import pickle import backtrader as bt import pandas as pd import numpy as np import tensorflow as tf -import warnings -# 지저분한 sklearn 경고 무시 -warnings.filterwarnings("ignore", category=UserWarning, module='sklearn') + # 프로젝트 루트 경로 설정 current_dir = os.path.dirname(os.path.abspath(__file__)) @@ -23,11 +36,9 @@ if project_root not in sys.path: sys.path.append(project_root) -# 모델 아키텍처 불러오기 -from AI.modules.signal.models.PatchTST.architecture import build_transformer_model -from AI.modules.signal.core.features import add_technical_indicators, add_multi_timeframe_features +from AI.modules.signal.models.transformer.architecture import build_transformer_model +from AI.modules.features.legacy.technical_features import add_technical_indicators, add_multi_timeframe_features from AI.modules.trader.strategies.rule_based import RuleBasedStrategy -# [추가] 실제 개수 파악을 위한 로더 from AI.modules.signal.core.data_loader import DataLoader class MultiHorizonScoreObserver(bt.Observer): @@ -85,17 +96,14 @@ def log(self, txt, dt=None): def _load_model(self): path = self.p.model_path if not path or not os.path.exists(path): - self.log("⚠️ 모델 파일이 없습니다.") + self.log(f"⚠️ 모델 파일이 없습니다: {path}") return None try: - # [수정] DataLoader를 통해 실제 DB에 저장된 종목/섹터 개수를 가져옵니다. - # 그래야 저장된 가중치 파일(weights)과 크기가 딱 맞습니다. + # DataLoader를 통해 실제 DB에 저장된 종목/섹터 개수를 가져옵니다. loader = DataLoader() real_n_tickers = len(loader.ticker_to_id) real_n_sectors = len(loader.sector_to_id) - - # self.log(f"DEBUG: Tickers={real_n_tickers}, Sectors={real_n_sectors}") # 모델 껍데기 생성 (동적 크기 할당) model = build_transformer_model( @@ -105,13 +113,45 @@ def _load_model(self): n_outputs=4 ) - # 가중치 로드 - model.load_weights(path) - self.log("✅ 멀티 호라이즌 AI 모델 로드 완료") - return model + # ------------------------------------------------------------------ + # [핵심 수정] HDF5 / Zip 포맷 호환성 처리 + # ------------------------------------------------------------------ + try: + # 1차 시도: 기본 로드 (.keras = Zip 포맷 가정) + model.load_weights(path) + self.log("✅ 멀티 호라이즌 AI 모델 로드 완료 (Standard)") + return model + + except Exception as e: + # 에러 메시지에 'zip' 혹은 'header' 관련 내용이 있다면 포맷 불일치 가능성 높음 + if "not a zip file" in str(e) or "header" in str(e): + self.log(f"⚠️ Zip 포맷 로드 실패 ({e}). HDF5 방식으로 재시도합니다.") + + # 확장자를 .h5로 변경한 임시 파일 생성 (Keras가 확장자를 보고 로더를 결정함) + temp_h5_path = path.replace(".keras", "_temp_fallback.h5") + + try: + shutil.copyfile(path, temp_h5_path) + # 임시 파일로 가중치 로드 + model.load_weights(temp_h5_path) + self.log("✅ 멀티 호라이즌 AI 모델 로드 완료 (HDF5 Fallback)") + + # 성공 시 모델 반환 (임시 파일 삭제는 finally에서) + return model + except Exception as e_h5: + self.log(f"❌ HDF5 로드도 실패했습니다: {e_h5}") + return None + finally: + # 임시 파일 정리 + if os.path.exists(temp_h5_path): + os.remove(temp_h5_path) + else: + # 다른 종류의 에러라면 그대로 출력 + self.log(f"⚠️ 모델 가중치 로드 중 알 수 없는 오류: {e}") + return None except Exception as e: - self.log(f"⚠️ 모델 로드 실패: {e}") + self.log(f"⚠️ 모델 초기화 실패: {e}") return None def _load_scaler(self): @@ -194,19 +234,29 @@ def run_single_backtest(ticker="AAPL", start_date="2024-01-01", end_date="2025-0 current_dir = os.path.dirname(os.path.abspath(__file__)) project_root = os.path.abspath(os.path.join(current_dir, "../../../..")) weights_dir = os.path.join(project_root, "AI/data/weights/transformer") + + # [주의] 파일 경로가 정확한지 확인 (경로 오류 방지) model_path = os.path.join(weights_dir, "tests/multi_horizon_model_test.keras") scaler_path = os.path.join(weights_dir, "tests/multi_horizon_scaler_test.pkl") # 2. 데이터 로드 (DB 연결) from AI.libs.database.connection import get_db_conn conn = get_db_conn() + + # [수정] SQLAlchemy 경고 방지를 위해 try-except 및 read_sql_query 권장 query = f""" SELECT date, open, high, low, close, volume, adjusted_close, ticker FROM price_data WHERE ticker = '{ticker}' AND date >= '2022-01-01' ORDER BY date ASC """ - df = pd.read_sql(query, conn) + try: + df = pd.read_sql(query, conn) + except Exception as e: + print(f"❌ 데이터베이스 조회 실패: {e}") + conn.close() + return + conn.close() if df.empty: @@ -216,11 +266,20 @@ def run_single_backtest(ticker="AAPL", start_date="2024-01-01", end_date="2025-0 df['date'] = pd.to_datetime(df['date']) print(">> 지표 생성 중...") - df = add_technical_indicators(df) - df = add_multi_timeframe_features(df) + try: + df = add_technical_indicators(df) + df = add_multi_timeframe_features(df) + except Exception as e: + print(f"⚠️ 지표 생성 중 오류 발생: {e}") + return mask = (df['date'] >= start_date) & (df['date'] <= end_date) backtest_df = df.loc[mask].copy() + + if backtest_df.empty: + print(f"❌ 해당 기간({start_date}~{end_date}) 데이터 없음") + return + backtest_df.set_index('date', inplace=True) # 3. Backtrader 설정 @@ -249,15 +308,26 @@ def run_single_backtest(ticker="AAPL", start_date="2024-01-01", end_date="2025-0 cerebro.addanalyzer(bt.analyzers.Returns, _name='returns') print(f"💰 시작 자산: {cerebro.broker.getvalue():,.0f}원") - results = cerebro.run() + try: + results = cerebro.run() + except Exception as e: + print(f"❌ 백테스트 실행 중 오류 발생: {e}") + return + strat = results[0] final_val = cerebro.broker.getvalue() - mdd = strat.analyzers.drawdown.get_analysis()['max']['drawdown'] # MDD(Maximum Drawdown): 최대 낙폭 - # 안전하게 가져오기 (None이면 0.0으로) - sharpe_analysis = strat.analyzers.sharpe.get_analysis() - sharpe = sharpe_analysis.get('sharperatio') - if sharpe is None: + + # 결과 분석 (안전하게 가져오기) + try: + mdd_analysis = strat.analyzers.drawdown.get_analysis() + mdd = mdd_analysis.get('max', {}).get('drawdown', 0.0) + + sharpe_analysis = strat.analyzers.sharpe.get_analysis() + sharpe = sharpe_analysis.get('sharperatio') + if sharpe is None: sharpe = 0.0 + except: + mdd = 0.0 sharpe = 0.0 print("\n" + "="*40) @@ -270,7 +340,10 @@ def run_single_backtest(ticker="AAPL", start_date="2024-01-01", end_date="2025-0 print("="*40 + "\n") if enable_plot: - cerebro.plot(style='candlestick', volume=False) + try: + cerebro.plot(style='candlestick', volume=False) + except Exception as e: + print(f"⚠️ 차트 그리기 실패: {e}") if __name__ == "__main__": # 원하는 종목으로 변경해서 테스트 (예: AAPL, 005930, TSLA) @@ -295,4 +368,4 @@ def run_single_backtest(ticker="AAPL", start_date="2024-01-01", end_date="2025-0 run_single_backtest(ticker="KO", start_date="2024-01-01", end_date="2025-01-01") print("대상:PFE,방어형") run_single_backtest(ticker="PFE", start_date="2024-01-01", end_date="2025-01-01") - print("멀티 호라이즌 AI 모델 백테스트 종료") + print("멀티 호라이즌 AI 모델 백테스트 종료") \ No newline at end of file diff --git a/AI/modules/trader/backtest/run_portfolio.py b/AI/modules/trader/backtest/run_portfolio.py index 8d82b6b7..df265587 100644 --- a/AI/modules/trader/backtest/run_portfolio.py +++ b/AI/modules/trader/backtest/run_portfolio.py @@ -19,7 +19,7 @@ from AI.modules.signal.core.data_loader import DataLoader from AI.modules.signal.models import get_model -from AI.modules.signal.core.features import add_technical_indicators +from AI.modules.features.legacy.technical_features import add_technical_indicators from AI.modules.trader.strategies.portfolio_logic import calculate_portfolio_allocation class AIPortfolioStrategy(bt.Strategy): diff --git a/AI/tests/test_backtrader.py b/AI/tests/test_backtrader.py index 9cf7c9ae..50ae88f7 100644 --- a/AI/tests/test_backtrader.py +++ b/AI/tests/test_backtrader.py @@ -28,7 +28,7 @@ from AI.modules.signal.core.data_loader import DataLoader from AI.modules.signal.models import get_model from AI.modules.trader.backtrader.backtrader_engine import run_backtrader_engine -from AI.modules.signal.core.features import add_technical_indicators +from AI.modules.features.legacy.technical_features import add_technical_indicators # ───────────────────────────────────────────────────────────────────────────── # 테스트 헬퍼 함수 diff --git a/schema.sql b/schema.sql index e58ee7e1..0c5007fd 100644 --- a/schema.sql +++ b/schema.sql @@ -95,7 +95,8 @@ CREATE TABLE "company_fundamentals" ( "roe" numeric(18, 6), -- 자기자본이익률 (ROE) "debt_ratio" numeric(18, 6), -- 부채비율 "operating_cash_flow" numeric(30, 6), -- 영업활동현금흐름 - "interest_coverage" numeric(10, 2);, -- 이자보상배율 + "interest_coverage" numeric(10, 2), -- 이자보상배율 + "shares_issued" numeric(20, 2), -- 유통 주식 수 CONSTRAINT "company_fundamentals_pkey" PRIMARY KEY("ticker","date") );