IT기술/랭체인 (langchain)

LangChain 기반 지능형 챗봇 구축 완벽 가이드: 환경 설정부터 프로덕션 배포까지

후스파 2025. 7. 10. 07:18
반응형

디지털 시대의 고객 경험을 혁신하는 대화형 AI 챗봇은 이제 비즈니스의 필수 요소가 되었습니다. 하지만 복잡한 자연어 처리와 데이터 통합 문제로 인해 많은 개발자들이 진입 장벽에 부딪히곤 합니다.
LangChain은 이러한 문제를 해결하며 빠른 시간 안에 지능형 챗봇을 구축할 수 있는 강력한 프레임워크를 제공합니다.
이 가이드에서는 환경 설정부터 RAG(검색 증강 생성) 기반의 고급 기능 확장까지, 실제 프로덕션에 바로 적용 가능한 단계별 구현 방법을 상세히 소개합니다.


환경 설정 및 필수 라이브러리 설치

개발 환경 구성

# 가상환경 생성 및 활성화
python -m venv langchain_chatbot_env
source langchain_chatbot_env/bin/activate  # Windows: langchain_chatbot_env\Scripts\activate

# 필수 패키지 설치
pip install langchain langchain-openai langchain-community python-dotenv 
pip install faiss-cpu tiktoken streamlit
pip install chromadb sentence-transformers

환경 변수 설정

# .env 파일 생성
OPENAI_API_KEY="your-openai-api-key-here"
LANGCHAIN_TRACING_V2=true
LANGCHAIN_API_KEY="your-langsmith-api-key"
LANGCHAIN_PROJECT="chatbot-project"

# Streamlit 설정
STREAMLIT_SERVER_PORT=8501
STREAMLIT_SERVER_ADDRESS=localhost

프로젝트 구조

chatbot_project/
├── .env
├── requirements.txt
├── main.py
├── chatbot/
│   ├── __init__.py
│   ├── models.py
│   ├── memory.py
│   ├── rag.py
│   └── utils.py
├── data/
│   └── knowledge_base/
├── tests/
└── streamlit_app.py

언어 모델 초기화

기본 LLM 설정

# chatbot/models.py
import os
from langchain_openai import ChatOpenAI
from langchain_community.llms import Ollama
from dotenv import load_dotenv

load_dotenv()

class LLMManager:
    def __init__(self, model_type="openai"):
        self.model_type = model_type
        self.llm = self._initialize_model()

    def _initialize_model(self):
        if self.model_type == "openai":
            return ChatOpenAI(
                model="gpt-4",
                temperature=0.7,
                max_tokens=1000,
                streaming=True
            )
        elif self.model_type == "local":
            return Ollama(
                model="llama3.2:latest",
                temperature=0.7
            )
        else:
            raise ValueError(f"지원하지 않는 모델 타입: {self.model_type}")

    def get_model(self):
        return self.llm

# 사용 예시
llm_manager = LLMManager(model_type="openai")
llm = llm_manager.get_model()

프롬프트 템플릿 설계

시스템 메시지 템플릿

# chatbot/utils.py
from langchain_core.prompts import ChatPromptTemplate, MessagesPlaceholder

class PromptManager:
    def __init__(self):
        self.system_templates = {
            "customer_support": self._create_customer_support_template(),
            "general": self._create_general_template(),
            "technical": self._create_technical_template()
        }

    def _create_customer_support_template(self):
        system_message = """당신은 전문 고객 지원 AI 어시스턴트입니다. 다음 규칙을 준수하세요:

1. **친절하고 공손한 어조 유지**: 항상 정중하고 도움이 되는 태도로 응답
2. **명확하고 간결한 답변**: 3-4문장 이내로 핵심 내용 전달
3. **정확한 정보 제공**: 확실하지 않은 내용은 솔직히 고백
4. **단계별 안내**: 복잡한 문제는 단계별로 해결 방법 제시
5. **추가 지원 제안**: 필요시 추가 도움이나 전문가 연결 제안

현재 시간: {current_time}
사용자 정보: {user_info}

컨텍스트 정보:
{context}

이전 대화 기록을 참고하여 일관성 있는 답변을 제공하세요."""

        return ChatPromptTemplate.from_messages([
            ("system", system_message),
            MessagesPlaceholder("chat_history"),
            ("human", "{input}")
        ])

    def _create_general_template(self):
        return ChatPromptTemplate.from_messages([
            ("system", "당신은 도움이 되는 AI 어시스턴트입니다. 정확하고 유용한 정보를 제공하세요."),
            MessagesPlaceholder("chat_history"),
            ("human", "{input}")
        ])

    def _create_technical_template(self):
        return ChatPromptTemplate.from_messages([
            ("system", """당신은 기술 지원 전문가입니다. 
            - 기술적 문제에 대해 정확한 해결책 제시
            - 코드 예시나 구체적인 단계 포함
            - 관련 문서나 리소스 링크 제공"""),
            MessagesPlaceholder("chat_history"),
            ("human", "{input}")
        ])

    def get_prompt(self, template_type="customer_support"):
        return self.system_templates.get(template_type, self.system_templates["general"])

# 사용 예시
prompt_manager = PromptManager()
customer_prompt = prompt_manager.get_prompt("customer_support")

메모리 관리 구현

대화 기록 저장소 설정

# chatbot/memory.py
from langchain.memory import ConversationBufferWindowMemory, ConversationSummaryBufferMemory
from langchain_community.chat_message_histories import SQLChatMessageHistory
import sqlite3
from datetime import datetime

class MemoryManager:
    def __init__(self, memory_type="buffer_window", max_token_limit=1000):
        self.memory_type = memory_type
        self.max_token_limit = max_token_limit
        self.db_path = "chatbot_history.db"
        self._initialize_database()

    def _initialize_database(self):
        """SQLite 데이터베이스 초기화"""
        conn = sqlite3.connect(self.db_path)
        cursor = conn.cursor()
        cursor.execute('''
            CREATE TABLE IF NOT EXISTS chat_sessions (
                session_id TEXT PRIMARY KEY,
                user_id TEXT,
                created_at TIMESTAMP,
                last_activity TIMESTAMP
            )
        ''')
        cursor.execute('''
            CREATE TABLE IF NOT EXISTS chat_messages (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                session_id TEXT,
                message_type TEXT,
                content TEXT,
                timestamp TIMESTAMP,
                FOREIGN KEY (session_id) REFERENCES chat_sessions (session_id)
            )
        ''')
        conn.commit()
        conn.close()

    def create_memory(self, session_id, llm=None):
        """세션별 메모리 생성"""
        if self.memory_type == "buffer_window":
            return ConversationBufferWindowMemory(
                k=10,  # 최근 10개 대화만 유지
                memory_key="chat_history",
                return_messages=True,
                chat_memory=SQLChatMessageHistory(
                    session_id=session_id,
                    connection_string=f"sqlite:///{self.db_path}"
                )
            )
        elif self.memory_type == "summary_buffer":
            return ConversationSummaryBufferMemory(
                llm=llm,
                max_token_limit=self.max_token_limit,
                memory_key="chat_history",
                return_messages=True,
                chat_memory=SQLChatMessageHistory(
                    session_id=session_id,
                    connection_string=f"sqlite:///{self.db_path}"
                )
            )

    def get_session_history(self, session_id):
        """세션 기록 조회"""
        conn = sqlite3.connect(self.db_path)
        cursor = conn.cursor()
        cursor.execute('''
            SELECT message_type, content, timestamp 
            FROM chat_messages 
            WHERE session_id = ? 
            ORDER BY timestamp
        ''', (session_id,))

        messages = cursor.fetchall()
        conn.close()
        return messages

    def clear_session(self, session_id):
        """특정 세션 기록 삭제"""
        conn = sqlite3.connect(self.db_path)
        cursor = conn.cursor()
        cursor.execute('DELETE FROM chat_messages WHERE session_id = ?', (session_id,))
        cursor.execute('DELETE FROM chat_sessions WHERE session_id = ?', (session_id,))
        conn.commit()
        conn.close()

# 사용 예시
memory_manager = MemoryManager(memory_type="buffer_window")
session_memory = memory_manager.create_memory("user_123_session_1")

외부 데이터 통합 (RAG)

문서 처리 파이프라인

# chatbot/rag.py
from langchain_community.document_loaders import (
    PyPDFLoader, WebBaseLoader, TextLoader, DirectoryLoader
)
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain_openai import OpenAIEmbeddings
from langchain_community.vectorstores import Chroma
from langchain_core.runnables import RunnablePassthrough
from langchain_core.output_parsers import StrOutputParser
import os
from typing import List, Optional

class RAGSystem:
    def __init__(self, persist_directory="./chroma_db"):
        self.embeddings = OpenAIEmbeddings()
        self.persist_directory = persist_directory
        self.vectorstore = None
        self.retriever = None
        self.text_splitter = RecursiveCharacterTextSplitter(
            chunk_size=1000,
            chunk_overlap=200,
            separators=["\n\n", "\n", " ", ""]
        )

    def load_documents(self, sources: List[str], source_type: str = "web"):
        """다양한 소스에서 문서 로드"""
        documents = []

        if source_type == "web":
            loader = WebBaseLoader(sources)
            documents = loader.load()
        elif source_type == "pdf":
            for source in sources:
                loader = PyPDFLoader(source)
                documents.extend(loader.load())
        elif source_type == "directory":
            for source in sources:
                loader = DirectoryLoader(source, glob="**/*.txt")
                documents.extend(loader.load())

        return documents

    def process_documents(self, documents):
        """문서 분할 및 벡터화"""
        # 문서 분할
        splits = self.text_splitter.split_documents(documents)

        # 벡터 스토어 생성 또는 업데이트
        if os.path.exists(self.persist_directory):
            self.vectorstore = Chroma(
                persist_directory=self.persist_directory,
                embedding_function=self.embeddings
            )
            self.vectorstore.add_documents(splits)
        else:
            self.vectorstore = Chroma.from_documents(
                documents=splits,
                embedding=self.embeddings,
                persist_directory=self.persist_directory
            )

        # 검색기 설정
        self.retriever = self.vectorstore.as_retriever(
            search_type="mmr",  # Maximum Marginal Relevance
            search_kwargs={"k": 3, "fetch_k": 6}
        )

        return len(splits)

    def create_rag_chain(self, llm, prompt_template):
        """RAG 체인 생성"""
        def format_docs(docs):
            return "\n\n".join([doc.page_content for doc in docs])

        rag_chain = (
            {
                "context": self.retriever | format_docs,
                "input": RunnablePassthrough(),
                "chat_history": RunnablePassthrough()
            }
            | prompt_template
            | llm
            | StrOutputParser()
        )

        return rag_chain

    def similarity_search(self, query: str, k: int = 3):
        """유사도 검색"""
        if self.vectorstore:
            return self.vectorstore.similarity_search(query, k=k)
        return []

    def add_documents_from_text(self, texts: List[str], metadatas: Optional[List[dict]] = None):
        """텍스트에서 직접 문서 추가"""
        if not self.vectorstore:
            self.vectorstore = Chroma(
                persist_directory=self.persist_directory,
                embedding_function=self.embeddings
            )

        self.vectorstore.add_texts(texts, metadatas=metadatas)
        self.retriever = self.vectorstore.as_retriever(
            search_kwargs={"k": 3}
        )

# 사용 예시
rag_system = RAGSystem()

# 웹 페이지에서 지식 베이스 구축
knowledge_sources = [
    "https://docs.python.org/3/tutorial/",
    "https://langchain.readthedocs.io/en/latest/"
]

documents = rag_system.load_documents(knowledge_sources, source_type="web")
chunk_count = rag_system.process_documents(documents)
print(f"처리된 문서 청크 수: {chunk_count}")

체인 및 에이전트 구성

LCEL을 이용한 체인 생성

# main.py
from langchain_core.runnables import RunnablePassthrough, RunnableBranch
from langchain.chains import create_history_aware_retriever, create_retrieval_chain
from langchain.chains.combine_documents import create_stuff_documents_chain
from chatbot.models import LLMManager
from chatbot.memory import MemoryManager
from chatbot.rag import RAGSystem
from chatbot.utils import PromptManager

