Skip to content

Instantly share code, notes, and snippets.

@liispon
Created March 8, 2019 08:40
Show Gist options
  • Select an option

  • Save liispon/f2520b2aba56a21ead9b971945c7ac0a to your computer and use it in GitHub Desktop.

Select an option

Save liispon/f2520b2aba56a21ead9b971945c7ac0a to your computer and use it in GitHub Desktop.
pychrome
#! /usr/bin/env python
# -*- coding: utf-8 -*-
import os
import sys
import json
import time
import errno
import base64
import pychrome
import threading
import logging
import psutil as psutil
from bs4 import BeautifulSoup
from argparse import ArgumentParser
from subprocess import Popen, PIPE
# Pick the urlparse implementation that matches the running interpreter:
# Python 3 ships it in urllib.parse, Python 2.5+ in the urlparse module.
if sys.version_info[0] >= 3:
    from urllib.parse import urlparse
elif sys.version_info >= (2, 5):
    from urlparse import urlparse
# Log: send DEBUG-and-above records to the file app.log (relative path), with
# each message prefixed by the name of the thread that emitted it.
logging.basicConfig(filename='app.log',
level=logging.DEBUG,
format='(%(threadName)-9s) %(message)s', )
# Define CLI Arguments
def _parse_urls(value):
    """Split a comma-separated URL list, dropping blanks and stray spaces."""
    urls = [item.strip() for item in value.split(',') if item.strip()]
    if not urls:
        raise ValueError('expected at least one URL')
    return urls


def _parse_window_size(value):
    """Parse ``WIDTH,HEIGHT`` into a two-item list of positive integers."""
    dimensions = [int(item) for item in value.split(',')]
    if len(dimensions) != 2 or min(dimensions) <= 0:
        raise ValueError('expected WIDTH,HEIGHT')
    return dimensions


def _str_to_bool(value):
    """Interpret a textual boolean.

    ``type=bool`` treats every non-empty string (including ``"False"``) as
    True, so the conversion is spelled out explicitly instead.
    """
    normalized = value.strip().lower()
    if normalized in ('1', 'true', 't', 'yes', 'y', 'on'):
        return True
    if normalized in ('', '0', 'false', 'f', 'no', 'n', 'off'):
        return False
    raise ValueError('expected a boolean, got %r' % value)


parser = ArgumentParser()
# required: main() iterates over the URL list, so a missing value cannot work.
parser.add_argument('--urls', help='e.g. --urls=https://google.com,https://facebook.com,https://ebay.com',
                    type=_parse_urls, required=True)
# The string default is run through ``type`` by argparse, yielding [1024, 768].
parser.add_argument('--window-size', help='e.g. --window-size=1024,768',
                    type=_parse_window_size, default='1024,768')
parser.add_argument('--user-agent',
                    help='e.g. --user-agent="Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/60.0.3112.90 Safari/537.36"',
                    type=str)
parser.add_argument('--timeout', help='e.g. --timeout=60', type=int, default=60)
# nargs='?' + const=True lets the bare flag work as the help text advertises,
# while still accepting an explicit --force-kill=true / --force-kill=false.
parser.add_argument('--force-kill', help='e.g. --force-kill (or --force-kill=false)',
                    nargs='?', const=True, type=_str_to_bool, default=False)
