IT기술/랭체인 (langchain)

LangChain 멀티모달 데이터 통합 완벽 가이드: 텍스트, 이미지, 비디오를 아우르는 AI 애플리케이션 구축

후스파 2025. 7. 8. 21:37
반응형

멀티모달 데이터 통합은 다양한 형태의 데이터(텍스트, 이미지, 오디오, 비디오 등)를 함께 처리하여 더 풍부한 AI 애플리케이션을 구축하는 방법입니다.
LangChain은 이러한 멀티모달 데이터를 효과적으로 통합할 수 있는 프레임워크를 제공합니다. 이 글에서는 LangChain을 활용하여 다양한 형태의 데이터를 통합하는 방법을 알아보겠습니다.


LangChain의 멀티모달 지원 개요

LangChain은 멀티모달 데이터 처리를 위한 다양한 기능을 제공합니다. 멀티모달 지원은 아직 상대적으로 새로운 분야이며, 모델 제공업체들이 API 정의 방식을 표준화하지 않았기 때문에 LangChain의 멀티모달 추상화는 가볍고 유연하게 설계되어 있습니다.

지원되는 멀티모달 데이터 유형

  • 이미지: URL 또는 Base64 인코딩 형태
  • 오디오: 음성 파일 및 실시간 오디오 스트림
  • 비디오: 동영상 파일 및 프레임 분석
  • 문서: PDF, Word 등 구조화된 문서
  • 텍스트: 다양한 형태의 텍스트 데이터

지원 모델 제공업체

OpenAI텍스트, 이미지, 오디오GPT-4o, GPT-4V
Google텍스트, 이미지, 비디오, 오디오Gemini 2.5 Pro/Flash
Anthropic텍스트, 이미지, PDFClaude 3.5 Sonnet
Microsoft텍스트, 이미지Azure OpenAI

이미지 데이터 통합하기

이미지 캡셔닝 구현

LangChain은 이미지 처리를 위해 비전 모델과 언어 모델을 통합하여 이미지를 분석하고 설명 텍스트를 생성할 수 있습니다.

from langchain_core.messages import HumanMessage
from langchain_openai import ChatOpenAI
import base64
import httpx

# 이미지 URL
image_url = "https://upload.wikimedia.org/wikipedia/commons/thumb/d/dd/Gfp-wisconsin-madison-the-nature-boardwalk.jpg/2560px-Gfp-wisconsin-madison-the-nature-boardwalk.jpg"

# 이미지 데이터 인코딩
image_data = base64.b64encode(httpx.get(image_url).content).decode("utf-8")

# 멀티모달 모델 초기화
model = ChatOpenAI(model="gpt-4o")

# 이미지와 텍스트를 포함한 메시지 생성
message = HumanMessage(
    content=[
        {"type": "text", "text": "이 이미지의 날씨를 설명해주세요"},
        {
            "type": "image_url",
            "image_url": {"url": f"data:image/jpeg;base64,{image_data}"},
        },
    ],
)

# 모델 호출
response = model.invoke([message])
print(response.content)

다중 이미지 처리

LangChain은 여러 이미지를 동시에 처리할 수도 있습니다.

from langchain_core.messages import HumanMessage
from langchain_openai import ChatOpenAI

# 여러 이미지 비교 분석
def compare_images(image_urls, question="이 이미지들의 차이점을 설명해주세요"):
    model = ChatOpenAI(model="gpt-4o")

    content = [{"type": "text", "text": question}]

    # 여러 이미지 추가
    for i, url in enumerate(image_urls):
        content.append({
            "type": "image_url",
            "image_url": {"url": url}
        })

    message = HumanMessage(content=content)
    response = model.invoke([message])

    return response.content

# 사용 예시
image_urls = [
    "https://example.com/image1.jpg",
    "https://example.com/image2.jpg"
]

result = compare_images(image_urls, "이 두 이미지가 동일한가요?")
print(result)

이미지 캡션 자동 생성 시스템

from langchain_community.document_loaders import ImageCaptionLoader
from langchain_openai import OpenAIEmbeddings
from langchain_community.vectorstores import Chroma

