pychrome
#! /usr/bin/env python
# -*- coding: utf-8 -*-
import os
import sys
import json
import time
import errno
import base64
import pychrome
import threading
import logging
import psutil
from bs4 import BeautifulSoup
from argparse import ArgumentParser
from subprocess import Popen, PIPE

# Cross-compatible import for urlparse
if sys.version_info >= (3, 0):
    from urllib.parse import urlparse
if (3, 0) > sys.version_info >= (2, 5):
    from urlparse import urlparse

# Log
logging.basicConfig(filename='app.log',
                    level=logging.DEBUG,
                    format='(%(threadName)-9s) %(message)s', )

# Define CLI Arguments
parser = ArgumentParser()
parser.add_argument('--urls', help='e.g. --urls=https://google.com,https://facebook.com,https://ebay.com',
                    type=lambda s: [str(item) for item in s.split(',')])
parser.add_argument('--window-size', help='e.g. --window-size=1024,768',
                    type=lambda s: [int(item) for item in s.split(',')], default='1024,768')
parser.add_argument('--user-agent',
                    help='e.g. --user-agent="Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/60.0.3112.90 Safari/537.36"',
                    type=str)
parser.add_argument('--timeout', help='e.g. --timeout=60', type=int, default=60)
# Use store_true so --force-kill behaves as a flag (type=bool treats any non-empty string as True)
parser.add_argument('--force-kill', help='e.g. --force-kill', action='store_true')
# Parse Arguments
args = parser.parse_args()
print(args)


class Handler(object):
    lock = threading.Lock()

    def __init__(self, browser, tab):
        self.browser = browser
        self.tab = tab
        self.start_frame = None
        self.is_first_request = True
        self.is_first_response = True
        self.url = None
        if args.user_agent is not None:
            self.tab.Network.setUserAgentOverride(userAgent=args.user_agent)

    def frame_started_loading(self, frameId):
        if not self.start_frame:
            self.start_frame = frameId

    def frame_stopped_loading(self, frameId):
        if self.start_frame == frameId:
            self.tab.Page.stopLoading()
            with self.lock:
                try:
                    # Activate Tab
                    self.browser.activate_tab(self.tab.id)
                    print('Activated Tab for %s' % self.url)
                    # Document
                    document = self.tab.DOM.getDocument()
                    # Full DOM
                    dom = self.tab.DOM.getOuterHTML(nodeId=document['root']['nodeId'])
                    # Use Beautiful Soup to Prettify
                    print('Prettify HTML and write to file.')
                    soup = BeautifulSoup(dom['outerHTML'], 'html.parser')
                    prettyHTML = soup.prettify()
                    outerHtmlFile = '%s/outer.html' % self.url
                    if not os.path.exists(os.path.dirname(outerHtmlFile)):
                        try:
                            os.makedirs(os.path.dirname(outerHtmlFile))
                        except OSError as exc:  # Guard against race condition
                            if exc.errno != errno.EEXIST:
                                raise
                    with open(outerHtmlFile, 'wb') as outerHtml:
                        outerHtml.write(prettyHTML.encode('utf-8'))
                    # Full Page Screenshot
                    print('Take full page screenshot and write binary to file.')
                    self.tab.Emulation.setDeviceMetricsOverride(width=args.window_size[0], height=args.window_size[1],
                                                                deviceScaleFactor=0.0, mobile=False, fitWindow=False)
                    body = self.tab.DOM.querySelector(nodeId=document['root']['nodeId'], selector='body')
                    box = self.tab.DOM.getBoxModel(nodeId=body['nodeId'])
                    self.tab.Emulation.setVisibleSize(width=args.window_size[0], height=box['model']['height'])
                    self.tab.Emulation.forceViewport(x=0, y=0, scale=1)
                    screenshot = self.tab.Page.captureScreenshot()
                    screenshotFile = '%s/screenshot.png' % self.url
                    if not os.path.exists(os.path.dirname(screenshotFile)):
                        try:
                            os.makedirs(os.path.dirname(screenshotFile))
                        except OSError as exc:  # Guard against race condition
                            if exc.errno != errno.EEXIST:
                                raise
                    with open(screenshotFile, 'wb') as ss:
                        ss.write(base64.b64decode(screenshot['data']))
                finally:
                    print('Tab stop.')
                    self.tab.stop()

    def request_will_be_sent(self, **kwargs):
        if self.is_first_request:
            self.is_first_request = False
            # Set the URL we're making the request to.
            self.url = urlparse(kwargs.get('request').get('url')).hostname.replace('www.', '')
            print('Loading: %s' % self.url)

    def response_received(self, **kwargs):
        if self.is_first_response:
            self.is_first_response = False
            print('Response from %s' % self.url)
            print('Prettify JSON headers and write to file.')
            headersFile = '%s/headers.json' % self.url
            if not os.path.exists(os.path.dirname(headersFile)):
                try:
                    os.makedirs(os.path.dirname(headersFile))
                except OSError as exc:  # Guard against race condition
                    if exc.errno != errno.EEXIST:
                        raise
            with open(headersFile, 'w') as h:
                h.write(json.dumps(kwargs.get('response').get('headers'), indent=2))
    def detached(self, **kwargs):  # needs self so it can be registered as a bound event handler
        print('Detached')


# Close all tabs utility
def close_all_tabs(browser):
    if len(browser.list_tab()) == 0:
        return
    for tab in browser.list_tab():
        try:
            tab.stop()
        except pychrome.RuntimeException:
            pass
        browser.close_tab(tab)
    time.sleep(1)
    assert len(browser.list_tab()) == 0


def main():
    chromeArguments = ['/usr/bin/google-chrome', '--headless', '--hide-scrollbars', '--disable-gpu',
                       '--remote-debugging-port=9222']

    # Find Chrome utility function.
    def find_chrome():
        for process in psutil.process_iter():
            if process.name() == 'chrome' and chromeArguments == process.cmdline():
                return process
        return False

    # Chrome
    if find_chrome():
        print('A Google Chrome process already exists with the arguments we need... we\'ll use that.')
    else:
        print('Starting Chrome.')
        devnull = open(os.devnull, 'wb')
        Popen(chromeArguments,
              shell=False,
              stdout=PIPE,
              stderr=devnull)
        # We have to block for 1s to prevent a race condition.
        time.sleep(1)
    print('Chrome is running... Let\'s interact with it through the Chrome DevTools Protocol.')
    browser = pychrome.Browser()
    close_all_tabs(browser)
    tabs = []
    for i in range(len(args.urls)):
        tabs.append(browser.new_tab())
    for i, tab in enumerate(tabs):
        eh = Handler(browser, tab)
        tab.Page.frameStartedLoading = eh.frame_started_loading
        tab.Page.frameStoppedLoading = eh.frame_stopped_loading
        tab.Network.requestWillBeSent = eh.request_will_be_sent
        tab.Network.responseReceived = eh.response_received
        tab.Inspector.detached = eh.detached
        tab.Network.enable()
        tab.Page.stopLoading()
        tab.Page.enable()
        tab.Page.navigate(url=args.urls[i])
    for i, tab in enumerate(tabs):
        success = tab.wait(args.timeout)
        if not success:
            print('Timeout.')
        print('Close Tab.')
        browser.close_tab(tab)
    if args.force_kill:
        print('Force Kill Chrome.')
        find_chrome().kill()
    print('Complete.')


if __name__ == '__main__':
    main()
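Example usage (a sketch, assuming the script is saved as pychrome.py and headless Chrome lives at /usr/bin/google-chrome, as hard-coded in chromeArguments):

python pychrome.py --urls=https://google.com,https://facebook.com --window-size=1024,768 --timeout=60 --force-kill

For each URL the script creates a directory named after the hostname with the leading "www." stripped (e.g. google.com/) containing outer.html, screenshot.png, and headers.json.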