@en0ne
Created September 6, 2017 22:04
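
A minimal multithreaded web spider: a pool of download workers fetches URLs from a queue, while a single explorer thread parses each HTML response with BeautifulSoup, tracks visited pages, and feeds newly discovered same-host links back to the downloaders. Ctrl-C requests a graceful stop.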
import argparse
import logging
import queue
import signal
import threading
from collections import namedtuple
from urllib.parse import urljoin, urlparse

import requests
from bs4 import BeautifulSoup

logger = logging.getLogger(__name__)

DOWNLOADABLE_CONTENT_TYPES = ['application/javascript', 'text/html', 'text/css']
PARSABLE_CONTENT_TYPES = ['text/html']
QUEUE_GET_TIMEOUT = 10  # seconds a download worker blocks on the queue before re-checking the terminate flag
REQUEST_TIMEOUT = (5, 5)  # (connect, read) timeouts passed to requests.get

DownloadTask = namedtuple('DownloadTask', 'url')
ExploreTask = namedtuple('ExploreTask', 'url body parsable err')
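
# Pipeline: DownloadTask items are consumed by the download workers, which
# emit ExploreTask results; a single explorer thread consumes those results,
# records visited URLs, and queues newly discovered links for download.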
class Spider:
    def __init__(self):
        self.terminate_flag = threading.Event()
        self.download_queue = queue.Queue()
        self.explore_queue = queue.Queue()
        # Both lists are only ever read and mutated by the single explorer
        # thread, so no extra locking is needed.
        self.visited_links = []
        self.remaining_links = []

    def terminate(self):
        self.terminate_flag.set()
    def _download(self, task):
        # Default to an error result; overwritten on a successful fetch.
        result = ExploreTask(url=task.url, body=None, parsable=False, err=True)
        try:
            resp = requests.get(task.url, timeout=REQUEST_TIMEOUT)
            if resp.status_code == 200:
                # Use .get() so a missing Content-Type header does not raise KeyError.
                content_types = [x.strip() for x in resp.headers.get('content-type', '').split(';')]
                parsable = bool(set(PARSABLE_CONTENT_TYPES).intersection(content_types))
                result = ExploreTask(url=task.url, body=resp.text, parsable=parsable, err=False)
                # downloadable = bool(set(DOWNLOADABLE_CONTENT_TYPES).intersection(content_types))
                # download to fs
        except requests.RequestException as e:
            logger.exception(e)
        finally:
            print(task.url)  # progress output
            self.explore_queue.put(result)
    def _download_worker(self):
        # Keep pulling download tasks until asked to terminate; the timeout
        # lets the loop notice the terminate flag even when the queue is idle.
        while not self.terminate_flag.is_set():
            try:
                task = self.download_queue.get(timeout=QUEUE_GET_TIMEOUT)
            except queue.Empty:
                continue
            self._download(task)
    def _parse_links(self, task):
        soup = BeautifulSoup(task.body, 'html.parser')
        for tag in soup.find_all(['a', 'link', 'script']):
            # Prefer href (a/link tags), fall back to src (script tags);
            # skip empty values and in-page fragment links.
            if 'href' in tag.attrs and tag['href'] != '' and tag['href'][0] != '#':
                link = urljoin(task.url, tag['href'])
            elif 'src' in tag.attrs and tag['src'] != '':
                link = urljoin(task.url, tag['src'])
            else:
                continue
            if link in self.visited_links:
                continue
            if link in self.remaining_links:
                continue
            # Stay on the original host.
            if urlparse(link).netloc != urlparse(task.url).netloc:
                continue
            self.remaining_links.append(link)
            self.download_queue.put(DownloadTask(url=link))
    def _explore(self, task):
        self.remaining_links = list(set(self.remaining_links) - {task.url})
        self.visited_links.append(task.url)
        if not task.parsable:
            return
        self._parse_links(task)
    def _explore_worker(self):
        # Single consumer of download results. The crawl is finished once a
        # result has been processed and no undownloaded links remain.
        while True:
            task = self.explore_queue.get()
            self._explore(task)
            if self.terminate_flag.is_set() or len(self.remaining_links) == 0:
                break
        self.terminate()
    def run(self, initial_url, download_workers=20):
        # Seed the crawl, start the explorer, then start the worker pool.
        # run() returns immediately; the non-daemon threads keep the process alive.
        self.download_queue.put(DownloadTask(url=initial_url))
        explorer = threading.Thread(target=self._explore_worker)
        explorer.start()
        for _ in range(download_workers):
            threading.Thread(target=self._download_worker).start()
if __name__ == '__main__':
    # argparse exits with a usage message if the required positional
    # argument is missing, so no manual check is needed.
    parser = argparse.ArgumentParser(description='Simple Web Spider')
    parser.add_argument('url', metavar='url', type=str, help='Initial url')
    parser.add_argument('-v', '--version', action='version', version='%(prog)s 1.0')
    args = parser.parse_args()

    spider = Spider()

    def handler(signum, frame):
        print('gracefully stopping')
        spider.terminate()

    # Install the SIGINT handler before starting any threads so Ctrl-C is
    # handled from the first moment of the crawl.
    signal.signal(signal.SIGINT, handler)
    spider.run(args.url)
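
# Usage (assuming the file is saved as spider.py; the name is illustrative):
#   python spider.py https://example.com
# Fetched URLs are printed as they complete; Ctrl-C sets the terminate flag,
# and idle download workers exit within QUEUE_GET_TIMEOUT seconds.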