matthewlilley created this gist on Aug 18, 2017.
#! /usr/bin/env python
# -*- coding: utf-8 -*-

import os
import sys
import json
import time
import errno
import base64
import pychrome
import threading
import logging
import psutil
from bs4 import BeautifulSoup
from argparse import ArgumentParser
from subprocess import Popen, PIPE

# Cross-compatible import for urlparse
if sys.version_info >= (3, 0):
    from urllib.parse import urlparse
if (3, 0) > sys.version_info >= (2, 5):
    from urlparse import urlparse

# Log
logging.basicConfig(filename='app.log',
                    level=logging.DEBUG,
                    format='(%(threadName)-9s) %(message)s')

# Define CLI Arguments
parser = ArgumentParser()
parser.add_argument('--urls',
                    help='e.g. --urls=https://google.com,https://facebook.com,https://ebay.com',
                    type=lambda s: [str(item) for item in s.split(',')])
parser.add_argument('--window-size',
                    help='e.g. --window-size=1024,768',
                    type=lambda s: [int(item) for item in s.split(',')],
                    default='1024,768')
parser.add_argument('--user-agent',
                    help='e.g. --user-agent="Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/60.0.3112.90 Safari/537.36"',
                    type=str)
parser.add_argument('--timeout', help='e.g. --timeout=60', type=int, default=60)
# Note: argparse's type=bool treats any non-empty value as truthy.
parser.add_argument('--force-kill', help='e.g. --force-kill', type=bool, default=False)

# Parse Arguments
args = parser.parse_args()
print(args)


# Per-tab handler for Chrome DevTools Protocol events.
class Handler(object):
    lock = threading.Lock()

    def __init__(self, browser, tab):
        self.browser = browser
        self.tab = tab
        self.start_frame = None
        self.is_first_request = True
        self.is_first_response = True
        self.url = None
        if args.user_agent is not None:
            self.tab.Network.setUserAgentOverride(userAgent=args.user_agent)

    def frame_started_loading(self, frameId):
        if not self.start_frame:
            self.start_frame = frameId

    def frame_stopped_loading(self, frameId):
        if self.start_frame == frameId:
            self.tab.Page.stopLoading()
            with self.lock:
                try:
                    # Activate Tab
                    self.browser.activate_tab(self.tab.id)
                    print('Activated Tab for %s' % self.url)

                    # Document
                    document = self.tab.DOM.getDocument()

                    # Full DOM
                    dom = self.tab.DOM.getOuterHTML(nodeId=document['root']['nodeId'])

                    # Use Beautiful Soup to Prettify
                    print('Prettify HTML and write to file.')
                    soup = BeautifulSoup(dom['outerHTML'], 'html.parser')
                    prettyHTML = soup.prettify()
                    outerHtmlFile = '%s/outer.html' % self.url
                    if not os.path.exists(os.path.dirname(outerHtmlFile)):
                        try:
                            os.makedirs(os.path.dirname(outerHtmlFile))
                        except OSError as exc:
                            # Guard against race condition
                            if exc.errno != errno.EEXIST:
                                raise
                    with open(outerHtmlFile, 'wb') as outerHtml:
                        outerHtml.write(prettyHTML.encode('utf-8'))

                    # Full Page Screenshot
                    print('Take full page screenshot and write binary to file.')
                    self.tab.Emulation.setDeviceMetricsOverride(width=args.window_size[0],
                                                                height=args.window_size[1],
                                                                deviceScaleFactor=0.0,
                                                                mobile=False,
                                                                fitWindow=False)
                    body = self.tab.DOM.querySelector(nodeId=document['root']['nodeId'],
                                                      selector='body')
                    box = self.tab.DOM.getBoxModel(nodeId=body['nodeId'])
                    self.tab.Emulation.setVisibleSize(width=args.window_size[0],
                                                      height=box['model']['height'])
                    self.tab.Emulation.forceViewport(x=0, y=0, scale=1)
                    screenshot = self.tab.Page.captureScreenshot()
                    screenshotFile = '%s/screenshot.png' % self.url
                    if not os.path.exists(os.path.dirname(screenshotFile)):
                        try:
                            os.makedirs(os.path.dirname(screenshotFile))
                        except OSError as exc:
                            # Guard against race condition
                            if exc.errno != errno.EEXIST:
                                raise
                    with open(screenshotFile, 'wb') as ss:
                        ss.write(base64.b64decode(screenshot['data']))
                finally:
                    print('Tab stop.')
                    self.tab.stop()

    def request_will_be_sent(self, **kwargs):
        if self.is_first_request:
            self.is_first_request = False
            # Set the URL we're making the request to.
            self.url = urlparse(kwargs.get('request').get('url')).hostname.replace('www.', '')
            print('Loading: %s' % self.url)

    def response_received(self, **kwargs):
        if self.is_first_response:
            self.is_first_response = False
            print('Response from %s' % self.url)
            print('Prettify JSON headers and write to file.')
            headersFile = '%s/headers.json' % self.url
            if not os.path.exists(os.path.dirname(headersFile)):
                try:
                    os.makedirs(os.path.dirname(headersFile))
                except OSError as exc:
                    # Guard against race condition
                    if exc.errno != errno.EEXIST:
                        raise
            with open(headersFile, 'w') as h:
                h.write(json.dumps(kwargs.get('response').get('headers'), indent=2))

    def detached(self, **kwargs):
        print('Detached')


# Close all tabs utility
def close_all_tabs(browser):
    if len(browser.list_tab()) == 0:
        return
    for tab in browser.list_tab():
        try:
            tab.stop()
        except pychrome.RuntimeException:
            pass
        browser.close_tab(tab)
    time.sleep(1)
    assert len(browser.list_tab()) == 0


def main():
    chromeArguments = ['/usr/bin/google-chrome', '--headless', '--hide-scrollbars',
                       '--disable-gpu', '--remote-debugging-port=9222']

    # Find Chrome utility function.
    def find_chrome():
        for process in psutil.process_iter():
            if process.name() == 'chrome' and chromeArguments == process.cmdline():
                return process
        return False

    # Chrome
    if find_chrome():
        print('A Google Chrome process already exists with the arguments we need... we\'ll use that.')
    else:
        print('Starting Chrome.')
        devnull = open(os.devnull, 'wb')
        Popen(chromeArguments, shell=False, stdout=PIPE, stderr=devnull)
        # We have to block for 1s to prevent a race condition.
        time.sleep(1)

    print('Chrome is running... Let\'s interact with it through the Chrome DevTools Protocol.')

    browser = pychrome.Browser()
    close_all_tabs(browser)

    # Open one tab per URL.
    tabs = []
    for i in range(len(args.urls)):
        tabs.append(browser.new_tab())

    # Wire up a Handler per tab and start navigation.
    for i, tab in enumerate(tabs):
        eh = Handler(browser, tab)
        tab.Page.frameStartedLoading = eh.frame_started_loading
        tab.Page.frameStoppedLoading = eh.frame_stopped_loading
        tab.Network.requestWillBeSent = eh.request_will_be_sent
        tab.Network.responseReceived = eh.response_received
        tab.Inspector.detached = eh.detached
        tab.Network.enable()
        tab.Page.stopLoading()
        tab.Page.enable()
        tab.Page.navigate(url=args.urls[i])

    # Wait for each tab to finish (or time out), then close it.
    for i, tab in enumerate(tabs):
        success = tab.wait(args.timeout)
        if not success:
            print('Timeout.')
        print('Close Tab.')
        browser.close_tab(tab)

    if args.force_kill:
        print('Force Kill Chrome.')
        find_chrome().kill()

    print('Complete.')


if __name__ == '__main__':
    main()
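A minimal usage sketch, assuming the gist is saved locally as capture.py (the gist does not name the file) and Chrome is installed at /usr/bin/google-chrome as the script expects:

    python capture.py --urls=https://google.com,https://facebook.com --window-size=1024,768 --timeout=60

For each URL, the script writes outer.html, screenshot.png, and headers.json into a directory named after the hostname (e.g. google.com/). The pychrome, psutil, and beautifulsoup4 packages must be installed, and the script launches headless Chrome with --remote-debugging-port=9222 if a matching process is not already running.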