import argparse
import os
import re
import time
from datetime import datetime
from glob import glob
from multiprocessing import Pool

import pandas as pd
import xmldataset

def is_stealth_file(filename):
    # Check whether a file is a "stealth" export; stealth files are skipped,
    # e.g. zade_30_cat_20180305_224136_stealth.xml is a stealth file.
    return re.search(r'.+_stealth.*\.xml', filename) is not None
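
# A quick sanity check of the pattern above, using the filename from the
# comment:
#   is_stealth_file('zade_30_cat_20180305_224136_stealth.xml')  -> True
#   is_stealth_file('zade_30_cat_20180305_224136.xml')          -> False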

def get_account_from_foldername(foldername):
    # Extract the account id from the folder name,
    # e.g. 1009_outfit/ has account 1009.
    # Returns None implicitly when the name does not match.
    search = re.search(r'^(?P<account>\d+)_.*', foldername)
    if search is not None:
        return search.group('account')
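
# For illustration, with the folder name from the comment (the second,
# non-matching name is hypothetical):
#   get_account_from_foldername('1009_outfit')  -> '1009'
#   get_account_from_foldername('outfit')       -> None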

def get_channel_from_filename(filename):
    # Extract the channel from the filename,
    # e.g. zade_30_cat_20180305_224136.xml has channel 30.
    search = re.search(r'zade_(?P<channel>\d+)_cat_.*', filename)
    if search is not None:
        return search.group('channel')
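
# For illustration, with the filename from the comment:
#   get_channel_from_filename('zade_30_cat_20180305_224136.xml')  -> '30'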

def get_output_filename(account, original_filename, timestring=None):
    # Generate the filename of the output CSV file; if timestring is not
    # specified, the current date is used.
    # e.g. <DATE 2018-03-05>_<ACCOUNT 1009>_<original filename>.csv
    if timestring is None:
        timestring = datetime.now().strftime('%Y-%m-%d')
    filename = original_filename.rsplit('.xml', 1)[0]
    return '{timestring}_{account}_{filename}.csv'.format(
        timestring=timestring,
        account=account,
        filename=filename)
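
# For illustration, combining the example values from the comments above:
#   get_output_filename('1009', 'zade_30_cat_20180305_224136.xml',
#                       timestring='2018-03-05')
#   -> '2018-03-05_1009_zade_30_cat_20180305_224136.csv'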

def write_file(args):
    # Convert one XML file to CSV; args is one work item from the
    # processing queue: (input_file, output_file, account, profile, channel).
    (input_file, output_file, account, profile, channel) = args
    # Minimal xmldataset profile that only extracts the catalog's creation
    # timestamp; indentation mirrors the XML nesting.
    creation_profile = """
        TBCATALOG
            creation = dataset:creation_info
        """
    with open(input_file) as xml_file:
        xml = xml_file.read()
    creation = None
    result = xmldataset.parse_using_profile(xml, creation_profile)
    if 'creation_info' in result and len(result['creation_info']):
        creation = result['creation_info'][0]['creation']
    result = xmldataset.parse_using_profile(xml, profile)
    if 'articles' in result:
        df = pd.DataFrame.from_records(result['articles'])
        df['CHANNEL'] = channel
        df['ACCOUNT'] = account
        df['CREATION'] = str(creation)
        df['FILENAME'] = input_file
        # Order the columns so they are the same across all files.
        df = df[['A_NR', 'A_ID', 'A_STOCK', 'A_VK', 'P_CATEGORY',
                 'CHANNEL', 'ACCOUNT', 'CREATION', 'FILENAME']]
        df.to_csv(output_file, index=False, encoding='utf-8')
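
# For reference: xmldataset.parse_using_profile returns a plain dict mapping
# each dataset name to a list of row dicts, roughly like this (the values
# are illustrative, not taken from a real catalog):
#   {'creation_info': [{'creation': '2018-03-05 22:41:36'}],
#    'articles': [{'A_NR': '1', 'A_ID': 'SKU-1', 'A_STOCK': '5',
#                  'A_VK': '9.99', 'P_CATEGORY': 'Shoes'}, ...]}
# which is why pd.DataFrame.from_records() can consume result['articles']
# directly.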

if __name__ == '__main__':
    parser = argparse.ArgumentParser(description='TB.Cat to CSV transformation.')
    parser.add_argument('--accounts', type=lambda accounts: accounts.split(','),
                        help='comma-separated list of accounts to process')
    # --start is required because start_date is used unconditionally below.
    parser.add_argument('--start', required=True,
                        type=lambda start: datetime.strptime(start, '%Y-%m-%d'),
                        help='start date from which to start processing')
    # parser.add_argument('--end', type=lambda end: datetime.strptime(end, '%Y-%m-%d'),
    #                     help='end date at which to stop processing')
    args = parser.parse_args()

    pool = Pool(processes=8)
    # TODO: make this dynamic in the script that iterates over files in folders
    root_folder = '/tsmall/devtest/devtest/tbone/clients'
    save_folders = glob(os.path.join(root_folder, '*/tbcat/out_save'))

    #################
    start_date = args.start
    # Profile for the xmldataset library; the indentation mirrors the XML
    # nesting and should not be changed.
    profile = """
        TBCATALOG
            PRODUCTDATA
                PRODUCT
                    P_CATEGORIES
                        P_CATEGORY = external_dataset:product_information
                    ARTICLEDATA
                        ARTICLE
                            A_NR = dataset:articles
                            A_ID = dataset:articles
                            A_STOCK = dataset:articles
                            A_PRICEDATA
                                A_PRICE
                                    A_VK = dataset:articles
                            __EXTERNAL_VALUE__ = product_information:P_CATEGORY:articles
        """

    start_time = time.time()
    nr_processed_files = 0
    nr_processed_accounts = 0
    print("Scanning for account files...")
    for save_folder in save_folders:
        account_foldername = save_folder.split(root_folder)[1].split('/tbcat/out_save')[0].strip('/')
        account = get_account_from_foldername(account_foldername)
        if account is None:
            continue
        if args.accounts and account not in args.accounts:
            continue
        print("Processing account {}".format(account))
        account_start_time = time.time()
        nr_processed_accounts += 1
        nr_processed_account_files = 0
        year_folders = glob(os.path.join(save_folder, '*'))
        for year_folder in year_folders:
            year = year_folder.rsplit('/', 1)[1]
            # String comparison is safe here: years are four digits.
            if year < start_date.strftime('%Y'):
                continue
            month_folders = glob(os.path.join(year_folder, '*'))
            for month_folder in month_folders:
                processing_queue = []
                month = month_folder.rsplit('/', 1)[1]
                # Zero-padded months compare correctly as strings.
                if year == start_date.strftime('%Y') and month < start_date.strftime('%m'):
                    continue
                day_folders = glob(os.path.join(month_folder, '*'))
                for day_folder in day_folders:
                    day = day_folder.rsplit('/', 1)[1]
                    timestring = '{year}-{month}-{day}'.format(year=year, month=month, day=day)
                    if timestring >= start_date.strftime('%Y-%m-%d'):
                        file_locations = glob(os.path.join(day_folder, '*'))
                        for location in file_locations:
                            original_filename = os.path.basename(location)
                            output_filename = get_output_filename(account, original_filename,
                                                                  timestring=timestring)
                            directory = '/tsmall/devtest/output-test/{year}-{month}-{day}'.format(
                                year=year, month=month, day=day)
                            if not os.path.exists(directory):
                                os.makedirs(directory)
                            channel = get_channel_from_filename(original_filename)
                            # Skip files that were already converted on a previous run.
                            if not os.path.exists(os.path.join(directory, output_filename)):
                                if (channel is not None) and (not is_stealth_file(original_filename)):
                                    processing_queue.append(
                                        (location, os.path.join(directory, output_filename),
                                         account, profile, channel))
                # Convert one month's worth of files in parallel.
                pool.map(write_file, processing_queue)
                nr_processed_files += len(processing_queue)
                nr_processed_account_files += len(processing_queue)
        account_elapsed_time = time.time() - account_start_time
        print("Processed {} new files for account {} in {}".format(
            nr_processed_account_files, account,
            time.strftime("%H:%M:%S", time.gmtime(account_elapsed_time))))
    pool.close()
    pool.join()
    elapsed_time = time.time() - start_time
    print("Finished in {}".format(time.strftime("%H:%M:%S", time.gmtime(elapsed_time))))
    print("Processed {} new files (total)".format(nr_processed_files))
    print("Processed {} accounts".format(nr_processed_accounts))