IT기술/랭체인 (langchain)

LangChain 기반 PDF 처리 및 문서 검색 시스템 구축 완벽 가이드: 비구조화 데이터의 지능형 활용

후스파 2025. 7. 11. 19:01
반응형

PDF 파일은 법률, 금융, 연구 등 다양한 도메인에서 중요한 정보를 담고 있는 비구조화된 데이터 형식입니다. 하지만 PDF의 복잡한 구조와 긴 문서 길이는 데이터를 효율적으로 처리하고 검색하는 데 큰 도전 과제가 됩니다.
LangChain은 이러한 문제를 해결하기 위해 다양한 PDF 로더와 벡터 검색 기술을 제공하여 PDF 문서의 내용을 효율적으로 처리하고, 검색 및 질의응답 시스템을 구축할 수 있게 합니다. 이번 글에서는 LangChain을 활용해 PDF 파일을 처리하고, 문서 검색 및 내용 추출을 자동화하는 전략을 단계별로 알아보겠습니다.


LangChain으로 PDF 파일 처리하기: 단계별 가이드

PDF 로더 선택 및 텍스트 추출

LangChain은 다양한 PDF 로더를 제공합니다. 아래는 주요 로더와 사용 시나리오입니다:

  • PyPDFLoader: 가볍고 빠른 텍스트 추출에 적합
  • UnstructuredPDFLoader: 복잡한 레이아웃 문서 처리
  • PDFPlumberLoader: 표와 같은 구조화 데이터 추출
from langchain_community.document_loaders import PyPDFLoader

# PDF 파일 로드
loader = PyPDFLoader("example.pdf")
documents = loader.load()
print(documents[0].page_content[:300])  # 첫 페이지 내용 출력

고급 PDF 로더 활용법

from langchain_community.document_loaders import (
    PyPDFLoader, UnstructuredPDFLoader, PDFPlumberLoader, 
    PyMuPDFLoader, PDFMinerLoader
)
import os
from typing import List, Dict, Any

class AdvancedPDFProcessor:
    def __init__(self):
        self.loaders = {
            "pypdf": PyPDFLoader,
            "unstructured": UnstructuredPDFLoader,
            "plumber": PDFPlumberLoader,
            "pymupdf": PyMuPDFLoader,
            "pdfminer": PDFMinerLoader
        }

    def analyze_pdf_structure(self, pdf_path: str) -> Dict[str, Any]:
        """PDF 구조 분석 및 최적 로더 추천"""
        import PyPDF2

        with open(pdf_path, 'rb') as file:
            pdf_reader = PyPDF2.PdfReader(file)

            analysis = {
                "page_count": len(pdf_reader.pages),
                "has_images": False,
                "has_tables": False,
                "text_density": 0,
                "recommended_loader": "pypdf"
            }

            # 첫 페이지 분석
            first_page = pdf_reader.pages[0]
            text = first_page.extract_text()

            analysis["text_density"] = len(text) / 1000  # 텍스트 밀도

            # 표 존재 여부 추정 (간단한 휴리스틱)
            if "table" in text.lower() or text.count("|") > 10:
                analysis["has_tables"] = True
                analysis["recommended_loader"] = "plumber"

            # 복잡한 레이아웃 감지
            if analysis["text_density"]  Dict[str, Any]:
        """PDF 메타데이터 추출"""
        import PyPDF2

        with open(pdf_path, 'rb') as file:
            pdf_reader = PyPDF2.PdfReader(file)
            metadata = pdf_reader.metadata

            return {
                "title": metadata.get("/Title", ""),
                "author": metadata.get("/Author", ""),
                "subject": metadata.get("/Subject", ""),
                "creator": metadata.get("/Creator", ""),
                "creation_date": metadata.get("/CreationDate", ""),
                "modification_date": metadata.get("/ModDate", ""),
                "page_count": len(pdf_reader.pages)
            }

# 사용 예시
processor = AdvancedPDFProcessor()
documents, used_loader = processor.load_pdf_with_optimal_loader("example.pdf")
metadata = processor.extract_metadata("example.pdf")

print(f"사용된 로더: {used_loader}")
print(f"메타데이터: {metadata}")

텍스트 분할로 대용량 문서 처리

긴 문서를 효율적으로 다루기 위해 텍스트를 청크 단위로 분할합니다.

from langchain_text_splitters import RecursiveCharacterTextSplitter

text_splitter = RecursiveCharacterTextSplitter(
    chunk_size=1000,
    chunk_overlap=200
)
docs = text_splitter.split_documents(documents)

고급 텍스트 분할 전략

from langchain.text_splitter import (
    RecursiveCharacterTextSplitter, 
    SpacyTextSplitter,
    NLTKTextSplitter,
    TokenTextSplitter
)
from langchain.schema import Document
import re

class IntelligentTextSplitter:
    def __init__(self):
        self.splitters = {
            "recursive": RecursiveCharacterTextSplitter,
            "spacy": SpacyTextSplitter,
            "nltk": NLTKTextSplitter,
            "token": TokenTextSplitter
        }

    def semantic_split(self, documents: List[Document], chunk_size: int = 1000) -> List[Document]:
        """의미 단위 기반 텍스트 분할"""

        # 문서 유형별 구분자 설정
        separators = [
            "\n\n",  # 단락 구분
            "\n",    # 줄 구분
            ". ",    # 문장 구분
            "? ",    # 질문 구분
            "! ",    # 감탄 구분
            "; ",    # 세미콜론 구분
            ", ",    # 쉼표 구분
            " ",     # 공백 구분
            ""       # 문자 구분
        ]

        splitter = RecursiveCharacterTextSplitter(
            chunk_size=chunk_size,
            chunk_overlap=200,
            separators=separators,
            length_function=len
        )

        return splitter.split_documents(documents)

    def section_aware_split(self, documents: List[Document]) -> List[Document]:
        """섹션 인식 분할"""
        split_docs = []

        for doc in documents:
            content = doc.page_content

            # 섹션 헤더 패턴 감지
            section_patterns = [
                r'^#{1,6}\s+(.+)$',  # 마크다운 헤더
                r'^\d+\.\s+(.+)$',   # 번호 목록
                r'^[A-Z\s]+:',       # 대문자 라벨
                r'^\s*\d+\.\d+',     # 계층 번호
            ]

            sections = []
            current_section = ""

            for line in content.split('\n'):
                is_header = any(re.match(pattern, line, re.MULTILINE) for pattern in section_patterns)

                if is_header and current_section:
                    sections.append(current_section.strip())
                    current_section = line + '\n'
                else:
                    current_section += line + '\n'

            if current_section:
                sections.append(current_section.strip())

            # 각 섹션을 별도 문서로 생성
            for i, section in enumerate(sections):
                if len(section) > 100:  # 최소 길이 확인
                    section_doc = Document(
                        page_content=section,
                        metadata={
                            **doc.metadata,
                            "section_index": i,
                            "section_type": "content"
                        }
                    )
                    split_docs.append(section_doc)

        return split_docs

    def table_aware_split(self, documents: List[Document]) -> List[Document]:
        """표 인식 분할"""
        split_docs = []

        for doc in documents:
            content = doc.page_content

            # 표 패턴 감지
            table_pattern = r'(\|[^|\n]*\|[^|\n]*\|.*?\n)+'
            tables = re.findall(table_pattern, content, re.MULTILINE)

            if tables:
                # 표와 일반 텍스트 분리
                text_parts = re.split(table_pattern, content)

                for i, part in enumerate(text_parts):
                    if part.strip():
                        doc_type = "table" if part in tables else "text"

                        split_doc = Document(
                            page_content=part.strip(),
                            metadata={
                                **doc.metadata,
                                "content_type": doc_type,
                                "part_index": i
                            }
                        )
                        split_docs.append(split_doc)
            else:
                split_docs.append(doc)

        return split_docs

# 사용 예시
intelligent_splitter = IntelligentTextSplitter()

# 의미 단위 분할
semantic_docs = intelligent_splitter.semantic_split(documents)

# 섹션 인식 분할
section_docs = intelligent_splitter.section_aware_split(documents)

# 표 인식 분할
table_docs = intelligent_splitter.table_aware_split(documents)

print(f"원본 문서 수: {len(documents)}")
print(f"의미 단위 분할 후: {len(semantic_docs)}")
print(f"섹션 인식 분할 후: {len(section_docs)}")
print(f"표 인식 분할 후: {len(table_docs)}")

벡터 저장소 생성 및 검색 시스템 구축

PDF에서 추출한 텍스트를 벡터로 변환해 저장소에 저장하고, 유사도 기반 검색을 구현합니다.

from langchain_community.vectorstores import FAISS
from langchain_openai import OpenAIEmbeddings

# 벡터 저장소 생성
embeddings = OpenAIEmbeddings()
vectorstore = FAISS.from_documents(docs, embeddings)
retriever = vectorstore.as_retriever(search_type="mmr", search_kwargs={"k": 5})

고급 벡터 저장소 구성

from langchain_community.vectorstores import FAISS, Chroma, Pinecone
from langchain_openai import OpenAIEmbeddings
from langchain_community.embeddings import HuggingFaceEmbeddings
import numpy as np
from typing import List, Dict, Optional

class AdvancedVectorStore:
    def __init__(self, embedding_type: str = "openai"):
        self.embedding_type = embedding_type
        self.embeddings = self._initialize_embeddings()
        self.vectorstores = {}

    def _initialize_embeddings(self):
        """임베딩 모델 초기화"""
        if self.embedding_type == "openai":
            return OpenAIEmbeddings(model="text-embedding-3-large")
        elif self.embedding_type == "huggingface":
            return HuggingFaceEmbeddings(
                model_name="sentence-transformers/all-MiniLM-L6-v2"
            )
        else:
            raise ValueError(f"지원하지 않는 임베딩 타입: {self.embedding_type}")

    def create_hierarchical_vectorstore(self, documents: List[Document], domain: str):
        """계층적 벡터 저장소 생성"""

        # 문서 유형별 분류
        doc_types = {}
        for doc in documents:
            doc_type = doc.metadata.get("content_type", "general")
            if doc_type not in doc_types:
                doc_types[doc_type] = []
            doc_types[doc_type].append(doc)

        # 각 유형별로 별도 벡터 저장소 생성
        for doc_type, type_docs in doc_types.items():
            if type_docs:
                vectorstore = FAISS.from_documents(type_docs, self.embeddings)
                self.vectorstores[f"{domain}_{doc_type}"] = vectorstore

        # 통합 벡터 저장소도 생성
        all_vectorstore = FAISS.from_documents(documents, self.embeddings)
        self.vectorstores[f"{domain}_all"] = all_vectorstore

        return self.vectorstores

    def create_multi_embedding_store(self, documents: List[Document], domain: str):
        """다중 임베딩 벡터 저장소"""

        # 여러 임베딩 모델 사용
        embedding_models = {
            "openai": OpenAIEmbeddings(model="text-embedding-3-large"),
            "openai_small": OpenAIEmbeddings(model="text-embedding-3-small"),
            "huggingface": HuggingFaceEmbeddings(
                model_name="sentence-transformers/all-MiniLM-L6-v2"
            )
        }

        multi_stores = {}
        for model_name, embedding_model in embedding_models.items():
            try:
                vectorstore = FAISS.from_documents(documents, embedding_model)
                multi_stores[f"{domain}_{model_name}"] = vectorstore
            except Exception as e:
                print(f"{model_name} 임베딩 실패: {e}")

        return multi_stores

    def create_filtered_retriever(self, vectorstore, filters: Dict[str, Any]):
        """필터링된 검색기 생성"""

        def filter_function(metadata):
            for key, value in filters.items():
                if key in metadata:
                    if isinstance(value, list):
                        if metadata[key] not in value:
                            return False
                    else:
                        if metadata[key] != value:
                            return False
            return True

        # 커스텀 검색기 클래스
        class FilteredRetriever:
            def __init__(self, vectorstore, filter_func):
                self.vectorstore = vectorstore
                self.filter_func = filter_func

            def get_relevant_documents(self, query: str, k: int = 5):
                # 더 많은 문서를 검색한 후 필터링
                candidates = self.vectorstore.similarity_search(query, k=k*3)
                filtered = [doc for doc in candidates if self.filter_func(doc.metadata)]
                return filtered[:k]

        return FilteredRetriever(vectorstore, filter_function)

    def hybrid_search(self, query: str, vectorstore_keys: List[str], weights: List[float] = None):
        """하이브리드 검색 (여러 벡터 저장소 결합)"""

        if weights is None:
            weights = [1.0] * len(vectorstore_keys)

        all_results = []

        for i, key in enumerate(vectorstore_keys):
            if key in self.vectorstores:
                results = self.vectorstores[key].similarity_search_with_score(query, k=5)

                # 가중치 적용
                weighted_results = [
                    (doc, score * weights[i], key) 
                    for doc, score in results
                ]
                all_results.extend(weighted_results)

        # 점수순 정렬
        all_results.sort(key=lambda x: x[1], reverse=True)

        # 중복 제거 및 상위 결과 반환
        seen_content = set()
        unique_results = []

        for doc, score, source in all_results:
            content_hash = hash(doc.page_content)
            if content_hash not in seen_content:
                seen_content.add(content_hash)
                unique_results.append((doc, score, source))

                if len(unique_results) >= 5:
                    break

        return unique_results

# 사용 예시
advanced_store = AdvancedVectorStore(embedding_type="openai")

# 계층적 벡터 저장소 생성
hierarchical_stores = advanced_store.create_hierarchical_vectorstore(docs, "legal_docs")

# 다중 임베딩 저장소 생성
multi_stores = advanced_store.create_multi_embedding_store(docs, "legal_docs")

# 필터링된 검색
filters = {"content_type": "text", "section_index": [0, 1, 2]}
filtered_retriever = advanced_store.create_filtered_retriever(
    hierarchical_stores["legal_docs_all"], 
    filters
)