class ImageCaptionSystem:
    def __init__(self):
        self.embeddings = OpenAIEmbeddings()
        self.vectorstore = None

    def process_images(self, image_urls):
        """이미지 목록을 처리하여 캡션 생성"""
        # ImageCaptionLoader 사용
        loader = ImageCaptionLoader(images=image_urls)
        documents = loader.load()

        # 벡터 스토어 생성
        self.vectorstore = Chroma.from_documents(
            documents=documents,
            embedding=self.embeddings
        )

        return documents

    def search_images_by_description(self, query, k=3):
        """설명으로 이미지 검색"""
        if not self.vectorstore:
            return "먼저 이미지를 처리해주세요."

        results = self.vectorstore.similarity_search(query, k=k)

        return [
            {
                "image_path": doc.metadata["image_path"],
                "caption": doc.page_content,
                "score": "유사도 점수"
            }
            for doc in results
        ]

# 사용 예시
caption_system = ImageCaptionSystem()

image_urls = [
    "https://upload.wikimedia.org/wikipedia/commons/thumb/e/ec/Ara_ararauna_Luc_Viatour.jpg/1554px-Ara_ararauna_Luc_Viatour.jpg",
    "https://upload.wikimedia.org/wikipedia/commons/thumb/0/0c/1928_Model_A_Ford.jpg/640px-1928_Model_A_Ford.jpg"
]

# 이미지 처리
documents = caption_system.process_images(image_urls)
print("생성된 캡션:")
for doc in documents:
    print(f"이미지: {doc.metadata['image_path']}")
    print(f"캡션: {doc.page_content}")
    print("---")

# 이미지 검색
search_results = caption_system.search_images_by_description("새가 나는 모습")
print("검색 결과:", search_results)

멀티모달 RAG(Retrieval Augmented Generation) 구현

LangChain과 Redis를 활용한 멀티모달 RAG 템플릿은 텍스트뿐만 아니라 이미지도 함께 처리할 수 있는 시스템을 구축할 수 있게 해줍니다.

멀티모달 RAG의 작동 방식

  1. 텍스트 및 이미지 임베딩 생성: 텍스트에는 GPT 임베딩, 이미지에는 CLIP 임베딩을 사용
  2. 통합 검색: 사용자 쿼리에 따라 관련 텍스트와 이미지를 함께 검색
  3. 멀티모달 추론: GPT-4V와 같은 멀티모달 모델을 사용하여 텍스트와 이미지를 함께 분석
from langchain_openai import OpenAIEmbeddings, ChatOpenAI
from langchain_community.vectorstores import Redis
from langchain.schema import Document
import redis
import numpy as np
from PIL import Image
import requests
from io import BytesIO

class MultimodalRAG:
    def __init__(self, redis_url="redis://localhost:6379"):
        self.text_embeddings = OpenAIEmbeddings()
        self.llm = ChatOpenAI(model="gpt-4o")
        self.redis_client = redis.from_url(redis_url)

        # 텍스트와 이미지용 별도 벡터 스토어
        self.text_vectorstore = None
        self.image_vectorstore = None

    def setup_vectorstores(self):
        """벡터 스토어 초기화"""
        self.text_vectorstore = Redis(
            redis_url="redis://localhost:6379",
            index_name="text_index",
            embedding_function=self.text_embeddings
        )

        # 이미지용 벡터 스토어 (CLIP 임베딩 사용)
        self.image_vectorstore = Redis(
            redis_url="redis://localhost:6379", 
            index_name="image_index",
            embedding_function=self.get_clip_embeddings
        )

    def get_clip_embeddings(self, texts):
        """CLIP 모델을 사용한 이미지 임베딩"""
        # 실제 구현에서는 CLIP 모델 사용
        # 여기서는 더미 임베딩 반환
        return np.random.rand(len(texts), 512).tolist()

    def add_documents(self, text_docs, image_docs):
        """텍스트 및 이미지 문서 추가"""
        if text_docs:
            self.text_vectorstore.add_documents(text_docs)

        if image_docs:
            self.image_vectorstore.add_documents(image_docs)

    def multimodal_search(self, query, k=3):
        """멀티모달 검색 수행"""
        # 텍스트 검색
        text_results = self.text_vectorstore.similarity_search(query, k=k)

        # 이미지 검색 (쿼리를 이미지 임베딩과 비교)
        image_results = self.image_vectorstore.similarity_search(query, k=k)

        return {
            "text_results": text_results,
            "image_results": image_results
        }

    def generate_multimodal_response(self, query):
        """멀티모달 응답 생성"""
        # 검색 수행
        search_results = self.multimodal_search(query)

        # 컨텍스트 구성
        text_context = "\n".join([doc.page_content for doc in search_results["text_results"]])

        # 이미지 정보 포함
        image_info = []
        for doc in search_results["image_results"]:
            image_info.append(f"이미지: {doc.metadata.get('image_path', 'Unknown')}")

        # 멀티모달 프롬프트 생성
        content = [
            {"type": "text", "text": f"""
            다음 정보를 바탕으로 질문에 답변해주세요:

            텍스트 정보:
            {text_context}

            관련 이미지 정보:
            {chr(10).join(image_info)}

            질문: {query}
            """}
        ]

        # 이미지가 있는 경우 추가
        for doc in search_results["image_results"]:
            if "image_url" in doc.metadata:
                content.append({
                    "type": "image_url",
                    "image_url": {"url": doc.metadata["image_url"]}
                })

        from langchain_core.messages import HumanMessage
        message = HumanMessage(content=content)

        response = self.llm.invoke([message])
        return response.content

