import concurrent.futures
import json
import logging

import requests

# AWS pricing API endpoint. The path is specific to Linux reserved-instance
# prices; only the region is substituted.
URL = "https://a0.p.awsstatic.com/pricing/1.0/ec2/region/{region}/reserved-instance/linux/index.json"

# Seconds to wait for the pricing endpoint before giving up, so that a stalled
# connection cannot block a worker thread forever.
REQUEST_TIMEOUT = 30


class AWSService:
    """Fetch, filter and store EC2 reserved-instance prices."""

    @classmethod
    def get_ec2_prices(cls, **kwargs):
        """Return the price entries for a region that match the given filters.

        Keyword Args:
            region: Region name substituted into ``URL``.
            term_offering_class, term_length, payment_option,
            operating_system, tenancy: Values every returned entry must match
                (see ``is_valid_input``).

        Returns:
            list: Matching entries from the ``"prices"`` array of the response.
            Any error is reported through ``exception_handler`` rather than
            raised; the entries collected before the error are still returned
            (an empty list if the request itself failed).
        """
        prices_list = []
        try:
            response = requests.get(
                URL.format(region=kwargs["region"]),
                timeout=REQUEST_TIMEOUT,
            )
            # Report 4xx/5xx answers as HTTP errors instead of letting them
            # show up later as a confusing JSON decoding failure.
            response.raise_for_status()
            # Deliberately an append loop: if one entry is malformed, the
            # entries matched so far are kept.
            for price in response.json()["prices"]:
                if cls.is_valid_input(price["attributes"], **kwargs):
                    prices_list.append(price)
        except Exception as exc:
            cls.exception_handler(exc)
        return prices_list

    @staticmethod
    def is_valid_input(base_argument, **kwargs):
        """Tell whether one price entry's attributes match the requested filters.

        Args:
            base_argument: The ``"attributes"`` mapping of a single price entry.
            **kwargs: Must contain ``term_offering_class``, ``term_length``,
                ``payment_option``, ``operating_system`` and ``tenancy``.
                Other keys (such as ``region``) are ignored.

        Returns:
            bool: True only if all five attributes equal the requested values.

        Raises:
            KeyError: If an attribute or a filter value is missing.
        """
        attribute_for = {
            "term_offering_class": "aws:offerTermOfferingClass",
            "term_length": "aws:offerTermLeaseLength",
            "payment_option": "aws:offerTermPurchaseOption",
            "operating_system": "aws:ec2:operatingSystem",
            "tenancy": "aws:ec2:tenancy",
        }
        # Look everything up eagerly so a missing key raises KeyError even
        # when an earlier field already differs.
        pairs = [
            (base_argument[attribute], kwargs[name])
            for name, attribute in attribute_for.items()
        ]
        return all(actual == wanted for actual, wanted in pairs)

    @classmethod
    def save_prices_in_file(cls, prices_list, file_id):
        """Write ``prices_list`` as JSON to ``./prices/ec2_price_<file_id>.json``.

        Errors raised while opening or writing the file are reported through
        ``exception_handler`` instead of being raised.
        """
        prices = json.dumps(prices_list)
        try:
            with open(
                f"./prices/ec2_price_{file_id}.json", "w", encoding="utf-8"
            ) as prices_file:
                prices_file.write(prices)
        except Exception as exc:
            cls.exception_handler(exc)

    @staticmethod
    def exception_handler(exception):
        """Log a readable message for ``exception`` at ERROR level.

        Known exception types, including their subclasses, are translated to
        a fixed message; any other exception is logged as-is.
        """
        # Ordered from most to least specific: the first entry the exception
        # is an instance of wins.
        exception_messages = (
            (json.decoder.JSONDecodeError, "JSON Decode Error"),
            (KeyError, "Key not found in JSON response"),
            (
                requests.exceptions.InvalidSchema,
                "Invalid URL. No connection adapters found",
            ),
            (requests.exceptions.HTTPError, "HTTP error"),
            (
                requests.exceptions.ConnectionError,
                "Failed to establish a new connection",
            ),
            (requests.exceptions.Timeout, "Request timed out"),
            (requests.exceptions.RequestException, "Request Exception"),
            (FileNotFoundError, "The prices directory is not present"),
        )
        message = next(
            (
                text
                for exc_type, text in exception_messages
                if isinstance(exception, exc_type)
            ),
            exception,
        )
        logging.error(message)

    @staticmethod
    def get_and_save_ec2_prices(*args):
        """Fetch the prices for one input row and save them to their own file.

        Both steps run in the same call because saving depends on the fetched
        result; this lets one worker thread handle one input row end to end.

        Args:
            *args: ``file_id, region, term_offering_class, term_length,
                payment_option[, operating_system[, tenancy]]``. The last two
                are optional and default to ``"Linux"`` and ``"Shared"``.

        Raises:
            IndexError: If fewer than five positional values are given.
        """
        file_id = args[0]
        region = args[1]
        term_offering_class = args[2]
        term_length = args[3]
        payment_option = args[4]
        # NOTE(review): URL only serves the Linux price index, so any other
        # operating system will match no entries.
        operating_system = args[5] if len(args) > 5 else "Linux"
        tenancy = args[6] if len(args) > 6 else "Shared"

        prices_list = AWSService.get_ec2_prices(
            region=region,
            term_offering_class=term_offering_class,
            term_length=term_length,
            payment_option=payment_option,
            operating_system=operating_system,
            tenancy=tenancy,
        )
        # Only create a file when at least one price matched.
        if prices_list:
            AWSService.save_prices_in_file(prices_list, file_id)


# List of inputs
# list_of_input[i] = ["id", "region", "term_offering_class", "term_length",
#                     "payment_option", "operating_system", "tenancy"]
list_of_input = [
    ["0", "ap-south-1", "standard", "1yr", "No Upfront", "Linux", "Shared"],
    ["1", "us-east-1", "convertible", "1yr", "No Upfront", "Linux", "Shared"],
]

# Get prices for all inputs in parallel.
# NOTE(review): this runs whenever the module is executed or imported.
with concurrent.futures.ThreadPoolExecutor() as executor:
    futures = []
    for list_of_arguments in list_of_input:
        futures.append(
            executor.submit(AWSService.get_and_save_ec2_prices, *list_of_arguments)
        )
    # Report failures raised inside a worker instead of dropping them silently.
    for future in concurrent.futures.as_completed(futures):
        error = future.exception()
        if error is not None:
            AWSService.exception_handler(error)