IT기술/랭체인 (langchain)

LangChain 컴퓨터 비전 통합 완벽 가이드: 이미지 분석부터 멀티모달 AI 시스템까지

후스파 2025. 7. 12. 14:56
반응형

LangChain은 텍스트 기반 작업에 특화되었지만, 컴퓨터 비전 모델과의 통합을 통해 이미지 분석, 객체 인식, 이미지 캡셔닝 등 다양한 비전 태스크를 처리할 수 있습니다.
이 글에서는 LangChain을 컴퓨터 비전 작업에 효과적으로 적용하는 방법과 실제 사례를 소개합니다.


멀티모달 파이프라인 구축 전략

LangChain은 컴퓨터 비전 모델과 언어 모델을 결합한 멀티모달 처리 아키텍처를 지원합니다.

핵심 구성 요소

  • 이미지 로더: OpenCV, PIL 등으로 이미지 로드
  • 비전 모델: CLIP, BLIP, YOLO 등 객체 인식/이미지 이해 모델
  • 언어 모델: GPT-4, Llama-3 등 텍스트 생성 모델
  • 벡터 데이터베이스: Milvus, FAISS 등 특징 임베딩 저장

멀티모달 아키텍처 설계

from langchain_core.runnables import RunnableLambda, RunnableParallel
from langchain_core.prompts import ChatPromptTemplate
from langchain_openai import ChatOpenAI
from PIL import Image
import torch
from transformers import CLIPProcessor, CLIPModel, BlipProcessor, BlipForConditionalGeneration
import cv2
import numpy as np

class MultimodalVisionPipeline:
    def __init__(self):
        # 비전 모델 초기화
        self.clip_model = CLIPModel.from_pretrained("openai/clip-vit-large-patch14")
        self.clip_processor = CLIPProcessor.from_pretrained("openai/clip-vit-large-patch14")

        self.blip_model = BlipForConditionalGeneration.from_pretrained("Salesforce/blip-image-captioning-large")
        self.blip_processor = BlipProcessor.from_pretrained("Salesforce/blip-image-captioning-large")

        # 언어 모델 초기화
        self.llm = ChatOpenAI(model="gpt-4o", temperature=0.3)

    def load_and_preprocess_image(self, image_input):
        """이미지 로드 및 전처리"""
        if isinstance(image_input, str):
            if image_input.startswith(('http://', 'https://')):
                import requests
                response = requests.get(image_input)
                image = Image.open(BytesIO(response.content))
            else:
                image = Image.open(image_input)
        else:
            image = image_input

        # 이미지 크기 정규화
        if image.size[0] > 1024 or image.size[1] > 1024:
            image.thumbnail((1024, 1024), Image.Resampling.LANCZOS)

        return {"image": image, "image_size": image.size}

    def extract_clip_features(self, inputs):
        """CLIP 기반 특징 추출"""
        image = inputs["image"]

        # CLIP 처리
        clip_inputs = self.clip_processor(images=image, return_tensors="pt")
        with torch.no_grad():
            image_features = self.clip_model.get_image_features(**clip_inputs)
            image_features = image_features / image_features.norm(dim=-1, keepdim=True)

        return {
            **inputs,
            "clip_features": image_features,
            "clip_embedding": image_features.numpy().tolist()
        }

    def generate_caption(self, inputs):
        """BLIP 기반 이미지 캡셔닝"""
        image = inputs["image"]

        # BLIP 처리
        blip_inputs = self.blip_processor(image, return_tensors="pt")
        with torch.no_grad():
            out = self.blip_model.generate(**blip_inputs, max_length=50)
            caption = self.blip_processor.decode(out[0], skip_special_tokens=True)

        return {
            **inputs,
            "caption": caption
        }

    def analyze_objects(self, inputs):
        """객체 감지 및 분석"""
        image = inputs["image"]

        # OpenCV를 사용한 기본 객체 감지
        img_array = np.array(image)
        gray = cv2.cvtColor(img_array, cv2.COLOR_RGB2GRAY)

        # 간단한 윤곽선 감지
        edges = cv2.Canny(gray, 50, 150)
        contours, _ = cv2.findContours(edges, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)

        object_count = len([c for c in contours if cv2.contourArea(c) > 100])

        return {
            **inputs,
            "object_count": object_count,
            "has_objects": object_count > 0
        }

    def create_comprehensive_pipeline(self):
        """종합적인 이미지 분석 파이프라인"""

        # 병렬 처리를 위한 체인 구성
        parallel_analysis = RunnableParallel({
            "basic_info": RunnableLambda(self.load_and_preprocess_image),
            "features": RunnableLambda(self.load_and_preprocess_image) | RunnableLambda(self.extract_clip_features),
            "caption": RunnableLambda(self.load_and_preprocess_image) | RunnableLambda(self.generate_caption),
            "objects": RunnableLambda(self.load_and_preprocess_image) | RunnableLambda(self.analyze_objects)
        })

        # 결과 통합 및 최종 분석
        def integrate_results(parallel_outputs):
            basic_info = parallel_outputs["basic_info"]
            features = parallel_outputs["features"]
            caption = parallel_outputs["caption"]
            objects = parallel_outputs["objects"]

            # 모든 정보를 통합
            integrated_data = {
                "image_size": basic_info["image_size"],
                "caption": caption["caption"],
                "object_count": objects["object_count"],
                "has_objects": objects["has_objects"],
                "clip_features": features["clip_embedding"]
            }

            return integrated_data

        # LLM 기반 최종 분석
        analysis_prompt = ChatPromptTemplate.from_template("""
        다음 이미지 분석 결과를 바탕으로 종합적인 분석을 제공해주세요:

        이미지 크기: {image_size}
        자동 생성 캡션: {caption}
        감지된 객체 수: {object_count}
        객체 존재 여부: {has_objects}

        다음 형식으로 분석 결과를 제공해주세요:
        1. 이미지 주요 내용 요약
        2. 감지된 객체들의 특징
        3. 이미지 품질 평가
        4. 추천 활용 방안
        """)

        # 전체 파이프라인 구성
        full_pipeline = (
            parallel_analysis
            | RunnableLambda(integrate_results)
            | analysis_prompt
            | self.llm
        )

        return full_pipeline