# 사용 예시
multimodal_rag = MultimodalRAG()
multimodal_rag.setup_vectorstores()

# 텍스트 문서 추가
text_docs = [
    Document(page_content="자연 경관에 대한 설명...", metadata={"type": "text"}),
    Document(page_content="동물의 생태에 대한 정보...", metadata={"type": "text"})
]

# 이미지 문서 추가
image_docs = [
    Document(
        page_content="자연 풍경 이미지",
        metadata={
            "type": "image",
            "image_url": "https://example.com/nature.jpg",
            "image_path": "/path/to/nature.jpg"
        }
    )
]

multimodal_rag.add_documents(text_docs, image_docs)

# 멀티모달 쿼리 수행
response = multimodal_rag.generate_multimodal_response("자연 풍경에 대해 설명해주세요")
print(response)

멀티벡터 검색기를 활용한 반구조화 데이터 처리

LangChain의 멀티벡터 검색기(Multi-Vector Retriever)는 테이블, 텍스트, 이미지와 같은 반구조화된 데이터에 대한 RAG를 지원합니다.

from langchain.retrievers.multi_vector import MultiVectorRetriever
from langchain.storage import InMemoryByteStore
from langchain_community.document_loaders import PyPDFLoader
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain_openai import OpenAIEmbeddings
from langchain_community.vectorstores import Chroma
from langchain.schema import Document
import uuid

class MultiVectorMultimodalRetriever:
    def __init__(self):
        self.embeddings = OpenAIEmbeddings()
        self.vectorstore = Chroma(
            collection_name="multimodal_docs",
            embedding_function=self.embeddings
        )
        self.docstore = InMemoryByteStore()
        self.id_key = "doc_id"

        # 멀티벡터 검색기 설정
        self.retriever = MultiVectorRetriever(
            vectorstore=self.vectorstore,
            docstore=self.docstore,
            id_key=self.id_key
        )

    def process_multimodal_document(self, file_path):
        """멀티모달 문서 처리"""
        # PDF 로드
        loader = PyPDFLoader(file_path)
        documents = loader.load()

        # 텍스트 분할
        text_splitter = RecursiveCharacterTextSplitter(
            chunk_size=1000, 
            chunk_overlap=200
        )
        splits = text_splitter.split_documents(documents)

        # 각 청크에 대해 다양한 표현 생성
        doc_ids = []
        sub_docs = []

        for doc in splits:
            doc_id = str(uuid.uuid4())
            doc_ids.append(doc_id)

            # 원본 문서 저장
            self.docstore.mset([(doc_id, doc)])

            # 요약 생성
            summary = self.generate_summary(doc.page_content)
            summary_doc = Document(
                page_content=summary,
                metadata={**doc.metadata, self.id_key: doc_id, "type": "summary"}
            )
            sub_docs.append(summary_doc)

            # 가상 질문 생성
            questions = self.generate_hypothetical_questions(doc.page_content)
            for question in questions:
                question_doc = Document(
                    page_content=question,
                    metadata={**doc.metadata, self.id_key: doc_id, "type": "question"}
                )
                sub_docs.append(question_doc)

        # 벡터 스토어에 추가
        self.vectorstore.add_documents(sub_docs)

        return len(sub_docs)

    def generate_summary(self, text):
        """텍스트 요약 생성"""
        from langchain_openai import ChatOpenAI

        llm = ChatOpenAI(model="gpt-3.5-turbo", temperature=0)
        prompt = f"다음 텍스트를 한 문장으로 요약해주세요:\n\n{text}"

        response = llm.invoke(prompt)
        return response.content

    def generate_hypothetical_questions(self, text):
        """가상 질문 생성"""
        from langchain_openai import ChatOpenAI

        llm = ChatOpenAI(model="gpt-3.5-turbo", temperature=0.7)
        prompt = f"""
        다음 텍스트를 바탕으로 3개의 질문을 생성해주세요:

        {text}

        질문들을 줄바꿈으로 구분해서 작성해주세요.
        """

        response = llm.invoke(prompt)
        questions = [q.strip() for q in response.content.split('\n') if q.strip()]
        return questions[:3]  # 최대 3개

    def search(self, query, k=3):
        """검색 수행"""
        return self.retriever.get_relevant_documents(query)

