IT기술/랭체인 (langchain)

LangChain 성능 최적화 완벽 가이드: 응답 속도 향상과 비용 절감 전략

후스파 2025. 7. 5. 22:58
반응형

LangChain을 활용한 AI 애플리케이션 개발 시 성능 최적화는 응답 속도 향상과 비용 절감에 중요한 요소입니다. 이 글에서는 LangChain의 성능을 최적화하는 다양한 방법과 전략을 소개합니다.


캐싱을 통한 성능 최적화

캐시의 개념과 중요성

캐싱은 이전에 계산된 결과를 저장해두고, 동일한 입력이 들어왔을 때 다시 계산하지 않고 저장된 결과를 재사용하는 기술입니다. LangChain에서 캐싱은 LLM 호출 결과를 저장하고, 동일한 프롬프트에 대해서는 LLM을 다시 호출하지 않고 캐시된 결과를 반환합니다.

캐싱 유형

인메모리 캐싱(In-Memory Cache)

from langchain_core.caches import InMemoryCache
from langchain_core.globals import set_llm_cache
from langchain_openai import ChatOpenAI

# 인메모리 캐시 설정
set_llm_cache(InMemoryCache(maxsize=100))

# 모델 초기화 및 사용
model = ChatOpenAI(model="gpt-3.5-turbo")

# 첫 번째 호출 (캐시 미스)
response1 = model.invoke("파이썬에서 리스트를 생성하는 방법은?")

# 두 번째 호출 (캐시 히트 - 즉시 반환)
response2 = model.invoke("파이썬에서 리스트를 생성하는 방법은?")

인메모리 캐싱은 실시간 데이터 분석 서비스나 뉴스 헤드라인 업데이트처럼 빠른 응답이 중요한 경우에 적합합니다.
SQLite 캐싱

from langchain_community.cache import SQLiteCache
from langchain_core.globals import set_llm_cache

# SQLite 캐시 설정 (영구 저장)
set_llm_cache(SQLiteCache(database_path=".langchain.db"))

# 모델 사용 - 캐시가 파일에 저장되어 재시작 후에도 유지됨
model = ChatOpenAI()
response = model.invoke("머신러닝의 기본 개념을 설명해주세요.")

SQLite 캐싱은 데이터를 파일 기반의 데이터베이스에 저장합니다. 메모리에 비해 접근 속도는 느리지만, 데이터를 영구적으로 보존할 수 있어 시스템 재시작 후에도 캐시된 데이터를 사용할 수 있습니다. FAQ 형식의 챗봇에 적합합니다.
Redis 캐싱 (분산 환경)

from langchain_community.cache import RedisCache
import redis

# Redis 캐시 설정 (분산 환경에 적합)
redis_client = redis.Redis(host='localhost', port=6379, db=0)
set_llm_cache(RedisCache(redis_client))

# 여러 서버에서 동일한 캐시 공유 가능
model = ChatOpenAI()
response = model.invoke("딥러닝과 머신러닝의 차이점은?")

Python 내장 캐싱

from functools import lru_cache
from langchain.chains import RetrievalQA

@lru_cache(maxsize=100)
def get_cached_response(user_input: str):
    """자주 묻는 질문에 대한 캐싱 적용"""
    return qa_chain.invoke({"input": user_input})

# 사용 예시
response1 = get_cached_response("AI란 무엇인가요?")  # 첫 호출
response2 = get_cached_response("AI란 무엇인가요?")  # 캐시에서 반환

Python의 내장 lru_cache 데코레이터를 사용하여 자주 묻는 질문에 대해 캐싱을 적용할 수 있습니다.


비동기 처리를 통한 성능 최적화

asyncio와 LangChain 비동기 기능 활용

Python의 asyncio와 LangChain의 비동기 기능을 활용하면 여러 LLM 요청을 동시에 처리하여 전체 처리 시간을 단축할 수 있습니다.

import asyncio
import time
from langchain_openai import ChatOpenAI

async def process_single_query(llm, query):
    """단일 쿼리 비동기 처리"""
    return await llm.ainvoke(query)

