Skip to content

Instantly share code, notes, and snippets.

@liispon
Created March 8, 2019 08:40
Show Gist options
  • Select an option

  • Save liispon/f2520b2aba56a21ead9b971945c7ac0a to your computer and use it in GitHub Desktop.

Select an option

Save liispon/f2520b2aba56a21ead9b971945c7ac0a to your computer and use it in GitHub Desktop.

Revisions

  1. @matthewlilley matthewlilley created this gist Aug 18, 2017.
    202 changes: 202 additions & 0 deletions gistfile1.txt
    Original file line number Diff line number Diff line change
    @@ -0,0 +1,202 @@
    #! /usr/bin/env python
    # -*- coding: utf-8 -*-

    import os
    import sys
    import json
    import time
    import errno
    import base64
    import pychrome
    import threading
    import logging
    import psutil as psutil
    from bs4 import BeautifulSoup
    from argparse import ArgumentParser
    from subprocess import Popen, PIPE

    # Bind ``urlparse`` from whichever module the running interpreter provides.
    if sys.version_info[0] >= 3:
        # Python 3 keeps it in urllib.parse.
        from urllib.parse import urlparse
    elif sys.version_info >= (2, 5):
        # Python 2.5+ ships it as the top-level urlparse module.
        from urlparse import urlparse

    # Root logger: write DEBUG and above to app.log, prefixing each message
    # with the name of the emitting thread.
    logging.basicConfig(
        format='(%(threadName)-9s) %(message)s',
        level=logging.DEBUG,
        filename='app.log',
    )

    def _str_to_bool(value):
        """Convert a command-line token to a boolean.

        ``bool('False')`` is ``True``, so ``type=bool`` cannot be used for a
        flag that accepts an explicit value.  Tokens that spell "false"
        (case-insensitive) and the empty string map to ``False``; any other
        token maps to ``True``, which keeps every value that used to enable
        the flag working.
        """
        return value.strip().lower() not in ('', '0', 'false', 'f', 'no', 'n', 'off')


    # Define CLI Arguments
    parser = ArgumentParser()
    # Comma-separated list of URLs; each one is loaded in its own tab.
    parser.add_argument('--urls', help='e.g. --urls=https://google.com,https://facebook.com,https://ebay.com',
                        type=lambda s: [str(item) for item in s.split(',')])
    # argparse runs ``type`` on string defaults, so the default is also
    # delivered as a ``[width, height]`` list of ints.
    parser.add_argument('--window-size', help='e.g. --window-size=1024,768',
                        type=lambda s: [int(item) for item in s.split(',')], default='1024,768')
    parser.add_argument('--user-agent',
                        help='e.g. --user-agent="Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/60.0.3112.90 Safari/537.36"',
                        type=str)
    parser.add_argument('--timeout', help='e.g. --timeout=60', type=int, default=60)
    # ``nargs='?'`` + ``const=True`` lets the flag be given bare (as the help
    # text advertises) while still accepting ``--force-kill=<value>``.
    parser.add_argument('--force-kill', help='e.g. --force-kill', nargs='?', const=True,
                        type=_str_to_bool, default=False)

    # Parse the process command line (everything after the script name) and
    # echo the resulting namespace.
    args = parser.parse_args(sys.argv[1:])
    print(args)


    class Handler(object):
        """Capture the headers, DOM and a screenshot of the page loaded in a tab.

        The methods are meant to be assigned as event callbacks on a single
        pychrome tab.  Output goes into a directory named after the host of the
        first request the tab makes (relative to the working directory):
        ``headers.json``, ``outer.html`` and ``screenshot.png``.
        """

        # Class-level lock shared by every Handler: tab activation and capture
        # for one tab run while holding it, so two tabs never capture at once.
        lock = threading.Lock()

        def __init__(self, browser, tab):
            """Bind the handler to *tab* and apply ``--user-agent`` if given.

            :param browser: browser object owning the tab; used to activate
                the tab before it is captured.
            :param tab: tab whose events this handler receives.
            """
            self.browser = browser
            self.tab = tab
            self.start_frame = None
            self.is_first_request = True
            self.is_first_response = True
            self.url = None
            if args.user_agent is not None:
                self.tab.Network.setUserAgentOverride(userAgent=args.user_agent)

        def _output_path(self, filename):
            """Return ``<self.url>/<filename>``, creating the directory if needed."""
            path = '%s/%s' % (self.url, filename)
            directory = os.path.dirname(path)
            if not os.path.exists(directory):
                try:
                    os.makedirs(directory)
                except OSError as exc:  # Guard against race condition
                    if exc.errno != errno.EEXIST:
                        raise
            return path

        def frame_started_loading(self, frameId, **kwargs):
            """Remember the first frame that starts loading.

            Extra event fields are accepted and ignored so that additional
            parameters in the event payload do not break the callback.
            """
            if not self.start_frame:
                self.start_frame = frameId

        def frame_stopped_loading(self, frameId, **kwargs):
            """Capture the page once the first frame has finished loading.

            Events for any other frame are ignored.  The tab is always stopped
            afterwards, even when the capture raises.
            """
            if self.start_frame != frameId:
                return
            self.tab.Page.stopLoading()
            with self.lock:
                try:
                    # Activate Tab
                    self.browser.activate_tab(self.tab.id)
                    print('Activated Tab for %s' % self.url)
                    # Document
                    document = self.tab.DOM.getDocument()
                    root_id = document['root']['nodeId']
                    # Full DOM
                    dom = self.tab.DOM.getOuterHTML(nodeId=root_id)
                    # Use Beautiful Soup to Prettify
                    print('Prettify HTML and write to file.')
                    soup = BeautifulSoup(dom['outerHTML'], 'html.parser')
                    prettyHTML = soup.prettify()
                    with open(self._output_path('outer.html'), 'wb') as outerHtml:
                        outerHtml.write(prettyHTML.encode('utf-8'))
                    # Full Page Screenshot
                    print('Take full page screenshot and write binary to file.')
                    self.tab.Emulation.setDeviceMetricsOverride(width=args.window_size[0], height=args.window_size[1],
                                                                deviceScaleFactor=0.0, mobile=False, fitWindow=False)
                    body = self.tab.DOM.querySelector(nodeId=root_id, selector='body')
                    box = self.tab.DOM.getBoxModel(nodeId=body['nodeId'])
                    # Keep the requested width but use the height of <body>'s
                    # box model so the screenshot covers the whole page.
                    self.tab.Emulation.setVisibleSize(width=args.window_size[0], height=box['model']['height'])
                    self.tab.Emulation.forceViewport(x=0, y=0, scale=1)
                    screenshot = self.tab.Page.captureScreenshot()
                    with open(self._output_path('screenshot.png'), 'wb') as ss:
                        # The screenshot payload arrives base64-encoded.
                        ss.write(base64.b64decode(screenshot['data']))
                finally:
                    print('Tab stop.')
                    self.tab.stop()

        def request_will_be_sent(self, **kwargs):
            """Derive the output directory name from the tab's first request.

            Only a leading ``www.`` is removed from the host name; a URL with
            no host name (e.g. ``about:blank``) falls back to ``unknown-host``
            instead of raising.
            """
            if self.is_first_request:
                self.is_first_request = False
                # Set the URL we're making the request to.
                hostname = urlparse(kwargs.get('request').get('url')).hostname
                if not hostname:
                    hostname = 'unknown-host'
                elif hostname.startswith('www.'):
                    hostname = hostname[len('www.'):]
                self.url = hostname
                print('Loading: %s' % self.url)

        def response_received(self, **kwargs):
            """Write the headers of the tab's first response to headers.json."""
            if self.is_first_response:
                self.is_first_response = False
                print('Response from %s' % self.url)
                print('Prettify JSON headers and write to file.')
                with open(self._output_path('headers.json'), 'w') as h:
                    h.write(json.dumps(kwargs.get('response').get('headers'), indent=2))

        def detached(self, **kwargs):
            """Report that the inspector detached from the tab.

            Takes ``self`` so it can be used as a bound-method callback.
            """
            print('Detached')

    # Close all tabs utility
    def close_all_tabs(browser, settle_seconds=1):
        """Stop and close every tab currently listed by *browser*.

        :param browser: object exposing ``list_tab()`` and ``close_tab(tab)``.
        :param settle_seconds: how long to wait after closing before checking
            that no tabs remain (default 1, the previously hard-coded delay).
        :raises AssertionError: if tabs are still listed after the wait.
        """
        tabs = browser.list_tab()
        if not tabs:
            return
        for tab in tabs:
            try:
                tab.stop()
            except pychrome.RuntimeException:
                # Best effort: presumably the tab was never started, so there
                # is nothing to stop -- it is closed below regardless.
                pass
            browser.close_tab(tab)
        time.sleep(settle_seconds)
        remaining = len(browser.list_tab())
        # Raised explicitly (rather than via ``assert``) so the check is not
        # stripped when Python runs with -O.
        if remaining != 0:
            raise AssertionError('%d tab(s) still open after close_all_tabs' % remaining)


    def main():
        """Load every URL from ``args.urls`` in headless Chrome and capture it.

        Reuses a Chrome process already running with the exact expected
        command line, or starts one; closes any open tabs; opens one tab per
        URL with a ``Handler`` attached; waits up to ``args.timeout`` seconds
        for each tab; and kills Chrome afterwards when ``args.force_kill`` is
        set.
        """
        chromeArguments = ['/usr/bin/google-chrome', '--headless', '--hide-scrollbars', '--disable-gpu',
                           '--remote-debugging-port=9222']

        # Find Chrome utility function.
        def find_chrome():
            """Return the matching Chrome process, or False when none exists."""
            for process in psutil.process_iter():
                try:
                    if process.name() == 'chrome' and chromeArguments == process.cmdline():
                        return process
                except psutil.Error:
                    # The process exited or cannot be inspected; skip it.
                    continue
            return False

        # Chrome
        if find_chrome():
            print('A Google Chrome process already exists with the arguments we need... we\'ll use that.')
        else:
            print('Starting Chrome.')
            # Discard both output streams.  A PIPE that is never read can fill
            # up and block the child, and the devnull handle is only needed
            # until the child has been spawned.
            with open(os.devnull, 'wb') as devnull:
                Popen(chromeArguments,
                      shell=False,
                      stdout=devnull,
                      stderr=devnull)
            # We have to block for 1s to prevent a race condition.
            time.sleep(1)
        print('Chrome is running... Let\'s interact with in through the chrome dev tools protocol.')
        browser = pychrome.Browser()
        close_all_tabs(browser)
        # --urls is optional, so treat a missing value as "nothing to load".
        urls = args.urls or []
        tabs = [browser.new_tab() for _ in urls]
        for url, tab in zip(urls, tabs):
            eh = Handler(browser, tab)
            tab.Page.frameStartedLoading = eh.frame_started_loading
            tab.Page.frameStoppedLoading = eh.frame_stopped_loading
            tab.Network.requestWillBeSent = eh.request_will_be_sent
            tab.Network.responseReceived = eh.response_received
            tab.Inspector.detached = eh.detached
            tab.Network.enable()
            tab.Page.stopLoading()
            tab.Page.enable()
            tab.Page.navigate(url=url)
        for tab in tabs:
            success = tab.wait(args.timeout)
            if not success:
                print('Timeout.')
            print('Close Tab.')
            browser.close_tab(tab)
        if args.force_kill:
            print('Force Kill Chrome.')
            chrome = find_chrome()
            if chrome:
                chrome.kill()
            else:
                # find_chrome() returns False when nothing matches; calling
                # .kill() on that would raise AttributeError.
                print('No matching Chrome process found to kill.')
        print('Complete.')


    # Script entry point: run the capture only when this file is executed
    # directly, not when it is imported.
    if __name__ == '__main__':
        main()