class ChatbotEngine:
    def __init__(self):
        self.llm_manager = LLMManager()
        self.memory_manager = MemoryManager()
        self.rag_system = RAGSystem()
        self.prompt_manager = PromptManager()

        self.llm = self.llm_manager.get_model()
        self.setup_chains()

    def setup_chains(self):
        """다양한 체인 설정"""
        # 기본 대화 체인
        self.basic_chain = self._create_basic_chain()

        # RAG 체인
        self.rag_chain = self._create_rag_chain()

        # 라우팅 체인 (의도 분류)
        self.routing_chain = self._create_routing_chain()

    def _create_basic_chain(self):
        """기본 대화 체인"""
        prompt = self.prompt_manager.get_prompt("general")
        return prompt | self.llm | StrOutputParser()

    def _create_rag_chain(self):
        """RAG 기반 체인"""
        if self.rag_system.retriever:
            prompt = self.prompt_manager.get_prompt("customer_support")
            return self.rag_system.create_rag_chain(self.llm, prompt)
        return self.basic_chain

    def _create_routing_chain(self):
        """의도 기반 라우팅 체인"""
        def classify_intent(input_text):
            # 간단한 키워드 기반 분류 (실제로는 더 정교한 분류기 사용)
            technical_keywords = ["오류", "버그", "설치", "설정", "API"]
            general_keywords = ["안녕", "도움", "문의", "질문"]

            if any(keyword in input_text for keyword in technical_keywords):
                return "technical"
            elif any(keyword in input_text for keyword in general_keywords):
                return "general"
            else:
                return "rag"

        return RunnableBranch(
            (lambda x: classify_intent(x["input"]) == "technical", 
             self.prompt_manager.get_prompt("technical") | self.llm),
            (lambda x: classify_intent(x["input"]) == "general", 
             self.basic_chain),
            self.rag_chain  # 기본값
        )

    def chat(self, user_input: str, session_id: str = "default"):
        """메인 채팅 함수"""
        # 세션 메모리 가져오기
        memory = self.memory_manager.create_memory(session_id, self.llm)

        # 대화 기록 로드
        chat_history = memory.chat_memory.messages

        # 체인 실행
        response = self.routing_chain.invoke({
            "input": user_input,
            "chat_history": chat_history
        })

        # 메모리에 대화 저장
        memory.save_context(
            {"input": user_input},
            {"output": response}
        )

        return response

# 사용 예시
chatbot = ChatbotEngine()

# 지식 베이스 설정
knowledge_sources = ["./data/knowledge_base/faq.txt"]
documents = chatbot.rag_system.load_documents(knowledge_sources, source_type="directory")
chatbot.rag_system.process_documents(documents)

# 대화 테스트
response = chatbot.chat("안녕하세요! 도움이 필요합니다.", session_id="user_123")
print(response)

대화형 인터페이스 구축 (Streamlit)

고급 Streamlit 애플리케이션

# streamlit_app.py
import streamlit as st
import uuid
from datetime import datetime
from main import ChatbotEngine
from langchain_community.callbacks import StreamlitCallbackHandler
import plotly.express as px
import pandas as pd

# 페이지 설정
st.set_page_config(
    page_title="AI 고객 지원 챗봇",
    page_icon="🤖",
    layout="wide",
    initial_sidebar_state="expanded"
)

# CSS 스타일링
st.markdown("""

    .main-header {
        font-size: 2.5rem;
        color: #1f77b4;
        text-align: center;
        margin-bottom: 2rem;
    }
    .chat-message {
        padding: 1rem;
        border-radius: 0.5rem;
        margin: 0.5rem 0;
    }
    .user-message {
        background-color: #e3f2fd;
        border-left: 4px solid #2196f3;
    }
    .assistant-message {
        background-color: #f1f8e9;
        border-left: 4px solid #4caf50;
    }

""", unsafe_allow_html=True)

