fastAPI 심화
- Chart.js - pdf, csv 파일 업로드 후 데이터 정제하여 llm으로 처리 후 결과 도출 - sqlite로 데이터 저장 - ORM - SQLAlchemy
This commit is contained in:
@@ -0,0 +1,140 @@
|
||||
from fastapi import UploadFile
|
||||
from langchain_core.runnables import RunnablePassthrough
|
||||
|
||||
from backend.services.db_service import get_connection
|
||||
from backend.services.db_service import get_table_columns
|
||||
from langchain_core.prompts import ChatPromptTemplate
|
||||
from langchain_core.output_parsers import StrOutputParser
|
||||
from backend.ai.llm import watson_llm
|
||||
import csv
|
||||
|
||||
async def upload_csv(file :UploadFile):
|
||||
"""
|
||||
csv 파일을 테이블에 저장
|
||||
"""
|
||||
|
||||
conn =get_connection()
|
||||
cursor = conn.cursor()
|
||||
|
||||
# file.read() : 비동기 함수임
|
||||
contents = await file.read()
|
||||
csv_text = contents.decode('utf-8')
|
||||
reader = csv.DictReader(csv_text.splitlines())
|
||||
count = 0
|
||||
|
||||
for row in reader:
|
||||
cursor.execute("""
|
||||
INSERT INTO transactions (date, category, merchant, amount)
|
||||
VALUES (?, ?,?, ?)"""
|
||||
,
|
||||
(row['date'], row['category'], row['merchant'], row['amount'])
|
||||
)
|
||||
count += 1
|
||||
|
||||
conn.commit()
|
||||
conn.close()
|
||||
return {"message": f"{count} 건 저장 완료"}
|
||||
|
||||
def card_history():
|
||||
"""
|
||||
db에서 카드 정보 조회
|
||||
"""
|
||||
conn =get_connection()
|
||||
cursor = conn.cursor()
|
||||
|
||||
try :
|
||||
cursor.execute("""
|
||||
SELECT * FROM transactions ORDER BY date DESC
|
||||
""")
|
||||
rows = cursor.fetchall()
|
||||
query_result = [dict(row) for row in rows]
|
||||
|
||||
conn.close()
|
||||
except Exception as e:
|
||||
query_result = [f"SQL 실행 오류 : {e}"]
|
||||
|
||||
return query_result
|
||||
|
||||
def get_dashboard():
|
||||
"""
|
||||
db에서 대시보드 정보 조회
|
||||
"""
|
||||
conn = get_connection()
|
||||
cursor = conn.cursor()
|
||||
|
||||
# 카테고리 별 사용금액
|
||||
cursor.execute(
|
||||
"""
|
||||
SELECT category, SUM(amount)
|
||||
FROM transactions
|
||||
GROUP BY category
|
||||
ORDER BY SUM(amount) DESC
|
||||
"""
|
||||
)
|
||||
|
||||
category_rows = cursor.fetchall()
|
||||
|
||||
# 월별 사용금액
|
||||
cursor.execute(
|
||||
"""
|
||||
SELECT strftime('%Y-%m', date), SUM(amount)
|
||||
FROM transactions
|
||||
GROUP BY strftime('%Y-%m', date)
|
||||
ORDER BY strftime('%Y-%m', date)
|
||||
"""
|
||||
)
|
||||
month_rows = cursor.fetchall()
|
||||
conn.close()
|
||||
|
||||
return {
|
||||
"category": [{"category":row[0], "amount":row[1]} for row in category_rows],
|
||||
"monthly": [{"month":row[0], "amount":row[1]} for row in month_rows],
|
||||
}
|
||||
|
||||
def sql_generate_llm(question):
|
||||
"""자연어 -> SQL"""
|
||||
sql_prompt = ChatPromptTemplate.from_template("""
|
||||
당신은 SQLite 전문가입니다.
|
||||
|
||||
테이블명 : transactions
|
||||
|
||||
컬럼 : {columns}
|
||||
|
||||
질문 : {question}
|
||||
|
||||
SQL 만 출력하세요
|
||||
""")
|
||||
|
||||
columns = get_table_columns("transactions")
|
||||
sql_chain = sql_prompt | watson_llm | StrOutputParser()
|
||||
|
||||
sql = sql_chain.invoke({"columns": columns, "question": question})
|
||||
print(f"sql: {sql}")
|
||||
|
||||
sql = sql.replace("```sql","").replace("```", "").strip()
|
||||
|
||||
# sql 문 실제 실행
|
||||
conn = get_connection()
|
||||
cursor = conn.cursor()
|
||||
cursor.execute(sql)
|
||||
rows = cursor.fetchall()
|
||||
query_result = [dict(row) for row in rows]
|
||||
conn.close()
|
||||
|
||||
return query_result
|
||||
|
||||
def card_analysis(question):
|
||||
query_result = sql_generate_llm(question)
|
||||
|
||||
analysis_prompt = ChatPromptTemplate.from_template("""
|
||||
사용자 질문 : {question}
|
||||
|
||||
SQL 결과
|
||||
{result}
|
||||
|
||||
결과를 설명하고 소비 습관을 분석하고 절약 팁을 제시해 주세요.
|
||||
""")
|
||||
analysis_chain = analysis_prompt | watson_llm | StrOutputParser()
|
||||
answer = analysis_chain.invoke({"question": question, "result": query_result})
|
||||
|
||||
return {"message": answer}
|
||||
@@ -0,0 +1,44 @@
|
||||
import sqlite3
|
||||
import os
|
||||
|
||||
DB_PATH = "db/history.db"
|
||||
|
||||
"""
|
||||
cursor.execute('select id, name for users')
|
||||
cursor.fetchone() => (1, 'alice', '010-1234-5678')
|
||||
|
||||
row['id']
|
||||
"""
|
||||
|
||||
def get_connection():
|
||||
coon = sqlite3.connect(DB_PATH, isolation_level=None)
|
||||
# 조회 결과를 어떻게 반활할지 설정
|
||||
coon.row_factory = sqlite3.Row
|
||||
return coon
|
||||
|
||||
def init_db():
|
||||
# 디렉토리 생성
|
||||
os.makedirs("db", exist_ok=True)
|
||||
conn = get_connection()
|
||||
cursor = conn.cursor()
|
||||
cursor.execute("""create table if not exists transactions (
|
||||
id integer primary key AUTOINCREMENT,
|
||||
date text,
|
||||
category text,
|
||||
merchant text,
|
||||
amount integer
|
||||
)
|
||||
""")
|
||||
conn.commit()
|
||||
conn.close()
|
||||
|
||||
print("DB 초기화")
|
||||
|
||||
def get_table_columns(table_name):
|
||||
conn = get_connection()
|
||||
cursor = conn.cursor()
|
||||
cursor.execute("PRAGMA table_info({})".format(table_name))
|
||||
columns = cursor.fetchall()
|
||||
conn.close()
|
||||
return [column[1] for column in columns]
|
||||
|
||||
Reference in New Issue
Block a user