#!/usr/bin/env python3
"""
Warm the caches of your website by crawling each page or sitemap index
defined in sitemap.xml.

To use, download this file and make it executable. Then run:

    ./cache-warmer.py --threads 4 --interval 10 --file https://example.com/sitemap.xml -v
    ./cache-warmer.py --threads 4 --interval 10 --file /data/web/public/sitemap.xml -v
"""
import argparse
from multiprocessing.pool import ThreadPool
import os.path
import re
import sys
import time

import requests

results = []
start = time.time()
USERAGENT = ('Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 '
             '(KHTML, like Gecko) Chrome/91.0.4472.106 Safari/537.36')


def parse_options():
    parser = argparse.ArgumentParser(
        description="Cache crawler based on a sitemap.xml file")
    parser.add_argument('-t', '--threads', help='How many threads to use',
                        default=10, required=False, type=int)
    parser.add_argument('-i', '--interval',
                        help='How many seconds to wait after each round',
                        default=1, required=False, type=int)
    parser.add_argument('-f', '--file', help='The sitemap xml file',
                        required=True, type=str)
    parser.add_argument('-v', '--verbose', help='Be more verbose',
                        action='store_true', default=False)
    args = parser.parse_args()
    if not args.file.startswith('http') and not os.path.isfile(args.file):
        parser.error('Could not find sitemap file %s' % args.file)
    return args


def crawl_url(url, verbose=False, interval=1):
    """Fetch a single URL and record whether the request succeeded."""
    if verbose:
        print("Crawling {}".format(url))
    time.sleep(interval)
    a = requests.get(url, headers={"user-agent": USERAGENT})
    # Response.ok is a property, not a method.
    return {'exit': 0 if a.ok else 1, 'out': a.text, 'url': url}


def make_results():
    errcount = 0
    exec_time = format(time.time() - start, '.4f')
    for item in results:
        if item['exit'] == 0:
            continue
        errcount += 1
        print("Errors detected in %s:\n%s\n" % (item['url'], item['out']))
    print("=" * 50)
    if errcount == 0:
        print("All DONE! - All urls are warmed! - done in %ss" % exec_time)
        return 0
    print("%d Errors detected! - done in %ss" % (errcount, exec_time))
    return 1


def get_sitemap_urls(p):
    """Return every <loc> URL found in a sitemap or sitemap index."""
    if p.startswith('http'):
        r = requests.get(p, headers={'User-Agent': USERAGENT})
        c = r.text
    else:
        with open(p) as fh:
            c = fh.read()
    if 'sitemapindex' in c:
        # A sitemap index lists further sitemaps; resolve each one recursively.
        urls = []
        for s in re.findall(r'<loc>(.*?)</loc>', c):
            urls.extend(get_sitemap_urls(s))
        return urls
    return re.findall(r'<loc>(.*?)</loc>', c)


def callback(output):
    results.append(output)


def main():
    args = parse_options()
    sitemap_urls = get_sitemap_urls(args.file)
    if args.verbose:
        print("Crawling {} urls with {} threads\n[Please Wait!]".format(
            len(sitemap_urls), args.threads))
        print("=" * 50)
    pool = ThreadPool(args.threads)
    for url in sitemap_urls:
        pool.apply_async(crawl_url, args=(url, args.verbose, args.interval),
                         callback=callback)
    pool.close()
    pool.join()
    sys.exit(make_results())


if __name__ == "__main__":
    main()