"""infinity-loop/main.py

Main module for the Infinity Loop Coding Challenge.

This module provides contract metadata normalization functionality,
converting heterogeneous vendor contract data into a standardized format.

Note: importing this module configures the global loguru logger (console
and/or file sinks, depending on the flags below).
"""

from concurrent.futures import ProcessPoolExecutor
from enum import StrEnum
from pprint import pprint
from typing import Any, List, Optional

from loguru import logger
from pydantic import BaseModel, Field, field_validator
from rich.console import Console
from rich.logging import RichHandler
from tqdm import tqdm

CONSOLE_LOGGING = True
FILE_LOGGING = True
LOG_PATH = "./logs.txt"

# Single source of truth for the record layout used by every sink.
_LOG_FORMAT = (
    "{time:YYYY-MM-DD HH:mm:ss.SSS} | {level: <8} | "
    "{name}:{function}:{line} - {message}"
)

# Configure loguru: drop the default sink, then attach the enabled ones.
logger.remove()
console = Console()

if CONSOLE_LOGGING:
    logger.add(
        RichHandler(
            console=console,
            show_time=True,
            show_level=True,
            show_path=True,
            markup=True,
            rich_tracebacks=True,
            tracebacks_show_locals=True,
        ),
        level="DEBUG",
        format=_LOG_FORMAT,
    )

if FILE_LOGGING:
    logger.add(
        LOG_PATH,
        level="DEBUG",
        format=_LOG_FORMAT,
    )


class TerminationType(StrEnum):
    """Party that can terminate the contract."""

    CLIENT = "client"
    VENDOR = "vendor"
    EITHER_PARTY = "either party"


class AgreementRaw(BaseModel):
    """Raw agreement data as supplied by the caller.

    Fields can be populated either by their snake_case name or by their
    camelCase alias (``populate_by_name`` is enabled).
    """

    vendor: str = Field(description="vendor name")
    # Defaults to True when the input does not specify auto-renewal.
    auto_renew: bool = Field(
        default=True,
        description="auto-renewal status",
        alias="autoRenew",
    )
    renewal_period: Optional[str] = Field(
        default=None,
        description="renewal period",
        alias="renewalPeriod",
    )
    renewal_notice_days: Optional[int] = Field(
        default=None,
        description="renewal notice days",
        alias="renewalNoticeDays",
    )
    termination_type: Optional[TerminationType] = Field(
        default=None,
        description="termination type",
        alias="terminationType",
    )
    termination_notice_days: Optional[int] = Field(
        default=None,
        description="termination notice days",
        alias="terminationNoticeDays",
    )

    model_config = {"populate_by_name": True}

    @field_validator("renewal_notice_days")
    @classmethod
    def check_renewal_notice_days(cls, v: Optional[int]) -> Optional[int]:
        """Accept ``None`` or a strictly positive number of days.

        Raises:
            ValueError: if ``v`` is zero or negative.
        """
        if v is None:
            return v
        if v <= 0:
            logger.error(f"Invalid renewal_notice_days: {v}. Must be greater than 0.")
            raise ValueError("renewal notice days must be greater than 0")
        return v

    @field_validator("termination_notice_days")
    @classmethod
    def check_termination_notice_days(cls, v: Optional[int]) -> Optional[int]:
        """Accept ``None`` or a strictly positive number of days.

        Raises:
            ValueError: if ``v`` is zero or negative.
        """
        if v is None:
            return v
        if v <= 0:
            logger.error(
                f"Invalid termination_notice_days: {v}. "
                f"Must be greater than 0."
            )
            raise ValueError("termination notice period must be greater than 0")
        return v


class AgreementNormalized(BaseModel):
    """Normalized agreement data: vendor plus two human-readable strings."""

    vendor: str = Field(description="vendor name")
    renewal_terms: str = Field(description="renewal terms")
    termination_terms: str = Field(description="termination terms")


class Agreements(BaseModel):
    """List of raw agreements."""

    agreements: List[AgreementRaw]