async def process_queries_concurrently(queries):
    """여러 쿼리 동시 처리"""
    llm = ChatOpenAI(model="gpt-3.5-turbo")

    # 모든 쿼리를 동시에 처리
    tasks = [process_single_query(llm, query) for query in queries]
    results = await asyncio.gather(*tasks)

    return results

# 동기 처리와 비동기 처리 성능 비교
def process_queries_sync(queries):
    """동기 처리 (순차적)"""
    llm = ChatOpenAI(model="gpt-3.5-turbo")
    results = []

    for query in queries:
        result = llm.invoke(query)
        results.append(result)

    return results

# 사용 예시
queries = [
    "파이썬의 장점을 설명해주세요.",
    "자바스크립트의 특징은 무엇인가요?",
    "머신러닝의 기본 개념을 알려주세요.",
    "데이터베이스 정규화란 무엇인가요?",
    "클라우드 컴퓨팅의 이점을 설명해주세요."
]

# 비동기 처리 실행
async def main():
    start_time = time.time()
    async_results = await process_queries_concurrently(queries)
    async_time = time.time() - start_time

    print(f"비동기 처리 시간: {async_time:.2f}초")

    # 동기 처리와 비교
    start_time = time.time()
    sync_results = process_queries_sync(queries)
    sync_time = time.time() - start_time

    print(f"동기 처리 시간: {sync_time:.2f}초")
    print(f"성능 향상: {sync_time/async_time:.2f}배")

# 실행
asyncio.run(main())

체인과 에이전트의 비동기 처리

from langchain.chains import LLMChain
from langchain.prompts import PromptTemplate
from langchain_openai import ChatOpenAI

async def async_chain_processing():
    """체인의 비동기 처리"""
    llm = ChatOpenAI(model="gpt-3.5-turbo")

    # 여러 체인을 동시에 실행
    prompt_template = PromptTemplate(
        input_variables=["topic"],
        template="다음 주제에 대해 간단히 설명해주세요: {topic}"
    )

    chain = LLMChain(llm=llm, prompt=prompt_template)

    topics = ["인공지능", "블록체인", "양자컴퓨팅", "사물인터넷", "빅데이터"]

    # 모든 체인을 비동기로 실행
    tasks = [chain.arun(topic=topic) for topic in topics]
    results = await asyncio.gather(*tasks)

    return dict(zip(topics, results))

# 실행
results = asyncio.run(async_chain_processing())
for topic, explanation in results.items():
    print(f"{topic}: {explanation[:100]}...")

비동기 처리는 여러 작업을 동시에 처리함으로써 처리 속도를 크게 향상시킬 수 있습니다. 특히 여러 메뉴의 영양성분을 추출하는 API와 같이 독립적인 여러 작업을 병렬로 처리해야 하는 경우에 효과적입니다.


프롬프트 최적화

효율적인 프롬프트 설계는 LLM의 응답 품질뿐만 아니라 처리 속도와 토큰 사용량에도 영향을 미칩니다.

토큰 효율적인 프롬프트 설계

# 비효율적인 프롬프트 (토큰 낭비)
inefficient_prompt = """
안녕하세요! 저는 당신에게 질문을 하려고 합니다. 
이 질문은 매우 중요한 질문이며, 정확한 답변을 원합니다.
질문은 다음과 같습니다: 파이썬에서 리스트를 만드는 방법을 
자세히 설명해주시기 바랍니다. 가능하면 예시 코드도 
포함해서 설명해주세요.
"""

# 효율적인 프롬프트 (토큰 절약)
efficient_prompt = """
파이썬 리스트 생성 방법을 예시 코드와 함께 설명해주세요.
"""

# 토큰 사용량 비교
from langchain_openai import ChatOpenAI
import tiktoken

def count_tokens(text, model="gpt-3.5-turbo"):
    encoding = tiktoken.encoding_for_model(model)
    return len(encoding.encode(text))

print(f"비효율적 프롬프트 토큰: {count_tokens(inefficient_prompt)}")
print(f"효율적 프롬프트 토큰: {count_tokens(efficient_prompt)}")

컨텍스트 최적화 전략

from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain.chains.summarize import load_summarize_chain

