import asyncio
import json
import os
from base64 import b64decode
from typing import List, Dict, Optional, Any

from pydantic import BaseModel, Field

from crawl4ai import (
    AsyncWebCrawler,
    BrowserConfig,
    CrawlerRunConfig,
    CacheMode,
    JsonCssExtractionStrategy,
    LLMExtractionStrategy,
    LLMConfig,
    RegexExtractionStrategy,
    CosineStrategy,
    DefaultMarkdownGenerator,
)
from crawl4ai.content_filter_strategy import PruningContentFilter


class Product(BaseModel):
    """Example Pydantic schema for LLM extraction."""
    name: str = Field(..., description="Product name")
    price: str = Field(..., description="Product price")


class WebPageLoader:
    """
    SDK-ready wrapper around Crawl4AI webpage loading.

    Supports single/multi crawls, extraction strategies, and advanced configs.

    Usage:
        async with WebPageLoader() as loader:
            result = await loader.load_single(url)
    """

    def __init__(
        self,
        browser_config: Optional[BrowserConfig] = None,
        llm_config: Optional[LLMConfig] = None,
        base_directory: str = "./crawl4ai_data",
    ):
        self.browser_config = browser_config or BrowserConfig(headless=True, verbose=True)
        self.llm_config = llm_config
        self.base_directory = base_directory
        os.makedirs(self.base_directory, exist_ok=True)  # Ensure output dir exists for screenshots/PDFs
        self._crawler: Optional[AsyncWebCrawler] = None

    async def __aenter__(self):
        self._crawler = AsyncWebCrawler(
            config=self.browser_config,
            base_directory=self.base_directory,
        )
        await self._crawler.start()
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        if self._crawler:
            await self._crawler.close()

    def with_proxy(self, server: str, username: Optional[str] = None, password: Optional[str] = None) -> "WebPageLoader":
        """Add proxy support."""
        self.browser_config.proxy_config = {"server": server}
        if username and password:
            self.browser_config.proxy_config.update({"username": username, "password": password})
        return self

    def with_headers(self, headers: Dict[str, str]) -> "WebPageLoader":
        """Set custom headers (e.g., User-Agent)."""
        self.browser_config.headers = headers
        return self

    def with_strategy(self, strategy: Any) -> "WebPageLoader":
        """Set the extraction strategy (JsonCss, LLM, Regex, Cosine)."""
        self._get_default_run_config().extraction_strategy = strategy
        return self

    def with_content_filter(self, threshold: float = 0.4, threshold_type: str = "fixed") -> "WebPageLoader":
        """Add a pruning content filter for cleaner Markdown."""
        md_generator = DefaultMarkdownGenerator(
            content_filter=PruningContentFilter(threshold=threshold, threshold_type=threshold_type)
        )
        self._get_default_run_config().markdown_generator = md_generator
        return self

    def with_screenshot_pdf(self, screenshot: bool = True, pdf: bool = True) -> "WebPageLoader":
        """Enable screenshot/PDF capture."""
        run_config = self._get_default_run_config()
        run_config.screenshot = screenshot
        run_config.pdf = pdf
        return self

    def with_ssl_cert(self) -> "WebPageLoader":
        """Fetch the SSL certificate."""
        self._get_default_run_config().fetch_ssl_certificate = True
        return self

    def with_cache(self, mode: CacheMode = CacheMode.ENABLED) -> "WebPageLoader":
        """Set the caching mode."""
        self._get_default_run_config().cache_mode = mode
        return self

    def _get_default_run_config(self) -> CrawlerRunConfig:
        if not hasattr(self, "_run_config"):
            self._run_config = CrawlerRunConfig(
                wait_for="body",
                js_code=["window.scrollTo(0, document.body.scrollHeight);"],
                verbose=True,
            )
        return self._run_config

    async def load_single(self, url: str, config: Optional[CrawlerRunConfig] = None) -> Dict[str, Any]:
        """Load a single page and return an enhanced dict built from the CrawlResult."""
        run_config = config or self._get_default_run_config()
        result = await self._crawler.arun(url=url, config=run_config)
        return self._enhance_result(result)

    async def load_multiple(self, urls: List[str], config: Optional[CrawlerRunConfig] = None) -> List[Dict[str, Any]]:
        """Load multiple pages; arun_many's default dispatcher handles concurrency and rate limiting."""
        run_config = config or self._get_default_run_config()
        results = await self._crawler.arun_many(urls=urls, config=run_config)
        return [self._enhance_result(r) for r in results]

    def _enhance_result(self, result) -> Dict[str, Any]:
        """Build an SDK-friendly result dict and persist screenshot/PDF artifacts."""
        enhanced = {
            "success": result.success,
            "url": result.url,
            "markdown": result.markdown,
            "html": result.html,
            "cleaned_html": result.cleaned_html,
            "extracted_content": result.extracted_content,
            "links": result.links,
            "media": result.media,
            "error_message": result.error_message if not result.success else None,
        }
        slug = result.url.rstrip("/").split("/")[-1] or "index"
        if result.screenshot:
            enhanced["screenshot_b64"] = result.screenshot
            # Screenshots come back base64-encoded; decode before saving
            with open(f"{self.base_directory}/screenshot_{slug}.png", "wb") as f:
                f.write(b64decode(result.screenshot))
        if result.pdf:
            enhanced["pdf"] = result.pdf
            # PDFs are returned as raw bytes; only decode if a base64 string slips through
            pdf_bytes = result.pdf if isinstance(result.pdf, (bytes, bytearray)) else b64decode(result.pdf)
            with open(f"{self.base_directory}/pdf_{slug}.pdf", "wb") as f:
                f.write(pdf_bytes)
        if result.ssl_certificate:
            enhanced["ssl_cert"] = {
                "issuer_cn": result.ssl_certificate.issuer.get("CN", ""),
                "valid_until": result.ssl_certificate.valid_until,
                "fingerprint": result.ssl_certificate.fingerprint,
            }
        return enhanced


# Strategy factory helpers (for SDK ease)

def create_json_css_strategy(schema: Dict) -> JsonCssExtractionStrategy:
    """CSS/JSON extraction (e.g., from the quickstart)."""
    return JsonCssExtractionStrategy(schema)


def create_llm_strategy(
    schema: Dict,
    instruction: str,
    provider: str = "openai/gpt-4o-mini",
    api_token: Optional[str] = None,
    extraction_type: str = "schema",
) -> LLMExtractionStrategy:
    """LLM extraction with a Pydantic schema."""
    # Derive the env-var name from the provider prefix, e.g. "openai/gpt-4o-mini" -> OPENAI_API_KEY
    env_key = f"{provider.split('/')[0].upper()}_API_KEY"
    llm_cfg = LLMConfig(provider=provider, api_token=api_token or os.getenv(env_key))
    return LLMExtractionStrategy(
        llm_config=llm_cfg,
        schema=schema,
        instruction=instruction,
        extraction_type=extraction_type,
        chunk_token_threshold=1000,
        apply_chunking=True,
    )


def create_regex_strategy(custom_patterns: Optional[Dict[str, str]] = None) -> RegexExtractionStrategy:
    """Regex extraction for entities (e.g., emails, URLs, phone numbers)."""
    if custom_patterns:
        # custom_patterns maps a label to its regex, e.g. {"invoice_id": r"INV-\d+"}
        return RegexExtractionStrategy(custom=custom_patterns)
    return RegexExtractionStrategy(
        pattern=RegexExtractionStrategy.Email | RegexExtractionStrategy.Url | RegexExtractionStrategy.PhoneIntl
    )


def create_cosine_strategy(semantic_filter: str = "technology", top_k: int = 3) -> CosineStrategy:
    """Similarity-based clustering."""
    return CosineStrategy(semantic_filter=semantic_filter, top_k=top_k, word_count_threshold=20)


# Example usage (as a script or SDK smoke test)
async def main():
    # Init with proxy and custom headers
    loader = (
        WebPageLoader()
        .with_proxy("http://proxy.example.com:8080")
        .with_headers({"User-Agent": "CustomBot/1.0"})
    )

    # Example 1: Basic load with content filter
    async with loader.with_content_filter() as l:
        result = await l.load_single("https://example.com")
        print(f"Filtered Markdown: {result['markdown'][:200]}...")

    # Example 2: LLM extraction
    llm_strategy = create_llm_strategy(
        schema=Product.model_json_schema(),
        instruction="Extract products with name and price.",
    )
    async with loader.with_strategy(llm_strategy).with_screenshot_pdf() as l:
        result = await l.load_single("https://example-ecommerce.com")
        print(f"Extracted JSON: {result['extracted_content']}")
        print(f"Screenshot saved to {loader.base_directory}")

    # Example 3: Regex for entities (built-in email/URL/phone patterns)
    regex_strategy = create_regex_strategy()
    async with loader.with_strategy(regex_strategy).with_ssl_cert() as l:
        result = await l.load_single("https://httpbin.org/html")
        print(f"Entities: {result['extracted_content']}")
        if "ssl_cert" in result:
            print(f"SSL Valid Until: {result['ssl_cert']['valid_until']}")

    # Example 4: Cosine clustering
    cosine_strategy = create_cosine_strategy("AI news")
    async with loader.with_strategy(cosine_strategy) as l:
        result = await l.load_single("https://news.ycombinator.com")
        print(f"Clustered Content: {result['extracted_content']}")

    # Example 5: Multi-load with cache bypass
    urls = ["https://example.com", "https://httpbin.org/html"]
    async with loader.with_cache(CacheMode.BYPASS) as l:
        results = await l.load_multiple(urls)
        for i, r in enumerate(results):
            print(f"Page {i+1} Success: {r['success']}, Links: {len(r['links'])}")


if __name__ == "__main__":
    asyncio.run(main())