    # generic article crawler

    ```python
    #!/usr/bin/env python3
    """
    通用文章爬虫框架
    基于Selenium的可扩展文章爬取基础框架

    作者: AI Assistant
    创建时间: 2025-01-27
    """

    import json
    import time
    import argparse
    from pathlib import Path
    from concurrent.futures import ThreadPoolExecutor, as_completed
    from urllib.parse import urljoin, urlparse
    from datetime import datetime
    import re
    import random
    from abc import ABC, abstractmethod
    from typing import List, Dict, Optional, Tuple, Any

    # Selenium相关导入
    from selenium import webdriver
    from selenium.webdriver.chrome.service import Service as ChromeService
    from selenium.webdriver.chrome.options import Options
    from selenium.webdriver.common.by import By
    from selenium.webdriver.support.ui import WebDriverWait
    from selenium.webdriver.support import expected_conditions as EC
    from selenium.common.exceptions import TimeoutException, NoSuchElementException, StaleElementReferenceException
    from webdriver_manager.chrome import ChromeDriverManager

    # HTML转Markdown
    import html2text
    from bs4 import BeautifulSoup


    class ArticleCrawlerConfig:
    """爬虫配置类"""

    def __init__(self,
    base_url: str,
    output_dir: str = "./output",
    delay: float = 2.0,
    max_workers: int = 3,
    headless: bool = True,
    show_browser: bool = False,
    user_agent: str = None,
    cookies: Dict[str, str] = None,
    content_selectors: List[str] = None,
    timeout: int = 30):
    self.base_url = base_url
    self.output_dir = Path(output_dir)
    self.delay = delay
    self.max_workers = max_workers
    self.headless = headless
    self.show_browser = show_browser
    self.timeout = timeout

    # 默认用户代理
    self.user_agent = user_agent or 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/139.0.0.0 Safari/537.36'

    # Cookie设置
    self.cookies = cookies or {}

    # 内容选择器(按优先级排序)
    self.content_selectors = content_selectors or [
    "article",
    ".article-content",
    ".content",
    "#content",
    ".post-content",
    "main",
    ".main-content"
    ]

    # 创建输出目录
    self.output_dir.mkdir(parents=True, exist_ok=True)


    class SeleniumDriverManager:
    """Selenium浏览器驱动管理器"""

    def __init__(self, config: ArticleCrawlerConfig):
    self.config = config
    self.driver = None
    self._setup_chrome_options()

    def _setup_chrome_options(self):
    """设置Chrome选项"""
    self.chrome_options = Options()

    # 基础设置
    if self.config.headless and not self.config.show_browser:
    self.chrome_options.add_argument('--headless')

    # 性能和稳定性选项
    chrome_args = [
    '--no-sandbox',
    '--disable-dev-shm-usage',
    '--disable-gpu',
    '--disable-extensions',
    '--disable-plugins',
    '--window-size=1920,1080',
    # 反检测选项
    '--disable-blink-features=AutomationControlled',
    '--disable-web-security',
    '--allow-running-insecure-content',
    '--disable-features=VizDisplayCompositor'
    ]

    for arg in chrome_args:
    self.chrome_options.add_argument(arg)

    # 实验性选项
    self.chrome_options.add_experimental_option("excludeSwitches", ["enable-automation"])
    self.chrome_options.add_experimental_option('useAutomationExtension', False)

    # 用户代理
    self.chrome_options.add_argument(f'--user-agent={self.config.user_agent}')

    def setup_driver(self) -> bool:
    """设置Chrome浏览器驱动"""
    try:
    service = ChromeService(ChromeDriverManager().install())
    self.driver = webdriver.Chrome(service=service, options=self.chrome_options)
    self.driver.set_page_load_timeout(self.config.timeout)
    self.driver.implicitly_wait(10)

    # 执行反检测脚本
    anti_detection_scripts = [
    "Object.defineProperty(navigator, 'webdriver', {get: () => undefined})",
    "Object.defineProperty(navigator, 'plugins', {get: () => [1, 2, 3, 4, 5]})",
    "Object.defineProperty(navigator, 'languages', {get: () => ['en-US', 'en']})",
    "window.chrome = {runtime: {}}"
    ]

    for script in anti_detection_scripts:
    self.driver.execute_script(script)

    # 添加Cookie
    if self.config.cookies:
    self._add_cookies()

    mode_desc = "无头模式" if (self.config.headless and not self.config.show_browser) else "显示模式"
    print(f"✅ Chrome浏览器已启动 ({mode_desc})")
    return True

    except Exception as e:
    print(f"❌ 浏览器启动失败: {e}")
    return False

    def _add_cookies(self):
    """添加Cookie"""
    # 先访问基础域名
    domain = urlparse(self.config.base_url).netloc
    self.driver.get(self.config.base_url)

    for name, value in self.config.cookies.items():
    try:
    self.driver.add_cookie({
    'name': name,
    'value': value,
    'domain': f'.{domain}'
    })
    except Exception as e:
    print(f"⚠️ 添加Cookie失败 {name}: {e}")

    def cleanup_driver(self):
    """清理浏览器驱动"""
    if self.driver:
    try:
    self.driver.quit()
    print("✅ 浏览器已关闭")
    except Exception as e:
    print(f"⚠️ 关闭浏览器时出错: {e}")


    class ContentExtractor:
    """内容提取器"""

    def __init__(self, config: ArticleCrawlerConfig):
    self.config = config
    self.html_converter = html2text.HTML2Text()
    self.html_converter.ignore_links = False
    self.html_converter.ignore_images = False
    self.html_converter.body_width = 0

    def extract_content(self, driver: webdriver.Chrome, url: str, max_retries: int = 3) -> Tuple[Optional[str], Optional[str]]:
    """提取页面内容"""
    for attempt in range(max_retries):
    try:
    # 随机延迟
    delay = random.uniform(self.config.delay, self.config.delay * 2)
    time.sleep(delay)

    driver.get(url)

    # 等待页面加载
    WebDriverWait(driver, 20).until(
    EC.presence_of_element_located((By.TAG_NAME, "body"))
    )

    # 额外等待确保页面完全加载
    time.sleep(random.uniform(1, 3))

    # 尝试多种选择器找到文章内容
    content_element = self._find_content_element(driver)

    if not content_element:
    if attempt < max_retries - 1:
    print(f" ⚠️ 未找到内容容器,重试 {attempt + 1}/{max_retries}")
    continue
    else:
    return None, "No content container found"

    # 获取HTML内容,处理 stale element 异常
    try:
    html_content = content_element.get_attribute('outerHTML')
    except StaleElementReferenceException:
    # 元素已过期,重新查找
    print(f" ⚠️ 元素已过期,重新定位")
    content_element = self._find_content_element(driver)
    if not content_element:
    if attempt < max_retries - 1:
    print(f" ⚠️ 重新定位失败,重试 {attempt + 1}/{max_retries}")
    continue
    else:
    return None, "Content element became stale and could not be relocated"
    html_content = content_element.get_attribute('outerHTML')

    if not html_content or len(html_content.strip()) < 100:
    if attempt < max_retries - 1:
    print(f" ⚠️ 内容过短,重试 {attempt + 1}/{max_retries}")
    continue
    else:
    return None, "Content too short"

    # 清理和转换内容
    markdown_content = self._process_html_content(html_content)
    return markdown_content.strip(), None

    except TimeoutException:
    if attempt < max_retries - 1:
    print(f" ⚠️ 页面加载超时,重试 {attempt + 1}/{max_retries}")
    continue
    else:
    return None, "Page load timeout"
    except Exception as e:
    if attempt < max_retries - 1:
    print(f" ⚠️ 提取错误,重试 {attempt + 1}/{max_retries}: {str(e)}")
    continue
    else:
    return None, f"Extraction error: {str(e)}"

    return None, "Max retries exceeded"

    def _find_content_element(self, driver: webdriver.Chrome):
    """查找内容元素"""
    # 尝试预定义的选择器
    for selector in self.config.content_selectors:
    try:
    # 等待元素出现
    WebDriverWait(driver, 10).until(
    EC.presence_of_element_located((By.CSS_SELECTOR, selector))
    )
    elements = driver.find_elements(By.CSS_SELECTOR, selector)
    if elements:
    print(f" ✅ 找到内容容器: {selector}")
    return elements[0]
    except (TimeoutException, NoSuchElementException, StaleElementReferenceException):
    continue

    # 如果没找到特定容器,尝试找到包含最多文本的div
    try:
    # 等待页面基本加载完成
    WebDriverWait(driver, 10).until(
    EC.presence_of_element_located((By.TAG_NAME, "div"))
    )

    divs = driver.find_elements(By.TAG_NAME, "div")
    max_text_length = 0
    best_div = None

    for div in divs:
    try:
    text_length = len(div.text.strip())
    if text_length > max_text_length and text_length > 100:
    max_text_length = text_length
    best_div = div
    except (StaleElementReferenceException, Exception):
    continue

    if best_div:
    print(f" ✅ 找到内容容器: auto-detected div")
    return best_div
    except (TimeoutException, Exception):
    pass

    return None

    def _process_html_content(self, html_content: str) -> str:
    """处理HTML内容"""
    # 使用BeautifulSoup清理HTML
    soup = BeautifulSoup(html_content, 'html.parser')

    # 移除不需要的元素
    unwanted_tags = ['script', 'style', 'nav', 'header', 'footer', 'aside']
    for tag in soup.find_all(unwanted_tags):
    tag.decompose()

    # 转换为Markdown
    markdown_content = self.html_converter.handle(str(soup))
    return markdown_content


    class ProgressManager:
    """进度管理器"""

    def __init__(self, progress_file: str = 'crawl_progress.json'):
    self.progress_file = progress_file

    def load_progress(self) -> Dict[str, set]:
    """加载爬取进度"""
    progress_path = Path(self.progress_file)
    if not progress_path.exists():
    return {'completed': set(), 'failed': set()}

    try:
    with open(progress_path, 'r', encoding='utf-8') as f:
    data = json.load(f)
    return {
    'completed': set(data.get('completed', [])),
    'failed': set(data.get('failed', []))
    }
    except Exception as e:
    print(f"⚠️ 读取进度文件失败: {e},将重新开始")
    return {'completed': set(), 'failed': set()}

    def save_progress(self, progress: Dict[str, set]):
    """保存爬取进度"""
    try:
    data = {
    'completed': list(progress['completed']),
    'failed': list(progress['failed']),
    'last_update': datetime.now().isoformat()
    }
    with open(self.progress_file, 'w', encoding='utf-8') as f:
    json.dump(data, f, ensure_ascii=False, indent=2)
    except Exception as e:
    print(f"⚠️ 保存进度文件失败: {e}")


    class BaseArticleCrawler(ABC):
    """文章爬虫基类"""

    def __init__(self, config: ArticleCrawlerConfig):
    self.config = config
    self.driver_manager = SeleniumDriverManager(config)
    self.content_extractor = ContentExtractor(config)
    self.progress_manager = ProgressManager()

    # 统计信息
    self.stats = {
    'total_articles': 0,
    'processed': 0,
    'success': 0,
    'failed': 0
    }

    @abstractmethod
    def fetch_article_links(self) -> List[Dict[str, Any]]:
    """获取文章链接列表 - 子类必须实现"""
    pass

    @abstractmethod
    def generate_filename(self, article: Dict[str, Any]) -> str:
    """生成文件名 - 子类必须实现"""
    pass

    @abstractmethod
    def format_article_content(self, article: Dict[str, Any], content: str) -> str:
    """格式化文章内容 - 子类必须实现"""
    pass

    def is_article_completed(self, article: Dict[str, Any]) -> bool:
    """检查文章是否已完成"""
    filename = self.generate_filename(article)
    filepath = self.config.output_dir / filename
    return filepath.exists()

    def save_article(self, article: Dict[str, Any], content: str) -> bool:
    """保存文章为Markdown文件"""
    try:
    filename = self.generate_filename(article)
    filepath = self.config.output_dir / filename

    # 格式化完整内容
    full_content = self.format_article_content(article, content)

    with open(filepath, 'w', encoding='utf-8') as f:
    f.write(full_content)

    print(f" 💾 已保存: {filename}")
    return True

    except Exception as e:
    print(f" ❌ 保存失败: {e}")
    return False

    def process_single_article(self, article: Dict[str, Any]) -> bool:
    """处理单篇文章"""
    article_title = article.get('title', article.get('url', 'Unknown'))
    print(f"📄 处理: {article_title}")

    # 提取内容
    content, error = self.content_extractor.extract_content(
    self.driver_manager.driver,
    article['url']
    )

    if error:
    print(f" ❌ 提取失败: {error}")
    return False

    if not content:
    print(f" ❌ 内容为空")
    return False

    # 保存文章
    success = self.save_article(article, content)

    if success:
    self.stats['success'] += 1
    else:
    self.stats['failed'] += 1

    self.stats['processed'] += 1

    # 显示进度 - 使用剩余文章数计算进度
    remaining_total = getattr(self, '_remaining_total', self.stats['total_articles'])
    progress = (self.stats['processed'] / remaining_total) * 100
    print(f"📊 进度: {self.stats['processed']}/{remaining_total} ({progress:.1f}%)")

    return success

    def crawl_articles(self, resume: bool = True, progress_file: str = 'crawl_progress.json'):
    """爬取所有文章"""
    # 获取文章链接
    articles = self.fetch_article_links()
    if not articles:
    print("❌ 未找到文章链接")
    return

    # 设置进度管理器
    self.progress_manager.progress_file = progress_file

    # 加载进度
    progress = self.progress_manager.load_progress() if resume else {'completed': set(), 'failed': set()}

    # 过滤已完成的文章
    remaining_articles = []
    skipped_count = 0

    for article in articles:
    article_id = self._get_article_id(article)

    # 检查是否已在进度记录中完成
    if article_id in progress['completed']:
    skipped_count += 1
    continue

    # 检查文件是否已存在
    if self.is_article_completed(article):
    progress['completed'].add(article_id)
    skipped_count += 1
    continue

    remaining_articles.append(article)

    self.stats['total_articles'] = len(articles)
    remaining_count = len(remaining_articles)
    # 设置剩余文章数用于进度计算
    self._remaining_total = remaining_count

    print(f"🚀 开始爬取文章内容")
    print(f"📊 总文章数: {len(articles)}")
    print(f"✅ 已完成: {skipped_count}")
    print(f"🔄 待处理: {remaining_count}")
    print(f"📁 输出目录: {self.config.output_dir.absolute()}")
    print(f"🔧 并发数: {self.config.max_workers}")
    print(f"💾 进度文件: {progress_file}")
    print("-" * 60)

    if remaining_count == 0:
    print("🎉 所有文章已完成,无需继续爬取!")
    return

    # 设置浏览器
    if not self.driver_manager.setup_driver():
    return

    try:
    # 使用线程池处理文章
    with ThreadPoolExecutor(max_workers=self.config.max_workers) as executor:
    # 为每个文章创建一个任务
    future_to_article = {}

    for article in remaining_articles:
    future = executor.submit(self.process_single_article, article)
    future_to_article[future] = article

    # 处理完成的任务
    for future in as_completed(future_to_article):
    article = future_to_article[future]
    article_id = self._get_article_id(article)

    try:
    success = future.result()
    if success:
    progress['completed'].add(article_id)
    # 定期保存进度
    if len(progress['completed']) % 10 == 0:
    self.progress_manager.save_progress(progress)
    time.sleep(self.config.delay)
    else:
    progress['failed'].add(article_id)
    except Exception as e:
    article_title = article.get('title', article.get('url', 'Unknown'))
    print(f"❌ 处理文章 {article_title} 时出错: {e}")
    progress['failed'].add(article_id)
    self.stats['failed'] += 1
    self.stats['processed'] += 1

    finally:
    # 最终保存进度
    self.progress_manager.save_progress(progress)
    self.driver_manager.cleanup_driver()

    # 显示最终统计
    self._print_final_stats(progress)

    def _get_article_id(self, article: Dict[str, Any]) -> str:
    """获取文章唯一标识"""
    return article.get('id') or article.get('code') or article.get('url', '')

    def _print_final_stats(self, progress: Dict[str, set]):
    """打印最终统计信息"""
    total_completed = len(progress['completed'])
    total_failed = len(progress['failed'])

    print("\n" + "=" * 60)
    print(f"📊 文章爬取完成统计")
    print("=" * 60)
    print(f"总文章数: {self.stats['total_articles']}")
    print(f"本次处理: {self.stats['processed']}")
    print(f"本次成功: {self.stats['success']}")
    print(f"本次失败: {self.stats['failed']}")
    print(f"累计完成: {total_completed}")
    print(f"累计失败: {total_failed}")
    if self.stats['total_articles'] > 0:
    print(f"总体成功率: {(total_completed/self.stats['total_articles']*100):.1f}%")
    print(f"\n📁 文档已保存到: {self.config.output_dir.absolute()}")
    print(f"💾 进度已保存到: {self.progress_manager.progress_file}")


    # 示例实现:简单的URL列表爬虫
    class SimpleUrlCrawler(BaseArticleCrawler):
    """简单的URL列表爬虫示例"""

    def __init__(self, config: ArticleCrawlerConfig, urls: List[str]):
    super().__init__(config)
    self.urls = urls

    def fetch_article_links(self) -> List[Dict[str, Any]]:
    """从URL列表生成文章信息"""
    articles = []
    for i, url in enumerate(self.urls):
    articles.append({
    'id': str(i),
    'url': url,
    'title': f"Article_{i+1}"
    })
    return articles

    def generate_filename(self, article: Dict[str, Any]) -> str:
    """生成文件名"""
    safe_title = re.sub(r'[^\w\s-]', '', article['title']).strip()
    safe_title = re.sub(r'[-\s]+', '_', safe_title)
    return f"{article['id']}_{safe_title}.md"

    def format_article_content(self, article: Dict[str, Any], content: str) -> str:
    """格式化文章内容"""
    return f"""# {article['title']}

    **URL:** {article['url']}
    **ID:** {article['id']}
    **爬取时间:** {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}

    ---

    {content}
    """


    def create_default_config(base_url: str, **kwargs) -> ArticleCrawlerConfig:
    """创建默认配置"""
    return ArticleCrawlerConfig(base_url=base_url, **kwargs)


    if __name__ == '__main__':
    # 示例用法
    config = create_default_config(
    base_url="https://example.com",
    output_dir="./articles",
    delay=2.0,
    max_workers=2
    )

    # 示例URL列表
    urls = [
    "https://example.com/article1",
    "https://example.com/article2"
    ]

    crawler = SimpleUrlCrawler(config, urls)
crawler.crawl_articles()
    ```

# Usage Guide: Generic Article Crawler Framework

## Overview

This framework provides a Selenium-based foundation for article crawling that can be extended to new sites with little effort. It uses an object-oriented design and supports resumable crawls, concurrent processing, content extraction, and Markdown formatting.
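
The code above depends on a few third-party packages (Selenium, webdriver-manager, html2text, BeautifulSoup); a typical install from PyPI would be:

```
pip install selenium webdriver-manager html2text beautifulsoup4
```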

## Framework Structure

### Core Components

1. **ArticleCrawlerConfig** - crawler configuration
2. **SeleniumDriverManager** - Selenium browser driver manager
3. **ContentExtractor** - content extractor (HTML to Markdown)
4. **ProgressManager** - crawl progress manager
5. **BaseArticleCrawler** - abstract base class for crawlers

### File Layout

```
scripts/
├── generic_article_crawler.py        # generic framework core
├── binance_faq_generic_crawler.py    # Binance FAQ example implementation
├── crawler_framework_guide.md        # this guide
└── your_custom_crawler.py            # your custom crawler
```

## Quick Start

### 1. Create a custom crawler

Subclass `BaseArticleCrawler` and implement the required abstract methods:

```python
from typing import Any, Dict, List

from generic_article_crawler import BaseArticleCrawler, ArticleCrawlerConfig

class YourCustomCrawler(BaseArticleCrawler):
    def fetch_article_links(self) -> List[Dict[str, Any]]:
        """Return the list of article links - must be implemented."""
        # Return a list of dicts, each containing id, url, title, etc.
        pass

    def generate_filename(self, article: Dict[str, Any]) -> str:
        """Generate the output filename - must be implemented."""
        # Build a filesystem-safe filename from the article metadata.
        pass

    def format_article_content(self, article: Dict[str, Any], content: str) -> str:
        """Format the article content - must be implemented."""
        # Add metadata and format the extracted content.
        pass
```

### 2. Configure the crawler

```python
from generic_article_crawler import ArticleCrawlerConfig

config = ArticleCrawlerConfig(
    base_url="https://example.com",
    output_dir="./articles",
    delay=2.0,
    max_workers=3,
    headless=True,
    user_agent="Your User Agent",
    cookies={"key": "value"},
    content_selectors=[".article-content", "article", ".content"]
)
```

### 3. Run the crawler

```python
crawler = YourCustomCrawler(config)
crawler.crawl_articles(resume=True, progress_file='progress.json')
```

## Implementation Details

### Implementing fetch_article_links()

This method gathers every article link that should be crawled. Expected return format:

```python
import requests
from typing import Any, Dict, List

def fetch_article_links(self) -> List[Dict[str, Any]]:
    articles = []

    # Option 1: fetch from an API
    response = requests.get("https://api.example.com/articles")
    data = response.json()

    for item in data['articles']:
        articles.append({
            'id': item['id'],
            'url': item['url'],
            'title': item['title'],
            # any extra custom fields
        })

    # Option 2: parse an HTML listing page (see the sketch below)
    # Option 3: read from a local file

    return articles
```
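
A minimal sketch of option 2, parsing links out of an HTML listing page with BeautifulSoup (already a dependency of the framework). The `/articles` listing path and the `a.article-link` selector are placeholders you would adapt to the target site:

```python
import requests
from typing import Any, Dict, List
from bs4 import BeautifulSoup

def fetch_article_links(self) -> List[Dict[str, Any]]:
    articles = []
    # Placeholder listing URL and link selector -- adjust for the real site.
    html = requests.get(f"{self.config.base_url}/articles").text
    soup = BeautifulSoup(html, 'html.parser')
    for i, link in enumerate(soup.select("a.article-link")):
        articles.append({
            'id': str(i),
            'url': link['href'],
            'title': link.get_text(strip=True),
        })
    return articles
```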

### Implementing generate_filename()

Generate a safe filename, stripping special characters:

```python
import re

def generate_filename(self, article: Dict[str, Any]) -> str:
    title = article['title']
    # Strip special characters
    safe_title = re.sub(r'[^\w\s-]', '', title).strip()
    safe_title = re.sub(r'[-\s]+', '_', safe_title)

    # Cap the length
    if len(safe_title) > 100:
        safe_title = safe_title[:100]

    return f"{article['id']}_{safe_title}.md"
```

### Implementing format_article_content()

Format the final article content:

```python
from datetime import datetime

def format_article_content(self, article: Dict[str, Any], content: str) -> str:
    return f"""# {article['title']}

**ID:** {article['id']}
**URL:** {article['url']}
**Crawled at:** {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}

---

{content}

---

*This document was generated automatically by a crawler*
"""
```

## Configuration Options

### ArticleCrawlerConfig parameters

| Parameter | Type | Default | Description |
|------|------|--------|------|
| base_url | str | required | Base URL of the target site |
| output_dir | str | "./output" | Output directory |
| delay | float | 2.0 | Delay between requests (seconds) |
| max_workers | int | 3 | Number of concurrent worker threads |
| headless | bool | True | Headless mode |
| show_browser | bool | False | Show the browser window |
| user_agent | str | default UA | User agent string |
| cookies | Dict | {} | Cookie dictionary |
| content_selectors | List[str] | default selectors | Content selectors, tried in order |
| timeout | int | 30 | Page load timeout (seconds) |

### Content selectors

The framework tries the following selectors in priority order:

```python
content_selectors = [
    "article",           # HTML5 article tag
    ".article-content",  # common class names
    ".content",
    "#content",
    ".post-content",
    "main",
    ".main-content"
]
```

You can customize the selectors to match the HTML structure of the target site.

## Advanced Features

### Resumable crawls

Resuming an interrupted crawl is supported out of the box:

```python
# Resume from the progress file (default)
crawler.crawl_articles(resume=True, progress_file='progress.json')

# Ignore previous progress and start from scratch
crawler.crawl_articles(resume=False)
```
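
For reference, the progress file written by `ProgressManager.save_progress()` is plain JSON containing the completed and failed article IDs plus a timestamp, so it can be inspected or edited by hand (the IDs below are illustrative):

```
{
  "completed": ["0", "1", "5"],
  "failed": ["3"],
  "last_update": "2025-01-27T12:34:56"
}
```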

### Custom cookies and anti-detection

```python
config = ArticleCrawlerConfig(
    base_url="https://example.com",
    cookies={
        'session_id': 'your_session_id',
        'csrf_token': 'your_csrf_token'
    },
    user_agent='Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36'
)
```

### Concurrency control

```python
# Low concurrency for strict sites
config = ArticleCrawlerConfig(base_url="https://example.com", max_workers=1, delay=5.0)

# Higher concurrency for lenient sites
config = ArticleCrawlerConfig(base_url="https://example.com", max_workers=5, delay=1.0)
```

Note that the actual wait before each request is sampled between `delay` and `2 * delay` (see `ContentExtractor.extract_content`).

## Worked Examples

### Example 1: blog crawler

```python
class BlogCrawler(BaseArticleCrawler):
    def __init__(self, config, blog_url):
        super().__init__(config)
        self.blog_url = blog_url

    def fetch_article_links(self):
        # Parse article links from the blog index page
        articles = []
        # parsing logic goes here...
        return articles

    def generate_filename(self, article):
        date = article.get('date', '').replace('-', '_')
        title = re.sub(r'[^\w\s-]', '', article['title'])[:50]
        return f"{date}_{title}.md"

    def format_article_content(self, article, content):
        return f"""# {article['title']}

**Author:** {article.get('author', 'Unknown')}
**Published:** {article.get('date', 'Unknown')}
**Source URL:** {article['url']}

{content}
"""
```

### Example 2: news site crawler

```python
class NewsCrawler(BaseArticleCrawler):
    def fetch_article_links(self):
        # Fetch the article list from an RSS feed or API
        pass

    def generate_filename(self, article):
        category = article.get('category', 'general')
        timestamp = article.get('timestamp', int(time.time()))
        return f"{category}_{timestamp}_{article['id']}.md"
```

## Best Practices

### 1. Respect the site's rules
- Check robots.txt (see the sketch after this list)
- Set a reasonable delay
- Avoid excessive concurrency
- Respect the site's anti-crawling measures
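
A minimal robots.txt check using the standard library's `urllib.robotparser`; the URL and path below are placeholders:

```python
from urllib.parse import urljoin
from urllib.robotparser import RobotFileParser

def allowed_by_robots(base_url: str, path: str, user_agent: str = "*") -> bool:
    # Download and parse robots.txt for the target site.
    parser = RobotFileParser()
    parser.set_url(urljoin(base_url, "/robots.txt"))
    parser.read()
    return parser.can_fetch(user_agent, urljoin(base_url, path))

# Example: skip a URL the site disallows.
if not allowed_by_robots("https://example.com", "/article1"):
    print("robots.txt disallows this path; skipping")
```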

### 2. Error handling
- Rely on retries (the extractor already retries up to 3 times per page)
- Record failed articles (see the sketch after this list)
- Monitor crawl status
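
Failed article IDs already end up in the progress file, so monitoring can be as simple as reading it back. A small sketch, assuming the default `crawl_progress.json` filename:

```python
import json
from pathlib import Path

def report_failures(progress_file: str = "crawl_progress.json") -> None:
    # Print the failed article IDs recorded by ProgressManager.
    path = Path(progress_file)
    if not path.exists():
        print("No progress file yet")
        return
    data = json.loads(path.read_text(encoding="utf-8"))
    failed = data.get("failed", [])
    print(f"{len(failed)} failed article(s): {failed}")
```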

### 3. Performance
- Tune the delay to the site's response times
- Use headless mode for efficiency
- Clear the browser cache periodically

### 4. Data quality
- Validate the extracted content
- Clean up leftover HTML tags
- Watch for encoding issues

## Troubleshooting

### Common problems

1. **Browser fails to start**
   - Check that Chrome is installed
   - Update ChromeDriver
   - Check system permissions

2. **Content extraction fails**
   - Check the content_selectors configuration
   - Inspect the page's HTML structure
   - Increase the wait times

3. **Anti-bot detection**
   - Increase the delay
   - Change the User-Agent
   - Add any required cookies
   - Use a proxy IP (see the sketch after this list)

4. **Excessive memory usage**
   - Reduce concurrency
   - Restart the browser periodically
   - Clean up temporary files
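
The config class has no proxy option, but Chrome accepts a `--proxy-server` argument, so one hedged way to route traffic through a proxy is to extend `SeleniumDriverManager`; the proxy address below is a placeholder:

```python
from generic_article_crawler import SeleniumDriverManager

class ProxyDriverManager(SeleniumDriverManager):
    def __init__(self, config, proxy_url: str):
        # Store the proxy before the base __init__ builds the Chrome options.
        self.proxy_url = proxy_url  # e.g. "http://127.0.0.1:8080" (placeholder)
        super().__init__(config)

    def _setup_chrome_options(self):
        # Reuse the base options, then add the proxy flag.
        super()._setup_chrome_options()
        self.chrome_options.add_argument(f'--proxy-server={self.proxy_url}')

# In a crawler subclass, swap in the proxy-aware manager:
# self.driver_manager = ProxyDriverManager(config, "http://127.0.0.1:8080")
```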

### Debugging tips

```python
# Show the browser window while debugging
config = ArticleCrawlerConfig(
    base_url="https://example.com",
    headless=False,
    show_browser=True
)

# Single-threaded mode is easier to debug
config.max_workers = 1
```

## Extending the Framework

### Adding a new content extractor

```python
class CustomContentExtractor(ContentExtractor):
    def extract_content(self, driver, url, max_retries=3):
        # Custom extraction logic
        pass

# Use it in a crawler
class YourCrawler(BaseArticleCrawler):
    def __init__(self, config):
        super().__init__(config)
        self.content_extractor = CustomContentExtractor(config)
```
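
As one hedged example of what such an override might do, the subclass below reuses the parent's extraction and only post-processes the resulting Markdown; the "Related articles" footer marker is hypothetical and would be adapted per site:

```python
class FooterStrippingExtractor(ContentExtractor):
    def extract_content(self, driver, url, max_retries=3):
        content, error = super().extract_content(driver, url, max_retries)
        if error or not content:
            return content, error
        # Drop everything after a hypothetical boilerplate footer marker.
        cleaned = content.split("Related articles")[0].rstrip()
        return cleaned, None
```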

### Adding a new progress manager

```python
class DatabaseProgressManager(ProgressManager):
    def load_progress(self):
        # Load progress from a database
        pass

    def save_progress(self, progress):
        # Save progress to a database
        pass
```
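
A minimal sketch of a database-backed version using the standard library's `sqlite3`; the table name and schema are assumptions, and it keeps the same interface (dicts of `completed`/`failed` ID sets) as `ProgressManager`:

```python
import sqlite3
from typing import Dict

class SqliteProgressManager(ProgressManager):
    def __init__(self, db_path: str = "crawl_progress.db"):
        super().__init__(progress_file=db_path)
        self.db_path = db_path
        with sqlite3.connect(self.db_path) as conn:
            conn.execute(
                "CREATE TABLE IF NOT EXISTS progress (article_id TEXT PRIMARY KEY, status TEXT)"
            )

    def load_progress(self) -> Dict[str, set]:
        progress = {'completed': set(), 'failed': set()}
        with sqlite3.connect(self.db_path) as conn:
            for article_id, status in conn.execute("SELECT article_id, status FROM progress"):
                progress.setdefault(status, set()).add(article_id)
        return progress

    def save_progress(self, progress: Dict[str, set]):
        with sqlite3.connect(self.db_path) as conn:
            for status in ('completed', 'failed'):
                conn.executemany(
                    "INSERT OR REPLACE INTO progress (article_id, status) VALUES (?, ?)",
                    [(article_id, status) for article_id in progress[status]]
                )
```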

## Summary

The generic article crawler framework provides:

- 🏗️ **Modular design** - easy to extend and maintain
- 🔄 **Resumable crawls** - suited to large crawl jobs
- 🚀 **Concurrent processing** - better throughput
- 🛡️ **Anti-detection** - several built-in anti-bot countermeasures
- 📊 **Progress tracking** - real-time crawl status
- 🎯 **Smart extraction** - automatic detection of the content region
- 📝 **Formatted output** - standard Markdown documents

By subclassing the base class and implementing the three core methods, you can quickly build a dedicated crawler for any site. The framework handles the low-level details so you can focus on the site-specific logic.