def optimize_context_for_llm(long_text, max_tokens=2000):
    """긴 텍스트를 LLM에 최적화된 형태로 압축"""

    # 1. 텍스트 분할
    text_splitter = RecursiveCharacterTextSplitter(
        chunk_size=1000,
        chunk_overlap=100
    )
    chunks = text_splitter.split_text(long_text)

    # 2. 중요한 부분만 추출 (요약)
    llm = ChatOpenAI(model="gpt-3.5-turbo")
    summarize_chain = load_summarize_chain(llm, chain_type="map_reduce")

    # 3. 요약된 컨텍스트 반환
    summary = summarize_chain.run(chunks)
    return summary

# Few-shot 예제 최적화
def create_optimized_few_shot_prompt(examples, max_examples=3):
    """효과적인 few-shot 예제 선택"""

    # 가장 대표적인 예제만 선택
    selected_examples = examples[:max_examples]

    prompt = "다음 예시를 참고하여 답변해주세요:\n\n"

    for i, example in enumerate(selected_examples, 1):
        prompt += f"예시 {i}:\n"
        prompt += f"질문: {example['question']}\n"
        prompt += f"답변: {example['answer']}\n\n"

    return prompt

배치 처리와 병렬 처리 최적화

배치 처리 구현

from typing import List
import asyncio

class BatchProcessor:
    def __init__(self, llm, batch_size=10):
        self.llm = llm
        self.batch_size = batch_size

    async def process_batch(self, queries: List[str]):
        """배치 단위로 쿼리 처리"""
        tasks = [self.llm.ainvoke(query) for query in queries]
        return await asyncio.gather(*tasks)

    async def process_large_dataset(self, all_queries: List[str]):
        """대용량 데이터셋을 배치로 나누어 처리"""
        results = []

        for i in range(0, len(all_queries), self.batch_size):
            batch = all_queries[i:i + self.batch_size]
            batch_results = await self.process_batch(batch)
            results.extend(batch_results)

            # 배치 간 지연 (API 레이트 리밋 방지)
            await asyncio.sleep(0.1)

        return results

# 사용 예시
async def main():
    llm = ChatOpenAI(model="gpt-3.5-turbo")
    processor = BatchProcessor(llm, batch_size=5)

    # 대량의 쿼리 처리
    queries = [f"주제 {i}에 대해 설명해주세요." for i in range(50)]
    results = await processor.process_large_dataset(queries)

    print(f"처리된 쿼리 수: {len(results)}")

asyncio.run(main())

모델 라우팅 최적화

from langchain_openai import ChatOpenAI

class SmartModelRouter:
    def __init__(self):
        self.fast_model = ChatOpenAI(model="gpt-3.5-turbo")  # 빠르고 저렴
        self.powerful_model = ChatOpenAI(model="gpt-4")      # 느리지만 강력

    def classify_query_complexity(self, query: str) -> str:
        """쿼리 복잡도 분류"""
        # 간단한 휴리스틱 기반 분류
        if len(query.split())  50:  # 너무 짧은 문서 제외
                doc.page_content = cleaned_content
                processed.append(doc)
        return processed

# 효율적인 검색 구현
class OptimizedRetriever:
    def __init__(self, vectorstore):
        self.vectorstore = vectorstore
        self.retriever = vectorstore.as_retriever(
            search_type="mmr",  # Maximum Marginal Relevance
            search_kwargs={"k": 5, "fetch_k": 20}
        )

    async def retrieve_with_reranking(self, query: str):
        """재순위화를 통한 검색 품질 향상"""
        # 1차 검색
        docs = self.retriever.get_relevant_documents(query)

        # 2차 재순위화 (간단한 키워드 기반)
        query_keywords = set(query.lower().split())

        scored_docs = []
        for doc in docs:
            doc_keywords = set(doc.page_content.lower().split())
            overlap_score = len(query_keywords & doc_keywords) / len(query_keywords)
            scored_docs.append((doc, overlap_score))

        # 점수순 정렬
        scored_docs.sort(key=lambda x: x[1], reverse=True)

        return [doc for doc, score in scored_docs[:3]]

리소스 관리 전략