# 사용 예시
multi_retriever = MultiVectorMultimodalRetriever()

# 문서 처리
doc_count = multi_retriever.process_multimodal_document("multimodal_document.pdf")
print(f"처리된 서브 문서 수: {doc_count}")

# 검색 수행
results = multi_retriever.search("이 문서의 주요 내용은 무엇인가요?")
for result in results:
    print(f"타입: {result.metadata.get('type', 'unknown')}")
    print(f"내용: {result.page_content[:200]}...")
    print("---")

비디오 데이터 통합

LangChain은 Gemini와 같은 모델을 통해 비디오 데이터도 처리할 수 있습니다. 이를 위해 모델별 네이티브 표현을 지원합니다.

from langchain_google_genai import ChatGoogleGenerativeAI
from langchain_core.messages import HumanMessage
import base64

class VideoProcessor:
    def __init__(self):
        # 비디오 처리가 가능한 Gemini 모델 초기화
        self.model = ChatGoogleGenerativeAI(
            model="gemini-2.5-flash",
            temperature=0.3
        )

    def process_video_file(self, video_path, question="이 비디오의 내용을 설명해주세요"):
        """로컬 비디오 파일 처리"""
        # 비디오 파일을 base64로 인코딩
        with open(video_path, "rb") as video_file:
            video_data = base64.b64encode(video_file.read()).decode("utf-8")

        message = HumanMessage(
            content=[
                {"type": "text", "text": question},
                {
                    "type": "video",
                    "source_type": "base64",
                    "data": video_data,
                    "mime_type": "video/mp4"
                }
            ]
        )

        response = self.model.invoke([message])
        return response.content

    def process_video_url(self, video_url, question="이 비디오에서 무엇을 볼 수 있나요?"):
        """비디오 URL 처리"""
        message = HumanMessage(
            content=[
                {"type": "text", "text": question},
                {
                    "type": "video",
                    "source_type": "url",
                    "url": video_url
                }
            ]
        )

        response = self.model.invoke([message])
        return response.content

    def analyze_video_frames(self, video_path, frame_interval=30):
        """비디오 프레임별 분석"""
        import cv2
        import tempfile
        import os

        # 비디오 캡처
        cap = cv2.VideoCapture(video_path)
        frame_count = 0
        analyses = []

        while True:
            ret, frame = cap.read()
            if not ret:
                break

            if frame_count % frame_interval == 0:
                # 프레임을 임시 파일로 저장
                with tempfile.NamedTemporaryFile(suffix='.jpg', delete=False) as tmp_file:
                    cv2.imwrite(tmp_file.name, frame)

                    # 프레임 분석
                    with open(tmp_file.name, "rb") as img_file:
                        img_data = base64.b64encode(img_file.read()).decode("utf-8")

                    message = HumanMessage(
                        content=[
                            {"type": "text", "text": f"프레임 {frame_count//frame_interval + 1}의 내용을 설명해주세요"},
                            {
                                "type": "image",
                                "source_type": "base64",
                                "data": img_data,
                                "mime_type": "image/jpeg"
                            }
                        ]
                    )

                    response = self.model.invoke([message])
                    analyses.append({
                        "frame_number": frame_count,
                        "timestamp": frame_count / cap.get(cv2.CAP_PROP_FPS),
                        "analysis": response.content
                    })

                    # 임시 파일 삭제
                    os.unlink(tmp_file.name)

            frame_count += 1

        cap.release()
        return analyses