# 사용 예시
pipeline = MultimodalVisionPipeline()
comprehensive_pipeline = pipeline.create_comprehensive_pipeline()

# 이미지 분석 실행
result = comprehensive_pipeline.invoke("https://example.com/image.jpg")
print(result.content)

이미지 캡셔닝 구현 예시

CLIP + GPT-4 조합으로 이미지 설명 생성

from langchain.chains import TransformChain  
from PIL import Image  
import requests  

# 이미지 → 텍스트 변환 체인  
def load_image(inputs):  
    image = Image.open(requests.get(inputs["url"], stream=True).raw)  
    return {"image": image}  

image_chain = TransformChain(  
    input_variables=["url"],  
    output_variables=["image"],  
    transform=load_image  
)  

# CLIP 기반 특징 추출  
from transformers import CLIPProcessor, CLIPModel  

clip_model = CLIPModel.from_pretrained("openai/clip-vit-base-patch32")  
clip_processor = CLIPProcessor.from_pretrained("openai/clip-vit-base-patch32")  

def clip_embedding(inputs):  
    inputs = clip_processor(images=inputs["image"], return_tensors="pt")  
    outputs = clip_model.get_image_features(**inputs)  
    return {"embedding": outputs}  

vision_chain = TransformChain(  
    input_variables=["image"],  
    output_variables=["embedding"],  
    transform=clip_embedding  
)  

# LangChain 파이프라인 구성  
from langchain.chains import SequentialChain  

caption_pipeline = SequentialChain(  
    chains=[image_chain, vision_chain, caption_chain],  
    input_variables=["url"]  
)  

result = caption_pipeline({"url": "https://example.com/image.jpg"})  
print(result["caption"])  

고급 이미지 캡셔닝 시스템

