|
|
|
|
require "bundler/inline" |
|
|
|
|
|
gemfile do |
|
|
source "https://rubygems.org" |
|
|
gem "async" |
|
|
gem "ferrum" |
|
|
gem "state_machines" |
|
|
gem "breaker_machines" |
|
|
gem "concurrent-ruby" |
|
|
gem "nokogiri" |
|
|
gem "pandoc-ruby" |
|
|
end |
|
|
|
|
|
require "async" |
|
|
require "async/queue" |
|
|
require "async/semaphore" |
|
|
require "ferrum" |
|
|
require "uri" |
|
|
require "json" |
|
|
require "timeout" |
|
|
require "state_machines" |
|
|
require "breaker_machines" |
|
|
# Use concurrent-ruby for thread-safe data structures and atomic counters.
# Async's concurrency is cooperative (single-threaded), so this is largely
# defensive programming that keeps the code correct if threads are added later.
require "concurrent"
require "fileutils"
require "digest"
require "time" # Time#iso8601 (used when building frontmatter) comes from the time library
require "nokogiri"
require "pandoc-ruby"
|
|
|
|
|
class ScrapedPage |
|
|
attr_accessor :url, :html, :scraped_at, :error_message, :retry_count, :last_error_at |
|
|
|
|
|
def initialize(url) |
|
|
@url = url |
|
|
@html = nil |
|
|
@scraped_at = nil |
|
|
@error_message = nil |
|
|
@retry_count = 0 |
|
|
@last_error_at = nil |
|
|
# Set the initial state explicitly: we define our own #initialize without
# calling super, so state_machines does not do it for us
self.state = "pending"
|
|
end |
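
# Page lifecycle (see the state machine below):
#   pending -> processing -> completed
#                         -> failed -> retrying -> processing (retry)
#                                   -> permanently_failed (give_up)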
|
|
|
|
|
state_machine :state, initial: :pending do |
|
|
event :start_processing do |
|
|
transition pending: :processing, retrying: :processing |
|
|
end |
|
|
|
|
|
event :complete do |
|
|
transition processing: :completed |
|
|
end |
|
|
|
|
|
event :mark_failed do |
|
|
transition processing: :failed |
|
|
end |
|
|
|
|
|
event :retry_page do |
|
|
transition failed: :retrying |
|
|
end |
|
|
|
|
|
event :give_up do |
|
|
transition [:failed, :retrying] => :permanently_failed |
|
|
end |
|
|
|
|
|
state :pending do |
|
|
def ready_for_processing? |
|
|
true |
|
|
end |
|
|
end |
|
|
|
|
|
state :processing do |
|
|
def ready_for_processing? |
|
|
false |
|
|
end |
|
|
end |
|
|
|
|
|
state :completed do |
|
|
def ready_for_processing? |
|
|
false |
|
|
end |
|
|
|
|
|
def success? |
|
|
true |
|
|
end |
|
|
end |
|
|
|
|
|
state :failed do |
|
|
def ready_for_processing? |
|
|
can_retry? |
|
|
end |
|
|
|
|
|
def can_retry? |
|
|
retry_count < 3 && (last_error_at.nil? || Time.now - last_error_at > backoff_delay) |
|
|
end |
|
|
|
|
|
def backoff_delay |
|
|
# Exponential backoff: 30 * 2**retry_count seconds
# (60s after the first failure, 120s after the second)
30 * (2**retry_count)
|
|
end |
|
|
end |
|
|
|
|
|
state :retrying do |
|
|
def ready_for_processing? |
|
|
true |
|
|
end |
|
|
end |
|
|
|
|
|
state :permanently_failed do |
|
|
def ready_for_processing? |
|
|
false |
|
|
end |
|
|
|
|
|
def success? |
|
|
false |
|
|
end |
|
|
end |
|
|
end |
|
|
|
|
|
def record_error(error) |
|
|
@error_message = error.message |
|
|
@last_error_at = Time.now |
|
|
@retry_count += 1 |
|
|
end |
|
|
|
|
|
def record_success(html) |
|
|
@html = html |
|
|
@scraped_at = Time.now |
|
|
@error_message = nil |
|
|
end |
|
|
|
|
|
def to_h |
|
|
{ |
|
|
url: @url, |
|
|
html: @html, |
|
|
scraped_at: @scraped_at, |
|
|
state: state, |
|
|
error_message: @error_message, |
|
|
retry_count: @retry_count, |
|
|
last_error_at: @last_error_at |
|
|
} |
|
|
end |
|
|
|
|
|
def to_state_h |
|
|
{ |
|
|
url: @url, |
|
|
scraped_at: @scraped_at, |
|
|
state: state, |
|
|
error_message: @error_message, |
|
|
retry_count: @retry_count, |
|
|
last_error_at: @last_error_at |
|
|
} |
|
|
end |
|
|
|
|
|
def self.from_h(data) |
|
|
page = new(data["url"]) |
|
|
page.html = data["html"] |
|
|
page.scraped_at = data["scraped_at"] |
|
|
page.error_message = data["error_message"] |
|
|
page.retry_count = data["retry_count"] || 0 |
|
|
page.last_error_at = data["last_error_at"] |
|
|
|
|
|
# Restore state |
|
|
target_state = data["state"]&.to_sym || :pending |
|
|
case target_state |
|
|
when :processing then page.start_processing |
|
|
when :completed then page.start_processing && page.complete |
|
|
when :failed then page.start_processing && page.mark_failed |
|
|
when :retrying then page.start_processing && page.mark_failed && page.retry_page |
|
|
when :permanently_failed then page.start_processing && page.mark_failed && page.give_up |
|
|
end |
|
|
|
|
|
page |
|
|
end |
|
|
end |
|
|
|
|
|
class AsyncScraper |
|
|
include BreakerMachines::DSL |
|
|
|
|
|
attr_reader :browser, :claimed_urls, :completed_urls, :url_queue, :results, :pages |
|
|
|
|
|
# Content size limit - 10MB default |
|
|
MAX_CONTENT_SIZE = 10_000_000 |
|
|
|
|
|
def initialize(*urls, mode: :scrape, max_concurrent: 5, timeout: 10, state_container: nil, max_pages: nil, max_content_size: MAX_CONTENT_SIZE) |
|
|
raise ArgumentError, "urls must be provided" if !urls || urls.empty? |
|
|
raise ArgumentError, "mode must be :scrape or :spider" unless [:scrape, :spider].include?(mode) |
|
|
|
|
|
@initial_urls = urls |
|
|
@mode = mode |
|
|
@max_concurrent = max_concurrent |
|
|
@timeout = timeout |
|
|
@state_container = state_container |
|
|
@max_pages = max_pages |
|
|
@max_content_size = max_content_size |
|
|
|
|
|
@claimed_urls = Concurrent::Set.new # URLs currently being processed |
|
|
@completed_urls = Concurrent::Set.new # URLs that have been processed (success or permanent failure) |
|
|
@discovered_urls = Concurrent::Set.new # All URLs discovered during crawling |
|
|
@results = Concurrent::Array.new |
|
|
@pages = Concurrent::Hash.new # URL -> ScrapedPage mapping |
|
|
@semaphore = Async::Semaphore.new(max_concurrent) |
|
|
@pages_scraped = Concurrent::AtomicFixnum.new(0) # Thread-safe counter for scraped pages |
|
|
@active_jobs = Concurrent::AtomicFixnum.new(0) # Thread-safe counter for active jobs |
|
|
@work_available = true # Workers keep pulling URLs while this flag is true
|
|
|
|
|
# Set the initial state explicitly (we override #initialize without calling super)
self.state = "idle"
|
|
|
|
|
load_state if @state_container |
|
|
end |
|
|
|
|
|
state_machine :state, initial: :idle do |
|
|
event :start do |
|
|
transition idle: :running, stopped: :running, error: :running |
|
|
end |
|
|
|
|
|
event :pause do |
|
|
transition running: :paused |
|
|
end |
|
|
|
|
|
event :resume do |
|
|
transition paused: :running |
|
|
end |
|
|
|
|
|
event :stop do |
|
|
transition [:running, :paused, :error] => :stopped |
|
|
end |
|
|
|
|
|
event :error_occurred do |
|
|
transition [:running, :paused] => :error |
|
|
end |
|
|
|
|
|
event :reset do |
|
|
transition any => :idle |
|
|
end |
|
|
|
|
|
state :idle do |
|
|
def can_start? |
|
|
true |
|
|
end |
|
|
end |
|
|
|
|
|
state :running do |
|
|
def active? |
|
|
true |
|
|
end |
|
|
|
|
|
def can_process_pages? |
|
|
true |
|
|
end |
|
|
end |
|
|
|
|
|
state :paused do |
|
|
def active? |
|
|
true |
|
|
end |
|
|
|
|
|
def can_process_pages? |
|
|
false |
|
|
end |
|
|
end |
|
|
|
|
|
state :stopped do |
|
|
def active? |
|
|
false |
|
|
end |
|
|
|
|
|
def can_process_pages? |
|
|
false |
|
|
end |
|
|
end |
|
|
|
|
|
state :error do |
|
|
def active? |
|
|
false |
|
|
end |
|
|
|
|
|
def can_process_pages? |
|
|
false |
|
|
end |
|
|
end |
|
|
end |
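
# Scraper lifecycle: start moves idle/stopped/error -> running; pause/resume
# toggle running <-> paused; stop halts from running/paused/error; reset
# returns to idle from any state.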
|
|
|
|
|
# Define circuit breaker for page scraping |
|
|
circuit :page_scraping do |
|
|
threshold failures: 5, within: 1.minute # Trip after 5 failures in 1 minute |
|
|
reset_after 10.seconds # Wait 10 seconds before trying again |
|
|
fallback { nil } # Return nil when circuit is open |
|
|
|
|
|
# Additional configuration for robustness |
|
|
on_open { |circuit| puts "Circuit breaker opened for page scraping - too many failures" } |
|
|
on_close { |circuit| puts "Circuit breaker closed for page scraping - service recovered" } |
|
|
on_half_open { |circuit| puts "Circuit breaker half-open for page scraping - testing recovery" } |
|
|
end |
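
# Concurrency model: #run boots a headless Ferrum browser, then starts one
# orchestrator task (which decides when all work is done) and `max_concurrent`
# worker tasks that pull URLs from @url_queue; the Async::Semaphore caps how
# many pages are scraped at the same time.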
|
|
|
|
|
def run(&block) |
|
|
start # Transition to running state |
|
|
|
|
|
Sync(annotation: "AsyncScraper#run") do |task| |
|
|
# Start browser |
|
|
@browser = Ferrum::Browser.new( |
|
|
headless: true, |
|
|
timeout: @timeout, |
|
|
pending_connection_errors: false |
|
|
) |
|
|
|
|
|
begin |
|
|
@url_queue = Async::Queue.new |
|
|
|
|
|
# Add initial URLs to queue if not resuming |
|
|
if @discovered_urls.empty? |
|
|
@initial_urls.each { |url| @discovered_urls.add(url) } |
|
|
end |
|
|
|
|
|
# Count URLs to process |
|
|
urls_to_process = @discovered_urls.reject { |url| @completed_urls.include?(url) } |
|
|
retryable_pages = @pages.values.select { |page| page.ready_for_processing? } |
|
|
|
|
|
# Push URLs to queue |
|
|
urls_to_process.each { |url| @url_queue.push(url) } |
|
|
retryable_pages.each do |page| |
|
|
@url_queue.push(page.url) |
|
|
@discovered_urls.add(page.url) |
|
|
end |
|
|
|
|
|
# If no work to do, complete immediately |
|
|
total_queued = urls_to_process.size + retryable_pages.size |
|
|
if total_queued == 0 |
|
|
puts "No URLs to process - all work completed" |
|
|
return @results |
|
|
end |
|
|
|
|
|
# Start orchestrator in background |
|
|
orchestrator = Async do |
|
|
orchestrate_work(&block) |
|
|
end |
|
|
|
|
|
# Start the worker pool
|
|
workers = @max_concurrent.times.map do |worker_id| |
|
|
Async do |
|
|
worker_loop(worker_id, &block) |
|
|
end |
|
|
end |
|
|
|
|
|
# Wait for the orchestrator to signal completion
|
|
begin |
|
|
orchestrator.wait |
|
|
rescue => e |
|
|
puts "Orchestrator error: #{e.message}" |
|
|
end |
|
|
|
|
|
# Ensure work is stopped |
|
|
@work_available = false |
|
|
|
|
|
# Send stop signals to all workers |
|
|
@max_concurrent.times { |
|
|
begin |
|
|
@url_queue.push(nil) |
|
|
rescue |
|
|
# Queue might be closed |
|
|
end |
|
|
} |
|
|
|
|
|
# Wait for all workers to complete with individual timeouts |
|
|
workers.each_with_index do |worker, idx| |
|
|
Timeout.timeout(2) do # Give each worker up to 2 seconds to finish
|
|
worker.wait |
|
|
end |
|
|
rescue Timeout::Error |
|
|
puts "Worker #{idx} timed out, force stopping" |
|
|
begin |
|
|
worker.stop |
|
|
rescue |
|
|
nil |
|
|
end |
|
|
end |
|
|
|
|
|
# Ensure all async tasks are completed |
|
|
if @active_jobs.value > 0 |
|
|
puts "Waiting for #{@active_jobs.value} active jobs to complete..." |
|
|
start_time = Time.now |
|
|
while @active_jobs.value > 0 && (Time.now - start_time) < 2 # Wait up to 2 seconds
|
|
sleep(0.1) |
|
|
end |
|
|
if @active_jobs.value > 0 |
|
|
puts "Forcibly stopping #{@active_jobs.value} remaining jobs" |
|
|
# Note: don't force-reset the counter; each job decrements it in its ensure block
|
|
end |
|
|
end |
|
|
ensure |
|
|
save_state if @state_container |
|
|
|
|
|
# Close the queue and drain any remaining items without blocking
begin
@url_queue&.close
@url_queue.pop until @url_queue.nil? || @url_queue.empty?
rescue
# Ignore errors during cleanup
end
|
|
|
|
|
@browser&.quit |
|
|
stop # Transition to stopped state |
|
|
end |
|
|
|
|
|
@results |
|
|
end |
|
|
end |
|
|
|
|
|
private |
|
|
|
|
|
# Orchestrator - monitors work and decides when to stop |
|
|
def orchestrate_work(&block) |
|
|
loop do |
|
|
sleep(0.1) # Check every 100ms for faster response |
|
|
|
|
|
# Check if we should stop |
|
|
if should_stop_all_work? |
|
|
puts "Orchestrator: Stopping all work" |
|
|
@work_available = false |
|
|
|
|
|
# Clear the queue so workers don't pick up more work
begin
@url_queue.pop until @url_queue.empty?
rescue
# Queue might be closed
end
|
|
|
|
|
break |
|
|
end |
|
|
end |
|
|
end |
|
|
|
|
|
# Check if all work is complete |
|
|
def should_stop_all_work? |
|
|
# Stop if we've hit page limit and no active jobs |
|
|
if max_pages_reached? |
|
|
return @active_jobs.value == 0 |
|
|
end |
|
|
|
|
|
# Stop if all pages are processed and no active jobs |
|
|
return true if @active_jobs.value == 0 && @url_queue.empty? && should_close_queue? |
|
|
|
|
|
# Otherwise keep going |
|
|
false |
|
|
rescue |
|
|
true # Stop on any error |
|
|
end |
|
|
|
|
|
# Worker loop: pop a URL, skip it if already completed, atomically claim it
# via @claimed_urls, then scrape it inside the semaphore
|
|
def worker_loop(worker_id, &block) |
|
|
while @work_available |
|
|
# Check page limit before getting URL |
|
|
if max_pages_reached? |
|
|
puts "Worker #{worker_id}: Page limit reached, stopping" |
|
|
break |
|
|
end |
|
|
|
|
|
url = @url_queue.pop |
|
|
break if url.nil? # nil is the stop signal pushed by #run
|
|
|
|
|
# Skip if already completed |
|
|
if @completed_urls.include?(url) |
|
|
puts "Worker #{worker_id}: Skipping #{url} (already completed)" |
|
|
next |
|
|
end |
|
|
|
|
|
# Try to claim this URL for processing - atomic operation |
|
|
unless @claimed_urls.add?(url) |
|
|
puts "Worker #{worker_id}: Another worker claimed #{url}" |
|
|
next |
|
|
end |
|
|
|
|
|
# Double-check page limit before processing |
|
|
if max_pages_reached? |
|
|
puts "Worker #{worker_id}: Page limit reached after claiming URL" |
|
|
break |
|
|
end |
|
|
|
|
|
# Get or create page object
page = @pages[url] ||= ScrapedPage.new(url)
unless page.ready_for_processing?
@claimed_urls.delete(url) # Release the claim so the URL isn't left stuck
next
end
|
|
|
|
|
# Process the page with semaphore |
|
|
@semaphore.async do |
|
|
@active_jobs.increment |
|
|
begin |
|
|
# Final check inside semaphore |
|
|
if !max_pages_reached? |
|
|
scrape_page(page, worker_id, &block) |
|
|
else |
|
|
puts "Worker #{worker_id}: Skipping page due to limit" |
|
|
end |
|
|
ensure |
|
|
@active_jobs.decrement |
|
|
end |
|
|
end |
|
|
end |
|
|
rescue => e |
|
|
puts "Worker #{worker_id} error: #{e.message}" |
|
|
end |
|
|
|
|
|
def scrape_page(page_obj, worker_id, &block) |
|
|
url = page_obj.url |
|
|
|
|
|
# Transition page to processing state |
|
|
page_obj.start_processing |
|
|
# Note: the URL was already claimed (@claimed_urls) in worker_loop
|
|
|
|
|
# Use circuit breaker for page scraping |
|
|
result = circuit(:page_scraping).wrap do |
|
|
browser_page = @browser.create_page |
|
|
|
|
|
browser_page.go_to(url) |
|
|
# Wait for page to load |
|
|
browser_page.network.wait_for_idle |
|
|
|
|
|
# Scroll to bottom to trigger lazy loading |
|
|
browser_page.execute("window.scrollTo(0, document.body.scrollHeight)") |
|
|
sleep(0.5) # Brief pause for content to load |
|
|
|
|
|
# Scroll back to top for consistency |
|
|
browser_page.execute("window.scrollTo(0, 0)") |
|
|
sleep(0.2) # Brief pause |
|
|
|
|
|
# Get page HTML |
|
|
html = browser_page.body |
|
|
|
|
|
# Check content size limit |
|
|
if html.bytesize > @max_content_size |
|
|
raise StandardError.new("Content size exceeds limit: #{html.bytesize} bytes > #{@max_content_size} bytes") |
|
|
end |
|
|
|
|
|
# Record success and transition state |
|
|
page_obj.record_success(html) |
|
|
page_obj.complete |
|
|
|
|
|
# Move from claimed to completed |
|
|
@claimed_urls.delete(url) |
|
|
@completed_urls.add(url) |
|
|
|
|
|
# Increment pages scraped counter |
|
|
@pages_scraped.increment |
|
|
|
|
|
# Create result |
|
|
result_data = { |
|
|
url: url, |
|
|
html: html, |
|
|
scraped_at: page_obj.scraped_at |
|
|
} |
|
|
|
|
|
# Yield to block if provided (for Rails integration) |
|
|
yield(result_data) if block_given? |
|
|
@results << result_data |
|
|
|
|
|
puts "Worker #{worker_id}: Scraped #{url} (#{@pages_scraped.value}#{@max_pages ? "/#{@max_pages}" : ""}) [#{page_obj.state}]" |
|
|
|
|
|
# If spider mode and haven't hit limit, find and queue new URLs |
|
|
if @mode == :spider && !max_pages_reached? |
|
|
extract_and_queue_links(html, url) |
|
|
end |
|
|
|
|
|
result_data |
|
|
ensure |
|
|
browser_page&.close |
|
|
end |
|
|
|
|
|
# Handle circuit breaker result |
|
|
if result.nil? |
|
|
# Circuit breaker is open or call failed |
|
|
page_obj.record_error(StandardError.new("Circuit breaker open or request failed")) |
|
|
page_obj.mark_failed |
|
|
puts "Failed to scrape #{url}: Circuit breaker open or error occurred [#{page_obj.state}]" |
|
|
|
|
|
# Check if page can be retried |
|
|
if page_obj.failed? && page_obj.can_retry? |
|
|
page_obj.retry_page |
|
|
# Release the claim so it can be retried |
|
|
@claimed_urls.delete(url) |
|
|
@url_queue.push(url) # Re-queue for retry |
|
|
puts "Re-queued #{url} for retry (attempt #{page_obj.retry_count + 1}) [#{page_obj.state}]" |
|
|
elsif page_obj.retry_count >= 3 |
|
|
page_obj.give_up |
|
|
# Move from claimed to completed (permanently failed) |
|
|
@claimed_urls.delete(url) |
|
|
@completed_urls.add(url) |
|
|
puts "Giving up on #{url} after #{page_obj.retry_count} attempts [#{page_obj.state}]" |
|
|
end |
|
|
end |
|
|
|
|
|
# Save state periodically |
|
|
save_state if @state_container && @completed_urls.size % 10 == 0 |
|
|
rescue => e |
|
|
# Handle unexpected errors |
|
|
page_obj.record_error(e) |
|
|
page_obj.mark_failed |
|
|
puts "Unexpected error scraping #{url}: #{e.message} [#{page_obj.state}]" |
|
|
|
|
|
# Retry logic for unexpected errors |
|
|
if page_obj.can_retry? |
|
|
page_obj.retry_page |
|
|
# Release the claim so it can be retried |
|
|
@claimed_urls.delete(url) |
|
|
@url_queue.push(url) |
|
|
puts "Re-queued #{url} for retry after error [#{page_obj.state}]" |
|
|
else |
|
|
page_obj.give_up |
|
|
# Move from claimed to completed (permanently failed) |
|
|
@claimed_urls.delete(url) |
|
|
@completed_urls.add(url) |
|
|
puts "Giving up on #{url} after errors [#{page_obj.state}]" |
|
|
end |
|
|
end |
|
|
|
|
|
def extract_and_queue_links(html, current_url) |
|
|
current_uri = URI.parse(current_url) |
|
|
|
|
|
# Use Nokogiri for better performance and accuracy |
|
|
doc = Nokogiri::HTML(html) |
|
|
links = doc.css("a[href]").map { |anchor| anchor["href"] }.compact |
|
|
|
|
|
links.each do |href| |
|
|
next if href.empty? || href.start_with?("#", "javascript:", "mailto:", "tel:", "ftp:") |
|
|
|
|
|
# Convert to absolute URL |
|
|
absolute_url = URI.join(current_url, href).to_s |
|
|
|
|
|
# Remove fragment identifier (#comment-XX, #section, etc.) - treat as same page |
|
|
clean_url = absolute_url.split("#").first |
|
|
|
|
|
# Check if URL is within the same host as current page |
|
|
uri = URI.parse(clean_url) |
|
|
next unless uri.host == current_uri.host |
|
|
|
|
|
# Skip asset files (CSS, JS, images, fonts, etc.) |
|
|
path = uri.path.downcase |
|
|
next if path.match?(/\.(css|js|png|jpg|jpeg|gif|svg|ico|pdf|zip|woff|woff2|ttf|eot|mp4|mp3|avi|mov)(\?|$)/) |
|
|
|
|
|
# Add to discovered set and queue only if it's a new URL |
|
|
if @discovered_urls.add?(clean_url) # Only add to queue if newly discovered |
|
|
@url_queue.push(clean_url) |
|
|
end |
|
|
end |
|
|
rescue => e |
|
|
puts "Error extracting links from #{current_url}: #{e.message}" |
|
|
end |
|
|
|
|
|
def max_pages_reached? |
|
|
@max_pages && @pages_scraped.value >= @max_pages |
|
|
end |
|
|
|
|
|
def should_close_queue? |
|
|
# Check if there are any pages that can still be retried |
|
|
has_retryable_pages = @pages.values.any? { |page| page.ready_for_processing? } |
|
|
|
|
|
if @mode == :spider
|
|
# Spider mode: close when no more URLs can be discovered |
|
|
# This happens when all pages are either completed or permanently failed |
|
|
# and no pages are ready for retry |
|
|
all_pages_processed = @pages.values.all? { |page| page.completed? || page.permanently_failed? } |
|
|
all_pages_processed && !has_retryable_pages |
|
|
else |
|
|
# Scrape mode: close when all the initial URLs have been processed
|
|
all_initial_processed = @initial_urls.all? do |url| |
|
|
page = @pages[url] |
|
|
page && (page.completed? || page.permanently_failed?) |
|
|
end |
|
|
all_initial_processed && !has_retryable_pages |
|
|
end |
|
|
end |
|
|
|
|
|
def save_state |
|
|
return unless @state_container |
|
|
|
|
|
state = { |
|
|
mode: @mode,
|
|
initial_urls: @initial_urls, |
|
|
completed_urls: @completed_urls.to_a, |
|
|
discovered_urls: @discovered_urls.to_a, |
|
|
max_concurrent: @max_concurrent, |
|
|
timeout: @timeout, |
|
|
max_pages: @max_pages, |
|
|
pages_scraped: @pages_scraped.value, |
|
|
scraper_state: self.state, |
|
|
pages: @pages.transform_values(&:to_state_h) |
|
|
} |
|
|
|
|
|
@state_container.write_state(state) |
|
|
end |
|
|
|
|
|
def load_state |
|
|
return unless @state_container |
|
|
|
|
|
state = @state_container.read_state |
|
|
return unless state |
|
|
|
|
|
@completed_urls = Concurrent::Set.new(state["completed_urls"] || []) |
|
|
# Backward compatibility: check for both new and old key names |
|
|
discovered_urls_data = state["discovered_urls"] || state["pending_urls"] || [] |
|
|
@discovered_urls = Concurrent::Set.new(discovered_urls_data) |
|
|
@pages_scraped = Concurrent::AtomicFixnum.new(state["pages_scraped"] || 0) |
|
|
|
|
|
# Restore page states |
|
|
if state["pages"] |
|
|
@pages = state["pages"].transform_values { |page_data| ScrapedPage.from_h(page_data) } |
|
|
end |
|
|
|
|
|
# Note: scraper state will be restored when run() is called |
|
|
|
|
|
completed_pages = @pages.values.count { |p| p.completed? } |
|
|
failed_pages = @pages.values.count { |p| p.permanently_failed? } |
|
|
retry_pages = @pages.values.count { |p| p.retrying? } |
|
|
|
|
|
puts "Resuming with #{@completed_urls.size} completed, #{@discovered_urls.size} discovered URLs, #{@pages_scraped.value} pages scraped" |
|
|
puts "Page states: #{completed_pages} completed, #{failed_pages} failed, #{retry_pages} retrying" |
|
|
end |
|
|
end |
|
|
|
|
|
class AsyncMarkdownScraper |
|
|
attr_reader :scraper |
|
|
|
|
|
def initialize(*urls, **scraper_options) |
|
|
@scraper = AsyncScraper.new(*urls, **scraper_options) |
|
|
end |
|
|
|
|
|
def run(&block) |
|
|
@scraper.run do |page_data| |
|
|
# Convert to markdown with frontmatter |
|
|
markdown_data = process_page(page_data) |
|
|
|
|
|
# Yield the processed data to the block |
|
|
yield(markdown_data) if block_given? |
|
|
end |
|
|
end |
|
|
|
|
|
private |
|
|
|
|
|
def process_page(page_data) |
|
|
html = page_data[:html] |
|
|
url = page_data[:url] |
|
|
|
|
|
puts "Processing: #{url} (#{html.length} chars)" |
|
|
|
|
|
# Extract frontmatter |
|
|
frontmatter = extract_meta_frontmatter(html, url) |
|
|
puts " → Extracted frontmatter" |
|
|
|
|
|
# Convert HTML to clean markdown |
|
|
markdown_content = html_to_clean_markdown(html) |
|
|
puts " → Converted to markdown (#{markdown_content.length} chars)" |
|
|
|
|
|
# Add markdown and frontmatter keys to the original page data |
|
|
page_data.merge(markdown: markdown_content, frontmatter: frontmatter) |
|
|
end |
|
|
|
|
|
# Helper method to extract meta information as YAML front matter |
|
|
def extract_meta_frontmatter(html, url) |
|
|
doc = Nokogiri::HTML(html) |
|
|
frontmatter = ["---"] |
|
|
|
|
|
# Extract title |
|
|
title = doc.at("title")&.text&.strip |
|
|
frontmatter << "title: \"#{title.gsub('"', '\\"')}\"" if title && !title.empty? |
|
|
|
|
|
# Extract meta description |
|
|
description = doc.at('meta[name="description"]')&.[]("content")&.strip |
|
|
frontmatter << "description: \"#{description.gsub('"', '\\"')}\"" if description && !description.empty? |
|
|
|
|
|
# Extract meta keywords |
|
|
keywords = doc.at('meta[name="keywords"]')&.[]("content")&.strip |
|
|
if keywords && !keywords.empty? |
|
|
keywords_array = keywords.split(",").map(&:strip).reject(&:empty?) |
|
|
frontmatter << "keywords:" |
|
|
keywords_array.each { |keyword| frontmatter << " - \"#{keyword.gsub('"', '\\"')}\"" } |
|
|
end |
|
|
|
|
|
# Extract author information |
|
|
author = doc.at('meta[name="author"]')&.[]("content")&.strip |
|
|
frontmatter << "author: \"#{author.gsub('"', '\\"')}\"" if author && !author.empty? |
|
|
|
|
|
# Extract language |
|
|
lang = doc.at("html")&.[]("lang") || doc.at('meta[http-equiv="content-language"]')&.[]("content") |
|
|
frontmatter << "language: \"#{lang}\"" if lang && !lang.empty? |
|
|
|
|
|
# Extract canonical URL |
|
|
canonical = doc.at('link[rel="canonical"]')&.[]("href")&.strip |
|
|
frontmatter << "canonical_url: \"#{canonical}\"" if canonical && !canonical.empty? |
|
|
|
|
|
# Extract robots meta |
|
|
robots = doc.at('meta[name="robots"]')&.[]("content")&.strip |
|
|
frontmatter << "robots: \"#{robots}\"" if robots && !robots.empty? |
|
|
|
|
|
# Add URL for reference |
|
|
frontmatter << "url: \"#{url}\"" |
|
|
|
|
|
# Add scraped timestamp |
|
|
frontmatter << "scraped_at: \"#{Time.now.iso8601}\"" |
|
|
|
|
|
frontmatter << "---" |
|
|
frontmatter << "" |
|
|
|
|
|
frontmatter.join("\n") |
|
|
end |
|
|
|
|
|
# Helper method to convert HTML to clean markdown |
|
|
def html_to_clean_markdown(html) |
|
|
doc = Nokogiri::HTML(html) |
|
|
|
|
|
# Remove UI and navigation elements first |
|
|
doc.css("script, style, svg, noscript, iframe, object, embed, applet, form, nav, footer").remove |
|
|
doc.css("[data-testid*='nav'], [data-testid*='footer']").remove |
|
|
doc.xpath("//comment()").remove |
|
|
|
|
|
# Use pandoc with HTML reader options to flatten nested divs and spans |
|
|
PandocRuby.convert( |
|
|
doc.to_html, |
|
|
:from => "html-native_divs-native_spans-empty_paragraphs", |
|
|
:to => "markdown-raw_html-auto_identifiers", |
|
|
"wrap" => "none" |
|
|
) |
|
|
end |
|
|
end |
|
|
|
|
|
# Example state containers |
|
|
class FileStateContainer |
|
|
def initialize(file_path) |
|
|
@file_path = file_path |
|
|
end |
|
|
|
|
|
def read_state |
|
|
return nil unless File.exist?(@file_path) |
|
|
JSON.parse(File.read(@file_path)) |
|
|
rescue JSON::ParserError |
|
|
nil |
|
|
end |
|
|
|
|
|
def write_state(state) |
|
|
File.write(@file_path, JSON.pretty_generate(state)) |
|
|
puts "State saved to #{@file_path}" |
|
|
end |
|
|
end |
|
|
|
|
|
class RedisStateContainer |
|
|
def initialize(redis_client, key) |
|
|
@redis = redis_client |
|
|
@key = key |
|
|
end |
|
|
|
|
|
def read_state |
|
|
json = @redis.get(@key) |
|
|
json ? JSON.parse(json) : nil |
|
|
rescue JSON::ParserError |
|
|
nil |
|
|
end |
|
|
|
|
|
def write_state(state) |
|
|
@redis.set(@key, JSON.generate(state)) |
|
|
puts "State saved to Redis key: #{@key}" |
|
|
end |
|
|
end |
|
|
|
|
|
class RailsModelStateContainer |
|
|
def initialize(model_instance) |
|
|
@model = model_instance |
|
|
end |
|
|
|
|
|
def read_state |
|
|
return nil unless @model.scraper_state.present? |
|
|
JSON.parse(@model.scraper_state) |
|
|
rescue JSON::ParserError |
|
|
nil |
|
|
end |
|
|
|
|
|
def write_state(state) |
|
|
@model.update!(scraper_state: JSON.generate(state)) |
|
|
puts "State saved to model ID: #{@model.id}" |
|
|
end |
|
|
end |
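
# Any object that responds to #read_state (returning a Hash or nil) and
# #write_state(state) can serve as a state container; the three classes above
# are examples backed by a file, Redis, and an ActiveRecord model.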
|
|
|
|
|
# Example usage demonstrating proper resource management: |
|
|
if __FILE__ == $0 |
|
|
|
|
|
# Helper method that scopes resources properly (like an application would) |
|
|
def run_scraping_example(name, &block) |
|
|
puts "=== #{name} ===" |
|
|
begin |
|
|
block.call |
|
|
puts "✓ #{name} completed successfully" |
|
|
rescue => e |
|
|
puts "✗ #{name} failed: #{e.message}" |
|
|
ensure |
|
|
# Force cleanup of any lingering resources |
|
|
ObjectSpace.each_object(Ferrum::Browser) { |browser| |
|
|
begin |
|
|
browser.quit |
|
|
rescue |
|
|
nil |
|
|
end |
|
|
} |
|
|
ObjectSpace.each_object(Async::Task) { |task| |
|
|
begin |
|
|
task.stop |
|
|
rescue |
|
|
nil |
|
|
end |
|
|
} |
|
|
GC.start |
|
|
sleep(0.1) # Brief pause for cleanup |
|
|
end |
|
|
puts |
|
|
end |
|
|
|
|
|
# Example 1: Basic scraping of specific URLs |
|
|
run_scraping_example("Basic Scraping Example") do |
|
|
results = [] |
|
|
|
|
|
AsyncScraper.new( |
|
|
"https://radioactive-labs.github.io/plutonium-core/", |
|
|
"https://radioactive-labs.github.io/plutonium-core/guide/", |
|
|
max_concurrent: 2, |
|
|
max_pages: 2 |
|
|
).run do |page_data| |
|
|
results << page_data |
|
|
puts "Scraped: #{page_data[:url]} (#{page_data[:html].length} chars)" |
|
|
end |
|
|
|
|
|
puts "Processed #{results.size} pages" |
|
|
end |
|
|
|
|
|
# Example 2: Spider mode with markdown conversion using AsyncMarkdownScraper |
|
|
run_scraping_example("Spider Mode with AsyncMarkdownScraper Example") do |
|
|
state_container = FileStateContainer.new("scraper_state.json") |
|
|
output_folder = "scraped_pages" |
|
|
|
|
|
# Create output folder |
|
|
FileUtils.mkdir_p(output_folder) |
|
|
|
|
|
# Helper method to generate filename from URL |
|
|
def generate_filename(url) |
|
|
uri = URI.parse(url) |
|
|
|
|
|
# Build filename from URL components |
|
|
parts = [] |
|
|
parts << uri.host.gsub(/[^a-z0-9\-_]/i, "-") if uri.host |
|
|
|
|
|
if uri.path && uri.path != "/" |
|
|
path_part = uri.path.gsub(/[^a-z0-9\-_]/i, "-").squeeze("-") |
|
|
parts << path_part unless path_part.empty? |
|
|
else |
|
|
parts << "index" |
|
|
end |
|
|
|
|
|
filename = parts.join("-").gsub(/^-|-$/, "") |
|
|
filename = "page" if filename.empty? |
|
|
"#{filename}.md" |
|
|
end |
|
|
|
|
|
AsyncMarkdownScraper.new( |
|
|
"https://radioactive-labs.github.io/plutonium-core/", |
|
|
mode: :spider, |
|
|
max_concurrent: 3, |
|
|
max_pages: 5, |
|
|
state_container: state_container, |
|
|
max_content_size: 5_000_000 |
|
|
).run do |page_data| |
|
|
# Combine frontmatter and markdown |
|
|
content = page_data[:frontmatter] + page_data[:markdown] |
|
|
|
|
|
# Generate filename and save |
|
|
filename = generate_filename(page_data[:url]) |
|
|
file_path = File.join(output_folder, filename) |
|
|
File.write(file_path, content) |
|
|
|
|
|
puts "Saved: #{page_data[:url]} → #{filename}" |
|
|
end |
|
|
|
|
|
puts "All pages saved to: #{output_folder}/" |
|
|
end |
|
|
|
|
|
# Example 3: Resumable scraping with state persistence |
|
|
run_scraping_example("Resumable Scraping Example") do |
|
|
state_container = FileStateContainer.new("resumable_scraper_state.json") |
|
|
|
|
|
AsyncScraper.new( |
|
|
"https://radioactive-labs.github.io/plutonium-core/", |
|
|
mode: :spider, |
|
|
max_concurrent: 2, |
|
|
max_pages: 3, |
|
|
state_container: state_container |
|
|
).run do |page_data| |
|
|
puts "Processed: #{page_data[:url]}" |
|
|
end |
|
|
|
|
|
puts "State saved - can resume later by running the same command" |
|
|
end |
|
|
|
|
|
puts "🎉 All examples completed!" |
|
|
puts "💡 Notice: Each scraper was scoped to its own block" |
|
|
puts "🔄 Resources automatically cleaned up between examples" |
|
|
|
|
|
# Clean exit (like application shutdown) |
|
|
exit(0) |
|
|
end |