class StreamlitChatbot:
    def __init__(self):
        self.initialize_session_state()
        self.chatbot_engine = self.load_chatbot_engine()

    def initialize_session_state(self):
        """세션 상태 초기화"""
        if "session_id" not in st.session_state:
            st.session_state.session_id = str(uuid.uuid4())

        if "messages" not in st.session_state:
            st.session_state.messages = []

        if "chat_history" not in st.session_state:
            st.session_state.chat_history = []

        if "feedback_scores" not in st.session_state:
            st.session_state.feedback_scores = []

    @st.cache_resource
    def load_chatbot_engine(_self):
        """챗봇 엔진 로드 (캐시됨)"""
        return ChatbotEngine()

    def render_sidebar(self):
        """사이드바 렌더링"""
        with st.sidebar:
            st.header("⚙️ 설정")

            # 세션 관리
            st.subheader("세션 관리")
            if st.button("새 대화 시작"):
                st.session_state.session_id = str(uuid.uuid4())
                st.session_state.messages = []
                st.session_state.chat_history = []
                st.rerun()

            st.text(f"세션 ID: {st.session_state.session_id[:8]}...")

            # 통계
            st.subheader("📊 대화 통계")
            st.metric("총 메시지", len(st.session_state.messages))

            if st.session_state.feedback_scores:
                avg_score = sum(st.session_state.feedback_scores) / len(st.session_state.feedback_scores)
                st.metric("평균 만족도", f"{avg_score:.1f}/5")

            # 지식 베이스 관리
            st.subheader("📚 지식 베이스")
            uploaded_file = st.file_uploader(
                "문서 업로드", 
                type=['txt', 'pdf', 'md'],
                help="새로운 지식을 추가하세요"
            )

            if uploaded_file and st.button("문서 추가"):
                self.process_uploaded_file(uploaded_file)

    def process_uploaded_file(self, uploaded_file):
        """업로드된 파일 처리"""
        try:
            if uploaded_file.type == "text/plain":
                content = str(uploaded_file.read(), "utf-8")
                self.chatbot_engine.rag_system.add_documents_from_text([content])
                st.success("문서가 성공적으로 추가되었습니다!")
            else:
                st.warning("현재 텍스트 파일만 지원됩니다.")
        except Exception as e:
            st.error(f"파일 처리 중 오류 발생: {e}")

    def render_chat_interface(self):
        """채팅 인터페이스 렌더링"""
        st.markdown('🤖 AI 고객 지원 챗봇', unsafe_allow_html=True)

        # 대화 기록 표시
        chat_container = st.container()

        with chat_container:
            for i, message in enumerate(st.session_state.messages):
                with st.chat_message(message["role"]):
                    st.markdown(message["content"])

                    # 어시스턴트 메시지에 피드백 버튼 추가
                    if message["role"] == "assistant" and i == len(st.session_state.messages) - 1:
                        self.render_feedback_buttons(i)

        # 사용자 입력
        if prompt := st.chat_input("질문을 입력하세요..."):
            self.handle_user_input(prompt)

    def render_feedback_buttons(self, message_index):
        """피드백 버튼 렌더링"""
        col1, col2, col3, col4, col5 = st.columns(5)

        feedback_given = False
        for i, col in enumerate([col1, col2, col3, col4, col5], 1):
            with col:
                if st.button(f"⭐" * i, key=f"feedback_{message_index}_{i}"):
                    st.session_state.feedback_scores.append(i)
                    st.success(f"{i}점 피드백 감사합니다!")
                    feedback_given = True

        return feedback_given

    def handle_user_input(self, prompt):
        """사용자 입력 처리"""
        # 사용자 메시지 추가
        st.session_state.messages.append({"role": "user", "content": prompt})

        with st.chat_message("user"):
            st.markdown(prompt)

        # 어시스턴트 응답 생성
        with st.chat_message("assistant"):
            with st.spinner("답변을 생성하고 있습니다..."):
                try:
                    # 스트리밍 콜백 설정
                    st_callback = StreamlitCallbackHandler(st.container())

                    # 챗봇 응답 생성
                    response = self.chatbot_engine.chat(
                        prompt, 
                        session_id=st.session_state.session_id
                    )

                    st.markdown(response)

                    # 응답을 세션에 저장
                    st.session_state.messages.append({
                        "role": "assistant", 
                        "content": response
                    })

                except Exception as e:
                    error_msg = f"죄송합니다. 오류가 발생했습니다: {str(e)}"
                    st.error(error_msg)
                    st.session_state.messages.append({
                        "role": "assistant", 
                        "content": error_msg
                    })

    def render_analytics(self):
        """분석 대시보드"""
        if len(st.session_state.messages) > 0:
            st.subheader("📈 대화 분석")

            # 메시지 길이 분석
            message_lengths = [len(msg["content"]) for msg in st.session_state.messages]

            col1, col2 = st.columns(2)

            with col1:
                # 메시지 길이 히스토그램
                fig = px.histogram(
                    x=message_lengths, 
                    title="메시지 길이 분포",
                    labels={"x": "문자 수", "y": "빈도"}
                )
                st.plotly_chart(fig, use_container_width=True)

            with col2:
                # 역할별 메시지 수
                role_counts = pd.DataFrame(st.session_state.messages)["role"].value_counts()
                fig = px.pie(
                    values=role_counts.values, 
                    names=role_counts.index,
                    title="역할별 메시지 비율"
                )
                st.plotly_chart(fig, use_container_width=True)

    def run(self):
        """메인 애플리케이션 실행"""
        self.render_sidebar()

        # 메인 탭 구성
        tab1, tab2 = st.tabs(["💬 채팅", "📊 분석"])

        with tab1:
            self.render_chat_interface()

        with tab2:
            self.render_analytics()