class AdvancedImageCaptioning:
    def __init__(self):
        self.pipeline = MultimodalVisionPipeline()

    def create_detailed_captioning_chain(self):
        """상세한 이미지 캡셔닝 체인"""

        def analyze_image_composition(inputs):
            """이미지 구성 요소 분석"""
            image = inputs["image"]

            # 색상 분석
            img_array = np.array(image)
            dominant_colors = self.extract_dominant_colors(img_array)

            # 밝기 분석
            brightness = np.mean(img_array)

            # 대비 분석
            contrast = np.std(img_array)

            return {
                **inputs,
                "dominant_colors": dominant_colors,
                "brightness": brightness,
                "contrast": contrast
            }

        def generate_contextual_caption(inputs):
            """맥락적 캡션 생성"""

            context_prompt = ChatPromptTemplate.from_template("""
            다음 이미지 정보를 바탕으로 상세하고 맥락적인 설명을 생성해주세요:

            기본 캡션: {caption}
            주요 색상: {dominant_colors}
            밝기 수준: {brightness}
            대비 수준: {contrast}
            객체 수: {object_count}

            다음 요소들을 포함한 자연스러운 설명을 작성해주세요:
            1. 주요 객체나 인물
            2. 배경과 환경
            3. 색상과 분위기
            4. 구도와 시각적 특징
            """)

            chain = context_prompt | self.pipeline.llm
            return chain.invoke(inputs)

        # 체인 구성
        detailed_chain = (
            RunnableLambda(self.pipeline.load_and_preprocess_image)
            | RunnableLambda(self.pipeline.generate_caption)
            | RunnableLambda(self.pipeline.analyze_objects)
            | RunnableLambda(analyze_image_composition)
            | RunnableLambda(generate_contextual_caption)
        )

        return detailed_chain

    def extract_dominant_colors(self, img_array, k=3):
        """주요 색상 추출"""
        from sklearn.cluster import KMeans

        # 이미지를 1차원 배열로 변환
        pixels = img_array.reshape(-1, 3)

        # K-means 클러스터링으로 주요 색상 추출
        kmeans = KMeans(n_clusters=k, random_state=42)
        kmeans.fit(pixels)

        colors = kmeans.cluster_centers_.astype(int)
        return [f"RGB({c[0]}, {c[1]}, {c[2]})" for c in colors]

# 사용 예시
captioning_system = AdvancedImageCaptioning()
detailed_chain = captioning_system.create_detailed_captioning_chain()

result = detailed_chain.invoke("path/to/image.jpg")
print(result.content)

주요 적용 사례

의료 이미지 분석

X-ray, MRI 이미지 분석 및 보고서 자동 생성

