import configparser
import getopt
import os
import sys

import boto3
from elasticsearch import Elasticsearch

# Single source of truth for the AWS region: used for the process-wide default
# below and passed explicitly to every Personalize runtime client.
AWS_REGION = "us-east-2"

os.environ['AWS_PROFILE'] = "default"
os.environ['AWS_DEFAULT_REGION'] = AWS_REGION


# Call with arguments -u <user id> -s <search text>
def main(argv):
    """Parse the command line and run a personalized search.

    Args:
        argv: Argument list without the program name. Recognized options are
            ``-u`` (user id) and ``-s`` (search text); each falls back to
            ``"_"`` when it is not supplied.

    Raises:
        getopt.GetoptError: If an unknown option is given or an option is
            missing its value.
    """
    opts, _unused_args = getopt.getopt(argv, "u:s:")
    user = "_"
    search = "_"
    for opt, arg in opts:
        if opt == '-u':
            user = arg
        elif opt == '-s':
            search = arg
    search_with_personalization(user, search)


def search_with_personalization(user, search):
    """Fetch and rank recommendations for ``user``, then run the search.

    Settings are read from ``config.conf`` (relative to the current working
    directory). Categories are fetched and ranked first, then products, and
    both rankings are handed to :func:`query_es` together with ``search``.
    """
    config = configparser.ConfigParser()
    # NOTE: ConfigParser.read() silently skips files it cannot open; a missing
    # config.conf only surfaces later as a KeyError on the first lookup.
    config.read('config.conf')

    categories = get_category_recommendations(config, user)
    ranked_categories = get_category_ranking(config, user, categories)

    products = get_product_recommendations(config, user)
    ranked_products = get_product_ranking(config, user, products)

    query_es(search, ranked_categories, ranked_products)


def _log_info(msg):
    """Write an informational message to standard output."""
    print(msg)


def get_product_recommendations(config, user):
    """Return recommended item ids for ``user`` from the product campaign.

    The campaign ARN is read from
    ``config['DEFAULT']['product_recommendations_campaignArn']``.
    """
    _log_info('Retrieving product recommendations')
    return _get_recommendations(
        user, config['DEFAULT']['product_recommendations_campaignArn'])


def get_category_recommendations(config, user):
    """Return recommended item ids for ``user`` from the category campaign.

    The campaign ARN is read from
    ``config['DEFAULT']['category_recommendations_campaignArn']``.
    """
    _log_info('Retrieving category (category) recommendations')
    return _get_recommendations(
        user, config['DEFAULT']['category_recommendations_campaignArn'])


def _get_recommendations(user, campaign):
    """Call Personalize ``get_recommendations`` and return the item ids.

    Args:
        user: User id passed as ``userId``.
        campaign: Campaign ARN passed as ``campaignArn``.

    Returns:
        A list with the ``itemId`` of every entry in the response's
        ``itemList``, in response order. Each id is also logged.
    """
    personalize = boto3.client('personalize-runtime', AWS_REGION)
    response = personalize.get_recommendations(campaignArn=campaign,
                                               userId=user)
    answer = []
    _log_info('Recommended items:')
    for item in response['itemList']:
        item_id = item['itemId']
        answer.append(item_id)
        _log_info('itemId:' + item_id)
    return answer


def get_product_ranking(config, user, input_list):
    """Rank ``input_list`` for ``user`` with the product ranking campaign.

    Reads ``product_rankings_campaignArn``, ``product_ranking_start`` and
    ``product_ranking_steps_down`` from ``config['DEFAULT']``.

    Returns:
        A dict mapping item id to its score (see :func:`_get_ranking`).
    """
    _log_info('Retrieving product ranking')
    defaults = config['DEFAULT']
    return _get_ranking(defaults['product_rankings_campaignArn'],
                        user,
                        input_list,
                        float(defaults['product_ranking_start']),
                        float(defaults['product_ranking_steps_down']))


