# generic article crawler

```python
#!/usr/bin/env python3
"""
Generic article crawler framework
An extensible base framework for article crawling built on Selenium

Author: AI Assistant
Created: 2025-01-27
"""

import json
import time
import argparse
from pathlib import Path
from concurrent.futures import ThreadPoolExecutor, as_completed
from urllib.parse import urljoin, urlparse
from datetime import datetime
import re
import random
from abc import ABC, abstractmethod
from typing import List, Dict, Optional, Tuple, Any

# Selenium imports
from selenium import webdriver
from selenium.webdriver.chrome.service import Service as ChromeService
from selenium.webdriver.chrome.options import Options
from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from selenium.common.exceptions import TimeoutException, NoSuchElementException, StaleElementReferenceException
from webdriver_manager.chrome import ChromeDriverManager

# HTML-to-Markdown conversion
import html2text
from bs4 import BeautifulSoup


class ArticleCrawlerConfig:
    """Crawler configuration."""

    def __init__(self,
                 base_url: str,
                 output_dir: str = "./output",
                 delay: float = 2.0,
                 max_workers: int = 3,
                 headless: bool = True,
                 show_browser: bool = False,
                 user_agent: str = None,
                 cookies: Dict[str, str] = None,
                 content_selectors: List[str] = None,
                 timeout: int = 30):
        self.base_url = base_url
        self.output_dir = Path(output_dir)
        self.delay = delay
        self.max_workers = max_workers
        self.headless = headless
        self.show_browser = show_browser
        self.timeout = timeout

        # Default user agent
        self.user_agent = user_agent or 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/139.0.0.0 Safari/537.36'

        # Cookies
        self.cookies = cookies or {}

        # Content selectors (ordered by priority)
        self.content_selectors = content_selectors or [
            "article",
            ".article-content",
            ".content",
            "#content",
            ".post-content",
            "main",
            ".main-content"
        ]

        # Create the output directory
        self.output_dir.mkdir(parents=True, exist_ok=True)


class SeleniumDriverManager:
    """Manages the Selenium browser driver."""

    def __init__(self, config: ArticleCrawlerConfig):
        self.config = config
        self.driver = None
        self._setup_chrome_options()

    def _setup_chrome_options(self):
        """Configure Chrome options."""
        self.chrome_options = Options()

        # Basic settings
        if self.config.headless and not self.config.show_browser:
            self.chrome_options.add_argument('--headless')

        # Performance and stability options
        chrome_args = [
            '--no-sandbox',
            '--disable-dev-shm-usage',
            '--disable-gpu',
            '--disable-extensions',
            '--disable-plugins',
            '--window-size=1920,1080',
            # Anti-detection options
            '--disable-blink-features=AutomationControlled',
            '--disable-web-security',
            '--allow-running-insecure-content',
            '--disable-features=VizDisplayCompositor'
        ]

        for arg in chrome_args:
            self.chrome_options.add_argument(arg)

        # Experimental options
        self.chrome_options.add_experimental_option("excludeSwitches", ["enable-automation"])
        self.chrome_options.add_experimental_option('useAutomationExtension', False)

        # User agent
        self.chrome_options.add_argument(f'--user-agent={self.config.user_agent}')

    def setup_driver(self) -> bool:
        """Start the Chrome browser driver."""
        try:
            service = ChromeService(ChromeDriverManager().install())
            self.driver = webdriver.Chrome(service=service, options=self.chrome_options)
            self.driver.set_page_load_timeout(self.config.timeout)
            self.driver.implicitly_wait(10)

            # Run anti-detection scripts
            anti_detection_scripts = [
                "Object.defineProperty(navigator, 'webdriver', {get: () => undefined})",
                "Object.defineProperty(navigator, 'plugins', {get: () => [1, 2, 3, 4, 5]})",
                "Object.defineProperty(navigator, 'languages', {get: () => ['en-US', 'en']})",
                "window.chrome = {runtime: {}}"
            ]

            for script in anti_detection_scripts:
                self.driver.execute_script(script)

            # Add cookies
            if self.config.cookies:
                self._add_cookies()

            mode_desc = "headless" if (self.config.headless and not self.config.show_browser) else "visible"
            print(f"✅ Chrome started ({mode_desc} mode)")
            return True

        except Exception as e:
            print(f"❌ Failed to start the browser: {e}")
            return False

    def _add_cookies(self):
        """Add cookies to the driver."""
        # Visit the base domain first
        domain = urlparse(self.config.base_url).netloc
        self.driver.get(self.config.base_url)

        for name, value in self.config.cookies.items():
            try:
                self.driver.add_cookie({
                    'name': name,
                    'value': value,
                    'domain': f'.{domain}'
                })
            except Exception as e:
                print(f"⚠️ Failed to add cookie {name}: {e}")

    def cleanup_driver(self):
        """Shut down the browser driver."""
        if self.driver:
            try:
                self.driver.quit()
                print("✅ Browser closed")
            except Exception as e:
                print(f"⚠️ Error while closing the browser: {e}")


class ContentExtractor:
    """Extracts article content from pages."""

    def __init__(self, config: ArticleCrawlerConfig):
        self.config = config
        self.html_converter = html2text.HTML2Text()
        self.html_converter.ignore_links = False
        self.html_converter.ignore_images = False
        self.html_converter.body_width = 0

    def extract_content(self, driver: webdriver.Chrome, url: str, max_retries: int = 3) -> Tuple[Optional[str], Optional[str]]:
        """Extract the content of a page."""
        for attempt in range(max_retries):
            try:
                # Random delay
                delay = random.uniform(self.config.delay, self.config.delay * 2)
                time.sleep(delay)

                driver.get(url)

                # Wait for the page to load
                WebDriverWait(driver, 20).until(
                    EC.presence_of_element_located((By.TAG_NAME, "body"))
                )

                # Extra wait to make sure the page is fully loaded
                time.sleep(random.uniform(1, 3))

                # Try several selectors to locate the article content
                content_element = self._find_content_element(driver)

                if not content_element:
                    if attempt < max_retries - 1:
                        print(f"  ⚠️ Content container not found, retrying {attempt + 1}/{max_retries}")
                        continue
                    else:
                        return None, "No content container found"

                # Get the HTML, handling stale element exceptions
                try:
                    html_content = content_element.get_attribute('outerHTML')
                except StaleElementReferenceException:
                    # The element went stale; locate it again
                    print(f"  ⚠️ Element went stale, relocating")
                    content_element = self._find_content_element(driver)
                    if not content_element:
                        if attempt < max_retries - 1:
                            print(f"  ⚠️ Relocation failed, retrying {attempt + 1}/{max_retries}")
                            continue
                        else:
                            return None, "Content element became stale and could not be relocated"
                    html_content = content_element.get_attribute('outerHTML')

                if not html_content or len(html_content.strip()) < 100:
                    if attempt < max_retries - 1:
                        print(f"  ⚠️ Content too short, retrying {attempt + 1}/{max_retries}")
                        continue
                    else:
                        return None, "Content too short"

                # Clean and convert the content
                markdown_content = self._process_html_content(html_content)

                return markdown_content.strip(), None

            except TimeoutException:
                if attempt < max_retries - 1:
                    print(f"  ⚠️ Page load timed out, retrying {attempt + 1}/{max_retries}")
                    continue
                else:
                    return None, "Page load timeout"
            except Exception as e:
                if attempt < max_retries - 1:
                    print(f"  ⚠️ Extraction error, retrying {attempt + 1}/{max_retries}: {str(e)}")
                    continue
                else:
                    return None, f"Extraction error: {str(e)}"

        return None, "Max retries exceeded"

    def _find_content_element(self, driver: webdriver.Chrome):
        """Locate the content element."""
        # Try the configured selectors first
        for selector in self.config.content_selectors:
            try:
                # Wait for the element to appear
                WebDriverWait(driver, 10).until(
                    EC.presence_of_element_located((By.CSS_SELECTOR, selector))
                )
                elements = driver.find_elements(By.CSS_SELECTOR, selector)
                if elements:
                    print(f"  ✅ Found content container: {selector}")
                    return elements[0]
            except (TimeoutException, NoSuchElementException, StaleElementReferenceException):
                continue

        # No predefined container matched; fall back to the div with the most text
        try:
            # Wait until the page has rendered at least one div
            WebDriverWait(driver, 10).until(
                EC.presence_of_element_located((By.TAG_NAME, "div"))
            )
            divs = driver.find_elements(By.TAG_NAME, "div")
            max_text_length = 0
            best_div = None

            for div in divs:
                try:
                    text_length = len(div.text.strip())
                    if text_length > max_text_length and text_length > 100:
                        max_text_length = text_length
                        best_div = div
                except (StaleElementReferenceException, Exception):
                    continue

            if best_div:
                print(f"  ✅ Found content container: auto-detected div")
                return best_div
        except (TimeoutException, Exception):
            pass

        return None

    def _process_html_content(self, html_content: str) -> str:
        """Clean the HTML and convert it to Markdown."""
        # Clean the HTML with BeautifulSoup
        soup = BeautifulSoup(html_content, 'html.parser')

        # Remove unwanted elements
        unwanted_tags = ['script', 'style', 'nav', 'header', 'footer', 'aside']
        for tag in soup.find_all(unwanted_tags):
            tag.decompose()

        # Convert to Markdown
        markdown_content = self.html_converter.handle(str(soup))

        return markdown_content


class ProgressManager:
    """Tracks crawl progress."""

    def __init__(self, progress_file: str = 'crawl_progress.json'):
        self.progress_file = progress_file

    def load_progress(self) -> Dict[str, set]:
        """Load crawl progress from disk."""
        progress_path = Path(self.progress_file)
        if not progress_path.exists():
            return {'completed': set(), 'failed': set()}

        try:
            with open(progress_path, 'r', encoding='utf-8') as f:
                data = json.load(f)
                return {
                    'completed': set(data.get('completed', [])),
                    'failed': set(data.get('failed', []))
                }
        except Exception as e:
            print(f"⚠️ Failed to read the progress file: {e}; starting over")
            return {'completed': set(), 'failed': set()}

    def save_progress(self, progress: Dict[str, set]):
        """Persist crawl progress to disk."""
        try:
            data = {
                'completed': list(progress['completed']),
                'failed': list(progress['failed']),
                'last_update': datetime.now().isoformat()
            }
            with open(self.progress_file, 'w', encoding='utf-8') as f:
                json.dump(data, f, ensure_ascii=False, indent=2)
        except Exception as e:
            print(f"⚠️ Failed to save the progress file: {e}")


class BaseArticleCrawler(ABC):
    """Abstract base class for article crawlers."""

    def __init__(self, config: ArticleCrawlerConfig):
        self.config = config
        self.driver_manager = SeleniumDriverManager(config)
        self.content_extractor = ContentExtractor(config)
        self.progress_manager = ProgressManager()

        # Statistics
        self.stats = {
            'total_articles': 0,
            'processed': 0,
            'success': 0,
            'failed': 0
        }

    @abstractmethod
    def fetch_article_links(self) -> List[Dict[str, Any]]:
        """Return the list of article links - subclasses must implement this."""
        pass

    @abstractmethod
    def generate_filename(self, article: Dict[str, Any]) -> str:
        """Generate a filename - subclasses must implement this."""
        pass

    @abstractmethod
    def format_article_content(self, article: Dict[str, Any], content: str) -> str:
        """Format the article content - subclasses must implement this."""
        pass

    def is_article_completed(self, article: Dict[str, Any]) -> bool:
        """Check whether the article has already been saved."""
        filename = self.generate_filename(article)
        filepath = self.config.output_dir / filename
        return filepath.exists()

    def save_article(self, article: Dict[str, Any], content: str) -> bool:
        """Save the article as a Markdown file."""
        try:
            filename = self.generate_filename(article)
            filepath = self.config.output_dir / filename

            # Build the full document
            full_content = self.format_article_content(article, content)

            with open(filepath, 'w', encoding='utf-8') as f:
                f.write(full_content)

            print(f"  💾 Saved: {filename}")
            return True

        except Exception as e:
            print(f"  ❌ Save failed: {e}")
            return False

    def process_single_article(self, article: Dict[str, Any]) -> bool:
        """Process a single article."""
        article_title = article.get('title', article.get('url', 'Unknown'))
        print(f"📄 Processing: {article_title}")

        # Extract the content
        content, error = self.content_extractor.extract_content(
            self.driver_manager.driver, article['url']
        )

        if error:
            print(f"  ❌ Extraction failed: {error}")
            return False

        if not content:
            print(f"  ❌ Content is empty")
            return False

        # Save the article
        success = self.save_article(article, content)

        if success:
            self.stats['success'] += 1
        else:
            self.stats['failed'] += 1

        self.stats['processed'] += 1

        # Report progress against the number of remaining articles
        remaining_total = getattr(self, '_remaining_total', self.stats['total_articles'])
        progress = (self.stats['processed'] / remaining_total) * 100
        print(f"📊 Progress: {self.stats['processed']}/{remaining_total} ({progress:.1f}%)")

        return success

    def crawl_articles(self, resume: bool = True, progress_file: str = 'crawl_progress.json'):
        """Crawl all articles."""
        # Fetch the article links
        articles = self.fetch_article_links()
        if not articles:
            print("❌ No article links found")
            return

        # Point the progress manager at the requested file
        self.progress_manager.progress_file = progress_file

        # Load progress
        progress = self.progress_manager.load_progress() if resume else {'completed': set(), 'failed': set()}

        # Filter out articles that are already done
        remaining_articles = []
        skipped_count = 0

        for article in articles:
            article_id = self._get_article_id(article)

            # Already recorded as completed in the progress file
            if article_id in progress['completed']:
                skipped_count += 1
                continue

            # Output file already exists
            if self.is_article_completed(article):
                progress['completed'].add(article_id)
                skipped_count += 1
                continue

            remaining_articles.append(article)

        self.stats['total_articles'] = len(articles)
        remaining_count = len(remaining_articles)

        # Remember the remaining count for progress reporting
        self._remaining_total = remaining_count

        print(f"🚀 Starting article crawl")
        print(f"📊 Total articles: {len(articles)}")
        print(f"✅ Already completed: {skipped_count}")
        print(f"🔄 To process: {remaining_count}")
        print(f"📁 Output directory: {self.config.output_dir.absolute()}")
        print(f"🔧 Workers: {self.config.max_workers}")
        print(f"💾 Progress file: {progress_file}")
        print("-" * 60)

        if remaining_count == 0:
            print("🎉 All articles are already done, nothing to crawl!")
            return

        # Start the browser
        if not self.driver_manager.setup_driver():
            return

        try:
            # Process articles with a thread pool
            with ThreadPoolExecutor(max_workers=self.config.max_workers) as executor:
                # Submit one task per article
                future_to_article = {}
                for article in remaining_articles:
                    future = executor.submit(self.process_single_article, article)
                    future_to_article[future] = article

                # Handle completed tasks
                for future in as_completed(future_to_article):
                    article = future_to_article[future]
                    article_id = self._get_article_id(article)

                    try:
                        success = future.result()
                        if success:
                            progress['completed'].add(article_id)
                            # Save progress periodically
                            if len(progress['completed']) % 10 == 0:
                                self.progress_manager.save_progress(progress)
                            time.sleep(self.config.delay)
                        else:
                            progress['failed'].add(article_id)

                    except Exception as e:
                        article_title = article.get('title', article.get('url', 'Unknown'))
                        print(f"❌ Error while processing {article_title}: {e}")
                        progress['failed'].add(article_id)
                        self.stats['failed'] += 1
                        self.stats['processed'] += 1

        finally:
            # Save progress one last time
            self.progress_manager.save_progress(progress)
            self.driver_manager.cleanup_driver()

        # Show the final statistics
        self._print_final_stats(progress)

    def _get_article_id(self, article: Dict[str, Any]) -> str:
        """Return a unique identifier for the article."""
        return article.get('id') or article.get('code') or article.get('url', '')

    def _print_final_stats(self, progress: Dict[str, set]):
        """Print the final statistics."""
        total_completed = len(progress['completed'])
        total_failed = len(progress['failed'])

        print("\n" + "=" * 60)
        print(f"📊 Crawl summary")
        print("=" * 60)
        print(f"Total articles: {self.stats['total_articles']}")
        print(f"Processed this run: {self.stats['processed']}")
        print(f"Succeeded this run: {self.stats['success']}")
        print(f"Failed this run: {self.stats['failed']}")
        print(f"Completed overall: {total_completed}")
        print(f"Failed overall: {total_failed}")

        if self.stats['total_articles'] > 0:
            print(f"Overall success rate: {(total_completed/self.stats['total_articles']*100):.1f}%")

        print(f"\n📁 Documents saved to: {self.config.output_dir.absolute()}")
        print(f"💾 Progress saved to: {self.progress_manager.progress_file}")

# Example implementation: a simple URL-list crawler
class SimpleUrlCrawler(BaseArticleCrawler):
    """Minimal example crawler that works from a plain list of URLs."""

    def __init__(self, config: ArticleCrawlerConfig, urls: List[str]):
        super().__init__(config)
        self.urls = urls

    def fetch_article_links(self) -> List[Dict[str, Any]]:
        """Build article records from the URL list."""
        articles = []
        for i, url in enumerate(self.urls):
            articles.append({
                'id': str(i),
                'url': url,
                'title': f"Article_{i+1}"
            })
        return articles

    def generate_filename(self, article: Dict[str, Any]) -> str:
        """Generate a safe filename."""
        safe_title = re.sub(r'[^\w\s-]', '', article['title']).strip()
        safe_title = re.sub(r'[-\s]+', '_', safe_title)
        return f"{article['id']}_{safe_title}.md"

    def format_article_content(self, article: Dict[str, Any], content: str) -> str:
        """Format the article content."""
        return f"""# {article['title']}

**URL:** {article['url']}
**ID:** {article['id']}
**Crawled at:** {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}

---

{content}
"""


def create_default_config(base_url: str, **kwargs) -> ArticleCrawlerConfig:
    """Create a default configuration."""
    return ArticleCrawlerConfig(base_url=base_url, **kwargs)


if __name__ == '__main__':
    # Example usage
    config = create_default_config(
        base_url="https://example.com",
        output_dir="./articles",
        delay=2.0,
        max_workers=2
    )

    # Example URL list
    urls = [
        "https://example.com/article1",
        "https://example.com/article2"
    ]

    crawler = SimpleUrlCrawler(config, urls)
    crawler.crawl_articles()
```

# Generic Article Crawler Framework: Usage Guide

## Overview

This framework provides a generic, Selenium-based foundation for article crawling that can easily be extended to different websites. It uses an object-oriented design and supports resumable crawling, concurrent processing, content extraction, and output formatting.

## Framework Structure

### Core Components

1. **ArticleCrawlerConfig** - crawler configuration
2. **SeleniumDriverManager** - browser driver management
3. **ContentExtractor** - content extraction
4. **ProgressManager** - progress tracking
5. **BaseArticleCrawler** - abstract base class

### File Structure

```
scripts/
├── generic_article_crawler.py       # core framework
├── binance_faq_generic_crawler.py   # example implementation for Binance FAQ
├── crawler_framework_guide.md       # this guide
└── your_custom_crawler.py           # your custom crawler
```

## Quick Start

### 1. Create a custom crawler

Subclass `BaseArticleCrawler` and implement the required abstract methods:

```python
from typing import Any, Dict, List

from generic_article_crawler import BaseArticleCrawler, ArticleCrawlerConfig

class YourCustomCrawler(BaseArticleCrawler):
    def fetch_article_links(self) -> List[Dict[str, Any]]:
        """Return the list of article links - required."""
        # Return a list of articles; each should include fields such as id, url, title
        pass

    def generate_filename(self, article: Dict[str, Any]) -> str:
        """Generate a filename - required."""
        # Build a safe filename from the article metadata
        pass

    def format_article_content(self, article: Dict[str, Any], content: str) -> str:
        """Format the article content - required."""
        # Add metadata and format the content
        pass
```
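For orientation, here is a minimal concrete sketch of such a subclass that collects links from a listing page. The listing URL, the `a.article-link` CSS selector, and the class name are placeholders chosen for illustration; they are not part of the framework, and a real implementation would use the target site's own structure.

```python
import re
from datetime import datetime
from typing import Any, Dict, List
from urllib.parse import urljoin

import requests
from bs4 import BeautifulSoup

from generic_article_crawler import BaseArticleCrawler


class ExampleListingCrawler(BaseArticleCrawler):
    """Hypothetical crawler that collects article links from a listing page."""

    LISTING_URL = "https://example.com/articles"  # placeholder

    def fetch_article_links(self) -> List[Dict[str, Any]]:
        # Fetch the listing page and pull out anchor tags (selector is a placeholder)
        html = requests.get(self.LISTING_URL, timeout=30).text
        soup = BeautifulSoup(html, "html.parser")

        articles = []
        for i, link in enumerate(soup.select("a.article-link")):
            href = link.get("href")
            if not href:
                continue
            articles.append({
                "id": str(i),
                "url": urljoin(self.LISTING_URL, href),
                "title": link.get_text(strip=True) or f"Article_{i + 1}",
            })
        return articles

    def generate_filename(self, article: Dict[str, Any]) -> str:
        # Same sanitization approach as SimpleUrlCrawler in the framework
        safe_title = re.sub(r"[^\w\s-]", "", article["title"]).strip()
        safe_title = re.sub(r"[-\s]+", "_", safe_title)[:100]
        return f"{article['id']}_{safe_title}.md"

    def format_article_content(self, article: Dict[str, Any], content: str) -> str:
        crawled_at = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
        return (
            f"# {article['title']}\n\n"
            f"**URL:** {article['url']}\n"
            f"**Crawled at:** {crawled_at}\n\n---\n\n{content}\n"
        )
```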
### 2. Configure the crawler

```python
from generic_article_crawler import ArticleCrawlerConfig

config = ArticleCrawlerConfig(
    base_url="https://example.com",
    output_dir="./articles",
    delay=2.0,
    max_workers=3,
    headless=True,
    user_agent="Your User Agent",
    cookies={"key": "value"},
    content_selectors=[".article-content", "article", ".content"]
)
```

### 3. Run the crawler

```python
crawler = YourCustomCrawler(config)
crawler.crawl_articles(resume=True, progress_file='progress.json')
```

## Detailed Implementation Guide

### Implementing fetch_article_links()

This method collects every article that should be crawled and returns them in the following shape:

```python
import requests

def fetch_article_links(self) -> List[Dict[str, Any]]:
    articles = []

    # Option 1: fetch from an API
    response = requests.get("https://api.example.com/articles")
    data = response.json()

    for item in data['articles']:
        articles.append({
            'id': item['id'],
            'url': item['url'],
            'title': item['title'],
            # other custom fields
        })

    # Option 2: parse an HTML listing page
    # Option 3: read from a local file

    return articles
```

### Implementing generate_filename()

Generate a safe filename and strip special characters:

```python
import re

def generate_filename(self, article: Dict[str, Any]) -> str:
    title = article['title']

    # Remove special characters
    safe_title = re.sub(r'[^\w\s-]', '', title).strip()
    safe_title = re.sub(r'[-\s]+', '_', safe_title)

    # Cap the length
    if len(safe_title) > 100:
        safe_title = safe_title[:100]

    return f"{article['id']}_{safe_title}.md"
```

### Implementing format_article_content()

Format the final article document:

```python
from datetime import datetime

def format_article_content(self, article: Dict[str, Any], content: str) -> str:
    return f"""# {article['title']}

**ID:** {article['id']}
**URL:** {article['url']}
**Crawled at:** {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}

---

{content}

---

*This document was generated automatically by a crawler.*
"""
```

## Configuration Options

### ArticleCrawlerConfig parameters

| Parameter | Type | Default | Description |
|-----------|------|---------|-------------|
| base_url | str | required | Base URL of the target site |
| output_dir | str | "./output" | Output directory |
| delay | float | 2.0 | Delay between requests (seconds) |
| max_workers | int | 3 | Number of worker threads |
| headless | bool | True | Run the browser headless |
| show_browser | bool | False | Show the browser window |
| user_agent | str | default UA | User-agent string |
| cookies | Dict | {} | Cookie dictionary |
| content_selectors | List[str] | default selectors | Content selector list |
| timeout | int | 30 | Page load timeout (seconds) |

### Content selectors

The framework tries the following selectors in priority order:

```python
content_selectors = [
    "article",           # HTML5 article tag
    ".article-content",  # common class names
    ".content",
    "#content",
    ".post-content",
    "main",
    ".main-content"
]
```

You can supply your own selectors to match the HTML structure of the target site.

## Advanced Features

### Resumable crawling

Resumable crawling is supported out of the box:

```python
# Enable resuming (default)
crawler.crawl_articles(resume=True, progress_file='progress.json')

# Disable resuming and start from scratch
crawler.crawl_articles(resume=False)
```

### Custom cookies and anti-detection

```python
config = ArticleCrawlerConfig(
    base_url="https://example.com",
    cookies={
        'session_id': 'your_session_id',
        'csrf_token': 'your_csrf_token'
    },
    user_agent='Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36'
)
```

### Concurrency control

```python
# Low concurrency and a long delay for strict sites
config = ArticleCrawlerConfig(base_url="https://example.com", max_workers=1, delay=5.0)

# Higher concurrency and a short delay for lenient sites
config = ArticleCrawlerConfig(base_url="https://example.com", max_workers=5, delay=1.0)
```
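Whatever concurrency you choose, it is worth confirming that the target allows crawling at all before raising `max_workers`. A minimal sketch using the standard-library `urllib.robotparser` is shown below; the helper and the URLs are illustrative and not part of the framework.

```python
from urllib.parse import urljoin, urlparse
from urllib.robotparser import RobotFileParser


def is_allowed(url: str, user_agent: str = "*") -> bool:
    """Best-effort check of the site's robots.txt for a single URL."""
    parts = urlparse(url)
    robots_url = urljoin(f"{parts.scheme}://{parts.netloc}", "/robots.txt")

    parser = RobotFileParser()
    parser.set_url(robots_url)
    try:
        parser.read()
    except Exception:
        # If robots.txt cannot be fetched, fall back to allowing the URL
        return True
    return parser.can_fetch(user_agent, url)


# Filter a URL list before handing it to the crawler
urls = [
    "https://example.com/article1",
    "https://example.com/article2",
]
urls = [u for u in urls if is_allowed(u)]
```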
## Real-World Examples

### Case 1: Blog crawler

```python
class BlogCrawler(BaseArticleCrawler):
    def __init__(self, config, blog_url):
        super().__init__(config)
        self.blog_url = blog_url

    def fetch_article_links(self):
        # Parse article links from the blog's front page
        articles = []
        # parsing logic goes here...
        return articles

    def generate_filename(self, article):
        date = article.get('date', '').replace('-', '_')
        title = re.sub(r'[^\w\s-]', '', article['title'])[:50]
        return f"{date}_{title}.md"

    def format_article_content(self, article, content):
        return f"""# {article['title']}

**Author:** {article.get('author', 'Unknown')}
**Published:** {article.get('date', 'Unknown')}
**Original link:** {article['url']}

{content}
"""
```

### Case 2: News site crawler

```python
class NewsCrawler(BaseArticleCrawler):
    def fetch_article_links(self):
        # Fetch the news list from an RSS feed or API
        pass

    def generate_filename(self, article):
        category = article.get('category', 'general')
        timestamp = article.get('timestamp', int(time.time()))
        return f"{category}_{timestamp}_{article['id']}.md"
```

## Best Practices

### 1. Respect site rules

- Check robots.txt (see the sketch after the concurrency section above)
- Use a reasonable delay between requests
- Avoid excessive concurrency
- Respect the site's anti-bot measures

### 2. Error handling

- Implement retries
- Record failed articles
- Monitor crawl status

### 3. Performance

- Tune the delay to the site's response times
- Use headless mode for efficiency
- Clear the browser cache periodically

### 4. Data quality

- Validate the extracted content
- Clean up stray HTML tags
- Handle encoding issues

## Troubleshooting

### Common issues

1. **Browser fails to start**
   - Check that Chrome is installed
   - Update ChromeDriver
   - Check system permissions

2. **Content extraction fails**
   - Review the content_selectors configuration
   - Inspect the page's HTML structure
   - Increase the wait times

3. **Anti-bot detection**
   - Increase the delay
   - Change the User-Agent
   - Add the required cookies
   - Use proxy IPs

4. **High memory usage**
   - Reduce concurrency
   - Restart the browser periodically
   - Clean up temporary files

### Debugging tips

```python
# Show the browser window while debugging
config = ArticleCrawlerConfig(
    base_url="https://example.com",
    headless=False,
    show_browser=True
)

# Single-threaded mode makes debugging easier
config.max_workers = 1
```

## Extending the Framework

### Adding a new content extractor

```python
class CustomContentExtractor(ContentExtractor):
    def extract_content(self, driver, url, max_retries=3):
        # custom extraction logic
        pass

# Use it in your crawler
class YourCrawler(BaseArticleCrawler):
    def __init__(self, config):
        super().__init__(config)
        self.content_extractor = CustomContentExtractor(config)
```

### Adding a new progress manager

```python
class DatabaseProgressManager(ProgressManager):
    def load_progress(self):
        # load progress from a database
        pass

    def save_progress(self, progress):
        # save progress to a database
        pass
```

A fuller SQLite-based sketch of this idea appears at the end of this guide.

## Summary

The generic article crawler framework provides:

- 🏗️ **Modular design** - easy to extend and maintain
- 🔄 **Resumable crawls** - suitable for large crawling jobs
- 🚀 **Concurrent processing** - higher throughput
- 🛡️ **Anti-detection** - several built-in countermeasures
- 📊 **Progress tracking** - monitor the crawl in real time
- 🎯 **Smart extraction** - automatic detection of the content area
- 📝 **Formatted output** - standard Markdown documents

By subclassing the base class and implementing the three core methods, you can quickly build a dedicated article crawler for any website. The framework handles the low-level details so you can focus on the business logic.
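To make the `DatabaseProgressManager` stub in the extension section concrete, here is one possible sketch backed by the standard-library `sqlite3` module. It assumes the framework is saved as `generic_article_crawler.py` (as in the file layout above); the class name, table name, and schema are assumptions for illustration, not part of the framework.

```python
import sqlite3
from datetime import datetime
from typing import Dict

from generic_article_crawler import ProgressManager


class SqliteProgressManager(ProgressManager):
    """Hypothetical progress manager that stores progress in a local SQLite database."""

    def __init__(self, db_path: str = "crawl_progress.db"):
        super().__init__(progress_file=db_path)
        self.conn = sqlite3.connect(db_path)
        self.conn.execute(
            "CREATE TABLE IF NOT EXISTS progress ("
            "article_id TEXT PRIMARY KEY, status TEXT, updated_at TEXT)"
        )
        self.conn.commit()

    def load_progress(self) -> Dict[str, set]:
        # Rebuild the {'completed': set, 'failed': set} structure the framework expects
        completed, failed = set(), set()
        for article_id, status in self.conn.execute(
            "SELECT article_id, status FROM progress"
        ):
            (completed if status == "completed" else failed).add(article_id)
        return {"completed": completed, "failed": failed}

    def save_progress(self, progress: Dict[str, set]):
        # Upsert every known article id with its current status
        now = datetime.now().isoformat()
        rows = [(aid, "completed", now) for aid in progress["completed"]]
        rows += [(aid, "failed", now) for aid in progress["failed"]]
        self.conn.executemany(
            "INSERT OR REPLACE INTO progress VALUES (?, ?, ?)", rows
        )
        self.conn.commit()


# Plug it into a crawler instance, mirroring the CustomContentExtractor pattern above
# crawler.progress_manager = SqliteProgressManager()
```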