# Parse Arguments
# NOTE: this runs at import time, so the command line is consumed as soon as
# the module is loaded. The resulting module-level ``args`` namespace is read
# directly by Handler and main().
args = parser.parse_args()
print(args)
class Handler(object):
    """Per-tab DevTools event handler that archives a page once it has loaded.

    The methods are meant to be registered as pychrome event listeners. For
    the tab it is bound to, the handler writes three files into a directory
    named after the requested host:

    * ``headers.json``   - headers of the first response,
    * ``outer.html``     - prettified outer HTML of the document,
    * ``screenshot.png`` - screenshot sized to the ``<body>`` height.

    Reads the module-level ``args`` namespace for ``user_agent`` and
    ``window_size``.
    """

    # Class-level on purpose: shared by every Handler so that only one tab at
    # a time is activated and captured.
    lock = threading.Lock()

    def __init__(self, browser, tab):
        """Bind the handler to ``tab`` and apply the user-agent override, if any.

        :param browser: pychrome browser owning ``tab`` (used to activate it).
        :param tab: pychrome tab whose events this handler receives.
        """
        self.browser = browser
        self.tab = tab
        self.start_frame = None
        self.is_first_request = True
        self.is_first_response = True
        self.url = None
        if args.user_agent is not None:
            self.tab.Network.setUserAgentOverride(userAgent=args.user_agent)

    def _output_path(self, filename):
        """Return ``<self.url>/<filename>``, creating the directory if needed."""
        path = '%s/%s' % (self.url, filename)
        try:
            os.makedirs(os.path.dirname(path))
        except OSError as exc:
            # Another handler thread (or an earlier run) may have created it.
            if exc.errno != errno.EEXIST:
                raise
        return path

    def _save_html(self, root_node_id):
        """Write the prettified outer HTML of the document to ``outer.html``."""
        dom = self.tab.DOM.getOuterHTML(nodeId=root_node_id)
        print('Prettify HTML and write to file.')
        pretty_html = BeautifulSoup(dom['outerHTML'], 'html.parser').prettify()
        with open(self._output_path('outer.html'), 'wb') as outer_html:
            outer_html.write(pretty_html.encode('utf-8'))

    def _save_screenshot(self, root_node_id):
        """Resize the viewport to the body height and write ``screenshot.png``."""
        print('Take full page screenshot and write binary to file.')
        width, height = args.window_size[0], args.window_size[1]
        self.tab.Emulation.setDeviceMetricsOverride(width=width, height=height,
                                                    deviceScaleFactor=0.0, mobile=False, fitWindow=False)
        body = self.tab.DOM.querySelector(nodeId=root_node_id, selector='body')
        box = self.tab.DOM.getBoxModel(nodeId=body['nodeId'])
        # Use the body's box-model height so the capture covers the whole page.
        self.tab.Emulation.setVisibleSize(width=width, height=box['model']['height'])
        self.tab.Emulation.forceViewport(x=0, y=0, scale=1)
        screenshot = self.tab.Page.captureScreenshot()
        with open(self._output_path('screenshot.png'), 'wb') as png:
            png.write(base64.b64decode(screenshot['data']))

    def frame_started_loading(self, frameId, **_kwargs):
        """Remember the first frame that starts loading (the frame to wait for)."""
        if not self.start_frame:
            self.start_frame = frameId

    def frame_stopped_loading(self, frameId, **_kwargs):
        """Capture HTML and screenshot once the remembered frame has loaded.

        The tab is always stopped afterwards, even if the capture fails, so a
        caller blocked in ``tab.wait()`` is released.
        """
        if self.start_frame != frameId:
            return
        self.tab.Page.stopLoading()
        with self.lock:
            try:
                # Activate Tab
                self.browser.activate_tab(self.tab.id)
                print('Activated Tab for %s' % self.url)
                document = self.tab.DOM.getDocument()
                root_node_id = document['root']['nodeId']
                self._save_html(root_node_id)
                self._save_screenshot(root_node_id)
            finally:
                print('Tab stop.')
                self.tab.stop()

    def request_will_be_sent(self, **kwargs):
        """Derive the output directory name from the first request's host.

        Only a leading ``www.`` is stripped. Requests without a host (e.g.
        ``data:`` or ``about:`` URLs) are ignored, so a later request that
        does carry a host still gets to set the name.
        """
        if not self.is_first_request:
            return
        request = kwargs.get('request') or {}
        hostname = urlparse(request.get('url') or '').hostname
        if not hostname:
            return
        self.is_first_request = False
        if hostname.startswith('www.'):
            hostname = hostname[len('www.'):]
        # Set the URL we're making the request to.
        self.url = hostname
        print('Loading: %s' % self.url)

    def response_received(self, **kwargs):
        """Write the headers of the first response to ``headers.json``."""
        if not self.is_first_response:
            return
        self.is_first_response = False
        print('Response from %s' % self.url)
        print('Prettify JSON headers and write to file.')
        with open(self._output_path('headers.json'), 'w') as headers_file:
            headers_file.write(json.dumps(kwargs.get('response').get('headers'), indent=2))

    def detached(self, **kwargs):
        """Report that the inspector detached from the tab."""
        print('Detached')