class MedicalImageAnalyzer:
    def __init__(self):
        self.llm = ChatOpenAI(model="gpt-4", temperature=0.1)

    def create_medical_analysis_chain(self):
        """의료 이미지 분석 체인"""

        def load_dicom_image(inputs):
            """DICOM 이미지 로드"""
            import pydicom
            from pydicom.pixel_data_handlers.util import apply_voi_lut

            dicom_path = inputs["dicom_path"]
            ds = pydicom.dcmread(dicom_path)

            # 픽셀 데이터 추출
            image = apply_voi_lut(ds.pixel_array, ds)

            # 메타데이터 추출
            metadata = {
                "patient_id": getattr(ds, 'PatientID', 'Unknown'),
                "study_date": getattr(ds, 'StudyDate', 'Unknown'),
                "modality": getattr(ds, 'Modality', 'Unknown'),
                "body_part": getattr(ds, 'BodyPartExamined', 'Unknown')
            }

            return {
                "image": image,
                "metadata": metadata,
                "dicom_data": ds
            }

        def preprocess_medical_image(inputs):
            """의료 이미지 전처리"""
            image = inputs["image"]

            # 정규화
            image_normalized = (image - image.min()) / (image.max() - image.min())

            # 대비 향상
            image_enhanced = cv2.createCLAHE(clipLimit=2.0, tileGridSize=(8,8)).apply(
                (image_normalized * 255).astype(np.uint8)
            )

            return {
                **inputs,
                "processed_image": image_enhanced
            }

        def detect_anomalies(inputs):
            """이상 소견 감지"""
            # 실제 구현에서는 훈련된 의료 AI 모델 사용
            processed_image = inputs["processed_image"]

            # 간단한 이상 감지 시뮬레이션
            # 실제로는 ResNet, DenseNet 등의 의료 특화 모델 사용
            anomaly_score = np.random.random()  # 실제 모델 예측으로 대체

            findings = []
            if anomaly_score > 0.7:
                findings.append("의심스러운 음영 발견")
            if anomaly_score > 0.8:
                findings.append("추가 검사 권장")

            return {
                **inputs,
                "anomaly_score": anomaly_score,
                "findings": findings
            }

        def generate_medical_report(inputs):
            """의료 보고서 생성"""

            report_prompt = ChatPromptTemplate.from_template("""
            다음 의료 이미지 분석 결과를 바탕으로 전문적인 의료 보고서를 작성해주세요:

            환자 정보:
            - 환자 ID: {patient_id}
            - 검사 날짜: {study_date}
            - 검사 부위: {body_part}
            - 영상 기법: {modality}

            분석 결과:
            - 이상 점수: {anomaly_score}
            - 발견 사항: {findings}

            다음 형식으로 보고서를 작성해주세요:

            [영상의학과 판독 보고서]

            1. 검사 정보
            2. 영상 소견
            3. 판독 의견
            4. 권장 사항

            주의: 이는 AI 보조 분석이며, 최종 진단은 전문의가 수행해야 합니다.
            """)

            chain = report_prompt | self.llm
            return chain.invoke({
                "patient_id": inputs["metadata"]["patient_id"],
                "study_date": inputs["metadata"]["study_date"],
                "body_part": inputs["metadata"]["body_part"],
                "modality": inputs["metadata"]["modality"],
                "anomaly_score": inputs["anomaly_score"],
                "findings": ", ".join(inputs["findings"]) if inputs["findings"] else "특이 소견 없음"
            })

        # 의료 분석 체인 구성
        medical_chain = (
            RunnableLambda(load_dicom_image)
            | RunnableLambda(preprocess_medical_image)
            | RunnableLambda(detect_anomalies)
            | RunnableLambda(generate_medical_report)
        )

        return medical_chain

# 사용 예시
medical_analyzer = MedicalImageAnalyzer()
medical_chain = medical_analyzer.create_medical_analysis_chain()

report = medical_chain.invoke({"dicom_path": "chest_xray.dcm"})
print(report.content)

제조업 결함 검출

생산 라인 이미지에서 결함 식별

