@julik
Forked from thedumbtechguy/async_scraper.rb
Created August 1, 2025 11:13

Revisions

  1. @thedumbtechguy created this gist Jul 31, 2025.

async_scraper.rb (1,006 lines)

    require "bundler/inline"

    gemfile do
    source "https://rubygems.org"
    gem "async"
    gem "ferrum"
    gem "state_machines"
    gem "breaker_machines"
    gem "concurrent-ruby"
    gem "nokogiri"
    gem "pandoc-ruby"
    end

    require "async"
    require "async/queue"
    require "async/semaphore"
    require "ferrum"
    require "uri"
    require "json"
    require "timeout"
    require "state_machines"
    require "breaker_machines"
    # Use concurrent-ruby for thread-safe data structures and atomic operations
    # Even though Async uses cooperative concurrency, this provides defensive programming
    # and future-proofing in case threading is added later
    require "concurrent"
    require "fileutils"
    require "digest"
    require "nokogiri"
    require "pandoc-ruby"

    class ScrapedPage
    attr_accessor :url, :html, :scraped_at, :error_message, :retry_count, :last_error_at

    def initialize(url)
    @url = url
    @html = nil
    @scraped_at = nil
    @error_message = nil
    @retry_count = 0
    @last_error_at = nil
    # Initialize state machine manually
    self.state = "pending"
    end

    state_machine :state, initial: :pending do
    event :start_processing do
    transition pending: :processing, retrying: :processing
    end

    event :complete do
    transition processing: :completed
    end

    event :mark_failed do
    transition processing: :failed
    end

    event :retry_page do
    transition failed: :retrying
    end

    event :give_up do
    transition [:failed, :retrying] => :permanently_failed
    end

    state :pending do
    def ready_for_processing?
    true
    end
    end

    state :processing do
    def ready_for_processing?
    false
    end
    end

    state :completed do
    def ready_for_processing?
    false
    end

    def success?
    true
    end
    end

    state :failed do
    def ready_for_processing?
    can_retry?
    end

    def can_retry?
    retry_count < 3 && (last_error_at.nil? || Time.now - last_error_at > backoff_delay)
    end

    def backoff_delay
    # Exponential backoff: 30 * 2**retry_count seconds
    # (record_error increments retry_count before this is checked, so the observed waits are 60s, then 120s)
    30 * (2**retry_count)
    end
    end

    state :retrying do
    def ready_for_processing?
    true
    end
    end

    state :permanently_failed do
    def ready_for_processing?
    false
    end

    def success?
    false
    end
    end
    end

    def record_error(error)
    @error_message = error.message
    @last_error_at = Time.now
    @retry_count += 1
    end

    def record_success(html)
    @html = html
    @scraped_at = Time.now
    @error_message = nil
    end

    def to_h
    {
    url: @url,
    html: @html,
    scraped_at: @scraped_at,
    state: state,
    error_message: @error_message,
    retry_count: @retry_count,
    last_error_at: @last_error_at
    }
    end

    def to_state_h
    {
    url: @url,
    scraped_at: @scraped_at,
    state: state,
    error_message: @error_message,
    retry_count: @retry_count,
    last_error_at: @last_error_at
    }
    end

    def self.from_h(data)
    page = new(data["url"])
    page.html = data["html"]
    page.scraped_at = data["scraped_at"]
    page.error_message = data["error_message"]
    page.retry_count = data["retry_count"] || 0
    page.last_error_at = data["last_error_at"]

    # Restore state
    target_state = data["state"]&.to_sym || :pending
    case target_state
    when :processing then page.start_processing
    when :completed then page.start_processing && page.complete
    when :failed then page.start_processing && page.mark_failed
    when :retrying then page.start_processing && page.mark_failed && page.retry_page
    when :permanently_failed then page.start_processing && page.mark_failed && page.give_up
    end

    page
    end
    end
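
    # A minimal lifecycle sketch, assuming the state_machines events and helpers declared above:
    #
    #   page = ScrapedPage.new("https://example.com")
    #   page.start_processing               # pending    -> processing
    #   page.record_error(StandardError.new("timeout"))
    #   page.mark_failed                    # processing -> failed
    #   page.retry_page if page.can_retry?  # failed     -> retrying (once the backoff has elapsed)
    #   page.start_processing               # retrying   -> processing
    #   page.record_success("<html>...</html>")
    #   page.complete                       # processing -> completed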

    class AsyncScraper
    include BreakerMachines::DSL

    attr_reader :browser, :claimed_urls, :completed_urls, :url_queue, :results, :pages

    # Content size limit - 10MB default
    MAX_CONTENT_SIZE = 10_000_000

    def initialize(*urls, mode: :scrape, max_concurrent: 5, timeout: 10, state_container: nil, max_pages: nil, max_content_size: MAX_CONTENT_SIZE)
    raise ArgumentError, "urls must be provided" if !urls || urls.empty?
    raise ArgumentError, "mode must be :scrape or :spider" unless [:scrape, :spider].include?(mode)

    @initial_urls = urls
    @mode = mode
    @max_concurrent = max_concurrent
    @timeout = timeout
    @state_container = state_container
    @max_pages = max_pages
    @max_content_size = max_content_size

    @claimed_urls = Concurrent::Set.new # URLs currently being processed
    @completed_urls = Concurrent::Set.new # URLs that have been processed (success or permanent failure)
    @discovered_urls = Concurrent::Set.new # All URLs discovered during crawling
    @results = Concurrent::Array.new
    @pages = Concurrent::Hash.new # URL -> ScrapedPage mapping
    @semaphore = Async::Semaphore.new(max_concurrent)
    @pages_scraped = Concurrent::AtomicFixnum.new(0) # Thread-safe counter for scraped pages
    @active_jobs = Concurrent::AtomicFixnum.new(0) # Thread-safe counter for active jobs
    @work_available = true # Flag to signal workers when to stop

    # Initialize state machine manually
    self.state = "idle"

    load_state if @state_container
    end

    state_machine :state, initial: :idle do
    event :start do
    transition idle: :running, stopped: :running, error: :running
    end

    event :pause do
    transition running: :paused
    end

    event :resume do
    transition paused: :running
    end

    event :stop do
    transition [:running, :paused, :error] => :stopped
    end

    event :error_occurred do
    transition [:running, :paused] => :error
    end

    event :reset do
    transition any => :idle
    end

    state :idle do
    def can_start?
    true
    end
    end

    state :running do
    def active?
    true
    end

    def can_process_pages?
    true
    end
    end

    state :paused do
    def active?
    true
    end

    def can_process_pages?
    false
    end
    end

    state :stopped do
    def active?
    false
    end

    def can_process_pages?
    false
    end
    end

    state :error do
    def active?
    false
    end

    def can_process_pages?
    false
    end
    end
    end

    # Define circuit breaker for page scraping
    circuit :page_scraping do
    threshold failures: 5, within: 1.minute # Trip after 5 failures in 1 minute
    reset_after 10.seconds # Wait 10 seconds before trying again
    fallback { nil } # Return nil when circuit is open

    # Additional configuration for robustness
    on_open { |circuit| puts "Circuit breaker opened for page scraping - too many failures" }
    on_close { |circuit| puts "Circuit breaker closed for page scraping - service recovered" }
    on_half_open { |circuit| puts "Circuit breaker half-open for page scraping - testing recovery" }
    end
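
    # How scrape_page consumes this circuit: while it is closed, the wrapped block's
    # return value passes straight through; once it trips, the fallback above yields
    # nil, which scrape_page interprets as "circuit open or request failed" and routes
    # into its retry / give-up handling.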

    def run(&block)
    start # Transition to running state

    Sync(annotation: "AsyncScraper#run") do |task|
    # Start browser
    @browser = Ferrum::Browser.new(
    headless: true,
    timeout: @timeout,
    pending_connection_errors: false
    )

    begin
    @url_queue = Async::Queue.new

    # Add initial URLs to queue if not resuming
    if @discovered_urls.empty?
    @initial_urls.each { |url| @discovered_urls.add(url) }
    end

    # Count URLs to process
    urls_to_process = @discovered_urls.reject { |url| @completed_urls.include?(url) }
    retryable_pages = @pages.values.select { |page| page.ready_for_processing? }

    # Push URLs to queue
    urls_to_process.each { |url| @url_queue.push(url) }
    retryable_pages.each do |page|
    @url_queue.push(page.url)
    @discovered_urls.add(page.url)
    end

    # If no work to do, complete immediately
    total_queued = urls_to_process.size + retryable_pages.size
    if total_queued == 0
    puts "No URLs to process - all work completed"
    return @results
    end

    # Start orchestrator in background
    orchestrator = Async do
    orchestrate_work(&block)
    end

    # Start worker pool using simplified worker loop
    workers = @max_concurrent.times.map do |worker_id|
    Async do
    worker_loop(worker_id, &block)
    end
    end

    # Wait for orchestrator to signal completion with timeout
    begin
    orchestrator.wait
    rescue => e
    puts "Orchestrator error: #{e.message}"
    end

    # Ensure work is stopped
    @work_available = false

    # Send stop signals to all workers
    @max_concurrent.times {
    begin
    @url_queue.push(nil)
    rescue
    # Queue might be closed
    end
    }

    # Wait for all workers to complete with individual timeouts
    workers.each_with_index do |worker, idx|
    Timeout.timeout(2) do # Reduced to 2 seconds
    worker.wait
    end
    rescue Timeout::Error
    puts "Worker #{idx} timed out, force stopping"
    begin
    worker.stop
    rescue
    nil
    end
    end

    # Ensure all async tasks are completed
    if @active_jobs.value > 0
    puts "Waiting for #{@active_jobs.value} active jobs to complete..."
    start_time = Time.now
    while @active_jobs.value > 0 && (Time.now - start_time) < 2 # Reduced to 2 seconds
    sleep(0.1)
    end
    if @active_jobs.value > 0
    puts "Forcibly stopping #{@active_jobs.value} remaining jobs"
    # Note: We can't force reset atomic counter, jobs should clean up
    end
    end
    ensure
    save_state if @state_container

    # Force close the queue and drain any remaining items
    begin
    @url_queue&.close
    while @url_queue&.pop(timeout: 0.1); end
    rescue
    # Ignore errors during cleanup
    end

    @browser&.quit
    stop # Transition to stopped state
    end

    @results
    end
    end

    private

    # Orchestrator - monitors work and decides when to stop
    def orchestrate_work(&block)
    loop do
    sleep(0.1) # Check every 100ms for faster response

    # Check if we should stop
    if should_stop_all_work?
    puts "Orchestrator: Stopping all work"
    @work_available = false

    # Clear the queue to prevent workers from picking up more work
    begin
    while @url_queue.any?
    @url_queue.pop
    end
    rescue
    # Queue might be closed
    end

    break
    end
    end
    end

    # Check if all work is complete
    def should_stop_all_work?
    # Stop if we've hit page limit and no active jobs
    if max_pages_reached?
    return @active_jobs.value == 0
    end

    # Stop if all pages are processed and no active jobs
    return true if @active_jobs.value == 0 && @url_queue.empty? && should_close_queue?

    # Otherwise keep going
    false
    rescue
    true # Stop on any error
    end

    # Simplified worker loop
    def worker_loop(worker_id, &block)
    while @work_available
    # Check page limit before getting URL
    if max_pages_reached?
    puts "Worker #{worker_id}: Page limit reached, stopping"
    break
    end

    url = @url_queue.pop
    break if url.nil? # nil is pushed onto the queue as the stop signal

    # Skip if already completed
    if @completed_urls.include?(url)
    puts "Worker #{worker_id}: Skipping #{url} (already completed)"
    next
    end

    # Try to claim this URL for processing - atomic operation
    unless @claimed_urls.add?(url)
    puts "Worker #{worker_id}: Another worker claimed #{url}"
    next
    end

    # Double-check page limit before processing
    if max_pages_reached?
    puts "Worker #{worker_id}: Page limit reached after claiming URL"
    break
    end

    # Get or create page object
    page = @pages[url] ||= ScrapedPage.new(url)
    next unless page.ready_for_processing?

    # Process the page with semaphore
    @semaphore.async do
    @active_jobs.increment
    begin
    # Final check inside semaphore
    if !max_pages_reached?
    scrape_page(page, worker_id, &block)
    else
    puts "Worker #{worker_id}: Skipping page due to limit"
    end
    ensure
    @active_jobs.decrement
    end
    end
    end
    rescue => e
    puts "Worker #{worker_id} error: #{e.message}"
    end

    def scrape_page(page_obj, worker_id, &block)
    url = page_obj.url

    # Transition page to processing state
    page_obj.start_processing
    # Note: URL was already claimed in worker_loop via @claimed_urls

    # Use circuit breaker for page scraping
    result = circuit(:page_scraping).wrap do
    browser_page = @browser.create_page

    browser_page.go_to(url)
    # Wait for page to load
    browser_page.network.wait_for_idle

    # Scroll to bottom to trigger lazy loading
    browser_page.execute("window.scrollTo(0, document.body.scrollHeight)")
    sleep(0.5) # Brief pause for content to load

    # Scroll back to top for consistency
    browser_page.execute("window.scrollTo(0, 0)")
    sleep(0.2) # Brief pause

    # Get page HTML
    html = browser_page.body

    # Check content size limit
    if html.bytesize > @max_content_size
    raise StandardError.new("Content size exceeds limit: #{html.bytesize} bytes > #{@max_content_size} bytes")
    end

    # Record success and transition state
    page_obj.record_success(html)
    page_obj.complete

    # Move from claimed to completed
    @claimed_urls.delete(url)
    @completed_urls.add(url)

    # Increment pages scraped counter
    @pages_scraped.increment

    # Create result
    result_data = {
    url: url,
    html: html,
    scraped_at: page_obj.scraped_at
    }

    # Yield to block if provided (for Rails integration)
    yield(result_data) if block_given?
    @results << result_data

    puts "Worker #{worker_id}: Scraped #{url} (#{@pages_scraped.value}#{@max_pages ? "/#{@max_pages}" : ""}) [#{page_obj.state}]"

    # If spider mode and haven't hit limit, find and queue new URLs
    if @mode == :spider && !max_pages_reached?
    extract_and_queue_links(html, url)
    end

    result_data
    ensure
    browser_page&.close
    end

    # Handle circuit breaker result
    if result.nil?
    # Circuit breaker is open or call failed
    page_obj.record_error(StandardError.new("Circuit breaker open or request failed"))
    page_obj.mark_failed
    puts "Failed to scrape #{url}: Circuit breaker open or error occurred [#{page_obj.state}]"

    # Check if page can be retried
    if page_obj.failed? && page_obj.can_retry?
    page_obj.retry_page
    # Release the claim so it can be retried
    @claimed_urls.delete(url)
    @url_queue.push(url) # Re-queue for retry
    puts "Re-queued #{url} for retry (attempt #{page_obj.retry_count + 1}) [#{page_obj.state}]"
    elsif page_obj.retry_count >= 3
    page_obj.give_up
    # Move from claimed to completed (permanently failed)
    @claimed_urls.delete(url)
    @completed_urls.add(url)
    puts "Giving up on #{url} after #{page_obj.retry_count} attempts [#{page_obj.state}]"
    end
    end

    # Save state periodically
    save_state if @state_container && @completed_urls.size % 10 == 0
    rescue => e
    # Handle unexpected errors
    page_obj.record_error(e)
    page_obj.mark_failed
    puts "Unexpected error scraping #{url}: #{e.message} [#{page_obj.state}]"

    # Retry logic for unexpected errors
    if page_obj.can_retry?
    page_obj.retry_page
    # Release the claim so it can be retried
    @claimed_urls.delete(url)
    @url_queue.push(url)
    puts "Re-queued #{url} for retry after error [#{page_obj.state}]"
    else
    page_obj.give_up
    # Move from claimed to completed (permanently failed)
    @claimed_urls.delete(url)
    @completed_urls.add(url)
    puts "Giving up on #{url} after errors [#{page_obj.state}]"
    end
    end

    def extract_and_queue_links(html, current_url)
    current_uri = URI.parse(current_url)

    # Use Nokogiri for better performance and accuracy
    doc = Nokogiri::HTML(html)
    links = doc.css("a[href]").map { |anchor| anchor["href"] }.compact

    links.each do |href|
    next if href.empty? || href.start_with?("#", "javascript:", "mailto:", "tel:", "ftp:")

    # Convert to absolute URL
    absolute_url = URI.join(current_url, href).to_s

    # Remove fragment identifier (#comment-XX, #section, etc.) - treat as same page
    clean_url = absolute_url.split("#").first

    # Check if URL is within the same host as current page
    uri = URI.parse(clean_url)
    next unless uri.host == current_uri.host

    # Skip asset files (CSS, JS, images, fonts, etc.)
    path = uri.path.downcase
    next if path.match?(/\.(css|js|png|jpg|jpeg|gif|svg|ico|pdf|zip|woff|woff2|ttf|eot|mp4|mp3|avi|mov)(\?|$)/)

    # Add to discovered set and queue only if it's a new URL
    if @discovered_urls.add?(clean_url) # Only add to queue if newly discovered
    @url_queue.push(clean_url)
    end
    end
    rescue => e
    puts "Error extracting links from #{current_url}: #{e.message}"
    end

    def max_pages_reached?
    @max_pages && @pages_scraped.value >= @max_pages
    end

    def should_close_queue?
    # Check if there are any pages that can still be retried
    has_retryable_pages = @pages.values.any? { |page| page.ready_for_processing? }

    if @mode == :spider
    # Spider mode: close when no more URLs can be discovered
    # This happens when all pages are either completed or permanently failed
    # and no pages are ready for retry
    all_pages_processed = @pages.values.all? { |page| page.completed? || page.permanently_failed? }
    all_pages_processed && !has_retryable_pages
    else
    # Scrape mode: close when all initial URLs have been processed
    all_initial_processed = @initial_urls.all? do |url|
    page = @pages[url]
    page && (page.completed? || page.permanently_failed?)
    end
    all_initial_processed && !has_retryable_pages
    end
    end

    def save_state
    return unless @state_container

    state = {
    mode: @mode,
    initial_urls: @initial_urls,
    completed_urls: @completed_urls.to_a,
    discovered_urls: @discovered_urls.to_a,
    max_concurrent: @max_concurrent,
    timeout: @timeout,
    max_pages: @max_pages,
    pages_scraped: @pages_scraped.value,
    scraper_state: self.state,
    pages: @pages.transform_values(&:to_state_h)
    }

    @state_container.write_state(state)
    end

    def load_state
    return unless @state_container

    state = @state_container.read_state
    return unless state

    @completed_urls = Concurrent::Set.new(state["completed_urls"] || [])
    # Backward compatibility: check for both new and old key names
    discovered_urls_data = state["discovered_urls"] || state["pending_urls"] || []
    @discovered_urls = Concurrent::Set.new(discovered_urls_data)
    @pages_scraped = Concurrent::AtomicFixnum.new(state["pages_scraped"] || 0)

    # Restore page states
    if state["pages"]
    @pages = state["pages"].transform_values { |page_data| ScrapedPage.from_h(page_data) }
    end

    # Note: scraper state will be restored when run() is called

    completed_pages = @pages.values.count { |p| p.completed? }
    failed_pages = @pages.values.count { |p| p.permanently_failed? }
    retry_pages = @pages.values.count { |p| p.retrying? }

    puts "Resuming with #{@completed_urls.size} completed, #{@discovered_urls.size} discovered URLs, #{@pages_scraped.value} pages scraped"
    puts "Page states: #{completed_pages} completed, #{failed_pages} failed, #{retry_pages} retrying"
    end
    end

    class AsyncMarkdownScraper
    attr_reader :scraper

    def initialize(*urls, **scraper_options)
    @scraper = AsyncScraper.new(*urls, **scraper_options)
    end

    def run(&block)
    @scraper.run do |page_data|
    # Convert to markdown with frontmatter
    markdown_data = process_page(page_data)

    # Yield the processed data to the block
    yield(markdown_data) if block_given?
    end
    end

    private

    def process_page(page_data)
    html = page_data[:html]
    url = page_data[:url]

    puts "Processing: #{url} (#{html.length} chars)"

    # Extract frontmatter
    frontmatter = extract_meta_frontmatter(html, url)
    puts " → Extracted frontmatter"

    # Convert HTML to clean markdown
    markdown_content = html_to_clean_markdown(html)
    puts " → Converted to markdown (#{markdown_content.length} chars)"

    # Add markdown and frontmatter keys to the original page data
    page_data.merge(markdown: markdown_content, frontmatter: frontmatter)
    end

    # Helper method to extract meta information as YAML front matter
    def extract_meta_frontmatter(html, url)
    doc = Nokogiri::HTML(html)
    frontmatter = ["---"]

    # Extract title
    title = doc.at("title")&.text&.strip
    frontmatter << "title: \"#{title.gsub('"', '\\"')}\"" if title && !title.empty?

    # Extract meta description
    description = doc.at('meta[name="description"]')&.[]("content")&.strip
    frontmatter << "description: \"#{description.gsub('"', '\\"')}\"" if description && !description.empty?

    # Extract meta keywords
    keywords = doc.at('meta[name="keywords"]')&.[]("content")&.strip
    if keywords && !keywords.empty?
    keywords_array = keywords.split(",").map(&:strip).reject(&:empty?)
    frontmatter << "keywords:"
    keywords_array.each { |keyword| frontmatter << " - \"#{keyword.gsub('"', '\\"')}\"" }
    end

    # Extract author information
    author = doc.at('meta[name="author"]')&.[]("content")&.strip
    frontmatter << "author: \"#{author.gsub('"', '\\"')}\"" if author && !author.empty?

    # Extract language
    lang = doc.at("html")&.[]("lang") || doc.at('meta[http-equiv="content-language"]')&.[]("content")
    frontmatter << "language: \"#{lang}\"" if lang && !lang.empty?

    # Extract canonical URL
    canonical = doc.at('link[rel="canonical"]')&.[]("href")&.strip
    frontmatter << "canonical_url: \"#{canonical}\"" if canonical && !canonical.empty?

    # Extract robots meta
    robots = doc.at('meta[name="robots"]')&.[]("content")&.strip
    frontmatter << "robots: \"#{robots}\"" if robots && !robots.empty?

    # Add URL for reference
    frontmatter << "url: \"#{url}\""

    # Add scraped timestamp
    frontmatter << "scraped_at: \"#{Time.now.iso8601}\""

    frontmatter << "---"
    frontmatter << ""

    frontmatter.join("\n")
    end

    # Helper method to convert HTML to clean markdown
    def html_to_clean_markdown(html)
    doc = Nokogiri::HTML(html)

    # Remove UI and navigation elements first
    doc.css("script, style, svg, noscript, iframe, object, embed, applet, form, nav, footer").remove
    doc.css("[data-testid*='nav'], [data-testid*='footer']").remove
    doc.xpath("//comment()").remove

    # Use pandoc with HTML reader options to flatten nested divs and spans
    PandocRuby.convert(
    doc.to_html,
    :from => "html-native_divs-native_spans-empty_paragraphs",
    :to => "markdown-raw_html-auto_identifiers",
    "wrap" => "none"
    )
    end
    end
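
    # A minimal usage sketch (the "Spider Mode" example at the bottom of this file does
    # the same thing with file output). Note that html_to_clean_markdown shells out to
    # the pandoc binary via pandoc-ruby, so pandoc must be installed on the machine:
    #
    #   AsyncMarkdownScraper.new("https://example.com", max_pages: 1).run do |page|
    #     puts page[:frontmatter]
    #     puts page[:markdown]
    #   end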

    # Example state containers
    class FileStateContainer
    def initialize(file_path)
    @file_path = file_path
    end

    def read_state
    return nil unless File.exist?(@file_path)
    JSON.parse(File.read(@file_path))
    rescue JSON::ParserError
    nil
    end

    def write_state(state)
    File.write(@file_path, JSON.pretty_generate(state))
    puts "State saved to #{@file_path}"
    end
    end

    class RedisStateContainer
    def initialize(redis_client, key)
    @redis = redis_client
    @key = key
    end

    def read_state
    json = @redis.get(@key)
    json ? JSON.parse(json) : nil
    rescue JSON::ParserError
    nil
    end

    def write_state(state)
    @redis.set(@key, JSON.generate(state))
    puts "State saved to Redis key: #{@key}"
    end
    end
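
    # A minimal wiring sketch, assuming the redis gem is available (it is not in the
    # inline gemfile above):
    #
    #   require "redis"
    #   container = RedisStateContainer.new(Redis.new, "scraper:docs_site")
    #   AsyncScraper.new("https://example.com", mode: :spider, state_container: container).run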

    class RailsModelStateContainer
    def initialize(model_instance)
    @model = model_instance
    end

    def read_state
    return nil unless @model.scraper_state.present?
    JSON.parse(@model.scraper_state)
    rescue JSON::ParserError
    nil
    end

    def write_state(state)
    @model.update!(scraper_state: JSON.generate(state))
    puts "State saved to model ID: #{@model.id}"
    end
    end
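
    # A minimal wiring sketch, assuming a hypothetical ScrapeJob model with a text
    # column named scraper_state (e.g. `add_column :scrape_jobs, :scraper_state, :text`):
    #
    #   container = RailsModelStateContainer.new(ScrapeJob.find(42))
    #   AsyncScraper.new("https://example.com", state_container: container).run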

    # Example usage demonstrating proper resource management:
    if __FILE__ == $0

    # Helper method that scopes resources properly (like an application would)
    def run_scraping_example(name, &block)
    puts "=== #{name} ==="
    begin
    block.call
    puts "✓ #{name} completed successfully"
    rescue => e
    puts "✗ #{name} failed: #{e.message}"
    ensure
    # Force cleanup of any lingering resources
    ObjectSpace.each_object(Ferrum::Browser) { |browser|
    begin
    browser.quit
    rescue
    nil
    end
    }
    ObjectSpace.each_object(Async::Task) { |task|
    begin
    task.stop
    rescue
    nil
    end
    }
    GC.start
    sleep(0.1) # Brief pause for cleanup
    end
    puts
    end

    # Example 1: Basic scraping of specific URLs
    run_scraping_example("Basic Scraping Example") do
    results = []

    AsyncScraper.new(
    "https://radioactive-labs.github.io/plutonium-core/",
    "https://radioactive-labs.github.io/plutonium-core/guide/",
    max_concurrent: 2,
    max_pages: 2
    ).run do |page_data|
    results << page_data
    puts "Scraped: #{page_data[:url]} (#{page_data[:html].length} chars)"
    end

    puts "Processed #{results.size} pages"
    end

    # Example 2: Spider mode with markdown conversion using AsyncMarkdownScraper
    run_scraping_example("Spider Mode with AsyncMarkdownScraper Example") do
    state_container = FileStateContainer.new("scraper_state.json")
    output_folder = "scraped_pages"

    # Create output folder
    FileUtils.mkdir_p(output_folder)

    # Helper method to generate filename from URL
    def generate_filename(url)
    uri = URI.parse(url)

    # Build filename from URL components
    parts = []
    parts << uri.host.gsub(/[^a-z0-9\-_]/i, "-") if uri.host

    if uri.path && uri.path != "/"
    path_part = uri.path.gsub(/[^a-z0-9\-_]/i, "-").squeeze("-")
    parts << path_part unless path_part.empty?
    else
    parts << "index"
    end

    filename = parts.join("-").gsub(/^-|-$/, "")
    filename = "page" if filename.empty?
    "#{filename}.md"
    end

    AsyncMarkdownScraper.new(
    "https://radioactive-labs.github.io/plutonium-core/",
    mode: :spider,
    max_concurrent: 3,
    max_pages: 5,
    state_container: state_container,
    max_content_size: 5_000_000
    ).run do |page_data|
    # Combine frontmatter and markdown
    content = page_data[:frontmatter] + page_data[:markdown]

    # Generate filename and save
    filename = generate_filename(page_data[:url])
    file_path = File.join(output_folder, filename)
    File.write(file_path, content)

    puts "Saved: #{page_data[:url]}#{filename}"
    end

    puts "All pages saved to: #{output_folder}/"
    end

    # Example 3: Resumable scraping with state persistence
    run_scraping_example("Resumable Scraping Example") do
    state_container = FileStateContainer.new("resumable_scraper_state.json")

    AsyncScraper.new(
    "https://radioactive-labs.github.io/plutonium-core/",
    mode: :spider,
    max_concurrent: 2,
    max_pages: 3,
    state_container: state_container
    ).run do |page_data|
    puts "Processed: #{page_data[:url]}"
    end

    puts "State saved - can resume later by running the same command"
    end

    puts "🎉 All examples completed!"
    puts "💡 Notice: Each scraper was scoped to its own block"
    puts "🔄 Resources automatically cleaned up between examples"

    # Clean exit (like application shutdown)
    exit(0)
    end

async_scraper_test.rb (442 lines)

    require "bundler/inline"

    gemfile do
    source "https://rubygems.org"
    gem "minitest"
    gem "async"
    gem "ferrum"
    gem "state_machines"
    gem "breaker_machines"
    gem "concurrent-ruby"
    gem "nokogiri"
    gem "pandoc-ruby"
    gem "webmock"
    gem "timecop"
    end

    require "minitest/autorun"
    require "minitest/pride"
    require "webmock/minitest"
    require "timecop"
    require_relative "async_scraper"
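
    # The AsyncMarkdownScraper tests below invoke pandoc via pandoc-ruby, so the
    # pandoc binary must be available on PATH for them to pass.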

    class ScrapedPageTest < Minitest::Test
    def setup
    @url = "https://example.com"
    @page = ScrapedPage.new(@url)
    end

    def test_initialize
    assert_equal @url, @page.url
    assert_nil @page.html
    assert_nil @page.scraped_at
    assert_nil @page.error_message
    assert_equal 0, @page.retry_count
    assert_nil @page.last_error_at
    assert_equal "pending", @page.state
    end

    def test_state_machine_starts_in_pending_state
    assert @page.pending?
    assert @page.ready_for_processing?
    end

    def test_transitions_from_pending_to_processing
    @page.start_processing
    assert @page.processing?
    refute @page.ready_for_processing?
    end

    def test_transitions_from_processing_to_completed
    @page.start_processing
    @page.complete
    assert @page.completed?
    assert @page.success?
    refute @page.ready_for_processing?
    end

    def test_transitions_from_processing_to_failed
    @page.start_processing
    @page.mark_failed
    assert @page.failed?
    assert @page.can_retry?
    assert @page.ready_for_processing?
    end

    def test_transitions_from_failed_to_retrying
    @page.start_processing
    @page.mark_failed
    @page.retry_page
    assert @page.retrying?
    assert @page.ready_for_processing?
    end

    def test_transitions_to_permanently_failed_after_giving_up
    @page.start_processing
    @page.mark_failed
    @page.give_up
    assert @page.permanently_failed?
    refute @page.success?
    refute @page.ready_for_processing?
    end

    def test_record_error
    error = StandardError.new("Test error")
    Timecop.freeze do
    @page.record_error(error)
    assert_equal "Test error", @page.error_message
    assert_equal Time.now, @page.last_error_at
    assert_equal 1, @page.retry_count
    end
    end

    def test_record_success
    html = "<html><body>Test</body></html>"
    Timecop.freeze do
    @page.record_success(html)
    assert_equal html, @page.html
    assert_equal Time.now, @page.scraped_at
    assert_nil @page.error_message
    end
    end

    def test_can_retry_when_count_less_than_3
    @page.start_processing
    @page.mark_failed
    assert @page.can_retry?
    end

    def test_prevents_retry_when_count_reaches_3
    @page.start_processing
    @page.mark_failed
    @page.instance_variable_set(:@retry_count, 3)
    refute @page.can_retry?
    end

    def test_respects_backoff_delay
    @page.start_processing
    @page.mark_failed

    Timecop.freeze do
    @page.record_error(StandardError.new("Error"))
    refute @page.can_retry?

    # After first error, retry_count is 1, so backoff is 30 * (2**1) = 60 seconds
    Timecop.travel(61) # Past the 60 second backoff
    assert @page.can_retry?
    end
    end

    def test_implements_exponential_backoff
    @page.start_processing
    @page.mark_failed

    assert_equal 30, @page.send(:backoff_delay)
    @page.instance_variable_set(:@retry_count, 1)
    assert_equal 60, @page.send(:backoff_delay)
    @page.instance_variable_set(:@retry_count, 2)
    assert_equal 120, @page.send(:backoff_delay)
    end

    def test_recreates_page_from_hash_data
    data = {
    "url" => @url,
    "html" => "<html>test</html>",
    "scraped_at" => Time.now,
    "state" => "completed",
    "error_message" => nil,
    "retry_count" => 0,
    "last_error_at" => nil
    }

    restored_page = ScrapedPage.from_h(data)
    assert_equal @url, restored_page.url
    assert_equal "<html>test</html>", restored_page.html
    assert restored_page.completed?
    end
    end

    class AsyncScraperTest < Minitest::Test
    def setup
    @urls = ["https://example.com", "https://example.com/page2"]
    @scraper = AsyncScraper.new(*@urls, max_concurrent: 2, timeout: 5)
    end

    def test_sets_configuration_correctly
    assert_equal @urls, @scraper.instance_variable_get(:@initial_urls)
    assert_equal 2, @scraper.instance_variable_get(:@max_concurrent)
    assert_equal 5, @scraper.instance_variable_get(:@timeout)
    assert_equal "idle", @scraper.state
    end

    def test_raises_error_for_empty_urls
    assert_raises(ArgumentError) { AsyncScraper.new }
    end

    def test_raises_error_for_invalid_mode
    assert_raises(ArgumentError) { AsyncScraper.new("https://example.com", mode: :invalid) }
    end

    def test_accepts_spider_mode
    spider_scraper = AsyncScraper.new("https://example.com", mode: :spider)
    assert_equal :spider, spider_scraper.instance_variable_get(:@mode)
    end

    def test_sets_default_values
    default_scraper = AsyncScraper.new("https://example.com")
    assert_equal :scrape, default_scraper.instance_variable_get(:@mode)
    assert_equal 5, default_scraper.instance_variable_get(:@max_concurrent)
    assert_equal 10, default_scraper.instance_variable_get(:@timeout)
    assert_equal AsyncScraper::MAX_CONTENT_SIZE, default_scraper.instance_variable_get(:@max_content_size)
    end

    def test_starts_in_idle_state
    assert @scraper.idle?
    assert @scraper.can_start?
    end

    def test_transitions_to_running_when_started
    @scraper.start
    assert @scraper.running?
    assert @scraper.active?
    assert @scraper.can_process_pages?
    end

    def test_can_pause_and_resume
    @scraper.start
    @scraper.pause
    assert @scraper.paused?
    assert @scraper.active?
    refute @scraper.can_process_pages?

    @scraper.resume
    assert @scraper.running?
    assert @scraper.can_process_pages?
    end

    def test_can_stop_from_any_active_state
    @scraper.start
    @scraper.stop
    assert @scraper.stopped?
    refute @scraper.active?
    refute @scraper.can_process_pages?
    end

    def test_handles_error_state
    @scraper.start
    @scraper.error_occurred
    assert @scraper.error?
    refute @scraper.active?
    refute @scraper.can_process_pages?
    end

    def test_can_reset_to_idle
    @scraper.start
    @scraper.stop
    @scraper.reset
    assert @scraper.idle?
    end

    def test_returns_false_when_no_max_pages_set
    refute @scraper.send(:max_pages_reached?)
    end

    def test_returns_true_when_max_pages_reached
    limited_scraper = AsyncScraper.new("https://example.com", max_pages: 2)
    limited_scraper.instance_variable_get(:@pages_scraped).increment
    limited_scraper.instance_variable_get(:@pages_scraped).increment
    assert limited_scraper.send(:max_pages_reached?)
    end

    def test_extracts_and_queues_same_host_links_only
    html = <<~HTML
    <html>
    <body>
    <a href="/page1">Page 1</a>
    <a href="https://example.com/page2">Page 2</a>
    <a href="https://other-site.com/page">Other Site</a>
    </body>
    </html>
    HTML

    current_url = "https://example.com"
    @scraper.instance_variable_set(:@url_queue, Async::Queue.new)

    @scraper.send(:extract_and_queue_links, html, current_url)

    discovered_urls = @scraper.instance_variable_get(:@discovered_urls)
    assert_includes discovered_urls, "https://example.com/page1"
    refute_includes discovered_urls, "https://other-site.com/page"
    end

    def test_filters_out_non_content_urls
    html = <<~HTML
    <html>
    <body>
    <a href="/style.css">CSS File</a>
    <a href="/image.jpg">Image</a>
    </body>
    </html>
    HTML

    current_url = "https://example.com"
    @scraper.instance_variable_set(:@url_queue, Async::Queue.new)

    @scraper.send(:extract_and_queue_links, html, current_url)

    discovered_urls = @scraper.instance_variable_get(:@discovered_urls)
    refute_includes discovered_urls, "https://example.com/style.css"
    refute_includes discovered_urls, "https://example.com/image.jpg"
    end

    def test_ignores_special_link_types
    html = <<~HTML
    <html>
    <body>
    <a href="#section">Section Link</a>
    <a href="javascript:void(0)">JS Link</a>
    <a href="mailto:[email protected]">Email</a>
    </body>
    </html>
    HTML

    current_url = "https://example.com"
    @scraper.instance_variable_set(:@url_queue, Async::Queue.new)

    @scraper.send(:extract_and_queue_links, html, current_url)

    discovered_urls = @scraper.instance_variable_get(:@discovered_urls)
    refute_includes discovered_urls, "#section"
    refute_includes discovered_urls, "javascript:void(0)"
    refute_includes discovered_urls, "mailto:[email protected]"
    end
    end

    class FileStateContainerTest < Minitest::Test
    def setup
    @file_path = "/tmp/test_scraper_state.json"
    @container = FileStateContainer.new(@file_path)
    @test_state = {"test" => "data", "number" => 42}
    end

    def teardown
    File.delete(@file_path) if File.exist?(@file_path)
    end

    def test_writes_and_reads_state_correctly
    @container.write_state(@test_state)
    assert File.exist?(@file_path)

    read_state = @container.read_state
    assert_equal @test_state, read_state
    end

    def test_returns_nil_when_file_doesnt_exist
    assert_nil @container.read_state
    end

    def test_handles_corrupted_json_gracefully
    File.write(@file_path, "invalid json")
    assert_nil @container.read_state
    end
    end

    class AsyncMarkdownScraperTest < Minitest::Test
    def setup
    @urls = ["https://example.com"]
    @markdown_scraper = AsyncMarkdownScraper.new(*@urls, max_concurrent: 1)
    end

    def test_creates_underlying_async_scraper
    assert_instance_of AsyncScraper, @markdown_scraper.scraper
    end

    def test_passes_options_to_async_scraper
    scraper = AsyncMarkdownScraper.new("https://example.com", mode: :spider, max_pages: 5)
    underlying_scraper = scraper.scraper
    assert_equal :spider, underlying_scraper.instance_variable_get(:@mode)
    assert_equal 5, underlying_scraper.instance_variable_get(:@max_pages)
    end

    def test_extracts_frontmatter_correctly
    html = <<~HTML
    <!DOCTYPE html>
    <html lang="en">
    <head>
    <title>Test Page Title</title>
    <meta name="description" content="Test page description">
    <meta name="keywords" content="test, page, example">
    <meta name="author" content="Test Author">
    <meta name="robots" content="index, follow">
    <link rel="canonical" href="https://example.com/canonical">
    </head>
    <body>
    <h1>Content</h1>
    </body>
    </html>
    HTML

    url = "https://example.com/test"
    frontmatter = @markdown_scraper.send(:extract_meta_frontmatter, html, url)

    assert_includes frontmatter, "title: \"Test Page Title\""
    assert_includes frontmatter, "description: \"Test page description\""
    assert_includes frontmatter, "- \"test\""
    assert_includes frontmatter, "- \"page\""
    assert_includes frontmatter, "- \"example\""
    assert_includes frontmatter, "author: \"Test Author\""
    assert_includes frontmatter, "language: \"en\""
    assert_includes frontmatter, "canonical_url: \"https://example.com/canonical\""
    assert_includes frontmatter, "robots: \"index, follow\""
    assert_includes frontmatter, "url: \"https://example.com/test\""
    assert_includes frontmatter, "scraped_at:"
    assert frontmatter.start_with?("---\n")
    assert_includes frontmatter, "\n---\n"
    end

    def test_handles_missing_meta_tags_gracefully
    minimal_html = "<html><head><title>Minimal</title></head><body></body></html>"
    url = "https://example.com/test"
    frontmatter = @markdown_scraper.send(:extract_meta_frontmatter, minimal_html, url)

    assert_includes frontmatter, "title: \"Minimal\""
    assert_includes frontmatter, "url: \"https://example.com/test\""
    refute_includes frontmatter, "description:"
    refute_includes frontmatter, "keywords:"
    end

    def test_escapes_quotes_in_content
    html_with_quotes = "<html><head><title>Title with \"quotes\"</title></head><body></body></html>"
    url = "https://example.com/test"
    frontmatter = @markdown_scraper.send(:extract_meta_frontmatter, html_with_quotes, url)

    assert_includes frontmatter, "title: \"Title with \\\"quotes\\\"\""
    end

    def test_removes_unwanted_elements_and_converts_to_markdown
    html = <<~HTML
    <html>
    <head>
    <title>Test</title>
    <script>alert("remove me");</script>
    <style>body { color: red; }</style>
    </head>
    <body>
    <nav>Navigation</nav>
    <h1>Main Title</h1>
    <p>Paragraph content</p>
    <footer>Footer content</footer>
    </body>
    </html>
    HTML

    markdown = @markdown_scraper.send(:html_to_clean_markdown, html)

    assert_includes markdown, "# Main Title"
    assert_includes markdown, "Paragraph content"
    refute_includes markdown, "alert(\"remove me\")"
    refute_includes markdown, "color: red"
    refute_includes markdown, "Navigation"
    refute_includes markdown, "Footer content"
    end
    end