랭그래프 활용한 주식 정보 도출 프로젝트

- 투자 의견 조회
ㄴ 전반적인 주식 판단 기준(ex. 채무, 리스크 등등)을 기준으로 판단하여 도출
- 투자 추천
ex)
{
  "tickers": [
    "NVDA", "GOOGL", "AAPL"
  ],
  "risk_type": "aggressive"
}
This commit is contained in:
2026-06-19 18:03:05 +09:00
parent 3be7886bfe
commit 315105cebb
19 changed files with 614 additions and 68 deletions
@@ -0,0 +1,41 @@
import yfinance as yf
COMPETITOR_MAP = {
"NVDA": ["AMD", "INTC", "TSM", "AVGO"],
"AMD": ["NVDA", "INTC", "TSM"],
"TSLA": ["RIVN", "GM", "F"],
"AAPL": ["MSFT", "GOOGL", "AMZN"],
"MSFT": ["AAPL", "GOOGL", "AMZN"],
"GOOGL": ["MSFT", "META", "AMZN"],
}
async def get_competitor_info(ticker: str):
"""
경쟁 관계 회사에 대한 핵심 정보 추출
"""
competitors_tickers = COMPETITOR_MAP.get(ticker, [])
results =[]
for comp in competitors_tickers:
company = yf.Ticker(comp)
info = company.info
results.append(
{
#
"ticker": comp,
# 회사 이름
"company_name": info.get("longName"),
# 시가총액
"market_cap": info.get("marketCap"),
# per
"pe_ratio": info.get("pe_ratio"),
# 매출 성장률
"revenue_growth": info.get("revenue_growth"),
# 순 이익률
"profit_margin": info.get("profitMargins"),
}
)
return results
@@ -1,4 +1,5 @@
import yahooquery as yf
import yfinance as yf
# from yahooquery import Ticker
async def get_financial_info(ticker: str):
"""
@@ -7,30 +8,29 @@ async def get_financial_info(ticker: str):
company = yf.Ticker(ticker)
info = company.info
income = company.income_stmtf
income = company.income_stmt
balance = company.balance_sheet
cashflow = company.cash_flow
return {
# 시가 총액
"market_cap":info.get("marketCap"),
# 현재 주가
"current_price":info.get("currentPrice"),
# 시가총액
"market_cap": info.get("marketCap"),
# 현재주가
"current_price": info.get("currentPrice"),
# 매출
"revenue":income.loc["Total Revenue"].iloc[0],
"revenue": income.loc["Total Revenue"].iloc[0],
# 손익계산
"operating_income":income.loc["Operating Income"].iloc[0],
"operating_income": income.loc["Operating Income"].iloc[0],
# 순이익
"net_income":income.loc["Net Income"].iloc[0],
"net_income": income.loc["Net Income"].iloc[0],
# 총자산
"total_assets":balance.loc["Total Assets"].iloc[0],
"total_assets": balance.loc["Total Assets"].iloc[0],
# 총부채
"total_debt":balance.loc["Total Debt"].iloc[0],
"total_debt": balance.loc["Total Debt"].iloc[0],
# 현금흐름
"free_cash_flow":cashflow.loc["Free Cash Flow"].iloc[0],
"free_cash_flow": cashflow.loc["Free Cash Flow"].iloc[0],
# per
"pe_ratio":info.get("trailingPE"),
"pe_ratio": info.get("trailingPE"),
# pbr
"pb_ratio":info.get("priceToBook")
}
"pb_ratio": info.get("priceToBook"),
}
@@ -1,6 +1,6 @@
from langchain_community.utilities import GoogleSerperAPIWrapper
from backend.schemas.news_schemas import NewsItem
from backend.config.settings import settings
async def get_news(company_name: str):
@@ -8,8 +8,9 @@ async def get_news(company_name: str):
구글 뉴스 검색 후
title, snippet, url, source, date 추출
"""
search = GoogleSerperAPIWrapper(type="news")
results = await search.results(f"{company_name} stock news")
search = GoogleSerperAPIWrapper(type="news", serper_api_key=f"{settings.serper_api_key}")
# results = search.aresults(f"{company_name} stock news")
results = await search.aresults(f"{company_name} stock news")
news_list = []
for item in results['news'][:10]:
@@ -0,0 +1,71 @@
from sqlalchemy.orm import Session
from backend.schemas.stock_schemas import RecommendStock, RiskType
from backend.repository.models import StockAnalysis
from fastapi import HTTPException
from backend.ai.llm import hugging_llm
def portfolio_service(req, db):
"""
사용자가 입력한 종목에 대해서 분석정보 찾기
risk_type: 비중 조절
"""
analyses = db.query(StockAnalysis).filter(StockAnalysis.ticker.in_(req.tickers)).all()
portfolio_items = []
for analysis in analyses:
if not analysis.opinion:
continue
portfolio_items.append(
{
"ticker": analysis.ticker,
"score": analysis.opinion.score,
"rating": analysis.opinion.rating,
}
)
if not portfolio_items:
raise HTTPException(status_code=5000, detail="분석된 종목을 찾을 수 없습니다.")
# 투자성향
risk_type = req.risk_type
import math
for item in portfolio_items:
score = item["score"]
if risk_type == RiskType.AGGRESSIVE:
item['adjusted_score'] = score ** 1.5
elif risk_type == RiskType.CONSERVATIVE:
item['adjusted_score'] = math.sqrt(score)
else:
item['adjusted_score'] = score
total_adjusted = sum(item['adjusted_score'] for item in portfolio_items)
for item in portfolio_items:
item['weight'] = round(item['adjusted_score'] / total_adjusted * 100,2)
return {"risk_type": req.risk_type, "portfolio": portfolio_items}
async def generate_portfolio_report(portfolio_items, risk_type):
portfolio_text = "\n".join([f"{item['ticker']}: {item['weight']}%" for item in portfolio_items])
prompt = f"""
당신은 월가의 포트폴리오 전략가입니다.
투자성향
{risk_type.value}
포트폴리오:
{portfolio_text}
다음을 작성하시오.
1. 포트폴리오 요약
2. 강점
3. 약점
4. 주요 리스크
5. 추천 투자기간
6. 리밸런싱 전략
"""
result = await hugging_llm.ainvoke(prompt)
return result.content
@@ -0,0 +1,137 @@
from langchain_core.output_parsers import JsonOutputParser
from langchain_core.prompts import ChatPromptTemplate
from backend.ai.llm import hugging_llm
from backend.prompts.all_prompt import INVESTOR_SENTIMENT_PROMPT
def get_rating(score):
# 최종판단
if score >= 85:
return "Strong Buy"
elif score >= 70:
return "Buy"
elif score >= 50:
return "Hold"
elif score >= 30:
return "Sell"
return "Strong Sell"
# 재무 정보 1. PER(20점)
def get_per_score(per):
if per is None:
return 0
if per < 20:
return 20
elif per < 35:
return 15
return 10
# 2. 순이익률
def get_profit_margin(financials):
net_income = financials.get("net_income")
revenue = financials.get("revenue")
if not revenue:
return 0, 0
profit_margin = net_income / revenue
profit_score = 0
if profit_margin > 0.3:
profit_score = 20
elif profit_margin > 0.15:
profit_score = 15
else:
profit_score = 10
return profit_score, profit_margin
# 3. 기술분석 (20점)
def get_tech_score(technicals):
technicals_score = 0
trend = technicals['trend']
if trend == 'bullish':
technicals_score += 10
macd = technicals['macd']
macd_signal = technicals['macd_signal']
if macd > macd_signal:
technicals_score += 10
return technicals_score
# 4. 경쟁사 분석(20점)
def get_com_score(competitor, per, profit_margin):
# 경쟁사 per 평균 비교
competitors_score = 0
pe_list = [c["pe_ratio"] for c in competitor if c["pe_ratio"] is not None]
if not pe_list:
avg_pe = None
else:
avg_pe = sum(pe_list) / len(pe_list)
if avg_pe is not None and per < avg_pe:
competitors_score += 10
profit_margin_list = [c["profit_margin"] for c in competitor if c["profit_margin"] is not None]
avg_profit_margin = sum(profit_margin_list) / len(profit_margin_list)
if profit_margin > avg_profit_margin:
competitors_score += 10
return competitors_score
# 5. 뉴스 분석(20점)
async def get_news_score(news_list):
# 뉴스 투자 심리
news_text = "\n".join(
[f"제목 : {n['title']}\n내용 : {n['snippet']}" for n in news_list]
)
prompt = ChatPromptTemplate.from_template(INVESTOR_SENTIMENT_PROMPT)
chain = prompt | hugging_llm | JsonOutputParser()
sentiment = await chain.ainvoke({"news_text":news_text})
total = sentiment["positive"] + sentiment["negative"] + sentiment["neutral"]
if total == 0:
return 0
positive_ratio = sentiment["positive"] / total
return round(positive_ratio * 20)
async def evaluation(analysis):
score = 0
data = analysis.analysis_json
# 1. per 점수
pe_ratio = data['financials'].get('pe_ratio')
per_score = get_per_score(pe_ratio)
score += per_score
# 2. 순이익률 점수
profit_score, profit_margin = get_profit_margin(data['financials'])
score += profit_score
# 3. 기술적 점수
technicals_score = get_tech_score(data["technicals"])
score += technicals_score
# 4. 경쟁사 점수
competitor_score = get_com_score(data['competitors'], pe_ratio, profit_margin)
score += competitor_score
# 5. 뉴스 점수
news_score = await get_news_score(data['news'])
score += news_score
return {
"total_score": score,
"per_score": per_score,
"profit_score": profit_score,
"technical_score": technicals_score,
"competitor_score": competitor_score,
"news_score": news_score
}
@@ -1,6 +1,12 @@
from backend.schemas.stock_schemas import TickerInfo
import yahooquery as yq
from fastapi import HTTPException
from backend.graph.stock_graph import build_stock_graph
from backend.repository.models import StockAnalysis, InvestmentOpinion
from fastapi.encoders import jsonable_encoder
from typing import Optional
from backend.services.score_service import evaluation, get_rating
from backend.ai.llm import hugging_llm
COMPANY_MAP = {
"NVDA": {
@@ -115,6 +121,8 @@ class StockService:
if alias.lower() in query:
return TickerInfo(ticker = ticker, company_name = info["company_name"])
return None
def _extract_company_keyword(self, query:str):
"""
query : 엔비디아 분석해줘
@@ -145,4 +153,89 @@ class StockService:
return TickerInfo(ticker=result["symbol"], company_name=result["longname"])
except Exception:
return None
return None
async def analyze(self, query: str, db):
# 티커, 회사명 추출
ticker_info = await self._extract(query)
# 그래프 실행
stock_graph = build_stock_graph()
result = await stock_graph.ainvoke(
{
"query": query,
"ticker": ticker_info.ticker,
"company_name": ticker_info.company_name
}
)
# 데이터베이스 저장
# 테이블과 관련있는 모델 객체 생성
entity = StockAnalysis(
ticker = result['ticker'],
company_name = result['company_name'],
analysis_json = jsonable_encoder(result),
report = result['report']
)
db.add(entity)
db.commit()
db.refresh(entity)
return {"analysis_id":entity.analysis_id, **result}
async def opinion_service(self, analysis_id, db):
# analysis_id 디비 조회
analysis = db.get(StockAnalysis, analysis_id)
score_result = await evaluation(analysis)
#
total_score = score_result['total_score']
rating = get_rating(total_score)
# AI 투자 의견서
opinion = await self.generate_opinion(analysis.report, total_score, rating)
entity = InvestmentOpinion(
analysis_id = analysis_id,
opinion = opinion,
rating = rating,
score = total_score,
)
db.add(entity)
db.commit()
db.refresh(entity)
return {"opinion_id":entity.opinion_id, "analysis_id":analysis_id, "opinion":opinion, "rating":rating, **score_result,}
async def generate_opinion(self, report, total_score, rating):
prompt = f"""
당신은 월가의 수석 애널리스트입니다.
종합점수 :
{total_score} / 100
투자등급 :
{rating}
다음 기업 분석 보고서를 기반으로 투자 의견서를 작성하세요.
{report}
반드시 아래 형식으로 작성하세요.
1. 투자등급
2. 투자근거
3. 핵심리스크
4. 단기전망
5. 장기전망
6. 최종의견
"""
result = await hugging_llm.ainvoke(prompt)
return result.content
# 싱글톤(객체 하나만 생성) 서비스 인스턴스
_service:Optional[StockService] = None
def get_stock_service():
global _service
if _service is None:
_service = StockService()
return _service
@@ -1,4 +1,4 @@
import yahooquery as yf
import yfinance as yf
from ta.trend import MACD, SMAIndicator
from ta.momentum import RSIIndicator
@@ -10,7 +10,7 @@ async def get_technical_info(ticker: str):
# 주가 데이터 다운로드
df = yf.download(ticker, period="1y", interval="1d")
# 종가 가져오기
close = df['Close'].squeeze()
close = df["Close"].squeeze()
macd = MACD(close)
macd_value = float(macd.macd().iloc[-1])
@@ -18,19 +18,18 @@ async def get_technical_info(ticker: str):
trend = "bullish" if macd_value > signal_value else "bearish"
return {
# 현재 주가
"current_price":close.iloc[-1],
# 20 일선
"sma20":float(SMAIndicator(close, window=20).sma_indicator().iloc[-1]),
# 60 일선
"sma60":float(SMAIndicator(close, window=60).sma_indicator().iloc[-1]),
# 현재주가
"current_price": close.iloc[-1],
# 20일선
"sma20": float(SMAIndicator(close, window=20).sma_indicator().iloc[-1]),
# 60일선
"sma60": float(SMAIndicator(close, window=60).sma_indicator().iloc[-1]),
# rsi
"rsi":float(SMAIndicator(close, window=14).rsi().iloc[-1]),
"rsi": float(RSIIndicator(close, window=14).rsi().iloc[-1]),
# macd
"macd":macd.macd(),
"macd": macd_value,
# macd_signal
"macd_signal": signal_value,
# trend
"trend": trend,
}
"trend": trend,
}