class ManufacturingQualityControl:
    def __init__(self):
        self.llm = ChatOpenAI(model="gpt-4", temperature=0.2)

    def create_defect_detection_chain(self):
        """결함 검출 체인"""

        def load_production_image(inputs):
            """생산 라인 이미지 로드"""
            image_path = inputs["image_path"]
            image = cv2.imread(image_path)
            image_rgb = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)

            return {
                "image": image_rgb,
                "image_path": image_path,
                "timestamp": inputs.get("timestamp", "unknown")
            }

        def detect_defects(inputs):
            """결함 감지"""
            image = inputs["image"]

            # 그레이스케일 변환
            gray = cv2.cvtColor(image, cv2.COLOR_RGB2GRAY)

            # 가우시안 블러 적용
            blurred = cv2.GaussianBlur(gray, (5, 5), 0)

            # 임계값 적용
            _, thresh = cv2.threshold(blurred, 127, 255, cv2.THRESH_BINARY)

            # 윤곽선 감지
            contours, _ = cv2.findContours(thresh, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)

            # 결함 분류
            defects = []
            for contour in contours:
                area = cv2.contourArea(contour)
                perimeter = cv2.arcLength(contour, True)

                if area > 100:  # 최소 크기 필터
                    # 원형도 계산
                    circularity = 4 * np.pi * area / (perimeter * perimeter) if perimeter > 0 else 0

                    if circularity  500 else "낮음"
                        })

            return {
                **inputs,
                "defects": defects,
                "defect_count": len(defects)
            }

        def classify_defect_severity(inputs):
            """결함 심각도 분류"""
            defects = inputs["defects"]

            severity_levels = {
                "critical": 0,
                "major": 0,
                "minor": 0
            }

            for defect in defects:
                if defect["area"] > 1000:
                    severity_levels["critical"] += 1
                elif defect["area"] > 500:
                    severity_levels["major"] += 1
                else:
                    severity_levels["minor"] += 1

            # 전체 품질 등급 결정
            if severity_levels["critical"] > 0:
                quality_grade = "불합격"
            elif severity_levels["major"] > 2:
                quality_grade = "재검사 필요"
            elif severity_levels["minor"] > 5:
                quality_grade = "주의"
            else:
                quality_grade = "합격"

            return {
                **inputs,
                "severity_levels": severity_levels,
                "quality_grade": quality_grade
            }

        def generate_quality_report(inputs):
            """품질 검사 보고서 생성"""

            report_prompt = ChatPromptTemplate.from_template("""
            다음 제품 품질 검사 결과를 바탕으로 상세한 품질 보고서를 작성해주세요:

            검사 정보:
            - 이미지 경로: {image_path}
            - 검사 시간: {timestamp}

            검출 결과:
            - 총 결함 수: {defect_count}
            - 심각한 결함: {critical_defects}개
            - 주요 결함: {major_defects}개
            - 경미한 결함: {minor_defects}개
            - 품질 등급: {quality_grade}

            다음 형식으로 보고서를 작성해주세요:

            [품질 검사 보고서]

            1. 검사 개요
            2. 결함 분석
            3. 품질 평가
            4. 권장 조치사항
            5. 다음 검사 일정
            """)

            chain = report_prompt | self.llm
            return chain.invoke({
                "image_path": inputs["image_path"],
                "timestamp": inputs["timestamp"],
                "defect_count": inputs["defect_count"],
                "critical_defects": inputs["severity_levels"]["critical"],
                "major_defects": inputs["severity_levels"]["major"],
                "minor_defects": inputs["severity_levels"]["minor"],
                "quality_grade": inputs["quality_grade"]
            })

        # 품질 검사 체인 구성
        quality_chain = (
            RunnableLambda(load_production_image)
            | RunnableLambda(detect_defects)
            | RunnableLambda(classify_defect_severity)
            | RunnableLambda(generate_quality_report)
        )

        return quality_chain

# 사용 예시
quality_control = ManufacturingQualityControl()
quality_chain = quality_control.create_defect_detection_chain()

result = quality_chain.invoke({
    "image_path": "production_sample.jpg",
    "timestamp": "2025-07-04 17:06:00"
})
print(result.content)

소매업 재고 관리

선반 이미지 분석을 통한 재고 추적