# 사용 예시
video_processor = VideoProcessor()

# 비디오 파일 처리
try:
    result = video_processor.process_video_file(
        "sample_video.mp4",
        "이 비디오에서 일어나는 주요 사건들을 시간순으로 설명해주세요"
    )
    print("비디오 분석 결과:", result)
except Exception as e:
    print(f"비디오 처리 오류: {e}")

# 프레임별 분석
frame_analyses = video_processor.analyze_video_frames("sample_video.mp4", frame_interval=60)
for analysis in frame_analyses[:3]:  # 처음 3개 프레임만 출력
    print(f"시간: {analysis['timestamp']:.2f}초")
    print(f"분석: {analysis['analysis']}")
    print("---")

오디오 데이터 통합

음성 인식과 텍스트 생성 통합

from langchain_openai import ChatOpenAI
from langchain_core.messages import HumanMessage
import base64
import speech_recognition as sr
from pydub import AudioSegment

class AudioProcessor:
    def __init__(self):
        self.llm = ChatOpenAI(model="gpt-4o-audio-preview")
        self.recognizer = sr.Recognizer()

    def process_audio_file(self, audio_path, question="이 오디오의 내용을 요약해주세요"):
        """오디오 파일 처리"""
        # 오디오 파일을 base64로 인코딩
        with open(audio_path, "rb") as audio_file:
            audio_data = base64.b64encode(audio_file.read()).decode("utf-8")

        message = HumanMessage(
            content=[
                {"type": "text", "text": question},
                {
                    "type": "audio",
                    "source_type": "base64",
                    "data": audio_data,
                    "mime_type": "audio/mp3"
                }
            ]
        )

        response = self.llm.invoke([message])
        return response.content

    def transcribe_and_analyze(self, audio_path):
        """음성 인식 후 분석"""
        # 음성을 텍스트로 변환
        with sr.AudioFile(audio_path) as source:
            audio = self.recognizer.record(source)
            try:
                transcript = self.recognizer.recognize_google(audio, language='ko-KR')
            except sr.UnknownValueError:
                return "음성을 인식할 수 없습니다."

        # 텍스트 분석
        analysis_prompt = f"""
        다음 음성 전사 내용을 분석해주세요:

        전사 내용: {transcript}

        분석 요청:
        1. 주요 내용 요약
        2. 감정 분석
        3. 핵심 키워드 추출
        """

        response = self.llm.invoke(analysis_prompt)

        return {
            "transcript": transcript,
            "analysis": response.content
        }

    def convert_audio_format(self, input_path, output_path, target_format="wav"):
        """오디오 형식 변환"""
        audio = AudioSegment.from_file(input_path)
        audio.export(output_path, format=target_format)
        return output_path

# 사용 예시
audio_processor = AudioProcessor()

# 오디오 파일 처리
try:
    result = audio_processor.process_audio_file(
        "sample_audio.mp3",
        "이 오디오에서 말하는 주요 내용과 화자의 감정을 분석해주세요"
    )
    print("오디오 분석 결과:", result)
except Exception as e:
    print(f"오디오 처리 오류: {e}")

# 음성 인식 및 분석
transcription_result = audio_processor.transcribe_and_analyze("sample_audio.wav")
print("전사 내용:", transcription_result["transcript"])
print("분석 결과:", transcription_result["analysis"])

멀티모달 데이터 통합의 과제와 해결책

데이터 이질성 해결

LangChain은 다양한 모달리티의 데이터를 통합 표현으로 변환하는 기술을 사용합니다. 이는 이미지, 오디오, 텍스트를 균일하게 처리할 수 있는 형식으로 변환하여 데이터의 이질적 특성으로 인한 복잡성을 줄입니다.

from langchain.schema import Document
from typing import List, Dict, Any
import hashlib

class MultimodalDataNormalizer:
    def __init__(self):
        self.supported_types = ["text", "image", "audio", "video"]

    def normalize_data(self, data_items: List[Dict[str, Any]]) -> List[Document]:
        """다양한 모달리티 데이터를 통합 형식으로 정규화"""
        normalized_docs = []

        for item in data_items:
            data_type = item.get("type")
            content = item.get("content")
            metadata = item.get("metadata", {})

            if data_type not in self.supported_types:
                continue

            # 통합 메타데이터 생성
            unified_metadata = {
                "data_type": data_type,
                "content_hash": self.generate_content_hash(content),
                "processing_timestamp": "2025-07-04T16:48:00",
                **metadata
            }

            # 데이터 타입별 처리
            if data_type == "text":
                processed_content = self.process_text(content)
            elif data_type == "image":
                processed_content = self.process_image(content)
            elif data_type == "audio":
                processed_content = self.process_audio(content)
            elif data_type == "video":
                processed_content = self.process_video(content)

            # Document 객체 생성
            doc = Document(
                page_content=processed_content,
                metadata=unified_metadata
            )

            normalized_docs.append(doc)

        return normalized_docs

    def generate_content_hash(self, content):
        """콘텐츠 해시 생성"""
        return hashlib.md5(str(content).encode()).hexdigest()

    def process_text(self, text_content):
        """텍스트 데이터 처리"""
        # 텍스트 정제 및 정규화
        cleaned_text = text_content.strip()
        return f"[TEXT] {cleaned_text}"

    def process_image(self, image_content):
        """이미지 데이터 처리"""
        # 이미지 메타데이터 추출
        if isinstance(image_content, dict):
            return f"[IMAGE] {image_content.get('description', 'Image content')}"
        return f"[IMAGE] {str(image_content)}"

    def process_audio(self, audio_content):
        """오디오 데이터 처리"""
        # 오디오 메타데이터 추출
        if isinstance(audio_content, dict):
            return f"[AUDIO] {audio_content.get('transcript', 'Audio content')}"
        return f"[AUDIO] {str(audio_content)}"

    def process_video(self, video_content):
        """비디오 데이터 처리"""
        # 비디오 메타데이터 추출
        if isinstance(video_content, dict):
            return f"[VIDEO] {video_content.get('summary', 'Video content')}"
        return f"[VIDEO] {str(video_content)}"

# 사용 예시
normalizer = MultimodalDataNormalizer()

sample_data = [
    {
        "type": "text",
        "content": "이것은 샘플 텍스트입니다.",
        "metadata": {"source": "document1.txt"}
    },
    {
        "type": "image", 
        "content": {"description": "자연 풍경 사진", "url": "image1.jpg"},
        "metadata": {"source": "gallery"}
    },
    {
        "type": "audio",
        "content": {"transcript": "안녕하세요, 반갑습니다.", "duration": 3.5},
        "metadata": {"source": "recording1.mp3"}
    }
]

normalized_docs = normalizer.normalize_data(sample_data)
for doc in normalized_docs:
    print(f"타입: {doc.metadata['data_type']}")
    print(f"내용: {doc.page_content}")
    print(f"해시: {doc.metadata['content_hash']}")
    print("---")

컨텍스트 임베딩을 통한 정렬

LangChain은 딥러닝 모델을 활용하여 다양한 모달리티의 데이터에서 의미적 의미를 포착하는 컨텍스트 임베딩을 생성합니다.

from langchain_openai import OpenAIEmbeddings
import numpy as np
from sklearn.metrics.pairwise import cosine_similarity

class MultimodalEmbeddingAligner:
    def __init__(self):
        self.text_embeddings = OpenAIEmbeddings()
        self.embedding_dim = 1536  # OpenAI 임베딩 차원

    def create_unified_embedding(self, multimodal_content):
        """멀티모달 콘텐츠의 통합 임베딩 생성"""
        embeddings = []
        weights = []

        # 텍스트 임베딩
        if "text" in multimodal_content:
            text_emb = self.text_embeddings.embed_query(multimodal_content["text"])
            embeddings.append(text_emb)
            weights.append(0.4)  # 텍스트 가중치

        # 이미지 임베딩 (CLIP 등 사용)
        if "image" in multimodal_content:
            image_emb = self.get_image_embedding(multimodal_content["image"])
            embeddings.append(image_emb)
            weights.append(0.3)  # 이미지 가중치

        # 오디오 임베딩
        if "audio" in multimodal_content:
            audio_emb = self.get_audio_embedding(multimodal_content["audio"])
            embeddings.append(audio_emb)
            weights.append(0.3)  # 오디오 가중치

        # 가중 평균으로 통합 임베딩 생성
        if embeddings:
            weighted_embedding = np.average(embeddings, axis=0, weights=weights)
            return weighted_embedding.tolist()

        return None

    def get_image_embedding(self, image_description):
        """이미지 임베딩 생성 (CLIP 등 사용)"""
        # 실제 구현에서는 CLIP 모델 사용
        # 여기서는 텍스트 임베딩으로 대체
        return self.text_embeddings.embed_query(f"이미지: {image_description}")

    def get_audio_embedding(self, audio_transcript):
        """오디오 임베딩 생성"""
        # 실제 구현에서는 오디오 특화 모델 사용
        # 여기서는 텍스트 임베딩으로 대체
        return self.text_embeddings.embed_query(f"오디오: {audio_transcript}")

    def align_multimodal_data(self, data_items):
        """멀티모달 데이터 정렬"""
        aligned_data = []

        for item in data_items:
            unified_embedding = self.create_unified_embedding(item["content"])

            if unified_embedding:
                aligned_item = {
                    "id": item.get("id"),
                    "content": item["content"],
                    "embedding": unified_embedding,
                    "metadata": item.get("metadata", {})
                }
                aligned_data.append(aligned_item)

        return aligned_data

    def find_similar_items(self, query_embedding, aligned_data, top_k=3):
        """유사한 아이템 찾기"""
        similarities = []

        for item in aligned_data:
            similarity = cosine_similarity(
                [query_embedding], 
                [item["embedding"]]
            )[0][0]

            similarities.append({
                "item": item,
                "similarity": similarity
            })

        # 유사도 순으로 정렬
        similarities.sort(key=lambda x: x["similarity"], reverse=True)

        return similarities[:top_k]

# 사용 예시
aligner = MultimodalEmbeddingAligner()

sample_multimodal_data = [
    {
        "id": "item1",
        "content": {
            "text": "아름다운 자연 풍경",
            "image": "산과 호수가 있는 풍경",
            "audio": "새소리와 바람소리"
        },
        "metadata": {"category": "nature"}
    },
    {
        "id": "item2", 
        "content": {
            "text": "도시의 야경",
            "image": "밤에 빛나는 건물들",
            "audio": "도시의 소음"
        },
        "metadata": {"category": "urban"}
    }
]

# 데이터 정렬
aligned_data = aligner.align_multimodal_data(sample_multimodal_data)

# 쿼리 임베딩 생성
query_content = {"text": "평화로운 자연 환경"}
query_embedding = aligner.create_unified_embedding(query_content)

# 유사한 아이템 찾기
if query_embedding:
    similar_items = aligner.find_similar_items(query_embedding, aligned_data)

    for item in similar_items:
        print(f"ID: {item['item']['id']}")
        print(f"유사도: {item['similarity']:.3f}")
        print(f"카테고리: {item['item']['metadata']['category']}")
        print("---")

확장 가능한 처리 프레임워크

LangChain은 대용량 데이터를 처리하기 위해 확장 가능한 처리 프레임워크를 활용합니다.

import asyncio
from concurrent.futures import ThreadPoolExecutor
from typing import List, Callable
import time

