require "bundler/inline" gemfile do source "https://rubygems.org" gem "async" gem "ferrum" gem "state_machines" gem "breaker_machines" gem "concurrent-ruby" gem "nokogiri" gem "pandoc-ruby" end require "async" require "async/queue" require "async/semaphore" require "ferrum" require "uri" require "json" require "timeout" require "state_machines" require "breaker_machines" # Use concurrent-ruby for thread-safe data structures and atomic operations # Even though Async uses cooperative concurrency, this provides defensive programming # and future-proofing in case threading is added later require "concurrent" require "fileutils" require "digest" require "nokogiri" require "pandoc-ruby" class ScrapedPage attr_accessor :url, :html, :scraped_at, :error_message, :retry_count, :last_error_at def initialize(url) @url = url @html = nil @scraped_at = nil @error_message = nil @retry_count = 0 @last_error_at = nil # Initialize state machine manually self.state = "pending" end state_machine :state, initial: :pending do event :start_processing do transition pending: :processing, retrying: :processing end event :complete do transition processing: :completed end event :mark_failed do transition processing: :failed end event :retry_page do transition failed: :retrying end event :give_up do transition [:failed, :retrying] => :permanently_failed end state :pending do def ready_for_processing? true end end state :processing do def ready_for_processing? false end end state :completed do def ready_for_processing? false end def success? true end end state :failed do def ready_for_processing? can_retry? end def can_retry? retry_count < 3 && (last_error_at.nil? || Time.now - last_error_at > backoff_delay) end def backoff_delay # Exponential backoff: 30s, 60s, 120s 30 * (2**retry_count) end end state :retrying do def ready_for_processing? true end end state :permanently_failed do def ready_for_processing? false end def success? false end end end def record_error(error) @error_message = error.message @last_error_at = Time.now @retry_count += 1 end def record_success(html) @html = html @scraped_at = Time.now @error_message = nil end def to_h { url: @url, html: @html, scraped_at: @scraped_at, state: state, error_message: @error_message, retry_count: @retry_count, last_error_at: @last_error_at } end def to_state_h { url: @url, scraped_at: @scraped_at, state: state, error_message: @error_message, retry_count: @retry_count, last_error_at: @last_error_at } end def self.from_h(data) page = new(data["url"]) page.html = data["html"] page.scraped_at = data["scraped_at"] page.error_message = data["error_message"] page.retry_count = data["retry_count"] || 0 page.last_error_at = data["last_error_at"] # Restore state target_state = data["state"]&.to_sym || :pending case target_state when :processing then page.start_processing when :completed then page.start_processing && page.complete when :failed then page.start_processing && page.mark_failed when :retrying then page.start_processing && page.mark_failed && page.retry_page when :permanently_failed then page.start_processing && page.mark_failed && page.give_up end page end end class AsyncScraper include BreakerMachines::DSL attr_reader :browser, :claimed_urls, :completed_urls, :url_queue, :results, :pages # Content size limit - 10MB default MAX_CONTENT_SIZE = 10_000_000 def initialize(*urls, mode: :scrape, max_concurrent: 5, timeout: 10, state_container: nil, max_pages: nil, max_content_size: MAX_CONTENT_SIZE) raise ArgumentError, "urls must be provided" if !urls || urls.empty? 
raise ArgumentError, "mode must be :scrape or :spider" unless [:scrape, :spider].include?(mode) @initial_urls = urls @mode = mode @max_concurrent = max_concurrent @timeout = timeout @state_container = state_container @max_pages = max_pages @max_content_size = max_content_size @claimed_urls = Concurrent::Set.new # URLs currently being processed @completed_urls = Concurrent::Set.new # URLs that have been processed (success or permanent failure) @discovered_urls = Concurrent::Set.new # All URLs discovered during crawling @results = Concurrent::Array.new @pages = Concurrent::Hash.new # URL -> ScrapedPage mapping @semaphore = Async::Semaphore.new(max_concurrent) @pages_scraped = Concurrent::AtomicFixnum.new(0) # Thread-safe counter for scraped pages @active_jobs = Concurrent::AtomicFixnum.new(0) # Thread-safe counter for active jobs @work_available = true # Flag to signal workers when to stop # Initialize state machine manually self.state = "idle" load_state if @state_container end state_machine :state, initial: :idle do event :start do transition idle: :running, stopped: :running, error: :running end event :pause do transition running: :paused end event :resume do transition paused: :running end event :stop do transition [:running, :paused, :error] => :stopped end event :error_occurred do transition [:running, :paused] => :error end event :reset do transition any => :idle end state :idle do def can_start? true end end state :running do def active? true end def can_process_pages? true end end state :paused do def active? true end def can_process_pages? false end end state :stopped do def active? false end def can_process_pages? false end end state :error do def active? false end def can_process_pages? false end end end # Define circuit breaker for page scraping circuit :page_scraping do threshold failures: 5, within: 1.minute # Trip after 5 failures in 1 minute reset_after 10.seconds # Wait 10 seconds before trying again fallback { nil } # Return nil when circuit is open # Additional configuration for robustness on_open { |circuit| puts "Circuit breaker opened for page scraping - too many failures" } on_close { |circuit| puts "Circuit breaker closed for page scraping - service recovered" } on_half_open { |circuit| puts "Circuit breaker half-open for page scraping - testing recovery" } end def run(&block) start # Transition to running state Sync(annotation: "AsyncScraper#run") do |task| # Start browser @browser = Ferrum::Browser.new( headless: true, timeout: @timeout, pending_connection_errors: false ) begin @url_queue = Async::Queue.new # Add initial URLs to queue if not resuming if @discovered_urls.empty? @initial_urls.each { |url| @discovered_urls.add(url) } end # Count URLs to process urls_to_process = @discovered_urls.reject { |url| @completed_urls.include?(url) } retryable_pages = @pages.values.select { |page| page.ready_for_processing? 

  def run(&block)
    start # Transition to running state

    Sync(annotation: "AsyncScraper#run") do |task|
      # Start browser
      @browser = Ferrum::Browser.new(
        headless: true,
        timeout: @timeout,
        pending_connection_errors: false
      )

      begin
        @url_queue = Async::Queue.new

        # Add initial URLs to queue if not resuming
        if @discovered_urls.empty?
          @initial_urls.each { |url| @discovered_urls.add(url) }
        end

        # Count URLs to process
        urls_to_process = @discovered_urls.reject { |url| @completed_urls.include?(url) }
        retryable_pages = @pages.values.select { |page| page.ready_for_processing? }

        # Push URLs to queue
        urls_to_process.each { |url| @url_queue.push(url) }
        retryable_pages.each do |page|
          @url_queue.push(page.url)
          @discovered_urls.add(page.url)
        end

        # If no work to do, complete immediately
        total_queued = urls_to_process.size + retryable_pages.size
        if total_queued == 0
          puts "No URLs to process - all work completed"
          return @results
        end

        # Start orchestrator in background
        orchestrator = Async do
          orchestrate_work(&block)
        end

        # Start worker pool using simplified worker loop
        workers = @max_concurrent.times.map do |worker_id|
          Async do
            worker_loop(worker_id, &block)
          end
        end

        # Wait for orchestrator to signal completion
        begin
          orchestrator.wait
        rescue => e
          puts "Orchestrator error: #{e.message}"
        end

        # Ensure work is stopped
        @work_available = false

        # Send stop signals to all workers
        @max_concurrent.times do
          @url_queue.push(nil)
        rescue
          # Queue might be closed
        end

        # Wait for all workers to complete with individual timeouts
        workers.each_with_index do |worker, idx|
          Timeout.timeout(2) do # Reduced to 2 seconds
            worker.wait
          end
        rescue Timeout::Error
          puts "Worker #{idx} timed out, force stopping"
          begin
            worker.stop
          rescue
            nil
          end
        end

        # Ensure all async tasks are completed
        if @active_jobs.value > 0
          puts "Waiting for #{@active_jobs.value} active jobs to complete..."
          start_time = Time.now
          while @active_jobs.value > 0 && (Time.now - start_time) < 2 # Reduced to 2 seconds
            sleep(0.1)
          end

          if @active_jobs.value > 0
            puts "Forcibly stopping #{@active_jobs.value} remaining jobs"
            # Note: We can't force reset atomic counter, jobs should clean up
          end
        end
      ensure
        save_state if @state_container

        # Drain any remaining queued items so nothing is left waiting on the queue
        begin
          @url_queue.pop until @url_queue.nil? || @url_queue.empty?
        rescue
          # Ignore errors during cleanup
        end

        @browser&.quit
        stop # Transition to stopped state
      end

      @results
    end
  end

  private

  # Orchestrator - monitors work and decides when to stop
  def orchestrate_work(&block)
    loop do
      sleep(0.1) # Check every 100ms for faster response

      # Check if we should stop
      if should_stop_all_work?
        puts "Orchestrator: Stopping all work"
        @work_available = false

        # Clear the queue to prevent workers from picking up more work
        begin
          @url_queue.pop until @url_queue.empty?
        rescue
          # Queue might be closed
        end

        break
      end
    end
  end

  # Check if all work is complete
  def should_stop_all_work?
    # Stop if we've hit page limit and no active jobs
    if max_pages_reached?
      return @active_jobs.value == 0
    end

    # Stop if all pages are processed and no active jobs
    return true if @active_jobs.value == 0 && @url_queue.empty? && should_close_queue?

    # Otherwise keep going
    false
  rescue
    true # Stop on any error
  end

  # Simplified worker loop
  def worker_loop(worker_id, &block)
    while @work_available
      # Check page limit before getting URL
      if max_pages_reached?
        puts "Worker #{worker_id}: Page limit reached, stopping"
        break
      end

      url = @url_queue.pop
      break if url.nil? # nil is the stop signal

      # Skip if already completed
      if @completed_urls.include?(url)
        puts "Worker #{worker_id}: Skipping #{url} (already completed)"
        next
      end

      # Try to claim this URL for processing - atomic operation
      unless @claimed_urls.add?(url)
        puts "Worker #{worker_id}: Another worker claimed #{url}"
        next
      end

      # Double-check page limit before processing
      if max_pages_reached?
        puts "Worker #{worker_id}: Page limit reached after claiming URL"
        break
      end

      # Get or create page object
      page = @pages[url] ||= ScrapedPage.new(url)
      next unless page.ready_for_processing?

      # Process the page with semaphore
      @semaphore.async do
        @active_jobs.increment
        begin
          # Final check inside semaphore
          if !max_pages_reached?
            scrape_page(page, worker_id, &block)
          else
            puts "Worker #{worker_id}: Skipping page due to limit"
          end
        ensure
          @active_jobs.decrement
        end
      end
    end
  rescue => e
    puts "Worker #{worker_id} error: #{e.message}"
  end

  def scrape_page(page_obj, worker_id, &block)
    url = page_obj.url

    # Transition page to processing state
    page_obj.start_processing

    # Note: the URL was already claimed via @claimed_urls in worker_loop

    # Use circuit breaker for page scraping
    result = circuit(:page_scraping).wrap do
      browser_page = @browser.create_page
      browser_page.go_to(url)

      # Wait for page to load
      browser_page.network.wait_for_idle

      # Scroll to bottom to trigger lazy loading
      browser_page.execute("window.scrollTo(0, document.body.scrollHeight)")
      sleep(0.5) # Brief pause for content to load

      # Scroll back to top for consistency
      browser_page.execute("window.scrollTo(0, 0)")
      sleep(0.2) # Brief pause

      # Get page HTML
      html = browser_page.body

      # Check content size limit
      if html.bytesize > @max_content_size
        raise StandardError.new("Content size exceeds limit: #{html.bytesize} bytes > #{@max_content_size} bytes")
      end

      # Record success and transition state
      page_obj.record_success(html)
      page_obj.complete

      # Move from claimed to completed
      @claimed_urls.delete(url)
      @completed_urls.add(url)

      # Increment pages scraped counter
      @pages_scraped.increment

      # Create result
      result_data = {
        url: url,
        html: html,
        scraped_at: page_obj.scraped_at
      }

      # Yield to block if provided (for Rails integration)
      yield(result_data) if block_given?

      @results << result_data

      puts "Worker #{worker_id}: Scraped #{url} (#{@pages_scraped.value}#{@max_pages ? "/#{@max_pages}" : ""}) [#{page_obj.state}]"

      # If spider mode and haven't hit limit, find and queue new URLs
      if @mode == :spider && !max_pages_reached?
        extract_and_queue_links(html, url)
      end

      result_data
    ensure
      browser_page&.close
    end

    # Handle circuit breaker result
    if result.nil?
      # Circuit breaker is open or call failed
      page_obj.record_error(StandardError.new("Circuit breaker open or request failed"))
      page_obj.mark_failed
      puts "Failed to scrape #{url}: Circuit breaker open or error occurred [#{page_obj.state}]"

      # Check if page can be retried
      if page_obj.failed? && page_obj.can_retry?
        page_obj.retry_page
        # Release the claim so it can be retried
        @claimed_urls.delete(url)
        @url_queue.push(url) # Re-queue for retry
        puts "Re-queued #{url} for retry (attempt #{page_obj.retry_count + 1}) [#{page_obj.state}]"
      elsif page_obj.retry_count >= 3
        page_obj.give_up
        # Move from claimed to completed (permanently failed)
        @claimed_urls.delete(url)
        @completed_urls.add(url)
        puts "Giving up on #{url} after #{page_obj.retry_count} attempts [#{page_obj.state}]"
      end
    end

    # Save state periodically
    save_state if @state_container && @completed_urls.size % 10 == 0
  rescue => e
    # Handle unexpected errors
    page_obj.record_error(e)
    page_obj.mark_failed
    puts "Unexpected error scraping #{url}: #{e.message} [#{page_obj.state}]"

    # Retry logic for unexpected errors
    if page_obj.can_retry?
      page_obj.retry_page
      # Release the claim so it can be retried
      @claimed_urls.delete(url)
      @url_queue.push(url)
      puts "Re-queued #{url} for retry after error [#{page_obj.state}]"
    else
      page_obj.give_up
      # Move from claimed to completed (permanently failed)
      @claimed_urls.delete(url)
      @completed_urls.add(url)
      puts "Giving up on #{url} after errors [#{page_obj.state}]"
    end
  end

  def extract_and_queue_links(html, current_url)
    current_uri = URI.parse(current_url)

    # Use Nokogiri for better performance and accuracy
    doc = Nokogiri::HTML(html)
    links = doc.css("a[href]").map { |anchor| anchor["href"] }.compact

    links.each do |href|
      next if href.empty? || href.start_with?("#", "javascript:", "mailto:", "tel:", "ftp:")

      # Convert to absolute URL
      absolute_url = URI.join(current_url, href).to_s

      # Remove fragment identifier (#comment-XX, #section, etc.) - treat as same page
      clean_url = absolute_url.split("#").first

      # Check if URL is within the same host as current page
      uri = URI.parse(clean_url)
      next unless uri.host == current_uri.host

      # Skip asset files (CSS, JS, images, fonts, etc.)
      path = uri.path.downcase
      next if path.match?(/\.(css|js|png|jpg|jpeg|gif|svg|ico|pdf|zip|woff|woff2|ttf|eot|mp4|mp3|avi|mov)(\?|$)/)

      # Add to discovered set and queue only if it's a new URL
      if @discovered_urls.add?(clean_url)
        # Only add to queue if newly discovered
        @url_queue.push(clean_url)
      end
    end
  rescue => e
    puts "Error extracting links from #{current_url}: #{e.message}"
  end

  def max_pages_reached?
    @max_pages && @pages_scraped.value >= @max_pages
  end

  def should_close_queue?
    # Check if there are any pages that can still be retried
    has_retryable_pages = @pages.values.any? { |page| page.ready_for_processing? }

    if @mode == :spider
      # Spider mode: close when no more URLs can be discovered.
      # This happens when all pages are either completed or permanently failed
      # and no pages are ready for retry
      all_pages_processed = @pages.values.all? { |page| page.completed? || page.permanently_failed? }
      all_pages_processed && !has_retryable_pages
    else
      # Constrained mode: close when all initial URLs have been processed
      all_initial_processed = @initial_urls.all? do |url|
        page = @pages[url]
        page && (page.completed? || page.permanently_failed?)
      end
      all_initial_processed && !has_retryable_pages
    end
  end

  def save_state
    return unless @state_container

    state = {
      initial_urls: @initial_urls,
      completed_urls: @completed_urls.to_a,
      discovered_urls: @discovered_urls.to_a,
      max_concurrent: @max_concurrent,
      timeout: @timeout,
      max_pages: @max_pages,
      pages_scraped: @pages_scraped.value,
      scraper_state: self.state,
      pages: @pages.transform_values(&:to_state_h)
    }

    @state_container.write_state(state)
  end

  def load_state
    return unless @state_container

    state = @state_container.read_state
    return unless state

    @completed_urls = Concurrent::Set.new(state["completed_urls"] || [])

    # Backward compatibility: check for both new and old key names
    discovered_urls_data = state["discovered_urls"] || state["pending_urls"] || []
    @discovered_urls = Concurrent::Set.new(discovered_urls_data)

    @pages_scraped = Concurrent::AtomicFixnum.new(state["pages_scraped"] || 0)

    # Restore page states
    if state["pages"]
      restored = state["pages"].transform_values { |page_data| ScrapedPage.from_h(page_data) }
      @pages = Concurrent::Hash.new.merge!(restored)
    end

    # Note: scraper state will be restored when run() is called
    completed_pages = @pages.values.count { |p| p.completed? }
    failed_pages = @pages.values.count { |p| p.permanently_failed? }
    retry_pages = @pages.values.count { |p| p.retrying? }

    puts "Resuming with #{@completed_urls.size} completed, #{@discovered_urls.size} discovered URLs, #{@pages_scraped.value} pages scraped"
    puts "Page states: #{completed_pages} completed, #{failed_pages} failed, #{retry_pages} retrying"
  end
end

class AsyncMarkdownScraper
  attr_reader :scraper

  def initialize(*urls, **scraper_options)
    @scraper = AsyncScraper.new(*urls, **scraper_options)
  end

  def run(&block)
    @scraper.run do |page_data|
      # Convert to markdown with frontmatter
      markdown_data = process_page(page_data)

      # Yield the processed data to the block
      yield(markdown_data) if block_given?
    end
  end

  private

  def process_page(page_data)
    html = page_data[:html]
    url = page_data[:url]

    puts "Processing: #{url} (#{html.length} chars)"

    # Extract frontmatter
    frontmatter = extract_meta_frontmatter(html, url)
    puts " → Extracted frontmatter"

    # Convert HTML to clean markdown
    markdown_content = html_to_clean_markdown(html)
    puts " → Converted to markdown (#{markdown_content.length} chars)"

    # Add markdown and frontmatter keys to the original page data
    page_data.merge(markdown: markdown_content, frontmatter: frontmatter)
  end

  # Helper method to extract meta information as YAML front matter
  def extract_meta_frontmatter(html, url)
    doc = Nokogiri::HTML(html)
    frontmatter = ["---"]

    # Extract title
    title = doc.at("title")&.text&.strip
    frontmatter << "title: \"#{title.gsub('"', '\\"')}\"" if title && !title.empty?

    # Extract meta description
    description = doc.at('meta[name="description"]')&.[]("content")&.strip
    frontmatter << "description: \"#{description.gsub('"', '\\"')}\"" if description && !description.empty?

    # Extract meta keywords
    keywords = doc.at('meta[name="keywords"]')&.[]("content")&.strip
    if keywords && !keywords.empty?
      keywords_array = keywords.split(",").map(&:strip).reject(&:empty?)
      frontmatter << "keywords:"
      keywords_array.each { |keyword| frontmatter << " - \"#{keyword.gsub('"', '\\"')}\"" }
    end

    # Extract author information
    author = doc.at('meta[name="author"]')&.[]("content")&.strip
    frontmatter << "author: \"#{author.gsub('"', '\\"')}\"" if author && !author.empty?

    # Extract language
    lang = doc.at("html")&.[]("lang") || doc.at('meta[http-equiv="content-language"]')&.[]("content")
    frontmatter << "language: \"#{lang}\"" if lang && !lang.empty?

    # Extract canonical URL
    canonical = doc.at('link[rel="canonical"]')&.[]("href")&.strip
    frontmatter << "canonical_url: \"#{canonical}\"" if canonical && !canonical.empty?

    # Extract robots meta
    robots = doc.at('meta[name="robots"]')&.[]("content")&.strip
    frontmatter << "robots: \"#{robots}\"" if robots && !robots.empty?

    # Add URL for reference
    frontmatter << "url: \"#{url}\""

    # Add scraped timestamp
    frontmatter << "scraped_at: \"#{Time.now.iso8601}\""

    frontmatter << "---"
    frontmatter << ""

    frontmatter.join("\n")
  end

  # Helper method to convert HTML to clean markdown
  def html_to_clean_markdown(html)
    doc = Nokogiri::HTML(html)

    # Remove UI and navigation elements first
    doc.css("script, style, svg, noscript, iframe, object, embed, applet, form, nav, footer").remove
    doc.css("[data-testid*='nav'], [data-testid*='footer']").remove
    doc.xpath("//comment()").remove

    # Use pandoc with HTML reader options to flatten nested divs and spans
    PandocRuby.convert(
      doc.to_html,
      :from => "html-native_divs-native_spans-empty_paragraphs",
      :to => "markdown-raw_html-auto_identifiers",
      "wrap" => "none"
    )
  end
end

# Example state containers
class FileStateContainer
  def initialize(file_path)
    @file_path = file_path
  end

  def read_state
    return nil unless File.exist?(@file_path)

    JSON.parse(File.read(@file_path))
  rescue JSON::ParserError
    nil
  end

  def write_state(state)
    File.write(@file_path, JSON.pretty_generate(state))
    puts "State saved to #{@file_path}"
  end
end

class RedisStateContainer
  def initialize(redis_client, key)
    @redis = redis_client
    @key = key
  end

  def read_state
    json = @redis.get(@key)
    json ? JSON.parse(json) : nil
  rescue JSON::ParserError
    nil
  end

  def write_state(state)
    @redis.set(@key, JSON.generate(state))
    puts "State saved to Redis key: #{@key}"
  end
end

class RailsModelStateContainer
  def initialize(model_instance)
    @model = model_instance
  end

  def read_state
    return nil unless @model.scraper_state.present?

    JSON.parse(@model.scraper_state)
  rescue JSON::ParserError
    nil
  end

  def write_state(state)
    @model.update!(scraper_state: JSON.generate(state))
    puts "State saved to model ID: #{@model.id}"
  end
end
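
# Any object that responds to #read_state and #write_state can act as a state
# container. An illustrative (not executed) sketch of wiring up the Redis-backed
# container; it assumes a `redis` gem client and a hypothetical key name, neither
# of which is part of the inline Gemfile or examples below:
#
#   # require "redis"
#   # state_container = RedisStateContainer.new(Redis.new(url: ENV["REDIS_URL"]), "scraper:docs")
#   # AsyncScraper.new("https://example.com/", mode: :spider, state_container: state_container)
#   #   .run { |page_data| puts page_data[:url] }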

# Example usage demonstrating proper resource management:
if __FILE__ == $0
  # Helper method that scopes resources properly (like an application would)
  def run_scraping_example(name, &block)
    puts "=== #{name} ==="

    begin
      block.call
      puts "✓ #{name} completed successfully"
    rescue => e
      puts "✗ #{name} failed: #{e.message}"
    ensure
      # Force cleanup of any lingering resources
      ObjectSpace.each_object(Ferrum::Browser) { |browser| browser.quit rescue nil }
      ObjectSpace.each_object(Async::Task) { |task| task.stop rescue nil }
      GC.start
      sleep(0.1) # Brief pause for cleanup
    end

    puts
  end

  # Example 1: Basic scraping of specific URLs
  run_scraping_example("Basic Scraping Example") do
    results = []

    AsyncScraper.new(
      "https://radioactive-labs.github.io/plutonium-core/",
      "https://radioactive-labs.github.io/plutonium-core/guide/",
      max_concurrent: 2,
      max_pages: 2
    ).run do |page_data|
      results << page_data
      puts "Scraped: #{page_data[:url]} (#{page_data[:html].length} chars)"
    end

    puts "Processed #{results.size} pages"
  end

  # Example 2: Spider mode with markdown conversion using AsyncMarkdownScraper
  run_scraping_example("Spider Mode with AsyncMarkdownScraper Example") do
    state_container = FileStateContainer.new("scraper_state.json")
    output_folder = "scraped_pages"

    # Create output folder
    FileUtils.mkdir_p(output_folder)

    # Helper method to generate filename from URL
    def generate_filename(url)
      uri = URI.parse(url)

      # Build filename from URL components
      parts = []
      parts << uri.host.gsub(/[^a-z0-9\-_]/i, "-") if uri.host

      if uri.path && uri.path != "/"
        path_part = uri.path.gsub(/[^a-z0-9\-_]/i, "-").squeeze("-")
        parts << path_part unless path_part.empty?
      else
        parts << "index"
      end

      filename = parts.join("-").gsub(/^-|-$/, "")
      filename = "page" if filename.empty?

      "#{filename}.md"
    end

    AsyncMarkdownScraper.new(
      "https://radioactive-labs.github.io/plutonium-core/",
      mode: :spider,
      max_concurrent: 3,
      max_pages: 5,
      state_container: state_container,
      max_content_size: 5_000_000
    ).run do |page_data|
      # Combine frontmatter and markdown
      content = page_data[:frontmatter] + page_data[:markdown]

      # Generate filename and save
      filename = generate_filename(page_data[:url])
      file_path = File.join(output_folder, filename)
      File.write(file_path, content)

      puts "Saved: #{page_data[:url]} → #{filename}"
    end

    puts "All pages saved to: #{output_folder}/"
  end

  # Example 3: Resumable scraping with state persistence
  run_scraping_example("Resumable Scraping Example") do
    state_container = FileStateContainer.new("resumable_scraper_state.json")

    AsyncScraper.new(
      "https://radioactive-labs.github.io/plutonium-core/",
      mode: :spider,
      max_concurrent: 2,
      max_pages: 3,
      state_container: state_container
    ).run do |page_data|
      puts "Processed: #{page_data[:url]}"
    end

    puts "State saved - can resume later by running the same command"
  end

  puts "🎉 All examples completed!"
  puts "💡 Notice: Each scraper was scoped to its own block"
  puts "🔄 Resources automatically cleaned up between examples"

  # Clean exit (like application shutdown)
  exit(0)
end
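
# Re-running this script resumes Examples 2 and 3 from their saved state files.
# To start a completely fresh crawl instead, delete those files first, e.g.:
#
#   # FileUtils.rm_f("scraper_state.json")
#   # FileUtils.rm_f("resumable_scraper_state.json")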