crawl4ai web page loader
import asyncio
import json
import os
from base64 import b64decode
from typing import List, Dict, Optional, Any

from pydantic import BaseModel, Field

from crawl4ai import (
    AsyncWebCrawler, BrowserConfig, CrawlerRunConfig, CacheMode,
    JsonCssExtractionStrategy, LLMExtractionStrategy, LLMConfig,
    RegexExtractionStrategy, CosineStrategy,
    PruningContentFilter, DefaultMarkdownGenerator
)
from crawl4ai.content_filter_strategy import PruningContentFilter  # Explicit for clarity


class Product(BaseModel):  # Example Pydantic schema for LLM extraction
    name: str = Field(..., description="Product name")
    price: str = Field(..., description="Product price")

class WebPageLoader:
    """
    SDK-ready wrapper for Crawl4AI webpage loading.
    Supports single/multi crawls, extraction strategies, advanced configs.
    Usage: loader = WebPageLoader(); result = await loader.load_single(url)
    """
    def __init__(
        self,
        browser_config: Optional[BrowserConfig] = None,
        llm_config: Optional[LLMConfig] = None,
        base_directory: str = "./crawl4ai_data"
    ):
        self.browser_config = browser_config or BrowserConfig(headless=True, verbose=True)
        self.llm_config = llm_config
        self.base_directory = base_directory
        self._crawler: Optional[AsyncWebCrawler] = None

    async def __aenter__(self):
        self._crawler = AsyncWebCrawler(
            config=self.browser_config,
            base_directory=self.base_directory
        )
        await self._crawler.start()
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        if self._crawler:
            await self._crawler.close()
    def with_proxy(self, server: str, username: Optional[str] = None, password: Optional[str] = None) -> 'WebPageLoader':
        """Add proxy support."""
        self.browser_config.proxy_config = {"server": server}
        if username and password:
            self.browser_config.proxy_config.update({"username": username, "password": password})
        return self

    def with_headers(self, headers: Dict[str, str]) -> 'WebPageLoader':
        """Set custom headers (e.g., User-Agent)."""
        self.browser_config.headers = headers  # BrowserConfig exposes extra HTTP headers via `headers`
        return self

    def with_strategy(self, strategy: Any) -> 'WebPageLoader':
        """Set extraction strategy (JsonCss, LLM, Regex, Cosine)."""
        self._run_config = self._get_default_run_config()
        self._run_config.extraction_strategy = strategy
        return self
    def with_content_filter(self, threshold: float = 0.4, threshold_type: str = "fixed") -> 'WebPageLoader':
        """Add pruning content filter for cleaner Markdown."""
        self._run_config = self._get_default_run_config()  # Ensure a run config exists before mutating it
        md_generator = DefaultMarkdownGenerator(
            content_filter=PruningContentFilter(threshold=threshold, threshold_type=threshold_type)
        )
        self._run_config.markdown_generator = md_generator
        return self

    def with_screenshot_pdf(self, screenshot: bool = True, pdf: bool = True) -> 'WebPageLoader':
        """Enable screenshots/PDF capture."""
        self._run_config = self._get_default_run_config()
        self._run_config.screenshot = screenshot
        self._run_config.pdf = pdf
        return self

    def with_ssl_cert(self) -> 'WebPageLoader':
        """Fetch SSL certificate."""
        self._run_config = self._get_default_run_config()
        self._run_config.fetch_ssl_certificate = True
        return self

    def with_cache(self, mode: CacheMode = CacheMode.ENABLED) -> 'WebPageLoader':
        """Set caching mode."""
        self._run_config = self._get_default_run_config()
        self._run_config.cache_mode = mode
        return self

    def _get_default_run_config(self) -> CrawlerRunConfig:
        if not hasattr(self, '_run_config'):
            self._run_config = CrawlerRunConfig(
                wait_for="body",
                js_code=["window.scrollTo(0, document.body.scrollHeight);"],
                verbose=True
            )
        return self._run_config
    async def load_single(self, url: str, config: Optional[CrawlerRunConfig] = None) -> Dict[str, Any]:
        """Load single page, return enhanced dict from CrawlResult."""
        run_config = config or self._get_default_run_config()
        result = await self._crawler.arun(url=url, config=run_config)
        return self._enhance_result(result)

    async def load_multiple(self, urls: List[str], config: Optional[CrawlerRunConfig] = None, max_concurrent: int = 5) -> List[Dict[str, Any]]:
        """Load multiple pages. Concurrency is managed by crawl4ai's dispatcher; max_concurrent is advisory here."""
        run_config = config or self._get_default_run_config()
        results = await self._crawler.arun_many(urls=urls, config=run_config)
        return [self._enhance_result(r) for r in results]
    def _enhance_result(self, result) -> Dict[str, Any]:
        """SDK-friendly result dict with extras (e.g., save screenshot/PDF)."""
        enhanced = {
            "success": result.success,
            "url": result.url,
            "markdown": result.markdown,
            "html": result.html,
            "cleaned_html": result.cleaned_html,
            "extracted_content": result.extracted_content,
            "links": result.links,
            "media": result.media,
            "error_message": result.error_message if not result.success else None
        }
        page_name = result.url.rstrip("/").split("/")[-1] or "page"  # Fallback for trailing-slash URLs
        if result.screenshot:
            enhanced["screenshot_b64"] = result.screenshot
            # Auto-save example: screenshots are returned base64-encoded
            os.makedirs(self.base_directory, exist_ok=True)
            with open(f"{self.base_directory}/screenshot_{page_name}.png", "wb") as f:
                f.write(b64decode(result.screenshot))
        if result.pdf:
            enhanced["pdf_b64"] = result.pdf
            # PDFs may arrive as raw bytes; only base64-decode when given a string
            pdf_bytes = result.pdf if isinstance(result.pdf, bytes) else b64decode(result.pdf)
            os.makedirs(self.base_directory, exist_ok=True)
            with open(f"{self.base_directory}/pdf_{page_name}.pdf", "wb") as f:
                f.write(pdf_bytes)
        if result.ssl_certificate:
            enhanced["ssl_cert"] = {
                "issuer_cn": result.ssl_certificate.issuer.get("CN", ""),
                "valid_until": result.ssl_certificate.valid_until,
                "fingerprint": result.ssl_certificate.fingerprint
            }
        return enhanced

# Example strategies factory methods (for SDK ease)
def create_json_css_strategy(schema: Dict) -> JsonCssExtractionStrategy:
    """CSS/JSON extraction (e.g., from quickstart)."""
    return JsonCssExtractionStrategy(schema)
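
# Illustrative schema for create_json_css_strategy (a sketch, not tied to any real site):
# the selectors below are hypothetical and must be adapted to the target page's markup.
EXAMPLE_PRODUCT_SCHEMA = {
    "name": "Products",
    "baseSelector": "div.product",  # hypothetical container selector
    "fields": [
        {"name": "name", "selector": "h2.title", "type": "text"},
        {"name": "price", "selector": "span.price", "type": "text"},
    ],
}
# Usage sketch: strategy = create_json_css_strategy(EXAMPLE_PRODUCT_SCHEMA)
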
def create_llm_strategy(
    schema: Dict,
    instruction: str,
    provider: str = "openai/gpt-4o-mini",
    api_token: Optional[str] = None,
    extraction_type: str = "schema"
) -> LLMExtractionStrategy:
    """LLM extraction with Pydantic schema."""
    # Derive the env var from the provider prefix (e.g., "openai/gpt-4o-mini" -> OPENAI_API_KEY)
    provider_name = provider.split("/")[0].upper()
    llm_cfg = LLMConfig(provider=provider, api_token=api_token or os.getenv(f"{provider_name}_API_KEY"))
    return LLMExtractionStrategy(
        llm_config=llm_cfg,
        schema=schema,
        instruction=instruction,
        extraction_type=extraction_type,
        chunk_token_threshold=1000,
        apply_chunking=True
    )

def create_regex_strategy(patterns: Optional[List[str]] = None) -> RegexExtractionStrategy:
    """Regex for entities (e.g., emails, URLs, phone numbers) using built-in patterns."""
    # Map friendly names to RegexExtractionStrategy's built-in pattern flags
    named = {"email": RegexExtractionStrategy.Email, "url": RegexExtractionStrategy.Url,
             "phone": RegexExtractionStrategy.PhoneIntl}
    if patterns:
        flags = [named[p.lower()] for p in patterns if p.lower() in named]
        if flags:
            combined = flags[0]
            for f in flags[1:]:
                combined |= f
            return RegexExtractionStrategy(pattern=combined)
    return RegexExtractionStrategy(pattern=RegexExtractionStrategy.Email | RegexExtractionStrategy.Url | RegexExtractionStrategy.PhoneIntl)

def create_cosine_strategy(semantic_filter: str = "technology", top_k: int = 3) -> CosineStrategy:
    """Similarity-based clustering."""
    return CosineStrategy(semantic_filter=semantic_filter, top_k=top_k, word_count_threshold=20)

# Example usage (as script or SDK test)
async def main():
    # Init with proxy and headers
    loader = WebPageLoader().with_proxy("http://proxy.example.com:8080").with_headers({"User-Agent": "CustomBot/1.0"})

    # Example 1: Basic load with content filter
    async with loader.with_content_filter() as l:
        result = await l.load_single("https://example.com")
        print(f"Filtered Markdown: {result['markdown'][:200]}...")

    # Example 2: LLM extraction
    llm_strategy = create_llm_strategy(
        schema=Product.model_json_schema(),
        instruction="Extract products with name and price."
    )
    async with loader.with_strategy(llm_strategy).with_screenshot_pdf() as l:
        result = await l.load_single("https://example-ecommerce.com")
        print(f"Extracted JSON: {result['extracted_content']}")
        print(f"Screenshot saved to {loader.base_directory}")

    # Example 3: Regex for entities
    regex_strategy = create_regex_strategy(["email", "phone"])
    async with loader.with_strategy(regex_strategy).with_ssl_cert() as l:
        result = await l.load_single("https://httpbin.org/html")
        print(f"Entities: {result['extracted_content']}")
        if "ssl_cert" in result:
            print(f"SSL Valid Until: {result['ssl_cert']['valid_until']}")

    # Example 4: Cosine clustering
    cosine_strategy = create_cosine_strategy("AI news")
    async with loader.with_strategy(cosine_strategy) as l:
        result = await l.load_single("https://news.ycombinator.com")
        print(f"Clustered Content: {result['extracted_content']}")

    # Example 5: Multi-load with cache bypass
    urls = ["https://example.com", "https://httpbin.org/html"]
    async with loader.with_cache(CacheMode.BYPASS) as l:
        results = await l.load_multiple(urls)
        for i, r in enumerate(results):
            print(f"Page {i+1} Success: {r['success']}, Links: {len(r['links'])}")


if __name__ == "__main__":
    asyncio.run(main())
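
# Minimal sanity check (a sketch, left commented out): a bare crawl4ai call without the
# WebPageLoader wrapper. Assumes Playwright browsers are installed (typically via the
# `crawl4ai-setup` command); useful to verify the environment before running main().
#
# async def _smoke_test():
#     async with AsyncWebCrawler() as crawler:
#         result = await crawler.arun(url="https://example.com")
#         print(result.markdown[:200] if result.success else result.error_message)
#
# asyncio.run(_smoke_test())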