## Project Structure

invoice-parser/
├── src/
│ ├── __init__.py
│ ├── core/
│ │ ├── __init__.py
│ │ ├── models.py
│ │ ├── interfaces.py
│ │ └── exceptions.py
│ ├── parsers/
│ │ ├── __init__.py
│ │ ├── base_parser.py
│ │ ├── text_parser.py
│ │ ├── ocr_parser.py
│ │ └── ai_parser.py
│ ├── services/
│ │ ├── __init__.py
│ │ ├── vendor_detector.py
│ │ ├── parser_strategy.py
│ │ ├── cache_service.py
│ │ └── storage_service.py
│ ├── handlers/
│ │ ├── __init__.py
│ │ ├── upload_handler.py
│ │ ├── parsing_handler.py
│ │ └── result_handler.py
│ ├── api/
│ │ ├── __init__.py
│ │ ├── app.py
│ │ ├── routes.py
│ │ └── middleware.py
│ └── utils/
│ ├── __init__.py
│ ├── logger.py
│ ├── metrics.py
│ └── validators.py
├── tests/
│ ├── unit/
│ ├── integration/
│ └── fixtures/
├── config/
│ ├── default.yaml
│ ├── development.yaml
│ └── production.yaml
├── infrastructure/
│ ├── terraform/
│ └── docker/
├── requirements.txt
├── Dockerfile
└── README.md
## Core Models (src/core/models.py)

from dataclasses import dataclass, field
from datetime import datetime
from decimal import Decimal
from typing import Dict, List, Optional, Any
from enum import Enum
import uuid
class ParserType(Enum):
TEXT = "text"
OCR = "ocr"
AI = "ai"
HYBRID = "hybrid"
class ProcessingStatus(Enum):
PENDING = "pending"
PROCESSING = "processing"
COMPLETED = "completed"
FAILED = "failed"
RETRY = "retry"
@dataclass
class PDFDocument:
"""Represents a PDF document to be parsed"""
document_id: str = field(default_factory=lambda: str(uuid.uuid4()))
file_path: str = ""
file_size: int = 0
page_count: int = 0
metadata: Dict[str, Any] = field(default_factory=dict)
created_at: datetime = field(default_factory=datetime.utcnow)
content_hash: Optional[str] = None
def get_page(self, page_num: int) -> 'PDFPage':
"""Retrieve a specific page from the document"""
pass
def get_text(self, start_page: int = 0, end_page: Optional[int] = None) -> str:
"""Extract text from specified page range"""
pass
@dataclass
class PDFPage:
"""Represents a single page in a PDF document"""
page_number: int
width: float
height: float
text_content: str = ""
images: List['Image'] = field(default_factory=list)
tables: List['Table'] = field(default_factory=list)
def get_text_blocks(self) -> List['TextBlock']:
"""Extract text blocks with position information"""
pass
@dataclass
class LineItem:
"""Represents a line item in an invoice"""
description: str
quantity: float = 1.0
unit_price: Decimal = Decimal("0.00")
total_price: Decimal = Decimal("0.00")
tax_rate: float = 0.0
tax_amount: Decimal = Decimal("0.00")
metadata: Dict[str, Any] = field(default_factory=dict)
def calculate_total(self) -> Decimal:
"""Calculate total including tax"""
base_total = Decimal(str(self.quantity)) * self.unit_price
tax = base_total * Decimal(str(self.tax_rate))
return base_total + tax
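
# Illustrative check of the tax math above (hypothetical values; not part of
# the original model): 3 units at 10.00 with a 10% tax rate come to 33.00.
def _example_line_item_total() -> Decimal:
    item = LineItem(
        description="Widget",
        quantity=3,
        unit_price=Decimal("10.00"),
        tax_rate=0.10,
    )
    return item.calculate_total()  # Decimal('33.00')
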
@dataclass
class ParseResult:
"""Result of parsing an invoice"""
job_id: str
vendor_id: Optional[str] = None
invoice_number: Optional[str] = None
invoice_date: Optional[datetime] = None
due_date: Optional[datetime] = None
total_amount: Decimal = Decimal("0.00")
subtotal: Decimal = Decimal("0.00")
tax_amount: Decimal = Decimal("0.00")
currency: str = "USD"
line_items: List[LineItem] = field(default_factory=list)
extracted_fields: Dict[str, Any] = field(default_factory=dict)
confidence_scores: Dict[str, float] = field(default_factory=dict)
parser_used: str = ""
processing_time: float = 0.0
errors: List[str] = field(default_factory=list)
def to_dict(self) -> Dict[str, Any]:
"""Convert to dictionary for JSON serialization"""
return {
"job_id": self.job_id,
"vendor_id": self.vendor_id,
"invoice_number": self.invoice_number,
"invoice_date": self.invoice_date.isoformat() if self.invoice_date else None,
"due_date": self.due_date.isoformat() if self.due_date else None,
"total_amount": str(self.total_amount),
"subtotal": str(self.subtotal),
"tax_amount": str(self.tax_amount),
"currency": self.currency,
"line_items": [
{
"description": item.description,
"quantity": item.quantity,
"unit_price": str(item.unit_price),
"total_price": str(item.total_price),
"tax_rate": item.tax_rate,
"tax_amount": str(item.tax_amount)
}
for item in self.line_items
],
"extracted_fields": self.extracted_fields,
"confidence_scores": self.confidence_scores,
"parser_used": self.parser_used,
"processing_time": self.processing_time,
"errors": self.errors
}
@dataclass
class Vendor:
"""Vendor configuration"""
vendor_id: str
vendor_name: str
parser_strategy: ParserType = ParserType.TEXT
parser_config: Dict[str, Any] = field(default_factory=dict)
field_mappings: Dict[str, str] = field(default_factory=dict)
validation_rules: List[Dict[str, Any]] = field(default_factory=list)
confidence_threshold: float = 0.8
regex_patterns: Dict[str, str] = field(default_factory=dict)
is_active: bool = True
created_at: datetime = field(default_factory=datetime.utcnow)
    updated_at: datetime = field(default_factory=datetime.utcnow)

## Core Interfaces (src/core/interfaces.py)

from abc import ABC, abstractmethod
from typing import Optional, Dict, Any, List

from .models import PDFDocument, ParseResult, Vendor
class InvoiceParser(ABC):
"""Base interface for all invoice parsers"""
@abstractmethod
def parse(self, document: PDFDocument) -> ParseResult:
"""Parse invoice document and extract data"""
pass
@abstractmethod
def validate(self, result: ParseResult) -> bool:
"""Validate parsed results"""
pass
@abstractmethod
def get_confidence(self) -> float:
"""Get parser confidence score"""
pass
class VendorDetectorInterface(ABC):
"""Interface for vendor detection"""
@abstractmethod
    async def detect_vendor(self, document: PDFDocument) -> Optional[Vendor]:
"""Detect vendor from document"""
pass
@abstractmethod
def update_vendor_patterns(self, vendor: Vendor, patterns: List[str]) -> None:
"""Update vendor detection patterns"""
pass
class CacheServiceInterface(ABC):
"""Interface for caching service"""
@abstractmethod
async def get(self, key: str) -> Optional[Any]:
"""Get value from cache"""
pass
@abstractmethod
async def set(self, key: str, value: Any, ttl: int = 3600) -> None:
"""Set value in cache with TTL"""
pass
@abstractmethod
async def delete(self, key: str) -> None:
"""Delete value from cache"""
pass
class StorageServiceInterface(ABC):
"""Interface for document storage"""
@abstractmethod
async def upload(self, file_path: str, content: bytes) -> str:
"""Upload document to storage"""
pass
@abstractmethod
async def download(self, file_path: str) -> bytes:
"""Download document from storage"""
pass
@abstractmethod
async def delete(self, file_path: str) -> None:
"""Delete document from storage"""
        pass

## Base Parser (src/parsers/base_parser.py)

from abc import abstractmethod
import time
from typing import Dict, Any, Optional

import structlog

from ..core.interfaces import InvoiceParser
from ..core.models import PDFDocument, ParseResult, ProcessingStatus
from ..core.exceptions import ParsingError, ValidationError
from ..utils.metrics import MetricsCollector
class BaseParser(InvoiceParser):
"""Base implementation for all parsers"""
def __init__(self, config: Dict[str, Any]):
self.config = config
self.logger = structlog.get_logger()
self.metrics = MetricsCollector()
self._confidence = 0.0
def parse(self, document: PDFDocument) -> ParseResult:
"""Parse document with timing and error handling"""
start_time = time.time()
try:
# Log parsing start
self.logger.info(
"Starting document parsing",
parser_type=self.__class__.__name__,
document_id=document.document_id,
page_count=document.page_count
)
# Extract metadata
metadata = self.extract_metadata(document)
# Perform parsing
result = self._parse_implementation(document)
# Calculate processing time
result.processing_time = time.time() - start_time
# Validate result
if not self.validate(result):
raise ValidationError("Parsed result failed validation")
# Log success
self.logger.info(
"Document parsed successfully",
document_id=document.document_id,
processing_time=result.processing_time,
confidence=self._confidence
)
# Emit metrics
self.metrics.increment(
"documents_parsed",
tags={"parser": self.__class__.__name__, "status": "success"}
)
return result
except Exception as e:
# Log error
self.logger.error(
"Document parsing failed",
document_id=document.document_id,
error=str(e),
parser_type=self.__class__.__name__
)
# Emit error metric
self.metrics.increment(
"parsing_errors",
tags={"parser": self.__class__.__name__, "error_type": type(e).__name__}
)
# Create error result
return ParseResult(
job_id=document.document_id,
parser_used=self.__class__.__name__,
processing_time=time.time() - start_time,
errors=[str(e)]
)
@abstractmethod
def _parse_implementation(self, document: PDFDocument) -> ParseResult:
"""Actual parsing implementation to be overridden"""
pass
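
    # The `parse` template method above supplies timing, logging, validation
    # and metrics; a concrete parser only fills in this hook. A minimal
    # (hypothetical) subclass for illustration:
    #
    #     class StubParser(BaseParser):
    #         def _parse_implementation(self, document: PDFDocument) -> ParseResult:
    #             return ParseResult(job_id=document.document_id, parser_used="stub")
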
def extract_metadata(self, document: PDFDocument) -> Dict[str, Any]:
"""Extract document metadata"""
return {
"page_count": document.page_count,
"file_size": document.file_size,
"created_at": document.created_at.isoformat(),
"document_id": document.document_id
}
def validate(self, result: ParseResult) -> bool:
"""Validate parsed results"""
# Basic validation
if not result.invoice_number:
self.logger.warning("Missing invoice number")
return False
if result.total_amount <= 0:
self.logger.warning("Invalid total amount", amount=result.total_amount)
return False
# Check confidence threshold
avg_confidence = sum(result.confidence_scores.values()) / len(result.confidence_scores) if result.confidence_scores else 0
if avg_confidence < self.config.get("confidence_threshold", 0.5):
self.logger.warning("Low confidence score", confidence=avg_confidence)
return False
return True
def get_confidence(self) -> float:
"""Get parser confidence score"""
        return self._confidence

## Text Parser (src/parsers/text_parser.py)

import re
from decimal import Decimal, InvalidOperation
from datetime import datetime
from typing import Any, Dict, List, Tuple, Optional

import pdfplumber
import pandas as pd

from .base_parser import BaseParser
from ..core.models import PDFDocument, ParseResult, LineItem
from ..core.exceptions import ParsingError
class TextParser(BaseParser):
"""Parser for digital PDFs with extractable text"""
def __init__(self, config: Dict[str, Any]):
super().__init__(config)
self.extraction_library = config.get("extraction_library", "pdfplumber")
self.date_formats = config.get("date_formats", [
"%Y-%m-%d",
"%d/%m/%Y",
"%m/%d/%Y",
"%d.%m.%Y",
"%B %d, %Y"
])
def _parse_implementation(self, document: PDFDocument) -> ParseResult:
"""Extract data from digital PDF"""
# Extract text and tables
text, tables = self._extract_content(document.file_path)
# Create result
result = ParseResult(job_id=document.document_id, parser_used="text")
# Extract fields using regex patterns
result.invoice_number = self._extract_invoice_number(text)
result.invoice_date = self._extract_date(text, "invoice date")
result.due_date = self._extract_date(text, "due date")
# Extract amounts
result.total_amount = self._extract_amount(text, "total")
result.subtotal = self._extract_amount(text, "subtotal")
result.tax_amount = self._extract_amount(text, "tax")
# Extract line items from tables
if tables:
result.line_items = self._extract_line_items(tables)
# Calculate confidence scores
result.confidence_scores = self._calculate_field_confidence(result, text)
# Set overall confidence
self._confidence = sum(result.confidence_scores.values()) / len(result.confidence_scores)
return result
def _extract_content(self, file_path: str) -> Tuple[str, List[pd.DataFrame]]:
"""Extract text and tables from PDF"""
full_text = ""
all_tables = []
try:
with pdfplumber.open(file_path) as pdf:
for page in pdf.pages:
# Extract text
page_text = page.extract_text()
if page_text:
full_text += page_text + "\n"
# Extract tables
tables = page.extract_tables()
if tables:
for table in tables:
df = pd.DataFrame(table[1:], columns=table[0])
all_tables.append(df)
except Exception as e:
raise ParsingError(f"Failed to extract content: {str(e)}")
return full_text, all_tables
def _extract_invoice_number(self, text: str) -> Optional[str]:
"""Extract invoice number using regex patterns"""
patterns = [
r"Invoice\s*#?\s*:?\s*([A-Z0-9-]+)",
r"Invoice Number\s*:?\s*([A-Z0-9-]+)",
r"Bill\s*#?\s*:?\s*([A-Z0-9-]+)",
r"Reference\s*:?\s*([A-Z0-9-]+)"
]
for pattern in patterns:
match = re.search(pattern, text, re.IGNORECASE)
if match:
return match.group(1)
return None
def _extract_date(self, text: str, date_type: str) -> Optional[datetime]:
"""Extract date from text"""
# Common date patterns
date_patterns = [
rf"{date_type}\s*:?\s*(\d{{1,2}}[-/\.]\d{{1,2}}[-/\.]\d{{2,4}})",
rf"{date_type}\s*:?\s*(\w+\s+\d{{1,2}},?\s+\d{{4}})",
rf"{date_type}\s*:?\s*(\d{{4}}[-/]\d{{2}}[-/]\d{{2}})"
]
for pattern in date_patterns:
match = re.search(pattern, text, re.IGNORECASE)
if match:
date_str = match.group(1)
# Try to parse with different formats
for fmt in self.date_formats:
try:
return datetime.strptime(date_str, fmt)
except ValueError:
continue
return None
def _extract_amount(self, text: str, amount_type: str) -> Decimal:
"""Extract monetary amount from text"""
# Currency symbols and patterns
currency_pattern = r"[$£€¥]?\s*([0-9,]+\.?\d{0,2})"
# Search patterns based on amount type
patterns = [
rf"{amount_type}\s*:?\s*{currency_pattern}",
rf"{amount_type}\s+amount\s*:?\s*{currency_pattern}",
rf"amount\s+{amount_type}\s*:?\s*{currency_pattern}"
]
for pattern in patterns:
match = re.search(pattern, text, re.IGNORECASE)
if match:
# Clean and convert amount
amount_str = match.group(1).replace(",", "")
try:
return Decimal(amount_str)
                except InvalidOperation:
continue
return Decimal("0.00")
def _extract_line_items(self, tables: List[pd.DataFrame]) -> List[LineItem]:
"""Extract line items from tables"""
line_items = []
for df in tables:
# Try to identify line item table
if self._is_line_item_table(df):
for _, row in df.iterrows():
item = self._parse_line_item_row(row)
if item:
line_items.append(item)
return line_items
def _is_line_item_table(self, df: pd.DataFrame) -> bool:
"""Check if table contains line items"""
# Look for common column headers
headers = [col.lower() for col in df.columns if col]
required_headers = ["description", "quantity", "price", "amount", "total"]
matches = sum(1 for req in required_headers if any(req in header for header in headers))
return matches >= 2
def _parse_line_item_row(self, row: pd.Series) -> Optional[LineItem]:
"""Parse a single line item from table row"""
try:
# Find columns by partial match
description = self._find_column_value(row, ["description", "item", "product"])
quantity = self._find_column_value(row, ["qty", "quantity", "units"], numeric=True)
unit_price = self._find_column_value(row, ["price", "rate", "unit"], numeric=True)
total = self._find_column_value(row, ["total", "amount", "subtotal"], numeric=True)
if description and (unit_price or total):
return LineItem(
description=str(description),
quantity=float(quantity) if quantity else 1.0,
unit_price=Decimal(str(unit_price)) if unit_price else Decimal("0.00"),
total_price=Decimal(str(total)) if total else Decimal("0.00")
)
except Exception as e:
self.logger.debug(f"Failed to parse line item: {e}")
return None
def _find_column_value(self, row: pd.Series, keywords: List[str], numeric: bool = False):
"""Find value in row by column keywords"""
for col in row.index:
if any(keyword in str(col).lower() for keyword in keywords):
value = row[col]
if numeric and value:
# Clean numeric value
value = re.sub(r'[^\d.,]', '', str(value))
value = value.replace(',', '')
try:
return float(value)
                    except ValueError:
return None
return value
return None
def _calculate_field_confidence(self, result: ParseResult, text: str) -> Dict[str, float]:
"""Calculate confidence scores for extracted fields"""
scores = {}
# Invoice number confidence
if result.invoice_number:
scores["invoice_number"] = 0.9 if len(result.invoice_number) > 3 else 0.7
else:
scores["invoice_number"] = 0.0
# Date confidence
scores["invoice_date"] = 0.9 if result.invoice_date else 0.0
scores["due_date"] = 0.8 if result.due_date else 0.3 # Optional field
# Amount confidence based on consistency
if result.line_items:
calculated_total = sum(item.total_price for item in result.line_items)
difference = abs(calculated_total - result.subtotal) / result.subtotal if result.subtotal > 0 else 1
scores["amounts"] = 0.9 if difference < 0.02 else 0.7
else:
scores["amounts"] = 0.6 if result.total_amount > 0 else 0.0
        return scores

## OCR Parser (src/parsers/ocr_parser.py)

import cv2
import numpy as np
from decimal import Decimal, InvalidOperation
from typing import Any, Dict, List, Tuple, Optional

from PIL import Image
import pytesseract
import pdf2image

from .base_parser import BaseParser
from .text_parser import TextParser
from ..core.models import PDFDocument, ParseResult, LineItem
from ..core.exceptions import ParsingError
class OCRParser(BaseParser):
"""Parser for scanned PDFs using OCR"""
def __init__(self, config: Dict[str, Any]):
super().__init__(config)
self.ocr_engine = config.get("ocr_engine", "tesseract")
self.language = config.get("language", "eng")
self.dpi = config.get("dpi", 300)
self.preprocessing_steps = config.get("preprocessing_steps", [
"denoise",
"deskew",
"contrast"
])
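
    # Hypothetical config: OCRParser({"dpi": 400, "preprocessing_steps":
    # ["denoise", "deskew", "threshold"]}) scans at higher resolution and adds
    # the Otsu binarization step implemented below.
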
def _parse_implementation(self, document: PDFDocument) -> ParseResult:
"""Extract data from scanned PDF using OCR"""
# Convert PDF to images
images = self._pdf_to_images(document.file_path)
# Process each page
full_text = ""
for i, image in enumerate(images):
# Preprocess image
processed_image = self._preprocess_image(image)
# Perform OCR
page_text = self._perform_ocr(processed_image)
full_text += f"\n--- Page {i+1} ---\n{page_text}"
        # Reuse the text parser's field extraction on the OCR output; calling
        # its _parse_implementation would re-open the (text-less) file, so the
        # regex helpers are applied to the recognized text directly
        text_parser = TextParser(self.config)
        result = ParseResult(job_id=document.document_id, parser_used="ocr")
        result.invoice_number = text_parser._extract_invoice_number(full_text)
        result.invoice_date = text_parser._extract_date(full_text, "invoice date")
        result.due_date = text_parser._extract_date(full_text, "due date")
        result.total_amount = text_parser._extract_amount(full_text, "total")
        result.subtotal = text_parser._extract_amount(full_text, "subtotal")
        result.tax_amount = text_parser._extract_amount(full_text, "tax")
        result.confidence_scores = text_parser._calculate_field_confidence(result, full_text)
        # Enhance with OCR-specific processing
        result = self._enhance_ocr_results(result, full_text, images)
        return result
def _pdf_to_images(self, pdf_path: str) -> List[Image.Image]:
"""Convert PDF pages to images"""
try:
images = pdf2image.convert_from_path(
pdf_path,
dpi=self.dpi,
fmt='PNG',
thread_count=4
)
return images
except Exception as e:
raise ParsingError(f"Failed to convert PDF to images: {str(e)}")
def _preprocess_image(self, image: Image.Image) -> np.ndarray:
"""Preprocess image for better OCR results"""
# Convert PIL image to OpenCV format
img_array = np.array(image)
# Convert to grayscale
if len(img_array.shape) == 3:
gray = cv2.cvtColor(img_array, cv2.COLOR_RGB2GRAY)
else:
gray = img_array
# Apply preprocessing steps
processed = gray
if "denoise" in self.preprocessing_steps:
processed = cv2.fastNlMeansDenoising(processed)
if "deskew" in self.preprocessing_steps:
processed = self._deskew_image(processed)
if "contrast" in self.preprocessing_steps:
processed = self._enhance_contrast(processed)
if "threshold" in self.preprocessing_steps:
_, processed = cv2.threshold(
processed, 0, 255,
cv2.THRESH_BINARY + cv2.THRESH_OTSU
)
return processed
    def _deskew_image(self, image: np.ndarray) -> np.ndarray:
        """Correct image skew"""
        # Binarize and invert so the foreground (text) pixels, not the white
        # background, drive the angle estimate
        _, binary = cv2.threshold(image, 0, 255, cv2.THRESH_BINARY_INV + cv2.THRESH_OTSU)
        coords = np.column_stack(np.where(binary > 0))
        if coords.size == 0:
            return image
        # Calculate rotation angle
        angle = cv2.minAreaRect(coords)[-1]
        if angle < -45:
            angle = 90 + angle
# Rotate image
(h, w) = image.shape[:2]
center = (w // 2, h // 2)
M = cv2.getRotationMatrix2D(center, angle, 1.0)
rotated = cv2.warpAffine(
image, M, (w, h),
flags=cv2.INTER_CUBIC,
borderMode=cv2.BORDER_REPLICATE
)
return rotated
def _enhance_contrast(self, image: np.ndarray) -> np.ndarray:
"""Enhance image contrast"""
# Apply CLAHE (Contrast Limited Adaptive Histogram Equalization)
clahe = cv2.createCLAHE(clipLimit=2.0, tileGridSize=(8, 8))
enhanced = clahe.apply(image)
return enhanced
def _perform_ocr(self, image: np.ndarray) -> str:
"""Perform OCR on preprocessed image"""
if self.ocr_engine == "tesseract":
# Configure Tesseract
custom_config = r'--oem 3 --psm 6'
# Perform OCR
text = pytesseract.image_to_string(
image,
lang=self.language,
config=custom_config
)
# Also get detailed data for confidence scoring
data = pytesseract.image_to_data(
image,
lang=self.language,
output_type=pytesseract.Output.DICT
)
# Calculate average confidence
confidences = [int(conf) for conf in data['conf'] if int(conf) > 0]
self._confidence = sum(confidences) / len(confidences) / 100 if confidences else 0.0
return text
else:
# Placeholder for other OCR engines (Google Vision, AWS Textract)
raise NotImplementedError(f"OCR engine {self.ocr_engine} not implemented")
def _enhance_ocr_results(self, result: ParseResult, full_text: str, images: List[Image.Image]) -> ParseResult:
"""Enhance OCR results with additional processing"""
# If confidence is low, try table detection
if self._confidence < 0.7 and not result.line_items:
for image in images:
tables = self._detect_tables(np.array(image))
if tables:
# Process detected tables
result.line_items.extend(self._process_table_regions(tables, image))
# Update confidence scores
result.confidence_scores["ocr_quality"] = self._confidence
return result
def _detect_tables(self, image: np.ndarray) -> List[Tuple[int, int, int, int]]:
"""Detect table regions in image"""
# Convert to grayscale if needed
if len(image.shape) == 3:
gray = cv2.cvtColor(image, cv2.COLOR_BGR2GRAY)
else:
gray = image
# Detect horizontal and vertical lines
horizontal = self._detect_lines(gray, horizontal=True)
vertical = self._detect_lines(gray, horizontal=False)
# Combine lines to find table regions
table_mask = cv2.addWeighted(horizontal, 0.5, vertical, 0.5, 0.0)
# Find contours
contours, _ = cv2.findContours(
table_mask,
cv2.RETR_EXTERNAL,
cv2.CHAIN_APPROX_SIMPLE
)
# Filter and return table regions
tables = []
for contour in contours:
x, y, w, h = cv2.boundingRect(contour)
if w > 100 and h > 50: # Minimum table size
tables.append((x, y, w, h))
return tables
def _detect_lines(self, image: np.ndarray, horizontal: bool = True) -> np.ndarray:
"""Detect horizontal or vertical lines in image"""
# Create structure element for morphology
if horizontal:
kernel = cv2.getStructuringElement(cv2.MORPH_RECT, (40, 1))
else:
kernel = cv2.getStructuringElement(cv2.MORPH_RECT, (1, 40))
# Apply morphology operations
binary = cv2.adaptiveThreshold(
image, 255, cv2.ADAPTIVE_THRESH_GAUSSIAN_C,
cv2.THRESH_BINARY_INV, 11, 2
)
lines = cv2.morphologyEx(binary, cv2.MORPH_OPEN, kernel)
return lines
def _process_table_regions(self, tables: List[Tuple[int, int, int, int]],
image: Image.Image) -> List[LineItem]:
"""Process detected table regions to extract line items"""
line_items = []
for x, y, w, h in tables:
# Crop table region
table_img = image.crop((x, y, x + w, y + h))
# Perform OCR on table region
table_text = pytesseract.image_to_string(table_img)
# Parse table text
# This is simplified - in production, use more sophisticated parsing
lines = table_text.strip().split('\n')
for line in lines:
if line and not any(header in line.lower() for header in ['description', 'quantity', 'price']):
# Basic parsing - extract numbers and text
parts = line.split()
if len(parts) >= 2:
description = ' '.join(parts[:-2])
try:
amount = Decimal(parts[-1].replace(',', ''))
if description and amount > 0:
line_items.append(LineItem(
description=description,
total_price=amount
))
                        except InvalidOperation:
continue
        return line_items

## AI Parser (src/parsers/ai_parser.py)

import json
from typing import Dict, Any, List, Optional
from decimal import Decimal
from datetime import datetime

from .base_parser import BaseParser
from ..core.models import PDFDocument, ParseResult, LineItem
from ..core.exceptions import ParsingError
class AIParser(BaseParser):
"""AI-powered parser using LLMs"""
def __init__(self, config: Dict[str, Any]):
super().__init__(config)
self.model = config.get("model", "gpt-4")
self.temperature = config.get("temperature", 0.1)
self.max_tokens = config.get("max_tokens", 4000)
self.retry_attempts = config.get("retry_attempts", 3)
self.llm_client = self._initialize_client()
def _initialize_client(self):
"""Initialize LLM client based on model"""
if "gpt" in self.model:
from openai import OpenAI
return OpenAI(api_key=self.config.get("api_key"))
elif "claude" in self.model:
from anthropic import Anthropic
return Anthropic(api_key=self.config.get("api_key"))
else:
raise ValueError(f"Unsupported model: {self.model}")
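
    # Hypothetical config: AIParser({"model": "claude-3-opus-20240229",
    # "api_key": "..."}) routes through the Anthropic branch; any model name
    # containing "gpt" routes through OpenAI.
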
def _parse_implementation(self, document: PDFDocument) -> ParseResult:
"""Extract data using AI/LLM"""
# Extract text for context
text = self._get_document_text(document)
# Generate extraction prompt
prompt = self._generate_extraction_prompt(text)
# Call LLM with retry logic
for attempt in range(self.retry_attempts):
try:
response = self._call_llm(prompt)
extracted_data = self._parse_llm_response(response)
# Build result
result = self._build_parse_result(extracted_data, document)
# Validate extraction
if self.validate(result):
return result
except Exception as e:
self.logger.warning(f"AI parsing attempt {attempt + 1} failed: {e}")
if attempt == self.retry_attempts - 1:
raise ParsingError(f"AI parsing failed after {self.retry_attempts} attempts")
raise ParsingError("Failed to extract valid data")
def _get_document_text(self, document: PDFDocument) -> str:
"""Extract text from document for AI processing"""
try:
# Try text extraction first
text = document.get_text()
if text and len(text) > 100:
return text[:self.max_tokens] # Limit for token constraints
        except Exception:
pass
# Fallback to OCR if needed
from .ocr_parser import OCRParser
ocr_parser = OCRParser(self.config)
images = ocr_parser._pdf_to_images(document.file_path)
text = ""
for image in images[:3]: # Process first 3 pages only
processed = ocr_parser._preprocess_image(image)
page_text = ocr_parser._perform_ocr(processed)
text += page_text + "\n"
if len(text) > self.max_tokens:
break
return text[:self.max_tokens]
def _generate_extraction_prompt(self, text: str) -> str:
"""Generate prompt for invoice data extraction"""
prompt = f"""Extract invoice information from the following text and return as JSON.
Invoice Text:
{text}
Extract the following information:
1. vendor_name: Name of the company issuing the invoice
2. vendor_id: Any vendor ID or registration number
3. invoice_number: Invoice number or reference
4. invoice_date: Date of invoice (ISO format: YYYY-MM-DD)
5. due_date: Payment due date (ISO format: YYYY-MM-DD)
6. currency: Currency code (e.g., USD, EUR, GBP)
7. subtotal: Subtotal amount before tax
8. tax_amount: Tax amount
9. total_amount: Total amount including tax
10. line_items: Array of items with:
- description: Item description
- quantity: Quantity (default 1 if not specified)
- unit_price: Price per unit
- total_price: Total price for this line
- tax_rate: Tax rate if specified
Return ONLY valid JSON without any explanation or markdown formatting.
Example response:
{{
"vendor_name": "ABC Company Ltd",
"invoice_number": "INV-2024-001",
"invoice_date": "2024-01-15",
"total_amount": 1250.00,
"line_items": [
{{
"description": "Professional Services",
"quantity": 1,
"unit_price": 1000.00,
"total_price": 1000.00
}}
]
}}
"""
return prompt
def _call_llm(self, prompt: str) -> str:
"""Call LLM API"""
if "gpt" in self.model:
response = self.llm_client.chat.completions.create(
model=self.model,
messages=[
{"role": "system", "content": "You are an invoice data extraction assistant. Extract data accurately and return only JSON."},
{"role": "user", "content": prompt}
],
temperature=self.temperature,
response_format={"type": "json_object"}
)
return response.choices[0].message.content
elif "claude" in self.model:
response = self.llm_client.messages.create(
model=self.model,
messages=[{"role": "user", "content": prompt}],
temperature=self.temperature,
max_tokens=self.max_tokens
)
return response.content[0].text
else:
raise NotImplementedError(f"Model {self.model} not implemented")
def _parse_llm_response(self, response: str) -> Dict[str, Any]:
"""Parse LLM response to extract JSON data"""
try:
# Clean response - remove markdown if present
cleaned = response.strip()
if cleaned.startswith("```json"):
cleaned = cleaned[7:]
if cleaned.startswith("```"):
cleaned = cleaned[3:]
if cleaned.endswith("```"):
cleaned = cleaned[:-3]
# Parse JSON
data = json.loads(cleaned.strip())
return data
except json.JSONDecodeError as e:
self.logger.error(f"Failed to parse LLM response as JSON: {e}")
self.logger.debug(f"Response: {response}")
raise ParsingError("Invalid JSON response from LLM")
def _build_parse_result(self, data: Dict[str, Any], document: PDFDocument) -> ParseResult:
"""Build ParseResult from extracted data"""
result = ParseResult(
job_id=document.document_id,
parser_used="ai"
)
# Map extracted fields
result.vendor_id = data.get("vendor_id") or data.get("vendor_name")
result.invoice_number = data.get("invoice_number")
result.currency = data.get("currency", "USD")
# Parse dates
if data.get("invoice_date"):
result.invoice_date = self._parse_date(data["invoice_date"])
if data.get("due_date"):
result.due_date = self._parse_date(data["due_date"])
# Parse amounts
result.total_amount = Decimal(str(data.get("total_amount", 0)))
result.subtotal = Decimal(str(data.get("subtotal", 0)))
result.tax_amount = Decimal(str(data.get("tax_amount", 0)))
# Parse line items
result.line_items = self._parse_line_items(data.get("line_items", []))
# Store all extracted fields
result.extracted_fields = data
# Calculate confidence based on field completeness
self._calculate_ai_confidence(result, data)
return result
def _parse_date(self, date_str: str) -> Optional[datetime]:
"""Parse date string to datetime"""
if not date_str:
return None
# Try common formats
formats = ["%Y-%m-%d", "%d/%m/%Y", "%m/%d/%Y", "%Y/%m/%d"]
for fmt in formats:
try:
return datetime.strptime(date_str, fmt)
except ValueError:
continue
# Try parsing with dateutil as fallback
try:
from dateutil import parser
return parser.parse(date_str)
        except (ValueError, OverflowError):
self.logger.warning(f"Failed to parse date: {date_str}")
return None
def _parse_line_items(self, items_data: List[Dict[str, Any]]) -> List[LineItem]:
"""Parse line items from extracted data"""
line_items = []
for item in items_data:
try:
line_item = LineItem(
description=item.get("description", ""),
quantity=float(item.get("quantity", 1)),
unit_price=Decimal(str(item.get("unit_price", 0))),
total_price=Decimal(str(item.get("total_price", 0))),
tax_rate=float(item.get("tax_rate", 0))
)
# Calculate total if not provided
if line_item.total_price == 0 and line_item.unit_price > 0:
line_item.total_price = line_item.calculate_total()
line_items.append(line_item)
except Exception as e:
self.logger.warning(f"Failed to parse line item: {e}")
continue
return line_items
def _calculate_ai_confidence(self, result: ParseResult, data: Dict[str, Any]):
"""Calculate confidence scores for AI extraction"""
required_fields = ["invoice_number", "invoice_date", "total_amount"]
optional_fields = ["vendor_id", "due_date", "line_items", "currency"]
# Check required fields
required_score = sum(1 for field in required_fields if data.get(field)) / len(required_fields)
# Check optional fields
optional_score = sum(1 for field in optional_fields if data.get(field)) / len(optional_fields)
# Overall confidence
self._confidence = (required_score * 0.7) + (optional_score * 0.3)
# Field-specific confidence
result.confidence_scores = {
"invoice_number": 0.9 if result.invoice_number else 0.0,
"dates": 0.9 if result.invoice_date else 0.0,
"amounts": 0.9 if result.total_amount > 0 else 0.0,
"line_items": 0.8 if result.line_items else 0.3,
"overall": self._confidence
        }

## Vendor Detector (src/services/vendor_detector.py)

import re
import hashlib
from typing import Optional, List, Dict, Any
from dataclasses import asdict

import structlog

from ..core.interfaces import VendorDetectorInterface, CacheServiceInterface
from ..core.models import PDFDocument, Vendor, ParserType
from ..core.exceptions import VendorDetectionError
class VendorDetector(VendorDetectorInterface):
"""Service for detecting vendors from PDF documents"""
    def __init__(self, cache_service: CacheServiceInterface,
                 vendor_repository, ai_classifier=None):
        self.cache = cache_service
        self.vendor_repo = vendor_repository
        self.ai_classifier = ai_classifier
        self.logger = structlog.get_logger()
        self._vendor_patterns = {}
        self._load_vendor_patterns()
    async def detect_vendor(self, document: PDFDocument) -> Optional[Vendor]:
        """Detect vendor from document"""
        # Generate document hash for caching
        doc_hash = self._generate_document_hash(document)
        # Check cache (awaited directly; nesting asyncio.run() inside an
        # already-running event loop would raise a RuntimeError)
        cached_vendor = await self._check_cache(doc_hash)
if cached_vendor:
return cached_vendor
# Extract identifying text (first 2000 chars)
try:
text = document.get_text(end_page=2)[:2000]
        except Exception:
            # If text extraction fails, return None for AI parsing
            return None
        # Try pattern matching
        vendor = self._match_vendor_patterns(text)
        if vendor:
            await self._update_cache(doc_hash, vendor)
            return vendor
        # Try fuzzy matching on vendor names
        vendor = self._fuzzy_match_vendor(text)
        if vendor:
            await self._update_cache(doc_hash, vendor)
            return vendor
        # Use AI classifier if available
        if self.ai_classifier:
            vendor = self._classify_with_ai(text)
            if vendor:
                await self._update_cache(doc_hash, vendor)
                return vendor
# Return None if no vendor detected
return None
def update_vendor_patterns(self, vendor: Vendor, patterns: List[str]) -> None:
"""Update vendor detection patterns"""
vendor.regex_patterns["detection"] = patterns
self.vendor_repo.update(vendor)
self._load_vendor_patterns() # Reload patterns
    def _load_vendor_patterns(self):
        """Load all vendor patterns from repository"""
        self._vendor_patterns = {}
        if not self.vendor_repo:
            # No repository wired (e.g. in tests); skip pattern preloading
            return
        vendors = self.vendor_repo.get_all_active()
for vendor in vendors:
if vendor.regex_patterns.get("detection"):
self._vendor_patterns[vendor.vendor_id] = {
"patterns": vendor.regex_patterns["detection"],
"vendor": vendor
}
def _generate_document_hash(self, document: PDFDocument) -> str:
"""Generate hash for document caching"""
# Use file size and first page content for hash
try:
content = f"{document.file_size}_{document.get_text(end_page=1)[:500]}"
return hashlib.sha256(content.encode()).hexdigest()
        except Exception:
return hashlib.sha256(f"{document.document_id}".encode()).hexdigest()
async def _check_cache(self, doc_hash: str) -> Optional[Vendor]:
"""Check cache for vendor detection result"""
cached = await self.cache.get(f"vendor_detection:{doc_hash}")
if cached:
return Vendor(**cached)
return None
async def _update_cache(self, doc_hash: str, vendor: Vendor):
"""Update cache with vendor detection result"""
await self.cache.set(
f"vendor_detection:{doc_hash}",
asdict(vendor),
ttl=3600 # 1 hour
)
def _match_vendor_patterns(self, text: str) -> Optional[Vendor]:
"""Match text against vendor patterns"""
for vendor_id, pattern_data in self._vendor_patterns.items():
for pattern in pattern_data["patterns"]:
try:
if re.search(pattern, text, re.IGNORECASE | re.MULTILINE):
self.logger.info(f"Vendor detected by pattern: {vendor_id}")
return pattern_data["vendor"]
except re.error:
self.logger.warning(f"Invalid regex pattern for vendor {vendor_id}: {pattern}")
return None
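
    # Hypothetical vendor config: a Vendor stored with
    #     regex_patterns={"detection": [r"ACME\s+Corp", r"acme-corp\.com"]}
    # is matched here against the document text, case-insensitively.
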
def _fuzzy_match_vendor(self, text: str) -> Optional[Vendor]:
"""Fuzzy match vendor names in text"""
        from fuzzywuzzy import fuzz
        if not self.vendor_repo:
            return None
        vendors = self.vendor_repo.get_all_active()
best_match = None
best_score = 0
# Extract potential company names from text
company_patterns = [
r"^([A-Z][A-Za-z\s&,.\-]+(?:Inc|LLC|Ltd|Limited|Corp|Corporation|Company|Co)\.?)",
r"From:\s*([A-Za-z\s&,.\-]+)",
r"Bill To:.*?From:\s*([A-Za-z\s&,.\-]+)",
]
potential_names = []
for pattern in company_patterns:
matches = re.findall(pattern, text, re.MULTILINE)
potential_names.extend(matches)
# Match against known vendors
for vendor in vendors:
for name in potential_names:
score = fuzz.ratio(vendor.vendor_name.lower(), name.lower())
if score > best_score and score > 80: # 80% threshold
best_score = score
best_match = vendor
if best_match:
self.logger.info(f"Vendor detected by fuzzy match: {best_match.vendor_id} (score: {best_score})")
return best_match
def _classify_with_ai(self, text: str) -> Optional[Vendor]:
"""Use AI to classify vendor"""
if not self.ai_classifier:
return None
try:
# Get vendor classification from AI
result = self.ai_classifier.classify(text)
if result.get("vendor_id"):
vendor = self.vendor_repo.get(result["vendor_id"])
if vendor:
self.logger.info(f"Vendor detected by AI: {vendor.vendor_id}")
return vendor
# If AI suggests a new vendor
if result.get("vendor_name") and result.get("confidence", 0) > 0.8:
# Create new vendor entry
new_vendor = Vendor(
vendor_id=self._generate_vendor_id(result["vendor_name"]),
vendor_name=result["vendor_name"],
parser_strategy=ParserType.AI, # Default to AI for new vendors
confidence_threshold=0.7
)
# Save new vendor
self.vendor_repo.create(new_vendor)
self.logger.info(f"New vendor created by AI: {new_vendor.vendor_id}")
return new_vendor
except Exception as e:
self.logger.error(f"AI classification failed: {e}")
return None
def _generate_vendor_id(self, vendor_name: str) -> str:
"""Generate vendor ID from name"""
# Clean and format vendor name
clean_name = re.sub(r'[^a-zA-Z0-9\s]', '', vendor_name)
words = clean_name.upper().split()
# Generate ID (e.g., "ABC_COMPANY_LTD")
        return '_'.join(words)

## Parser Strategy (src/services/parser_strategy.py)

from typing import Dict, Optional

import structlog

from ..core.models import PDFDocument, Vendor, ParserType
from ..core.interfaces import InvoiceParser
from ..parsers.text_parser import TextParser
from ..parsers.ocr_parser import OCRParser
from ..parsers.ai_parser import AIParser
class ParserStrategy:
"""Strategy pattern implementation for parser selection"""
    def __init__(self, parser_registry=None):
        self.parsers: Dict[str, InvoiceParser] = {}
        self.parser_registry = parser_registry
        self.logger = structlog.get_logger()
self.quality_threshold = {
"text": 0.8,
"ocr": 0.6,
"ai": 0.5
}
def select_parser(self, document: PDFDocument, vendor: Optional[Vendor]) -> InvoiceParser:
"""Select appropriate parser based on document and vendor"""
# If vendor specified with preference, use it
if vendor and vendor.parser_strategy != ParserType.AI:
return self._get_parser_for_vendor(vendor)
# Assess document quality
quality = self._assess_document_quality(document)
# Select parser based on quality
if quality.is_digital and quality.text_quality > self.quality_threshold["text"]:
parser_type = ParserType.TEXT
elif quality.is_scanned and quality.scan_quality > self.quality_threshold["ocr"]:
parser_type = ParserType.OCR
else:
parser_type = ParserType.AI
# Override with vendor preference if specified
if vendor and vendor.parser_strategy:
parser_type = vendor.parser_strategy
# Get or create parser
return self._get_or_create_parser(parser_type, vendor)
def register_parser(self, parser_key: str, parser: InvoiceParser):
"""Register a parser instance"""
self.parsers[parser_key] = parser
def _get_parser_for_vendor(self, vendor: Vendor) -> InvoiceParser:
"""Get parser configured for specific vendor"""
parser_key = f"{vendor.vendor_id}_{vendor.parser_strategy.value}"
if parser_key not in self.parsers:
# Create vendor-specific parser
config = vendor.parser_config.copy()
config["vendor"] = vendor
parser = self._create_parser(vendor.parser_strategy, config)
self.parsers[parser_key] = parser
return self.parsers[parser_key]
def _get_or_create_parser(self, parser_type: ParserType,
vendor: Optional[Vendor]) -> InvoiceParser:
"""Get or create parser instance"""
# Generate cache key
parser_key = f"{vendor.vendor_id if vendor else 'default'}_{parser_type.value}"
if parser_key not in self.parsers:
# Prepare configuration
config = {}
if vendor:
config.update(vendor.parser_config)
config["vendor"] = vendor
# Create parser
parser = self._create_parser(parser_type, config)
self.parsers[parser_key] = parser
return self.parsers[parser_key]
def _create_parser(self, parser_type: ParserType, config: Dict) -> InvoiceParser:
"""Create parser instance"""
if parser_type == ParserType.TEXT:
return TextParser(config)
elif parser_type == ParserType.OCR:
return OCRParser(config)
elif parser_type == ParserType.AI:
return AIParser(config)
else:
raise ValueError(f"Unknown parser type: {parser_type}")
def _assess_document_quality(self, document: PDFDocument) -> 'DocumentQuality':
"""Assess document quality for parser selection"""
quality = DocumentQuality()
try:
# Try to extract text
sample_text = document.get_text(end_page=1)
if sample_text:
# Analyze text quality
quality.is_digital = True
quality.text_quality = self._calculate_text_quality(sample_text)
else:
quality.is_digital = False
quality.is_scanned = True
except Exception as e:
self.logger.debug(f"Failed to extract text: {e}")
quality.is_scanned = True
# Check if document has images (likely scanned)
if document.metadata.get("has_images", False):
quality.is_scanned = True
quality.scan_quality = self._estimate_scan_quality(document)
return quality
def _calculate_text_quality(self, text: str) -> float:
"""Calculate text extraction quality score"""
if not text:
return 0.0
# Check for common indicators of good text extraction
scores = []
# Has reasonable length
scores.append(min(len(text) / 1000, 1.0)) # Normalize to 1000 chars
# Has alphanumeric content
alnum_ratio = sum(c.isalnum() for c in text) / len(text)
scores.append(alnum_ratio)
# Has proper spacing
space_ratio = text.count(' ') / len(text)
scores.append(min(space_ratio * 10, 1.0)) # Expect ~10% spaces
# Has newlines (structure)
newline_ratio = text.count('\n') / len(text)
scores.append(min(newline_ratio * 100, 1.0)) # Expect ~1% newlines
# Check for garbled text indicators
garbled_indicators = ['�', '\x00', '\xff']
garbled_score = 1.0 - sum(text.count(ind) for ind in garbled_indicators) / len(text)
scores.append(garbled_score)
return sum(scores) / len(scores)
def _estimate_scan_quality(self, document: PDFDocument) -> float:
"""Estimate scan quality from document metadata"""
# This is a simplified estimation
# In production, analyze image resolution, contrast, etc.
quality_score = 0.7 # Default medium quality
# Check resolution if available
if document.metadata.get("dpi"):
dpi = document.metadata["dpi"]
if dpi >= 300:
quality_score = 0.9
elif dpi >= 200:
quality_score = 0.7
else:
quality_score = 0.5
return quality_score
class DocumentQuality:
"""Document quality assessment results"""
def __init__(self):
self.is_digital = False
self.is_scanned = False
self.text_quality = 0.0
self.scan_quality = 0.0
self.has_tables = False
self.has_forms = False
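
# Hypothetical wiring sketch: quality assessment falls back from TEXT to OCR
# to AI, and a vendor preference overrides the heuristic.
#
#     strategy = ParserStrategy()
#     parser = strategy.select_parser(document, vendor)  # vendor may be None
#     result = parser.parse(document)
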
## Parsing Handler (src/handlers/parsing_handler.py)

import json
import os
import asyncio
from datetime import datetime
from typing import Dict, Any

import boto3

from ..core.models import PDFDocument, ParseResult, ProcessingStatus
from ..services.vendor_detector import VendorDetector
from ..services.parser_strategy import ParserStrategy
from ..services.cache_service import CacheService
from ..services.storage_service import S3StorageService
from ..utils.logger import get_logger
# Initialize services outside handler for reuse
logger = get_logger()
s3_client = boto3.client('s3')
dynamodb = boto3.resource('dynamodb')
# Initialize services
cache_service = CacheService()
storage_service = S3StorageService(s3_client)
vendor_detector = VendorDetector(cache_service, vendor_repository=None)
parser_strategy = ParserStrategy()
def lambda_handler(event: Dict[str, Any], context: Any) -> Dict[str, Any]:
"""Main Lambda handler for invoice parsing"""
# Handle warmup calls
if event.get('source') == 'serverless-plugin-warmup':
logger.info('WarmUp - Lambda is warm!')
return {'statusCode': 200, 'body': 'Lambda is warm!'}
try:
# Parse event
if 'Records' in event:
# SQS event
return handle_sqs_event(event, context)
else:
# Direct invocation
return handle_direct_invocation(event, context)
except Exception as e:
logger.error(f"Lambda handler error: {e}", exc_info=True)
return {
'statusCode': 500,
'body': json.dumps({'error': str(e)})
}
def handle_sqs_event(event: Dict[str, Any], context: Any) -> Dict[str, Any]:
"""Handle SQS event processing"""
results = []
    for record in event['Records']:
        message = {}
        try:
            # Parse message
            message = json.loads(record['body'])
job_id = message['job_id']
document_path = message['document_path']
# Process document
result = asyncio.run(process_document(job_id, document_path))
results.append({
'job_id': job_id,
'status': 'success',
'result': result
})
except Exception as e:
logger.error(f"Failed to process record: {e}", exc_info=True)
results.append({
'job_id': message.get('job_id', 'unknown'),
'status': 'failed',
'error': str(e)
})
return {
'statusCode': 200,
'body': json.dumps({'processed': len(results), 'results': results})
}
def handle_direct_invocation(event: Dict[str, Any], context: Any) -> Dict[str, Any]:
"""Handle direct Lambda invocation"""
job_id = event.get('job_id')
document_path = event.get('document_path')
if not job_id or not document_path:
return {
'statusCode': 400,
'body': json.dumps({'error': 'Missing required parameters: job_id, document_path'})
}
# Process document
result = asyncio.run(process_document(job_id, document_path))
return {
'statusCode': 200,
'body': json.dumps(result.to_dict())
}
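
# Hypothetical direct-invocation event for the handler above:
#
#     {
#         "job_id": "d6f1c9e0-0000-4000-8000-000000000000",
#         "document_path": "invoices/d6f1c9e0/invoice.pdf"
#     }
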
async def process_document(job_id: str, document_path: str):
"""Process a single document"""
logger.info(f"Processing document: {job_id}")
# Update job status
await update_job_status(job_id, ProcessingStatus.PROCESSING)
try:
# Download document from S3
local_path = f"/tmp/{job_id}.pdf"
await storage_service.download_to_file(document_path, local_path)
# Create document object
document = PDFDocument(
document_id=job_id,
file_path=local_path
)
        # Detect vendor
        vendor = await vendor_detector.detect_vendor(document)
if vendor:
logger.info(f"Vendor detected: {vendor.vendor_id}")
# Select and execute parser
parser = parser_strategy.select_parser(document, vendor)
result = parser.parse(document)
# Store result
await store_parse_result(result)
# Update job status
await update_job_status(job_id, ProcessingStatus.COMPLETED)
# Cleanup
os.remove(local_path)
return result
except Exception as e:
logger.error(f"Document processing failed: {e}", exc_info=True)
await update_job_status(job_id, ProcessingStatus.FAILED, error=str(e))
raise
async def update_job_status(job_id: str, status: ProcessingStatus, error: str = None):
"""Update job status in DynamoDB"""
table = dynamodb.Table(os.environ.get('JOBS_TABLE', 'invoice-parser-jobs'))
update_expr = "SET #status = :status, updated_at = :timestamp"
expr_values = {
':status': status.value,
':timestamp': datetime.utcnow().isoformat()
}
if error:
update_expr += ", error_message = :error"
expr_values[':error'] = error
table.update_item(
Key={'job_id': job_id},
UpdateExpression=update_expr,
ExpressionAttributeNames={'#status': 'status'},
ExpressionAttributeValues=expr_values
)
async def store_parse_result(result: ParseResult):
"""Store parsing result in DynamoDB"""
table = dynamodb.Table(os.environ.get('RESULTS_TABLE', 'invoice-parser-results'))
table.put_item(
Item=result.to_dict()
    )

## API Application (src/api/app.py)

from datetime import datetime
from typing import Optional
import uuid

import boto3
import uvicorn
from fastapi import FastAPI, UploadFile, File, HTTPException, Depends, BackgroundTasks
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import JSONResponse

from .routes import router
from .middleware import SecurityMiddleware, LoggingMiddleware
from ..core.models import ProcessingStatus
from ..services.queue_service import QueueService
from ..services.storage_service import S3StorageService
# Create FastAPI app
app = FastAPI(
title="PDF Invoice Parser API",
description="Scalable PDF invoice parsing system",
version="1.0.0"
)
# Add middleware
app.add_middleware(
CORSMiddleware,
allow_origins=["*"],
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
app.add_middleware(LoggingMiddleware)
app.add_middleware(SecurityMiddleware)
# Include routers
app.include_router(router, prefix="/api/v1")
# Initialize services (storage is wired here so the upload endpoint can use it)
queue_service = QueueService()
storage_service = S3StorageService(boto3.client('s3'))
@app.post("/api/v1/invoices/upload")
async def upload_invoice(
background_tasks: BackgroundTasks,
file: UploadFile = File(...),
vendor_id: Optional[str] = None
):
"""Upload invoice for parsing"""
    # Validate file
    if not file.filename or not file.filename.lower().endswith('.pdf'):
        raise HTTPException(400, "Only PDF files are supported")
    if file.size and file.size > 50 * 1024 * 1024:  # 50MB limit
        raise HTTPException(400, "File size exceeds 50MB limit")
# Generate job ID
job_id = str(uuid.uuid4())
# Upload to S3
s3_path = f"invoices/{job_id}/{file.filename}"
try:
content = await file.read()
await storage_service.upload(s3_path, content)
except Exception as e:
raise HTTPException(500, f"Failed to upload file: {str(e)}")
# Create job record
job_data = {
"job_id": job_id,
"status": ProcessingStatus.PENDING.value,
"vendor_id": vendor_id,
"document_url": s3_path,
"created_at": datetime.utcnow().isoformat(),
"file_name": file.filename,
"file_size": file.size
}
# Store job in database
await create_job_record(job_data)
# Queue for processing
message = {
"job_id": job_id,
"document_path": s3_path,
"vendor_id": vendor_id
}
await queue_service.send_message(message)
return {
"job_id": job_id,
"status": ProcessingStatus.PENDING.value,
"message": "Invoice uploaded successfully and queued for processing",
"estimated_time": 30 # seconds
}
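
# Hypothetical client call (the requests library is an assumption, not a
# project dependency):
#
#     import requests
#     with open("invoice.pdf", "rb") as f:
#         resp = requests.post(
#             "http://localhost:8000/api/v1/invoices/upload",
#             files={"file": ("invoice.pdf", f, "application/pdf")},
#         )
#     job_id = resp.json()["job_id"]
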
@app.get("/api/v1/invoices/{job_id}/status")
async def get_job_status(job_id: str):
"""Get parsing job status"""
# Fetch job from database
job = await get_job_record(job_id)
if not job:
raise HTTPException(404, "Job not found")
return {
"job_id": job_id,
"status": job["status"],
"created_at": job["created_at"],
"updated_at": job.get("updated_at"),
"error_message": job.get("error_message")
}
@app.get("/api/v1/invoices/{job_id}/result")
async def get_parse_result(job_id: str):
"""Get parsed invoice data"""
# Check job status first
job = await get_job_record(job_id)
if not job:
raise HTTPException(404, "Job not found")
if job["status"] != ProcessingStatus.COMPLETED.value:
raise HTTPException(400, f"Job status is {job['status']}, not completed")
# Fetch result
result = await get_parse_result_record(job_id)
if not result:
raise HTTPException(404, "Parse result not found")
return result
@app.get("/health")
async def health_check():
"""Health check endpoint"""
return {"status": "healthy", "timestamp": datetime.utcnow().isoformat()}
if __name__ == "__main__":
uvicorn.run(app, host="0.0.0.0", port=8000)
## Cache Service (src/services/cache_service.py)
import os
import json
import pickle
from typing import Optional, Any

import redis
import structlog

from ..core.interfaces import CacheServiceInterface
class CacheService(CacheServiceInterface):
"""Redis-based caching service"""
    def __init__(self):
        self.redis_client = self._init_redis()
        self.logger = structlog.get_logger()
        self.default_ttl = 3600  # 1 hour
    def _init_redis(self):
        """Initialize Redis connection"""
        # redis.Redis forwards these keyword arguments to its ConnectionPool
        return redis.Redis(
            host=os.environ.get('REDIS_HOST', 'localhost'),
            port=int(os.environ.get('REDIS_PORT', 6379)),
            decode_responses=False,  # Handle binary data
            max_connections=50,
            socket_keepalive=True,
            socket_keepalive_options={
                1: 1,  # TCP_KEEPIDLE
                2: 3,  # TCP_KEEPINTVL
                3: 5   # TCP_KEEPCNT
            }
        )
async def get(self, key: str) -> Optional[Any]:
"""Get value from cache"""
try:
value = self.redis_client.get(key)
if value:
# Try to deserialize as JSON first
try:
return json.loads(value)
                except (ValueError, UnicodeDecodeError):
# Fall back to pickle
return pickle.loads(value)
except Exception as e:
self.logger.error(f"Cache get error: {e}")
return None
async def set(self, key: str, value: Any, ttl: int = None) -> None:
"""Set value in cache with TTL"""
ttl = ttl or self.default_ttl
try:
# Try to serialize as JSON first (more portable)
try:
serialized = json.dumps(value)
            except (TypeError, ValueError):
# Fall back to pickle for complex objects
serialized = pickle.dumps(value)
self.redis_client.setex(key, ttl, serialized)
except Exception as e:
self.logger.error(f"Cache set error: {e}")
async def delete(self, key: str) -> None:
"""Delete value from cache"""
try:
self.redis_client.delete(key)
except Exception as e:
self.logger.error(f"Cache delete error: {e}")
async def clear_pattern(self, pattern: str) -> int:
"""Clear all keys matching pattern"""
try:
keys = self.redis_client.keys(pattern)
if keys:
return self.redis_client.delete(*keys)
return 0
except Exception as e:
self.logger.error(f"Cache clear pattern error: {e}")
            return 0

## Unit Tests (tests/unit/)

import pytest
from decimal import Decimal
from datetime import datetime
import os

from src.core.models import PDFDocument, Vendor, ParserType
from src.parsers.text_parser import TextParser
from src.parsers.ocr_parser import OCRParser
from src.parsers.ai_parser import AIParser
class TestTextParser:
"""Test suite for TextParser"""
@pytest.fixture
def text_parser(self):
config = {
"confidence_threshold": 0.7,
"date_formats": ["%Y-%m-%d", "%d/%m/%Y"]
}
return TextParser(config)
@pytest.fixture
def sample_document(self):
return PDFDocument(
document_id="test-001",
file_path="tests/fixtures/sample_invoice.pdf",
page_count=1
)
def test_parse_invoice_number(self, text_parser):
"""Test invoice number extraction"""
text = "Invoice #: INV-2024-001\nDate: 2024-01-15"
result = text_parser._extract_invoice_number(text)
assert result == "INV-2024-001"
def test_parse_date(self, text_parser):
"""Test date extraction"""
text = "Invoice Date: 2024-01-15\nDue Date: 2024-02-15"
invoice_date = text_parser._extract_date(text, "invoice date")
assert invoice_date == datetime(2024, 1, 15)
due_date = text_parser._extract_date(text, "due date")
assert due_date == datetime(2024, 2, 15)
def test_parse_amount(self, text_parser):
"""Test amount extraction"""
text = "Subtotal: $1,000.00\nTax: $100.00\nTotal: $1,100.00"
total = text_parser._extract_amount(text, "total")
assert total == Decimal("1100.00")
subtotal = text_parser._extract_amount(text, "subtotal")
assert subtotal == Decimal("1000.00")
def test_parse_complete_invoice(self, text_parser, sample_document):
"""Test complete invoice parsing"""
result = text_parser.parse(sample_document)
assert result.job_id == "test-001"
assert result.parser_used == "text"
assert result.invoice_number is not None
assert result.total_amount > 0
assert len(result.confidence_scores) > 0
class TestOCRParser:
"""Test suite for OCRParser"""
@pytest.fixture
def ocr_parser(self):
config = {
"ocr_engine": "tesseract",
"language": "eng",
"dpi": 300
}
return OCRParser(config)
def test_image_preprocessing(self, ocr_parser):
"""Test image preprocessing steps"""
# Create test image
import numpy as np
from PIL import Image
# Create a simple test image
img_array = np.ones((100, 100), dtype=np.uint8) * 255
img_array[40:60, 40:60] = 0 # Add black square
test_image = Image.fromarray(img_array)
# Process image
processed = ocr_parser._preprocess_image(test_image)
assert processed is not None
assert processed.shape == (100, 100)
@pytest.mark.skipif(not os.path.exists("/usr/bin/tesseract"),
reason="Tesseract not installed")
def test_ocr_extraction(self, ocr_parser):
"""Test OCR text extraction"""
# This test requires Tesseract to be installed
# In CI/CD, you would have a proper test image
pass
class TestAIParser:
"""Test suite for AIParser"""
@pytest.fixture
def ai_parser(self, mocker):
# Mock the LLM client
mock_client = mocker.Mock()
config = {
"model": "gpt-4",
"temperature": 0.1,
"api_key": "test-key"
}
parser = AIParser(config)
parser.llm_client = mock_client
return parser, mock_client
def test_prompt_generation(self, ai_parser):
"""Test extraction prompt generation"""
parser, _ = ai_parser
text = "Invoice from ABC Company"
prompt = parser._generate_extraction_prompt(text)
assert "Invoice Text:" in prompt
assert text in prompt
assert "vendor_name" in prompt
assert "invoice_number" in prompt
def test_parse_llm_response(self, ai_parser):
"""Test LLM response parsing"""
parser, _ = ai_parser
response = '''```json
{
"vendor_name": "ABC Company",
"invoice_number": "INV-001",
"total_amount": 1000.00
}
```'''
data = parser._parse_llm_response(response)
assert data["vendor_name"] == "ABC Company"
assert data["invoice_number"] == "INV-001"
assert data["total_amount"] == 1000.00
def test_build_parse_result(self, ai_parser):
"""Test building ParseResult from extracted data"""
parser, _ = ai_parser
data = {
"vendor_name": "ABC Company",
"invoice_number": "INV-001",
"invoice_date": "2024-01-15",
"total_amount": 1000.00,
"line_items": [
{
"description": "Service",
"quantity": 1,
"unit_price": 1000.00,
"total_price": 1000.00
}
]
}
document = PDFDocument(document_id="test-001")
result = parser._build_parse_result(data, document)
assert result.vendor_id == "ABC Company"
assert result.invoice_number == "INV-001"
assert result.total_amount == Decimal("1000.00")
assert len(result.line_items) == 1
assert result.line_items[0].description == "Service"
## Integration Tests (tests/integration/)

import pytest
import asyncio
from pathlib import Path

from src.core.models import PDFDocument, Vendor, ParserType
from src.services.vendor_detector import VendorDetector
from src.services.parser_strategy import ParserStrategy
from src.services.cache_service import CacheService

class TestEndToEnd:
    """End-to-end integration tests"""
@pytest.fixture
    def services(self):
"""Initialize services for testing"""
cache = CacheService()
vendor_detector = VendorDetector(cache, vendor_repository=None)
parser_strategy = ParserStrategy()
return {
'cache': cache,
'vendor_detector': vendor_detector,
'parser_strategy': parser_strategy
}
@pytest.fixture
def test_invoices(self):
"""Load test invoice files"""
test_dir = Path("tests/fixtures/invoices")
return list(test_dir.glob("*.pdf"))
@pytest.mark.asyncio
async def test_complete_parsing_flow(self, services, test_invoices):
"""Test complete parsing flow for multiple vendors"""
results = []
for invoice_path in test_invoices:
# Create document
document = PDFDocument(
document_id=f"test-{invoice_path.stem}",
file_path=str(invoice_path)
)
# Detect vendor
            vendor = await services['vendor_detector'].detect_vendor(document)
# Select parser
parser = services['parser_strategy'].select_parser(document, vendor)
# Parse document
result = parser.parse(document)
# Validate result
assert result.job_id == document.document_id
assert result.parser_used in ["text", "ocr", "ai"]
if result.errors:
pytest.fail(f"Parsing failed for {invoice_path.name}: {result.errors}")
results.append({
'file': invoice_path.name,
'vendor': vendor.vendor_id if vendor else "unknown",
'parser': result.parser_used,
'confidence': parser.get_confidence(),
'invoice_number': result.invoice_number,
'total': result.total_amount
})
# Print summary
print("\n=== Parsing Results ===")
for r in results:
print(f"{r['file']}: {r['vendor']} - {r['invoice_number']} - ${r['total']}")
# Assert we processed all files
assert len(results) == len(test_invoices)
# Check success rate
successful = sum(1 for r in results if r['invoice_number'])
success_rate = successful / len(results)
assert success_rate >= 0.8 # 80% success rate minimum
## Performance Tests

import pytest
import time
import concurrent.futures
from statistics import mean, stdev

from src.parsers.text_parser import TextParser
from src.core.models import PDFDocument

class TestPerformance:
    """Performance and load tests"""
@pytest.fixture
def parser(self):
return TextParser({})
def test_parsing_performance(self, parser, benchmark):
"""Test single document parsing performance"""
document = PDFDocument(
document_id="perf-test",
file_path="tests/fixtures/sample_invoice.pdf"
)
# Benchmark parsing
result = benchmark(parser.parse, document)
assert result is not None
assert benchmark.stats['mean'] < 5.0 # Should parse in under 5 seconds
def test_concurrent_parsing(self, parser):
"""Test concurrent parsing performance"""
documents = [
PDFDocument(
document_id=f"concurrent-{i}",
file_path="tests/fixtures/sample_invoice.pdf"
)
for i in range(10)
]
start_time = time.time()
# Parse documents concurrently
with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
futures = [executor.submit(parser.parse, doc) for doc in documents]
results = [f.result() for f in concurrent.futures.as_completed(futures)]
total_time = time.time() - start_time
# All documents should be parsed
assert len(results) == 10
# Should complete in reasonable time
assert total_time < 20.0 # 10 documents in under 20 seconds
# Calculate statistics
processing_times = [r.processing_time for r in results]
avg_time = mean(processing_times)
std_dev = stdev(processing_times) if len(processing_times) > 1 else 0
print(f"\nConcurrent Parsing Results:")
print(f"Total Time: {total_time:.2f}s")
print(f"Average Time: {avg_time:.2f}s")
print(f"Std Dev: {std_dev:.2f}s")
# Performance assertions
assert avg_time < 2.0 # Average under 2 seconds per document
assert std_dev < 1.0 # Consistent performance
## Dockerfile

FROM python:3.11-slim AS builder

RUN apt-get update && apt-get install -y \
    gcc \
    g++ \
    make \
    tesseract-ocr \
    tesseract-ocr-eng \
    poppler-utils \
    libpoppler-cpp-dev \
    && rm -rf /var/lib/apt/lists/*

RUN python -m venv /opt/venv
ENV PATH="/opt/venv/bin:$PATH"

COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
# Install the Lambda runtime client into the venv while still root; the
# runtime stage runs as a non-root user and cannot write to /opt/venv
RUN pip install --no-cache-dir awslambdaric

FROM python:3.11-slim

RUN apt-get update && apt-get install -y \
    tesseract-ocr \
    tesseract-ocr-eng \
    poppler-utils \
    && rm -rf /var/lib/apt/lists/*

COPY --from=builder /opt/venv /opt/venv
ENV PATH="/opt/venv/bin:$PATH"

WORKDIR /app

COPY src/ ./src/
COPY config/ ./config/

RUN useradd -m -u 1000 appuser && chown -R appuser:appuser /app
USER appuser

ENTRYPOINT ["python", "-m", "awslambdaric"]
CMD ["src.handlers.parsing_handler.lambda_handler"]
## Requirements (requirements.txt)

# PDF processing
pdfplumber==0.9.0
PyPDF2==3.0.1
pdf2image==1.16.3
pytesseract==0.3.10
opencv-python-headless==4.8.1.78
Pillow==10.1.0

# AI/LLM
openai==1.3.0
anthropic==0.7.0
langchain==0.0.350

# API
fastapi==0.104.1
uvicorn[standard]==0.24.0
pydantic==2.5.0

# AWS
boto3==1.29.0
aws-lambda-powertools==2.26.0

# Data stores
redis==5.0.1
motor==3.3.2

# Data processing
pandas==2.1.3
numpy==1.25.2
python-dateutil==2.8.2

# Utilities
structlog==23.2.0
python-multipart==0.0.6
fuzzywuzzy==0.18.0
python-Levenshtein==0.23.0

# Testing
pytest==7.4.3
pytest-asyncio==0.21.1
pytest-mock==3.12.0
pytest-benchmark==4.0.0
pytest-cov==4.1.0

# Dev tools
black==23.11.0
flake8==6.1.0
mypy==1.7.0
pre-commit==3.5.0