def get_category_ranking(config, user, input_list):
    """Rank ``input_list`` for ``user`` with the category ranking campaign.

    Reads ``category_rankings_campaignArn``, ``cat_ranking_start`` and
    ``cat_ranking_steps_down`` from ``config['DEFAULT']``.

    Returns:
        A dict mapping item id to its score (see :func:`_get_ranking`).
    """
    _log_info('Retrieving category (category) ranking')
    defaults = config['DEFAULT']
    # The step is parsed as a float, like the product ranking, so fractional
    # steps such as "0.5" are accepted. Integer settings yield the same scores
    # as before because the start value is already a float.
    return _get_ranking(defaults['category_rankings_campaignArn'],
                        user,
                        input_list,
                        float(defaults['cat_ranking_start']),
                        float(defaults['cat_ranking_steps_down']))


def _get_ranking(campaign, user, input_list, start, steps_down):
    """Call Personalize ``get_personalized_ranking`` and score the result.

    Args:
        campaign: Campaign ARN passed as ``campaignArn``.
        user: User id passed as ``userId``.
        input_list: Item ids passed as ``inputList``.
        start: Score assigned to the first item of the returned ranking.
        steps_down: Amount subtracted from the score per position.

    Returns:
        A dict mapping each ``itemId`` in the response's
        ``personalizedRanking`` to ``start - position * steps_down``, where
        ``position`` starts at 0. Each assignment is also logged.
    """
    personalize = boto3.client('personalize-runtime', AWS_REGION)
    response = personalize.get_personalized_ranking(campaignArn=campaign,
                                                    userId=user,
                                                    inputList=input_list)
    answer = {}
    _log_info('Ranked items:')
    for position, item in enumerate(response['personalizedRanking']):
        score = start - (position * steps_down)
        answer[item['itemId']] = score
        _log_info('Ranking ' + item['itemId'] + ' to ' + str(score))
    return answer


def transform_category_boost(category_boost_pairs):
    """Build one boosted ``match`` clause on ``category`` per dict entry.

    Args:
        category_boost_pairs: Mapping of category value to boost.

    Returns:
        A list of ``{"match": {"category": {"query": ..., "boost": ...}}}``
        dicts, in the mapping's iteration order.
    """
    return [
        {"match": {"category": {"query": category, "boost": boost}}}
        for category, boost in category_boost_pairs.items()
    ]


def transform_product_id_weights(product_id_weight_pairs):
    """Build one ``function_score`` function entry per dict entry.

    Args:
        product_id_weight_pairs: Mapping of document id to weight.

    Returns:
        A list of ``{"filter": {"ids": {"values": [id]}}, "weight": weight}``
        dicts, in the mapping's iteration order.
    """
    return [
        {"filter": {"ids": {"values": [product_id]}}, "weight": weight}
        for product_id, weight in product_id_weight_pairs.items()
    ]


def arrange_json_array(a, b):
    """Append ``b`` to list ``a`` in place and return ``a``."""
    a.append(b)
    return a


def query_es(text_search, category_boost_pairs, product_id_weight_pairs):
    """Run the personalized ``function_score`` query and log the hits.

    The query combines the category boost clauses with a ``match`` on
    ``keywords`` for ``text_search`` as ``should`` clauses, and applies the
    per-product weights as score functions. It is sent to the ``bank`` index
    through a default-constructed ``Elasticsearch`` client.

    Args:
        text_search: Text matched against the ``keywords`` field.
        category_boost_pairs: Mapping of category value to boost.
        product_id_weight_pairs: Mapping of document id to weight.
    """
    client = Elasticsearch()

    should_clauses = arrange_json_array(
        transform_category_boost(category_boost_pairs),
        {"match": {"keywords": text_search}})

    the_body = {
        "query": {
            "function_score": {
                "query": {
                    "bool": {
                        "should": should_clauses
                    }
                },
                "boost": "5",
                "functions": transform_product_id_weights(
                    product_id_weight_pairs),
                "score_mode": "max",
                "boost_mode": "multiply"
            }
        }
    }

    response = client.search(index="bank", body=the_body)

    # A response that is None, lacks 'hits', or carries a None 'hits' value
    # is treated as an empty result instead of raising.
    hits = None
    if response is not None:
        try:
            hits = response['hits']['hits']
        except (KeyError, TypeError):
            hits = None

    if hits is None or len(hits) == 0:
        _log_info('No search results.')
        return

    _log_info('There are ' + str(len(hits)) + ' search results.')
    for number, hit in enumerate(hits, start=1):
        _log_info('#' + str(number) + ': Score:' + str(hit['_score'])
                  + ', search result:' + str(hit['_source']))


if __name__ == "__main__":
    main(sys.argv[1:])