# 하이브리드 검색
hybrid_results = advanced_store.hybrid_search(
    "계약 조건",
    ["legal_docs_text", "legal_docs_all"],
    weights=[0.7, 0.3]
)

print(f"하이브리드 검색 결과: {len(hybrid_results)}개")

검색 증강 생성(RAG) 파이프라인 구성

검색된 문서를 기반으로 GPT 모델이 답변을 생성하도록 체인을 구성합니다.

from langchain_core.runnables import RunnablePassthrough
from langchain_core.prompts import ChatPromptTemplate
from langchain_openai import ChatOpenAI

template = """문서 내용 기반 답변:
{context}

질문: {question}
"""
prompt = ChatPromptTemplate.from_template(template)

# RAG 체인 구성
rag_chain = (
    {"context": retriever, "question": RunnablePassthrough()}
    | prompt
    | ChatOpenAI(model="gpt-4")
)

고급 RAG 파이프라인

from langchain_core.runnables import RunnableBranch, RunnableParallel
from langchain_core.output_parsers import StrOutputParser
from langchain.chains import create_history_aware_retriever
from langchain_core.prompts import MessagesPlaceholder

class AdvancedRAGPipeline:
    def __init__(self, vectorstore, llm_model="gpt-4"):
        self.vectorstore = vectorstore
        self.llm = ChatOpenAI(model=llm_model, temperature=0.1)
        self.retriever = vectorstore.as_retriever()

    def create_contextual_retriever(self):
        """컨텍스트 인식 검색기"""

        contextualize_q_system_prompt = """이전 대화 기록과 최신 사용자 질문이 주어졌을 때,
        이전 대화 맥락을 참조하는 질문을 독립적으로 이해할 수 있는 질문으로 재구성하세요.
        질문에 답하지 말고, 필요하다면 재구성만 하고, 그렇지 않으면 그대로 반환하세요."""

        contextualize_q_prompt = ChatPromptTemplate.from_messages([
            ("system", contextualize_q_system_prompt),
            MessagesPlaceholder("chat_history"),
            ("human", "{input}"),
        ])

        history_aware_retriever = create_history_aware_retriever(
            self.llm, self.retriever, contextualize_q_prompt
        )

        return history_aware_retriever

    def create_multi_query_retriever(self):
        """다중 쿼리 검색기"""

        multi_query_prompt = ChatPromptTemplate.from_template("""
        다음 질문에 대해 다른 관점에서 3개의 유사한 질문을 생성하세요.
        각 질문은 한 줄에 하나씩 작성하세요.

        원본 질문: {question}

        대안 질문들:
        """)

        def generate_queries(question):
            queries_text = (multi_query_prompt | self.llm | StrOutputParser()).invoke(
                {"question": question}
            )
            queries = [q.strip() for q in queries_text.split('\n') if q.strip()]
            return [question] + queries  # 원본 질문도 포함

        def multi_query_search(question):
            queries = generate_queries(question)
            all_docs = []

            for query in queries:
                docs = self.retriever.get_relevant_documents(query)
                all_docs.extend(docs)

            # 중복 제거
            unique_docs = []
            seen_content = set()

            for doc in all_docs:
                content_hash = hash(doc.page_content)
                if content_hash not in seen_content:
                    seen_content.add(content_hash)
                    unique_docs.append(doc)

            return unique_docs[:5]  # 상위 5개 반환

        return multi_query_search

    def create_adaptive_rag_chain(self):
        """적응형 RAG 체인"""

        # 질문 유형 분류
        def classify_question(question):
            classification_prompt = ChatPromptTemplate.from_template("""
            다음 질문을 분류하세요:
            1. factual: 사실적 정보 요청
            2. analytical: 분석적 사고 필요
            3. comparative: 비교 분석 필요
            4. summarization: 요약 요청

            질문: {question}

            분류 결과 (한 단어로만 답하세요):
            """)

            result = (classification_prompt | self.llm | StrOutputParser()).invoke(
                {"question": question}
            )
            return result.strip().lower()

        # 질문 유형별 프롬프트
        prompts = {
            "factual": ChatPromptTemplate.from_template("""
            다음 문서에서 정확한 사실 정보를 찾아 답변하세요:

            문서: {context}

            질문: {question}

            답변 (사실만 간단명료하게):
            """),

            "analytical": ChatPromptTemplate.from_template("""
            다음 문서를 분석하여 깊이 있는 답변을 제공하세요:

            문서: {context}

            질문: {question}

            분석적 답변:
            """),

            "comparative": ChatPromptTemplate.from_template("""
            다음 문서들을 비교 분석하여 답변하세요:

            문서: {context}

            질문: {question}

            비교 분석 답변:
            """),

            "summarization": ChatPromptTemplate.from_template("""
            다음 문서의 내용을 요약하여 답변하세요:

            문서: {context}

            질문: {question}

            요약 답변:
            """)
        }

        # 분기 체인 생성
        def route_question(inputs):
            question = inputs["question"]
            question_type = classify_question(question)

            # 기본값 설정
            if question_type not in prompts:
                question_type = "factual"

            prompt = prompts[question_type]
            context = self.retriever.get_relevant_documents(question)

            return {
                "context": "\n\n".join([doc.page_content for doc in context]),
                "question": question,
                "question_type": question_type
            }

        adaptive_chain = (
            RunnablePassthrough()
            | route_question
            | RunnableBranch(
                (lambda x: x["question_type"] == "factual", prompts["factual"] | self.llm),
                (lambda x: x["question_type"] == "analytical", prompts["analytical"] | self.llm),
                (lambda x: x["question_type"] == "comparative", prompts["comparative"] | self.llm),
                (lambda x: x["question_type"] == "summarization", prompts["summarization"] | self.llm),
                prompts["factual"] | self.llm  # 기본값
            )
            | StrOutputParser()
        )

        return adaptive_chain

