Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -27,4 +27,7 @@ __pycache__/
/venv/
/env
/.vs
/.venv/

# ===== Backend =====
backend/src/main/java/org/sejongisc/backend/stock/TestController.java
2 changes: 1 addition & 1 deletion AI/configs/config.json
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
{
{
"db": {
"host": "ep-misty-lab-adgec0kl-pooler.c-2.us-east-1.aws.neon.tech",
"user": "neondb_owner",
Expand Down
260 changes: 144 additions & 116 deletions AI/libs/core/pipeline.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,7 @@
import os
import os
import sys
from typing import List, Dict
import json
from datetime import datetime, timedelta
from typing import List, Dict, Optional
from datetime import datetime, timedelta, timezone
import pandas as pd

# --- 프로젝트 루트 경로 설정 ---
Expand All @@ -16,160 +15,189 @@
from libs.utils.fetch_ohlcv import fetch_ohlcv
from xai.run_xai import run_xai
from libs.utils.get_db_conn import get_db_conn
from libs.utils.save_reports_to_db import save_reports_to_db
# ---------------------------------

# DB 이름 상수(실제 등록된 키와 반드시 일치해야 함)
MARKET_DB_NAME = "db" # 시세/원천 데이터 DB
REPORT_DB_NAME = "report_DB" # 리포트 저장 DB

REQUIRED_LOG_COLS = {
"ticker", "date", "action", "price",
"feature1", "feature2", "feature3",
"prob1", "prob2", "prob3"
}

def run_weekly_finder() -> List[str]:
"""
주간 종목 발굴(Finder)을 실행하고 결과(종목 리스트)를 반환합니다.
"""
print("--- [PIPELINE-STEP 1] Finder 모듈 실행 시작 ---")
#top_tickers = run_finder()
top_tickers = ['AAPL', 'MSFT', 'GOOGL'] # 임시 데이터
print(f"--- [PIPELINE-STEP 1] Finder 모듈 실행 완료 ---")
# top_tickers = run_finder()
top_tickers = ["AAPL", "MSFT", "GOOGL"] # 임시 데이터
print("--- [PIPELINE-STEP 1] Finder 모듈 실행 완료 ---")
return top_tickers

def run_signal_transformer(tickers: List[str], config: Dict) -> pd.DataFrame:
def _utcnow() -> datetime:
return datetime.now(timezone.utc)

def run_signal_transformer(tickers: List[str], db_name: str) -> pd.DataFrame:
"""
종목 리스트를 받아 Transformer 모듈을 실행하고, 신호(결정 로그)를 반환합니다.
"""
try:
with open(os.path.join(project_root, 'configs', 'config.json'), 'r') as f:
config = json.load(f)
except FileNotFoundError:
print("Config file not found")
except json.JSONDecodeError:
print("Invalid JSON format in config file")
db_config = (config or {}).get("db", {}) # ★ db 섹션만 추출
print("--- [PIPELINE-STEP 2] Transformer 모듈 실행 시작 ---")

# --- 실제 Transformer 모듈 호출 ---
end_date = datetime.now()

if not tickers:
print("[WARN] 빈 종목 리스트가 입력되어 Transformer를 건너뜁니다.")
return pd.DataFrame()

#end_date = _utcnow() # 한국 시간 기준 당일 종가까지 사용, 서버 사용시 주석 해제
end_date = datetime.strptime("2024-10-30", "%Y-%m-%d") #임시 고정 날짜
start_date = end_date - timedelta(days=600)
Comment on lines +54 to 56
Copy link

@coderabbitai coderabbitai bot Nov 5, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🔴 Critical

고정된 end_date 때문에 2024-10-30 이후 데이터가 모두 빠집니다.

Line 55에서 end_date를 2024-10-30으로 고정해 두면, 오늘(2025-11-05) 실행 시에도 1년 이상 지난 가격만 조회되고 최신 신호·리포트가 전혀 생성되지 않습니다. _utcnow()를 복원하거나 최소한 현재 일자를 기준으로 계산하도록 반드시 되돌려 주세요.

-    #end_date = _utcnow() # 한국 시간 기준 당일 종가까지 사용, 서버 사용시 주석 해제
-    end_date = datetime.strptime("2024-10-30", "%Y-%m-%d") #임시 고정 날짜
+    end_date = _utcnow().replace(hour=0, minute=0, second=0, microsecond=0)
🤖 Prompt for AI Agents
In AI/libs/core/pipeline.py around lines 54 to 56, end_date is hard-coded to
2024-10-30 which excludes all post-2024-10-30 data; restore dynamic calculation
by replacing the fixed date with the current UTC date (e.g., call _utcnow() or
datetime.utcnow()) so end_date reflects "today" and then compute start_date as
end_date - timedelta(days=600); ensure you use the same timezone/UTC helper used
elsewhere (_utcnow()) to keep consistency.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

추후변경예정
이슈 등록해놔

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🧩 Analysis chain

@twq110, 알겠습니다! 나중에 수정하시겠다는 점 확인했습니다. 이 문제를 추적하기 위해 이슈를 등록해드리겠습니다.


🏁 Script executed:

#!/bin/bash
# GitHub CLI를 사용하여 이슈 생성
gh issue create \
  --repo SISC-IT/sisc-web \
  --title "파이프라인 end_date 하드코딩 제거 필요" \
  --body "## 설명
AI/libs/core/pipeline.py의 run_signal_transformer 함수에서 end_date가 2024-10-30으로 고정되어 있어, 최신 데이터가 조회되지 않는 문제가 있습니다.

## 위치
- 파일: AI/libs/core/pipeline.py
- 라인: 54-56

## 현재 코드
\`\`\`python
#end_date = _utcnow() # 한국 시간 기준 당일 종가까지 사용, 서버 사용시 주석 해제
end_date = datetime.strptime(\"2024-10-30\", \"%Y-%m-%d\") #임시 고정 날짜
\`\`\`

## 필요한 작업
현재 UTC 날짜를 기준으로 동적으로 계산하도록 변경이 필요합니다.

## 참조
- PR: https://github.com/SISC-IT/sisc-web/pull/80
- 리뷰 코멘트: https://github.com/SISC-IT/sisc-web/pull/80#discussion_r2493423198
- 요청자: @twq110" \
  --assignee twq110

Length of output: 45


@twq110, 이슈가 성공적으로 등록되었습니다! 🎫

이슈 링크: #82

이슈에 현재 상황과 필요한 작업 내용을 정리해두었고, twq110님께 할당해두었습니다. 나중에 편하실 때 수정하시면 되겠습니다!


🧬 Code Graph Analysis Results

<AI/xai/run_xai.py> (lines 29-33)

def run_xai(decision: Dict[str, Any], api_key: str) -> str:

    evidence = decision.pop("evidence", [])
    report = generate_report_from_yf(decision, evidence, api_key)
    return report

<AI/libs/utils/get_db_conn.py> (lines 105-118)

def get_db_conn(db_name: str):
    """
    - psycopg2 로우 커넥션 생성(직접 커서 열어 사용할 때)
    - pandas 경고가 싫다면 read_sql에는 get_engine() 사용을 권장
    """
    cfg = _get_db_config(db_name)
    return psycopg2.connect(
        host=cfg["host"],
        user=cfg["user"],
        password=cfg["password"],
        dbname=cfg["dbname"],
        port=int(cfg.get("port", 5432)),
        sslmode=cfg.get("sslmode", None),  # 필요 시 자동 적용
    )

<AI/libs/utils/save_reports_to_db.py> (lines 65-99)

def save_reports_to_db(rows: List[ReportRow], db_name: str) -> int:
    """
    한국어 주석:
    - SQLAlchemy로 안전하게 INSERT.
    - pandas 경고 제거, 커넥션 관리 자동화, 프리핑으로 죽은 커넥션 방지.
    """
    if not rows:
        print("[INFO] 저장할 리포트가 없습니다.")
        return 0

    engine = get_engine(db_name)
    ensure_table_schema(engine)

    created_at = utcnow()
    params = build_insert_params(rows, created_at)
    if not params:
        print("[WARN] 유효한 저장 파라미터가 없어 INSERT를 건너뜁니다.")
        return 0

    insert_sql = text("""
        INSERT INTO public.xai_reports (ticker, signal, price, date, report, created_at)
        VALUES (:ticker, :signal, :price, :date, :report, :created_at)
    """)

    inserted = 0
    # 대량이면 청크 분할 권장
    CHUNK = 1000
    with engine.begin() as conn:
        for i in range(0, len(params), CHUNK):
            batch = params[i:i+CHUNK]
            conn.execute(insert_sql, batch)
            inserted += len(batch)

    print(f"--- {inserted}개의 XAI 리포트가 데이터베이스에 저장되었습니다. ---")
    return inserted

<AI/finder/main.py> (lines 17-72)

def run_finder():
    """
    전체 프로세스를 조율하여 최종 Top 3 투자 종목 반환
    """
    # --- 1단계: 의존성 객체 및 데이터 준비 ---
    try:
        llm = get_ollama_client()  # ✅ 헬스체크 및 모델 확인 포함
    except Exception as e:
        print(str(e))
        return []

    csv_path = os.path.join(project_root, "data", "stability_score_2025.csv")

    try:
        stability_df = pd.read_csv(csv_path)
    except FileNotFoundError:
        print(f"오류: {csv_path} 파일을 찾을 수 없습니다.")
        return []

    # --- 2단계: 주간 뉴스 데이터 수집 및 요약 ---
    try:
        weekly_news_df = news_processing.get_weekly_news_summary(days=5, llm_client=llm)
    except requests.exceptions.ConnectionError as e:
        print(f"[LLM 연결 오류] 뉴스 요약 단계에서 LLM 서버 연결 실패: {e}")
        return []
    except requests.exceptions.Timeout as e:
        print(f"[LLM 타임아웃] 뉴스 요약 단계에서 응답 지연: {e}")
        return []
    except Exception as e:
        print(f"[예기치 못한 오류] 뉴스 요약 단계: {e}")
        return []

    if weekly_news_df is None or getattr(weekly_news_df, "empty", False):
        print("분석할 뉴스 데이터가 없어 프로세스를 종료합니다.")
        return []

    # --- 3단계: 뉴스 데이터와 재무 데이터를 기반으로 Top 3 종목 선정 ---
    try:
        top_3_tickers = ticker_selector.select_top_stocks(
            news_summary_df=weekly_news_df,
            stability_df=stability_df,
            llm_client=llm
        )
    except requests.exceptions.ConnectionError as e:
        print(f"[LLM 연결 오류] 종목 선정 단계에서 LLM 서버 연결 실패: {e}")
        return []
    except requests.exceptions.Timeout as e:
        print(f"[LLM 타임아웃] 종목 선정 단계에서 응답 지연: {e}")
        return []
    except Exception as e:
        print(f"[예기치 못한 오류] 종목 선정 단계: {e}")
        return []

    print("\n🎉 [Finder 모듈 최종 결과] 투자 추천 Top 3 종목 🎉")
    print(top_3_tickers)
    return top_3_tickers

<AI/libs/utils/fetch_ohlcv.py> (lines 10-68)

def fetch_ohlcv(
    ticker: str,
    start: str,
    end: str,
    interval: str = "1d",
    db_name: str = "db",
) -> pd.DataFrame:
    """
    특정 티커, 날짜 범위의 OHLCV 데이터를 DB에서 불러오기 (SQLAlchemy 엔진 사용)

    Args:
        ticker (str): 종목 코드 (예: "AAPL")
        start (str): 시작일자 'YYYY-MM-DD' (inclusive)
        end (str): 종료일자 'YYYY-MM-DD' (inclusive)
        interval (str): 데이터 간격 ('1d' 등) - 현재 테이블이 일봉만 제공하면 무시됨
        db_name (str): get_engine()가 참조할 설정 블록 이름 (예: "db", "report_DB")

    Returns:
        pd.DataFrame: 컬럼 = [ticker, date, open, high, low, close, adjusted_close, volume]
                      (date 컬럼은 pandas datetime으로 변환됨)
    """

    # 1) SQLAlchemy engine 얻기 ( configs/config.json 기준 )
    engine = get_engine(db_name)

    # 2) 쿼리: named parameter(:ticker 등) 사용 -> 안전하고 가독성 좋음
    #    - interval 분기가 필요하면 테이블/파티션 구조에 따라 쿼리를 분기하도록 확장 가능
    query = text("""
        SELECT ticker, date, open, high, low, close, adjusted_close, volume
        FROM public.price_data
        WHERE ticker = :ticker
          AND date BETWEEN :start AND :end
        ORDER BY date;
    """)

    # 3) DB에서 읽기 (with 문으로 커넥션 자동 정리)
    with engine.connect() as conn:
        df = pd.read_sql(
            query,
            con=conn,  # 꼭 키워드 인자로 con=conn
            params={"ticker": ticker, "start": start, "end": end},  # 튜플 X, 딕셔너리 O
            )

    # 4) 후처리: 컬럼 정렬 및 date 타입 통일
    if df is None or df.empty:
        # 빈 DataFrame이면 일관된 컬럼 스키마로 반환
        return pd.DataFrame(columns=["ticker", "date", "open", "high", "low", "close", "adjusted_close", "volume"])

    # date 컬럼을 datetime으로 변경 (UTC로 맞추고 싶으면 pd.to_datetime(..., utc=True) 사용)
    if "date" in df.columns:
        df["date"] = pd.to_datetime(df["date"])

    # 선택: 컬럼 순서 고정 (일관성 유지)
    desired_cols = ["ticker", "date", "open", "high", "low", "close", "adjusted_close", "volume"]
    # 존재하는 컬럼만 가져오기
    cols_present = [c for c in desired_cols if c in df.columns]
    df = df.loc[:, cols_present]

    return df

<AI/transformer/main.py> (lines 12-84)

def run_transformer(
    *,
    finder_df: pd.DataFrame,
    seq_len: int,
    pred_h: int,
    raw_data: pd.DataFrame,
    run_date: Optional[str] = None,
    weights_path: Optional[str] = None,
    interval: str = "1d",
) -> Dict[str, pd.DataFrame]:
    """

    Parameters
    ----------
    finder_df : pd.DataFrame
        ['ticker'] 컬럼 포함. Finder 단계에서 선정된 추론 대상 종목 목록.
    seq_len : int
        모델 입력 시퀀스 길이(예: 64).
    pred_h : int
        예측 지평(예: 5). 라벨링/정책 기준(로그, 가중치 산정 보조)에 쓰이며
        추론 확률 계산 자체에는 직접 관여하지 않음.
    raw_data : pd.DataFrame
        OHLCV 시계열. 필수 컬럼:
        ['ticker','open','high','low','close','volume', ('ts_local' or 'date')]
    run_date : Optional[str]
        'YYYY-MM-DD' 형식. 지정 시, 해당 날짜(포함)까지의 데이터만 사용해 추론.
        미지정 시, Asia/Seoul 기준 당일 종가까지 사용.
    config : Optional[dict]
        config["transformer"]["model_path"] 에 학습된 가중치 경로가 존재해야 함.
        예) {"transformer": {"model_path": "artifacts/transformer_cls.h5"}}
        (추후 추론 방식 옵션이 늘어나면 이 dict 에 플래그/파라미터를 확장하세요.)
    interval : str
        캔들 간격 표기(로그용). 예: '1d', '1h' 등.

    Returns
    -------
    Dict[str, pd.DataFrame]
        {"logs": DataFrame} 형식.
        컬럼: ["ticker","date","action","price","weight",
               "feature1","feature2","feature3","prob1","prob2","prob3"]

    Notes
    -----
    - 이 래퍼는 '이름/시그니처의 안정성' 확보가 목적입니다.
      내부 추론 엔진이 변경되어도 외부 호출부 수정 없이 교체가 가능합니다.
    """

    # 1) weights_path 경로지정
    PROJECT_ROOT = Path(__file__).resolve().parents[1]

    weights_dir = PROJECT_ROOT / "transformer" / "weights"
    candidate = weights_dir / "initial.weights.h5"

    weights_path = str(candidate)
    if candidate.exists():
            
            print(f"[TRANSFORMER] weights_path 설정됨: {weights_path}")

    if not weights_path:
        print("[TRANSFORMER][WARN] weights_path 미설정 → 가중치 없이 랜덤 초기화로 추론될 수 있음(품질 저하).")
        print("  config 예시: {'transformer': {'weights_path': 'weights/initial.weights.h5'}}")


    # 2) 실제 추론 실행(모듈 위임)
    return run_inference(
        finder_df=finder_df,
        raw_data=raw_data,
        seq_len=seq_len,
        pred_h=pred_h,
        weights_path=weights_path,   # ★ 학습 가중치 경로 전달
        run_date=run_date,
        interval=interval,
    )

all_ohlcv_df = []

all_ohlcv_df: List[pd.DataFrame] = []
for ticker in tickers:
ohlcv_df = fetch_ohlcv(
ticker=ticker,
start=start_date.strftime('%Y-%m-%d'),
end=end_date.strftime('%Y-%m-%d'),
config=db_config
)
ohlcv_df['ticker'] = ticker
all_ohlcv_df.append(ohlcv_df)
try:
ohlcv_df = fetch_ohlcv(
ticker=ticker,
start=start_date.strftime("%Y-%m-%d"),
end=end_date.strftime("%Y-%m-%d"),
db_name=db_name
)
if ohlcv_df is None or ohlcv_df.empty:
print(f"[WARN] OHLCV 미수집: {ticker}")
continue
ohlcv_df = ohlcv_df.copy()
ohlcv_df["ticker"] = ticker
all_ohlcv_df.append(ohlcv_df)
except Exception as e:
print(f"[ERROR] OHLCV 수집 실패({ticker}): {e}")

if not all_ohlcv_df:
print("OHLCV 데이터를 가져오지 못했습니다.")
print("[ERROR] 어떤 티커에서도 OHLCV 데이터를 가져오지 못했습니다.")
return pd.DataFrame()

raw_data = pd.concat(all_ohlcv_df, ignore_index=True)
finder_df = pd.DataFrame(tickers, columns=['ticker'])
transformer_result = run_transformer(

finder_df = pd.DataFrame(tickers, columns=["ticker"])
transformer_result: Dict = run_transformer(
finder_df=finder_df,
seq_len=60,
pred_h=1,
raw_data=raw_data,
config=config
)
logs_df = transformer_result.get("logs", pd.DataFrame())

# --- 임시 결정 로그 데이터 (주석 처리) ---
# data = {
# 'ticker': ['AAPL', 'GOOGL', 'MSFT'],
# 'date': ['2025-09-17', '2025-09-17', '2025-09-17'],
# 'action': ['SELL', 'BUY', 'SELL'],
# 'price': [238.99, 249.52, 510.01],
# 'weight': [0.16, 0.14, 0.15],
# 'feature1': ['RSI', 'Stochastic', 'MACD'],
# 'feature2': ['MACD', 'MA_5', 'ATR'],
# 'feature3': ['Bollinger_Bands_lower', 'RSI', 'MA_200'],
# 'prob1': [0.5, 0.4, 0.6],
# 'prob2': [0.3, 0.25, 0.2],
# 'prob3': [0.1, 0.15, 0.1]
# }
# logs_df = pd.DataFrame(data)

print(f"--- [PIPELINE-STEP 2] Transformer 모듈 실행 완료 ---")
raw_data=raw_data
) or {}

logs_df: pd.DataFrame = transformer_result.get("logs", pd.DataFrame())
if logs_df is None or logs_df.empty:
print("[WARN] Transformer 결과 로그가 비어 있습니다.")
return pd.DataFrame()

# 필수 컬럼 검증
missing_cols = REQUIRED_LOG_COLS - set(logs_df.columns)
if missing_cols:
print(f"[ERROR] 결정 로그에 필수 컬럼 누락: {sorted(missing_cols)}")
return pd.DataFrame()

print("--- [PIPELINE-STEP 2] Transformer 모듈 실행 완료 ---")
return logs_df

def run_xai_report(decision_log: pd.DataFrame) -> List[str]:
# --- 안전 변환 유틸 ---
def _to_iso_date(v) -> str:
import pandas as pd
from datetime import datetime
try:
if isinstance(v, (pd.Timestamp, datetime)):
return v.strftime("%Y-%m-%d")
return str(v)
except Exception:
return str(v)

def _to_float(v, fallback=0.0) -> float:
try:
return float(v)
except Exception:
return float(fallback)

# --- XAI 리포트: 5-튜플(rows)로 반환 ---
from typing import List, Tuple

def run_xai_report(decision_log: pd.DataFrame) -> List[Tuple[str, str, float, str, str]]:
"""
결정 로그를 바탕으로 실제 XAI 리포트를 생성합니다.
save_reports_to_db()가 기대하는 형식:
rows = List[ (ticker, signal, price, date_str, report_text) ]
"""
print("--- [PIPELINE-STEP 3] XAI 리포트 생성 시작 ---")
api_key = os.environ.get("GROQ_API_KEY")
if not api_key:
raise ValueError("XAI 리포트 생성을 위해 GROQ_API_KEY 환경 변수를 설정해주세요.")
reports = []
if decision_log.empty:
return reports
print("[STOP] GROQ_API_KEY 미설정: XAI 리포트 단계를 건너뜁니다.")
return []

if decision_log is None or decision_log.empty:
print("[WARN] 비어있는 결정 로그가 입력되어 XAI 리포트를 생성하지 않습니다.")
return []

rows: List[Tuple[str, str, float, str, str]] = []

for _, row in decision_log.iterrows():
decision = {
"ticker": row['ticker'],
"date": row['date'],
"signal": row['action'],
"price": row['price'],
ticker = str(row.get("ticker", "UNKNOWN"))
date_s = _to_iso_date(row.get("date", ""))
signal = str(row.get("action", ""))
price = _to_float(row.get("price", 0.0))

# evidence 등은 DB에 안 넣는 설계로 보이므로 내부 호출에만 사용
decision_payload = {
"ticker": ticker,
"date": date_s,
"signal": signal,
"price": price,
"evidence": [
{"feature_name": row['feature1'], "contribution": row['prob1']},
{"feature_name": row['feature2'], "contribution": row['prob2']},
{"feature_name": row['feature3'], "contribution": row['prob3']},
]
{"feature_name": str(row.get("feature1", "")), "contribution": _to_float(row.get("prob1", 0.0))},
{"feature_name": str(row.get("feature2", "")), "contribution": _to_float(row.get("prob2", 0.0))},
{"feature_name": str(row.get("feature3", "")), "contribution": _to_float(row.get("prob3", 0.0))},
],
}

try:
report = run_xai(decision, api_key)
reports.append(report)
print(f"--- {row['ticker']} XAI 리포트 생성 완료 ---")
report_text = run_xai(decision_payload, api_key)
report_text = str(report_text) # 혹시 모를 비문자 타입 대비
print(f"--- {ticker} XAI 리포트 생성 완료 ---")
except Exception as e:
error_message = f"--- {row['ticker']} XAI 리포트 생성 중 오류 발생: {e} ---"
print(error_message)
reports.append(error_message)
print(f"--- [PIPELINE-STEP 3] XAI 리포트 생성 완료 ---")
return reports
report_text = f"[ERROR] XAI 리포트 생성 실패: {e}"
print(f"--- {ticker} XAI 리포트 생성 중 오류: {e} ---")

def save_reports_to_db(reports: List[str], config: Dict):
"""
생성된 XAI 리포트를 데이터베이스에 저장합니다.
"""
db_config = config.get("report_DB", {})
conn = get_db_conn(db_config)
cursor = conn.cursor()
insert_query = """
INSERT INTO xai_reports (report_text, created_at)
VALUES (%s, %s);
"""
for report in reports:
cursor.execute(insert_query, (report, datetime.now()))
conn.commit()
cursor.close()
conn.close()
print(f"--- {len(reports)}개의 XAI 리포트가 데이터베이스에 저장되었습니다. ---")

# --- 전체 파이프라인 실행 ---
def run_pipeline():
rows.append((ticker, signal, price, date_s, report_text))

print("--- [PIPELINE-STEP 3] XAI 리포트 생성 완료 ---")
return rows




def run_pipeline() -> Optional[List[str]]:
"""
전체 파이프라인(Finder -> Transformer -> XAI)을 실행합니다.
"""
#--- 설정 파일 로드 ---
config : Dict = {}
try:
with open(os.path.join(project_root, 'configs', 'config.json'), 'r') as f:
config = json.load(f)
except FileNotFoundError:
print("[WARN] configs/config.json 파일을 찾을 수 없어 DB 연결이 필요 없는 기능만 작동합니다.")

#--- 파이프라인 단계별 실행 ---
top_tickers = run_weekly_finder()
if not top_tickers:
print("Finder에서 종목을 찾지 못해 파이프라인을 중단합니다.")
# 1) Finder
tickers = run_weekly_finder()
if not tickers:
print("[STOP] Finder에서 종목을 찾지 못해 파이프라인을 중단합니다.")
return None
decision_log = run_signal_transformer(top_tickers, config)
if decision_log.empty:
print("Transformer에서 신호를 생성하지 못해 파이프라인을 중단합니다.")

# 2) Transformer
logs_df = run_signal_transformer(tickers, MARKET_DB_NAME)
if logs_df is None or logs_df.empty:
print("[STOP] Transformer에서 신호를 생성하지 못해 파이프라인을 중단합니다.")
return None
xai_reports = run_xai_report(decision_log)

save_reports_to_db(xai_reports, config)

return xai_reports
# 3) XAI
reports = run_xai_report(logs_df)

# 4) 저장
save_reports_to_db(reports, REPORT_DB_NAME)

return reports

# --- 테스트를 위한 실행 코드 ---
# --- 테스트 실행 ---
if __name__ == "__main__":
print(">>> 파이프라인 (Finder -> Transformer -> XAI) 테스트를 시작합니다.")
final_reports = run_pipeline()
Expand Down
Empty file removed AI/libs/db/.gitkeep
Empty file.
Loading