class ScalableMultimodalProcessor:
    def __init__(self, max_workers=4):
        self.max_workers = max_workers
        self.executor = ThreadPoolExecutor(max_workers=max_workers)

    async def process_batch_async(self, data_batch: List, processor_func: Callable):
        """비동기 배치 처리"""
        loop = asyncio.get_event_loop()

        # 각 데이터 항목을 별도 스레드에서 처리
        tasks = [
            loop.run_in_executor(self.executor, processor_func, item)
            for item in data_batch
        ]

        results = await asyncio.gather(*tasks, return_exceptions=True)

        # 성공한 결과만 반환
        successful_results = [
            result for result in results 
            if not isinstance(result, Exception)
        ]

        return successful_results

    def process_large_dataset(self, dataset: List, processor_func: Callable, batch_size=10):
        """대용량 데이터셋 처리"""
        total_items = len(dataset)
        processed_items = 0
        all_results = []

        print(f"총 {total_items}개 항목 처리 시작...")

        # 배치 단위로 처리
        for i in range(0, total_items, batch_size):
            batch = dataset[i:i + batch_size]

            start_time = time.time()

            # 비동기 배치 처리
            batch_results = asyncio.run(
                self.process_batch_async(batch, processor_func)
            )

            processing_time = time.time() - start_time
            processed_items += len(batch)

            all_results.extend(batch_results)

            # 진행 상황 출력
            progress = (processed_items / total_items) * 100
            print(f"진행률: {progress:.1f}% ({processed_items}/{total_items})")
            print(f"배치 처리 시간: {processing_time:.2f}초")

            # 메모리 관리를 위한 짧은 대기
            time.sleep(0.1)

        print(f"처리 완료: {len(all_results)}개 결과")
        return all_results

    def create_processing_pipeline(self, processors: List[Callable]):
        """처리 파이프라인 생성"""
        def pipeline_processor(data_item):
            result = data_item

            for processor in processors:
                try:
                    result = processor(result)
                except Exception as e:
                    print(f"파이프라인 처리 오류: {e}")
                    return None

            return result

        return pipeline_processor

# 사용 예시
def sample_text_processor(item):
    """샘플 텍스트 처리 함수"""
    time.sleep(0.1)  # 처리 시간 시뮬레이션
    return f"처리됨: {item.get('text', 'No text')}"

def sample_image_processor(item):
    """샘플 이미지 처리 함수"""
    time.sleep(0.2)  # 처리 시간 시뮬레이션
    return f"이미지 분석: {item.get('image', 'No image')}"

# 확장 가능한 처리기 초기화
processor = ScalableMultimodalProcessor(max_workers=8)

# 샘플 데이터셋
large_dataset = [
    {"text": f"텍스트 {i}", "image": f"이미지 {i}"}
    for i in range(100)
]

# 파이프라인 생성
pipeline = processor.create_processing_pipeline([
    sample_text_processor,
    sample_image_processor
])

# 대용량 데이터셋 처리
start_time = time.time()
results = processor.process_large_dataset(
    large_dataset, 
    sample_text_processor,  # 단일 처리기 사용
    batch_size=20
)
total_time = time.time() - start_time

print(f"전체 처리 시간: {total_time:.2f}초")
print(f"처리 속도: {len(results)/total_time:.2f} 항목/초")

결론

LangChain은 텍스트, 이미지, 비디오와 같은 다양한 형태의 데이터를 통합하여 처리할 수 있는 강력한 프레임워크를 제공합니다.
멀티모달 데이터 통합을 통해 더 풍부하고 정확한 AI 애플리케이션을 구축할 수 있으며, LangChain의 유연한 아키텍처는 다양한 모델 제공업체와의 통합을 용이하게 합니다.
멀티모달 RAG와 같은 고급 기능을 활용하면 텍스트와 이미지를 함께 분석하여 더 포괄적인 이해와 응답 정확도를 향상시킬 수 있습니다.
핵심 포인트:

  • 다양한 모달리티 지원: 텍스트, 이미지, 오디오, 비디오 통합 처리
  • 멀티모달 RAG 구현: 텍스트와 이미지를 함께 활용한 검색 증강 생성
  • 멀티벡터 검색기: 반구조화된 데이터의 효율적 처리
  • 확장 가능한 아키텍처: 대용량 멀티모달 데이터 처리
  • 모델 제공업체 통합: OpenAI, Google, Anthropic 등 다양한 모델 지원
  • 데이터 정렬 및 정규화: 이질적 데이터의 통합 표현

LangChain의 멀티모달 기능을 활용하면 단순한 텍스트 기반 AI를 넘어서 시각, 청각, 텍스트 정보를 종합적으로 이해하고 처리하는 차세대 AI 애플리케이션을 구축할 수 있습니다.

반응형