class DataNormalizer:
    """Main class for data normalization (a namespace of static methods)."""

    @staticmethod
    def compileRenewalTerms(
        auto_renew: Optional[bool],
        renewal_period: Optional[str],
        notice_period_days: Optional[int],
    ) -> str:
        """Compile the renewal terms components into the final string.

        Args:
            auto_renew: auto-renewal flag; an explicit ``False``
                short-circuits to ``"No auto-renewal"``.
            renewal_period: free-text renewal period (e.g. ``"12 months"``);
                a falsy value yields ``"Renewal period unknown"``.
            notice_period_days: renewal notice in days; a falsy value
                yields ``"notice period unknown"``.

        Returns:
            ``"No auto-renewal"``, or the renewal part and the notice part
            joined with ``", "``.
        """
        logger.debug(
            f"compileRenewalTerms: auto_renew={auto_renew}, "
            f"renewal_period='{renewal_period}', "
            f"notice_period_days={notice_period_days}"
        )

        # Only an explicit False disables renewal; None falls through.
        if auto_renew is False:
            logger.debug("compileRenewalTerms: No auto-renewal.")
            return "No auto-renewal"

        parts = []

        # Renewal part
        if not renewal_period:
            parts.append("Renewal period unknown")
        else:
            parts.append(f"Renews every {renewal_period}")

        # Notice part
        if not notice_period_days:
            parts.append("notice period unknown")
        else:
            parts.append(f"{notice_period_days} days notice")

        compiled_terms = ", ".join(parts)
        logger.debug(f"compileRenewalTerms: Compiled to '{compiled_terms}'")
        return compiled_terms

    @staticmethod
    def compileTerminationTerms(
        termination_party: Optional[TerminationType],
        termination_notice_period_days: Optional[int],
    ) -> str:
        """Compile the termination terms components into the final string.

        Args:
            termination_party: who may terminate; a falsy value yields the
                ``"termination party unknown"`` placeholder.
            termination_notice_period_days: notice in days; a falsy value
                yields the ``"notice period unknown"`` placeholder.

        Returns:
            One of:
              * ``"May be terminated by <party> with <n> days notice"``
              * ``"May be terminated by <party>, notice period unknown"``
              * ``"termination party unknown, <n> days notice"``
              * ``"termination party unknown, notice period unknown"``
        """
        logger.debug(
            f"compileTerminationTerms: termination_party={termination_party}, "
            f"termination_notice_period_days={termination_notice_period_days}"
        )

        if termination_notice_period_days:
            notice_part = f"{termination_notice_period_days} days notice"
        else:
            notice_part = None

        # Build the sentence per case. Joining a "... with" fragment using
        # ", " would produce the malformed "... with, 30 days notice".
        if not termination_party:
            compiled_terms = ", ".join(
                ["termination party unknown", notice_part or "notice period unknown"]
            )
        elif notice_part is None:
            compiled_terms = (
                f"May be terminated by {termination_party}, notice period unknown"
            )
        else:
            compiled_terms = (
                f"May be terminated by {termination_party} with {notice_part}"
            )

        logger.debug(f"compileTerminationTerms: Compiled to '{compiled_terms}'")
        return compiled_terms

    @staticmethod
    def normalize(agreement: dict[str, Any]) -> dict[str, str]:
        """Normalize a single agreement.

        Args:
            agreement: raw agreement mapping; validated through
                ``AgreementRaw``.

        Returns:
            The ``AgreementNormalized`` model dumped to a plain dict.

        Raises:
            Exception: any error raised while validating ``agreement`` is
                logged and re-raised unchanged.
        """
        vendor_name = agreement.get("vendor", "Unknown Vendor")
        logger.debug(f"normalize: Starting normalization for vendor: {vendor_name}")

        try:
            data = AgreementRaw(**agreement)
            logger.debug(
                f"normalize: Successfully validated raw data for {data.vendor}"
            )
        except Exception as e:
            logger.error(f"normalize: Validation error for {vendor_name}: {e}")
            raise

        normalized_model = AgreementNormalized(
            vendor=data.vendor,
            renewal_terms=DataNormalizer.compileRenewalTerms(
                data.auto_renew,
                data.renewal_period,
                data.renewal_notice_days,
            ),
            termination_terms=DataNormalizer.compileTerminationTerms(
                data.termination_type,
                data.termination_notice_days,
            ),
        )

        result = normalized_model.model_dump()
        logger.info(
            f"normalize: Successfully normalized agreement for vendor: {data.vendor}"
        )
        logger.debug(f"normalize: Normalized data for {data.vendor}: {result}")
        return result

    @staticmethod
    def normalizeAll(agreements: list[dict[str, Any]]) -> list[dict[str, str]]:
        """Normalize the agreements sequentially.

        An agreement that fails to normalize is logged and skipped, so the
        returned list may be shorter than the input.
        """
        logger.info(
            f"normalizeAll: Starting sequential normalization for "
            f"{len(agreements)} agreements."
        )

        normalized_agreements = []
        for agreement in tqdm(agreements, desc="Normalizing agreements (sequential)"):
            try:
                normalized_agreements.append(DataNormalizer.normalize(agreement))
            except Exception as e:
                vendor_name = agreement.get("vendor", "Unknown Vendor")
                logger.error(
                    f"normalizeAll: Failed to normalize agreement for "
                    f"{vendor_name}: {e}. Skipping."
                )

        logger.info(
            f"normalizeAll: Finished sequential normalization. "
            f"Processed {len(normalized_agreements)} out of "
            f"{len(agreements)} agreements."
        )
        return normalized_agreements

    @staticmethod
    def normalizeAllParallel(
        agreements: list[dict[str, Any]], max_workers: Optional[int] = None
    ) -> list[dict[str, str]]:
        """Normalize the agreements in parallel using a process pool.

        Unlike ``normalizeAll``, this is all-or-nothing: if any exception
        surfaces while collecting results, the error is logged and an
        empty list is returned.

        Args:
            agreements: raw agreement mappings.
            max_workers: forwarded to ``ProcessPoolExecutor``; ``None``
                leaves the choice to the executor.

        Returns:
            Normalized agreements in input order, or ``[]`` on empty input
            or on any failure.
        """
        num_agreements = len(agreements)
        logger.info(
            f"normalizeAllParallel: Starting parallel normalization for "
            f"{num_agreements} agreements with "
            f"max_workers={max_workers if max_workers is not None else 'default'}."
        )

        results: List[dict[str, str]] = []
        if not agreements:
            logger.info("normalizeAllParallel: No agreements to process.")
            return results

        with ProcessPoolExecutor(max_workers=max_workers) as executor:
            try:
                logger.debug(
                    f"normalizeAllParallel: Submitting {num_agreements} "
                    f"tasks to ProcessPoolExecutor."
                )
                # list() drains the lazy map iterator, so worker exceptions
                # are raised here, inside the try block.
                results = list(
                    tqdm(
                        executor.map(DataNormalizer.normalize, agreements),
                        total=num_agreements,
                        desc="Normalizing agreements (parallel)",
                    )
                )
                logger.debug(
                    "normalizeAllParallel: All parallel tasks completed processing."
                )
            except Exception as e:
                logger.error(
                    f"normalizeAllParallel: Halting due to an error in a worker "
                    f"process: {e}. No agreements will be returned from this "
                    f"parallel batch."
                )
                results = []

        processed_count = len(results)
        if not results and num_agreements > 0:
            logger.warning(
                f"normalizeAllParallel: Finished. 0/{num_agreements} agreements "
                f"successfully processed and returned, likely due to an error "
                f"during parallel execution (see error logs)."
            )
        else:
            logger.info(
                f"normalizeAllParallel: Finished. Successfully processed "
                f"{processed_count}/{num_agreements} agreements."
            )
        return results


if __name__ == "__main__":
    # Demo data: exercises the no-auto-renew path, defaulted auto-renew,
    # and a missing renewal period.
    agreements_data = [
        {
            "vendor": "ACME",
            "autoRenew": False,
            "terminationType": "either party",
            "terminationNoticeDays": 60,
        },
        {
            "vendor": "Initech",
            "renewalPeriod": "12 months",
            "renewalNoticeDays": 30,
        },
        {
            "vendor": "Globex",
            "autoRenew": True,
            "renewalNoticeDays": 60,
            "terminationType": "vendor",
            "terminationNoticeDays": 30,
        },
    ]

    logger.info("\n--- Running Sequential Normalization ---")
    normalized_agreements_seq = DataNormalizer.normalizeAll(agreements_data)
    logger.info("Normalized agreements (sequential):")
    pprint(normalized_agreements_seq)

    logger.info("\n--- Running Parallel Normalization ---")
    normalized_agreements_par = DataNormalizer.normalizeAllParallel(agreements_data)
    logger.info("Normalized agreements (parallel):")
    pprint(normalized_agreements_par)