# 사용 예시
rag_pipeline = AdvancedRAGPipeline(vectorstore)

# 컨텍스트 인식 검색기
contextual_retriever = rag_pipeline.create_contextual_retriever()

# 다중 쿼리 검색기
multi_query_retriever = rag_pipeline.create_multi_query_retriever()

# 적응형 RAG 체인
adaptive_chain = rag_pipeline.create_adaptive_rag_chain()

# 테스트
question = "이 계약서의 주요 조건들을 비교 분석해주세요."
response = adaptive_chain.invoke({"question": question})
print(f"답변: {response}")

성능 최적화 전략

캐싱 활용

동일한 요청에 대해 중복 계산을 방지하기 위해 캐싱 시스템을 도입합니다.

from langchain.cache import SQLiteCache
import langchain

langchain.llm_cache = SQLiteCache(database_path="./cache.db")

압축 검색

LLM 기반 압축기를 사용해 검색 성능을 개선합니다.

from langchain.retrievers import ContextualCompressionRetriever
from langchain.retrievers.document_compressors import LLMChainExtractor

compressor = LLMChainExtractor.from_llm(ChatOpenAI())
compression_retriever = ContextualCompressionRetriever(
    base_compressor=compressor,
    base_retriever=retriever
)

병렬 처리

여러 문서를 동시에 처리하여 속도를 높입니다.

from concurrent.futures import ThreadPoolExecutor

def process_documents(docs):
    with ThreadPoolExecutor() as executor:
        results = list(executor.map(lambda doc: retriever.retrieve(doc), docs))
    return results

고급 성능 최적화

import asyncio
from typing import List, Dict, Any
import time
import hashlib
from functools import lru_cache

class PDFProcessingOptimizer:
    def __init__(self):
        self.cache = {}
        self.processing_stats = {}

    @lru_cache(maxsize=1000)
    def cached_embedding(self, text: str) -> str:
        """임베딩 결과 캐싱"""
        # 실제로는 임베딩 모델 호출
        return hashlib.md5(text.encode()).hexdigest()

    async def parallel_document_processing(self, pdf_files: List[str]) -> Dict[str, Any]:
        """병렬 문서 처리"""

        async def process_single_pdf(pdf_path: str):
            start_time = time.time()

            try:
                # PDF 로드
                loader = PyPDFLoader(pdf_path)
                documents = loader.load()

                # 텍스트 분할
                text_splitter = RecursiveCharacterTextSplitter(
                    chunk_size=1000,
                    chunk_overlap=200
                )
                splits = text_splitter.split_documents(documents)

                processing_time = time.time() - start_time

                return {
                    "file": pdf_path,
                    "documents": splits,
                    "processing_time": processing_time,
                    "chunk_count": len(splits),
                    "status": "success"
                }

            except Exception as e:
                return {
                    "file": pdf_path,
                    "error": str(e),
                    "processing_time": time.time() - start_time,
                    "status": "failed"
                }

        # 모든 PDF 파일을 병렬로 처리
        tasks = [process_single_pdf(pdf_file) for pdf_file in pdf_files]
        results = await asyncio.gather(*tasks)

        return results

    def batch_vectorization(self, documents: List[Document], batch_size: int = 50):
        """배치 벡터화"""

        vectorized_batches = []

        for i in range(0, len(documents), batch_size):
            batch = documents[i:i + batch_size]

            # 배치별 벡터화 (실제로는 임베딩 모델 호출)
            batch_vectors = []
            for doc in batch:
                vector = self.cached_embedding(doc.page_content)
                batch_vectors.append(vector)

            vectorized_batches.append({
                "documents": batch,
                "vectors": batch_vectors,
                "batch_index": i // batch_size
            })

            # API 레이트 리밋 고려
            time.sleep(0.1)

        return vectorized_batches

    def optimize_chunk_size(self, documents: List[Document]) -> int:
        """최적 청크 크기 결정"""

        # 문서 길이 분석
        doc_lengths = [len(doc.page_content) for doc in documents]
        avg_length = sum(doc_lengths) / len(doc_lengths)

        # 최적 청크 크기 계산
        if avg_length  max_memory_mb:
                # 메모리 정리
                gc.collect()
                print(f"메모리 정리 수행: {memory_usage:.2f}MB -> {psutil.Process().memory_info().rss / 1024 / 1024:.2f}MB")

            # PDF 처리
            try:
                loader = PyPDFLoader(pdf_file)
                documents = loader.load()

                # 청크 크기 최적화
                optimal_chunk_size = self.optimize_chunk_size(documents)

                text_splitter = RecursiveCharacterTextSplitter(
                    chunk_size=optimal_chunk_size,
                    chunk_overlap=200
                )
                splits = text_splitter.split_documents(documents)

                processed_files.append({
                    "file": pdf_file,
                    "chunks": len(splits),
                    "optimal_chunk_size": optimal_chunk_size
                })

                # 처리 후 메모리 정리
                del documents, splits

            except Exception as e:
                print(f"파일 처리 실패 {pdf_file}: {e}")

        return processed_files