# Close all tabs utility
def close_all_tabs(browser):
    """Stop and close every tab currently open in ``browser``.

    Does nothing when no tabs are open. Otherwise each tab is stopped (a
    pychrome RuntimeException from ``stop()`` is ignored), closed, and after
    a one-second pause the browser is asserted to report no tabs.
    """
    if not browser.list_tab():
        return

    for open_tab in browser.list_tab():
        try:
            open_tab.stop()
        except pychrome.RuntimeException:
            pass
        browser.close_tab(open_tab)

    time.sleep(1)
    remaining = browser.list_tab()
    assert len(remaining) == 0
def main(chrome_path='/usr/bin/google-chrome', debug_port=9222, startup_timeout=10.0):
    """Archive every URL in ``args.urls`` using headless Chrome.

    Reuses a running Chrome whose command line matches exactly, otherwise
    launches one. Opens one tab per URL, wires a ``Handler`` to each and
    waits up to ``args.timeout`` seconds per tab. With ``args.force_kill``
    set, Chrome is killed at the end, even if the run failed.

    :param chrome_path: Chrome executable to launch.
    :param debug_port: remote-debugging port Chrome listens on.
    :param startup_timeout: seconds to wait for the DevTools port to open.
    :raises RuntimeError: if the DevTools port never becomes reachable.
    """
    import socket  # local import: only needed to probe the DevTools port

    chromeArguments = [chrome_path, '--headless', '--hide-scrollbars', '--disable-gpu',
                       '--remote-debugging-port=%d' % debug_port]

    # Find Chrome utility function.
    def find_chrome():
        """Return the Chrome process with our exact command line, or None."""
        for process in psutil.process_iter():
            try:
                if process.name() == 'chrome' and chromeArguments == process.cmdline():
                    return process
            except psutil.Error:
                # The process vanished or is not inspectable; skip it.
                continue
        return None

    def wait_for_devtools():
        """Poll the debugging port until it accepts a connection or we time out."""
        deadline = time.time() + startup_timeout
        while True:
            try:
                probe = socket.create_connection(('127.0.0.1', debug_port), timeout=1)
            except (socket.error, OSError):
                if time.time() >= deadline:
                    return False
                time.sleep(0.1)
            else:
                probe.close()
                return True

    # Chrome
    started = None
    if find_chrome():
        print('A Google Chrome process already exists with the arguments we need... we\'ll use that.')
    else:
        print('Starting Chrome.')
        # Both streams go to devnull: an unread PIPE can fill up and stall
        # Chrome. The child keeps its own copy of the descriptor, so the
        # parent's handle can be closed right away.
        with open(os.devnull, 'wb') as devnull:
            started = Popen(chromeArguments,
                            shell=False,
                            stdout=devnull,
                            stderr=devnull)

    try:
        # Wait for the DevTools endpoint instead of sleeping a fixed second.
        if not wait_for_devtools():
            raise RuntimeError('Chrome DevTools endpoint did not come up on port %d' % debug_port)
        print('Chrome is running... Let\'s interact with in through the chrome dev tools protocol.')
        browser = pychrome.Browser(url='http://127.0.0.1:%d' % debug_port)
        close_all_tabs(browser)

        tabs = [browser.new_tab() for _ in args.urls]
        for url, tab in zip(args.urls, tabs):
            # Current pychrome releases refuse method calls on a tab that has
            # not been started; the callable() guard skips this on versions
            # where ``start`` is not a real method.
            start = getattr(tab, 'start', None)
            if callable(start):
                start()
            eh = Handler(browser, tab)
            tab.Page.frameStartedLoading = eh.frame_started_loading
            tab.Page.frameStoppedLoading = eh.frame_stopped_loading
            tab.Network.requestWillBeSent = eh.request_will_be_sent
            tab.Network.responseReceived = eh.response_received
            tab.Inspector.detached = eh.detached
            tab.Network.enable()
            tab.Page.stopLoading()
            tab.Page.enable()
            tab.Page.navigate(url=url)

        for tab in tabs:
            if not tab.wait(args.timeout):
                print('Timeout.')
                # The handler never reached its own tab.stop(); stop it here.
                try:
                    tab.stop()
                except pychrome.RuntimeException:
                    pass
            print('Close Tab.')
            browser.close_tab(tab)
    finally:
        if args.force_kill:
            print('Force Kill Chrome.')
            # Prefer the process we launched; otherwise look up the reused one.
            chrome = started if started is not None else find_chrome()
            if chrome is not None:
                chrome.kill()
            else:
                print('No matching Chrome process found to kill.')
    print('Complete.')


if __name__ == '__main__':
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment