# PDF Invoice Parser System - Implementation Guide

## Project Structure

```
invoice-parser/
├── src/
│   ├── __init__.py
│   ├── core/
│   │   ├── __init__.py
│   │   ├── models.py
│   │   ├── interfaces.py
│   │   └── exceptions.py
│   ├── parsers/
│   │   ├── __init__.py
│   │   ├── base_parser.py
│   │   ├── text_parser.py
│   │   ├── ocr_parser.py
│   │   └── ai_parser.py
│   ├── services/
│   │   ├── __init__.py
│   │   ├── vendor_detector.py
│   │   ├── parser_strategy.py
│   │   ├── cache_service.py
│   │   └── storage_service.py
│   ├── handlers/
│   │   ├── __init__.py
│   │   ├── upload_handler.py
│   │   ├── parsing_handler.py
│   │   └── result_handler.py
│   ├── api/
│   │   ├── __init__.py
│   │   ├── app.py
│   │   ├── routes.py
│   │   └── middleware.py
│   └── utils/
│       ├── __init__.py
│       ├── logger.py
│       ├── metrics.py
│       └── validators.py
├── tests/
│   ├── unit/
│   ├── integration/
│   └── fixtures/
├── config/
│   ├── default.yaml
│   ├── development.yaml
│   └── production.yaml
├── infrastructure/
│   ├── terraform/
│   └── docker/
├── requirements.txt
├── Dockerfile
└── README.md
```

## Core Models (src/core/models.py)

```python
from dataclasses import dataclass, field
from datetime import datetime
from decimal import Decimal
from typing import Dict, List, Optional, Any
from enum import Enum
import uuid


class ParserType(Enum):
    TEXT = "text"
    OCR = "ocr"
    AI = "ai"
    HYBRID = "hybrid"


class ProcessingStatus(Enum):
    PENDING = "pending"
    PROCESSING = "processing"
    COMPLETED = "completed"
    FAILED = "failed"
    RETRY = "retry"


@dataclass
class PDFDocument:
    """Represents a PDF document to be parsed"""
    document_id: str = field(default_factory=lambda: str(uuid.uuid4()))
    file_path: str = ""
    file_size: int = 0
    page_count: int = 0
    metadata: Dict[str, Any] = field(default_factory=dict)
    created_at: datetime = field(default_factory=datetime.utcnow)
    content_hash: Optional[str] = None

    def get_page(self, page_num: int) -> 'PDFPage':
        """Retrieve a specific page from the document"""
        pass

    def get_text(self, start_page: int = 0, end_page: Optional[int] = None) -> str:
        """Extract text from specified page range"""
        pass


@dataclass
class PDFPage:
    """Represents a single page in a PDF document"""
    page_number: int
    width: float
    height: float
    text_content: str = ""
    images: List['Image'] = field(default_factory=list)
    tables: List['Table'] = field(default_factory=list)

    def get_text_blocks(self) -> List['TextBlock']:
        """Extract text blocks with position information"""
        pass


@dataclass
class LineItem:
    """Represents a line item in an invoice"""
    description: str
    quantity: float = 1.0
    unit_price: Decimal = Decimal("0.00")
    total_price: Decimal = Decimal("0.00")
    tax_rate: float = 0.0
    tax_amount: Decimal = Decimal("0.00")
    metadata: Dict[str, Any] = field(default_factory=dict)

    def calculate_total(self) -> Decimal:
        """Calculate total including tax"""
        base_total = Decimal(str(self.quantity)) * self.unit_price
        tax = base_total * Decimal(str(self.tax_rate))
        return base_total + tax


@dataclass
class ParseResult:
    """Result of parsing an invoice"""
    job_id: str
    vendor_id: Optional[str] = None
    invoice_number: Optional[str] = None
    invoice_date: Optional[datetime] = None
    due_date: Optional[datetime] = None
    total_amount: Decimal = Decimal("0.00")
    subtotal: Decimal = Decimal("0.00")
    tax_amount: Decimal = Decimal("0.00")
    currency: str = "USD"
    line_items: List[LineItem] = field(default_factory=list)
    extracted_fields: Dict[str, Any] = field(default_factory=dict)
    confidence_scores: Dict[str, float] = field(default_factory=dict)
    parser_used: str = ""
    processing_time: float = 0.0
    errors: List[str] = field(default_factory=list)

    def to_dict(self) -> Dict[str, Any]:
        """Convert to dictionary for JSON serialization"""
        return {
            "job_id": self.job_id,
            "vendor_id": self.vendor_id,
            "invoice_number": self.invoice_number,
            "invoice_date": self.invoice_date.isoformat() if self.invoice_date else None,
            "due_date": self.due_date.isoformat() if self.due_date else None,
            "total_amount": str(self.total_amount),
            "subtotal": str(self.subtotal),
            "tax_amount": str(self.tax_amount),
            "currency": self.currency,
            "line_items": [
                {
                    "description": item.description,
                    "quantity": item.quantity,
                    "unit_price": str(item.unit_price),
                    "total_price": str(item.total_price),
                    "tax_rate": item.tax_rate,
                    "tax_amount": str(item.tax_amount)
                }
                for item in self.line_items
            ],
            "extracted_fields": self.extracted_fields,
            "confidence_scores": self.confidence_scores,
            "parser_used": self.parser_used,
            "processing_time": self.processing_time,
            "errors": self.errors
        }


@dataclass
class Vendor:
    """Vendor configuration"""
    vendor_id: str
    vendor_name: str
    parser_strategy: ParserType = ParserType.TEXT
    parser_config: Dict[str, Any] = field(default_factory=dict)
    field_mappings: Dict[str, str] = field(default_factory=dict)
    validation_rules: List[Dict[str, Any]] = field(default_factory=list)
    confidence_threshold: float = 0.8
    regex_patterns: Dict[str, str] = field(default_factory=dict)
    is_active: bool = True
    created_at: datetime = field(default_factory=datetime.utcnow)
    updated_at: datetime = field(default_factory=datetime.utcnow)
```
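The dataclasses above are plain containers, so they can be exercised directly. A quick illustrative round trip (the values are made up):

```python
from decimal import Decimal
from src.core.models import LineItem, ParseResult

item = LineItem(description="Professional Services",
                quantity=2, unit_price=Decimal("500.00"), tax_rate=0.1)
item.total_price = item.calculate_total()   # 2 * 500.00 * 1.1 -> Decimal('1100.000')

result = ParseResult(job_id="demo-001", invoice_number="INV-2024-001",
                     line_items=[item], total_amount=item.total_price)
print(result.to_dict()["total_amount"])     # "1100.000" (Decimal serialized as str)
```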
## Interfaces (src/core/interfaces.py)

```python
from abc import ABC, abstractmethod
from typing import Optional, Dict, Any, List

from .models import PDFDocument, ParseResult, Vendor


class InvoiceParser(ABC):
    """Base interface for all invoice parsers"""

    @abstractmethod
    def parse(self, document: PDFDocument) -> ParseResult:
        """Parse invoice document and extract data"""
        pass

    @abstractmethod
    def validate(self, result: ParseResult) -> bool:
        """Validate parsed results"""
        pass

    @abstractmethod
    def get_confidence(self) -> float:
        """Get parser confidence score"""
        pass


class VendorDetectorInterface(ABC):
    """Interface for vendor detection"""

    @abstractmethod
    def detect_vendor(self, document: PDFDocument) -> Optional[Vendor]:
        """Detect vendor from document"""
        pass

    @abstractmethod
    def update_vendor_patterns(self, vendor: Vendor, patterns: List[str]) -> None:
        """Update vendor detection patterns"""
        pass


class CacheServiceInterface(ABC):
    """Interface for caching service"""

    @abstractmethod
    async def get(self, key: str) -> Optional[Any]:
        """Get value from cache"""
        pass

    @abstractmethod
    async def set(self, key: str, value: Any, ttl: int = 3600) -> None:
        """Set value in cache with TTL"""
        pass

    @abstractmethod
    async def delete(self, key: str) -> None:
        """Delete value from cache"""
        pass


class StorageServiceInterface(ABC):
    """Interface for document storage"""

    @abstractmethod
    async def upload(self, file_path: str, content: bytes) -> str:
        """Upload document to storage"""
        pass

    @abstractmethod
    async def download(self, file_path: str) -> bytes:
        """Download document from storage"""
        pass

    @abstractmethod
    async def delete(self, file_path: str) -> None:
        """Delete document from storage"""
        pass
```
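The parsers and services below import `ParsingError`, `ValidationError`, and `VendorDetectionError` from `src/core/exceptions.py`, which is referenced in the project structure but never shown. A minimal sketch of that module, matching how the exceptions are used in this guide:

```python
# src/core/exceptions.py — minimal sketch inferred from usage in this guide
class InvoiceParserError(Exception):
    """Base class for all errors raised by the invoice parser system."""


class ParsingError(InvoiceParserError):
    """Raised when a parser fails to extract data from a document."""


class ValidationError(InvoiceParserError):
    """Raised when a parsed result fails validation checks."""


class VendorDetectionError(InvoiceParserError):
    """Raised when vendor detection cannot be completed."""
```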
## Base Parser (src/parsers/base_parser.py)

```python
from abc import abstractmethod
import time
from typing import Dict, Any, Optional

import structlog

from ..core.interfaces import InvoiceParser
from ..core.models import PDFDocument, ParseResult, ProcessingStatus
from ..core.exceptions import ParsingError, ValidationError
from ..utils.metrics import MetricsCollector


class BaseParser(InvoiceParser):
    """Base implementation for all parsers"""

    def __init__(self, config: Dict[str, Any]):
        self.config = config
        self.logger = structlog.get_logger()
        self.metrics = MetricsCollector()
        self._confidence = 0.0

    def parse(self, document: PDFDocument) -> ParseResult:
        """Parse document with timing and error handling"""
        start_time = time.time()

        try:
            # Log parsing start
            self.logger.info(
                "Starting document parsing",
                parser_type=self.__class__.__name__,
                document_id=document.document_id,
                page_count=document.page_count
            )

            # Extract metadata
            metadata = self.extract_metadata(document)

            # Perform parsing
            result = self._parse_implementation(document)

            # Attach document metadata to the result so it is not discarded
            result.extracted_fields.setdefault("document_metadata", metadata)

            # Calculate processing time
            result.processing_time = time.time() - start_time

            # Validate result
            if not self.validate(result):
                raise ValidationError("Parsed result failed validation")

            # Log success
            self.logger.info(
                "Document parsed successfully",
                document_id=document.document_id,
                processing_time=result.processing_time,
                confidence=self._confidence
            )

            # Emit metrics
            self.metrics.increment(
                "documents_parsed",
                tags={"parser": self.__class__.__name__, "status": "success"}
            )

            return result

        except Exception as e:
            # Log error
            self.logger.error(
                "Document parsing failed",
                document_id=document.document_id,
                error=str(e),
                parser_type=self.__class__.__name__
            )

            # Emit error metric
            self.metrics.increment(
                "parsing_errors",
                tags={"parser": self.__class__.__name__, "error_type": type(e).__name__}
            )

            # Create error result
            return ParseResult(
                job_id=document.document_id,
                parser_used=self.__class__.__name__,
                processing_time=time.time() - start_time,
                errors=[str(e)]
            )

    @abstractmethod
    def _parse_implementation(self, document: PDFDocument) -> ParseResult:
        """Actual parsing implementation to be overridden"""
        pass

    def extract_metadata(self, document: PDFDocument) -> Dict[str, Any]:
        """Extract document metadata"""
        return {
            "page_count": document.page_count,
            "file_size": document.file_size,
            "created_at": document.created_at.isoformat(),
            "document_id": document.document_id
        }

    def validate(self, result: ParseResult) -> bool:
        """Validate parsed results"""
        # Basic validation
        if not result.invoice_number:
            self.logger.warning("Missing invoice number")
            return False

        if result.total_amount <= 0:
            self.logger.warning("Invalid total amount", amount=result.total_amount)
            return False

        # Check confidence threshold
        avg_confidence = (
            sum(result.confidence_scores.values()) / len(result.confidence_scores)
            if result.confidence_scores else 0
        )
        if avg_confidence < self.config.get("confidence_threshold", 0.5):
            self.logger.warning("Low confidence score", confidence=avg_confidence)
            return False

        return True

    def get_confidence(self) -> float:
        """Get parser confidence score"""
        return self._confidence
```
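`MetricsCollector` lives in `src/utils/metrics.py`, which the guide references but never shows. A minimal sketch that satisfies the `increment(name, tags=...)` calls above, backed here by an in-process counter; a real deployment would more likely publish to CloudWatch or StatsD:

```python
# src/utils/metrics.py — minimal sketch; a CloudWatch/StatsD backend is assumed
from collections import Counter
from typing import Dict, Optional


class MetricsCollector:
    """In-process metric counter matching the increment() calls in BaseParser."""

    def __init__(self):
        self._counters: Counter = Counter()

    def increment(self, name: str, value: int = 1,
                  tags: Optional[Dict[str, str]] = None) -> None:
        """Increment a named counter; tags are folded into the metric key."""
        tag_suffix = ",".join(f"{k}={v}" for k, v in sorted((tags or {}).items()))
        self._counters[f"{name}|{tag_suffix}"] += value

    def snapshot(self) -> Dict[str, int]:
        """Return current counter values (useful in tests)."""
        return dict(self._counters)
```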
## Text Parser (src/parsers/text_parser.py)

```python
import re
from decimal import Decimal, InvalidOperation
from datetime import datetime
from typing import Any, Dict, List, Tuple, Optional

import pdfplumber
import pandas as pd

from .base_parser import BaseParser
from ..core.models import PDFDocument, ParseResult, LineItem
from ..core.exceptions import ParsingError


class TextParser(BaseParser):
    """Parser for digital PDFs with extractable text"""

    def __init__(self, config: Dict[str, Any]):
        super().__init__(config)
        self.extraction_library = config.get("extraction_library", "pdfplumber")
        self.date_formats = config.get("date_formats", [
            "%Y-%m-%d", "%d/%m/%Y", "%m/%d/%Y", "%d.%m.%Y", "%B %d, %Y"
        ])

    def _parse_implementation(self, document: PDFDocument) -> ParseResult:
        """Extract data from digital PDF"""
        # Extract text and tables
        text, tables = self._extract_content(document.file_path)

        # Create result
        result = ParseResult(job_id=document.document_id, parser_used="text")

        # Extract fields using regex patterns
        result.invoice_number = self._extract_invoice_number(text)
        result.invoice_date = self._extract_date(text, "invoice date")
        result.due_date = self._extract_date(text, "due date")

        # Extract amounts
        result.total_amount = self._extract_amount(text, "total")
        result.subtotal = self._extract_amount(text, "subtotal")
        result.tax_amount = self._extract_amount(text, "tax")

        # Extract line items from tables
        if tables:
            result.line_items = self._extract_line_items(tables)

        # Calculate confidence scores
        result.confidence_scores = self._calculate_field_confidence(result, text)

        # Set overall confidence
        self._confidence = sum(result.confidence_scores.values()) / len(result.confidence_scores)

        return result

    def _extract_content(self, file_path: str) -> Tuple[str, List[pd.DataFrame]]:
        """Extract text and tables from PDF"""
        full_text = ""
        all_tables = []

        try:
            with pdfplumber.open(file_path) as pdf:
                for page in pdf.pages:
                    # Extract text
                    page_text = page.extract_text()
                    if page_text:
                        full_text += page_text + "\n"

                    # Extract tables
                    tables = page.extract_tables()
                    if tables:
                        for table in tables:
                            df = pd.DataFrame(table[1:], columns=table[0])
                            all_tables.append(df)
        except Exception as e:
            raise ParsingError(f"Failed to extract content: {str(e)}")

        return full_text, all_tables

    def _extract_invoice_number(self, text: str) -> Optional[str]:
        """Extract invoice number using regex patterns"""
        patterns = [
            r"Invoice\s*#?\s*:?\s*([A-Z0-9-]+)",
            r"Invoice Number\s*:?\s*([A-Z0-9-]+)",
            r"Bill\s*#?\s*:?\s*([A-Z0-9-]+)",
            r"Reference\s*:?\s*([A-Z0-9-]+)"
        ]

        for pattern in patterns:
            match = re.search(pattern, text, re.IGNORECASE)
            if match:
                return match.group(1)

        return None

    def _extract_date(self, text: str, date_type: str) -> Optional[datetime]:
        """Extract date from text"""
        # Common date patterns
        date_patterns = [
            rf"{date_type}\s*:?\s*(\d{{1,2}}[-/\.]\d{{1,2}}[-/\.]\d{{2,4}})",
            rf"{date_type}\s*:?\s*(\w+\s+\d{{1,2}},?\s+\d{{4}})",
            rf"{date_type}\s*:?\s*(\d{{4}}[-/]\d{{2}}[-/]\d{{2}})"
        ]

        for pattern in date_patterns:
            match = re.search(pattern, text, re.IGNORECASE)
            if match:
                date_str = match.group(1)
                # Try to parse with different formats
                for fmt in self.date_formats:
                    try:
                        return datetime.strptime(date_str, fmt)
                    except ValueError:
                        continue

        return None

    def _extract_amount(self, text: str, amount_type: str) -> Decimal:
        """Extract monetary amount from text"""
        # Currency symbols and patterns
        currency_pattern = r"[$£€¥]?\s*([0-9,]+\.?\d{0,2})"

        # Search patterns based on amount type
        patterns = [
            rf"{amount_type}\s*:?\s*{currency_pattern}",
            rf"{amount_type}\s+amount\s*:?\s*{currency_pattern}",
            rf"amount\s+{amount_type}\s*:?\s*{currency_pattern}"
        ]

        for pattern in patterns:
            match = re.search(pattern, text, re.IGNORECASE)
            if match:
                # Clean and convert amount
                amount_str = match.group(1).replace(",", "")
                try:
                    return Decimal(amount_str)
                except InvalidOperation:
                    continue

        return Decimal("0.00")

    def _extract_line_items(self, tables: List[pd.DataFrame]) -> List[LineItem]:
        """Extract line items from tables"""
        line_items = []

        for df in tables:
            # Try to identify line item table
            if self._is_line_item_table(df):
                for _, row in df.iterrows():
                    item = self._parse_line_item_row(row)
                    if item:
                        line_items.append(item)

        return line_items

    def _is_line_item_table(self, df: pd.DataFrame) -> bool:
        """Check if table contains line items"""
        # Look for common column headers
        headers = [col.lower() for col in df.columns if col]
        required_headers = ["description", "quantity", "price", "amount", "total"]

        matches = sum(1 for req in required_headers
                      if any(req in header for header in headers))
        return matches >= 2

    def _parse_line_item_row(self, row: pd.Series) -> Optional[LineItem]:
        """Parse a single line item from table row"""
        try:
            # Find columns by partial match
            description = self._find_column_value(row, ["description", "item", "product"])
            quantity = self._find_column_value(row, ["qty", "quantity", "units"], numeric=True)
            unit_price = self._find_column_value(row, ["price", "rate", "unit"], numeric=True)
            total = self._find_column_value(row, ["total", "amount", "subtotal"], numeric=True)

            if description and (unit_price or total):
                return LineItem(
                    description=str(description),
                    quantity=float(quantity) if quantity else 1.0,
                    unit_price=Decimal(str(unit_price)) if unit_price else Decimal("0.00"),
                    total_price=Decimal(str(total)) if total else Decimal("0.00")
                )
        except Exception as e:
            self.logger.debug(f"Failed to parse line item: {e}")

        return None

    def _find_column_value(self, row: pd.Series, keywords: List[str], numeric: bool = False):
        """Find value in row by column keywords"""
        for col in row.index:
            if any(keyword in str(col).lower() for keyword in keywords):
                value = row[col]
                if numeric and value:
                    # Clean numeric value
                    value = re.sub(r'[^\d.,]', '', str(value))
                    value = value.replace(',', '')
                    try:
                        return float(value)
                    except ValueError:
                        return None
                return value
        return None

    def _calculate_field_confidence(self, result: ParseResult, text: str) -> Dict[str, float]:
        """Calculate confidence scores for extracted fields"""
        scores = {}

        # Invoice number confidence
        if result.invoice_number:
            scores["invoice_number"] = 0.9 if len(result.invoice_number) > 3 else 0.7
        else:
            scores["invoice_number"] = 0.0

        # Date confidence
        scores["invoice_date"] = 0.9 if result.invoice_date else 0.0
        scores["due_date"] = 0.8 if result.due_date else 0.3  # Optional field

        # Amount confidence based on consistency
        if result.line_items:
            calculated_total = sum(item.total_price for item in result.line_items)
            difference = (abs(calculated_total - result.subtotal) / result.subtotal
                          if result.subtotal > 0 else 1)
            scores["amounts"] = 0.9 if difference < 0.02 else 0.7
        else:
            scores["amounts"] = 0.6 if result.total_amount > 0 else 0.0

        return scores
```
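Because the field extractors operate on plain strings, they can be checked quickly without a PDF. A small illustrative run (the text and values are made up):

```python
from src.parsers.text_parser import TextParser

parser = TextParser({"confidence_threshold": 0.5})
sample = "Invoice #: INV-2024-001\nInvoice Date: 2024-01-15\nTotal: $1,100.00"

print(parser._extract_invoice_number(sample))        # INV-2024-001
print(parser._extract_date(sample, "invoice date"))  # 2024-01-15 00:00:00
print(parser._extract_amount(sample, "total"))       # 1100.00
```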
## OCR Parser (src/parsers/ocr_parser.py)

```python
import cv2
import numpy as np
from PIL import Image
import pytesseract
import pdf2image
from decimal import Decimal, InvalidOperation
from typing import Any, Dict, List, Tuple, Optional

from .base_parser import BaseParser
from .text_parser import TextParser
from ..core.models import PDFDocument, ParseResult, LineItem
from ..core.exceptions import ParsingError


class OCRParser(BaseParser):
    """Parser for scanned PDFs using OCR"""

    def __init__(self, config: Dict[str, Any]):
        super().__init__(config)
        self.ocr_engine = config.get("ocr_engine", "tesseract")
        self.language = config.get("language", "eng")
        self.dpi = config.get("dpi", 300)
        self.preprocessing_steps = config.get("preprocessing_steps", [
            "denoise", "deskew", "contrast"
        ])

    def _parse_implementation(self, document: PDFDocument) -> ParseResult:
        """Extract data from scanned PDF using OCR"""
        # Convert PDF to images
        images = self._pdf_to_images(document.file_path)

        # Process each page
        full_text = ""
        for i, image in enumerate(images):
            # Preprocess image
            processed_image = self._preprocess_image(image)

            # Perform OCR
            page_text = self._perform_ocr(processed_image)
            full_text += f"\n--- Page {i+1} ---\n{page_text}"

        # Reuse the text parser's field extractors on the OCR output
        # (the extractors operate on plain strings, so no second PDF pass is needed)
        text_parser = TextParser(self.config)
        result = ParseResult(job_id=document.document_id, parser_used="ocr")
        result.invoice_number = text_parser._extract_invoice_number(full_text)
        result.invoice_date = text_parser._extract_date(full_text, "invoice date")
        result.due_date = text_parser._extract_date(full_text, "due date")
        result.total_amount = text_parser._extract_amount(full_text, "total")
        result.subtotal = text_parser._extract_amount(full_text, "subtotal")
        result.tax_amount = text_parser._extract_amount(full_text, "tax")
        result.confidence_scores = text_parser._calculate_field_confidence(result, full_text)

        # Apply OCR-specific post-processing
        result = self._enhance_ocr_results(result, full_text, images)

        return result

    def _pdf_to_images(self, pdf_path: str) -> List[Image.Image]:
        """Convert PDF pages to images"""
        try:
            images = pdf2image.convert_from_path(
                pdf_path,
                dpi=self.dpi,
                fmt='png',
                thread_count=4
            )
            return images
        except Exception as e:
            raise ParsingError(f"Failed to convert PDF to images: {str(e)}")

    def _preprocess_image(self, image: Image.Image) -> np.ndarray:
        """Preprocess image for better OCR results"""
        # Convert PIL image to OpenCV format
        img_array = np.array(image)

        # Convert to grayscale
        if len(img_array.shape) == 3:
            gray = cv2.cvtColor(img_array, cv2.COLOR_RGB2GRAY)
        else:
            gray = img_array

        # Apply preprocessing steps
        processed = gray

        if "denoise" in self.preprocessing_steps:
            processed = cv2.fastNlMeansDenoising(processed)

        if "deskew" in self.preprocessing_steps:
            processed = self._deskew_image(processed)

        if "contrast" in self.preprocessing_steps:
            processed = self._enhance_contrast(processed)

        if "threshold" in self.preprocessing_steps:
            _, processed = cv2.threshold(
                processed, 0, 255,
                cv2.THRESH_BINARY + cv2.THRESH_OTSU
            )

        return processed

    def _deskew_image(self, image: np.ndarray) -> np.ndarray:
        """Correct image skew (works best on a binarized, inverted image)"""
        # Find all non-zero (text) regions
        coords = np.column_stack(np.where(image > 0))

        # Calculate rotation angle
        angle = cv2.minAreaRect(coords)[-1]
        if angle < -45:
            angle = 90 + angle

        # Rotate image
        (h, w) = image.shape[:2]
        center = (w // 2, h // 2)
        M = cv2.getRotationMatrix2D(center, angle, 1.0)
        rotated = cv2.warpAffine(
            image, M, (w, h),
            flags=cv2.INTER_CUBIC,
            borderMode=cv2.BORDER_REPLICATE
        )

        return rotated

    def _enhance_contrast(self, image: np.ndarray) -> np.ndarray:
        """Enhance image contrast"""
        # Apply CLAHE (Contrast Limited Adaptive Histogram Equalization)
        clahe = cv2.createCLAHE(clipLimit=2.0, tileGridSize=(8, 8))
        enhanced = clahe.apply(image)
        return enhanced

    def _perform_ocr(self, image: np.ndarray) -> str:
        """Perform OCR on preprocessed image"""
        if self.ocr_engine == "tesseract":
            # Configure Tesseract
            custom_config = r'--oem 3 --psm 6'

            # Perform OCR
            text = pytesseract.image_to_string(
                image,
                lang=self.language,
                config=custom_config
            )

            # Also get detailed data for confidence scoring
            data = pytesseract.image_to_data(
                image,
                lang=self.language,
                output_type=pytesseract.Output.DICT
            )

            # Calculate average confidence
            confidences = [int(conf) for conf in data['conf'] if int(conf) > 0]
            self._confidence = sum(confidences) / len(confidences) / 100 if confidences else 0.0

            return text
        else:
            # Placeholder for other OCR engines (Google Vision, AWS Textract)
            raise NotImplementedError(f"OCR engine {self.ocr_engine} not implemented")

    def _enhance_ocr_results(self, result: ParseResult, full_text: str,
                             images: List[Image.Image]) -> ParseResult:
        """Enhance OCR results with additional processing"""
        # If confidence is low, try table detection
        if self._confidence < 0.7 and not result.line_items:
            for image in images:
                tables = self._detect_tables(np.array(image))
                if tables:
                    # Process detected tables
                    result.line_items.extend(self._process_table_regions(tables, image))

        # Update confidence scores
        result.confidence_scores["ocr_quality"] = self._confidence

        return result

    def _detect_tables(self, image: np.ndarray) -> List[Tuple[int, int, int, int]]:
        """Detect table regions in image"""
        # Convert to grayscale if needed
        if len(image.shape) == 3:
            gray = cv2.cvtColor(image, cv2.COLOR_BGR2GRAY)
        else:
            gray = image

        # Detect horizontal and vertical lines
        horizontal = self._detect_lines(gray, horizontal=True)
        vertical = self._detect_lines(gray, horizontal=False)

        # Combine lines to find table regions
        table_mask = cv2.addWeighted(horizontal, 0.5, vertical, 0.5, 0.0)

        # Find contours
        contours, _ = cv2.findContours(
            table_mask,
            cv2.RETR_EXTERNAL,
            cv2.CHAIN_APPROX_SIMPLE
        )

        # Filter and return table regions
        tables = []
        for contour in contours:
            x, y, w, h = cv2.boundingRect(contour)
            if w > 100 and h > 50:  # Minimum table size
                tables.append((x, y, w, h))

        return tables

    def _detect_lines(self, image: np.ndarray, horizontal: bool = True) -> np.ndarray:
        """Detect horizontal or vertical lines in image"""
        # Create structure element for morphology
        if horizontal:
            kernel = cv2.getStructuringElement(cv2.MORPH_RECT, (40, 1))
        else:
            kernel = cv2.getStructuringElement(cv2.MORPH_RECT, (1, 40))

        # Apply morphology operations
        binary = cv2.adaptiveThreshold(
            image, 255,
            cv2.ADAPTIVE_THRESH_GAUSSIAN_C,
            cv2.THRESH_BINARY_INV, 11, 2
        )
        lines = cv2.morphologyEx(binary, cv2.MORPH_OPEN, kernel)

        return lines

    def _process_table_regions(self, tables: List[Tuple[int, int, int, int]],
                               image: Image.Image) -> List[LineItem]:
        """Process detected table regions to extract line items"""
        line_items = []

        for x, y, w, h in tables:
            # Crop table region
            table_img = image.crop((x, y, x + w, y + h))

            # Perform OCR on table region
            table_text = pytesseract.image_to_string(table_img)

            # Parse table text
            # This is simplified - in production, use more sophisticated parsing
            lines = table_text.strip().split('\n')
            for line in lines:
                if line and not any(header in line.lower()
                                    for header in ['description', 'quantity', 'price']):
                    # Basic parsing - extract numbers and text
                    parts = line.split()
                    if len(parts) >= 2:
                        description = ' '.join(parts[:-2])
                        try:
                            amount = Decimal(parts[-1].replace(',', ''))
                            if description and amount > 0:
                                line_items.append(LineItem(
                                    description=description,
                                    total_price=amount
                                ))
                        except InvalidOperation:
                            continue

        return line_items
```
## AI Parser (src/parsers/ai_parser.py)

```python
import json
from typing import Dict, Any, List, Optional
from decimal import Decimal
from datetime import datetime

from .base_parser import BaseParser
from ..core.models import PDFDocument, ParseResult, LineItem
from ..core.exceptions import ParsingError


class AIParser(BaseParser):
    """AI-powered parser using LLMs"""

    def __init__(self, config: Dict[str, Any]):
        super().__init__(config)
        self.model = config.get("model", "gpt-4")
        self.temperature = config.get("temperature", 0.1)
        self.max_tokens = config.get("max_tokens", 4000)
        self.retry_attempts = config.get("retry_attempts", 3)
        self.llm_client = self._initialize_client()

    def _initialize_client(self):
        """Initialize LLM client based on model"""
        if "gpt" in self.model:
            from openai import OpenAI
            return OpenAI(api_key=self.config.get("api_key"))
        elif "claude" in self.model:
            from anthropic import Anthropic
            return Anthropic(api_key=self.config.get("api_key"))
        else:
            raise ValueError(f"Unsupported model: {self.model}")

    def _parse_implementation(self, document: PDFDocument) -> ParseResult:
        """Extract data using AI/LLM"""
        # Extract text for context
        text = self._get_document_text(document)

        # Generate extraction prompt
        prompt = self._generate_extraction_prompt(text)

        # Call LLM with retry logic
        for attempt in range(self.retry_attempts):
            try:
                response = self._call_llm(prompt)
                extracted_data = self._parse_llm_response(response)

                # Build result
                result = self._build_parse_result(extracted_data, document)

                # Validate extraction
                if self.validate(result):
                    return result
            except Exception as e:
                self.logger.warning(f"AI parsing attempt {attempt + 1} failed: {e}")
                if attempt == self.retry_attempts - 1:
                    raise ParsingError(f"AI parsing failed after {self.retry_attempts} attempts")

        raise ParsingError("Failed to extract valid data")

    def _get_document_text(self, document: PDFDocument) -> str:
        """Extract text from document for AI processing"""
        try:
            # Try text extraction first
            text = document.get_text()
            if text and len(text) > 100:
                # Character-based truncation as a rough proxy for token limits
                return text[:self.max_tokens]
        except Exception:
            pass

        # Fall back to OCR if needed
        from .ocr_parser import OCRParser
        ocr_parser = OCRParser(self.config)
        images = ocr_parser._pdf_to_images(document.file_path)

        text = ""
        for image in images[:3]:  # Process first 3 pages only
            processed = ocr_parser._preprocess_image(image)
            page_text = ocr_parser._perform_ocr(processed)
            text += page_text + "\n"
            if len(text) > self.max_tokens:
                break

        return text[:self.max_tokens]

    def _generate_extraction_prompt(self, text: str) -> str:
        """Generate prompt for invoice data extraction"""
        prompt = f"""Extract invoice information from the following text and return as JSON.

Invoice Text:
{text}

Extract the following information:
1. vendor_name: Name of the company issuing the invoice
2. vendor_id: Any vendor ID or registration number
3. invoice_number: Invoice number or reference
4. invoice_date: Date of invoice (ISO format: YYYY-MM-DD)
5. due_date: Payment due date (ISO format: YYYY-MM-DD)
6. currency: Currency code (e.g., USD, EUR, GBP)
7. subtotal: Subtotal amount before tax
8. tax_amount: Tax amount
9. total_amount: Total amount including tax
10. line_items: Array of items with:
    - description: Item description
    - quantity: Quantity (default 1 if not specified)
    - unit_price: Price per unit
    - total_price: Total price for this line
    - tax_rate: Tax rate if specified

Return ONLY valid JSON without any explanation or markdown formatting.

Example response:
{{
    "vendor_name": "ABC Company Ltd",
    "invoice_number": "INV-2024-001",
    "invoice_date": "2024-01-15",
    "total_amount": 1250.00,
    "line_items": [
        {{
            "description": "Professional Services",
            "quantity": 1,
            "unit_price": 1000.00,
            "total_price": 1000.00
        }}
    ]
}}
"""
        return prompt

    def _call_llm(self, prompt: str) -> str:
        """Call LLM API"""
        if "gpt" in self.model:
            response = self.llm_client.chat.completions.create(
                model=self.model,
                messages=[
                    {"role": "system", "content": "You are an invoice data extraction assistant. Extract data accurately and return only JSON."},
                    {"role": "user", "content": prompt}
                ],
                temperature=self.temperature,
                response_format={"type": "json_object"}
            )
            return response.choices[0].message.content
        elif "claude" in self.model:
            response = self.llm_client.messages.create(
                model=self.model,
                messages=[{"role": "user", "content": prompt}],
                temperature=self.temperature,
                max_tokens=self.max_tokens
            )
            return response.content[0].text
        else:
            raise NotImplementedError(f"Model {self.model} not implemented")

    def _parse_llm_response(self, response: str) -> Dict[str, Any]:
        """Parse LLM response to extract JSON data"""
        try:
            # Clean response - remove markdown fences if present
            cleaned = response.strip()
            if cleaned.startswith("```json"):
                cleaned = cleaned[7:]
            if cleaned.startswith("```"):
                cleaned = cleaned[3:]
            if cleaned.endswith("```"):
                cleaned = cleaned[:-3]

            # Parse JSON
            data = json.loads(cleaned.strip())
            return data
        except json.JSONDecodeError as e:
            self.logger.error(f"Failed to parse LLM response as JSON: {e}")
            self.logger.debug(f"Response: {response}")
            raise ParsingError("Invalid JSON response from LLM")

    def _build_parse_result(self, data: Dict[str, Any], document: PDFDocument) -> ParseResult:
        """Build ParseResult from extracted data"""
        result = ParseResult(
            job_id=document.document_id,
            parser_used="ai"
        )

        # Map extracted fields
        result.vendor_id = data.get("vendor_id") or data.get("vendor_name")
        result.invoice_number = data.get("invoice_number")
        result.currency = data.get("currency", "USD")

        # Parse dates
        if data.get("invoice_date"):
            result.invoice_date = self._parse_date(data["invoice_date"])
        if data.get("due_date"):
            result.due_date = self._parse_date(data["due_date"])

        # Parse amounts
        result.total_amount = Decimal(str(data.get("total_amount", 0)))
        result.subtotal = Decimal(str(data.get("subtotal", 0)))
        result.tax_amount = Decimal(str(data.get("tax_amount", 0)))

        # Parse line items
        result.line_items = self._parse_line_items(data.get("line_items", []))

        # Store all extracted fields
        result.extracted_fields = data

        # Calculate confidence based on field completeness
        self._calculate_ai_confidence(result, data)

        return result

    def _parse_date(self, date_str: str) -> Optional[datetime]:
        """Parse date string to datetime"""
        if not date_str:
            return None

        # Try common formats
        formats = ["%Y-%m-%d", "%d/%m/%Y", "%m/%d/%Y", "%Y/%m/%d"]
        for fmt in formats:
            try:
                return datetime.strptime(date_str, fmt)
            except ValueError:
                continue

        # Try parsing with dateutil as fallback
        try:
            from dateutil import parser
            return parser.parse(date_str)
        except Exception:
            self.logger.warning(f"Failed to parse date: {date_str}")
            return None

    def _parse_line_items(self, items_data: List[Dict[str, Any]]) -> List[LineItem]:
        """Parse line items from extracted data"""
        line_items = []

        for item in items_data:
            try:
                line_item = LineItem(
                    description=item.get("description", ""),
                    quantity=float(item.get("quantity", 1)),
                    unit_price=Decimal(str(item.get("unit_price", 0))),
                    total_price=Decimal(str(item.get("total_price", 0))),
                    tax_rate=float(item.get("tax_rate", 0))
                )

                # Calculate total if not provided
                if line_item.total_price == 0 and line_item.unit_price > 0:
                    line_item.total_price = line_item.calculate_total()

                line_items.append(line_item)
            except Exception as e:
                self.logger.warning(f"Failed to parse line item: {e}")
                continue

        return line_items

    def _calculate_ai_confidence(self, result: ParseResult, data: Dict[str, Any]):
        """Calculate confidence scores for AI extraction"""
        required_fields = ["invoice_number", "invoice_date", "total_amount"]
        optional_fields = ["vendor_id", "due_date", "line_items", "currency"]

        # Check required fields
        required_score = sum(1 for field in required_fields if data.get(field)) / len(required_fields)

        # Check optional fields
        optional_score = sum(1 for field in optional_fields if data.get(field)) / len(optional_fields)

        # Overall confidence
        self._confidence = (required_score * 0.7) + (optional_score * 0.3)

        # Field-specific confidence
        result.confidence_scores = {
            "invoice_number": 0.9 if result.invoice_number else 0.0,
            "dates": 0.9 if result.invoice_date else 0.0,
            "amounts": 0.9 if result.total_amount > 0 else 0.0,
            "line_items": 0.8 if result.line_items else 0.3,
            "overall": self._confidence
        }
```
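The configuration keys read by the three parsers (`extraction_library`, `date_formats`, `ocr_engine`, `dpi`, `preprocessing_steps`, `model`, `temperature`, `confidence_threshold`, and so on) would live in the `config/` YAML files listed in the project structure. A plausible `config/default.yaml`, assembled from the defaults in the code; the exact layout is an assumption:

```yaml
# config/default.yaml — illustrative; key names mirror the config.get() calls above
confidence_threshold: 0.5

text_parser:
  extraction_library: pdfplumber
  date_formats: ["%Y-%m-%d", "%d/%m/%Y", "%m/%d/%Y", "%d.%m.%Y", "%B %d, %Y"]

ocr_parser:
  ocr_engine: tesseract
  language: eng
  dpi: 300
  preprocessing_steps: [denoise, deskew, contrast]

ai_parser:
  model: gpt-4
  temperature: 0.1
  max_tokens: 4000
  retry_attempts: 3
```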
"due_date", "line_items", "currency"] # Check required fields required_score = sum(1 for field in required_fields if data.get(field)) / len(required_fields) # Check optional fields optional_score = sum(1 for field in optional_fields if data.get(field)) / len(optional_fields) # Overall confidence self._confidence = (required_score * 0.7) + (optional_score * 0.3) # Field-specific confidence result.confidence_scores = { "invoice_number": 0.9 if result.invoice_number else 0.0, "dates": 0.9 if result.invoice_date else 0.0, "amounts": 0.9 if result.total_amount > 0 else 0.0, "line_items": 0.8 if result.line_items else 0.3, "overall": self._confidence } ``` ## Vendor Detection Service (src/services/vendor_detector.py) ```python import re import hashlib from typing import Optional, List, Dict, Any import pickle from dataclasses import asdict from ..core.interfaces import VendorDetectorInterface, CacheServiceInterface from ..core.models import PDFDocument, Vendor, ParserType from ..core.exceptions import VendorDetectionError class VendorDetector(VendorDetectorInterface): """Service for detecting vendors from PDF documents""" def __init__(self, cache_service: CacheServiceInterface, vendor_repository, ai_classifier=None): self.cache = cache_service self.vendor_repo = vendor_repository self.ai_classifier = ai_classifier self._vendor_patterns = {} self._load_vendor_patterns() def detect_vendor(self, document: PDFDocument) -> Optional[Vendor]: """Detect vendor from document""" # Generate document hash for caching doc_hash = self._generate_document_hash(document) # Check cache cached_vendor = asyncio.run(self._check_cache(doc_hash)) if cached_vendor: return cached_vendor # Extract identifying text (first 2000 chars) try: text = document.get_text(end_page=2)[:2000] except: # If text extraction fails, return None for AI parsing return None # Try pattern matching vendor = self._match_vendor_patterns(text) if vendor: asyncio.run(self._update_cache(doc_hash, vendor)) return vendor # Try fuzzy matching on vendor names vendor = self._fuzzy_match_vendor(text) if vendor: asyncio.run(self._update_cache(doc_hash, vendor)) return vendor # Use AI classifier if available if self.ai_classifier: vendor = self._classify_with_ai(text) if vendor: asyncio.run(self._update_cache(doc_hash, vendor)) return vendor # Return None if no vendor detected return None def update_vendor_patterns(self, vendor: Vendor, patterns: List[str]) -> None: """Update vendor detection patterns""" vendor.regex_patterns["detection"] = patterns self.vendor_repo.update(vendor) self._load_vendor_patterns() # Reload patterns def _load_vendor_patterns(self): """Load all vendor patterns from repository""" vendors = self.vendor_repo.get_all_active() self._vendor_patterns = {} for vendor in vendors: if vendor.regex_patterns.get("detection"): self._vendor_patterns[vendor.vendor_id] = { "patterns": vendor.regex_patterns["detection"], "vendor": vendor } def _generate_document_hash(self, document: PDFDocument) -> str: """Generate hash for document caching""" # Use file size and first page content for hash try: content = f"{document.file_size}_{document.get_text(end_page=1)[:500]}" return hashlib.sha256(content.encode()).hexdigest() except: return hashlib.sha256(f"{document.document_id}".encode()).hexdigest() async def _check_cache(self, doc_hash: str) -> Optional[Vendor]: """Check cache for vendor detection result""" cached = await self.cache.get(f"vendor_detection:{doc_hash}") if cached: return Vendor(**cached) return None async def _update_cache(self, 
## Parser Strategy (src/services/parser_strategy.py)

```python
from typing import Dict, Optional

import structlog

from ..core.models import PDFDocument, Vendor, ParserType
from ..core.interfaces import InvoiceParser
from ..parsers.text_parser import TextParser
from ..parsers.ocr_parser import OCRParser
from ..parsers.ai_parser import AIParser


class ParserStrategy:
    """Strategy pattern implementation for parser selection"""

    def __init__(self, parser_registry=None):
        self.parsers: Dict[str, InvoiceParser] = {}
        self.parser_registry = parser_registry
        self.logger = structlog.get_logger()
        self.quality_threshold = {
            "text": 0.8,
            "ocr": 0.6,
            "ai": 0.5
        }

    def select_parser(self, document: PDFDocument, vendor: Optional[Vendor]) -> InvoiceParser:
        """Select appropriate parser based on document and vendor"""
        # If vendor specified with preference, use it
        if vendor and vendor.parser_strategy != ParserType.AI:
            return self._get_parser_for_vendor(vendor)

        # Assess document quality
        quality = self._assess_document_quality(document)

        # Select parser based on quality
        if quality.is_digital and quality.text_quality > self.quality_threshold["text"]:
            parser_type = ParserType.TEXT
        elif quality.is_scanned and quality.scan_quality > self.quality_threshold["ocr"]:
            parser_type = ParserType.OCR
        else:
            parser_type = ParserType.AI

        # Override with vendor preference if specified
        if vendor and vendor.parser_strategy:
            parser_type = vendor.parser_strategy

        # Get or create parser
        return self._get_or_create_parser(parser_type, vendor)

    def register_parser(self, parser_key: str, parser: InvoiceParser):
        """Register a parser instance"""
        self.parsers[parser_key] = parser

    def _get_parser_for_vendor(self, vendor: Vendor) -> InvoiceParser:
        """Get parser configured for specific vendor"""
        parser_key = f"{vendor.vendor_id}_{vendor.parser_strategy.value}"

        if parser_key not in self.parsers:
            # Create vendor-specific parser
            config = vendor.parser_config.copy()
            config["vendor"] = vendor
            parser = self._create_parser(vendor.parser_strategy, config)
            self.parsers[parser_key] = parser

        return self.parsers[parser_key]

    def _get_or_create_parser(self, parser_type: ParserType, vendor: Optional[Vendor]) -> InvoiceParser:
        """Get or create parser instance"""
        # Generate cache key
        parser_key = f"{vendor.vendor_id if vendor else 'default'}_{parser_type.value}"

        if parser_key not in self.parsers:
            # Prepare configuration
            config = {}
            if vendor:
                config.update(vendor.parser_config)
                config["vendor"] = vendor

            # Create parser
            parser = self._create_parser(parser_type, config)
            self.parsers[parser_key] = parser

        return self.parsers[parser_key]

    def _create_parser(self, parser_type: ParserType, config: Dict) -> InvoiceParser:
        """Create parser instance"""
        if parser_type == ParserType.TEXT:
            return TextParser(config)
        elif parser_type == ParserType.OCR:
            return OCRParser(config)
        elif parser_type == ParserType.AI:
            return AIParser(config)
        else:
            raise ValueError(f"Unknown parser type: {parser_type}")

    def _assess_document_quality(self, document: PDFDocument) -> 'DocumentQuality':
        """Assess document quality for parser selection"""
        quality = DocumentQuality()

        try:
            # Try to extract text
            sample_text = document.get_text(end_page=1)

            if sample_text:
                # Analyze text quality
                quality.is_digital = True
                quality.text_quality = self._calculate_text_quality(sample_text)
            else:
                quality.is_digital = False
                quality.is_scanned = True
        except Exception as e:
            self.logger.debug(f"Failed to extract text: {e}")
            quality.is_scanned = True

        # Check if document has images (likely scanned)
        if document.metadata.get("has_images", False):
            quality.is_scanned = True
            quality.scan_quality = self._estimate_scan_quality(document)

        return quality

    def _calculate_text_quality(self, text: str) -> float:
        """Calculate text extraction quality score"""
        if not text:
            return 0.0

        # Check for common indicators of good text extraction
        scores = []

        # Has reasonable length
        scores.append(min(len(text) / 1000, 1.0))  # Normalize to 1000 chars

        # Has alphanumeric content
        alnum_ratio = sum(c.isalnum() for c in text) / len(text)
        scores.append(alnum_ratio)

        # Has proper spacing
        space_ratio = text.count(' ') / len(text)
        scores.append(min(space_ratio * 10, 1.0))  # Expect ~10% spaces

        # Has newlines (structure)
        newline_ratio = text.count('\n') / len(text)
        scores.append(min(newline_ratio * 100, 1.0))  # Expect ~1% newlines

        # Check for garbled text indicators
        garbled_indicators = ['�', '\x00', '\xff']
        garbled_score = 1.0 - sum(text.count(ind) for ind in garbled_indicators) / len(text)
        scores.append(garbled_score)

        return sum(scores) / len(scores)

    def _estimate_scan_quality(self, document: PDFDocument) -> float:
        """Estimate scan quality from document metadata"""
        # This is a simplified estimation
        # In production, analyze image resolution, contrast, etc.
        quality_score = 0.7  # Default medium quality

        # Check resolution if available
        if document.metadata.get("dpi"):
            dpi = document.metadata["dpi"]
            if dpi >= 300:
                quality_score = 0.9
            elif dpi >= 200:
                quality_score = 0.7
            else:
                quality_score = 0.5

        return quality_score


class DocumentQuality:
    """Document quality assessment results"""

    def __init__(self):
        self.is_digital = False
        self.is_scanned = False
        self.text_quality = 0.0
        self.scan_quality = 0.0
        self.has_tables = False
        self.has_forms = False
```
## Lambda Handlers (src/handlers/parsing_handler.py)

```python
import json
import os
import asyncio
from datetime import datetime
from typing import Dict, Any

import boto3

from ..core.models import PDFDocument, ParseResult, ProcessingStatus
from ..services.vendor_detector import VendorDetector
from ..services.parser_strategy import ParserStrategy
from ..services.cache_service import CacheService
from ..services.storage_service import S3StorageService
from ..utils.logger import get_logger

# Initialize services outside handler for reuse
logger = get_logger()
s3_client = boto3.client('s3')
dynamodb = boto3.resource('dynamodb')

# Initialize services
cache_service = CacheService()
storage_service = S3StorageService(s3_client)
vendor_detector = VendorDetector(cache_service, vendor_repository=None)
parser_strategy = ParserStrategy()


def lambda_handler(event: Dict[str, Any], context: Any) -> Dict[str, Any]:
    """Main Lambda handler for invoice parsing"""
    # Handle warmup calls
    if event.get('source') == 'serverless-plugin-warmup':
        logger.info('WarmUp - Lambda is warm!')
        return {'statusCode': 200, 'body': 'Lambda is warm!'}

    try:
        # Parse event
        if 'Records' in event:
            # SQS event
            return handle_sqs_event(event, context)
        else:
            # Direct invocation
            return handle_direct_invocation(event, context)
    except Exception as e:
        logger.error(f"Lambda handler error: {e}", exc_info=True)
        return {
            'statusCode': 500,
            'body': json.dumps({'error': str(e)})
        }


def handle_sqs_event(event: Dict[str, Any], context: Any) -> Dict[str, Any]:
    """Handle SQS event processing"""
    results = []

    for record in event['Records']:
        message = {}
        try:
            # Parse message
            message = json.loads(record['body'])
            job_id = message['job_id']
            document_path = message['document_path']

            # Process document
            result = asyncio.run(process_document(job_id, document_path))
            results.append({
                'job_id': job_id,
                'status': 'success',
                'result': result
            })
        except Exception as e:
            logger.error(f"Failed to process record: {e}", exc_info=True)
            results.append({
                'job_id': message.get('job_id', 'unknown'),
                'status': 'failed',
                'error': str(e)
            })

    return {
        'statusCode': 200,
        'body': json.dumps({'processed': len(results), 'results': results})
    }


def handle_direct_invocation(event: Dict[str, Any], context: Any) -> Dict[str, Any]:
    """Handle direct Lambda invocation"""
    job_id = event.get('job_id')
    document_path = event.get('document_path')

    if not job_id or not document_path:
        return {
            'statusCode': 400,
            'body': json.dumps({'error': 'Missing required parameters: job_id, document_path'})
        }

    # Process document
    result = asyncio.run(process_document(job_id, document_path))

    return {
        'statusCode': 200,
        'body': json.dumps(result.to_dict())
    }


async def process_document(job_id: str, document_path: str):
    """Process a single document"""
    logger.info(f"Processing document: {job_id}")

    # Update job status
    await update_job_status(job_id, ProcessingStatus.PROCESSING)

    try:
        # Download document from S3
        local_path = f"/tmp/{job_id}.pdf"
        await storage_service.download_to_file(document_path, local_path)

        # Create document object
        document = PDFDocument(
            document_id=job_id,
            file_path=local_path
        )

        # Detect vendor
        vendor = vendor_detector.detect_vendor(document)
        if vendor:
            logger.info(f"Vendor detected: {vendor.vendor_id}")

        # Select and execute parser
        parser = parser_strategy.select_parser(document, vendor)
        result = parser.parse(document)

        # Store result
        await store_parse_result(result)

        # Update job status
        await update_job_status(job_id, ProcessingStatus.COMPLETED)

        # Cleanup
        os.remove(local_path)

        return result
    except Exception as e:
        logger.error(f"Document processing failed: {e}", exc_info=True)
        await update_job_status(job_id, ProcessingStatus.FAILED, error=str(e))
        raise


async def update_job_status(job_id: str, status: ProcessingStatus, error: str = None):
    """Update job status in DynamoDB"""
    table = dynamodb.Table(os.environ.get('JOBS_TABLE', 'invoice-parser-jobs'))

    update_expr = "SET #status = :status, updated_at = :timestamp"
    expr_values = {
        ':status': status.value,
        ':timestamp': datetime.utcnow().isoformat()
    }

    if error:
        update_expr += ", error_message = :error"
        expr_values[':error'] = error

    table.update_item(
        Key={'job_id': job_id},
        UpdateExpression=update_expr,
        ExpressionAttributeNames={'#status': 'status'},
        ExpressionAttributeValues=expr_values
    )


async def store_parse_result(result: ParseResult):
    """Store parsing result in DynamoDB"""
    table = dynamodb.Table(os.environ.get('RESULTS_TABLE', 'invoice-parser-results'))
    table.put_item(
        Item=result.to_dict()
    )
```
## API Implementation (src/api/app.py)

```python
from datetime import datetime
from typing import Optional
import uuid

import boto3
import uvicorn
from fastapi import FastAPI, UploadFile, File, HTTPException, BackgroundTasks
from fastapi.middleware.cors import CORSMiddleware

from .routes import router
from .middleware import SecurityMiddleware, LoggingMiddleware
from ..core.models import ProcessingStatus
from ..services.queue_service import QueueService
from ..services.storage_service import S3StorageService

# Create FastAPI app
app = FastAPI(
    title="PDF Invoice Parser API",
    description="Scalable PDF invoice parsing system",
    version="1.0.0"
)

# Add middleware
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)
app.add_middleware(LoggingMiddleware)
app.add_middleware(SecurityMiddleware)

# Include routers
app.include_router(router, prefix="/api/v1")

# Initialize services
queue_service = QueueService()
storage_service = S3StorageService(boto3.client('s3'))
# NOTE: create_job_record, get_job_record and get_parse_result_record are
# assumed DynamoDB helper functions; their implementation is not shown here.


@app.post("/api/v1/invoices/upload")
async def upload_invoice(
    background_tasks: BackgroundTasks,
    file: UploadFile = File(...),
    vendor_id: Optional[str] = None
):
    """Upload invoice for parsing"""
    # Validate file
    if not file.filename.lower().endswith('.pdf'):
        raise HTTPException(400, "Only PDF files are supported")

    if file.size > 50 * 1024 * 1024:  # 50MB limit
        raise HTTPException(400, "File size exceeds 50MB limit")

    # Generate job ID
    job_id = str(uuid.uuid4())

    # Upload to S3
    s3_path = f"invoices/{job_id}/{file.filename}"
    try:
        content = await file.read()
        await storage_service.upload(s3_path, content)
    except Exception as e:
        raise HTTPException(500, f"Failed to upload file: {str(e)}")

    # Create job record
    job_data = {
        "job_id": job_id,
        "status": ProcessingStatus.PENDING.value,
        "vendor_id": vendor_id,
        "document_url": s3_path,
        "created_at": datetime.utcnow().isoformat(),
        "file_name": file.filename,
        "file_size": file.size
    }

    # Store job in database
    await create_job_record(job_data)

    # Queue for processing
    message = {
        "job_id": job_id,
        "document_path": s3_path,
        "vendor_id": vendor_id
    }
    await queue_service.send_message(message)

    return {
        "job_id": job_id,
        "status": ProcessingStatus.PENDING.value,
        "message": "Invoice uploaded successfully and queued for processing",
        "estimated_time": 30  # seconds
    }


@app.get("/api/v1/invoices/{job_id}/status")
async def get_job_status(job_id: str):
    """Get parsing job status"""
    # Fetch job from database
    job = await get_job_record(job_id)

    if not job:
        raise HTTPException(404, "Job not found")

    return {
        "job_id": job_id,
        "status": job["status"],
        "created_at": job["created_at"],
        "updated_at": job.get("updated_at"),
        "error_message": job.get("error_message")
    }


@app.get("/api/v1/invoices/{job_id}/result")
async def get_parse_result(job_id: str):
    """Get parsed invoice data"""
    # Check job status first
    job = await get_job_record(job_id)

    if not job:
        raise HTTPException(404, "Job not found")

    if job["status"] != ProcessingStatus.COMPLETED.value:
        raise HTTPException(400, f"Job status is {job['status']}, not completed")

    # Fetch result
    result = await get_parse_result_record(job_id)

    if not result:
        raise HTTPException(404, "Parse result not found")

    return result


@app.get("/health")
async def health_check():
    """Health check endpoint"""
    return {"status": "healthy", "timestamp": datetime.utcnow().isoformat()}


if __name__ == "__main__":
    uvicorn.run(app, host="0.0.0.0", port=8000)
```
"vendor_id": vendor_id, "document_url": s3_path, "created_at": datetime.utcnow().isoformat(), "file_name": file.filename, "file_size": file.size } # Store job in database await create_job_record(job_data) # Queue for processing message = { "job_id": job_id, "document_path": s3_path, "vendor_id": vendor_id } await queue_service.send_message(message) return { "job_id": job_id, "status": ProcessingStatus.PENDING.value, "message": "Invoice uploaded successfully and queued for processing", "estimated_time": 30 # seconds } @app.get("/api/v1/invoices/{job_id}/status") async def get_job_status(job_id: str): """Get parsing job status""" # Fetch job from database job = await get_job_record(job_id) if not job: raise HTTPException(404, "Job not found") return { "job_id": job_id, "status": job["status"], "created_at": job["created_at"], "updated_at": job.get("updated_at"), "error_message": job.get("error_message") } @app.get("/api/v1/invoices/{job_id}/result") async def get_parse_result(job_id: str): """Get parsed invoice data""" # Check job status first job = await get_job_record(job_id) if not job: raise HTTPException(404, "Job not found") if job["status"] != ProcessingStatus.COMPLETED.value: raise HTTPException(400, f"Job status is {job['status']}, not completed") # Fetch result result = await get_parse_result_record(job_id) if not result: raise HTTPException(404, "Parse result not found") return result @app.get("/health") async def health_check(): """Health check endpoint""" return {"status": "healthy", "timestamp": datetime.utcnow().isoformat()} if __name__ == "__main__": uvicorn.run(app, host="0.0.0.0", port=8000) ## Cache Service (src/services/cache_service.py) import redis import json import pickle from typing import Optional, Any from ..core.interfaces import CacheServiceInterface class CacheService(CacheServiceInterface): """Redis-based caching service""" def __init__(self): self.redis_client = self._init_redis() self.default_ttl = 3600 # 1 hour def _init_redis(self): """Initialize Redis connection""" return redis.Redis( host=os.environ.get('REDIS_HOST', 'localhost'), port=int(os.environ.get('REDIS_PORT', 6379)), decode_responses=False, # Handle binary data connection_pool_kwargs={ 'max_connections': 50, 'socket_keepalive': True, 'socket_keepalive_options': { 1: 1, # TCP_KEEPIDLE 2: 3, # TCP_KEEPINTVL 3: 5 # TCP_KEEPCNT } } ) async def get(self, key: str) -> Optional[Any]: """Get value from cache""" try: value = self.redis_client.get(key) if value: # Try to deserialize as JSON first try: return json.loads(value) except: # Fall back to pickle return pickle.loads(value) except Exception as e: self.logger.error(f"Cache get error: {e}") return None async def set(self, key: str, value: Any, ttl: int = None) -> None: """Set value in cache with TTL""" ttl = ttl or self.default_ttl try: # Try to serialize as JSON first (more portable) try: serialized = json.dumps(value) except: # Fall back to pickle for complex objects serialized = pickle.dumps(value) self.redis_client.setex(key, ttl, serialized) except Exception as e: self.logger.error(f"Cache set error: {e}") async def delete(self, key: str) -> None: """Delete value from cache""" try: self.redis_client.delete(key) except Exception as e: self.logger.error(f"Cache delete error: {e}") async def clear_pattern(self, pattern: str) -> int: """Clear all keys matching pattern""" try: keys = self.redis_client.keys(pattern) if keys: return self.redis_client.delete(*keys) return 0 except Exception as e: self.logger.error(f"Cache clear pattern error: {e}") 
## Test Implementation (tests/unit/test_parsers.py)

```python
import os
from decimal import Decimal
from datetime import datetime

import pytest

from src.core.models import PDFDocument, Vendor, ParserType
from src.parsers.text_parser import TextParser
from src.parsers.ocr_parser import OCRParser
from src.parsers.ai_parser import AIParser


class TestTextParser:
    """Test suite for TextParser"""

    @pytest.fixture
    def text_parser(self):
        config = {
            "confidence_threshold": 0.7,
            "date_formats": ["%Y-%m-%d", "%d/%m/%Y"]
        }
        return TextParser(config)

    @pytest.fixture
    def sample_document(self):
        return PDFDocument(
            document_id="test-001",
            file_path="tests/fixtures/sample_invoice.pdf",
            page_count=1
        )

    def test_parse_invoice_number(self, text_parser):
        """Test invoice number extraction"""
        text = "Invoice #: INV-2024-001\nDate: 2024-01-15"
        result = text_parser._extract_invoice_number(text)
        assert result == "INV-2024-001"

    def test_parse_date(self, text_parser):
        """Test date extraction"""
        text = "Invoice Date: 2024-01-15\nDue Date: 2024-02-15"

        invoice_date = text_parser._extract_date(text, "invoice date")
        assert invoice_date == datetime(2024, 1, 15)

        due_date = text_parser._extract_date(text, "due date")
        assert due_date == datetime(2024, 2, 15)

    def test_parse_amount(self, text_parser):
        """Test amount extraction"""
        text = "Subtotal: $1,000.00\nTax: $100.00\nTotal: $1,100.00"

        total = text_parser._extract_amount(text, "total")
        assert total == Decimal("1100.00")

        subtotal = text_parser._extract_amount(text, "subtotal")
        assert subtotal == Decimal("1000.00")

    def test_parse_complete_invoice(self, text_parser, sample_document):
        """Test complete invoice parsing"""
        result = text_parser.parse(sample_document)

        assert result.job_id == "test-001"
        assert result.parser_used == "text"
        assert result.invoice_number is not None
        assert result.total_amount > 0
        assert len(result.confidence_scores) > 0


class TestOCRParser:
    """Test suite for OCRParser"""

    @pytest.fixture
    def ocr_parser(self):
        config = {
            "ocr_engine": "tesseract",
            "language": "eng",
            "dpi": 300
        }
        return OCRParser(config)

    def test_image_preprocessing(self, ocr_parser):
        """Test image preprocessing steps"""
        # Create test image
        import numpy as np
        from PIL import Image

        # Create a simple test image
        img_array = np.ones((100, 100), dtype=np.uint8) * 255
        img_array[40:60, 40:60] = 0  # Add black square
        test_image = Image.fromarray(img_array)

        # Process image
        processed = ocr_parser._preprocess_image(test_image)

        assert processed is not None
        assert processed.shape == (100, 100)

    @pytest.mark.skipif(not os.path.exists("/usr/bin/tesseract"),
                        reason="Tesseract not installed")
    def test_ocr_extraction(self, ocr_parser):
        """Test OCR text extraction"""
        # This test requires Tesseract to be installed
        # In CI/CD, you would have a proper test image
        pass


class TestAIParser:
    """Test suite for AIParser"""

    @pytest.fixture
    def ai_parser(self, mocker):
        # Mock the LLM client
        mock_client = mocker.Mock()
        config = {
            "model": "gpt-4",
            "temperature": 0.1,
            "api_key": "test-key"
        }
        parser = AIParser(config)
        parser.llm_client = mock_client
        return parser, mock_client

    def test_prompt_generation(self, ai_parser):
        """Test extraction prompt generation"""
        parser, _ = ai_parser
        text = "Invoice from ABC Company"

        prompt = parser._generate_extraction_prompt(text)

        assert "Invoice Text:" in prompt
        assert text in prompt
        assert "vendor_name" in prompt
        assert "invoice_number" in prompt

    def test_parse_llm_response(self, ai_parser):
        """Test LLM response parsing"""
        parser, _ = ai_parser

        # Response string built with escapes so the markdown fence stays intact
        response = ('```json\n'
                    '{"vendor_name": "ABC Company", '
                    '"invoice_number": "INV-001", '
                    '"total_amount": 1000.00}\n'
                    '```')

        data = parser._parse_llm_response(response)

        assert data["vendor_name"] == "ABC Company"
        assert data["invoice_number"] == "INV-001"
        assert data["total_amount"] == 1000.00

    def test_build_parse_result(self, ai_parser):
        """Test building ParseResult from extracted data"""
        parser, _ = ai_parser
        data = {
            "vendor_name": "ABC Company",
            "invoice_number": "INV-001",
            "invoice_date": "2024-01-15",
            "total_amount": 1000.00,
            "line_items": [
                {
                    "description": "Service",
                    "quantity": 1,
                    "unit_price": 1000.00,
                    "total_price": 1000.00
                }
            ]
        }

        document = PDFDocument(document_id="test-001")
        result = parser._build_parse_result(data, document)

        assert result.vendor_id == "ABC Company"
        assert result.invoice_number == "INV-001"
        assert result.total_amount == Decimal("1000.00")
        assert len(result.line_items) == 1
        assert result.line_items[0].description == "Service"
```
Company", "invoice_number": "INV-001", "total_amount": 1000.00 } ```''' data = parser._parse_llm_response(response) assert data["vendor_name"] == "ABC Company" assert data["invoice_number"] == "INV-001" assert data["total_amount"] == 1000.00 def test_build_parse_result(self, ai_parser): """Test building ParseResult from extracted data""" parser, _ = ai_parser data = { "vendor_name": "ABC Company", "invoice_number": "INV-001", "invoice_date": "2024-01-15", "total_amount": 1000.00, "line_items": [ { "description": "Service", "quantity": 1, "unit_price": 1000.00, "total_price": 1000.00 } ] } document = PDFDocument(document_id="test-001") result = parser._build_parse_result(data, document) assert result.vendor_id == "ABC Company" assert result.invoice_number == "INV-001" assert result.total_amount == Decimal("1000.00") assert len(result.line_items) == 1 assert result.line_items[0].description == "Service" ``` ## Integration Test (tests/integration/test_end_to_end.py) import pytest import asyncio from pathlib import Path from src.core.models import PDFDocument, Vendor, ParserType from src.services.vendor_detector import VendorDetector from src.services.parser_strategy import ParserStrategy from src.services.cache_service import CacheService class TestEndToEnd: """End-to-end integration tests""" @pytest.fixture async def services(self): """Initialize services for testing""" cache = CacheService() vendor_detector = VendorDetector(cache, vendor_repository=None) parser_strategy = ParserStrategy() return { 'cache': cache, 'vendor_detector': vendor_detector, 'parser_strategy': parser_strategy } @pytest.fixture def test_invoices(self): """Load test invoice files""" test_dir = Path("tests/fixtures/invoices") return list(test_dir.glob("*.pdf")) @pytest.mark.asyncio async def test_complete_parsing_flow(self, services, test_invoices): """Test complete parsing flow for multiple vendors""" results = [] for invoice_path in test_invoices: # Create document document = PDFDocument( document_id=f"test-{invoice_path.stem}", file_path=str(invoice_path) ) # Detect vendor vendor = services['vendor_detector'].detect_vendor(document) # Select parser parser = services['parser_strategy'].select_parser(document, vendor) # Parse document result = parser.parse(document) # Validate result assert result.job_id == document.document_id assert result.parser_used in ["text", "ocr", "ai"] if result.errors: pytest.fail(f"Parsing failed for {invoice_path.name}: {result.errors}") results.append({ 'file': invoice_path.name, 'vendor': vendor.vendor_id if vendor else "unknown", 'parser': result.parser_used, 'confidence': parser.get_confidence(), 'invoice_number': result.invoice_number, 'total': result.total_amount }) # Print summary print("\n=== Parsing Results ===") for r in results: print(f"{r['file']}: {r['vendor']} - {r['invoice_number']} - ${r['total']}") # Assert we processed all files assert len(results) == len(test_invoices) # Check success rate successful = sum(1 for r in results if r['invoice_number']) success_rate = successful / len(results) assert success_rate >= 0.8 # 80% success rate minimum ## Performance Test (tests/performance/test_load.py) import pytest import time import concurrent.futures from statistics import mean, stdev from src.parsers.text_parser import TextParser from src.core.models import PDFDocument class TestPerformance: """Performance and load tests""" @pytest.fixture def parser(self): return TextParser({}) def test_parsing_performance(self, parser, benchmark): """Test single document parsing 
performance""" document = PDFDocument( document_id="perf-test", file_path="tests/fixtures/sample_invoice.pdf" ) # Benchmark parsing result = benchmark(parser.parse, document) assert result is not None assert benchmark.stats['mean'] < 5.0 # Should parse in under 5 seconds def test_concurrent_parsing(self, parser): """Test concurrent parsing performance""" documents = [ PDFDocument( document_id=f"concurrent-{i}", file_path="tests/fixtures/sample_invoice.pdf" ) for i in range(10) ] start_time = time.time() # Parse documents concurrently with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor: futures = [executor.submit(parser.parse, doc) for doc in documents] results = [f.result() for f in concurrent.futures.as_completed(futures)] total_time = time.time() - start_time # All documents should be parsed assert len(results) == 10 # Should complete in reasonable time assert total_time < 20.0 # 10 documents in under 20 seconds # Calculate statistics processing_times = [r.processing_time for r in results] avg_time = mean(processing_times) std_dev = stdev(processing_times) if len(processing_times) > 1 else 0 print(f"\nConcurrent Parsing Results:") print(f"Total Time: {total_time:.2f}s") print(f"Average Time: {avg_time:.2f}s") print(f"Std Dev: {std_dev:.2f}s") # Performance assertions assert avg_time < 2.0 # Average under 2 seconds per document assert std_dev < 1.0 # Consistent performance ## Docker Configuration (Dockerfile) # Multi-stage build for optimal size FROM python:3.11-slim as builder # Install system dependencies RUN apt-get update && apt-get install -y \ gcc \ g++ \ make \ tesseract-ocr \ tesseract-ocr-eng \ poppler-utils \ libpoppler-cpp-dev \ && rm -rf /var/lib/apt/lists/* # Create virtual environment RUN python -m venv /opt/venv ENV PATH="/opt/venv/bin:$PATH" # Copy and install Python dependencies COPY requirements.txt . RUN pip install --no-cache-dir -r requirements.txt # Runtime stage FROM python:3.11-slim # Install runtime dependencies only RUN apt-get update && apt-get install -y \ tesseract-ocr \ tesseract-ocr-eng \ poppler-utils \ && rm -rf /var/lib/apt/lists/* # Copy virtual environment from builder COPY --from=builder /opt/venv /opt/venv ENV PATH="/opt/venv/bin:$PATH" # Create app directory WORKDIR /app # Copy application code COPY src/ ./src/ COPY config/ ./config/ # Create non-root user RUN useradd -m -u 1000 appuser && chown -R appuser:appuser /app USER appuser # For Lambda deployment RUN pip install awslambdaric # Set entrypoint for Lambda ENTRYPOINT ["python", "-m", "awslambdaric"] CMD ["src.handlers.parsing_handler.lambda_handler"] ## Requirements File (requirements.txt) # Core dependencies pdfplumber==0.9.0 PyPDF2==3.0.1 pdf2image==1.16.3 pytesseract==0.3.10 opencv-python-headless==4.8.1.78 Pillow==10.1.0 # AI/ML openai==1.3.0 anthropic==0.7.0 langchain==0.0.350 # Web framework fastapi==0.104.1 uvicorn[standard]==0.24.0 pydantic==2.5.0 # AWS boto3==1.29.0 aws-lambda-powertools==2.26.0 # Database and caching redis==5.0.1 motor==3.3.2 # Data processing pandas==2.1.3 numpy==1.25.2 python-dateutil==2.8.2 # Utilities structlog==23.2.0 python-multipart==0.0.6 fuzzywuzzy==0.18.0 python-Levenshtein==0.23.0 # Testing pytest==7.4.3 pytest-asyncio==0.21.1 pytest-mock==3.12.0 pytest-benchmark==4.0.0 pytest-cov==4.1.0 # Development black==23.11.0 flake8==6.1.0 mypy==1.7.0 pre-commit==3.5.0