# 사용 예시
optimizer = PDFProcessingOptimizer()

# 병렬 문서 처리
pdf_files = ["doc1.pdf", "doc2.pdf", "doc3.pdf"]
results = asyncio.run(optimizer.parallel_document_processing(pdf_files))

# 배치 벡터화
batch_results = optimizer.batch_vectorization(docs, batch_size=30)

# 메모리 효율적 처리
memory_results = optimizer.memory_efficient_processing(pdf_files, max_memory_mb=800)

print(f"병렬 처리 결과: {len(results)}개 파일")
print(f"배치 벡터화: {len(batch_results)}개 배치")
print(f"메모리 효율 처리: {len(memory_results)}개 파일")

실제 적용 사례

법률 문서 분석: 계약서에서 특정 조항 검색 및 요약 제공

class LegalDocumentAnalyzer:
    def __init__(self):
        self.rag_pipeline = AdvancedRAGPipeline(vectorstore)
        self.legal_terms = self._load_legal_terminology()

    def _load_legal_terminology(self):
        """법률 용어 사전 로드"""
        return {
            "termination": "해지",
            "liability": "책임",
            "indemnification": "배상",
            "confidentiality": "기밀유지",
            "force majeure": "불가항력"
        }

    def extract_contract_clauses(self, contract_text: str) -> Dict[str, List[str]]:
        """계약서 조항 추출"""

        clause_patterns = {
            "termination": [
                r"termination.*?(?=\n\n|\Z)",
                r"해지.*?(?=\n\n|\Z)",
                r"계약.*?종료.*?(?=\n\n|\Z)"
            ],
            "payment": [
                r"payment.*?(?=\n\n|\Z)",
                r"지급.*?(?=\n\n|\Z)",
                r"대금.*?(?=\n\n|\Z)"
            ],
            "liability": [
                r"liability.*?(?=\n\n|\Z)",
                r"책임.*?(?=\n\n|\Z)",
                r"손해.*?(?=\n\n|\Z)"
            ]
        }

        extracted_clauses = {}

        for clause_type, patterns in clause_patterns.items():
            clauses = []
            for pattern in patterns:
                matches = re.findall(pattern, contract_text, re.IGNORECASE | re.DOTALL)
                clauses.extend(matches)

            extracted_clauses[clause_type] = clauses

        return extracted_clauses

    def analyze_contract_risks(self, contract_text: str) -> Dict[str, Any]:
        """계약서 리스크 분석"""

        risk_analysis_prompt = ChatPromptTemplate.from_template("""
        다음 계약서 내용을 분석하여 잠재적 리스크를 식별하세요:

        계약서 내용:
        {contract_text}

        다음 형식으로 분석 결과를 제공하세요:
        1. 높은 리스크 요소 (3개)
        2. 중간 리스크 요소 (3개)
        3. 권장 수정 사항 (3개)
        4. 전체 리스크 점수 (1-10점)
        """)

        llm = ChatOpenAI(model="gpt-4", temperature=0.1)
        analysis_chain = risk_analysis_prompt | llm | StrOutputParser()

        risk_analysis = analysis_chain.invoke({"contract_text": contract_text})

        return {
            "risk_analysis": risk_analysis,
            "extracted_clauses": self.extract_contract_clauses(contract_text),
            "key_terms": self._identify_key_terms(contract_text)
        }

    def _identify_key_terms(self, text: str) -> List[str]:
        """주요 법률 용어 식별"""
        identified_terms = []

        for english_term, korean_term in self.legal_terms.items():
            if english_term.lower() in text.lower() or korean_term in text:
                identified_terms.append(f"{english_term} ({korean_term})")

        return identified_terms

# 사용 예시
legal_analyzer = LegalDocumentAnalyzer()
contract_analysis = legal_analyzer.analyze_contract_risks(contract_text)
print("계약서 리스크 분석:", contract_analysis)

연구 논문 요약: 다중 논문에서 유사 연구 비교 분석

class ResearchPaperAnalyzer:
    def __init__(self):
        self.rag_pipeline = AdvancedRAGPipeline(vectorstore)

    def extract_paper_metadata(self, paper_text: str) -> Dict[str, str]:
        """논문 메타데이터 추출"""

        metadata_prompt = ChatPromptTemplate.from_template("""
        다음 논문에서 메타데이터를 추출하세요:

        논문 내용:
        {paper_text}

        JSON 형식으로 다음 정보를 추출하세요:
        {{
            "title": "논문 제목",
            "authors": "저자들",
            "abstract": "초록",
            "keywords": "키워드들",
            "methodology": "연구 방법론",
            "main_findings": "주요 발견사항"
        }}
        """)

        llm = ChatOpenAI(model="gpt-4", temperature=0.1)
        extraction_chain = metadata_prompt | llm | StrOutputParser()

        try:
            result = extraction_chain.invoke({"paper_text": paper_text[:3000]})
            return eval(result)  # JSON 파싱
        except:
            return {"error": "메타데이터 추출 실패"}

    def compare_research_papers(self, papers: List[str]) -> Dict[str, Any]:
        """다중 논문 비교 분석"""

        paper_summaries = []

        for i, paper in enumerate(papers):
            metadata = self.extract_paper_metadata(paper)
            paper_summaries.append({
                "paper_id": i,
                "metadata": metadata,
                "content_preview": paper[:500]
            })

        comparison_prompt = ChatPromptTemplate.from_template("""
        다음 논문들을 비교 분석하세요:

        {paper_summaries}

        비교 분석 결과:
        1. 공통 연구 주제
        2. 방법론 차이점
        3. 주요 발견사항 비교
        4. 연구 한계점
        5. 향후 연구 방향
        """)

        llm = ChatOpenAI(model="gpt-4", temperature=0.3)
        comparison_chain = comparison_prompt | llm | StrOutputParser()

        comparison_result = comparison_chain.invoke({
            "paper_summaries": str(paper_summaries)
        })

        return {
            "comparison_analysis": comparison_result,
            "paper_count": len(papers),
            "individual_summaries": paper_summaries
        }

# 사용 예시
research_analyzer = ResearchPaperAnalyzer()
comparison_result = research_analyzer.compare_research_papers(research_papers)
print("논문 비교 분석:", comparison_result)

재무 보고서 처리: 표 데이터 추출 및 재무 지표 분석

import pandas as pd
import re
from typing import List, Dict, Any

class FinancialReportAnalyzer:
    def __init__(self):
        self.rag_pipeline = AdvancedRAGPipeline(vectorstore)

    def extract_financial_tables(self, report_text: str) -> List[pd.DataFrame]:
        """재무 보고서에서 표 데이터 추출"""

        # 표 패턴 감지
        table_patterns = [
            r'\|[^|\n]*\|[^|\n]*\|.*?\n(?:\|[^|\n]*\|[^|\n]*\|.*?\n)*',  # 마크다운 표
            r'(\d+,?\d*)\s+(\d+,?\d*)\s+(\d+,?\d*)',  # 숫자 열
        ]

        tables = []

        for pattern in table_patterns:
            matches = re.findall(pattern, report_text, re.MULTILINE)

            for match in matches:
                if isinstance(match, tuple):
                    # 숫자 데이터를 DataFrame으로 변환
                    data = [list(match)]
                    df = pd.DataFrame(data)
                    tables.append(df)
                else:
                    # 마크다운 표 파싱
                    lines = match.strip().split('\n')
                    if len(lines) >= 2:
                        headers = [col.strip() for col in lines[0].split('|')[1:-1]]
                        rows = []

                        for line in lines[2:]:  # 헤더와 구분선 제외
                            row = [col.strip() for col in line.split('|')[1:-1]]
                            if len(row) == len(headers):
                                rows.append(row)

                        if rows:
                            df = pd.DataFrame(rows, columns=headers)
                            tables.append(df)

        return tables

    def calculate_financial_ratios(self, financial_data: Dict[str, float]) -> Dict[str, float]:
        """재무 비율 계산"""

        ratios = {}

        try:
            # 유동비율
            if 'current_assets' in financial_data and 'current_liabilities' in financial_data:
                ratios['current_ratio'] = financial_data['current_assets'] / financial_data['current_liabilities']

            # 부채비율
            if 'total_debt' in financial_data and 'total_equity' in financial_data:
                ratios['debt_to_equity'] = financial_data['total_debt'] / financial_data['total_equity']

            # ROE (자기자본이익률)
            if 'net_income' in financial_data and 'total_equity' in financial_data:
                ratios['roe'] = financial_data['net_income'] / financial_data['total_equity']

            # ROA (총자산이익률)
            if 'net_income' in financial_data and 'total_assets' in financial_data:
                ratios['roa'] = financial_data['net_income'] / financial_data['total_assets']

        except ZeroDivisionError:
            pass

        return ratios

    def analyze_financial_performance(self, report_text: str) -> Dict[str, Any]:
        """재무 성과 분석"""

        # 표 데이터 추출
        tables = self.extract_financial_tables(report_text)

        # 재무 데이터 추출 프롬프트
        extraction_prompt = ChatPromptTemplate.from_template("""
        다음 재무 보고서에서 주요 재무 지표를 추출하세요:

        보고서 내용:
        {report_text}

        JSON 형식으로 다음 정보를 추출하세요:
        {{
            "revenue": 매출액,
            "net_income": 순이익,
            "total_assets": 총자산,
            "total_equity": 자기자본,
            "total_debt": 총부채,
            "current_assets": 유동자산,
            "current_liabilities": 유동부채
        }}

        숫자만 추출하고, 단위는 제외하세요.
        """)

        llm = ChatOpenAI(model="gpt-4", temperature=0.1)
        extraction_chain = extraction_prompt | llm | StrOutputParser()

        try:
            financial_data_str = extraction_chain.invoke({"report_text": report_text[:3000]})
            financial_data = eval(financial_data_str)

            # 재무 비율 계산
            ratios = self.calculate_financial_ratios(financial_data)

            # 성과 분석
            performance_analysis = self._analyze_performance_trends(financial_data, ratios)

            return {
                "extracted_data": financial_data,
                "financial_ratios": ratios,
                "performance_analysis": performance_analysis,
                "extracted_tables": len(tables)
            }

        except Exception as e:
            return {"error": f"재무 분석 실패: {str(e)}"}

    def _analyze_performance_trends(self, data: Dict[str, float], ratios: Dict[str, float]) -> str:
        """성과 트렌드 분석"""

        analysis = []

        # 수익성 분석
        if 'roe' in ratios:
            if ratios['roe'] > 0.15:
                analysis.append("높은 자기자본이익률로 우수한 수익성을 보임")
            elif ratios['roe'] > 0.10:
                analysis.append("양호한 수준의 자기자본이익률")
            else:
                analysis.append("자기자본이익률 개선 필요")

        # 안정성 분석
        if 'current_ratio' in ratios:
            if ratios['current_ratio'] > 2.0:
                analysis.append("유동비율이 높아 단기 안정성 우수")
            elif ratios['current_ratio'] > 1.0:
                analysis.append("적정 수준의 유동비율 유지")
            else:
                analysis.append("유동비율 개선 필요")

        return "; ".join(analysis)

# 사용 예시
financial_analyzer = FinancialReportAnalyzer()
financial_analysis = financial_analyzer.analyze_financial_performance(financial_report_text)
print("재무 분석 결과:", financial_analysis)

마무리

LangChain은 PDF 파일의 복잡한 구조를 효과적으로 처리하고, 검색 및 질의응답 시스템을 손쉽게 구축할 수 있는 강력한 도구를 제공합니다.
특히, 다양한 PDF 로더와 벡터 저장소를 활용하면 대규모 문서에서도 높은 정확도로 정보를 검색할 수 있습니다. 법률, 금융, 연구 등 다양한 도메인에서 LangChain의 기능을 활용할 수 있습니다.
핵심 포인트:

  • 다양한 PDF 로더: 문서 특성에 맞는 최적 로더 선택으로 추출 품질 향상
  • 지능형 텍스트 분할: 의미 단위, 섹션 인식, 표 인식 분할로 구조 보존
  • 계층적 벡터 저장소: 문서 유형별 분류와 다중 임베딩으로 검색 정확도 극대화
  • 적응형 RAG 파이프라인: 질문 유형별 맞춤형 처리로 답변 품질 향상
  • 성능 최적화: 병렬 처리, 캐싱, 메모리 관리로 대용량 문서 효율적 처리
  • 도메인별 특화: 법률, 연구, 재무 등 전문 분야별 맞춤형 분석 기능

LangChain의 PDF 처리 기능을 통해 비구조화된 문서 데이터를 구조화된 지식으로 변환하고, 지능형 문서 검색 및 분석 시스템을 구축할 수 있습니다.

반응형