class RetailInventoryManager:
    def __init__(self):
        self.llm = ChatOpenAI(model="gpt-4", temperature=0.3)

    def create_inventory_tracking_chain(self):
        """재고 추적 체인"""

        def capture_shelf_image(inputs):
            """선반 이미지 캡처"""
            image_path = inputs["shelf_image_path"]
            shelf_id = inputs["shelf_id"]

            image = cv2.imread(image_path)
            image_rgb = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)

            return {
                "image": image_rgb,
                "shelf_id": shelf_id,
                "capture_time": inputs.get("capture_time", "unknown")
            }

        def count_products(inputs):
            """제품 개수 계산"""
            image = inputs["image"]

            # 제품 감지를 위한 이미지 처리
            gray = cv2.cvtColor(image, cv2.COLOR_RGB2GRAY)

            # 적응적 임계값 적용
            adaptive_thresh = cv2.adaptiveThreshold(
                gray, 255, cv2.ADAPTIVE_THRESH_GAUSSIAN_C, cv2.THRESH_BINARY, 11, 2
            )

            # 윤곽선 감지
            contours, _ = cv2.findContours(adaptive_thresh, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)

            # 제품으로 추정되는 객체 필터링
            products = []
            for contour in contours:
                area = cv2.contourArea(contour)
                if 1000  0:
                diff = cv2.absdiff(self.frame_buffer[-1], frame)
                gray_diff = cv2.cvtColor(diff, cv2.COLOR_BGR2GRAY)
                motion_level = np.mean(gray_diff)
            else:
                motion_level = 0

            # 객체 감지 (간단한 예시)
            gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
            face_cascade = cv2.CascadeClassifier(cv2.data.haarcascades + 'haarcascade_frontalface_default.xml')
            faces = face_cascade.detectMultiScale(gray, 1.1, 4)

            # 프레임 버퍼 관리
            self.frame_buffer.append(frame)
            if len(self.frame_buffer) > 5:
                self.frame_buffer.pop(0)

            return {
                "motion_level": motion_level,
                "face_count": len(faces),
                "faces": faces.tolist() if len(faces) > 0 else []
            }

        def generate_real_time_alert(self, analysis_data):
            """실시간 알림 생성"""
            motion_level = analysis_data["analysis"]["motion_level"]
            face_count = analysis_data["analysis"]["face_count"]

            alerts = []

            if motion_level > 50:
                alerts.append("높은 움직임 감지")

            if face_count > 5:
                alerts.append("다수 인원 감지")

            if face_count == 0 and motion_level > 30:
                alerts.append("무인 상태에서 움직임 감지")

            return {
                **analysis_data,
                "alerts": alerts,
                "alert_level": "high" if len(alerts) > 1 else "low" if len(alerts) == 1 else "normal"
            }

        return process_video_stream, generate_real_time_alert

# 사용 예시
video_analyzer = RealTimeVideoAnalyzer()
stream_processor, alert_generator = video_analyzer.create_video_analysis_chain()

# 실시간 비디오 분석 실행
for frame_data in stream_processor(0):  # 웹캠 사용
    alert_data = alert_generator(frame_data)

    if alert_data["alert_level"] != "normal":
        print(f"알림: {alert_data['alerts']}")

3D 이미지 처리

Point cloud 데이터와 결합한 공간 분석

class ThreeDimensionalAnalyzer:
    def __init__(self):
        self.llm = ChatOpenAI(model="gpt-4", temperature=0.2)

    def create_3d_analysis_chain(self):
        """3D 분석 체인"""

        def load_3d_scan(inputs):
            """3D 스캔 데이터 로드"""
            # Open3D 라이브러리 사용 예시
            try:
                import open3d as o3d

                point_cloud_path = inputs["point_cloud_path"]
                pcd = o3d.io.read_point_cloud(point_cloud_path)

                # 포인트 클라우드 정보 추출
                points = np.asarray(pcd.points)
                colors = np.asarray(pcd.colors) if pcd.has_colors() else None

                return {
                    "point_cloud": pcd,
                    "points": points,
                    "colors": colors,
                    "point_count": len(points)
                }
            except ImportError:
                return {"error": "Open3D 라이브러리가 필요합니다"}

        def segment_3d_objects(inputs):
            """3D 객체 분할"""
            import open3d as o3d

            pcd = inputs["point_cloud"]

            # 평면 분할 (RANSAC)
            plane_model, inliers = pcd.segment_plane(
                distance_threshold=0.01,
                ransac_n=3,
                num_iterations=1000
            )

            # 평면과 객체 분리
            plane_cloud = pcd.select_by_index(inliers)
            object_cloud = pcd.select_by_index(inliers, invert=True)

            # 클러스터링으로 개별 객체 분리
            labels = np.array(object_cloud.cluster_dbscan(eps=0.02, min_points=10))
            max_label = labels.max()

            objects = []
            for i in range(max_label + 1):
                object_indices = np.where(labels == i)[0]
                if len(object_indices) > 50:  # 최소 포인트 수
                    obj_points = np.asarray(object_cloud.points)[object_indices]

                    # 객체 바운딩 박스 계산
                    min_bound = obj_points.min(axis=0)
                    max_bound = obj_points.max(axis=0)
                    dimensions = max_bound - min_bound

                    objects.append({
                        "id": i,
                        "point_count": len(object_indices),
                        "min_bound": min_bound.tolist(),
                        "max_bound": max_bound.tolist(),
                        "dimensions": dimensions.tolist(),
                        "volume": np.prod(dimensions)
                    })

            return {
                **inputs,
                "plane_model": plane_model,
                "objects": objects,
                "object_count": len(objects)
            }

        def analyze_spatial_relationships(inputs):
            """공간 관계 분석"""
            objects = inputs["objects"]

            relationships = []

            for i, obj1 in enumerate(objects):
                for j, obj2 in enumerate(objects[i+1:], i+1):
                    # 두 객체 간 거리 계산
                    center1 = np.array(obj1["min_bound"]) + np.array(obj1["dimensions"]) / 2
                    center2 = np.array(obj2["min_bound"]) + np.array(obj2["dimensions"]) / 2
                    distance = np.linalg.norm(center1 - center2)

                    # 상대적 위치 관계
                    if center1[2] > center2[2] + obj2["dimensions"][2]:
                        relation = "위에 위치"
                    elif center1[2]  0) / edges.size

            # 색상 다양성 계산
            unique_colors = len(np.unique(img_array.reshape(-1, 3), axis=0))
            color_complexity = min(unique_colors / 1000, 1.0)

            # 전체 복잡도
            complexity = (edge_density + color_complexity) / 2

            return complexity

        def estimate_processing_cost(self, model_type):
            """처리 비용 추정"""
            cost_per_request = {
                "lightweight": 0.001,
                "standard": 0.005,
                "premium": 0.02
            }

            return cost_per_request.get(model_type, 0.005)

        def check_budget_limit(self, estimated_cost):
            """예산 한도 확인"""
            today = datetime.now().strftime("%Y-%m-%d")
            daily_usage = self.usage_tracker.get(today, 0)

            return (daily_usage + estimated_cost)  self.max_memory_mb:
                    # 가비지 컬렉션 수행
                    import gc
                    gc.collect()

                # 타일 추출
                tile = large_image[y:y+tile_size, x:x+tile_size]

                # 타일 처리
                tile_result = self.process_tile(tile, x, y)
                results.append(tile_result)

        return self.merge_tile_results(results)

    def process_tile(self, tile, offset_x, offset_y):
        """개별 타일 처리"""
        # 타일별 분석 로직
        return {
            "tile_position": (offset_x, offset_y),
            "tile_size": tile.shape,
            "analysis": "타일 분석 결과"
        }

    def merge_tile_results(self, tile_results):
        """타일 결과 병합"""
        return {
            "total_tiles": len(tile_results),
            "merged_analysis": "전체 이미지 분석 결과"
        }

마무리

LangChain과 컴퓨터 비전의 결합은 의료, 제조, 소매 등 다양한 분야에서 혁신을 이끌고 있습니다. 멀티모달 아키텍처와 최신 AI 기술을 접목하면 단순 이미지 인식을 넘어 맥락 이해 기반의 지능형 시스템 구축이 가능합니다.
핵심 포인트:

  • 멀티모달 파이프라인: CLIP, BLIP, YOLO 등 다양한 비전 모델과 LLM의 효과적 통합
  • 실제 적용 사례: 의료 진단, 제조 품질 관리, 소매 재고 추적 등 실무 중심 활용
  • 성능 최적화: 캐싱, 배치 처리, 모델 경량화를 통한 실시간 처리 구현
  • 비용 효율성: 이미지 복잡도 기반 모델 선택으로 처리 비용 최적화
  • 확장 가능성: 3D 처리, 실시간 스트리밍까지 포괄하는 포괄적 솔루션

LangChain의 유연한 아키텍처를 활용하면 컴퓨터 비전 기술을 기존 비즈니스 워크플로우에 자연스럽게 통합하여 지능형 자동화 시스템을 구축할 수 있습니다.

반응형