# 애플리케이션 실행
if __name__ == "__main__":
    app = StreamlitChatbot()
    app.run()

고급 기능 확장

실시간 알림 시스템 연동

# chatbot/notifications.py
import os
import smtplib
from email.mime.text import MIMEText
from email.mime.multipart import MIMEMultipart
from twilio.rest import Client
import requests
from datetime import datetime

class NotificationManager:
    def __init__(self):
        self.twilio_client = self._setup_twilio()
        self.email_config = self._setup_email()
        self.slack_webhook = os.getenv('SLACK_WEBHOOK_URL')

    def _setup_twilio(self):
        """Twilio 클라이언트 설정"""
        account_sid = os.getenv('TWILIO_SID')
        auth_token = os.getenv('TWILIO_TOKEN')

        if account_sid and auth_token:
            return Client(account_sid, auth_token)
        return None

    def _setup_email(self):
        """이메일 설정"""
        return {
            'smtp_server': os.getenv('SMTP_SERVER', 'smtp.gmail.com'),
            'smtp_port': int(os.getenv('SMTP_PORT', '587')),
            'email': os.getenv('EMAIL_ADDRESS'),
            'password': os.getenv('EMAIL_PASSWORD')
        }

    def send_sms_alert(self, message, phone_number=None):
        """SMS 알림 전송"""
        if not self.twilio_client:
            return False

        try:
            phone_number = phone_number or os.getenv('ADMIN_PHONE')

            self.twilio_client.messages.create(
                body=f"🚨 챗봇 알림: {message}",
                from_=os.getenv('TWILIO_PHONE'),
                to=phone_number
            )
            return True
        except Exception as e:
            print(f"SMS 전송 실패: {e}")
            return False

    def send_email_alert(self, subject, message, recipient=None):
        """이메일 알림 전송"""
        try:
            recipient = recipient or os.getenv('ADMIN_EMAIL')

            msg = MIMEMultipart()
            msg['From'] = self.email_config['email']
            msg['To'] = recipient
            msg['Subject'] = subject

            msg.attach(MIMEText(message, 'plain', 'utf-8'))

            server = smtplib.SMTP(self.email_config['smtp_server'], self.email_config['smtp_port'])
            server.starttls()
            server.login(self.email_config['email'], self.email_config['password'])

            text = msg.as_string()
            server.sendmail(self.email_config['email'], recipient, text)
            server.quit()

            return True
        except Exception as e:
            print(f"이메일 전송 실패: {e}")
            return False

    def send_slack_alert(self, message, channel=None):
        """Slack 알림 전송"""
        if not self.slack_webhook:
            return False

        try:
            payload = {
                "text": f"🤖 챗봇 알림: {message}",
                "channel": channel or "#alerts",
                "username": "ChatBot Alert",
                "icon_emoji": ":robot_face:"
            }

            response = requests.post(self.slack_webhook, json=payload)
            return response.status_code == 200
        except Exception as e:
            print(f"Slack 전송 실패: {e}")
            return False

    def send_critical_alert(self, message, alert_type="error"):
        """중요 알림 다중 채널 전송"""
        timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
        full_message = f"[{timestamp}] {alert_type.upper()}: {message}"

        # 모든 채널로 알림 전송
        results = {
            'sms': self.send_sms_alert(full_message),
            'email': self.send_email_alert(f"챗봇 {alert_type} 알림", full_message),
            'slack': self.send_slack_alert(full_message)
        }

        return results

# 사용 예시
notification_manager = NotificationManager()

# 에러 발생 시 알림
def handle_critical_error(error_message):
    notification_manager.send_critical_alert(
        f"챗봇에서 심각한 오류 발생: {error_message}",
        alert_type="critical_error"
    )

성능 모니터링 시스템

# chatbot/monitoring.py
import time
import psutil
import logging
from functools import wraps
from datetime import datetime
import json
from collections import defaultdict