토큰 사용량 모니터링

from langchain.callbacks import get_openai_callback
import logging

class TokenUsageMonitor:
    def __init__(self):
        self.total_tokens = 0
        self.total_cost = 0.0
        self.logger = logging.getLogger(__name__)

    def track_usage(self, func):
        """토큰 사용량 추적 데코레이터"""
        def wrapper(*args, **kwargs):
            with get_openai_callback() as cb:
                result = func(*args, **kwargs)

                self.total_tokens += cb.total_tokens
                self.total_cost += cb.total_cost

                self.logger.info(f"함수: {func.__name__}")
                self.logger.info(f"토큰 사용량: {cb.total_tokens}")
                self.logger.info(f"비용: ${cb.total_cost:.4f}")

                return result
        return wrapper

    def get_usage_report(self):
        """사용량 리포트 생성"""
        return {
            "total_tokens": self.total_tokens,
            "total_cost": self.total_cost,
            "average_tokens_per_call": self.total_tokens / max(1, self.call_count)
        }

# 사용 예시
monitor = TokenUsageMonitor()

@monitor.track_usage
def process_query(query):
    llm = ChatOpenAI(model="gpt-3.5-turbo")
    return llm.invoke(query)

# 토큰 사용량이 자동으로 추적됨
result = process_query("AI의 미래에 대해 설명해주세요.")

적응형 배치 크기 조정

import time
from typing import List

class AdaptiveBatchProcessor:
    def __init__(self, llm, initial_batch_size=5):
        self.llm = llm
        self.batch_size = initial_batch_size
        self.performance_history = []

    async def adaptive_batch_process(self, queries: List[str]):
        """성능에 따라 배치 크기를 동적 조정"""
        results = []

        for i in range(0, len(queries), self.batch_size):
            batch = queries[i:i + self.batch_size]

            start_time = time.time()
            try:
                batch_results = await self.process_batch(batch)
                processing_time = time.time() - start_time

                # 성능 기록
                self.performance_history.append({
                    'batch_size': len(batch),
                    'processing_time': processing_time,
                    'throughput': len(batch) / processing_time
                })

                # 배치 크기 조정
                self.adjust_batch_size()

                results.extend(batch_results)

            except Exception as e:
                # 오류 발생 시 배치 크기 감소
                self.batch_size = max(1, self.batch_size // 2)
                self.logger.warning(f"배치 처리 오류, 크기 조정: {self.batch_size}")

                # 개별 처리로 폴백
                for query in batch:
                    try:
                        result = await self.llm.ainvoke(query)
                        results.append(result)
                    except:
                        results.append(None)

        return results

    def adjust_batch_size(self):
        """성능 히스토리를 기반으로 배치 크기 조정"""
        if len(self.performance_history)  2.0:  # 초당 2개 이상 처리
            self.batch_size = min(20, self.batch_size + 1)
        elif avg_throughput < 0.5:  # 초당 0.5개 미만 처리
            self.batch_size = max(1, self.batch_size - 1)

마무리

LangChain의 성능 최적화는 캐싱, 비동기 처리, 프롬프트 최적화, 효율적인 데이터 통합, 그리고 리소스 관리 전략을 통해 달성할 수 있습니다.
이러한 방법을 적절히 조합하여 응용 프로그램의 성능을 향상시키고 비용을 절감할 수 있습니다.
특히 캐싱은 동일한 쿼리에 대한 반복적인 API 호출을 줄이고, 비동기 처리는 여러 작업을 동시에 처리하여 전체 처리 시간을 단축하는 데 효과적입니다.
핵심 포인트:

  • 다층 캐싱 전략: 인메모리, SQLite, Redis 캐시의 적절한 조합
  • 비동기 처리: asyncio를 활용한 동시 처리로 응답 시간 단축
  • 스마트 모델 라우팅: 쿼리 복잡도에 따른 적절한 모델 선택
  • 토큰 최적화: 프롬프트 효율화와 컨텍스트 압축
  • 적응형 배치 처리: 성능에 따른 동적 배치 크기 조정
  • 실시간 모니터링: 토큰 사용량과 성능 지표 추적
반응형