generic_article_crawler.py
# Generic Article Crawler
```python
#!/usr/bin/env python3
"""
通用文章爬虫框架
基于Selenium的可扩展文章爬取基础框架
作者: AI Assistant
创建时间: 2025-01-27
"""
import json
import time
import argparse
from pathlib import Path
from concurrent.futures import ThreadPoolExecutor, as_completed
from urllib.parse import urljoin, urlparse
from datetime import datetime
import re
import random
from abc import ABC, abstractmethod
from typing import List, Dict, Optional, Tuple, Any
# Selenium imports
from selenium import webdriver
from selenium.webdriver.chrome.service import Service as ChromeService
from selenium.webdriver.chrome.options import Options
from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from selenium.common.exceptions import TimeoutException, NoSuchElementException, StaleElementReferenceException
from webdriver_manager.chrome import ChromeDriverManager
# HTML to Markdown conversion
import html2text
from bs4 import BeautifulSoup
class ArticleCrawlerConfig:
"""爬虫配置类"""
def __init__(self,
base_url: str,
output_dir: str = "./output",
delay: float = 2.0,
max_workers: int = 3,
headless: bool = True,
show_browser: bool = False,
user_agent: str = None,
cookies: Dict[str, str] = None,
content_selectors: List[str] = None,
timeout: int = 30):
self.base_url = base_url
self.output_dir = Path(output_dir)
self.delay = delay
self.max_workers = max_workers
self.headless = headless
self.show_browser = show_browser
self.timeout = timeout
        # Default user agent
self.user_agent = user_agent or 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/139.0.0.0 Safari/537.36'
        # Cookie settings
self.cookies = cookies or {}
        # Content selectors (in priority order)
self.content_selectors = content_selectors or [
"article",
".article-content",
".content",
"#content",
".post-content",
"main",
".main-content"
]
        # Create the output directory
self.output_dir.mkdir(parents=True, exist_ok=True)
class SeleniumDriverManager:
"""Selenium浏览器驱动管理器"""
def __init__(self, config: ArticleCrawlerConfig):
self.config = config
self.driver = None
self._setup_chrome_options()
def _setup_chrome_options(self):
"""设置Chrome选项"""
self.chrome_options = Options()
        # Basic settings
if self.config.headless and not self.config.show_browser:
self.chrome_options.add_argument('--headless')
        # Performance and stability options
chrome_args = [
'--no-sandbox',
'--disable-dev-shm-usage',
'--disable-gpu',
'--disable-extensions',
'--disable-plugins',
'--window-size=1920,1080',
            # Anti-detection options
'--disable-blink-features=AutomationControlled',
'--disable-web-security',
'--allow-running-insecure-content',
'--disable-features=VizDisplayCompositor'
]
for arg in chrome_args:
self.chrome_options.add_argument(arg)
        # Experimental options
self.chrome_options.add_experimental_option("excludeSwitches", ["enable-automation"])
self.chrome_options.add_experimental_option('useAutomationExtension', False)
        # User agent
self.chrome_options.add_argument(f'--user-agent={self.config.user_agent}')
def setup_driver(self) -> bool:
"""设置Chrome浏览器驱动"""
try:
service = ChromeService(ChromeDriverManager().install())
self.driver = webdriver.Chrome(service=service, options=self.chrome_options)
self.driver.set_page_load_timeout(self.config.timeout)
self.driver.implicitly_wait(10)
            # Run anti-detection scripts
anti_detection_scripts = [
"Object.defineProperty(navigator, 'webdriver', {get: () => undefined})",
"Object.defineProperty(navigator, 'plugins', {get: () => [1, 2, 3, 4, 5]})",
"Object.defineProperty(navigator, 'languages', {get: () => ['en-US', 'en']})",
"window.chrome = {runtime: {}}"
]
for script in anti_detection_scripts:
self.driver.execute_script(script)
            # Add cookies
if self.config.cookies:
self._add_cookies()
mode_desc = "无头模式" if (self.config.headless and not self.config.show_browser) else "显示模式"
print(f"✅ Chrome浏览器已启动 ({mode_desc})")
return True
except Exception as e:
print(f"❌ 浏览器启动失败: {e}")
return False
def _add_cookies(self):
"""添加Cookie"""
# 先访问基础域名
domain = urlparse(self.config.base_url).netloc
self.driver.get(self.config.base_url)
for name, value in self.config.cookies.items():
try:
self.driver.add_cookie({
'name': name,
'value': value,
'domain': f'.{domain}'
})
except Exception as e:
print(f"⚠️ 添加Cookie失败 {name}: {e}")
def cleanup_driver(self):
"""清理浏览器驱动"""
if self.driver:
try:
self.driver.quit()
print("✅ 浏览器已关闭")
except Exception as e:
print(f"⚠️ 关闭浏览器时出错: {e}")
class ContentExtractor:
"""内容提取器"""
def __init__(self, config: ArticleCrawlerConfig):
self.config = config
self.html_converter = html2text.HTML2Text()
self.html_converter.ignore_links = False
self.html_converter.ignore_images = False
self.html_converter.body_width = 0
def extract_content(self, driver: webdriver.Chrome, url: str, max_retries: int = 3) -> Tuple[Optional[str], Optional[str]]:
"""提取页面内容"""
for attempt in range(max_retries):
try:
                # Random delay
delay = random.uniform(self.config.delay, self.config.delay * 2)
time.sleep(delay)
driver.get(url)
                # Wait for the page to load
WebDriverWait(driver, 20).until(
EC.presence_of_element_located((By.TAG_NAME, "body"))
)
                # Extra wait to let the page finish loading
time.sleep(random.uniform(1, 3))
                # Try several selectors to locate the article content
content_element = self._find_content_element(driver)
if not content_element:
if attempt < max_retries - 1:
print(f" ⚠️ 未找到内容容器,重试 {attempt + 1}/{max_retries}")
continue
else:
return None, "No content container found"
                # Get the HTML, handling stale element exceptions
try:
html_content = content_element.get_attribute('outerHTML')
except StaleElementReferenceException:
                    # Element went stale; locate it again
print(f" ⚠️ 元素已过期,重新定位")
content_element = self._find_content_element(driver)
if not content_element:
if attempt < max_retries - 1:
print(f" ⚠️ 重新定位失败,重试 {attempt + 1}/{max_retries}")
continue
else:
return None, "Content element became stale and could not be relocated"
html_content = content_element.get_attribute('outerHTML')
if not html_content or len(html_content.strip()) < 100:
if attempt < max_retries - 1:
print(f" ⚠️ 内容过短,重试 {attempt + 1}/{max_retries}")
continue
else:
return None, "Content too short"
                # Clean up and convert the content
markdown_content = self._process_html_content(html_content)
return markdown_content.strip(), None
except TimeoutException:
if attempt < max_retries - 1:
print(f" ⚠️ 页面加载超时,重试 {attempt + 1}/{max_retries}")
continue
else:
return None, "Page load timeout"
except Exception as e:
if attempt < max_retries - 1:
print(f" ⚠️ 提取错误,重试 {attempt + 1}/{max_retries}: {str(e)}")
continue
else:
return None, f"Extraction error: {str(e)}"
return None, "Max retries exceeded"
def _find_content_element(self, driver: webdriver.Chrome):
"""查找内容元素"""
# 尝试预定义的选择器
for selector in self.config.content_selectors:
try:
                # Wait for the element to appear
WebDriverWait(driver, 10).until(
EC.presence_of_element_located((By.CSS_SELECTOR, selector))
)
elements = driver.find_elements(By.CSS_SELECTOR, selector)
if elements:
print(f" ✅ 找到内容容器: {selector}")
return elements[0]
except (TimeoutException, NoSuchElementException, StaleElementReferenceException):
continue
        # If no specific container matched, fall back to the div with the most text
try:
            # Wait for the basic page structure to load
WebDriverWait(driver, 10).until(
EC.presence_of_element_located((By.TAG_NAME, "div"))
)
divs = driver.find_elements(By.TAG_NAME, "div")
max_text_length = 0
best_div = None
for div in divs:
try:
text_length = len(div.text.strip())
if text_length > max_text_length and text_length > 100:
max_text_length = text_length
best_div = div
except (StaleElementReferenceException, Exception):
continue
if best_div:
print(f" ✅ 找到内容容器: auto-detected div")
return best_div
except (TimeoutException, Exception):
pass
return None
def _process_html_content(self, html_content: str) -> str:
"""处理HTML内容"""
# 使用BeautifulSoup清理HTML
soup = BeautifulSoup(html_content, 'html.parser')
        # Remove unwanted elements
unwanted_tags = ['script', 'style', 'nav', 'header', 'footer', 'aside']
for tag in soup.find_all(unwanted_tags):
tag.decompose()
        # Convert to Markdown
markdown_content = self.html_converter.handle(str(soup))
return markdown_content
class ProgressManager:
"""进度管理器"""
def __init__(self, progress_file: str = 'crawl_progress.json'):
self.progress_file = progress_file
def load_progress(self) -> Dict[str, set]:
"""加载爬取进度"""
progress_path = Path(self.progress_file)
if not progress_path.exists():
return {'completed': set(), 'failed': set()}
try:
with open(progress_path, 'r', encoding='utf-8') as f:
data = json.load(f)
return {
'completed': set(data.get('completed', [])),
'failed': set(data.get('failed', []))
}
except Exception as e:
print(f"⚠️ 读取进度文件失败: {e},将重新开始")
return {'completed': set(), 'failed': set()}
def save_progress(self, progress: Dict[str, set]):
"""保存爬取进度"""
try:
data = {
'completed': list(progress['completed']),
'failed': list(progress['failed']),
'last_update': datetime.now().isoformat()
}
with open(self.progress_file, 'w', encoding='utf-8') as f:
json.dump(data, f, ensure_ascii=False, indent=2)
except Exception as e:
print(f"⚠️ 保存进度文件失败: {e}")
class BaseArticleCrawler(ABC):
"""文章爬虫基类"""
def __init__(self, config: ArticleCrawlerConfig):
self.config = config
self.driver_manager = SeleniumDriverManager(config)
self.content_extractor = ContentExtractor(config)
self.progress_manager = ProgressManager()
        # Statistics
self.stats = {
'total_articles': 0,
'processed': 0,
'success': 0,
'failed': 0
}
@abstractmethod
def fetch_article_links(self) -> List[Dict[str, Any]]:
"""获取文章链接列表 - 子类必须实现"""
pass
@abstractmethod
def generate_filename(self, article: Dict[str, Any]) -> str:
"""生成文件名 - 子类必须实现"""
pass
@abstractmethod
def format_article_content(self, article: Dict[str, Any], content: str) -> str:
"""格式化文章内容 - 子类必须实现"""
pass
def is_article_completed(self, article: Dict[str, Any]) -> bool:
"""检查文章是否已完成"""
filename = self.generate_filename(article)
filepath = self.config.output_dir / filename
return filepath.exists()
def save_article(self, article: Dict[str, Any], content: str) -> bool:
"""保存文章为Markdown文件"""
try:
filename = self.generate_filename(article)
filepath = self.config.output_dir / filename
            # Format the full document
full_content = self.format_article_content(article, content)
with open(filepath, 'w', encoding='utf-8') as f:
f.write(full_content)
print(f" 💾 已保存: {filename}")
return True
except Exception as e:
print(f" ❌ 保存失败: {e}")
return False
def process_single_article(self, article: Dict[str, Any]) -> bool:
"""处理单篇文章"""
article_title = article.get('title', article.get('url', 'Unknown'))
print(f"📄 处理: {article_title}")
        # Extract the content
content, error = self.content_extractor.extract_content(
self.driver_manager.driver,
article['url']
)
if error:
print(f" ❌ 提取失败: {error}")
return False
if not content:
print(f" ❌ 内容为空")
return False
        # Save the article
success = self.save_article(article, content)
if success:
self.stats['success'] += 1
else:
self.stats['failed'] += 1
self.stats['processed'] += 1
        # Report progress based on the number of remaining articles
remaining_total = getattr(self, '_remaining_total', self.stats['total_articles'])
progress = (self.stats['processed'] / remaining_total) * 100
print(f"📊 进度: {self.stats['processed']}/{remaining_total} ({progress:.1f}%)")
return success
def crawl_articles(self, resume: bool = True, progress_file: str = 'crawl_progress.json'):
"""爬取所有文章"""
# 获取文章链接
articles = self.fetch_article_links()
if not articles:
print("❌ 未找到文章链接")
return
        # Point the progress manager at the progress file
self.progress_manager.progress_file = progress_file
        # Load progress
progress = self.progress_manager.load_progress() if resume else {'completed': set(), 'failed': set()}
        # Filter out articles that are already done
remaining_articles = []
skipped_count = 0
for article in articles:
article_id = self._get_article_id(article)
            # Skip articles already marked completed in the progress record
if article_id in progress['completed']:
skipped_count += 1
continue
            # Skip articles whose output file already exists
if self.is_article_completed(article):
progress['completed'].add(article_id)
skipped_count += 1
continue
remaining_articles.append(article)
self.stats['total_articles'] = len(articles)
remaining_count = len(remaining_articles)
        # Remember the remaining count for progress reporting
self._remaining_total = remaining_count
print(f"🚀 开始爬取文章内容")
print(f"📊 总文章数: {len(articles)}")
print(f"✅ 已完成: {skipped_count}")
print(f"🔄 待处理: {remaining_count}")
print(f"📁 输出目录: {self.config.output_dir.absolute()}")
print(f"🔧 并发数: {self.config.max_workers}")
print(f"💾 进度文件: {progress_file}")
print("-" * 60)
if remaining_count == 0:
print("🎉 所有文章已完成,无需继续爬取!")
return
        # Start the browser
if not self.driver_manager.setup_driver():
return
try:
            # Process articles with a thread pool
with ThreadPoolExecutor(max_workers=self.config.max_workers) as executor:
                # Submit one task per article
future_to_article = {}
for article in remaining_articles:
future = executor.submit(self.process_single_article, article)
future_to_article[future] = article
                # Handle completed tasks
for future in as_completed(future_to_article):
article = future_to_article[future]
article_id = self._get_article_id(article)
try:
success = future.result()
if success:
progress['completed'].add(article_id)
                            # Periodically save progress
if len(progress['completed']) % 10 == 0:
self.progress_manager.save_progress(progress)
time.sleep(self.config.delay)
else:
progress['failed'].add(article_id)
except Exception as e:
article_title = article.get('title', article.get('url', 'Unknown'))
print(f"❌ 处理文章 {article_title} 时出错: {e}")
progress['failed'].add(article_id)
self.stats['failed'] += 1
self.stats['processed'] += 1
finally:
            # Save progress one final time
self.progress_manager.save_progress(progress)
self.driver_manager.cleanup_driver()
            # Print the final statistics
self._print_final_stats(progress)
def _get_article_id(self, article: Dict[str, Any]) -> str:
"""获取文章唯一标识"""
return article.get('id') or article.get('code') or article.get('url', '')
def _print_final_stats(self, progress: Dict[str, set]):
"""打印最终统计信息"""
total_completed = len(progress['completed'])
total_failed = len(progress['failed'])
print("\n" + "=" * 60)
print(f"📊 文章爬取完成统计")
print("=" * 60)
print(f"总文章数: {self.stats['total_articles']}")
print(f"本次处理: {self.stats['processed']}")
print(f"本次成功: {self.stats['success']}")
print(f"本次失败: {self.stats['failed']}")
print(f"累计完成: {total_completed}")
print(f"累计失败: {total_failed}")
if self.stats['total_articles'] > 0:
print(f"总体成功率: {(total_completed/self.stats['total_articles']*100):.1f}%")
print(f"\n📁 文档已保存到: {self.config.output_dir.absolute()}")
print(f"💾 进度已保存到: {self.progress_manager.progress_file}")
# Example implementation: a simple URL-list crawler
class SimpleUrlCrawler(BaseArticleCrawler):
"""简单的URL列表爬虫示例"""
def __init__(self, config: ArticleCrawlerConfig, urls: List[str]):
super().__init__(config)
self.urls = urls
def fetch_article_links(self) -> List[Dict[str, Any]]:
"""从URL列表生成文章信息"""
articles = []
for i, url in enumerate(self.urls):
articles.append({
'id': str(i),
'url': url,
'title': f"Article_{i+1}"
})
return articles
def generate_filename(self, article: Dict[str, Any]) -> str:
"""生成文件名"""
safe_title = re.sub(r'[^\w\s-]', '', article['title']).strip()
safe_title = re.sub(r'[-\s]+', '_', safe_title)
return f"{article['id']}_{safe_title}.md"
def format_article_content(self, article: Dict[str, Any], content: str) -> str:
"""格式化文章内容"""
return f"""# {article['title']}
**URL:** {article['url']}
**ID:** {article['id']}
**爬取时间:** {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}
---
{content}
"""
def create_default_config(base_url: str, **kwargs) -> ArticleCrawlerConfig:
"""创建默认配置"""
return ArticleCrawlerConfig(base_url=base_url, **kwargs)
if __name__ == '__main__':
    # Example usage
config = create_default_config(
base_url="https://example.com",
output_dir="./articles",
delay=2.0,
max_workers=2
)
    # Example URL list
urls = [
"https://example.com/article1",
"https://example.com/article2"
]
crawler = SimpleUrlCrawler(config, urls)
    crawler.crawl_articles()
```
# Generic Article Crawler Framework: Usage Guide
## Overview
This framework provides a Selenium-based foundation for article crawling that can easily be extended to new websites. It uses an object-oriented design and supports resumable crawls, concurrent processing, content extraction, and formatted output.
## Framework Structure
### Core Components
1. **ArticleCrawlerConfig** - crawler configuration
2. **SeleniumDriverManager** - browser driver manager
3. **ContentExtractor** - content extractor
4. **ProgressManager** - progress manager
5. **BaseArticleCrawler** - abstract base class for crawlers
### File Layout
```
scripts/
├── generic_article_crawler.py        # core framework
├── binance_faq_generic_crawler.py    # Binance FAQ example implementation
├── crawler_framework_guide.md        # this guide
└── your_custom_crawler.py            # your custom crawler
```
## Quick Start
### 1. Create a Custom Crawler
Subclass `BaseArticleCrawler` and implement the required abstract methods:
```python
from typing import Any, Dict, List

from generic_article_crawler import BaseArticleCrawler, ArticleCrawlerConfig

class YourCustomCrawler(BaseArticleCrawler):
    def fetch_article_links(self) -> List[Dict[str, Any]]:
        """Return the article links - must be implemented"""
        # Return a list of articles, each with id, url, title, and any other fields
        pass

    def generate_filename(self, article: Dict[str, Any]) -> str:
        """Generate a filename - must be implemented"""
        # Build a safe filename from the article metadata
        pass

    def format_article_content(self, article: Dict[str, Any], content: str) -> str:
        """Format the article content - must be implemented"""
        # Add metadata and format the content
        pass
```
### 2. Configure the Crawler
```python
from generic_article_crawler import ArticleCrawlerConfig

config = ArticleCrawlerConfig(
    base_url="https://example.com",
    output_dir="./articles",
    delay=2.0,
    max_workers=3,
    headless=True,
    user_agent="Your User Agent",
    cookies={"key": "value"},
    content_selectors=[".article-content", "article", ".content"]
)
```
### 3. Run the Crawler
```python
crawler = YourCustomCrawler(config)
crawler.crawl_articles(resume=True, progress_file='progress.json')
```
## Detailed Implementation Guide
### Implementing fetch_article_links()
This method collects every article link that should be crawled. Expected return format:
```python
import requests

def fetch_article_links(self) -> List[Dict[str, Any]]:
    articles = []
    # Option 1: fetch from an API
    response = requests.get("https://api.example.com/articles")
    data = response.json()
    for item in data['articles']:
        articles.append({
            'id': item['id'],
            'url': item['url'],
            'title': item['title'],
            # any other custom fields
        })
    # Option 2: parse an HTML listing page
    # Option 3: read from a local file
    return articles
```
### Implementing generate_filename()
Produce a safe filename without special characters:
```python
import re

def generate_filename(self, article: Dict[str, Any]) -> str:
    title = article['title']
    # Strip special characters
    safe_title = re.sub(r'[^\w\s-]', '', title).strip()
    safe_title = re.sub(r'[-\s]+', '_', safe_title)
    # Cap the length
    if len(safe_title) > 100:
        safe_title = safe_title[:100]
    return f"{article['id']}_{safe_title}.md"
```
### Implementing format_article_content()
Format the final article document:
```python
from datetime import datetime

def format_article_content(self, article: Dict[str, Any], content: str) -> str:
    return f"""# {article['title']}
**ID:** {article['id']}
**URL:** {article['url']}
**Crawled at:** {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}
---
{content}
---
*This document was generated automatically by the crawler.*
"""
```
## Configuration Options
### ArticleCrawlerConfig Parameters
| Parameter | Type | Default | Description |
|-----------|------|---------|-------------|
| base_url | str | required | Base URL of the target site |
| output_dir | str | "./output" | Output directory |
| delay | float | 2.0 | Delay between requests (seconds) |
| max_workers | int | 3 | Number of concurrent threads |
| headless | bool | True | Run in headless mode |
| show_browser | bool | False | Show the browser window |
| user_agent | str | default UA | User-agent string |
| cookies | Dict | {} | Cookie dictionary |
| content_selectors | List[str] | default selectors | List of content selectors |
| timeout | int | 30 | Page load timeout (seconds) |
### Content Selectors
The framework tries the following selectors in priority order:
```python
content_selectors = [
    "article",           # HTML5 article tag
    ".article-content",  # common class names
    ".content",
    "#content",
    ".post-content",
    "main",
    ".main-content"
]
```
Customize the selectors to match the HTML structure of your target site.
## Advanced Features
### Resumable Crawls
Resuming interrupted crawls is supported out of the box:
```python
# Enable resuming (default)
crawler.crawl_articles(resume=True, progress_file='progress.json')

# Disable resuming and start from scratch
crawler.crawl_articles(resume=False)
```
### Custom Cookies and Anti-Detection
```python
config = ArticleCrawlerConfig(
    base_url="https://example.com",
    cookies={
        'session_id': 'your_session_id',
        'csrf_token': 'your_csrf_token'
    },
    user_agent='Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36'
)
```
### Concurrency Control
```python
# Low concurrency for strict sites
config = ArticleCrawlerConfig(base_url="https://example.com", max_workers=1, delay=5.0)

# Higher concurrency for lenient sites
config = ArticleCrawlerConfig(base_url="https://example.com", max_workers=5, delay=1.0)
```
## Worked Examples
### Example 1: Blog Crawler
```python
class BlogCrawler(BaseArticleCrawler):
    def __init__(self, config, blog_url):
        super().__init__(config)
        self.blog_url = blog_url

    def fetch_article_links(self):
        # Parse article links from the blog's index page
        articles = []
        # parsing logic goes here...
        return articles

    def generate_filename(self, article):
        date = article.get('date', '').replace('-', '_')
        title = re.sub(r'[^\w\s-]', '', article['title'])[:50]
        return f"{date}_{title}.md"

    def format_article_content(self, article, content):
        return f"""# {article['title']}
**Author:** {article.get('author', 'Unknown')}
**Published:** {article.get('date', 'Unknown')}
**Source URL:** {article['url']}
{content}
"""
```
### Example 2: News Site Crawler
```python
class NewsCrawler(BaseArticleCrawler):
    def fetch_article_links(self):
        # Fetch the news list from an RSS feed or API
        pass

    def generate_filename(self, article):
        category = article.get('category', 'general')
        timestamp = article.get('timestamp', int(time.time()))
        return f"{category}_{timestamp}_{article['id']}.md"

    # format_article_content() omitted here; implement it as in Example 1
```
## Best Practices
### 1. Respect the Target Site
- Check robots.txt (see the sketch after this list)
- Use a reasonable delay between requests
- Avoid excessive concurrency
- Respect the site's anti-bot measures
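
As a minimal sketch of the robots.txt check, the helper below uses Python's standard `urllib.robotparser`; the function name and the way you wire it into `fetch_article_links()` are illustrative assumptions, not part of the framework:
```python
from urllib.parse import urljoin
from urllib.robotparser import RobotFileParser

def is_allowed(base_url: str, url: str, user_agent: str) -> bool:
    """Return True if robots.txt permits fetching `url` with `user_agent` (illustrative helper)."""
    parser = RobotFileParser()
    parser.set_url(urljoin(base_url, "/robots.txt"))
    try:
        parser.read()
    except Exception:
        # If robots.txt cannot be fetched, err on the side of caution; adjust this policy as needed
        return False
    return parser.can_fetch(user_agent, url)

# Possible integration point: filter the link list before crawling
# articles = [a for a in articles if is_allowed(config.base_url, a['url'], config.user_agent)]
```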
### 2. Error Handling
- Implement a retry mechanism (the framework already retries extraction up to `max_retries` times)
- Record failed articles (see the snippet after this list)
- Monitor crawl status
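
Failed article IDs end up in the `failed` list of the progress file written by `ProgressManager`. A quick way to review them after a run, assuming the default `crawl_progress.json` path:
```python
import json

# Inspect the progress file written by ProgressManager
with open('crawl_progress.json', 'r', encoding='utf-8') as f:
    progress = json.load(f)

print(f"Completed: {len(progress.get('completed', []))}")
print(f"Failed:    {len(progress.get('failed', []))}")
for article_id in progress.get('failed', []):
    print(f"  - {article_id}")
```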
### 3. Performance
- Tune the delay based on how the site responds
- Use headless mode for better throughput
- Clear the browser cache periodically
### 4. Data Quality
- Validate the extracted content (a validation sketch follows this list)
- Strip leftover HTML tags
- Handle encoding issues
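
A rough sketch of a post-run validation pass over the saved Markdown files; the helper name and the 200-character threshold are assumptions to adapt to your content:
```python
from pathlib import Path

def find_suspect_files(output_dir: str, min_chars: int = 200) -> list:
    """Return saved Markdown files that look too short or lack a heading (illustrative check)."""
    suspect = []
    for path in Path(output_dir).glob("*.md"):
        text = path.read_text(encoding="utf-8")
        if len(text) < min_chars or not text.lstrip().startswith("#"):
            suspect.append(path)
    return suspect

# print(find_suspect_files("./articles"))
```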
## Troubleshooting
### Common Issues
1. **Browser fails to start**
   - Check that Chrome is installed
   - Update ChromeDriver
   - Check system permissions
2. **Content extraction fails**
   - Review the content_selectors configuration
   - Inspect the page's HTML structure
   - Increase the wait times
3. **Anti-bot detection**
   - Increase the delay
   - Change the User-Agent
   - Add the required cookies
   - Use proxy IPs
4. **High memory usage**
   - Reduce concurrency
   - Restart the browser periodically (see the sketch after this list)
   - Clean up temporary files
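
One hedged way to bound memory growth is to recycle the browser every N processed articles. The subclass below is a sketch built on the existing `SeleniumDriverManager` methods; the threshold is arbitrary, and because the driver is shared it is best combined with `max_workers=1`:
```python
class RestartingCrawler(SimpleUrlCrawler):
    """Restarts the browser every N processed articles (illustrative sketch)."""

    RESTART_EVERY = 50  # assumed threshold, tune as needed

    def process_single_article(self, article):
        result = super().process_single_article(article)
        if self.stats['processed'] % self.RESTART_EVERY == 0:
            # Recycle the shared driver to release memory
            self.driver_manager.cleanup_driver()
            self.driver_manager.setup_driver()
        return result

# crawler = RestartingCrawler(config, urls)
```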
### Debugging Tips
```python
# Run with a visible browser window while debugging
config = ArticleCrawlerConfig(
    base_url="https://example.com",
    headless=False,
    show_browser=True
)

# Single-threaded mode makes debugging easier
config.max_workers = 1
```
## Extending the Framework
### Adding a Custom Content Extractor
```python
class CustomContentExtractor(ContentExtractor):
    def extract_content(self, driver, url, max_retries=3):
        # Custom extraction logic
        pass

# Use it in your crawler
class YourCrawler(BaseArticleCrawler):
    def __init__(self, config):
        super().__init__(config)
        self.content_extractor = CustomContentExtractor(config)
```
### Adding a Custom Progress Manager
```python
class DatabaseProgressManager(ProgressManager):
    def load_progress(self):
        # Load progress from a database
        pass

    def save_progress(self, progress):
        # Save progress to a database
        pass
```
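
For instance, a minimal SQLite-backed variant might look like the sketch below; the database path and the single `progress` table are assumptions, not part of the framework:
```python
import sqlite3

class SqliteProgressManager(ProgressManager):
    """Stores crawl progress in a local SQLite database (illustrative sketch)."""

    def __init__(self, db_path: str = 'crawl_progress.db'):
        super().__init__(progress_file=db_path)
        self.conn = sqlite3.connect(db_path)
        self.conn.execute(
            "CREATE TABLE IF NOT EXISTS progress (article_id TEXT PRIMARY KEY, status TEXT)"
        )

    def load_progress(self):
        rows = self.conn.execute("SELECT article_id, status FROM progress").fetchall()
        return {
            'completed': {r[0] for r in rows if r[1] == 'completed'},
            'failed': {r[0] for r in rows if r[1] == 'failed'},
        }

    def save_progress(self, progress):
        for status in ('completed', 'failed'):
            self.conn.executemany(
                "INSERT OR REPLACE INTO progress (article_id, status) VALUES (?, ?)",
                [(article_id, status) for article_id in progress[status]],
            )
        self.conn.commit()
```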
## Summary
The generic article crawler framework provides:
- 🏗️ **Modular design** - easy to extend and maintain
- 🔄 **Resumable crawls** - suited to large crawl jobs
- 🚀 **Concurrent processing** - higher throughput
- 🛡️ **Anti-detection** - several built-in countermeasures
- 📊 **Progress tracking** - real-time view of crawl status
- 🎯 **Smart extraction** - automatic detection of the content area
- 📝 **Formatted output** - standard Markdown documents

Subclass the base class, implement the three core methods, and you have a dedicated crawler for any site. The framework handles the low-level details so you can focus on the business logic.