generic_article_crawler.py
#!/usr/bin/env python3
"""
Generic article crawler framework.

An extensible, Selenium-based foundation for crawling articles.
Author: AI Assistant
Created: 2025-01-27
"""
import json
import time
import argparse
from pathlib import Path
from concurrent.futures import ThreadPoolExecutor, as_completed
from urllib.parse import urljoin, urlparse
from datetime import datetime
import re
import random
from abc import ABC, abstractmethod
from typing import List, Dict, Optional, Tuple, Any

# Selenium imports
from selenium import webdriver
from selenium.webdriver.chrome.service import Service as ChromeService
from selenium.webdriver.chrome.options import Options
from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from selenium.common.exceptions import TimeoutException, NoSuchElementException, StaleElementReferenceException
from webdriver_manager.chrome import ChromeDriverManager

# HTML-to-Markdown conversion
import html2text
from bs4 import BeautifulSoup
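
# Third-party dependencies: selenium, webdriver-manager, html2text, beautifulsoup4
# (e.g. pip install selenium webdriver-manager html2text beautifulsoup4)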


class ArticleCrawlerConfig:
    """Crawler configuration."""

    def __init__(self,
                 base_url: str,
                 output_dir: str = "./output",
                 delay: float = 2.0,
                 max_workers: int = 3,
                 headless: bool = True,
                 show_browser: bool = False,
                 user_agent: str = None,
                 cookies: Dict[str, str] = None,
                 content_selectors: List[str] = None,
                 timeout: int = 30):
        self.base_url = base_url
        self.output_dir = Path(output_dir)
        self.delay = delay
        self.max_workers = max_workers
        self.headless = headless
        self.show_browser = show_browser
        self.timeout = timeout
        # Default user agent
        self.user_agent = user_agent or 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/139.0.0.0 Safari/537.36'
        # Cookies
        self.cookies = cookies or {}
        # Content selectors, in priority order
        self.content_selectors = content_selectors or [
            "article",
            ".article-content",
            ".content",
            "#content",
            ".post-content",
            "main",
            ".main-content"
        ]
        # Create the output directory
        self.output_dir.mkdir(parents=True, exist_ok=True)
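
# Example configuration (illustrative values only):
#   config = ArticleCrawlerConfig(
#       base_url="https://example.com",
#       cookies={"sessionid": "<your session cookie>"},
#       content_selectors=[".post-body", "article"],
#       max_workers=1,
#   )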


class SeleniumDriverManager:
    """Manages the Selenium browser driver."""

    def __init__(self, config: ArticleCrawlerConfig):
        self.config = config
        self.driver = None
        self._setup_chrome_options()

    def _setup_chrome_options(self):
        """Configure Chrome options."""
        self.chrome_options = Options()
        # Basic settings
        if self.config.headless and not self.config.show_browser:
            self.chrome_options.add_argument('--headless')
        # Performance and stability options
        chrome_args = [
            '--no-sandbox',
            '--disable-dev-shm-usage',
            '--disable-gpu',
            '--disable-extensions',
            '--disable-plugins',
            '--window-size=1920,1080',
            # Anti-detection options
            '--disable-blink-features=AutomationControlled',
            '--disable-web-security',
            '--allow-running-insecure-content',
            '--disable-features=VizDisplayCompositor'
        ]
        for arg in chrome_args:
            self.chrome_options.add_argument(arg)
        # Experimental options
        self.chrome_options.add_experimental_option("excludeSwitches", ["enable-automation"])
        self.chrome_options.add_experimental_option('useAutomationExtension', False)
        # User agent
        self.chrome_options.add_argument(f'--user-agent={self.config.user_agent}')

    def setup_driver(self) -> bool:
        """Start the Chrome browser driver."""
        try:
            service = ChromeService(ChromeDriverManager().install())
            self.driver = webdriver.Chrome(service=service, options=self.chrome_options)
            self.driver.set_page_load_timeout(self.config.timeout)
            self.driver.implicitly_wait(10)
            # Run anti-detection scripts
            anti_detection_scripts = [
                "Object.defineProperty(navigator, 'webdriver', {get: () => undefined})",
                "Object.defineProperty(navigator, 'plugins', {get: () => [1, 2, 3, 4, 5]})",
                "Object.defineProperty(navigator, 'languages', {get: () => ['en-US', 'en']})",
                "window.chrome = {runtime: {}}"
            ]
            for script in anti_detection_scripts:
                self.driver.execute_script(script)
            # Add cookies
            if self.config.cookies:
                self._add_cookies()
            mode_desc = "headless mode" if (self.config.headless and not self.config.show_browser) else "visible mode"
            print(f"✅ Chrome browser started ({mode_desc})")
            return True
        except Exception as e:
            print(f"❌ Failed to start browser: {e}")
            return False

    def _add_cookies(self):
        """Add cookies to the browser session."""
        # Visit the base domain first
        domain = urlparse(self.config.base_url).netloc
        self.driver.get(self.config.base_url)
        for name, value in self.config.cookies.items():
            try:
                self.driver.add_cookie({
                    'name': name,
                    'value': value,
                    'domain': f'.{domain}'
                })
            except Exception as e:
                print(f"⚠️ Failed to add cookie {name}: {e}")

    def cleanup_driver(self):
        """Shut down the browser driver."""
        if self.driver:
            try:
                self.driver.quit()
                print("✅ Browser closed")
            except Exception as e:
                print(f"⚠️ Error while closing browser: {e}")


class ContentExtractor:
    """Extracts article content from pages."""

    def __init__(self, config: ArticleCrawlerConfig):
        self.config = config
        self.html_converter = html2text.HTML2Text()
        self.html_converter.ignore_links = False
        self.html_converter.ignore_images = False
        self.html_converter.body_width = 0

    def extract_content(self, driver: webdriver.Chrome, url: str, max_retries: int = 3) -> Tuple[Optional[str], Optional[str]]:
        """Extract page content. Returns (markdown, error)."""
        for attempt in range(max_retries):
            try:
                # Random delay between requests
                delay = random.uniform(self.config.delay, self.config.delay * 2)
                time.sleep(delay)
                driver.get(url)
                # Wait for the page to load
                WebDriverWait(driver, 20).until(
                    EC.presence_of_element_located((By.TAG_NAME, "body"))
                )
                # Extra wait to make sure the page has fully loaded
                time.sleep(random.uniform(1, 3))
                # Try several selectors to locate the article content
                content_element = self._find_content_element(driver)
                if not content_element:
                    if attempt < max_retries - 1:
                        print(f" ⚠️ Content container not found, retrying {attempt + 1}/{max_retries}")
                        continue
                    else:
                        return None, "No content container found"
                # Grab the HTML, handling stale element references
                try:
                    html_content = content_element.get_attribute('outerHTML')
                except StaleElementReferenceException:
                    # The element went stale; locate it again
                    print(" ⚠️ Element went stale, relocating")
                    content_element = self._find_content_element(driver)
                    if not content_element:
                        if attempt < max_retries - 1:
                            print(f" ⚠️ Relocation failed, retrying {attempt + 1}/{max_retries}")
                            continue
                        else:
                            return None, "Content element became stale and could not be relocated"
                    html_content = content_element.get_attribute('outerHTML')
                if not html_content or len(html_content.strip()) < 100:
                    if attempt < max_retries - 1:
                        print(f" ⚠️ Content too short, retrying {attempt + 1}/{max_retries}")
                        continue
                    else:
                        return None, "Content too short"
                # Clean and convert the content
                markdown_content = self._process_html_content(html_content)
                return markdown_content.strip(), None
            except TimeoutException:
                if attempt < max_retries - 1:
                    print(f" ⚠️ Page load timed out, retrying {attempt + 1}/{max_retries}")
                    continue
                else:
                    return None, "Page load timeout"
            except Exception as e:
                if attempt < max_retries - 1:
                    print(f" ⚠️ Extraction error, retrying {attempt + 1}/{max_retries}: {str(e)}")
                    continue
                else:
                    return None, f"Extraction error: {str(e)}"
        return None, "Max retries exceeded"

    def _find_content_element(self, driver: webdriver.Chrome):
        """Locate the content element."""
        # Try the configured selectors first
        for selector in self.config.content_selectors:
            try:
                # Wait for the element to appear
                WebDriverWait(driver, 10).until(
                    EC.presence_of_element_located((By.CSS_SELECTOR, selector))
                )
                elements = driver.find_elements(By.CSS_SELECTOR, selector)
                if elements:
                    print(f" ✅ Found content container: {selector}")
                    return elements[0]
            except (TimeoutException, NoSuchElementException, StaleElementReferenceException):
                continue
        # If no known container matched, fall back to the div with the most text
        try:
            # Wait until the page has rendered at least one div
            WebDriverWait(driver, 10).until(
                EC.presence_of_element_located((By.TAG_NAME, "div"))
            )
            divs = driver.find_elements(By.TAG_NAME, "div")
            max_text_length = 0
            best_div = None
            for div in divs:
                try:
                    text_length = len(div.text.strip())
                    if text_length > max_text_length and text_length > 100:
                        max_text_length = text_length
                        best_div = div
                except Exception:  # includes StaleElementReferenceException
                    continue
            if best_div:
                print(" ✅ Found content container: auto-detected div")
                return best_div
        except Exception:  # includes TimeoutException
            pass
        return None

    def _process_html_content(self, html_content: str) -> str:
        """Clean the HTML and convert it to Markdown."""
        # Clean up the HTML with BeautifulSoup
        soup = BeautifulSoup(html_content, 'html.parser')
        # Remove unwanted elements
        unwanted_tags = ['script', 'style', 'nav', 'header', 'footer', 'aside']
        for tag in soup.find_all(unwanted_tags):
            tag.decompose()
        # Convert to Markdown
        markdown_content = self.html_converter.handle(str(soup))
        return markdown_content
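
# ContentExtractor can be used on its own once a driver is running (sketch):
#   extractor = ContentExtractor(config)
#   markdown, error = extractor.extract_content(manager.driver, "https://example.com/post")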


class ProgressManager:
    """Manages crawl progress."""

    def __init__(self, progress_file: str = 'crawl_progress.json'):
        self.progress_file = progress_file

    def load_progress(self) -> Dict[str, set]:
        """Load crawl progress."""
        progress_path = Path(self.progress_file)
        if not progress_path.exists():
            return {'completed': set(), 'failed': set()}
        try:
            with open(progress_path, 'r', encoding='utf-8') as f:
                data = json.load(f)
            return {
                'completed': set(data.get('completed', [])),
                'failed': set(data.get('failed', []))
            }
        except Exception as e:
            print(f"⚠️ Failed to read progress file: {e}, starting over")
            return {'completed': set(), 'failed': set()}

    def save_progress(self, progress: Dict[str, set]):
        """Save crawl progress."""
        try:
            data = {
                'completed': list(progress['completed']),
                'failed': list(progress['failed']),
                'last_update': datetime.now().isoformat()
            }
            with open(self.progress_file, 'w', encoding='utf-8') as f:
                json.dump(data, f, ensure_ascii=False, indent=2)
        except Exception as e:
            print(f"⚠️ Failed to save progress file: {e}")


class BaseArticleCrawler(ABC):
    """Base class for article crawlers."""

    def __init__(self, config: ArticleCrawlerConfig):
        self.config = config
        self.driver_manager = SeleniumDriverManager(config)
        self.content_extractor = ContentExtractor(config)
        self.progress_manager = ProgressManager()
        # Statistics
        self.stats = {
            'total_articles': 0,
            'processed': 0,
            'success': 0,
            'failed': 0
        }

    @abstractmethod
    def fetch_article_links(self) -> List[Dict[str, Any]]:
        """Return the list of article links - subclasses must implement this."""
        pass

    @abstractmethod
    def generate_filename(self, article: Dict[str, Any]) -> str:
        """Generate the output filename - subclasses must implement this."""
        pass

    @abstractmethod
    def format_article_content(self, article: Dict[str, Any], content: str) -> str:
        """Format the article content - subclasses must implement this."""
        pass

    def is_article_completed(self, article: Dict[str, Any]) -> bool:
        """Check whether an article has already been saved."""
        filename = self.generate_filename(article)
        filepath = self.config.output_dir / filename
        return filepath.exists()

    def save_article(self, article: Dict[str, Any], content: str) -> bool:
        """Save an article as a Markdown file."""
        try:
            filename = self.generate_filename(article)
            filepath = self.config.output_dir / filename
            # Build the full formatted content
            full_content = self.format_article_content(article, content)
            with open(filepath, 'w', encoding='utf-8') as f:
                f.write(full_content)
            print(f" 💾 Saved: {filename}")
            return True
        except Exception as e:
            print(f" ❌ Save failed: {e}")
            return False

    def process_single_article(self, article: Dict[str, Any]) -> bool:
        """Process a single article."""
        article_title = article.get('title', article.get('url', 'Unknown'))
        print(f"📄 Processing: {article_title}")
        # Extract the content.
        # Note: all workers share the single WebDriver owned by driver_manager, and
        # Selenium drivers are not thread-safe, so with max_workers > 1 page loads
        # can interleave; use max_workers=1 for strict reliability.
        content, error = self.content_extractor.extract_content(
            self.driver_manager.driver,
            article['url']
        )
        if error:
            print(f" ❌ Extraction failed: {error}")
            return False
        if not content:
            print(" ❌ Content is empty")
            return False
        # Save the article
        success = self.save_article(article, content)
        if success:
            self.stats['success'] += 1
        else:
            self.stats['failed'] += 1
        self.stats['processed'] += 1
        # Show progress - computed against the number of remaining articles
        remaining_total = getattr(self, '_remaining_total', self.stats['total_articles'])
        progress = (self.stats['processed'] / remaining_total) * 100
        print(f"📊 Progress: {self.stats['processed']}/{remaining_total} ({progress:.1f}%)")
        return success
    def crawl_articles(self, resume: bool = True, progress_file: str = 'crawl_progress.json'):
        """Crawl all articles."""
        # Fetch the article links
        articles = self.fetch_article_links()
        if not articles:
            print("❌ No article links found")
            return
        # Point the progress manager at the requested progress file
        self.progress_manager.progress_file = progress_file
        # Load progress
        progress = self.progress_manager.load_progress() if resume else {'completed': set(), 'failed': set()}
        # Filter out articles that are already done
        remaining_articles = []
        skipped_count = 0
        for article in articles:
            article_id = self._get_article_id(article)
            # Already marked as completed in the progress record?
            if article_id in progress['completed']:
                skipped_count += 1
                continue
            # Output file already exists?
            if self.is_article_completed(article):
                progress['completed'].add(article_id)
                skipped_count += 1
                continue
            remaining_articles.append(article)
        self.stats['total_articles'] = len(articles)
        remaining_count = len(remaining_articles)
        # Remember the remaining count for progress reporting
        self._remaining_total = remaining_count
        print("🚀 Starting to crawl article content")
        print(f"📊 Total articles: {len(articles)}")
        print(f"✅ Already completed: {skipped_count}")
        print(f"🔄 To process: {remaining_count}")
        print(f"📁 Output directory: {self.config.output_dir.absolute()}")
        print(f"🔧 Workers: {self.config.max_workers}")
        print(f"💾 Progress file: {progress_file}")
        print("-" * 60)
        if remaining_count == 0:
            print("🎉 All articles are already done, nothing left to crawl!")
            return
        # Start the browser
        if not self.driver_manager.setup_driver():
            return
        try:
            # Process articles with a thread pool
            with ThreadPoolExecutor(max_workers=self.config.max_workers) as executor:
                # Submit one task per article
                future_to_article = {}
                for article in remaining_articles:
                    future = executor.submit(self.process_single_article, article)
                    future_to_article[future] = article
                # Handle tasks as they complete
                for future in as_completed(future_to_article):
                    article = future_to_article[future]
                    article_id = self._get_article_id(article)
                    try:
                        success = future.result()
                        if success:
                            progress['completed'].add(article_id)
                            # Save progress periodically
                            if len(progress['completed']) % 10 == 0:
                                self.progress_manager.save_progress(progress)
                            time.sleep(self.config.delay)
                        else:
                            progress['failed'].add(article_id)
                    except Exception as e:
                        article_title = article.get('title', article.get('url', 'Unknown'))
                        print(f"❌ Error while processing article {article_title}: {e}")
                        progress['failed'].add(article_id)
                        self.stats['failed'] += 1
                        self.stats['processed'] += 1
        finally:
            # Save the final progress
            self.progress_manager.save_progress(progress)
            self.driver_manager.cleanup_driver()
        # Print the final statistics
        self._print_final_stats(progress)

    def _get_article_id(self, article: Dict[str, Any]) -> str:
        """Return a unique identifier for an article."""
        return article.get('id') or article.get('code') or article.get('url', '')

    def _print_final_stats(self, progress: Dict[str, set]):
        """Print the final statistics."""
        total_completed = len(progress['completed'])
        total_failed = len(progress['failed'])
        print("\n" + "=" * 60)
        print("📊 Article crawl summary")
        print("=" * 60)
        print(f"Total articles: {self.stats['total_articles']}")
        print(f"Processed this run: {self.stats['processed']}")
        print(f"Succeeded this run: {self.stats['success']}")
        print(f"Failed this run: {self.stats['failed']}")
        print(f"Completed overall: {total_completed}")
        print(f"Failed overall: {total_failed}")
        if self.stats['total_articles'] > 0:
            print(f"Overall success rate: {(total_completed/self.stats['total_articles']*100):.1f}%")
        print(f"\n📁 Documents saved to: {self.config.output_dir.absolute()}")
        print(f"💾 Progress saved to: {self.progress_manager.progress_file}")


# Example implementation: a simple URL-list crawler
class SimpleUrlCrawler(BaseArticleCrawler):
    """Example crawler driven by a plain list of URLs."""

    def __init__(self, config: ArticleCrawlerConfig, urls: List[str]):
        super().__init__(config)
        self.urls = urls

    def fetch_article_links(self) -> List[Dict[str, Any]]:
        """Build article entries from the URL list."""
        articles = []
        for i, url in enumerate(self.urls):
            articles.append({
                'id': str(i),
                'url': url,
                'title': f"Article_{i+1}"
            })
        return articles

    def generate_filename(self, article: Dict[str, Any]) -> str:
        """Generate the output filename."""
        safe_title = re.sub(r'[^\w\s-]', '', article['title']).strip()
        safe_title = re.sub(r'[-\s]+', '_', safe_title)
        return f"{article['id']}_{safe_title}.md"

    def format_article_content(self, article: Dict[str, Any], content: str) -> str:
        """Format the article content."""
        return f"""# {article['title']}
**URL:** {article['url']}
**ID:** {article['id']}
**Crawled at:** {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}
---
{content}
"""


def create_default_config(base_url: str, **kwargs) -> ArticleCrawlerConfig:
    """Create a default configuration."""
    return ArticleCrawlerConfig(base_url=base_url, **kwargs)


if __name__ == '__main__':
    # Example usage
    config = create_default_config(
        base_url="https://example.com",
        output_dir="./articles",
        delay=2.0,
        max_workers=2
    )
    # Example URL list
    urls = [
        "https://example.com/article1",
        "https://example.com/article2"
    ]
    crawler = SimpleUrlCrawler(config, urls)
    crawler.crawl_articles()
"""
通用文章爬虫框架
基于Selenium的可扩展文章爬取基础框架
作者: AI Assistant
创建时间: 2025-01-27
"""
import json
import time
import argparse
from pathlib import Path
from concurrent.futures import ThreadPoolExecutor, as_completed
from urllib.parse import urljoin, urlparse
from datetime import datetime
import re
import random
from abc import ABC, abstractmethod
from typing import List, Dict, Optional, Tuple, Any
# Selenium相关导入
from selenium import webdriver
from selenium.webdriver.chrome.service import Service as ChromeService
from selenium.webdriver.chrome.options import Options
from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from selenium.common.exceptions import TimeoutException, NoSuchElementException, StaleElementReferenceException
from webdriver_manager.chrome import ChromeDriverManager
# HTML转Markdown
import html2text
from bs4 import BeautifulSoup
class ArticleCrawlerConfig:
"""爬虫配置类"""
def __init__(self,
base_url: str,
output_dir: str = "./output",
delay: float = 2.0,
max_workers: int = 3,
headless: bool = True,
show_browser: bool = False,
user_agent: str = None,
cookies: Dict[str, str] = None,
content_selectors: List[str] = None,
timeout: int = 30):
self.base_url = base_url
self.output_dir = Path(output_dir)
self.delay = delay
self.max_workers = max_workers
self.headless = headless
self.show_browser = show_browser
self.timeout = timeout
# 默认用户代理
self.user_agent = user_agent or 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/139.0.0.0 Safari/537.36'
# Cookie设置
self.cookies = cookies or {}
# 内容选择器(按优先级排序)
self.content_selectors = content_selectors or [
"article",
".article-content",
".content",
"#content",
".post-content",
"main",
".main-content"
]
# 创建输出目录
self.output_dir.mkdir(parents=True, exist_ok=True)
class SeleniumDriverManager:
"""Selenium浏览器驱动管理器"""
def __init__(self, config: ArticleCrawlerConfig):
self.config = config
self.driver = None
self._setup_chrome_options()
def _setup_chrome_options(self):
"""设置Chrome选项"""
self.chrome_options = Options()
# 基础设置
if self.config.headless and not self.config.show_browser:
self.chrome_options.add_argument('--headless')
# 性能和稳定性选项
chrome_args = [
'--no-sandbox',
'--disable-dev-shm-usage',
'--disable-gpu',
'--disable-extensions',
'--disable-plugins',
'--window-size=1920,1080',
# 反检测选项
'--disable-blink-features=AutomationControlled',
'--disable-web-security',
'--allow-running-insecure-content',
'--disable-features=VizDisplayCompositor'
]
for arg in chrome_args:
self.chrome_options.add_argument(arg)
# 实验性选项
self.chrome_options.add_experimental_option("excludeSwitches", ["enable-automation"])
self.chrome_options.add_experimental_option('useAutomationExtension', False)
# 用户代理
self.chrome_options.add_argument(f'--user-agent={self.config.user_agent}')
def setup_driver(self) -> bool:
"""设置Chrome浏览器驱动"""
try:
service = ChromeService(ChromeDriverManager().install())
self.driver = webdriver.Chrome(service=service, options=self.chrome_options)
self.driver.set_page_load_timeout(self.config.timeout)
self.driver.implicitly_wait(10)
# 执行反检测脚本
anti_detection_scripts = [
"Object.defineProperty(navigator, 'webdriver', {get: () => undefined})",
"Object.defineProperty(navigator, 'plugins', {get: () => [1, 2, 3, 4, 5]})",
"Object.defineProperty(navigator, 'languages', {get: () => ['en-US', 'en']})",
"window.chrome = {runtime: {}}"
]
for script in anti_detection_scripts:
self.driver.execute_script(script)
# 添加Cookie
if self.config.cookies:
self._add_cookies()
mode_desc = "无头模式" if (self.config.headless and not self.config.show_browser) else "显示模式"
print(f"✅ Chrome浏览器已启动 ({mode_desc})")
return True
except Exception as e:
print(f"❌ 浏览器启动失败: {e}")
return False
def _add_cookies(self):
"""添加Cookie"""
# 先访问基础域名
domain = urlparse(self.config.base_url).netloc
self.driver.get(self.config.base_url)
for name, value in self.config.cookies.items():
try:
self.driver.add_cookie({
'name': name,
'value': value,
'domain': f'.{domain}'
})
except Exception as e:
print(f"⚠️ 添加Cookie失败 {name}: {e}")
def cleanup_driver(self):
"""清理浏览器驱动"""
if self.driver:
try:
self.driver.quit()
print("✅ 浏览器已关闭")
except Exception as e:
print(f"⚠️ 关闭浏览器时出错: {e}")
class ContentExtractor:
"""内容提取器"""
def __init__(self, config: ArticleCrawlerConfig):
self.config = config
self.html_converter = html2text.HTML2Text()
self.html_converter.ignore_links = False
self.html_converter.ignore_images = False
self.html_converter.body_width = 0
def extract_content(self, driver: webdriver.Chrome, url: str, max_retries: int = 3) -> Tuple[Optional[str], Optional[str]]:
"""提取页面内容"""
for attempt in range(max_retries):
try:
# 随机延迟
delay = random.uniform(self.config.delay, self.config.delay * 2)
time.sleep(delay)
driver.get(url)
# 等待页面加载
WebDriverWait(driver, 20).until(
EC.presence_of_element_located((By.TAG_NAME, "body"))
)
# 额外等待确保页面完全加载
time.sleep(random.uniform(1, 3))
# 尝试多种选择器找到文章内容
content_element = self._find_content_element(driver)
if not content_element:
if attempt < max_retries - 1:
print(f" ⚠️ 未找到内容容器,重试 {attempt + 1}/{max_retries}")
continue
else:
return None, "No content container found"
# 获取HTML内容,处理 stale element 异常
try:
html_content = content_element.get_attribute('outerHTML')
except StaleElementReferenceException:
# 元素已过期,重新查找
print(f" ⚠️ 元素已过期,重新定位")
content_element = self._find_content_element(driver)
if not content_element:
if attempt < max_retries - 1:
print(f" ⚠️ 重新定位失败,重试 {attempt + 1}/{max_retries}")
continue
else:
return None, "Content element became stale and could not be relocated"
html_content = content_element.get_attribute('outerHTML')
if not html_content or len(html_content.strip()) < 100:
if attempt < max_retries - 1:
print(f" ⚠️ 内容过短,重试 {attempt + 1}/{max_retries}")
continue
else:
return None, "Content too short"
# 清理和转换内容
markdown_content = self._process_html_content(html_content)
return markdown_content.strip(), None
except TimeoutException:
if attempt < max_retries - 1:
print(f" ⚠️ 页面加载超时,重试 {attempt + 1}/{max_retries}")
continue
else:
return None, "Page load timeout"
except Exception as e:
if attempt < max_retries - 1:
print(f" ⚠️ 提取错误,重试 {attempt + 1}/{max_retries}: {str(e)}")
continue
else:
return None, f"Extraction error: {str(e)}"
return None, "Max retries exceeded"
def _find_content_element(self, driver: webdriver.Chrome):
"""查找内容元素"""
# 尝试预定义的选择器
for selector in self.config.content_selectors:
try:
# 等待元素出现
WebDriverWait(driver, 10).until(
EC.presence_of_element_located((By.CSS_SELECTOR, selector))
)
elements = driver.find_elements(By.CSS_SELECTOR, selector)
if elements:
print(f" ✅ 找到内容容器: {selector}")
return elements[0]
except (TimeoutException, NoSuchElementException, StaleElementReferenceException):
continue
# 如果没找到特定容器,尝试找到包含最多文本的div
try:
# 等待页面基本加载完成
WebDriverWait(driver, 10).until(
EC.presence_of_element_located((By.TAG_NAME, "div"))
)
divs = driver.find_elements(By.TAG_NAME, "div")
max_text_length = 0
best_div = None
for div in divs:
try:
text_length = len(div.text.strip())
if text_length > max_text_length and text_length > 100:
max_text_length = text_length
best_div = div
except (StaleElementReferenceException, Exception):
continue
if best_div:
print(f" ✅ 找到内容容器: auto-detected div")
return best_div
except (TimeoutException, Exception):
pass
return None
def _process_html_content(self, html_content: str) -> str:
"""处理HTML内容"""
# 使用BeautifulSoup清理HTML
soup = BeautifulSoup(html_content, 'html.parser')
# 移除不需要的元素
unwanted_tags = ['script', 'style', 'nav', 'header', 'footer', 'aside']
for tag in soup.find_all(unwanted_tags):
tag.decompose()
# 转换为Markdown
markdown_content = self.html_converter.handle(str(soup))
return markdown_content
class ProgressManager:
"""进度管理器"""
def __init__(self, progress_file: str = 'crawl_progress.json'):
self.progress_file = progress_file
def load_progress(self) -> Dict[str, set]:
"""加载爬取进度"""
progress_path = Path(self.progress_file)
if not progress_path.exists():
return {'completed': set(), 'failed': set()}
try:
with open(progress_path, 'r', encoding='utf-8') as f:
data = json.load(f)
return {
'completed': set(data.get('completed', [])),
'failed': set(data.get('failed', []))
}
except Exception as e:
print(f"⚠️ 读取进度文件失败: {e},将重新开始")
return {'completed': set(), 'failed': set()}
def save_progress(self, progress: Dict[str, set]):
"""保存爬取进度"""
try:
data = {
'completed': list(progress['completed']),
'failed': list(progress['failed']),
'last_update': datetime.now().isoformat()
}
with open(self.progress_file, 'w', encoding='utf-8') as f:
json.dump(data, f, ensure_ascii=False, indent=2)
except Exception as e:
print(f"⚠️ 保存进度文件失败: {e}")
class BaseArticleCrawler(ABC):
"""文章爬虫基类"""
def __init__(self, config: ArticleCrawlerConfig):
self.config = config
self.driver_manager = SeleniumDriverManager(config)
self.content_extractor = ContentExtractor(config)
self.progress_manager = ProgressManager()
# 统计信息
self.stats = {
'total_articles': 0,
'processed': 0,
'success': 0,
'failed': 0
}
@abstractmethod
def fetch_article_links(self) -> List[Dict[str, Any]]:
"""获取文章链接列表 - 子类必须实现"""
pass
@abstractmethod
def generate_filename(self, article: Dict[str, Any]) -> str:
"""生成文件名 - 子类必须实现"""
pass
@abstractmethod
def format_article_content(self, article: Dict[str, Any], content: str) -> str:
"""格式化文章内容 - 子类必须实现"""
pass
def is_article_completed(self, article: Dict[str, Any]) -> bool:
"""检查文章是否已完成"""
filename = self.generate_filename(article)
filepath = self.config.output_dir / filename
return filepath.exists()
def save_article(self, article: Dict[str, Any], content: str) -> bool:
"""保存文章为Markdown文件"""
try:
filename = self.generate_filename(article)
filepath = self.config.output_dir / filename
# 格式化完整内容
full_content = self.format_article_content(article, content)
with open(filepath, 'w', encoding='utf-8') as f:
f.write(full_content)
print(f" 💾 已保存: {filename}")
return True
except Exception as e:
print(f" ❌ 保存失败: {e}")
return False
def process_single_article(self, article: Dict[str, Any]) -> bool:
"""处理单篇文章"""
article_title = article.get('title', article.get('url', 'Unknown'))
print(f"📄 处理: {article_title}")
# 提取内容
content, error = self.content_extractor.extract_content(
self.driver_manager.driver,
article['url']
)
if error:
print(f" ❌ 提取失败: {error}")
return False
if not content:
print(f" ❌ 内容为空")
return False
# 保存文章
success = self.save_article(article, content)
if success:
self.stats['success'] += 1
else:
self.stats['failed'] += 1
self.stats['processed'] += 1
# 显示进度 - 使用剩余文章数计算进度
remaining_total = getattr(self, '_remaining_total', self.stats['total_articles'])
progress = (self.stats['processed'] / remaining_total) * 100
print(f"📊 进度: {self.stats['processed']}/{remaining_total} ({progress:.1f}%)")
return success
def crawl_articles(self, resume: bool = True, progress_file: str = 'crawl_progress.json'):
"""爬取所有文章"""
# 获取文章链接
articles = self.fetch_article_links()
if not articles:
print("❌ 未找到文章链接")
return
# 设置进度管理器
self.progress_manager.progress_file = progress_file
# 加载进度
progress = self.progress_manager.load_progress() if resume else {'completed': set(), 'failed': set()}
# 过滤已完成的文章
remaining_articles = []
skipped_count = 0
for article in articles:
article_id = self._get_article_id(article)
# 检查是否已在进度记录中完成
if article_id in progress['completed']:
skipped_count += 1
continue
# 检查文件是否已存在
if self.is_article_completed(article):
progress['completed'].add(article_id)
skipped_count += 1
continue
remaining_articles.append(article)
self.stats['total_articles'] = len(articles)
remaining_count = len(remaining_articles)
# 设置剩余文章数用于进度计算
self._remaining_total = remaining_count
print(f"🚀 开始爬取文章内容")
print(f"📊 总文章数: {len(articles)}")
print(f"✅ 已完成: {skipped_count}")
print(f"🔄 待处理: {remaining_count}")
print(f"📁 输出目录: {self.config.output_dir.absolute()}")
print(f"🔧 并发数: {self.config.max_workers}")
print(f"💾 进度文件: {progress_file}")
print("-" * 60)
if remaining_count == 0:
print("🎉 所有文章已完成,无需继续爬取!")
return
# 设置浏览器
if not self.driver_manager.setup_driver():
return
try:
# 使用线程池处理文章
with ThreadPoolExecutor(max_workers=self.config.max_workers) as executor:
# 为每个文章创建一个任务
future_to_article = {}
for article in remaining_articles:
future = executor.submit(self.process_single_article, article)
future_to_article[future] = article
# 处理完成的任务
for future in as_completed(future_to_article):
article = future_to_article[future]
article_id = self._get_article_id(article)
try:
success = future.result()
if success:
progress['completed'].add(article_id)
# 定期保存进度
if len(progress['completed']) % 10 == 0:
self.progress_manager.save_progress(progress)
time.sleep(self.config.delay)
else:
progress['failed'].add(article_id)
except Exception as e:
article_title = article.get('title', article.get('url', 'Unknown'))
print(f"❌ 处理文章 {article_title} 时出错: {e}")
progress['failed'].add(article_id)
self.stats['failed'] += 1
self.stats['processed'] += 1
finally:
# 最终保存进度
self.progress_manager.save_progress(progress)
self.driver_manager.cleanup_driver()
# 显示最终统计
self._print_final_stats(progress)
def _get_article_id(self, article: Dict[str, Any]) -> str:
"""获取文章唯一标识"""
return article.get('id') or article.get('code') or article.get('url', '')
def _print_final_stats(self, progress: Dict[str, set]):
"""打印最终统计信息"""
total_completed = len(progress['completed'])
total_failed = len(progress['failed'])
print("\n" + "=" * 60)
print(f"📊 文章爬取完成统计")
print("=" * 60)
print(f"总文章数: {self.stats['total_articles']}")
print(f"本次处理: {self.stats['processed']}")
print(f"本次成功: {self.stats['success']}")
print(f"本次失败: {self.stats['failed']}")
print(f"累计完成: {total_completed}")
print(f"累计失败: {total_failed}")
if self.stats['total_articles'] > 0:
print(f"总体成功率: {(total_completed/self.stats['total_articles']*100):.1f}%")
print(f"\n📁 文档已保存到: {self.config.output_dir.absolute()}")
print(f"💾 进度已保存到: {self.progress_manager.progress_file}")
# 示例实现:简单的URL列表爬虫
class SimpleUrlCrawler(BaseArticleCrawler):
"""简单的URL列表爬虫示例"""
def __init__(self, config: ArticleCrawlerConfig, urls: List[str]):
super().__init__(config)
self.urls = urls
def fetch_article_links(self) -> List[Dict[str, Any]]:
"""从URL列表生成文章信息"""
articles = []
for i, url in enumerate(self.urls):
articles.append({
'id': str(i),
'url': url,
'title': f"Article_{i+1}"
})
return articles
def generate_filename(self, article: Dict[str, Any]) -> str:
"""生成文件名"""
safe_title = re.sub(r'[^\w\s-]', '', article['title']).strip()
safe_title = re.sub(r'[-\s]+', '_', safe_title)
return f"{article['id']}_{safe_title}.md"
def format_article_content(self, article: Dict[str, Any], content: str) -> str:
"""格式化文章内容"""
return f"""# {article['title']}
**URL:** {article['url']}
**ID:** {article['id']}
**爬取时间:** {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}
---
{content}
"""
def create_default_config(base_url: str, **kwargs) -> ArticleCrawlerConfig:
"""创建默认配置"""
return ArticleCrawlerConfig(base_url=base_url, **kwargs)
if __name__ == '__main__':
# 示例用法
config = create_default_config(
base_url="https://example.com",
output_dir="./articles",
delay=2.0,
max_workers=2
)
# 示例URL列表
urls = [
"https://example.com/article1",
"https://example.com/article2"
]
crawler = SimpleUrlCrawler(config, urls)
crawler.crawl_articles()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment