PDF Invoice Parser System - Implementation Guide

Project Structure

invoice-parser/
├── src/
│   ├── __init__.py
│   ├── core/
│   │   ├── __init__.py
│   │   ├── models.py
│   │   ├── interfaces.py
│   │   └── exceptions.py
│   ├── parsers/
│   │   ├── __init__.py
│   │   ├── base_parser.py
│   │   ├── text_parser.py
│   │   ├── ocr_parser.py
│   │   └── ai_parser.py
│   ├── services/
│   │   ├── __init__.py
│   │   ├── vendor_detector.py
│   │   ├── parser_strategy.py
│   │   ├── cache_service.py
│   │   └── storage_service.py
│   ├── handlers/
│   │   ├── __init__.py
│   │   ├── upload_handler.py
│   │   ├── parsing_handler.py
│   │   └── result_handler.py
│   ├── api/
│   │   ├── __init__.py
│   │   ├── app.py
│   │   ├── routes.py
│   │   └── middleware.py
│   └── utils/
│       ├── __init__.py
│       ├── logger.py
│       ├── metrics.py
│       └── validators.py
├── tests/
│   ├── unit/
│   ├── integration/
│   └── fixtures/
├── config/
│   ├── default.yaml
│   ├── development.yaml
│   └── production.yaml
├── infrastructure/
│   ├── terraform/
│   └── docker/
├── requirements.txt
├── Dockerfile
└── README.md

Core Models (src/core/models.py)

from dataclasses import dataclass, field
from datetime import datetime
from decimal import Decimal
from typing import Dict, List, Optional, Any
from enum import Enum
import uuid


class ParserType(Enum):
    TEXT = "text"
    OCR = "ocr"
    AI = "ai"
    HYBRID = "hybrid"


class ProcessingStatus(Enum):
    PENDING = "pending"
    PROCESSING = "processing"
    COMPLETED = "completed"
    FAILED = "failed"
    RETRY = "retry"


@dataclass
class PDFDocument:
    """Represents a PDF document to be parsed"""
    document_id: str = field(default_factory=lambda: str(uuid.uuid4()))
    file_path: str = ""
    file_size: int = 0
    page_count: int = 0
    metadata: Dict[str, Any] = field(default_factory=dict)
    created_at: datetime = field(default_factory=datetime.utcnow)
    content_hash: Optional[str] = None
    
    def get_page(self, page_num: int) -> 'PDFPage':
        """Retrieve a specific page from the document"""
        pass
    
    def get_text(self, start_page: int = 0, end_page: Optional[int] = None) -> str:
        """Extract text from specified page range"""
        pass


@dataclass
class PDFPage:
    """Represents a single page in a PDF document"""
    page_number: int
    width: float
    height: float
    text_content: str = ""
    images: List['Image'] = field(default_factory=list)
    tables: List['Table'] = field(default_factory=list)
    
    def get_text_blocks(self) -> List['TextBlock']:
        """Extract text blocks with position information"""
        pass


@dataclass
class LineItem:
    """Represents a line item in an invoice"""
    description: str
    quantity: float = 1.0
    unit_price: Decimal = Decimal("0.00")
    total_price: Decimal = Decimal("0.00")
    tax_rate: float = 0.0
    tax_amount: Decimal = Decimal("0.00")
    metadata: Dict[str, Any] = field(default_factory=dict)
    
    def calculate_total(self) -> Decimal:
        """Calculate total including tax"""
        base_total = Decimal(str(self.quantity)) * self.unit_price
        tax = base_total * Decimal(str(self.tax_rate))
        return base_total + tax


@dataclass
class ParseResult:
    """Result of parsing an invoice"""
    job_id: str
    vendor_id: Optional[str] = None
    invoice_number: Optional[str] = None
    invoice_date: Optional[datetime] = None
    due_date: Optional[datetime] = None
    total_amount: Decimal = Decimal("0.00")
    subtotal: Decimal = Decimal("0.00")
    tax_amount: Decimal = Decimal("0.00")
    currency: str = "USD"
    line_items: List[LineItem] = field(default_factory=list)
    extracted_fields: Dict[str, Any] = field(default_factory=dict)
    confidence_scores: Dict[str, float] = field(default_factory=dict)
    parser_used: str = ""
    processing_time: float = 0.0
    errors: List[str] = field(default_factory=list)
    
    def to_dict(self) -> Dict[str, Any]:
        """Convert to dictionary for JSON serialization"""
        return {
            "job_id": self.job_id,
            "vendor_id": self.vendor_id,
            "invoice_number": self.invoice_number,
            "invoice_date": self.invoice_date.isoformat() if self.invoice_date else None,
            "due_date": self.due_date.isoformat() if self.due_date else None,
            "total_amount": str(self.total_amount),
            "subtotal": str(self.subtotal),
            "tax_amount": str(self.tax_amount),
            "currency": self.currency,
            "line_items": [
                {
                    "description": item.description,
                    "quantity": item.quantity,
                    "unit_price": str(item.unit_price),
                    "total_price": str(item.total_price),
                    "tax_rate": item.tax_rate,
                    "tax_amount": str(item.tax_amount)
                }
                for item in self.line_items
            ],
            "extracted_fields": self.extracted_fields,
            "confidence_scores": self.confidence_scores,
            "parser_used": self.parser_used,
            "processing_time": self.processing_time,
            "errors": self.errors
        }


@dataclass
class Vendor:
    """Vendor configuration"""
    vendor_id: str
    vendor_name: str
    parser_strategy: ParserType = ParserType.TEXT
    parser_config: Dict[str, Any] = field(default_factory=dict)
    field_mappings: Dict[str, str] = field(default_factory=dict)
    validation_rules: List[Dict[str, Any]] = field(default_factory=list)
    confidence_threshold: float = 0.8
    regex_patterns: Dict[str, List[str]] = field(default_factory=dict)
    is_active: bool = True
    created_at: datetime = field(default_factory=datetime.utcnow)
    updated_at: datetime = field(default_factory=datetime.utcnow)
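
A quick sanity check of the money math and serialization (a minimal sketch, assuming the module is importable as src.core.models):

from decimal import Decimal

from src.core.models import LineItem, ParseResult

# Two units at 50.00 with a 10% tax rate: 100.00 base + 10.00 tax = 110.00
item = LineItem(description="Widget", quantity=2, unit_price=Decimal("50.00"), tax_rate=0.10)
assert item.calculate_total() == Decimal("110.00")

# to_dict() serializes Decimals as strings, so totals survive a JSON round trip
result = ParseResult(job_id="job-123", total_amount=Decimal("110.00"), line_items=[item])
print(result.to_dict()["total_amount"])  # "110.00"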

Interfaces (src/core/interfaces.py)

from abc import ABC, abstractmethod
from typing import Optional, Dict, Any, List
from .models import PDFDocument, ParseResult, Vendor


class InvoiceParser(ABC):
    """Base interface for all invoice parsers"""
    
    @abstractmethod
    def parse(self, document: PDFDocument) -> ParseResult:
        """Parse invoice document and extract data"""
        pass
    
    @abstractmethod
    def validate(self, result: ParseResult) -> bool:
        """Validate parsed results"""
        pass
    
    @abstractmethod
    def get_confidence(self) -> float:
        """Get parser confidence score"""
        pass


class VendorDetectorInterface(ABC):
    """Interface for vendor detection"""
    
    @abstractmethod
    async def detect_vendor(self, document: PDFDocument) -> Optional[Vendor]:
        """Detect vendor from document"""
        pass
    
    @abstractmethod
    def update_vendor_patterns(self, vendor: Vendor, patterns: List[str]) -> None:
        """Update vendor detection patterns"""
        pass


class CacheServiceInterface(ABC):
    """Interface for caching service"""
    
    @abstractmethod
    async def get(self, key: str) -> Optional[Any]:
        """Get value from cache"""
        pass
    
    @abstractmethod
    async def set(self, key: str, value: Any, ttl: int = 3600) -> None:
        """Set value in cache with TTL"""
        pass
    
    @abstractmethod
    async def delete(self, key: str) -> None:
        """Delete value from cache"""
        pass


class StorageServiceInterface(ABC):
    """Interface for document storage"""
    
    @abstractmethod
    async def upload(self, file_path: str, content: bytes) -> str:
        """Upload document to storage"""
        pass
    
    @abstractmethod
    async def download(self, file_path: str) -> bytes:
        """Download document from storage"""
        pass
    
    @abstractmethod
    async def delete(self, file_path: str) -> None:
        """Delete document from storage"""
        pass
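
For unit tests it helps to have an in-process implementation of CacheServiceInterface. A minimal sketch (lazy TTL expiry on read; illustrative, not production-grade):

import time
from typing import Any, Dict, Optional, Tuple

from src.core.interfaces import CacheServiceInterface


class InMemoryCache(CacheServiceInterface):
    """Dict-backed cache for tests; entries expire lazily when read."""

    def __init__(self):
        self._store: Dict[str, Tuple[Any, float]] = {}

    async def get(self, key: str) -> Optional[Any]:
        entry = self._store.get(key)
        if entry is None:
            return None
        value, expires_at = entry
        if time.time() > expires_at:
            del self._store[key]  # expired entry; drop it
            return None
        return value

    async def set(self, key: str, value: Any, ttl: int = 3600) -> None:
        self._store[key] = (value, time.time() + ttl)

    async def delete(self, key: str) -> None:
        self._store.pop(key, None)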

Base Parser (src/parsers/base_parser.py)

from abc import abstractmethod
import time
from typing import Dict, Any, Optional
import structlog

from ..core.interfaces import InvoiceParser
from ..core.models import PDFDocument, ParseResult, ProcessingStatus
from ..core.exceptions import ParsingError, ValidationError
from ..utils.metrics import MetricsCollector


class BaseParser(InvoiceParser):
    """Base implementation for all parsers"""
    
    def __init__(self, config: Dict[str, Any]):
        self.config = config
        self.logger = structlog.get_logger()
        self.metrics = MetricsCollector()
        self._confidence = 0.0
    
    def parse(self, document: PDFDocument) -> ParseResult:
        """Parse document with timing and error handling"""
        start_time = time.time()
        
        try:
            # Log parsing start
            self.logger.info(
                "Starting document parsing",
                parser_type=self.__class__.__name__,
                document_id=document.document_id,
                page_count=document.page_count
            )
            
            # Extract metadata
            metadata = self.extract_metadata(document)
            
            # Perform parsing
            result = self._parse_implementation(document)
            
            # Calculate processing time
            result.processing_time = time.time() - start_time
            
            # Validate result
            if not self.validate(result):
                raise ValidationError("Parsed result failed validation")
            
            # Log success
            self.logger.info(
                "Document parsed successfully",
                document_id=document.document_id,
                processing_time=result.processing_time,
                confidence=self._confidence
            )
            
            # Emit metrics
            self.metrics.increment(
                "documents_parsed",
                tags={"parser": self.__class__.__name__, "status": "success"}
            )
            
            return result
            
        except Exception as e:
            # Log error
            self.logger.error(
                "Document parsing failed",
                document_id=document.document_id,
                error=str(e),
                parser_type=self.__class__.__name__
            )
            
            # Emit error metric
            self.metrics.increment(
                "parsing_errors",
                tags={"parser": self.__class__.__name__, "error_type": type(e).__name__}
            )
            
            # Create error result
            return ParseResult(
                job_id=document.document_id,
                parser_used=self.__class__.__name__,
                processing_time=time.time() - start_time,
                errors=[str(e)]
            )
    
    @abstractmethod
    def _parse_implementation(self, document: PDFDocument) -> ParseResult:
        """Actual parsing implementation to be overridden"""
        pass
    
    def extract_metadata(self, document: PDFDocument) -> Dict[str, Any]:
        """Extract document metadata"""
        return {
            "page_count": document.page_count,
            "file_size": document.file_size,
            "created_at": document.created_at.isoformat(),
            "document_id": document.document_id
        }
    
    def validate(self, result: ParseResult) -> bool:
        """Validate parsed results"""
        # Basic validation
        if not result.invoice_number:
            self.logger.warning("Missing invoice number")
            return False
        
        if result.total_amount <= 0:
            self.logger.warning("Invalid total amount", amount=result.total_amount)
            return False
        
        # Check confidence threshold
        avg_confidence = sum(result.confidence_scores.values()) / len(result.confidence_scores) if result.confidence_scores else 0
        if avg_confidence < self.config.get("confidence_threshold", 0.5):
            self.logger.warning("Low confidence score", confidence=avg_confidence)
            return False
        
        return True
    
    def get_confidence(self) -> float:
        """Get parser confidence score"""
        return self._confidence
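
BaseParser is a template method: parse() owns timing, logging, metrics, and validation, and subclasses supply only _parse_implementation. A stub subclass makes the contract concrete (purely illustrative; assumes MetricsCollector and structlog are importable as in base_parser.py):

from decimal import Decimal

from src.core.models import PDFDocument, ParseResult
from src.parsers.base_parser import BaseParser


class StubParser(BaseParser):
    """Returns fixed values; handy for testing the parse() pipeline itself."""

    def _parse_implementation(self, document: PDFDocument) -> ParseResult:
        self._confidence = 0.99
        return ParseResult(
            job_id=document.document_id,
            invoice_number="INV-0001",
            total_amount=Decimal("1.00"),
            confidence_scores={"invoice_number": 0.99, "amounts": 0.99},
        )


parser = StubParser(config={"confidence_threshold": 0.5})
result = parser.parse(PDFDocument(file_path="dummy.pdf", page_count=1))
print(result.invoice_number, result.processing_time)  # INV-0001 <elapsed seconds>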

Text Parser (src/parsers/text_parser.py)

import re
from decimal import Decimal
from datetime import datetime
from typing import Dict, List, Tuple, Optional, Any
import pdfplumber
import pandas as pd

from .base_parser import BaseParser
from ..core.models import PDFDocument, ParseResult, LineItem
from ..core.exceptions import ParsingError


class TextParser(BaseParser):
    """Parser for digital PDFs with extractable text"""
    
    def __init__(self, config: Dict[str, Any]):
        super().__init__(config)
        self.extraction_library = config.get("extraction_library", "pdfplumber")
        self.date_formats = config.get("date_formats", [
            "%Y-%m-%d",
            "%d/%m/%Y",
            "%m/%d/%Y",
            "%d.%m.%Y",
            "%B %d, %Y"
        ])
    
    def _parse_implementation(self, document: PDFDocument) -> ParseResult:
        """Extract data from digital PDF"""
        # Extract text and tables
        text, tables = self._extract_content(document.file_path)
        
        # Create result
        result = ParseResult(job_id=document.document_id, parser_used="text")
        
        # Extract fields using regex patterns
        result.invoice_number = self._extract_invoice_number(text)
        result.invoice_date = self._extract_date(text, "invoice date")
        result.due_date = self._extract_date(text, "due date")
        
        # Extract amounts
        result.total_amount = self._extract_amount(text, "total")
        result.subtotal = self._extract_amount(text, "subtotal")
        result.tax_amount = self._extract_amount(text, "tax")
        
        # Extract line items from tables
        if tables:
            result.line_items = self._extract_line_items(tables)
        
        # Calculate confidence scores
        result.confidence_scores = self._calculate_field_confidence(result, text)
        
        # Set overall confidence
        self._confidence = sum(result.confidence_scores.values()) / len(result.confidence_scores)
        
        return result
    
    def _extract_content(self, file_path: str) -> Tuple[str, List[pd.DataFrame]]:
        """Extract text and tables from PDF"""
        full_text = ""
        all_tables = []
        
        try:
            with pdfplumber.open(file_path) as pdf:
                for page in pdf.pages:
                    # Extract text
                    page_text = page.extract_text()
                    if page_text:
                        full_text += page_text + "\n"
                    
                    # Extract tables
                    tables = page.extract_tables()
                    if tables:
                        for table in tables:
                            df = pd.DataFrame(table[1:], columns=table[0])
                            all_tables.append(df)
        
        except Exception as e:
            raise ParsingError(f"Failed to extract content: {str(e)}")
        
        return full_text, all_tables
    
    def _extract_invoice_number(self, text: str) -> Optional[str]:
        """Extract invoice number using regex patterns"""
        patterns = [
            r"Invoice\s*#?\s*:?\s*([A-Z0-9-]+)",
            r"Invoice Number\s*:?\s*([A-Z0-9-]+)",
            r"Bill\s*#?\s*:?\s*([A-Z0-9-]+)",
            r"Reference\s*:?\s*([A-Z0-9-]+)"
        ]
        
        for pattern in patterns:
            match = re.search(pattern, text, re.IGNORECASE)
            if match:
                return match.group(1)
        
        return None
    
    def _extract_date(self, text: str, date_type: str) -> Optional[datetime]:
        """Extract date from text"""
        # Common date patterns
        date_patterns = [
            rf"{date_type}\s*:?\s*(\d{{1,2}}[-/\.]\d{{1,2}}[-/\.]\d{{2,4}})",
            rf"{date_type}\s*:?\s*(\w+\s+\d{{1,2}},?\s+\d{{4}})",
            rf"{date_type}\s*:?\s*(\d{{4}}[-/]\d{{2}}[-/]\d{{2}})"
        ]
        
        for pattern in date_patterns:
            match = re.search(pattern, text, re.IGNORECASE)
            if match:
                date_str = match.group(1)
                # Try to parse with different formats
                for fmt in self.date_formats:
                    try:
                        return datetime.strptime(date_str, fmt)
                    except ValueError:
                        continue
        
        return None
    
    def _extract_amount(self, text: str, amount_type: str) -> Decimal:
        """Extract monetary amount from text"""
        # Currency symbols and patterns
        currency_pattern = r"[$£€¥]?\s*([0-9,]+\.?\d{0,2})"
        
        # Search patterns based on amount type; the leading \b keeps
        # "total" from matching inside "subtotal"
        patterns = [
            rf"\b{amount_type}\s*:?\s*{currency_pattern}",
            rf"\b{amount_type}\s+amount\s*:?\s*{currency_pattern}",
            rf"\bamount\s+{amount_type}\s*:?\s*{currency_pattern}"
        ]
        
        for pattern in patterns:
            match = re.search(pattern, text, re.IGNORECASE)
            if match:
                # Clean and convert amount
                amount_str = match.group(1).replace(",", "")
                try:
                    return Decimal(amount_str)
                except Exception:
                    continue
        
        return Decimal("0.00")
    
    def _extract_line_items(self, tables: List[pd.DataFrame]) -> List[LineItem]:
        """Extract line items from tables"""
        line_items = []
        
        for df in tables:
            # Try to identify line item table
            if self._is_line_item_table(df):
                for _, row in df.iterrows():
                    item = self._parse_line_item_row(row)
                    if item:
                        line_items.append(item)
        
        return line_items
    
    def _is_line_item_table(self, df: pd.DataFrame) -> bool:
        """Check if table contains line items"""
        # Look for common column headers
        headers = [col.lower() for col in df.columns if col]
        
        required_headers = ["description", "quantity", "price", "amount", "total"]
        matches = sum(1 for req in required_headers if any(req in header for header in headers))
        
        return matches >= 2
    
    def _parse_line_item_row(self, row: pd.Series) -> Optional[LineItem]:
        """Parse a single line item from table row"""
        try:
            # Find columns by partial match
            description = self._find_column_value(row, ["description", "item", "product"])
            quantity = self._find_column_value(row, ["qty", "quantity", "units"], numeric=True)
            unit_price = self._find_column_value(row, ["price", "rate", "unit"], numeric=True)
            total = self._find_column_value(row, ["total", "amount", "subtotal"], numeric=True)
            
            if description and (unit_price or total):
                return LineItem(
                    description=str(description),
                    quantity=float(quantity) if quantity else 1.0,
                    unit_price=Decimal(str(unit_price)) if unit_price else Decimal("0.00"),
                    total_price=Decimal(str(total)) if total else Decimal("0.00")
                )
        
        except Exception as e:
            self.logger.debug(f"Failed to parse line item: {e}")
        
        return None
    
    def _find_column_value(self, row: pd.Series, keywords: List[str], numeric: bool = False):
        """Find value in row by column keywords"""
        for col in row.index:
            if any(keyword in str(col).lower() for keyword in keywords):
                value = row[col]
                if numeric and value:
                    # Clean numeric value
                    value = re.sub(r'[^\d.,]', '', str(value))
                    value = value.replace(',', '')
                    try:
                        return float(value)
                    except (TypeError, ValueError):
                        return None
                return value
        return None
    
    def _calculate_field_confidence(self, result: ParseResult, text: str) -> Dict[str, float]:
        """Calculate confidence scores for extracted fields"""
        scores = {}
        
        # Invoice number confidence
        if result.invoice_number:
            scores["invoice_number"] = 0.9 if len(result.invoice_number) > 3 else 0.7
        else:
            scores["invoice_number"] = 0.0
        
        # Date confidence
        scores["invoice_date"] = 0.9 if result.invoice_date else 0.0
        scores["due_date"] = 0.8 if result.due_date else 0.3  # Optional field
        
        # Amount confidence based on consistency
        if result.line_items:
            calculated_total = sum(item.total_price for item in result.line_items)
            difference = abs(calculated_total - result.subtotal) / result.subtotal if result.subtotal > 0 else 1
            scores["amounts"] = 0.9 if difference < 0.02 else 0.7
        else:
            scores["amounts"] = 0.6 if result.total_amount > 0 else 0.0
        
        return scores
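
The regex extractors can be exercised without a real PDF. A minimal sketch on a synthetic invoice string (assumes the module layout above):

from src.parsers.text_parser import TextParser

sample = """
ACME Ltd
Invoice #: INV-2024-042
Invoice Date: 2024-01-15
Due Date: 2024-02-14
Subtotal: $1,000.00
Tax: $100.00
Total: $1,100.00
"""

parser = TextParser(config={})
print(parser._extract_invoice_number(sample))        # INV-2024-042
print(parser._extract_date(sample, "invoice date"))  # 2024-01-15 00:00:00
print(parser._extract_amount(sample, "total"))       # 1100.00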

OCR Parser (src/parsers/ocr_parser.py)

import cv2
import numpy as np
from PIL import Image
import pytesseract
import pdf2image
from decimal import Decimal
from typing import Dict, Any, List, Tuple, Optional

from .base_parser import BaseParser
from .text_parser import TextParser
from ..core.models import PDFDocument, ParseResult, LineItem
from ..core.exceptions import ParsingError


class OCRParser(BaseParser):
    """Parser for scanned PDFs using OCR"""
    
    def __init__(self, config: Dict[str, Any]):
        super().__init__(config)
        self.ocr_engine = config.get("ocr_engine", "tesseract")
        self.language = config.get("language", "eng")
        self.dpi = config.get("dpi", 300)
        self.preprocessing_steps = config.get("preprocessing_steps", [
            "denoise",
            "deskew",
            "contrast"
        ])
    
    def _parse_implementation(self, document: PDFDocument) -> ParseResult:
        """Extract data from scanned PDF using OCR"""
        # Convert PDF to images
        images = self._pdf_to_images(document.file_path)
        
        # Process each page
        full_text = ""
        for i, image in enumerate(images):
            # Preprocess image
            processed_image = self._preprocess_image(image)
            
            # Perform OCR
            page_text = self._perform_ocr(processed_image)
            full_text += f"\n--- Page {i+1} ---\n{page_text}"
        
        # Reuse the text parser's field extractors on the OCR output
        text_parser = TextParser(self.config)
        result = ParseResult(job_id=document.document_id, parser_used="ocr")
        result.invoice_number = text_parser._extract_invoice_number(full_text)
        result.invoice_date = text_parser._extract_date(full_text, "invoice date")
        result.due_date = text_parser._extract_date(full_text, "due date")
        result.total_amount = text_parser._extract_amount(full_text, "total")
        result.subtotal = text_parser._extract_amount(full_text, "subtotal")
        result.tax_amount = text_parser._extract_amount(full_text, "tax")
        result.confidence_scores = text_parser._calculate_field_confidence(result, full_text)
        
        # Apply OCR-specific post-processing
        result = self._enhance_ocr_results(result, full_text, images)
        
        return result
    
    def _pdf_to_images(self, pdf_path: str) -> List[Image.Image]:
        """Convert PDF pages to images"""
        try:
            images = pdf2image.convert_from_path(
                pdf_path,
                dpi=self.dpi,
                fmt='PNG',
                thread_count=4
            )
            return images
        except Exception as e:
            raise ParsingError(f"Failed to convert PDF to images: {str(e)}")
    
    def _preprocess_image(self, image: Image.Image) -> np.ndarray:
        """Preprocess image for better OCR results"""
        # Convert PIL image to OpenCV format
        img_array = np.array(image)
        
        # Convert to grayscale
        if len(img_array.shape) == 3:
            gray = cv2.cvtColor(img_array, cv2.COLOR_RGB2GRAY)
        else:
            gray = img_array
        
        # Apply preprocessing steps
        processed = gray
        
        if "denoise" in self.preprocessing_steps:
            processed = cv2.fastNlMeansDenoising(processed)
        
        if "deskew" in self.preprocessing_steps:
            processed = self._deskew_image(processed)
        
        if "contrast" in self.preprocessing_steps:
            processed = self._enhance_contrast(processed)
        
        if "threshold" in self.preprocessing_steps:
            _, processed = cv2.threshold(
                processed, 0, 255, 
                cv2.THRESH_BINARY + cv2.THRESH_OTSU
            )
        
        return processed
    
    def _deskew_image(self, image: np.ndarray) -> np.ndarray:
        """Correct image skew"""
        # Threshold so the (dark) text becomes the foreground pixels
        _, thresh = cv2.threshold(
            image, 0, 255, cv2.THRESH_BINARY_INV + cv2.THRESH_OTSU
        )
        coords = np.column_stack(np.where(thresh > 0)).astype(np.float32)
        if coords.size == 0:
            return image  # blank page, nothing to deskew
        
        # minAreaRect reports angles in [-90, 0); convert to a corrective rotation
        angle = cv2.minAreaRect(coords)[-1]
        if angle < -45:
            angle = -(90 + angle)
        else:
            angle = -angle
        
        # Rotate image
        (h, w) = image.shape[:2]
        center = (w // 2, h // 2)
        M = cv2.getRotationMatrix2D(center, angle, 1.0)
        rotated = cv2.warpAffine(
            image, M, (w, h),
            flags=cv2.INTER_CUBIC,
            borderMode=cv2.BORDER_REPLICATE
        )
        
        return rotated
    
    def _enhance_contrast(self, image: np.ndarray) -> np.ndarray:
        """Enhance image contrast"""
        # Apply CLAHE (Contrast Limited Adaptive Histogram Equalization)
        clahe = cv2.createCLAHE(clipLimit=2.0, tileGridSize=(8, 8))
        enhanced = clahe.apply(image)
        return enhanced
    
    def _perform_ocr(self, image: np.ndarray) -> str:
        """Perform OCR on preprocessed image"""
        if self.ocr_engine == "tesseract":
            # Configure Tesseract
            custom_config = r'--oem 3 --psm 6'
            
            # Perform OCR
            text = pytesseract.image_to_string(
                image,
                lang=self.language,
                config=custom_config
            )
            
            # Also get detailed data for confidence scoring
            data = pytesseract.image_to_data(
                image,
                lang=self.language,
                output_type=pytesseract.Output.DICT
            )
            
            # Calculate average confidence
            confidences = [int(conf) for conf in data['conf'] if int(conf) > 0]
            self._confidence = sum(confidences) / len(confidences) / 100 if confidences else 0.0
            
            return text
        
        else:
            # Placeholder for other OCR engines (Google Vision, AWS Textract)
            raise NotImplementedError(f"OCR engine {self.ocr_engine} not implemented")
    
    def _enhance_ocr_results(self, result: ParseResult, full_text: str, images: List[Image.Image]) -> ParseResult:
        """Enhance OCR results with additional processing"""
        # If confidence is low, try table detection
        if self._confidence < 0.7 and not result.line_items:
            for image in images:
                tables = self._detect_tables(np.array(image))
                if tables:
                    # Process detected tables
                    result.line_items.extend(self._process_table_regions(tables, image))
        
        # Update confidence scores
        result.confidence_scores["ocr_quality"] = self._confidence
        
        return result
    
    def _detect_tables(self, image: np.ndarray) -> List[Tuple[int, int, int, int]]:
        """Detect table regions in image"""
        # Convert to grayscale if needed
        if len(image.shape) == 3:
            gray = cv2.cvtColor(image, cv2.COLOR_BGR2GRAY)
        else:
            gray = image
        
        # Detect horizontal and vertical lines
        horizontal = self._detect_lines(gray, horizontal=True)
        vertical = self._detect_lines(gray, horizontal=False)
        
        # Combine lines to find table regions
        table_mask = cv2.addWeighted(horizontal, 0.5, vertical, 0.5, 0.0)
        
        # Find contours
        contours, _ = cv2.findContours(
            table_mask,
            cv2.RETR_EXTERNAL,
            cv2.CHAIN_APPROX_SIMPLE
        )
        
        # Filter and return table regions
        tables = []
        for contour in contours:
            x, y, w, h = cv2.boundingRect(contour)
            if w > 100 and h > 50:  # Minimum table size
                tables.append((x, y, w, h))
        
        return tables
    
    def _detect_lines(self, image: np.ndarray, horizontal: bool = True) -> np.ndarray:
        """Detect horizontal or vertical lines in image"""
        # Create structure element for morphology
        if horizontal:
            kernel = cv2.getStructuringElement(cv2.MORPH_RECT, (40, 1))
        else:
            kernel = cv2.getStructuringElement(cv2.MORPH_RECT, (1, 40))
        
        # Apply morphology operations
        binary = cv2.adaptiveThreshold(
            image, 255, cv2.ADAPTIVE_THRESH_GAUSSIAN_C,
            cv2.THRESH_BINARY_INV, 11, 2
        )
        
        lines = cv2.morphologyEx(binary, cv2.MORPH_OPEN, kernel)
        return lines
    
    def _process_table_regions(self, tables: List[Tuple[int, int, int, int]], 
                             image: Image.Image) -> List[LineItem]:
        """Process detected table regions to extract line items"""
        line_items = []
        
        for x, y, w, h in tables:
            # Crop table region
            table_img = image.crop((x, y, x + w, y + h))
            
            # Perform OCR on table region
            table_text = pytesseract.image_to_string(table_img)
            
            # Parse table text
            # This is simplified - in production, use more sophisticated parsing
            lines = table_text.strip().split('\n')
            for line in lines:
                if line and not any(header in line.lower() for header in ['description', 'quantity', 'price']):
                    # Basic parsing - treat the last token as the amount and
                    # everything before it as the description
                    parts = line.split()
                    if len(parts) >= 2:
                        description = ' '.join(parts[:-1])
                        try:
                            amount = Decimal(parts[-1].replace(',', ''))
                            if description and amount > 0:
                                line_items.append(LineItem(
                                    description=description,
                                    total_price=amount
                                ))
                        except Exception:
                            continue
        
        return line_items
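
To tune preprocessing in isolation, the pipeline pieces can be run page by page (requires Tesseract and Poppler installed locally; the sample path is hypothetical):

from src.parsers.ocr_parser import OCRParser

parser = OCRParser(config={
    "dpi": 300,
    "preprocessing_steps": ["denoise", "contrast", "threshold"],
})

# Convert, preprocess, and OCR just the first page
images = parser._pdf_to_images("samples/scanned_invoice.pdf")
processed = parser._preprocess_image(images[0])
text = parser._perform_ocr(processed)
print(f"OCR confidence: {parser.get_confidence():.2f}")
print(text[:500])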

AI Parser (src/parsers/ai_parser.py)

import json
from typing import Dict, Any, List, Optional
from decimal import Decimal
from datetime import datetime

from .base_parser import BaseParser
from ..core.models import PDFDocument, ParseResult, LineItem
from ..core.exceptions import ParsingError


class AIParser(BaseParser):
    """AI-powered parser using LLMs"""
    
    def __init__(self, config: Dict[str, Any]):
        super().__init__(config)
        self.model = config.get("model", "gpt-4")
        self.temperature = config.get("temperature", 0.1)
        self.max_tokens = config.get("max_tokens", 4000)
        self.retry_attempts = config.get("retry_attempts", 3)
        self.llm_client = self._initialize_client()
    
    def _initialize_client(self):
        """Initialize LLM client based on model"""
        if "gpt" in self.model:
            from openai import OpenAI
            return OpenAI(api_key=self.config.get("api_key"))
        elif "claude" in self.model:
            from anthropic import Anthropic
            return Anthropic(api_key=self.config.get("api_key"))
        else:
            raise ValueError(f"Unsupported model: {self.model}")
    
    def _parse_implementation(self, document: PDFDocument) -> ParseResult:
        """Extract data using AI/LLM"""
        # Extract text for context
        text = self._get_document_text(document)
        
        # Generate extraction prompt
        prompt = self._generate_extraction_prompt(text)
        
        # Call LLM with retry logic
        for attempt in range(self.retry_attempts):
            try:
                response = self._call_llm(prompt)
                extracted_data = self._parse_llm_response(response)
                
                # Build result
                result = self._build_parse_result(extracted_data, document)
                
                # Validate extraction
                if self.validate(result):
                    return result
                
            except Exception as e:
                self.logger.warning(f"AI parsing attempt {attempt + 1} failed: {e}")
                if attempt == self.retry_attempts - 1:
                    raise ParsingError(f"AI parsing failed after {self.retry_attempts} attempts")
        
        raise ParsingError("Failed to extract valid data")
    
    def _get_document_text(self, document: PDFDocument) -> str:
        """Extract text from document for AI processing"""
        try:
            # Try text extraction first
            text = document.get_text()
            if text and len(text) > 100:
                return text[:self.max_tokens]  # crude limit: characters as a proxy for tokens
        except Exception:
            pass
        
        # Fallback to OCR if needed
        from .ocr_parser import OCRParser
        ocr_parser = OCRParser(self.config)
        images = ocr_parser._pdf_to_images(document.file_path)
        
        text = ""
        for image in images[:3]:  # Process first 3 pages only
            processed = ocr_parser._preprocess_image(image)
            page_text = ocr_parser._perform_ocr(processed)
            text += page_text + "\n"
            
            if len(text) > self.max_tokens:
                break
        
        return text[:self.max_tokens]
    
    def _generate_extraction_prompt(self, text: str) -> str:
        """Generate prompt for invoice data extraction"""
        prompt = f"""Extract invoice information from the following text and return as JSON.

Invoice Text:
{text}

Extract the following information:
1. vendor_name: Name of the company issuing the invoice
2. vendor_id: Any vendor ID or registration number
3. invoice_number: Invoice number or reference
4. invoice_date: Date of invoice (ISO format: YYYY-MM-DD)
5. due_date: Payment due date (ISO format: YYYY-MM-DD)
6. currency: Currency code (e.g., USD, EUR, GBP)
7. subtotal: Subtotal amount before tax
8. tax_amount: Tax amount
9. total_amount: Total amount including tax
10. line_items: Array of items with:
    - description: Item description
    - quantity: Quantity (default 1 if not specified)
    - unit_price: Price per unit
    - total_price: Total price for this line
    - tax_rate: Tax rate if specified

Return ONLY valid JSON without any explanation or markdown formatting.

Example response:
{{
    "vendor_name": "ABC Company Ltd",
    "invoice_number": "INV-2024-001",
    "invoice_date": "2024-01-15",
    "total_amount": 1250.00,
    "line_items": [
        {{
            "description": "Professional Services",
            "quantity": 1,
            "unit_price": 1000.00,
            "total_price": 1000.00
        }}
    ]
}}
"""
        return prompt
    
    def _call_llm(self, prompt: str) -> str:
        """Call LLM API"""
        if "gpt" in self.model:
            response = self.llm_client.chat.completions.create(
                model=self.model,
                messages=[
                    {"role": "system", "content": "You are an invoice data extraction assistant. Extract data accurately and return only JSON."},
                    {"role": "user", "content": prompt}
                ],
                temperature=self.temperature,
                response_format={"type": "json_object"}
            )
            return response.choices[0].message.content
        
        elif "claude" in self.model:
            response = self.llm_client.messages.create(
                model=self.model,
                messages=[{"role": "user", "content": prompt}],
                temperature=self.temperature,
                max_tokens=self.max_tokens
            )
            return response.content[0].text
        
        else:
            raise NotImplementedError(f"Model {self.model} not implemented")
    
    def _parse_llm_response(self, response: str) -> Dict[str, Any]:
        """Parse LLM response to extract JSON data"""
        try:
            # Clean response - remove markdown if present
            cleaned = response.strip()
            if cleaned.startswith("```json"):
                cleaned = cleaned[7:]
            if cleaned.startswith("```"):
                cleaned = cleaned[3:]
            if cleaned.endswith("```"):
                cleaned = cleaned[:-3]
            
            # Parse JSON
            data = json.loads(cleaned.strip())
            return data
        
        except json.JSONDecodeError as e:
            self.logger.error(f"Failed to parse LLM response as JSON: {e}")
            self.logger.debug(f"Response: {response}")
            raise ParsingError("Invalid JSON response from LLM")
    
    def _build_parse_result(self, data: Dict[str, Any], document: PDFDocument) -> ParseResult:
        """Build ParseResult from extracted data"""
        result = ParseResult(
            job_id=document.document_id,
            parser_used="ai"
        )
        
        # Map extracted fields
        result.vendor_id = data.get("vendor_id") or data.get("vendor_name")
        result.invoice_number = data.get("invoice_number")
        result.currency = data.get("currency", "USD")
        
        # Parse dates
        if data.get("invoice_date"):
            result.invoice_date = self._parse_date(data["invoice_date"])
        if data.get("due_date"):
            result.due_date = self._parse_date(data["due_date"])
        
        # Parse amounts
        result.total_amount = Decimal(str(data.get("total_amount", 0)))
        result.subtotal = Decimal(str(data.get("subtotal", 0)))
        result.tax_amount = Decimal(str(data.get("tax_amount", 0)))
        
        # Parse line items
        result.line_items = self._parse_line_items(data.get("line_items", []))
        
        # Store all extracted fields
        result.extracted_fields = data
        
        # Calculate confidence based on field completeness
        self._calculate_ai_confidence(result, data)
        
        return result
    
    def _parse_date(self, date_str: str) -> Optional[datetime]:
        """Parse date string to datetime"""
        if not date_str:
            return None
        
        # Try common formats
        formats = ["%Y-%m-%d", "%d/%m/%Y", "%m/%d/%Y", "%Y/%m/%d"]
        
        for fmt in formats:
            try:
                return datetime.strptime(date_str, fmt)
            except ValueError:
                continue
        
        # Try parsing with dateutil as fallback
        try:
            from dateutil import parser
            return parser.parse(date_str)
        except Exception:
            self.logger.warning(f"Failed to parse date: {date_str}")
            return None
    
    def _parse_line_items(self, items_data: List[Dict[str, Any]]) -> List[LineItem]:
        """Parse line items from extracted data"""
        line_items = []
        
        for item in items_data:
            try:
                line_item = LineItem(
                    description=item.get("description", ""),
                    quantity=float(item.get("quantity", 1)),
                    unit_price=Decimal(str(item.get("unit_price", 0))),
                    total_price=Decimal(str(item.get("total_price", 0))),
                    tax_rate=float(item.get("tax_rate", 0))
                )
                
                # Calculate total if not provided
                if line_item.total_price == 0 and line_item.unit_price > 0:
                    line_item.total_price = line_item.calculate_total()
                
                line_items.append(line_item)
            
            except Exception as e:
                self.logger.warning(f"Failed to parse line item: {e}")
                continue
        
        return line_items
    
    def _calculate_ai_confidence(self, result: ParseResult, data: Dict[str, Any]):
        """Calculate confidence scores for AI extraction"""
        required_fields = ["invoice_number", "invoice_date", "total_amount"]
        optional_fields = ["vendor_id", "due_date", "line_items", "currency"]
        
        # Check required fields
        required_score = sum(1 for field in required_fields if data.get(field)) / len(required_fields)
        
        # Check optional fields
        optional_score = sum(1 for field in optional_fields if data.get(field)) / len(optional_fields)
        
        # Overall confidence
        self._confidence = (required_score * 0.7) + (optional_score * 0.3)
        
        # Field-specific confidence
        result.confidence_scores = {
            "invoice_number": 0.9 if result.invoice_number else 0.0,
            "dates": 0.9 if result.invoice_date else 0.0,
            "amounts": 0.9 if result.total_amount > 0 else 0.0,
            "line_items": 0.8 if result.line_items else 0.3,
            "overall": self._confidence
        }
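
The fence-stripping in _parse_llm_response guards against models wrapping JSON in markdown, and the mapping logic can be tested offline by patching out the client (a sketch using unittest.mock):

from unittest.mock import patch

from src.core.models import PDFDocument
from src.parsers.ai_parser import AIParser

fake_json = '{"invoice_number": "INV-2024-001", "invoice_date": "2024-01-15", "total_amount": 1250.00}'

# Patch client creation so no API key or network access is needed
with patch.object(AIParser, "_initialize_client", return_value=None):
    parser = AIParser(config={"model": "gpt-4"})

data = parser._parse_llm_response(fake_json)
result = parser._build_parse_result(data, PDFDocument(file_path="n/a"))
print(result.invoice_number, result.total_amount)  # INV-2024-001 1250.0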

Vendor Detection Service (src/services/vendor_detector.py)

import re
import hashlib
import structlog
from typing import Optional, List, Dict, Any
from dataclasses import asdict

from ..core.interfaces import VendorDetectorInterface, CacheServiceInterface
from ..core.models import PDFDocument, Vendor, ParserType
from ..core.exceptions import VendorDetectionError


class VendorDetector(VendorDetectorInterface):
    """Service for detecting vendors from PDF documents"""
    
    def __init__(self, cache_service: CacheServiceInterface, 
                 vendor_repository, ai_classifier=None):
        self.cache = cache_service
        self.vendor_repo = vendor_repository
        self.ai_classifier = ai_classifier
        self.logger = structlog.get_logger()
        self._vendor_patterns = {}
        self._load_vendor_patterns()
    
    async def detect_vendor(self, document: PDFDocument) -> Optional[Vendor]:
        """Detect vendor from document"""
        # Generate document hash for caching
        doc_hash = self._generate_document_hash(document)
        
        # Check cache
        cached_vendor = await self._check_cache(doc_hash)
        if cached_vendor:
            return cached_vendor
        
        # Extract identifying text (first 2000 chars)
        try:
            text = document.get_text(end_page=2)[:2000]
        except Exception:
            # If text extraction fails, fall through to AI parsing
            return None
        
        # Try pattern matching
        vendor = self._match_vendor_patterns(text)
        if vendor:
            await self._update_cache(doc_hash, vendor)
            return vendor
        
        # Try fuzzy matching on vendor names
        vendor = self._fuzzy_match_vendor(text)
        if vendor:
            await self._update_cache(doc_hash, vendor)
            return vendor
        
        # Use AI classifier if available
        if self.ai_classifier:
            vendor = self._classify_with_ai(text)
            if vendor:
                await self._update_cache(doc_hash, vendor)
                return vendor
        
        # Return None if no vendor detected
        return None
    
    def update_vendor_patterns(self, vendor: Vendor, patterns: List[str]) -> None:
        """Update vendor detection patterns"""
        vendor.regex_patterns["detection"] = patterns
        self.vendor_repo.update(vendor)
        self._load_vendor_patterns()  # Reload patterns
    
    def _load_vendor_patterns(self):
        """Load all vendor patterns from repository"""
        self._vendor_patterns = {}
        if not self.vendor_repo:
            return  # no repository wired up; pattern matching disabled
        
        vendors = self.vendor_repo.get_all_active()
        for vendor in vendors:
            if vendor.regex_patterns.get("detection"):
                self._vendor_patterns[vendor.vendor_id] = {
                    "patterns": vendor.regex_patterns["detection"],
                    "vendor": vendor
                }
    
    def _generate_document_hash(self, document: PDFDocument) -> str:
        """Generate hash for document caching"""
        # Use file size and first page content for hash
        try:
            content = f"{document.file_size}_{document.get_text(end_page=1)[:500]}"
            return hashlib.sha256(content.encode()).hexdigest()
        except Exception:
            return hashlib.sha256(f"{document.document_id}".encode()).hexdigest()
    
    async def _check_cache(self, doc_hash: str) -> Optional[Vendor]:
        """Check cache for vendor detection result"""
        cached = await self.cache.get(f"vendor_detection:{doc_hash}")
        if cached:
            return Vendor(**cached)
        return None
    
    async def _update_cache(self, doc_hash: str, vendor: Vendor):
        """Update cache with vendor detection result"""
        await self.cache.set(
            f"vendor_detection:{doc_hash}",
            asdict(vendor),
            ttl=3600  # 1 hour
        )
    
    def _match_vendor_patterns(self, text: str) -> Optional[Vendor]:
        """Match text against vendor patterns"""
        for vendor_id, pattern_data in self._vendor_patterns.items():
            for pattern in pattern_data["patterns"]:
                try:
                    if re.search(pattern, text, re.IGNORECASE | re.MULTILINE):
                        self.logger.info(f"Vendor detected by pattern: {vendor_id}")
                        return pattern_data["vendor"]
                except re.error:
                    self.logger.warning(f"Invalid regex pattern for vendor {vendor_id}: {pattern}")
        
        return None
    
    def _fuzzy_match_vendor(self, text: str) -> Optional[Vendor]:
        """Fuzzy match vendor names in text"""
        from fuzzywuzzy import fuzz
        
        if not self.vendor_repo:
            return None
        
        vendors = self.vendor_repo.get_all_active()
        best_match = None
        best_score = 0
        
        # Extract potential company names from text
        company_patterns = [
            r"^([A-Z][A-Za-z\s&,.\-]+(?:Inc|LLC|Ltd|Limited|Corp|Corporation|Company|Co)\.?)",
            r"From:\s*([A-Za-z\s&,.\-]+)",
            r"Bill To:.*?From:\s*([A-Za-z\s&,.\-]+)",
        ]
        
        potential_names = []
        for pattern in company_patterns:
            matches = re.findall(pattern, text, re.MULTILINE)
            potential_names.extend(matches)
        
        # Match against known vendors
        for vendor in vendors:
            for name in potential_names:
                score = fuzz.ratio(vendor.vendor_name.lower(), name.lower())
                if score > best_score and score > 80:  # 80% threshold
                    best_score = score
                    best_match = vendor
        
        if best_match:
            self.logger.info(f"Vendor detected by fuzzy match: {best_match.vendor_id} (score: {best_score})")
        
        return best_match
    
    def _classify_with_ai(self, text: str) -> Optional[Vendor]:
        """Use AI to classify vendor"""
        if not self.ai_classifier:
            return None
        
        try:
            # Get vendor classification from AI
            result = self.ai_classifier.classify(text)
            
            if result.get("vendor_id"):
                vendor = self.vendor_repo.get(result["vendor_id"])
                if vendor:
                    self.logger.info(f"Vendor detected by AI: {vendor.vendor_id}")
                    return vendor
            
            # If AI suggests a new vendor
            if result.get("vendor_name") and result.get("confidence", 0) > 0.8:
                # Create new vendor entry
                new_vendor = Vendor(
                    vendor_id=self._generate_vendor_id(result["vendor_name"]),
                    vendor_name=result["vendor_name"],
                    parser_strategy=ParserType.AI,  # Default to AI for new vendors
                    confidence_threshold=0.7
                )
                
                # Save new vendor
                self.vendor_repo.create(new_vendor)
                self.logger.info(f"New vendor created by AI: {new_vendor.vendor_id}")
                
                return new_vendor
        
        except Exception as e:
            self.logger.error(f"AI classification failed: {e}")
        
        return None
    
    def _generate_vendor_id(self, vendor_name: str) -> str:
        """Generate vendor ID from name"""
        # Clean and format vendor name
        clean_name = re.sub(r'[^a-zA-Z0-9\s]', '', vendor_name)
        words = clean_name.upper().split()
        
        # Generate ID (e.g., "ABC_COMPANY_LTD")
        return '_'.join(words)
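
Detection patterns are ordinary regexes stored per vendor; a vendor keyed on its name and tax ID might be registered like this (the vendor data below is made up):

from src.core.models import Vendor, ParserType

acme = Vendor(
    vendor_id="ACME_LTD",
    vendor_name="ACME Ltd",
    parser_strategy=ParserType.TEXT,
    regex_patterns={"detection": [r"ACME\s+Ltd", r"VAT\s+No\.?\s*GB123456789"]},
)

# Persisting new patterns also reloads the in-memory pattern table:
# detector.update_vendor_patterns(acme, [r"billing@acme\.example"])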

Parser Strategy (src/services/parser_strategy.py)

from typing import Dict, Optional
import structlog

from ..core.models import PDFDocument, Vendor, ParserType
from ..core.interfaces import InvoiceParser
from ..parsers.text_parser import TextParser
from ..parsers.ocr_parser import OCRParser
from ..parsers.ai_parser import AIParser


class ParserStrategy:
    """Strategy pattern implementation for parser selection"""
    
    def __init__(self, parser_registry=None):
        self.parsers: Dict[str, InvoiceParser] = {}
        self.parser_registry = parser_registry
        self.logger = structlog.get_logger()
        self.quality_threshold = {
            "text": 0.8,
            "ocr": 0.6,
            "ai": 0.5
        }
    
    def select_parser(self, document: PDFDocument, vendor: Optional[Vendor]) -> InvoiceParser:
        """Select appropriate parser based on document and vendor"""
        # If vendor specified with preference, use it
        if vendor and vendor.parser_strategy != ParserType.AI:
            return self._get_parser_for_vendor(vendor)
        
        # Assess document quality
        quality = self._assess_document_quality(document)
        
        # Select parser based on quality
        if quality.is_digital and quality.text_quality > self.quality_threshold["text"]:
            parser_type = ParserType.TEXT
        elif quality.is_scanned and quality.scan_quality > self.quality_threshold["ocr"]:
            parser_type = ParserType.OCR
        else:
            parser_type = ParserType.AI
        
        # Override with vendor preference if specified
        if vendor and vendor.parser_strategy:
            parser_type = vendor.parser_strategy
        
        # Get or create parser
        return self._get_or_create_parser(parser_type, vendor)
    
    def register_parser(self, parser_key: str, parser: InvoiceParser):
        """Register a parser instance"""
        self.parsers[parser_key] = parser
    
    def _get_parser_for_vendor(self, vendor: Vendor) -> InvoiceParser:
        """Get parser configured for specific vendor"""
        parser_key = f"{vendor.vendor_id}_{vendor.parser_strategy.value}"
        
        if parser_key not in self.parsers:
            # Create vendor-specific parser
            config = vendor.parser_config.copy()
            config["vendor"] = vendor
            
            parser = self._create_parser(vendor.parser_strategy, config)
            self.parsers[parser_key] = parser
        
        return self.parsers[parser_key]
    
    def _get_or_create_parser(self, parser_type: ParserType, 
                            vendor: Optional[Vendor]) -> InvoiceParser:
        """Get or create parser instance"""
        # Generate cache key
        parser_key = f"{vendor.vendor_id if vendor else 'default'}_{parser_type.value}"
        
        if parser_key not in self.parsers:
            # Prepare configuration
            config = {}
            if vendor:
                config.update(vendor.parser_config)
                config["vendor"] = vendor
            
            # Create parser
            parser = self._create_parser(parser_type, config)
            self.parsers[parser_key] = parser
        
        return self.parsers[parser_key]
    
    def _create_parser(self, parser_type: ParserType, config: Dict) -> InvoiceParser:
        """Create parser instance"""
        if parser_type == ParserType.TEXT:
            return TextParser(config)
        elif parser_type == ParserType.OCR:
            return OCRParser(config)
        elif parser_type == ParserType.AI:
            return AIParser(config)
        else:
            raise ValueError(f"Unknown parser type: {parser_type}")
    
    def _assess_document_quality(self, document: PDFDocument) -> 'DocumentQuality':
        """Assess document quality for parser selection"""
        quality = DocumentQuality()
        
        try:
            # Try to extract text
            sample_text = document.get_text(end_page=1)
            
            if sample_text:
                # Analyze text quality
                quality.is_digital = True
                quality.text_quality = self._calculate_text_quality(sample_text)
            else:
                quality.is_digital = False
                quality.is_scanned = True
        
        except Exception as e:
            self.logger.debug(f"Failed to extract text: {e}")
            quality.is_scanned = True
        
        # Check if document has images (likely scanned)
        if document.metadata.get("has_images", False):
            quality.is_scanned = True
            quality.scan_quality = self._estimate_scan_quality(document)
        
        return quality
    
    def _calculate_text_quality(self, text: str) -> float:
        """Calculate text extraction quality score"""
        if not text:
            return 0.0
        
        # Check for common indicators of good text extraction
        scores = []
        
        # Has reasonable length
        scores.append(min(len(text) / 1000, 1.0))  # Normalize to 1000 chars
        
        # Has alphanumeric content
        alnum_ratio = sum(c.isalnum() for c in text) / len(text)
        scores.append(alnum_ratio)
        
        # Has proper spacing
        space_ratio = text.count(' ') / len(text)
        scores.append(min(space_ratio * 10, 1.0))  # Expect ~10% spaces
        
        # Has newlines (structure)
        newline_ratio = text.count('\n') / len(text)
        scores.append(min(newline_ratio * 100, 1.0))  # Expect ~1% newlines
        
        # Check for garbled text indicators
        garbled_indicators = ['�', '\x00', '\xff']
        garbled_score = 1.0 - sum(text.count(ind) for ind in garbled_indicators) / len(text)
        scores.append(garbled_score)
        
        return sum(scores) / len(scores)
    
    def _estimate_scan_quality(self, document: PDFDocument) -> float:
        """Estimate scan quality from document metadata"""
        # This is a simplified estimation
        # In production, analyze image resolution, contrast, etc.
        
        quality_score = 0.7  # Default medium quality
        
        # Check resolution if available
        if document.metadata.get("dpi"):
            dpi = document.metadata["dpi"]
            if dpi >= 300:
                quality_score = 0.9
            elif dpi >= 200:
                quality_score = 0.7
            else:
                quality_score = 0.5
        
        return quality_score


class DocumentQuality:
    """Document quality assessment results"""
    def __init__(self):
        self.is_digital = False
        self.is_scanned = False
        self.text_quality = 0.0
        self.scan_quality = 0.0
        self.has_tables = False
        self.has_forms = False
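
_calculate_text_quality averages five normalized signals (length, alphanumeric ratio, spacing, line structure, absence of garbled characters), so well-structured text scores high and mojibake drags the score down. A quick illustration:

from src.services.parser_strategy import ParserStrategy

strategy = ParserStrategy()

clean = "Invoice INV-1 dated 2024-01-15.\n" * 40   # long, spaced, line-structured
garbled = "\ufffd" * 50 + "abc"                     # mostly replacement characters

print(round(strategy._calculate_text_quality(clean), 2))    # 0.94
print(round(strategy._calculate_text_quality(garbled), 2))  # 0.03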

Lambda Handlers (src/handlers/parsing_handler.py)

import json
import os
import boto3
from typing import Dict, Any
import asyncio

from ..core.models import PDFDocument, ProcessingStatus
from ..services.vendor_detector import VendorDetector
from ..services.parser_strategy import ParserStrategy
from ..services.cache_service import CacheService
from ..services.storage_service import S3StorageService
from ..utils.logger import get_logger


# Initialize services outside handler for reuse
logger = get_logger()
s3_client = boto3.client('s3')
dynamodb = boto3.resource('dynamodb')

# Initialize services
cache_service = CacheService()
storage_service = S3StorageService(s3_client)
vendor_detector = VendorDetector(cache_service, vendor_repository=None)
parser_strategy = ParserStrategy()


def lambda_handler(event: Dict[str, Any], context: Any) -> Dict[str, Any]:
    """Main Lambda handler for invoice parsing"""
    
    # Handle warmup calls
    if event.get('source') == 'serverless-plugin-warmup':
        logger.info('WarmUp - Lambda is warm!')
        return {'statusCode': 200, 'body': 'Lambda is warm!'}
    
    try:
        # Parse event
        if 'Records' in event:
            # SQS event
            return handle_sqs_event(event, context)
        else:
            # Direct invocation
            return handle_direct_invocation(event, context)
    
    except Exception as e:
        logger.error(f"Lambda handler error: {e}", exc_info=True)
        return {
            'statusCode': 500,
            'body': json.dumps({'error': str(e)})
        }


def handle_sqs_event(event: Dict[str, Any], context: Any) -> Dict[str, Any]:
    """Handle SQS event processing"""
    results = []
    
    for record in event['Records']:
        message = {}  # Keep in scope for the error path below
        try:
            # Parse message
            message = json.loads(record['body'])
            job_id = message['job_id']
            document_path = message['document_path']
            
            # Process document
            result = asyncio.run(process_document(job_id, document_path))
            results.append({
                'job_id': job_id,
                'status': 'success',
                'result': result
            })
            
        except Exception as e:
            logger.error(f"Failed to process record: {e}", exc_info=True)
            results.append({
                'job_id': message.get('job_id', 'unknown'),
                'status': 'failed',
                'error': str(e)
            })
    
    return {
        'statusCode': 200,
        'body': json.dumps({'processed': len(results), 'results': results})
    }


def handle_direct_invocation(event: Dict[str, Any], context: Any) -> Dict[str, Any]:
    """Handle direct Lambda invocation"""
    job_id = event.get('job_id')
    document_path = event.get('document_path')
    
    if not job_id or not document_path:
        return {
            'statusCode': 400,
            'body': json.dumps({'error': 'Missing required parameters: job_id, document_path'})
        }
    
    # Process document
    result = asyncio.run(process_document(job_id, document_path))
    
    return {
        'statusCode': 200,
        'body': json.dumps(result.to_dict())
    }


async def process_document(job_id: str, document_path: str):
    """Process a single document"""
    logger.info(f"Processing document: {job_id}")
    
    # Update job status
    await update_job_status(job_id, ProcessingStatus.PROCESSING)
    
    try:
        # Download document from S3
        local_path = f"/tmp/{job_id}.pdf"
        await storage_service.download_to_file(document_path, local_path)
        
        # Create document object
        document = PDFDocument(
            document_id=job_id,
            file_path=local_path
        )
        
        # Detect vendor
        vendor = vendor_detector.detect_vendor(document)
        if vendor:
            logger.info(f"Vendor detected: {vendor.vendor_id}")
        
        # Select and execute parser
        parser = parser_strategy.select_parser(document, vendor)
        result = parser.parse(document)
        
        # Store result
        await store_parse_result(result)
        
        # Update job status
        await update_job_status(job_id, ProcessingStatus.COMPLETED)
        
        # Cleanup
        os.remove(local_path)
        
        return result
        
    except Exception as e:
        logger.error(f"Document processing failed: {e}", exc_info=True)
        await update_job_status(job_id, ProcessingStatus.FAILED, error=str(e))
        raise


async def update_job_status(job_id: str, status: ProcessingStatus, error: Optional[str] = None):
    """Update job status in DynamoDB"""
    table = dynamodb.Table(os.environ.get('JOBS_TABLE', 'invoice-parser-jobs'))
    
    update_expr = "SET #status = :status, updated_at = :timestamp"
    expr_values = {
        ':status': status.value,
        ':timestamp': datetime.utcnow().isoformat()
    }
    
    if error:
        update_expr += ", error_message = :error"
        expr_values[':error'] = error
    
    table.update_item(
        Key={'job_id': job_id},
        UpdateExpression=update_expr,
        ExpressionAttributeNames={'#status': 'status'},
        ExpressionAttributeValues=expr_values
    )


async def store_parse_result(result: ParseResult):
    """Store parsing result in DynamoDB"""
    table = dynamodb.Table(os.environ.get('RESULTS_TABLE', 'invoice-parser-results'))
    
    table.put_item(
        Item=result.to_dict()
    )
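
For a local smoke test, the handler can be driven with a hand-built SQS-shaped event; a minimal sketch (the job ID and S3 key are placeholders, and context may be None because the handler never reads it):

if __name__ == "__main__":
    # Simulate the SQS envelope Lambda would deliver
    sample_event = {
        "Records": [{
            "body": json.dumps({
                "job_id": "local-test-001",
                "document_path": "invoices/local-test-001/sample.pdf"
            })
        }]
    }
    print(lambda_handler(sample_event, None))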

API Implementation (src/api/app.py)

import os
import uuid
from datetime import datetime
from typing import Optional

import boto3
import uvicorn
from fastapi import FastAPI, UploadFile, File, HTTPException, Depends, BackgroundTasks
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import JSONResponse

from .routes import router
from .middleware import SecurityMiddleware, LoggingMiddleware
from ..core.models import ProcessingStatus
from ..services.queue_service import QueueService
from ..services.storage_service import S3StorageService


# Create FastAPI app
app = FastAPI(
    title="PDF Invoice Parser API",
    description="Scalable PDF invoice parsing system",
    version="1.0.0"
)

# Add middleware
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)

app.add_middleware(LoggingMiddleware)
app.add_middleware(SecurityMiddleware)

# Include routers
app.include_router(router, prefix="/api/v1")

# Initialize services
queue_service = QueueService()
storage_service = S3StorageService(boto3.client('s3'))


@app.post("/api/v1/invoices/upload")
async def upload_invoice(
    background_tasks: BackgroundTasks,
    file: UploadFile = File(...),
    vendor_id: Optional[str] = None
):
    """Upload invoice for parsing"""
    
    # Validate file
    if not file.filename.lower().endswith('.pdf'):
        raise HTTPException(400, "Only PDF files are supported")
    
    if file.size and file.size > 50 * 1024 * 1024:  # 50MB limit (size may be unset)
        raise HTTPException(400, "File size exceeds 50MB limit")
    
    # Generate job ID
    job_id = str(uuid.uuid4())
    
    # Upload to S3
    s3_path = f"invoices/{job_id}/{file.filename}"
    try:
        content = await file.read()
        await storage_service.upload(s3_path, content)
    except Exception as e:
        raise HTTPException(500, f"Failed to upload file: {str(e)}")
    
    # Create job record
    job_data = {
        "job_id": job_id,
        "status": ProcessingStatus.PENDING.value,
        "vendor_id": vendor_id,
        "document_url": s3_path,
        "created_at": datetime.utcnow().isoformat(),
        "file_name": file.filename,
        "file_size": file.size
    }
    
    # Store job in database
    await create_job_record(job_data)
    
    # Queue for processing
    message = {
        "job_id": job_id,
        "document_path": s3_path,
        "vendor_id": vendor_id
    }
    
    await queue_service.send_message(message)
    
    return {
        "job_id": job_id,
        "status": ProcessingStatus.PENDING.value,
        "message": "Invoice uploaded successfully and queued for processing",
        "estimated_time": 30  # seconds
    }


@app.get("/api/v1/invoices/{job_id}/status")
async def get_job_status(job_id: str):
    """Get parsing job status"""
    
    # Fetch job from database
    job = await get_job_record(job_id)
    
    if not job:
        raise HTTPException(404, "Job not found")
    
    return {
        "job_id": job_id,
        "status": job["status"],
        "created_at": job["created_at"],
        "updated_at": job.get("updated_at"),
        "error_message": job.get("error_message")
    }


@app.get("/api/v1/invoices/{job_id}/result")
async def get_parse_result(job_id: str):
    """Get parsed invoice data"""
    
    # Check job status first
    job = await get_job_record(job_id)
    
    if not job:
        raise HTTPException(404, "Job not found")
    
    if job["status"] != ProcessingStatus.COMPLETED.value:
        raise HTTPException(400, f"Job status is {job['status']}, not completed")
    
    # Fetch result
    result = await get_parse_result_record(job_id)
    
    if not result:
        raise HTTPException(404, "Parse result not found")
    
    return result


@app.get("/health")
async def health_check():
    """Health check endpoint"""
    return {"status": "healthy", "timestamp": datetime.utcnow().isoformat()}


if __name__ == "__main__":
    uvicorn.run(app, host="0.0.0.0", port=8000)
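
The endpoints above call create_job_record, get_job_record, and get_parse_result_record, which are not shown in this guide. A minimal sketch, assuming they live in this module and use the same DynamoDB tables as the Lambda handler (async wrappers around synchronous boto3 calls, matching the handler's style):

jobs_table = boto3.resource('dynamodb').Table(os.environ.get('JOBS_TABLE', 'invoice-parser-jobs'))
results_table = boto3.resource('dynamodb').Table(os.environ.get('RESULTS_TABLE', 'invoice-parser-results'))


async def create_job_record(job_data: dict) -> None:
    """Persist a new job record"""
    jobs_table.put_item(Item=job_data)


async def get_job_record(job_id: str) -> Optional[dict]:
    """Fetch a job record by ID"""
    return jobs_table.get_item(Key={'job_id': job_id}).get('Item')


async def get_parse_result_record(job_id: str) -> Optional[dict]:
    """Fetch a stored parse result by job ID"""
    return results_table.get_item(Key={'job_id': job_id}).get('Item')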


Cache Service (src/services/cache_service.py)

import os
import json
import pickle
import socket
from typing import Optional, Any

import redis

from ..core.interfaces import CacheServiceInterface
from ..utils.logger import get_logger


class CacheService(CacheServiceInterface):
    """Redis-based caching service"""
    
    def __init__(self):
        self.logger = get_logger()
        self.redis_client = self._init_redis()
        self.default_ttl = 3600  # 1 hour
    
    def _init_redis(self):
        """Initialize Redis connection (pool settings pass through to ConnectionPool)"""
        return redis.Redis(
            host=os.environ.get('REDIS_HOST', 'localhost'),
            port=int(os.environ.get('REDIS_PORT', 6379)),
            decode_responses=False,  # Handle binary (pickled) data
            max_connections=50,
            socket_keepalive=True,
            socket_keepalive_options={  # Linux-specific TCP keepalive tuning
                socket.TCP_KEEPIDLE: 1,
                socket.TCP_KEEPINTVL: 3,
                socket.TCP_KEEPCNT: 5
            }
        )
    
    async def get(self, key: str) -> Optional[Any]:
        """Get value from cache"""
        try:
            value = self.redis_client.get(key)
            if value:
                # Try to deserialize as JSON first
                try:
                    return json.loads(value)
                except (ValueError, UnicodeDecodeError):
                    # Fall back to pickle
                    return pickle.loads(value)
        except Exception as e:
            self.logger.error(f"Cache get error: {e}")
        
        return None
    
    async def set(self, key: str, value: Any, ttl: int = None) -> None:
        """Set value in cache with TTL"""
        ttl = ttl or self.default_ttl
        
        try:
            # Try to serialize as JSON first (more portable)
            try:
                serialized = json.dumps(value)
            except (TypeError, ValueError):
                # Fall back to pickle for complex objects
                serialized = pickle.dumps(value)
            
            self.redis_client.setex(key, ttl, serialized)
        
        except Exception as e:
            self.logger.error(f"Cache set error: {e}")
    
    async def delete(self, key: str) -> None:
        """Delete value from cache"""
        try:
            self.redis_client.delete(key)
        except Exception as e:
            self.logger.error(f"Cache delete error: {e}")
    
    async def clear_pattern(self, pattern: str) -> int:
        """Clear all keys matching pattern (SCAN-based to avoid blocking Redis)"""
        try:
            keys = list(self.redis_client.scan_iter(match=pattern))
            if keys:
                return self.redis_client.delete(*keys)
            return 0
        except Exception as e:
            self.logger.error(f"Cache clear pattern error: {e}")
            return 0
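
A short usage sketch (the key layout is illustrative; vendor profiles are one natural use of the cache):

import asyncio


async def demo():
    cache = CacheService()
    
    # JSON-serializable values round-trip via json; other objects fall back to pickle
    await cache.set("vendor:acme:profile", {"vendor_id": "acme", "confidence": 0.92}, ttl=600)
    print(await cache.get("vendor:acme:profile"))
    
    # Invalidate everything cached for this vendor
    await cache.clear_pattern("vendor:acme:*")


if __name__ == "__main__":
    asyncio.run(demo())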

Test Implementation (tests/unit/test_parsers.py)

import pytest
from decimal import Decimal
from datetime import datetime
import os

from src.core.models import PDFDocument, Vendor, ParserType
from src.parsers.text_parser import TextParser
from src.parsers.ocr_parser import OCRParser
from src.parsers.ai_parser import AIParser


class TestTextParser:
    """Test suite for TextParser"""
    
    @pytest.fixture
    def text_parser(self):
        config = {
            "confidence_threshold": 0.7,
            "date_formats": ["%Y-%m-%d", "%d/%m/%Y"]
        }
        return TextParser(config)
    
    @pytest.fixture
    def sample_document(self):
        return PDFDocument(
            document_id="test-001",
            file_path="tests/fixtures/sample_invoice.pdf",
            page_count=1
        )
    
    def test_parse_invoice_number(self, text_parser):
        """Test invoice number extraction"""
        text = "Invoice #: INV-2024-001\nDate: 2024-01-15"
        result = text_parser._extract_invoice_number(text)
        assert result == "INV-2024-001"
    
    def test_parse_date(self, text_parser):
        """Test date extraction"""
        text = "Invoice Date: 2024-01-15\nDue Date: 2024-02-15"
        
        invoice_date = text_parser._extract_date(text, "invoice date")
        assert invoice_date == datetime(2024, 1, 15)
        
        due_date = text_parser._extract_date(text, "due date")
        assert due_date == datetime(2024, 2, 15)
    
    def test_parse_amount(self, text_parser):
        """Test amount extraction"""
        text = "Subtotal: $1,000.00\nTax: $100.00\nTotal: $1,100.00"
        
        total = text_parser._extract_amount(text, "total")
        assert total == Decimal("1100.00")
        
        subtotal = text_parser._extract_amount(text, "subtotal")
        assert subtotal == Decimal("1000.00")
    
    def test_parse_complete_invoice(self, text_parser, sample_document):
        """Test complete invoice parsing"""
        result = text_parser.parse(sample_document)
        
        assert result.job_id == "test-001"
        assert result.parser_used == "text"
        assert result.invoice_number is not None
        assert result.total_amount > 0
        assert len(result.confidence_scores) > 0


class TestOCRParser:
    """Test suite for OCRParser"""
    
    @pytest.fixture
    def ocr_parser(self):
        config = {
            "ocr_engine": "tesseract",
            "language": "eng",
            "dpi": 300
        }
        return OCRParser(config)
    
    def test_image_preprocessing(self, ocr_parser):
        """Test image preprocessing steps"""
        # Create test image
        import numpy as np
        from PIL import Image
        
        # Create a simple test image
        img_array = np.ones((100, 100), dtype=np.uint8) * 255
        img_array[40:60, 40:60] = 0  # Add black square
        
        test_image = Image.fromarray(img_array)
        
        # Process image
        processed = ocr_parser._preprocess_image(test_image)
        
        assert processed is not None
        assert processed.shape == (100, 100)
    
    @pytest.mark.skipif(not os.path.exists("/usr/bin/tesseract"), 
                        reason="Tesseract not installed")
    def test_ocr_extraction(self, ocr_parser):
        """Test OCR text extraction"""
        # This test requires Tesseract to be installed
        # In CI/CD, you would have a proper test image
        pass


class TestAIParser:
    """Test suite for AIParser"""
    
    @pytest.fixture
    def ai_parser(self, mocker):
        # Mock the LLM client
        mock_client = mocker.Mock()
        
        config = {
            "model": "gpt-4",
            "temperature": 0.1,
            "api_key": "test-key"
        }
        
        parser = AIParser(config)
        parser.llm_client = mock_client
        
        return parser, mock_client
    
    def test_prompt_generation(self, ai_parser):
        """Test extraction prompt generation"""
        parser, _ = ai_parser
        
        text = "Invoice from ABC Company"
        prompt = parser._generate_extraction_prompt(text)
        
        assert "Invoice Text:" in prompt
        assert text in prompt
        assert "vendor_name" in prompt
        assert "invoice_number" in prompt
    
    def test_parse_llm_response(self, ai_parser):
        """Test LLM response parsing"""
        parser, _ = ai_parser
        
        response = '''```json
        {
            "vendor_name": "ABC Company",
            "invoice_number": "INV-001",
            "total_amount": 1000.00
        }
        ```'''
        
        data = parser._parse_llm_response(response)
        
        assert data["vendor_name"] == "ABC Company"
        assert data["invoice_number"] == "INV-001"
        assert data["total_amount"] == 1000.00
    
    def test_build_parse_result(self, ai_parser):
        """Test building ParseResult from extracted data"""
        parser, _ = ai_parser
        
        data = {
            "vendor_name": "ABC Company",
            "invoice_number": "INV-001",
            "invoice_date": "2024-01-15",
            "total_amount": 1000.00,
            "line_items": [
                {
                    "description": "Service",
                    "quantity": 1,
                    "unit_price": 1000.00,
                    "total_price": 1000.00
                }
            ]
        }
        
        document = PDFDocument(document_id="test-001")
        result = parser._build_parse_result(data, document)
        
        assert result.vendor_id == "ABC Company"
        assert result.invoice_number == "INV-001"
        assert result.total_amount == Decimal("1000.00")
        assert len(result.line_items) == 1
        assert result.line_items[0].description == "Service"
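
The unit suite runs with `pytest tests/unit -v`; the AIParser tests additionally depend on the `mocker` fixture from pytest-mock, which is already pinned in requirements.txt.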

Integration Test (tests/integration/test_end_to_end.py)

import pytest
from pathlib import Path

from src.core.models import PDFDocument, Vendor, ParserType
from src.services.vendor_detector import VendorDetector
from src.services.parser_strategy import ParserStrategy
from src.services.cache_service import CacheService


class TestEndToEnd:
    """End-to-end integration tests"""

    @pytest.fixture
    def services(self):
        """Initialize services for testing (synchronous: nothing here needs awaiting)"""
        cache = CacheService()
        vendor_detector = VendorDetector(cache, vendor_repository=None)
        parser_strategy = ParserStrategy()
        
        return {
            'cache': cache,
            'vendor_detector': vendor_detector,
            'parser_strategy': parser_strategy
        }

    @pytest.fixture
    def test_invoices(self):
        """Load test invoice files"""
        test_dir = Path("tests/fixtures/invoices")
        return list(test_dir.glob("*.pdf"))

    @pytest.mark.asyncio
    async def test_complete_parsing_flow(self, services, test_invoices):
        """Test complete parsing flow for multiple vendors"""
        
        results = []
        
        for invoice_path in test_invoices:
            # Create document
            document = PDFDocument(
                document_id=f"test-{invoice_path.stem}",
                file_path=str(invoice_path)
            )
            
            # Detect vendor
            vendor = services['vendor_detector'].detect_vendor(document)
            
            # Select parser
            parser = services['parser_strategy'].select_parser(document, vendor)
            
            # Parse document
            result = parser.parse(document)
            
            # Validate result
            assert result.job_id == document.document_id
            assert result.parser_used in ["text", "ocr", "ai"]
            
            if result.errors:
                pytest.fail(f"Parsing failed for {invoice_path.name}: {result.errors}")
            
            results.append({
                'file': invoice_path.name,
                'vendor': vendor.vendor_id if vendor else "unknown",
                'parser': result.parser_used,
                'confidence': parser.get_confidence(),
                'invoice_number': result.invoice_number,
                'total': result.total_amount
            })
        
        # Print summary
        print("\n=== Parsing Results ===")
        for r in results:
            print(f"{r['file']}: {r['vendor']} - {r['invoice_number']} - ${r['total']}")
        
        # Assert we processed all files
        assert len(results) == len(test_invoices)
        
        # Check success rate
        successful = sum(1 for r in results if r['invoice_number'])
        success_rate = successful / len(results)
        assert success_rate >= 0.8  # 80% success rate minimum
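
These tests expect sample PDFs under `tests/fixtures/invoices/` and rely on pytest-asyncio (pinned in requirements.txt) for the async test case.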

Performance Test (tests/performance/test_load.py)

import pytest
import time
import concurrent.futures
from statistics import mean, stdev

from src.parsers.text_parser import TextParser
from src.core.models import PDFDocument


class TestPerformance:
    """Performance and load tests"""

    @pytest.fixture
    def parser(self):
        return TextParser({})

    def test_parsing_performance(self, parser, benchmark):
        """Test single document parsing performance"""
        document = PDFDocument(
            document_id="perf-test",
            file_path="tests/fixtures/sample_invoice.pdf"
        )
        
        # Benchmark parsing
        result = benchmark(parser.parse, document)
        
        assert result is not None
        assert benchmark.stats['mean'] < 5.0  # Should parse in under 5 seconds

    def test_concurrent_parsing(self, parser):
        """Test concurrent parsing performance"""
        documents = [
            PDFDocument(
                document_id=f"concurrent-{i}",
                file_path="tests/fixtures/sample_invoice.pdf"
            )
            for i in range(10)
        ]
        
        start_time = time.time()
        
        # Parse documents concurrently
        with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
            futures = [executor.submit(parser.parse, doc) for doc in documents]
            results = [f.result() for f in concurrent.futures.as_completed(futures)]
        
        total_time = time.time() - start_time
        
        # All documents should be parsed
        assert len(results) == 10
        
        # Should complete in reasonable time
        assert total_time < 20.0  # 10 documents in under 20 seconds
        
        # Calculate statistics
        processing_times = [r.processing_time for r in results]
        avg_time = mean(processing_times)
        std_dev = stdev(processing_times) if len(processing_times) > 1 else 0
        
        print("\nConcurrent Parsing Results:")
        print(f"Total Time: {total_time:.2f}s")
        print(f"Average Time: {avg_time:.2f}s")
        print(f"Std Dev: {std_dev:.2f}s")
        
        # Performance assertions
        assert avg_time < 2.0  # Average under 2 seconds per document
        assert std_dev < 1.0   # Consistent performance
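
With pytest-benchmark installed (pinned in requirements.txt), the single-document benchmark can be run in isolation via `pytest tests/performance --benchmark-only`.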

Docker Configuration (Dockerfile)

# Multi-stage build for optimal size
FROM python:3.11-slim AS builder

# Install system dependencies
RUN apt-get update && apt-get install -y \
    gcc \
    g++ \
    make \
    tesseract-ocr \
    tesseract-ocr-eng \
    poppler-utils \
    libpoppler-cpp-dev \
    && rm -rf /var/lib/apt/lists/*

# Create virtual environment
RUN python -m venv /opt/venv
ENV PATH="/opt/venv/bin:$PATH"

# Copy and install Python dependencies
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt

# Runtime stage
FROM python:3.11-slim

# Install runtime dependencies only
RUN apt-get update && apt-get install -y \
    tesseract-ocr \
    tesseract-ocr-eng \
    poppler-utils \
    && rm -rf /var/lib/apt/lists/*

# Copy virtual environment from builder
COPY --from=builder /opt/venv /opt/venv
ENV PATH="/opt/venv/bin:$PATH"

# Create app directory
WORKDIR /app

# Copy application code
COPY src/ ./src/
COPY config/ ./config/

# For Lambda deployment (installed before dropping root so the venv stays writable)
RUN pip install --no-cache-dir awslambdaric

# Create non-root user
RUN useradd -m -u 1000 appuser && chown -R appuser:appuser /app
USER appuser

# Set entrypoint for Lambda
ENTRYPOINT ["python", "-m", "awslambdaric"]
CMD ["src.handlers.parsing_handler.lambda_handler"]

Requirements File (requirements.txt)

# Core dependencies
pdfplumber==0.9.0
PyPDF2==3.0.1
pdf2image==1.16.3
pytesseract==0.3.10
opencv-python-headless==4.8.1.78
Pillow==10.1.0

# AI/ML
openai==1.3.0
anthropic==0.7.0
langchain==0.0.350

# Web framework
fastapi==0.104.1
uvicorn[standard]==0.24.0
pydantic==2.5.0

# AWS
boto3==1.29.0
aws-lambda-powertools==2.26.0

# Database and caching
redis==5.0.1
motor==3.3.2

# Data processing
pandas==2.1.3
numpy==1.25.2
python-dateutil==2.8.2

# Utilities
structlog==23.2.0
python-multipart==0.0.6
fuzzywuzzy==0.18.0
python-Levenshtein==0.23.0

# Testing
pytest==7.4.3
pytest-asyncio==0.21.1
pytest-mock==3.12.0
pytest-benchmark==4.0.0
pytest-cov==4.1.0

# Development
black==23.11.0
flake8==6.1.0
mypy==1.7.0
pre-commit==3.5.0