class PerformanceMonitor:
    def __init__(self):
        self.metrics = defaultdict(list)
        self.setup_logging()

    def setup_logging(self):
        """로깅 설정"""
        logging.basicConfig(
            level=logging.INFO,
            format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
            handlers=[
                logging.FileHandler('chatbot_performance.log'),
                logging.StreamHandler()
            ]
        )
        self.logger = logging.getLogger(__name__)

    def monitor_performance(self, operation_name):
        """성능 모니터링 데코레이터"""
        def decorator(func):
            @wraps(func)
            def wrapper(*args, **kwargs):
                start_time = time.time()
                start_memory = psutil.Process().memory_info().rss / 1024 / 1024  # MB

                try:
                    result = func(*args, **kwargs)
                    success = True
                    error = None
                except Exception as e:
                    result = None
                    success = False
                    error = str(e)
                    raise
                finally:
                    end_time = time.time()
                    end_memory = psutil.Process().memory_info().rss / 1024 / 1024  # MB

                    metrics = {
                        'operation': operation_name,
                        'duration': end_time - start_time,
                        'memory_used': end_memory - start_memory,
                        'success': success,
                        'error': error,
                        'timestamp': datetime.now().isoformat()
                    }

                    self.record_metrics(metrics)

                return result
            return wrapper
        return decorator

    def record_metrics(self, metrics):
        """메트릭 기록"""
        self.metrics[metrics['operation']].append(metrics)
        self.logger.info(f"Performance: {json.dumps(metrics)}")

        # 성능 임계값 확인
        if metrics['duration'] > 5.0:  # 5초 이상
            self.logger.warning(f"Slow operation detected: {metrics['operation']} took {metrics['duration']:.2f}s")

        if metrics['memory_used'] > 100:  # 100MB 이상
            self.logger.warning(f"High memory usage: {metrics['operation']} used {metrics['memory_used']:.2f}MB")

    def get_performance_summary(self, operation_name=None):
        """성능 요약 통계"""
        if operation_name:
            data = self.metrics.get(operation_name, [])
        else:
            data = []
            for metrics_list in self.metrics.values():
                data.extend(metrics_list)

        if not data:
            return {}

        durations = [m['duration'] for m in data if m['success']]
        memory_usage = [m['memory_used'] for m in data if m['success']]
        success_rate = sum(1 for m in data if m['success']) / len(data)

        return {
            'total_operations': len(data),
            'success_rate': success_rate,
            'avg_duration': sum(durations) / len(durations) if durations else 0,
            'max_duration': max(durations) if durations else 0,
            'avg_memory_usage': sum(memory_usage) / len(memory_usage) if memory_usage else 0,
            'max_memory_usage': max(memory_usage) if memory_usage else 0
        }

# 전역 모니터 인스턴스
performance_monitor = PerformanceMonitor()

# 사용 예시
@performance_monitor.monitor_performance("chat_response")
def generate_chat_response(user_input, session_id):
    # 챗봇 응답 생성 로직
    time.sleep(0.5)  # 시뮬레이션
    return "응답 생성 완료"

테스트 및 최적화 전략

자동화된 테스트 스위트

# tests/test_chatbot.py
import pytest
import asyncio
from main import ChatbotEngine
from chatbot.memory import MemoryManager
from chatbot.rag import RAGSystem

