# Define here the models for your spider middleware
#
# See documentation in:
# https://docs.scrapy.org/en/latest/topics/spider-middleware.html
import sys
import time
import logging
import os
import codecs
import string   # used by create_proxyauth_extension (missing from the original imports)
import zipfile  # used by create_proxyauth_extension (missing from the original imports)
from datetime import datetime
from shutil import which

from scrapy import signals
from scrapy.mail import MailSender
from scrapy.utils.project import get_project_settings
from scrapy.downloadermiddlewares.retry import RetryMiddleware
from scrapy.utils.response import response_status_message
from scrapy.http import HtmlResponse
# useful for handling different item types with a single interface
from itemadapter import is_item, ItemAdapter

from selenium import webdriver
from selenium.webdriver.chrome.options import Options
from selenium.common.exceptions import TimeoutException, NoSuchElementException
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from selenium.webdriver.common.by import By
from selenium.webdriver.common.keys import Keys
from selenium.webdriver.common.action_chains import ActionChains
from pyvirtualdisplay import Display
import undetected_chromedriver as uc

if not (sys.platform == "linux" or sys.platform == "linux2"):
    # pin the Chrome major version for undetected_chromedriver off Linux
    uc.TARGET_VERSION = 90

    settings = get_project_settings()

class CouponsRetryMiddleware(RetryMiddleware):

    def process_response(self, request, response, spider):
        if request.meta.get('dont_retry', False):
            return response
        if response.status in self.retry_http_codes:
            reason = response_status_message(response.status)
            return self._retry(request, reason, spider) or response
        # a 200 without a "token" cookie means the myofer login did not stick
        if (response.status == 200) and request.meta.get('myoferToken') \
                and not any(item for item in response.meta["cookieJar"] if item["name"] == "token"):
            reason = "missing token cookie"
            spider.logger.info('Spider %s retrying: %s' % (spider.name, reason))
            return self._retry(request, reason, spider) or response
        return response
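
# A minimal sketch of how these middlewares might be wired into the project's
# settings.py. The module path "coupons.middlewares" and the order values are
# assumptions, not something this gist confirms:
#
#     DOWNLOADER_MIDDLEWARES = {
#         # replace the stock retry middleware with the subclass above
#         "scrapy.downloadermiddlewares.retry.RetryMiddleware": None,
#         "coupons.middlewares.CouponsRetryMiddleware": 550,
#         "coupons.middlewares.CouponsDownloaderMiddleware": 560,
#     }
#     SPIDER_MIDDLEWARES = {
#         "coupons.middlewares.CouponsSpiderMiddleware": 543,
#     }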

class CouponsSpiderMiddleware:
    # Not all methods need to be defined. If a method is not defined,
    # scrapy acts as if the spider middleware does not modify the
    # passed objects.

    @classmethod
    def from_crawler(cls, crawler):
        # This method is used by Scrapy to create your spiders.
        s = cls()
        crawler.signals.connect(s.spider_opened, signal=signals.spider_opened)
        return s

    def process_spider_input(self, response, spider):
        # Called for each response that goes through the spider
        # middleware and into the spider.

        # Should return None or raise an exception.
        return None

    def process_spider_output(self, response, result, spider):
        # Called with the results returned from the Spider, after
        # it has processed the response.

        # Must return an iterable of Request, or item objects.
        for i in result:
            yield i

    def process_spider_exception(self, response, exception, spider):
        # Called when a spider or process_spider_input() method
        # (from other spider middleware) raises an exception.

        # Should return either None or an iterable of Request or item objects.
        pass

    def process_start_requests(self, start_requests, spider):
        # Called with the start requests of the spider, and works
        # similarly to the process_spider_output() method, except
        # that it doesn't have a response associated.

        # Must return only requests (not items).
        for r in start_requests:
            yield r

    def spider_opened(self, spider):
        spider.logger.info('Spider opened: %s' % spider.name)


class CouponsDownloaderMiddleware:
    # Not all methods need to be defined. If a method is not defined,
    # scrapy acts as if the downloader middleware does not modify the
    # passed objects.

    def __init__(self):
        mailfrom = settings.get("MAIL_ADDRESS")
        smtpport = settings.get("MAIL_PORT")
        smtpuser = settings.get("MAIL_USER")
        smtppass = settings.get("MAIL_PASSWORD")
        smtphost = settings.get("SMTP_HOST")

        self.mailer = MailSender(mailfrom=mailfrom, smtphost=smtphost,
                                 smtpport=smtpport, smtpuser=smtpuser, smtppass=smtppass)

    @classmethod
    def from_crawler(cls, crawler):
        # This method is used by Scrapy to create your spiders.
        s = cls()
        s.cookie = ""
        if sys.platform == "linux" or sys.platform == "linux2":
            s.display = Display(visible=0, size=(800, 600))
            s.display.start()
            logging.info("Virtual Display Initiated")
        chrome_options = Options()
        if crawler.spider.undetectable:
            s.driver = uc.Chrome()
            if crawler.spider.proxy:
                # s.user, s.ip and s.zone (the Luminati customer, IP and zone)
                # are expected to be set elsewhere; they are not defined here.
                proxyauth_plugin_path = s.create_proxyauth_extension(
                    proxy_host=crawler.settings.get('SELENIUM_PROXY_HOST'),
                    proxy_port=crawler.settings.get('SELENIUM_PROXY_PORT'),
                    proxy_username=f"lum-customer-{s.user}-ip-{s.ip}-zone-{s.zone}",
                    proxy_password=crawler.settings.get('SELENIUM_PROXY_PASSWORD'),
                    scheme='http')

                options = uc.ChromeOptions()
                options.add_extension(proxyauth_plugin_path)
                s.driver = uc.Chrome(options=options)
        else:
            # driver_location = "/usr/bin/chromedriver"
            driver_location = which('chromedriver')
            # binary_location = "/usr/bin/google-chrome"
            userAgent = ("Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 "
                         "(KHTML, like Gecko) Chrome/84.0.4147.56 Safari/537.36")
            # chrome_options.binary_location = binary_location
            chrome_options.add_argument(f'user-agent={userAgent}')
            chrome_options.add_argument("--ignore-certificate-errors")
            chrome_options.add_argument("--ignore-ssl-errors")
            chrome_options.add_argument("--headless")
            chrome_options.add_argument("--no-sandbox")
            chrome_options.add_argument("--disable-dev-shm-usage")
            s.driver = webdriver.Chrome(executable_path=driver_location,
                                        chrome_options=chrome_options)  # your chosen driver
        crawler.signals.connect(s.spider_opened, signal=signals.spider_opened)
        crawler.signals.connect(s.spider_closed, signal=signals.spider_closed)
        return s
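
    # A sketch of the spider-side attributes this middleware reads; the class
    # name and values are illustrative, not taken from this gist:
    #
    #     class HvrSpider(scrapy.Spider):
    #         name = "hvr"
    #         undetectable = False  # use undetected_chromedriver instead of plain Chrome
    #         proxy = False         # build and load the proxy-auth extension
    #         wait = True           # wait for elementId before returning the page
    #         elementId = "content"
    #         usrEId, pwdEId = "username", "password"  # login form element ids
    #         username, password = "...", "..."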

    def create_proxyauth_extension(self,
                                   proxy_host,
                                   proxy_port,
                                   proxy_username,
                                   proxy_password,
                                   scheme='http',
                                   plugin_path=None):
        """Proxy Auth Extension

        args:
            proxy_host (str): domain or ip address, ie proxy.domain.com
            proxy_port (int): port
            proxy_username (str): auth username
            proxy_password (str): auth password
        kwargs:
            scheme (str): proxy scheme, default http
            plugin_path (str): absolute path of the extension

        return str -> plugin_path
        """
        if plugin_path is None:
            file = './chrome_proxy_helper'
            if not os.path.exists(file):
                os.mkdir(file)
            plugin_path = file + '/%s_%s@%s_%s.zip' % (
                proxy_username, proxy_password, proxy_host, proxy_port)

        manifest_json = """
        {
            "version": "1.0.0",
            "manifest_version": 2,
            "name": "Chrome Proxy",
            "permissions": [
                "proxy",
                "tabs",
                "unlimitedStorage",
                "storage",
                "<all_urls>",
                "webRequest",
                "webRequestBlocking"
            ],
            "background": {
                "scripts": ["background.js"]
            },
            "minimum_chrome_version": "22.0.0"
        }
        """
        background_js = string.Template("""
        var config = {
            mode: "fixed_servers",
            rules: {
                singleProxy: {
                    scheme: "${scheme}",
                    host: "${host}",
                    port: parseInt(${port})
                },
                bypassList: ["foobar.com"]
            }
        };

        chrome.proxy.settings.set({value: config, scope: "regular"}, function() {});

        function callbackFn(details) {
            return {
                authCredentials: {
                    username: "${username}",
                    password: "${password}"
                }
            };
        }

        chrome.webRequest.onAuthRequired.addListener(
            callbackFn,
            {urls: ["<all_urls>"]},
            ['blocking']
        );
        """).substitute(
            host=proxy_host,
            port=proxy_port,
            username=proxy_username,
            password=proxy_password,
            scheme=scheme,
        )
        with zipfile.ZipFile(plugin_path, 'w') as zp:
            zp.writestr("manifest.json", manifest_json)
            zp.writestr("background.js", background_js)

        return plugin_path
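
    # A standalone usage sketch; the host, port and credentials are
    # placeholders, not values from this project:
    #
    #     plugin = middleware.create_proxyauth_extension(
    #         proxy_host="zproxy.lum-superproxy.io",
    #         proxy_port=22225,
    #         proxy_username="lum-customer-CUSTOMER-zone-ZONE",
    #         proxy_password="...",
    #     )
    #     options = uc.ChromeOptions()
    #     options.add_extension(plugin)
    #     driver = uc.Chrome(options=options)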

    def popElement(self, interactElement):
        try:
            element = WebDriverWait(self.driver, 10).until(
                EC.presence_of_element_located((By.ID, interactElement)))
            self.driver.execute_script("arguments[0].click();", element)
        except Exception as ex:
            logging.error(ex)
            # dump a screenshot and the page source for debugging
            self.driver.save_screenshot(f"{settings.get('SCREENSHOTS_PATH')}{interactElement}_click_error.png")
            n = os.path.join(settings.get('SCREENSHOTS_PATH'), f"{interactElement}_PageSave.html")
            with codecs.open(n, "w", "utf-8") as f:
                f.write(self.driver.page_source)

    def xpath_pop_element(self, sel):
        try:
            element = WebDriverWait(self.driver, 10).until(
                EC.presence_of_element_located((By.XPATH, sel)))
            self.driver.execute_script("arguments[0].click();", element)
        except Exception as ex:
            logging.error(ex)
            # fixed dump names here: the original referenced an undefined
            # `interactElement`, and an XPath string is not filesystem-safe
            self.driver.save_screenshot(f"{settings.get('SCREENSHOTS_PATH')}xpath_click_error.png")
            n = os.path.join(settings.get('SCREENSHOTS_PATH'), "xpath_PageSave.html")
            with codecs.open(n, "w", "utf-8") as f:
                f.write(self.driver.page_source)

    def selenium_login(self, usrEId, pwdEId, username, password, spider):
        try:
            usrElement = WebDriverWait(self.driver, 10).until(
                EC.presence_of_element_located((By.ID, usrEId)))
            usrElement.send_keys(username)
            if spider.name == 'ashmoret':
                element = WebDriverWait(self.driver, 10).until(
                    EC.presence_of_element_located((By.XPATH, '//*[@id="f_login"]/div[4]/input')))
                self.driver.execute_script("arguments[0].click();", element)

            self.driver.find_element_by_id(pwdEId).send_keys(password, Keys.ENTER)

            self.cookie = self.driver.get_cookies()

        except TimeoutException as timeex:
            logging.error(timeex)
        except NoSuchElementException as noElementex:
            logging.error(noElementex)

    def process_request(self, request, spider):
        # only handle requests tagged for Selenium; drop this check to route
        # every request through the driver
        if not (request.meta.get('selenium') or spider.undetectable):
            return
        if (not request.meta.get('login')) and (spider.name == 'hvr'):
            # replay the cookies captured by selenium_login()
            for k in self.cookie:
                self.driver.add_cookie(k)
        self.driver.get(request.url)
        if request.meta.get('scroll'):
            self.scroll()
        if spider.wait:
            try:
                elementId = spider.elementId
                element_present = EC.presence_of_element_located((By.ID, elementId))

                if request.meta.get('elementId'):
                    elementId = request.meta.get('elementId')
                    element_present = EC.presence_of_element_located((By.ID, elementId))

                if request.meta.get('elementClass'):
                    elementId = request.meta.get('elementClass')
                    element_present = EC.presence_of_element_located((By.CLASS_NAME, elementId))

                WebDriverWait(self.driver, 2).until(element_present)
            except TimeoutException:
                spider.logger.error('Spider %s took too long to load' % spider.name)
                return
        if request.meta.get('interactElement'):
            self.popElement(request.meta.get('interactElement'))
        if request.meta.get("interact-xpath"):
            self.xpath_pop_element(request.meta.get("interact-xpath"))
        if request.meta.get('login'):
            self.selenium_login(spider.usrEId, spider.pwdEId, spider.username, spider.password, spider)
        body = self.driver.page_source
        url = request.url
        response = HtmlResponse(url, body=body, encoding='utf-8', request=request)
        response.meta['cookieJar'] = self.driver.get_cookies()
        if request.meta.get("script"):
            response.meta['script_response'] = self.driver.execute_script(request.meta.get("script"))
        return response
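
    # A sketch of how a spider might tag a request for this middleware; the
    # URL and element id are placeholders, the meta keys are the ones read above:
    #
    #     yield scrapy.Request(
    #         "https://example.com/coupons",
    #         meta={
    #             "selenium": True,        # route this request through the driver
    #             "scroll": True,          # run the infinite-scroll loop below
    #             "elementId": "coupons",  # wait for this id before grabbing the page
    #             "login": True,           # run selenium_login() first
    #         },
    #         callback=self.parse,
    #     )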

    def scroll(self):
        SCROLL_PAUSE_TIME = 2

        # Get scroll height
        last_height = self.driver.execute_script("return document.body.scrollHeight")

        main_scroll_count = 0
        while True:
            # Scroll down to bottom
            self.driver.execute_script("window.scrollTo(0, document.body.scrollHeight);")

            # Wait to load page
            time.sleep(SCROLL_PAUSE_TIME)
            # Calculate new scroll height and compare with last scroll height
            new_height = self.driver.execute_script("return document.body.scrollHeight")
            main_scroll_count = main_scroll_count + 1
            if new_height == last_height:
                break
            last_height = new_height

    def process_response(self, request, response, spider):
        # Called with the response returned from the downloader.

        # Must either:
        # - return a Response object
        # - return a Request object
        # - or raise IgnoreRequest
        return response

    def process_exception(self, request, exception, spider):
        # Called when a download handler or a process_request()
        # (from other downloader middleware) raises an exception.

        # Must either:
        # - return None: continue processing this exception
        # - return a Response object: stops process_exception() chain
        # - return a Request object: stops process_exception() chain
        pass

    def spider_opened(self, spider):
        spider.logger.info('Spider opened: %s' % spider.name)
        return self.mailer.send(
            to=settings.get("EMAIL_LIST"), cc=settings.get("CC_LIST"),
            subject=f"IGROUP Coupon Scraping - Spider {spider.name} status",
            body=f"Spider {spider.name} started at {datetime.now().strftime('%m/%d/%Y, %H:%M:%S')}")

    def spider_closed(self, spider):
        spider.logger.info('Spider closed: %s' % spider.name)
        if self.driver:
            # quit() (rather than close()) shuts down the browser and the
            # chromedriver process, not just the current window
            self.driver.quit()
            self.driver = None
        if sys.platform == "linux" or sys.platform == "linux2":
            self.display.stop()
            spider.logger.info("Virtual Display killed")
        return self.mailer.send(
            to=settings.get("EMAIL_LIST"), cc=settings.get("CC_LIST"),
            subject=f"IGROUP Coupon Scraping - Spider {spider.name} status",
            body=f"Spider {spider.name} closed at {datetime.now().strftime('%m/%d/%Y, %H:%M:%S')}")
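
# Settings this file reads, collected for reference; the values shown are
# placeholders (a sketch, not the project's real configuration):
#
#     MAIL_ADDRESS = "bot@example.com"
#     SMTP_HOST = "smtp.example.com"
#     MAIL_PORT = 587
#     MAIL_USER = "bot@example.com"
#     MAIL_PASSWORD = "..."
#     EMAIL_LIST = ["ops@example.com"]
#     CC_LIST = []
#     SCREENSHOTS_PATH = "/tmp/screenshots/"
#     SELENIUM_PROXY_HOST = "zproxy.lum-superproxy.io"
#     SELENIUM_PROXY_PORT = 22225
#     SELENIUM_PROXY_PASSWORD = "..."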