generic_article_crawler.py

#!/usr/bin/env python3
"""
Generic article crawler framework
An extensible Selenium-based foundation for crawling articles
Author: AI Assistant
Created: 2025-01-27
"""
import json
import time
import argparse
from pathlib import Path
from concurrent.futures import ThreadPoolExecutor, as_completed
from urllib.parse import urljoin, urlparse
from datetime import datetime
import re
import random
from abc import ABC, abstractmethod
from typing import List, Dict, Optional, Tuple, Any

# Selenium imports
from selenium import webdriver
from selenium.webdriver.chrome.service import Service as ChromeService
from selenium.webdriver.chrome.options import Options
from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from selenium.common.exceptions import TimeoutException, NoSuchElementException, StaleElementReferenceException
from webdriver_manager.chrome import ChromeDriverManager

# HTML-to-Markdown conversion
import html2text
from bs4 import BeautifulSoup


class ArticleCrawlerConfig:
    """Crawler configuration."""

    def __init__(self,
                 base_url: str,
                 output_dir: str = "./output",
                 delay: float = 2.0,
                 max_workers: int = 3,
                 headless: bool = True,
                 show_browser: bool = False,
                 user_agent: str = None,
                 cookies: Dict[str, str] = None,
                 content_selectors: List[str] = None,
                 timeout: int = 30):
        self.base_url = base_url
        self.output_dir = Path(output_dir)
        self.delay = delay
        self.max_workers = max_workers
        self.headless = headless
        self.show_browser = show_browser
        self.timeout = timeout
        # Default user agent
        self.user_agent = user_agent or 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/139.0.0.0 Safari/537.36'
        # Cookies
        self.cookies = cookies or {}
        # Content selectors (in priority order)
        self.content_selectors = content_selectors or [
            "article",
            ".article-content",
            ".content",
            "#content",
            ".post-content",
            "main",
            ".main-content"
        ]
        # Create the output directory
        self.output_dir.mkdir(parents=True, exist_ok=True)
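
# A minimal configuration sketch (illustrative values only, not from the
# original gist) showing how custom cookies and selectors might be passed in:
#
#     config = ArticleCrawlerConfig(
#         base_url="https://news.example.org",
#         output_dir="./news",
#         cookies={"session": "<your-session-cookie>"},
#         content_selectors=[".article-body", "article"],
#         show_browser=True,  # run with a visible window while debugging
#     )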


class SeleniumDriverManager:
    """Manages the Selenium browser driver."""

    def __init__(self, config: ArticleCrawlerConfig):
        self.config = config
        self.driver = None
        self._setup_chrome_options()

    def _setup_chrome_options(self):
        """Configure Chrome options."""
        self.chrome_options = Options()
        # Basic settings
        if self.config.headless and not self.config.show_browser:
            self.chrome_options.add_argument('--headless')
        # Performance and stability options
        chrome_args = [
            '--no-sandbox',
            '--disable-dev-shm-usage',
            '--disable-gpu',
            '--disable-extensions',
            '--disable-plugins',
            '--window-size=1920,1080',
            # Anti-detection options
            '--disable-blink-features=AutomationControlled',
            '--disable-web-security',
            '--allow-running-insecure-content',
            '--disable-features=VizDisplayCompositor'
        ]
        for arg in chrome_args:
            self.chrome_options.add_argument(arg)
        # Experimental options
        self.chrome_options.add_experimental_option("excludeSwitches", ["enable-automation"])
        self.chrome_options.add_experimental_option('useAutomationExtension', False)
        # User agent
        self.chrome_options.add_argument(f'--user-agent={self.config.user_agent}')

    def setup_driver(self) -> bool:
        """Start the Chrome driver."""
        try:
            service = ChromeService(ChromeDriverManager().install())
            self.driver = webdriver.Chrome(service=service, options=self.chrome_options)
            self.driver.set_page_load_timeout(self.config.timeout)
            self.driver.implicitly_wait(10)
            # Run anti-detection scripts
            anti_detection_scripts = [
                "Object.defineProperty(navigator, 'webdriver', {get: () => undefined})",
                "Object.defineProperty(navigator, 'plugins', {get: () => [1, 2, 3, 4, 5]})",
                "Object.defineProperty(navigator, 'languages', {get: () => ['en-US', 'en']})",
                "window.chrome = {runtime: {}}"
            ]
            for script in anti_detection_scripts:
                self.driver.execute_script(script)
            # Add cookies
            if self.config.cookies:
                self._add_cookies()
            mode_desc = "headless" if (self.config.headless and not self.config.show_browser) else "windowed"
            print(f"✅ Chrome started ({mode_desc} mode)")
            return True
        except Exception as e:
            print(f"❌ Failed to start browser: {e}")
            return False

    def _add_cookies(self):
        """Add cookies to the browser session."""
        # Visit the base domain first
        domain = urlparse(self.config.base_url).netloc
        self.driver.get(self.config.base_url)
        for name, value in self.config.cookies.items():
            try:
                self.driver.add_cookie({
                    'name': name,
                    'value': value,
                    'domain': f'.{domain}'
                })
            except Exception as e:
                print(f"⚠️ Failed to add cookie {name}: {e}")

    def cleanup_driver(self):
        """Shut down the browser driver."""
        if self.driver:
            try:
                self.driver.quit()
                print("✅ Browser closed")
            except Exception as e:
                print(f"⚠️ Error while closing browser: {e}")


class ContentExtractor:
    """Extracts article content from pages."""

    def __init__(self, config: ArticleCrawlerConfig):
        self.config = config
        self.html_converter = html2text.HTML2Text()
        self.html_converter.ignore_links = False
        self.html_converter.ignore_images = False
        self.html_converter.body_width = 0

    def extract_content(self, driver: webdriver.Chrome, url: str, max_retries: int = 3) -> Tuple[Optional[str], Optional[str]]:
        """Extract page content, returning (markdown, error)."""
        for attempt in range(max_retries):
            try:
                # Random delay
                delay = random.uniform(self.config.delay, self.config.delay * 2)
                time.sleep(delay)
                driver.get(url)
                # Wait for the page to load
                WebDriverWait(driver, 20).until(
                    EC.presence_of_element_located((By.TAG_NAME, "body"))
                )
                # Extra wait to let the page finish rendering
                time.sleep(random.uniform(1, 3))
                # Try several selectors to locate the article content
                content_element = self._find_content_element(driver)
                if not content_element:
                    if attempt < max_retries - 1:
                        print(f" ⚠️ Content container not found, retrying {attempt + 1}/{max_retries}")
                        continue
                    else:
                        return None, "No content container found"
                # Get the HTML, handling stale element exceptions
                try:
                    html_content = content_element.get_attribute('outerHTML')
                except StaleElementReferenceException:
                    # The element went stale; locate it again
                    print(f" ⚠️ Element went stale, relocating")
                    content_element = self._find_content_element(driver)
                    if not content_element:
                        if attempt < max_retries - 1:
                            print(f" ⚠️ Relocation failed, retrying {attempt + 1}/{max_retries}")
                            continue
                        else:
                            return None, "Content element became stale and could not be relocated"
                    html_content = content_element.get_attribute('outerHTML')
                if not html_content or len(html_content.strip()) < 100:
                    if attempt < max_retries - 1:
                        print(f" ⚠️ Content too short, retrying {attempt + 1}/{max_retries}")
                        continue
                    else:
                        return None, "Content too short"
                # Clean and convert the content
                markdown_content = self._process_html_content(html_content)
                return markdown_content.strip(), None
            except TimeoutException:
                if attempt < max_retries - 1:
                    print(f" ⚠️ Page load timed out, retrying {attempt + 1}/{max_retries}")
                    continue
                else:
                    return None, "Page load timeout"
            except Exception as e:
                if attempt < max_retries - 1:
                    print(f" ⚠️ Extraction error, retrying {attempt + 1}/{max_retries}: {str(e)}")
                    continue
                else:
                    return None, f"Extraction error: {str(e)}"
        return None, "Max retries exceeded"

    def _find_content_element(self, driver: webdriver.Chrome):
        """Locate the content element."""
        # Try the predefined selectors first
        for selector in self.config.content_selectors:
            try:
                # Wait for the element to appear
                WebDriverWait(driver, 10).until(
                    EC.presence_of_element_located((By.CSS_SELECTOR, selector))
                )
                elements = driver.find_elements(By.CSS_SELECTOR, selector)
                if elements:
                    print(f" ✅ Found content container: {selector}")
                    return elements[0]
            except (TimeoutException, NoSuchElementException, StaleElementReferenceException):
                continue
        # No specific container matched; fall back to the div with the most text
        try:
            # Wait for the basic page structure to be present
            WebDriverWait(driver, 10).until(
                EC.presence_of_element_located((By.TAG_NAME, "div"))
            )
            divs = driver.find_elements(By.TAG_NAME, "div")
            max_text_length = 0
            best_div = None
            for div in divs:
                try:
                    text_length = len(div.text.strip())
                    if text_length > max_text_length and text_length > 100:
                        max_text_length = text_length
                        best_div = div
                except (StaleElementReferenceException, Exception):
                    continue
            if best_div:
                print(f" ✅ Found content container: auto-detected div")
                return best_div
        except (TimeoutException, Exception):
            pass
        return None

    def _process_html_content(self, html_content: str) -> str:
        """Clean the HTML and convert it to Markdown."""
        # Clean the HTML with BeautifulSoup
        soup = BeautifulSoup(html_content, 'html.parser')
        # Remove unwanted elements
        unwanted_tags = ['script', 'style', 'nav', 'header', 'footer', 'aside']
        for tag in soup.find_all(unwanted_tags):
            tag.decompose()
        # Convert to Markdown
        markdown_content = self.html_converter.handle(str(soup))
        return markdown_content


class ProgressManager:
    """Persists and restores crawl progress."""

    def __init__(self, progress_file: str = 'crawl_progress.json'):
        self.progress_file = progress_file

    def load_progress(self) -> Dict[str, set]:
        """Load crawl progress."""
        progress_path = Path(self.progress_file)
        if not progress_path.exists():
            return {'completed': set(), 'failed': set()}
        try:
            with open(progress_path, 'r', encoding='utf-8') as f:
                data = json.load(f)
            return {
                'completed': set(data.get('completed', [])),
                'failed': set(data.get('failed', []))
            }
        except Exception as e:
            print(f"⚠️ Failed to read progress file: {e}, starting over")
            return {'completed': set(), 'failed': set()}

    def save_progress(self, progress: Dict[str, set]):
        """Save crawl progress."""
        try:
            data = {
                'completed': list(progress['completed']),
                'failed': list(progress['failed']),
                'last_update': datetime.now().isoformat()
            }
            with open(self.progress_file, 'w', encoding='utf-8') as f:
                json.dump(data, f, ensure_ascii=False, indent=2)
        except Exception as e:
            print(f"⚠️ Failed to save progress file: {e}")


class BaseArticleCrawler(ABC):
    """Base class for article crawlers."""

    def __init__(self, config: ArticleCrawlerConfig):
        self.config = config
        self.driver_manager = SeleniumDriverManager(config)
        self.content_extractor = ContentExtractor(config)
        self.progress_manager = ProgressManager()
        # Statistics
        self.stats = {
            'total_articles': 0,
            'processed': 0,
            'success': 0,
            'failed': 0
        }

    @abstractmethod
    def fetch_article_links(self) -> List[Dict[str, Any]]:
        """Return the list of article links - subclasses must implement."""
        pass

    @abstractmethod
    def generate_filename(self, article: Dict[str, Any]) -> str:
        """Generate an output filename - subclasses must implement."""
        pass

    @abstractmethod
    def format_article_content(self, article: Dict[str, Any], content: str) -> str:
        """Format the article content - subclasses must implement."""
        pass

    def is_article_completed(self, article: Dict[str, Any]) -> bool:
        """Check whether an article has already been saved."""
        filename = self.generate_filename(article)
        filepath = self.config.output_dir / filename
        return filepath.exists()

    def save_article(self, article: Dict[str, Any], content: str) -> bool:
        """Save an article as a Markdown file."""
        try:
            filename = self.generate_filename(article)
            filepath = self.config.output_dir / filename
            # Assemble the full document
            full_content = self.format_article_content(article, content)
            with open(filepath, 'w', encoding='utf-8') as f:
                f.write(full_content)
            print(f" 💾 Saved: {filename}")
            return True
        except Exception as e:
            print(f" ❌ Save failed: {e}")
            return False

    def process_single_article(self, article: Dict[str, Any]) -> bool:
        """Process a single article."""
        article_title = article.get('title', article.get('url', 'Unknown'))
        print(f"📄 Processing: {article_title}")
        # Extract the content
        content, error = self.content_extractor.extract_content(
            self.driver_manager.driver,
            article['url']
        )
        if error:
            print(f" ❌ Extraction failed: {error}")
            return False
        if not content:
            print(f" ❌ Empty content")
            return False
        # Save the article
        success = self.save_article(article, content)
        if success:
            self.stats['success'] += 1
        else:
            self.stats['failed'] += 1
        self.stats['processed'] += 1
        # Report progress relative to the number of remaining articles
        remaining_total = getattr(self, '_remaining_total', self.stats['total_articles'])
        progress = (self.stats['processed'] / remaining_total) * 100
        print(f"📊 Progress: {self.stats['processed']}/{remaining_total} ({progress:.1f}%)")
        return success

    def crawl_articles(self, resume: bool = True, progress_file: str = 'crawl_progress.json'):
        """Crawl all articles."""
        # Fetch the article links
        articles = self.fetch_article_links()
        if not articles:
            print("❌ No article links found")
            return
        # Configure the progress manager
        self.progress_manager.progress_file = progress_file
        # Load progress
        progress = self.progress_manager.load_progress() if resume else {'completed': set(), 'failed': set()}
        # Filter out articles that are already done
        remaining_articles = []
        skipped_count = 0
        for article in articles:
            article_id = self._get_article_id(article)
            # Already recorded as completed?
            if article_id in progress['completed']:
                skipped_count += 1
                continue
            # Does the output file already exist?
            if self.is_article_completed(article):
                progress['completed'].add(article_id)
                skipped_count += 1
                continue
            remaining_articles.append(article)
        self.stats['total_articles'] = len(articles)
        remaining_count = len(remaining_articles)
        # Remember the remaining count for progress reporting
        self._remaining_total = remaining_count
        print(f"🚀 Starting article crawl")
        print(f"📊 Total articles: {len(articles)}")
        print(f"✅ Already done: {skipped_count}")
        print(f"🔄 To process: {remaining_count}")
        print(f"📁 Output directory: {self.config.output_dir.absolute()}")
        print(f"🔧 Workers: {self.config.max_workers}")
        print(f"💾 Progress file: {progress_file}")
        print("-" * 60)
        if remaining_count == 0:
            print("🎉 All articles are already done, nothing to crawl!")
            return
        # Start the browser
        if not self.driver_manager.setup_driver():
            return
        try:
            # Process articles with a thread pool
            # (note: all workers share the single WebDriver instance created above)
            with ThreadPoolExecutor(max_workers=self.config.max_workers) as executor:
                # Submit one task per article
                future_to_article = {}
                for article in remaining_articles:
                    future = executor.submit(self.process_single_article, article)
                    future_to_article[future] = article
                # Handle completed tasks
                for future in as_completed(future_to_article):
                    article = future_to_article[future]
                    article_id = self._get_article_id(article)
                    try:
                        success = future.result()
                        if success:
                            progress['completed'].add(article_id)
                            # Save progress periodically
                            if len(progress['completed']) % 10 == 0:
                                self.progress_manager.save_progress(progress)
                            time.sleep(self.config.delay)
                        else:
                            progress['failed'].add(article_id)
                    except Exception as e:
                        article_title = article.get('title', article.get('url', 'Unknown'))
                        print(f"❌ Error while processing {article_title}: {e}")
                        progress['failed'].add(article_id)
                        self.stats['failed'] += 1
                        self.stats['processed'] += 1
        finally:
            # Save progress one last time
            self.progress_manager.save_progress(progress)
            self.driver_manager.cleanup_driver()
            # Print the final statistics
            self._print_final_stats(progress)

    def _get_article_id(self, article: Dict[str, Any]) -> str:
        """Return a unique identifier for an article."""
        return article.get('id') or article.get('code') or article.get('url', '')

    def _print_final_stats(self, progress: Dict[str, set]):
        """Print the final statistics."""
        total_completed = len(progress['completed'])
        total_failed = len(progress['failed'])
        print("\n" + "=" * 60)
        print(f"📊 Crawl summary")
        print("=" * 60)
        print(f"Total articles: {self.stats['total_articles']}")
        print(f"Processed this run: {self.stats['processed']}")
        print(f"Succeeded this run: {self.stats['success']}")
        print(f"Failed this run: {self.stats['failed']}")
        print(f"Completed overall: {total_completed}")
        print(f"Failed overall: {total_failed}")
        if self.stats['total_articles'] > 0:
            print(f"Overall success rate: {(total_completed/self.stats['total_articles']*100):.1f}%")
        print(f"\n📁 Documents saved to: {self.config.output_dir.absolute()}")
        print(f"💾 Progress saved to: {self.progress_manager.progress_file}")


# Example implementation: a simple URL-list crawler
class SimpleUrlCrawler(BaseArticleCrawler):
    """Example crawler that works off a plain list of URLs."""

    def __init__(self, config: ArticleCrawlerConfig, urls: List[str]):
        super().__init__(config)
        self.urls = urls

    def fetch_article_links(self) -> List[Dict[str, Any]]:
        """Build article records from the URL list."""
        articles = []
        for i, url in enumerate(self.urls):
            articles.append({
                'id': str(i),
                'url': url,
                'title': f"Article_{i+1}"
            })
        return articles

    def generate_filename(self, article: Dict[str, Any]) -> str:
        """Generate a filesystem-safe filename."""
        safe_title = re.sub(r'[^\w\s-]', '', article['title']).strip()
        safe_title = re.sub(r'[-\s]+', '_', safe_title)
        return f"{article['id']}_{safe_title}.md"

    def format_article_content(self, article: Dict[str, Any], content: str) -> str:
        """Format the article as a Markdown document."""
        return f"""# {article['title']}

**URL:** {article['url']}
**ID:** {article['id']}
**Crawled at:** {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}

---

{content}
"""


def create_default_config(base_url: str, **kwargs) -> ArticleCrawlerConfig:
    """Create a default configuration."""
    return ArticleCrawlerConfig(base_url=base_url, **kwargs)


if __name__ == '__main__':
    # Example usage
    config = create_default_config(
        base_url="https://example.com",
        output_dir="./articles",
        delay=2.0,
        max_workers=2
    )
    # Example URL list
    urls = [
        "https://example.com/article1",
        "https://example.com/article2"
    ]
    crawler = SimpleUrlCrawler(config, urls)
    crawler.crawl_articles()
| """ | |
| 通用文章爬虫框架 | |
| 基于Selenium的可扩展文章爬取基础框架 | |
| 作者: AI Assistant | |
| 创建时间: 2025-01-27 | |
| """ | |
| import json | |
| import time | |
| import argparse | |
| from pathlib import Path | |
| from concurrent.futures import ThreadPoolExecutor, as_completed | |
| from urllib.parse import urljoin, urlparse | |
| from datetime import datetime | |
| import re | |
| import random | |
| from abc import ABC, abstractmethod | |
| from typing import List, Dict, Optional, Tuple, Any | |
| # Selenium相关导入 | |
| from selenium import webdriver | |
| from selenium.webdriver.chrome.service import Service as ChromeService | |
| from selenium.webdriver.chrome.options import Options | |
| from selenium.webdriver.common.by import By | |
| from selenium.webdriver.support.ui import WebDriverWait | |
| from selenium.webdriver.support import expected_conditions as EC | |
| from selenium.common.exceptions import TimeoutException, NoSuchElementException, StaleElementReferenceException | |
| from webdriver_manager.chrome import ChromeDriverManager | |
| # HTML转Markdown | |
| import html2text | |
| from bs4 import BeautifulSoup | |
| class ArticleCrawlerConfig: | |
| """爬虫配置类""" | |
| def __init__(self, | |
| base_url: str, | |
| output_dir: str = "./output", | |
| delay: float = 2.0, | |
| max_workers: int = 3, | |
| headless: bool = True, | |
| show_browser: bool = False, | |
| user_agent: str = None, | |
| cookies: Dict[str, str] = None, | |
| content_selectors: List[str] = None, | |
| timeout: int = 30): | |
| self.base_url = base_url | |
| self.output_dir = Path(output_dir) | |
| self.delay = delay | |
| self.max_workers = max_workers | |
| self.headless = headless | |
| self.show_browser = show_browser | |
| self.timeout = timeout | |
| # 默认用户代理 | |
| self.user_agent = user_agent or 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/139.0.0.0 Safari/537.36' | |
| # Cookie设置 | |
| self.cookies = cookies or {} | |
| # 内容选择器(按优先级排序) | |
| self.content_selectors = content_selectors or [ | |
| "article", | |
| ".article-content", | |
| ".content", | |
| "#content", | |
| ".post-content", | |
| "main", | |
| ".main-content" | |
| ] | |
| # 创建输出目录 | |
| self.output_dir.mkdir(parents=True, exist_ok=True) | |
| class SeleniumDriverManager: | |
| """Selenium浏览器驱动管理器""" | |
| def __init__(self, config: ArticleCrawlerConfig): | |
| self.config = config | |
| self.driver = None | |
| self._setup_chrome_options() | |
| def _setup_chrome_options(self): | |
| """设置Chrome选项""" | |
| self.chrome_options = Options() | |
| # 基础设置 | |
| if self.config.headless and not self.config.show_browser: | |
| self.chrome_options.add_argument('--headless') | |
| # 性能和稳定性选项 | |
| chrome_args = [ | |
| '--no-sandbox', | |
| '--disable-dev-shm-usage', | |
| '--disable-gpu', | |
| '--disable-extensions', | |
| '--disable-plugins', | |
| '--window-size=1920,1080', | |
| # 反检测选项 | |
| '--disable-blink-features=AutomationControlled', | |
| '--disable-web-security', | |
| '--allow-running-insecure-content', | |
| '--disable-features=VizDisplayCompositor' | |
| ] | |
| for arg in chrome_args: | |
| self.chrome_options.add_argument(arg) | |
| # 实验性选项 | |
| self.chrome_options.add_experimental_option("excludeSwitches", ["enable-automation"]) | |
| self.chrome_options.add_experimental_option('useAutomationExtension', False) | |
| # 用户代理 | |
| self.chrome_options.add_argument(f'--user-agent={self.config.user_agent}') | |
| def setup_driver(self) -> bool: | |
| """设置Chrome浏览器驱动""" | |
| try: | |
| service = ChromeService(ChromeDriverManager().install()) | |
| self.driver = webdriver.Chrome(service=service, options=self.chrome_options) | |
| self.driver.set_page_load_timeout(self.config.timeout) | |
| self.driver.implicitly_wait(10) | |
| # 执行反检测脚本 | |
| anti_detection_scripts = [ | |
| "Object.defineProperty(navigator, 'webdriver', {get: () => undefined})", | |
| "Object.defineProperty(navigator, 'plugins', {get: () => [1, 2, 3, 4, 5]})", | |
| "Object.defineProperty(navigator, 'languages', {get: () => ['en-US', 'en']})", | |
| "window.chrome = {runtime: {}}" | |
| ] | |
| for script in anti_detection_scripts: | |
| self.driver.execute_script(script) | |
| # 添加Cookie | |
| if self.config.cookies: | |
| self._add_cookies() | |
| mode_desc = "无头模式" if (self.config.headless and not self.config.show_browser) else "显示模式" | |
| print(f"✅ Chrome浏览器已启动 ({mode_desc})") | |
| return True | |
| except Exception as e: | |
| print(f"❌ 浏览器启动失败: {e}") | |
| return False | |
| def _add_cookies(self): | |
| """添加Cookie""" | |
| # 先访问基础域名 | |
| domain = urlparse(self.config.base_url).netloc | |
| self.driver.get(self.config.base_url) | |
| for name, value in self.config.cookies.items(): | |
| try: | |
| self.driver.add_cookie({ | |
| 'name': name, | |
| 'value': value, | |
| 'domain': f'.{domain}' | |
| }) | |
| except Exception as e: | |
| print(f"⚠️ 添加Cookie失败 {name}: {e}") | |
| def cleanup_driver(self): | |
| """清理浏览器驱动""" | |
| if self.driver: | |
| try: | |
| self.driver.quit() | |
| print("✅ 浏览器已关闭") | |
| except Exception as e: | |
| print(f"⚠️ 关闭浏览器时出错: {e}") | |
| class ContentExtractor: | |
| """内容提取器""" | |
| def __init__(self, config: ArticleCrawlerConfig): | |
| self.config = config | |
| self.html_converter = html2text.HTML2Text() | |
| self.html_converter.ignore_links = False | |
| self.html_converter.ignore_images = False | |
| self.html_converter.body_width = 0 | |
| def extract_content(self, driver: webdriver.Chrome, url: str, max_retries: int = 3) -> Tuple[Optional[str], Optional[str]]: | |
| """提取页面内容""" | |
| for attempt in range(max_retries): | |
| try: | |
| # 随机延迟 | |
| delay = random.uniform(self.config.delay, self.config.delay * 2) | |
| time.sleep(delay) | |
| driver.get(url) | |
| # 等待页面加载 | |
| WebDriverWait(driver, 20).until( | |
| EC.presence_of_element_located((By.TAG_NAME, "body")) | |
| ) | |
| # 额外等待确保页面完全加载 | |
| time.sleep(random.uniform(1, 3)) | |
| # 尝试多种选择器找到文章内容 | |
| content_element = self._find_content_element(driver) | |
| if not content_element: | |
| if attempt < max_retries - 1: | |
| print(f" ⚠️ 未找到内容容器,重试 {attempt + 1}/{max_retries}") | |
| continue | |
| else: | |
| return None, "No content container found" | |
| # 获取HTML内容,处理 stale element 异常 | |
| try: | |
| html_content = content_element.get_attribute('outerHTML') | |
| except StaleElementReferenceException: | |
| # 元素已过期,重新查找 | |
| print(f" ⚠️ 元素已过期,重新定位") | |
| content_element = self._find_content_element(driver) | |
| if not content_element: | |
| if attempt < max_retries - 1: | |
| print(f" ⚠️ 重新定位失败,重试 {attempt + 1}/{max_retries}") | |
| continue | |
| else: | |
| return None, "Content element became stale and could not be relocated" | |
| html_content = content_element.get_attribute('outerHTML') | |
| if not html_content or len(html_content.strip()) < 100: | |
| if attempt < max_retries - 1: | |
| print(f" ⚠️ 内容过短,重试 {attempt + 1}/{max_retries}") | |
| continue | |
| else: | |
| return None, "Content too short" | |
| # 清理和转换内容 | |
| markdown_content = self._process_html_content(html_content) | |
| return markdown_content.strip(), None | |
| except TimeoutException: | |
| if attempt < max_retries - 1: | |
| print(f" ⚠️ 页面加载超时,重试 {attempt + 1}/{max_retries}") | |
| continue | |
| else: | |
| return None, "Page load timeout" | |
| except Exception as e: | |
| if attempt < max_retries - 1: | |
| print(f" ⚠️ 提取错误,重试 {attempt + 1}/{max_retries}: {str(e)}") | |
| continue | |
| else: | |
| return None, f"Extraction error: {str(e)}" | |
| return None, "Max retries exceeded" | |
| def _find_content_element(self, driver: webdriver.Chrome): | |
| """查找内容元素""" | |
| # 尝试预定义的选择器 | |
| for selector in self.config.content_selectors: | |
| try: | |
| # 等待元素出现 | |
| WebDriverWait(driver, 10).until( | |
| EC.presence_of_element_located((By.CSS_SELECTOR, selector)) | |
| ) | |
| elements = driver.find_elements(By.CSS_SELECTOR, selector) | |
| if elements: | |
| print(f" ✅ 找到内容容器: {selector}") | |
| return elements[0] | |
| except (TimeoutException, NoSuchElementException, StaleElementReferenceException): | |
| continue | |
| # 如果没找到特定容器,尝试找到包含最多文本的div | |
| try: | |
| # 等待页面基本加载完成 | |
| WebDriverWait(driver, 10).until( | |
| EC.presence_of_element_located((By.TAG_NAME, "div")) | |
| ) | |
| divs = driver.find_elements(By.TAG_NAME, "div") | |
| max_text_length = 0 | |
| best_div = None | |
| for div in divs: | |
| try: | |
| text_length = len(div.text.strip()) | |
| if text_length > max_text_length and text_length > 100: | |
| max_text_length = text_length | |
| best_div = div | |
| except (StaleElementReferenceException, Exception): | |
| continue | |
| if best_div: | |
| print(f" ✅ 找到内容容器: auto-detected div") | |
| return best_div | |
| except (TimeoutException, Exception): | |
| pass | |
| return None | |
| def _process_html_content(self, html_content: str) -> str: | |
| """处理HTML内容""" | |
| # 使用BeautifulSoup清理HTML | |
| soup = BeautifulSoup(html_content, 'html.parser') | |
| # 移除不需要的元素 | |
| unwanted_tags = ['script', 'style', 'nav', 'header', 'footer', 'aside'] | |
| for tag in soup.find_all(unwanted_tags): | |
| tag.decompose() | |
| # 转换为Markdown | |
| markdown_content = self.html_converter.handle(str(soup)) | |
| return markdown_content | |
| class ProgressManager: | |
| """进度管理器""" | |
| def __init__(self, progress_file: str = 'crawl_progress.json'): | |
| self.progress_file = progress_file | |
| def load_progress(self) -> Dict[str, set]: | |
| """加载爬取进度""" | |
| progress_path = Path(self.progress_file) | |
| if not progress_path.exists(): | |
| return {'completed': set(), 'failed': set()} | |
| try: | |
| with open(progress_path, 'r', encoding='utf-8') as f: | |
| data = json.load(f) | |
| return { | |
| 'completed': set(data.get('completed', [])), | |
| 'failed': set(data.get('failed', [])) | |
| } | |
| except Exception as e: | |
| print(f"⚠️ 读取进度文件失败: {e},将重新开始") | |
| return {'completed': set(), 'failed': set()} | |
| def save_progress(self, progress: Dict[str, set]): | |
| """保存爬取进度""" | |
| try: | |
| data = { | |
| 'completed': list(progress['completed']), | |
| 'failed': list(progress['failed']), | |
| 'last_update': datetime.now().isoformat() | |
| } | |
| with open(self.progress_file, 'w', encoding='utf-8') as f: | |
| json.dump(data, f, ensure_ascii=False, indent=2) | |
| except Exception as e: | |
| print(f"⚠️ 保存进度文件失败: {e}") | |
| class BaseArticleCrawler(ABC): | |
| """文章爬虫基类""" | |
| def __init__(self, config: ArticleCrawlerConfig): | |
| self.config = config | |
| self.driver_manager = SeleniumDriverManager(config) | |
| self.content_extractor = ContentExtractor(config) | |
| self.progress_manager = ProgressManager() | |
| # 统计信息 | |
| self.stats = { | |
| 'total_articles': 0, | |
| 'processed': 0, | |
| 'success': 0, | |
| 'failed': 0 | |
| } | |
| @abstractmethod | |
| def fetch_article_links(self) -> List[Dict[str, Any]]: | |
| """获取文章链接列表 - 子类必须实现""" | |
| pass | |
| @abstractmethod | |
| def generate_filename(self, article: Dict[str, Any]) -> str: | |
| """生成文件名 - 子类必须实现""" | |
| pass | |
| @abstractmethod | |
| def format_article_content(self, article: Dict[str, Any], content: str) -> str: | |
| """格式化文章内容 - 子类必须实现""" | |
| pass | |
| def is_article_completed(self, article: Dict[str, Any]) -> bool: | |
| """检查文章是否已完成""" | |
| filename = self.generate_filename(article) | |
| filepath = self.config.output_dir / filename | |
| return filepath.exists() | |
| def save_article(self, article: Dict[str, Any], content: str) -> bool: | |
| """保存文章为Markdown文件""" | |
| try: | |
| filename = self.generate_filename(article) | |
| filepath = self.config.output_dir / filename | |
| # 格式化完整内容 | |
| full_content = self.format_article_content(article, content) | |
| with open(filepath, 'w', encoding='utf-8') as f: | |
| f.write(full_content) | |
| print(f" 💾 已保存: {filename}") | |
| return True | |
| except Exception as e: | |
| print(f" ❌ 保存失败: {e}") | |
| return False | |
| def process_single_article(self, article: Dict[str, Any]) -> bool: | |
| """处理单篇文章""" | |
| article_title = article.get('title', article.get('url', 'Unknown')) | |
| print(f"📄 处理: {article_title}") | |
| # 提取内容 | |
| content, error = self.content_extractor.extract_content( | |
| self.driver_manager.driver, | |
| article['url'] | |
| ) | |
| if error: | |
| print(f" ❌ 提取失败: {error}") | |
| return False | |
| if not content: | |
| print(f" ❌ 内容为空") | |
| return False | |
| # 保存文章 | |
| success = self.save_article(article, content) | |
| if success: | |
| self.stats['success'] += 1 | |
| else: | |
| self.stats['failed'] += 1 | |
| self.stats['processed'] += 1 | |
| # 显示进度 - 使用剩余文章数计算进度 | |
| remaining_total = getattr(self, '_remaining_total', self.stats['total_articles']) | |
| progress = (self.stats['processed'] / remaining_total) * 100 | |
| print(f"📊 进度: {self.stats['processed']}/{remaining_total} ({progress:.1f}%)") | |
| return success | |
| def crawl_articles(self, resume: bool = True, progress_file: str = 'crawl_progress.json'): | |
| """爬取所有文章""" | |
| # 获取文章链接 | |
| articles = self.fetch_article_links() | |
| if not articles: | |
| print("❌ 未找到文章链接") | |
| return | |
| # 设置进度管理器 | |
| self.progress_manager.progress_file = progress_file | |
| # 加载进度 | |
| progress = self.progress_manager.load_progress() if resume else {'completed': set(), 'failed': set()} | |
| # 过滤已完成的文章 | |
| remaining_articles = [] | |
| skipped_count = 0 | |
| for article in articles: | |
| article_id = self._get_article_id(article) | |
| # 检查是否已在进度记录中完成 | |
| if article_id in progress['completed']: | |
| skipped_count += 1 | |
| continue | |
| # 检查文件是否已存在 | |
| if self.is_article_completed(article): | |
| progress['completed'].add(article_id) | |
| skipped_count += 1 | |
| continue | |
| remaining_articles.append(article) | |
| self.stats['total_articles'] = len(articles) | |
| remaining_count = len(remaining_articles) | |
| # 设置剩余文章数用于进度计算 | |
| self._remaining_total = remaining_count | |
| print(f"🚀 开始爬取文章内容") | |
| print(f"📊 总文章数: {len(articles)}") | |
| print(f"✅ 已完成: {skipped_count}") | |
| print(f"🔄 待处理: {remaining_count}") | |
| print(f"📁 输出目录: {self.config.output_dir.absolute()}") | |
| print(f"🔧 并发数: {self.config.max_workers}") | |
| print(f"💾 进度文件: {progress_file}") | |
| print("-" * 60) | |
| if remaining_count == 0: | |
| print("🎉 所有文章已完成,无需继续爬取!") | |
| return | |
| # 设置浏览器 | |
| if not self.driver_manager.setup_driver(): | |
| return | |
| try: | |
| # 使用线程池处理文章 | |
| with ThreadPoolExecutor(max_workers=self.config.max_workers) as executor: | |
| # 为每个文章创建一个任务 | |
| future_to_article = {} | |
| for article in remaining_articles: | |
| future = executor.submit(self.process_single_article, article) | |
| future_to_article[future] = article | |
| # 处理完成的任务 | |
| for future in as_completed(future_to_article): | |
| article = future_to_article[future] | |
| article_id = self._get_article_id(article) | |
| try: | |
| success = future.result() | |
| if success: | |
| progress['completed'].add(article_id) | |
| # 定期保存进度 | |
| if len(progress['completed']) % 10 == 0: | |
| self.progress_manager.save_progress(progress) | |
| time.sleep(self.config.delay) | |
| else: | |
| progress['failed'].add(article_id) | |
| except Exception as e: | |
| article_title = article.get('title', article.get('url', 'Unknown')) | |
| print(f"❌ 处理文章 {article_title} 时出错: {e}") | |
| progress['failed'].add(article_id) | |
| self.stats['failed'] += 1 | |
| self.stats['processed'] += 1 | |
| finally: | |
| # 最终保存进度 | |
| self.progress_manager.save_progress(progress) | |
| self.driver_manager.cleanup_driver() | |
| # 显示最终统计 | |
| self._print_final_stats(progress) | |
| def _get_article_id(self, article: Dict[str, Any]) -> str: | |
| """获取文章唯一标识""" | |
| return article.get('id') or article.get('code') or article.get('url', '') | |
| def _print_final_stats(self, progress: Dict[str, set]): | |
| """打印最终统计信息""" | |
| total_completed = len(progress['completed']) | |
| total_failed = len(progress['failed']) | |
| print("\n" + "=" * 60) | |
| print(f"📊 文章爬取完成统计") | |
| print("=" * 60) | |
| print(f"总文章数: {self.stats['total_articles']}") | |
| print(f"本次处理: {self.stats['processed']}") | |
| print(f"本次成功: {self.stats['success']}") | |
| print(f"本次失败: {self.stats['failed']}") | |
| print(f"累计完成: {total_completed}") | |
| print(f"累计失败: {total_failed}") | |
| if self.stats['total_articles'] > 0: | |
| print(f"总体成功率: {(total_completed/self.stats['total_articles']*100):.1f}%") | |
| print(f"\n📁 文档已保存到: {self.config.output_dir.absolute()}") | |
| print(f"💾 进度已保存到: {self.progress_manager.progress_file}") | |
| # 示例实现:简单的URL列表爬虫 | |
| class SimpleUrlCrawler(BaseArticleCrawler): | |
| """简单的URL列表爬虫示例""" | |
| def __init__(self, config: ArticleCrawlerConfig, urls: List[str]): | |
| super().__init__(config) | |
| self.urls = urls | |
| def fetch_article_links(self) -> List[Dict[str, Any]]: | |
| """从URL列表生成文章信息""" | |
| articles = [] | |
| for i, url in enumerate(self.urls): | |
| articles.append({ | |
| 'id': str(i), | |
| 'url': url, | |
| 'title': f"Article_{i+1}" | |
| }) | |
| return articles | |
| def generate_filename(self, article: Dict[str, Any]) -> str: | |
| """生成文件名""" | |
| safe_title = re.sub(r'[^\w\s-]', '', article['title']).strip() | |
| safe_title = re.sub(r'[-\s]+', '_', safe_title) | |
| return f"{article['id']}_{safe_title}.md" | |
| def format_article_content(self, article: Dict[str, Any], content: str) -> str: | |
| """格式化文章内容""" | |
| return f"""# {article['title']} | |
| **URL:** {article['url']} | |
| **ID:** {article['id']} | |
| **爬取时间:** {datetime.now().strftime('%Y-%m-%d %H:%M:%S')} | |
| --- | |
| {content} | |
| """ | |
| def create_default_config(base_url: str, **kwargs) -> ArticleCrawlerConfig: | |
| """创建默认配置""" | |
| return ArticleCrawlerConfig(base_url=base_url, **kwargs) | |
| if __name__ == '__main__': | |
| # 示例用法 | |
| config = create_default_config( | |
| base_url="https://example.com", | |
| output_dir="./articles", | |
| delay=2.0, | |
| max_workers=2 | |
| ) | |
| # 示例URL列表 | |
| urls = [ | |
| "https://example.com/article1", | |
| "https://example.com/article2" | |
| ] | |
| crawler = SimpleUrlCrawler(config, urls) | |
| crawler.crawl_articles() | |