class TestChatbot:
    @pytest.fixture
    def chatbot_engine(self):
        return ChatbotEngine()

    @pytest.fixture
    def sample_questions(self):
        return [
            "안녕하세요",
            "도움이 필요합니다",
            "제품 사용법을 알려주세요",
            "오류가 발생했습니다",
            "환불 정책은 무엇인가요?"
        ]

    def test_basic_response(self, chatbot_engine):
        """기본 응답 테스트"""
        response = chatbot_engine.chat("안녕하세요")
        assert response is not None
        assert len(response) > 0
        assert isinstance(response, str)

    def test_session_memory(self, chatbot_engine):
        """세션 메모리 테스트"""
        session_id = "test_session"

        # 첫 번째 메시지
        response1 = chatbot_engine.chat("제 이름은 김철수입니다", session_id)

        # 두 번째 메시지에서 이름 기억하는지 확인
        response2 = chatbot_engine.chat("제 이름이 무엇인지 기억하시나요?", session_id)

        assert "김철수" in response2 or "철수" in response2

    def test_rag_functionality(self, chatbot_engine):
        """RAG 기능 테스트"""
        # 테스트 문서 추가
        test_docs = ["회사 정책: 환불은 구매 후 30일 이내 가능합니다."]
        chatbot_engine.rag_system.add_documents_from_text(test_docs)

        response = chatbot_engine.chat("환불 정책이 무엇인가요?")
        assert "30일" in response

    @pytest.mark.asyncio
    async def test_concurrent_requests(self, chatbot_engine, sample_questions):
        """동시 요청 처리 테스트"""
        async def send_request(question, session_id):
            return chatbot_engine.chat(question, session_id)

        tasks = [
            send_request(question, f"session_{i}")
            for i, question in enumerate(sample_questions)
        ]

        responses = await asyncio.gather(*tasks)

        # 모든 응답이 정상적으로 생성되었는지 확인
        assert len(responses) == len(sample_questions)
        assert all(response is not None for response in responses)

    def test_performance_benchmark(self, chatbot_engine, sample_questions):
        """성능 벤치마크 테스트"""
        import time

        response_times = []

        for question in sample_questions:
            start_time = time.time()
            response = chatbot_engine.chat(question)
            end_time = time.time()

            response_times.append(end_time - start_time)
            assert response is not None

        avg_response_time = sum(response_times) / len(response_times)
        max_response_time = max(response_times)

        # 성능 기준 확인
        assert avg_response_time  80:
            print(f"⚠️ 높은 메모리 사용량: {memory_percent:.1f}%")
            print("해결책:")
            print("  1. 대화 기록을 10회로 제한")
            print("  2. 벡터 스토어 크기 최적화")
            print("  3. 배치 크기 조정")
            return False

        print(f"✅ 메모리 사용량 정상: {memory_percent:.1f}%")
        return True

    def check_response_quality(self):
        """응답 품질 확인"""
        suggestions = [
            "프롬프트 템플릿 개선",
            "검색 결과를 상위 3개로 제한",
            "컨텍스트 길이 최적화",
            "temperature 값 조정 (0.3-0.7)",
            "시스템 메시지 구체화"
        ]

        print("📝 응답 품질 개선 방법:")
        for i, suggestion in enumerate(suggestions, 1):
            print(f"  {i}. {suggestion}")

        return True

    def run_full_diagnostic(self):
        """전체 진단 실행"""
        print("🔍 챗봇 시스템 진단을 시작합니다...\n")

        results = {}
        for issue_name, check_func in self.common_issues.items():
            print(f"검사 중: {issue_name}")
            results[issue_name] = check_func()
            print()

        # 요약
        passed = sum(results.values())
        total = len(results)

        print(f"📊 진단 결과: {passed}/{total} 통과")

        if passed == total:
            print("🎉 모든 검사를 통과했습니다!")
        else:
            print("⚠️ 일부 문제가 발견되었습니다. 위의 해결책을 참고하세요.")

        return results

# 사용 예시
if __name__ == "__main__":
    troubleshooter = TroubleshootingGuide()
    troubleshooter.run_full_diagnostic()

마무리

OpenAI API와의 원활한 통합, FAISS를 활용한 지식 기반 응답 생성, Streamlit으로 구현하는 직관적인 UI까지, 모든 요소가 하나의 파이프라인으로 연결됩니다.
특히 RAG 기술을 접목하면 단순한 질의응답을 넘어 회사의 내부 문서를 활용한 전문성 있는 답변이 가능해집니다.
핵심 포인트:

  • 모듈화된 아키텍처: 각 컴포넌트의 독립적 개발과 테스트 가능
  • RAG 시스템 통합: 외부 지식 베이스를 활용한 정확한 답변 생성
  • 실시간 모니터링: 성능 메트릭과 알림 시스템으로 안정적 운영
  • 확장 가능한 배포: Docker와 마이크로서비스 아키텍처로 스케일링
  • 사용자 경험 최적화: Streamlit 기반 직관적 인터페이스
  • 프로덕션 준비: 테스트, 모니터링, 문제 해결 가이드 포함

이 가이드를 통해 구축한 챗봇은 단순한 프로토타입을 넘어 실제 비즈니스 환경에서 활용 가능한 수준의 지능형 고객 지원 시스템으로 발전할 수 있습니다.

반응형