Last active: July 17, 2018 03:37
Revisions
- garyfeng revised this gist on Jul 17, 2018: 2 changed files, with 2 additions and 2 deletions. The revision comments out the hardcoded resume timestamps in the two Python scripts, so `today` is taken from the clock again. After the change, the relevant lines read:

First script, around line 474:

```python
today = time.strftime("%Y%m%d_%H", time.localtime())  # timestamp as 20170810_22 for date and hour (24h) to run the script
# garyfeng: to resume from a run:
# today = "20180709_21"
####
grade = sys.argv[1]
```

Second script, around line 472:

```python
# paths
today = time.strftime("%Y%m%d_%H", time.localtime())  # timestamp as 20170810_22 for date and hour (24h) to run the script
# today = "20180711_04"
grade = sys.argv[1]
inputCSVPath = "{}/".format(grade)
```
- garyfeng created this gist on Jul 17, 2018. The original upload contains three files: two nearly identical Python scripts (536 and 540 lines) that extract descriptive writing-process features from keystroke logs, and a 32-line Windows batch script for launching a pdia Jupyter Docker container.
File 1 of 3 (536 lines at creation) — the feature-extraction script for the 2017 operational writing analysis. First the header, imports, and the MAP step, which adds keystroke-level feature columns to each block of a booklet and, when verbose=True, saves interim CSVs for inspection:

```python
# Functions for extracting the descriptive features for the 2017 operational analysis
# Changes from make2017FinalFeatures.py:
# - min

#############
import StringIO
import os
import traceback

from pdia.durSinceBlockStart import *
from pdia.writing2017.addFeatureTextChange import addTextChangeVars
from pdia.writing2017.addKeyPressVars import addKeyPressVars
from pdia.writing2017.burstFeatures import *
from pdia.writing2017.editingFeatures import mapEditingFeatures, reduceEditingFeatures, reduceDeleteFeatures
from pdia.writing2017.featuresConfig2017 import featureConfig2017
from pdia.writing2017.reduceFeatureInitialKeypress import reduceFeatureInitialKeypress
from pdia.writing2017.getData import getData
from pdia.writing2017.reducePauseFeatures import reducePauseFeatures
from pdia.writing2017.addWordTokens import *

# Configs
# parser config
from pdia.writing2017.reduceWordBasedFeatures import reduceWordBasedFeatures

# 2016 default configuration
# 2017 data config


# Read data
def getData2017(filename, featureConfig=featureConfig2017):
    """
    Simply a wrap of getData with the 2017 config

    :param filename: the file name to process
    :param featureConfig: using the 2017 configuration
    :return: the parsed df
    """
    return getData(filename, featureConfig=featureConfig)


def mapStep(df, feaConfig, verbose=False):
    """
    MAP step: creating keystroke level features, adding columns

    :param df: the data frame for a booklet, contain potentially multiple blocks
    :param feaConfig: the configuration for data import/parsing
    :param verbose: if True, saves the interim data
    """
    # asserts
    if df is None:
        logger.error("MapStep: input df is None; quitting")
        return None
    if not any([(k in df.columns) for k in feaConfig["byVars"]]):
        # keyword missing
        return None

    studentID = df["BookletNumber"].unique()[0]

    # ##### MAP ####
    # to handle the feature functions in the featureMap object
    # ##############
    def mapBlock(d):
        # return None if no keystroke log is available
        if d.loc[d.Label == "Pilot Observables", :].shape[0] == 0:
            # print("mapBlock: No Observable data for the block")
            logger.debug("mapBlock: No Observable data for the block")
            return None
        d = durSinceBlockStart(d) if d is not None else None
        # d = addKeyPressVars(d) if d is not None else None
        # d = addTextChangeVars(d) if d is not None else None
        d = addFeatureIKI(d) if d is not None else None
        d = addWordTokens(d) if d is not None else None
        # garyfeng 2018-07-09: changing default minJumpDistance from 2 to 5
        d = mapEditingFeatures(d, verbose=False, minTextChangeEvents=5, minJumpDistance=5) if d is not None else None
        return d

    try:
        # the following groupby().apply() is causing occasional python crashes
        # df = df \
        #     .groupby(feaConfig["byVars"]) \
        #     .apply(mapBlock)
        # taking a stupid method here
        tmp = []
        for b in df["BlockCode"].unique():
            tmp.append(df.loc[df.BlockCode == b, :].pipe(mapBlock))
        df = pd.concat(tmp)
    except Exception as e:
        logger.error("Error in mapStep")
        logger.exception(e)
        exc_buffer = StringIO.StringIO()
        traceback.print_exc(file=exc_buffer)
        logger.error('Uncaught exception in worker process:\n%s', exc_buffer.getvalue())
        df.to_csv("Error_{}_MapStep.csv".format(studentID), encoding="utf-8")
        return

    # saving
    if verbose:
        outputFileName = "{}_mapStep.csv".format(df["BookletNumber"].unique()[0])
        # remove
        df.loc[:, ~df.columns.duplicated()].to_csv(outputFileName, encoding="utf-8")
        # simplified for human reading
        outputFileName = "{}_mapStep_simplified.csv".format(df["BookletNumber"].unique()[0])
        rowsToKeep = df.keystrokeEvent.notnull() & ~df.keystrokeEvent.isin(["Keypress"])
        df.loc[rowsToKeep, "textLenReconText"] = df.loc[rowsToKeep, "reconstructedText"].str.len()
        colsToKeep = ['BookletNumber', 'BlockCode', 'AccessionNumber', 'rowCount',
                      'keystrokeEvent', 'keyName', 'durSinceBlockStart', 'IKI',
                      'reconCursorPosition', 'textLength', "textLenReconText",
                      'textContext', 'intendedWord', 'currentToken',
                      # 'interWord', 'wtf',
                      'isAtWordBoundary', 'isWordInitial', 'intraWord',
                      'focalWordNum', 'interWordRunNumber', 'interClauseRunNumber',
                      'isJump', 'isReplace', 'reconstructedText']
        # to get rid of duplicated columns, remove the multiple index first
        df.loc[rowsToKeep, colsToKeep]\
            .to_csv(outputFileName, encoding="utf-8")

    return df
    # note we are not catching exceptions here, to save time.
    # errors are caught at the highest level
```
reduceStep — collapses the keystroke-level columns into one row of features per block, one feature family at a time:

```python
def reduceStep(df, feaConfig, verbose=False):
    """
    REDUCE step: taking the df after the MAP step, and reduce to features, one block a row.

    :param df: the df passed from the mapStep
    :param feaConfig: the configuration file with parameters setting the byVars
    :param verbose: to be passed to reduce functions to save interim data frame if True
    :return: a Pandas data frame, with # of rows as blocks, and features as columns
    """
    # asserts
    if df is None:
        logger.error("ReduceStep: input df is None; quitting")
        return None
    if not any([(k in df.columns) for k in feaConfig["byVars"]]):
        # keyword missing
        return None

    studentID = df["BookletNumber"].unique()[0]

    # #### Reduce ####
    # here we begin to summarize the feature columns
    # ################
    # This is obviously a waste of time to repeat some feature steps in these
    # will deal with this later. For now, this is pleasing to the eyes
    try:
        dfFeaInitialKeypress = df.groupby(feaConfig["byVars"]).apply(
            lambda d: reduceFeatureInitialKeypress(d, verbose=verbose)
        ).reset_index()
        #print dfFeaInitialKeypress
    except Exception as e:
        logger.error("Error in reduceStep: reduceFeatureInitialKeypress")
        logger.exception(e)
        exc_buffer = StringIO.StringIO()
        traceback.print_exc(file=exc_buffer)
        logger.error('Uncaught exception in worker process:\n%s', exc_buffer.getvalue())
        df.to_csv("Error_{}_ReduceStep.csv".format(studentID), encoding="utf-8")
        return

    try:
        dfFeaWordBased = df.groupby(feaConfig["byVars"]).apply(
            lambda d: reduceWordBasedFeatures(d, verbose=verbose)
        ).reset_index()
        #print dfFeaWordBased
    except Exception as e:
        logger.error("Error in reduceStep: reduceWordBasedFeatures")
        logger.exception(e)
        exc_buffer = StringIO.StringIO()
        traceback.print_exc(file=exc_buffer)
        logger.error('Uncaught exception in worker process:\n%s', exc_buffer.getvalue())
        df.to_csv("Error_{}_ReduceStep.csv".format(studentID), encoding="utf-8")
        return

    try:
        dfFeaPauses = df.groupby(feaConfig["byVars"]).apply(
            lambda d: reducePauseFeatures(d, verbose=verbose)
        ).reset_index()
        #print dfFeaPauses
    except Exception as e:
        logger.error("Error in reduceStep: reducePauseFeatures")
        logger.exception(e)
        exc_buffer = StringIO.StringIO()
        traceback.print_exc(file=exc_buffer)
        logger.error('Uncaught exception in worker process:\n%s', exc_buffer.getvalue())
        df.to_csv("Error_{}_ReduceStep.csv".format(studentID), encoding="utf-8")
        return

    try:
        # garyfeng 2018-07-09: changing minRunLength to 1 for deletions to get sum of time before deletions
        dfFeaDelete = df.groupby(feaConfig["byVars"]).apply(
            lambda d: reduceDeleteFeatures(d, verbose=verbose, minRunLength=1)
        ).reset_index()
        #print dfFeaDelete
    except Exception as e:
        logger.error("Error in reduceStep: reduceDeleteFeatures")
        logger.exception(e)
        exc_buffer = StringIO.StringIO()
        traceback.print_exc(file=exc_buffer)
        logger.error('Uncaught exception in worker process:\n%s', exc_buffer.getvalue())
        df.to_csv("Error_{}_ReduceStep.csv".format(studentID), encoding="utf-8")
        return

    try:
        dfFeaEditing = df.groupby(feaConfig["byVars"]).apply(
            lambda d: reduceEditingFeatures(d, verbose=verbose)
        ).reset_index()
        #print dfFeaEditing
    except Exception as e:
        logger.error("Error in reduceStep: reduceEditingFeatures")
        logger.exception(e)
        exc_buffer = StringIO.StringIO()
        traceback.print_exc(file=exc_buffer)
        logger.error('Uncaught exception in worker process:\n%s', exc_buffer.getvalue())
        df.to_csv("Error_{}_ReduceStep.csv".format(studentID), encoding="utf-8")
        return

    try:
        nDiscrepancyMarkers = df.groupby(feaConfig["byVars"]).apply(
            lambda d: d\
                .loc[d.reconstructedText.notnull()]\
                .reconstructedText.iloc[-1].count("`")
        ).rename("flagDiscrepancyMarkers").reset_index()
    except Exception as e:
        logger.error("Error in reduceStep: reduceEditingFeatures")
        logger.exception(e)
        exc_buffer = StringIO.StringIO()
        traceback.print_exc(file=exc_buffer)
        logger.error('Uncaught exception in worker process:\n%s', exc_buffer.getvalue())
        df.to_csv("Error_{}_ReduceStep.csv".format(studentID), encoding="utf-8")
        return

    try:
        adminEventList = feaConfig['adminEventList']
        nAdminRaiseHandEvents = df.groupby(feaConfig["byVars"]).apply(
            lambda d: d\
                .loc[(d.Label.isin(adminEventList)) | (d.AccessionNumber == "RaiseHand")] \
                .shape[0]
        ).rename("flagAdminRaiseHandEvents").reset_index()
    except Exception as e:
        logger.error("Error in reduceStep: reduceEditingFeatures")
        logger.exception(e)
        exc_buffer = StringIO.StringIO()
        traceback.print_exc(file=exc_buffer)
        logger.error('Uncaught exception in worker process:\n%s', exc_buffer.getvalue())
        df.to_csv("Error_{}_ReduceStep.csv".format(studentID), encoding="utf-8")
        return

    try:
        dfSummary = pd.concat([dfFeaInitialKeypress, dfFeaWordBased, dfFeaPauses,
                               dfFeaDelete, dfFeaEditing,
                               nDiscrepancyMarkers, nAdminRaiseHandEvents], axis=1)
    except Exception as e:
        logger.error("Error in reduceStep: merging all features")
        logger.exception(e)
        exc_buffer = StringIO.StringIO()
        traceback.print_exc(file=exc_buffer)
        logger.error('Uncaught exception in worker process:\n%s', exc_buffer.getvalue())
        df.to_csv("Error_{}_ReduceStep.csv".format(studentID), encoding="utf-8")
        return

    return dfSummary
```
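The seven reduce calls above repeat the same try/except/log/dump-to-CSV guard, which the comment says will be cleaned up later. A sketch of how that guard could be factored into a helper; it is an assumption, not part of the gist, and it relies on the logger, feaConfig, studentID, and reduce functions already defined or imported in this module (traceback dumping omitted for brevity):

```python
def guardedReduce(df, feaConfig, reduceFunc, studentID, label, **kwargs):
    """Run one grouped reduce function with the shared error handling used in reduceStep.

    Returns the reduced frame, or None after logging and dumping the data on failure.
    """
    try:
        return df.groupby(feaConfig["byVars"]).apply(
            lambda d: reduceFunc(d, **kwargs)
        ).reset_index()
    except Exception as e:
        logger.error("Error in reduceStep: %s", label)
        logger.exception(e)
        df.to_csv("Error_{}_ReduceStep.csv".format(studentID), encoding="utf-8")
        return None

# e.g.
# dfFeaPauses = guardedReduce(df, feaConfig, reducePauseFeatures, studentID,
#                             "reducePauseFeatures", verbose=verbose)
```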
processBooklet — the serial pipeline for one booklet CSV (read and QC the data, map, reduce, save), with full logging:

```python
def processBooklet(filename,
                   featureConfig,
                   verbose=False,
                   outputFeaturePath=".",
                   featureSetName="finalFeatures",
                   ):
    """
    Process a single booklet CSV file. Steps involving reading/QCing data, map, reduce, saving.

    :param filename: full path to the CSV file
    :param featureConfig: the dict with config info
    :param verbose: if true, save intermediate data frames to the current directory
    :param outputFeaturePath: output path
    :param featureSetName: name of the final feature set; will be the last part of the output csv file name
    :return: none
    """
    # output file path and name
    outputFeatureFileName = os.path.join(outputFeaturePath,
                                         os.path.splitext(os.path.split(filename)[1])[0] + "_" + featureSetName + ".csv")
    # debug
    logger.info("Processing %s", filename)

    #############
    # Get Data
    try:
        df = getData(filename, featureConfig=featureConfig)
    except:
        df = None
    if df is None:
        logger.error("processBooklet: getData failed for %s", filename)
        return
    studentID = df["BookletNumber"].unique()[0]

    #############
    # Map
    #logger.info("Map %s", filename)
    try:
        df = mapStep(df, verbose=verbose, feaConfig=featureConfig)
    except Exception as e:
        logger.error("Error in mapStep: %s", filename)
        logger.exception(e)
        exc_buffer = StringIO.StringIO()
        traceback.print_exc(file=exc_buffer)
        logger.error('Uncaught exception in worker process:\n%s', exc_buffer.getvalue())
        return
    if df is None:
        logger.error("processBooklet: mapStep failed for %s", filename)
        return

    #############
    # Reduce
    #logger.info("Reduce %s", filename)
    try:
        df = reduceStep(df, verbose=verbose, feaConfig=featureConfig)
    except Exception as e:
        logger.error("Error in reduceStep: %s", filename)
        logger.exception(e)
        exc_buffer = StringIO.StringIO()
        traceback.print_exc(file=exc_buffer)
        logger.error('Uncaught exception in worker process:\n%s', exc_buffer.getvalue())
        return
    if df is None:
        logger.error("processBooklet: reduceStep failed for %s", filename)
        return

    #############
    # Save Data
    # debug
    logger.info("Saving %s", filename)
    try:
        # first drop duplicated rows (occasionally there will be)
        # then we drop duplicated columns (from the multiIndex) using a trick of transposing back and force
        df \
            .loc[:, ~df.columns.duplicated()]\
            .drop_duplicates() \
            .to_csv(outputFeatureFileName, encoding='utf-8')
    except Exception as e:
        logger.error("Error writing to_csv: %s", outputFeatureFileName)
        logger.exception(e)
        exc_buffer = StringIO.StringIO()
        traceback.print_exc(file=exc_buffer)
        logger.error('Uncaught exception in worker process:\n%s', exc_buffer.getvalue())
        return

    logger.info("Done. Output= %s", outputFeatureFileName)
    return
```
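For debugging a single booklet interactively, processBooklet can be called directly. A minimal sketch, with assumptions flagged: the file path is hypothetical (real inputs follow the *_ObservableData.csv pattern used by the driver below), and the import assumes this module is the one the driver pulls in as pdia.writing2017.make2017Features:

```python
from pdia.writing2017.make2017Features import *            # assumed to provide processBooklet, as in the driver below
from pdia.writing2017.featuresConfig2017 import featureConfig2017

# hypothetical booklet file; real inputs follow the *_ObservableData.csv naming pattern
processBooklet("Grade4/Writing_Grade4_1234567890_ObservableData.csv",
               featureConfig=featureConfig2017,
               verbose=True,                               # also writes the *_mapStep*.csv interim files described above
               outputFeaturePath=".",
               featureSetName="descFeatureDebug")
```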
processBooklet_dask — the same pipeline trimmed for dask parallel processing; exceptions are swallowed, so a failing booklet simply produces no output file:

```python
def processBooklet_dask(filename,
                        featureConfig,
                        verbose=False,
                        outputFeaturePath=".",
                        featureSetName="finalFeatures"):
    """
    processing a writing CSV file, for dask parallel processing. We remove any logger reference here.

    :param filename: full path to the CSV file
    :param featureConfig: the dict with config info
    :param verbose: if true, save intermediate data frames to the current directory
    :param outputFeaturePath: output path
    :param featureSetName: name of the final feature set; will be the last part of the output csv file name
    :return: none
    """
    # output file path and name
    outputFeatureFileName = os.path.join(outputFeaturePath,
                                         os.path.splitext(os.path.split(filename)[1])[0] + "_" + featureSetName + ".csv")

    #############
    # Get Data
    try:
        df = getData(filename, featureConfig=featureConfig)
    except:
        return
    if df is None:
        logger.error("processBooklet: getData failed for %s", filename)
        return

    #############
    # Map
    try:
        df = mapStep(df, verbose=verbose, feaConfig=featureConfig)
    except:
        return
    if df is None:
        #logger.error("processBooklet: mapStep failed for %s", filename)
        return

    #############
    # Reduce
    try:
        df = reduceStep(df, verbose=verbose, feaConfig=featureConfig)
    except:
        return
    if df is None:
        return

    #############
    # Save Data
    try:
        # first drop duplicated rows (occasionally there will be)
        # then we drop duplicated columns (from the multiIndex) using a trick of transposing back and force
        df \
            .loc[:, ~df.columns.duplicated()]\
            .drop_duplicates() \
            .to_csv(outputFeatureFileName, encoding='utf-8')
    except Exception as e:
        return

    return
```
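Both save steps above drop duplicated rows and then duplicated columns before writing the CSV. The column trick, df.loc[:, ~df.columns.duplicated()], keeps only the first occurrence of each column name; a small standalone illustration with made-up data:

```python
import pandas as pd

# a frame with a repeated column name, as can happen after the grouped map/reduce steps
df = pd.DataFrame([[1, 2, 3]], columns=["a", "b", "a"])

deduped = df.loc[:, ~df.columns.duplicated()]   # boolean mask keeps the first "a" only
print(list(deduped.columns))                    # ['a', 'b']
```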
The command-line driver: check arguments, build timestamped output paths, scan the grade folder for *_ObservableData.csv files, skip booklets that already have output, and run the rest in parallel with dask:

```python
import sys

if __name__ == '__main__':

    if len(sys.argv) == 1:
        print "\nUsage: python Writing2017_descReportingFeatures.py Grade4, to process Grade4 or Grade8\n"
        exit()
    if sys.argv[1] not in ["Grade4", "Grade8", "test"]:
        print "\nUsage: python Writing2017_descReportingFeatures.py Grade4, to process Grade4 or Grade8\n"
        exit()

    import glob
    from pdia import *
    from pdia.writing2017.make2017Features import *
    import dask.bag as db
    from distributed import Client
    import datetime
    import time

    # paths
    today = time.strftime("%Y%m%d_%H", time.localtime())  # timestamp as 20170810_22 for date and hour (24h) to run the script
    # garyfeng: to resume from a run:
    today = "20180709_21"
    ####
    grade = sys.argv[1]
    inputCSVPath = "{}/".format(grade)
    outputFeaturePath = "{}_descFeatures_{}/".format(grade, today)
    if not os.path.exists(outputFeaturePath):
        os.makedirs(outputFeaturePath)
    featureSetName = "descFeature{}".format(today)

    print "input folder: {}".format(inputCSVPath)
    print "output folder: {}".format(outputFeaturePath)
    print "featureSetName: {}".format(featureSetName)

    #########
    # getting the files to process
    print "======= Scanning for CSV files ============"
    print datetime.datetime.now()
    fileList = glob.glob(os.path.join(inputCSVPath, "*_ObservableData.csv"))
    if len(fileList) == 0:
        print "\nNo CSV files found in the directory\n"
        exit()

    ##########
    # garyfeng: to resume by ignoring ones with output already.
    finishedFiles = glob.glob(os.path.join(outputFeaturePath, "*_{}.csv".format(featureSetName)))
    finishedFiles = [f.replace(outputFeaturePath, inputCSVPath).replace("_"+featureSetName, "") for f in finishedFiles]
    fileList = list(set(fileList) - set(finishedFiles))
    ##########

    print "Total input CSV files: %i" % len(fileList)
    print datetime.datetime.now()

    import gc

    def processIt(filename):
        processBooklet_dask(filename,
                            featureConfig=featureConfig2017,
                            verbose=False,
                            outputFeaturePath=outputFeaturePath,
                            featureSetName=featureSetName)
        gc.collect()
        return

    print "======= Begin Processing ============"
    print datetime.datetime.now()
    print "====================================="

    # test with 1 file
    # processFile(fileList[0])

    # Using distributed clients
    client = Client()
    # run parallel with dask
    db.from_sequence(fileList).map(processIt).compute()

    print "======== End Processing ==========="
    print datetime.datetime.now()
    print "==================================="
```
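Client() with no arguments starts a local scheduler and one worker per core, and db.from_sequence(fileList).map(processIt).compute() then processes one CSV file per task. Because each booklet expands into a sizable keystroke-level frame, it can be worth capping the worker count; a sketch using standard dask.distributed options, not part of the gist:

```python
from distributed import Client
import dask.bag as db

# four single-threaded workers instead of one worker per core, to bound memory use
client = Client(n_workers=4, threads_per_worker=1)
db.from_sequence(fileList).map(processIt).compute()
client.close()
```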
File 2 of 3 (540 lines at creation) — evidently the Grade 8 copy of the same script (note the hardcoded Writing_Grade8 filename pattern and the run_grade8.py restart note below). Apart from the resume settings in its command-line driver, it is essentially line-for-line identical to File 1; the differences are as follows.

The hardcoded resume timestamp points at a different earlier run, without the explanatory comment:

```python
    # paths
    today = time.strftime("%Y%m%d_%H", time.localtime())  # timestamp as 20170810_22 for date and hour (24h) to run the script
    today = "20180711_04"
    grade = sys.argv[1]
    inputCSVPath = "{}/".format(grade)
```

In the skip-finished-files block, booklets whose earlier run produced an Error_*.csv dump are also removed from the work list:

```python
    ##########
    # garyfeng: to resume by ignoring ones with output already.
    finishedFiles = glob.glob(os.path.join(outputFeaturePath, "*_{}.csv".format(featureSetName)))
    finishedFiles = [f.replace(outputFeaturePath, inputCSVPath).replace("_"+featureSetName, "") for f in finishedFiles]
    fileList = list(set(fileList) - set(finishedFiles))

    # error files should not be repeated
    finishedFiles = glob.glob("Error_*.csv")
    finishedFiles = ["{}/Writing_Grade8_{}_ObservableData.csv".format(grade, f.split("_")[1]) for f in finishedFiles]
    fileList = list(set(fileList) - set(finishedFiles))
    ##########
```

The file ends with a note on relaunching the run automatically after a crash:

```python
# To restart
# until python run_grade8.py Grade8; do
# echo "Program crashed with exit code $?. Respawning.." >&2
# sleep 1
# done
```

File 3 of 3 (32 lines) — a Windows batch script that finds the first unused pdj-N container name, asks which pdia image tag to use (master or 2018xval), starts a pdia/docked-jupyter container on port 8887+N with the current directory mounted at /home/jovyan/work, opens the notebook in the browser, and stops the container after a keypress:

```bat
SET NUM=0

:LOOP
SET /A NUM = %NUM% + 1
FOR /f "usebackq" %%G in (`docker ps -q -f "name=pdj-%NUM%"`) DO set PDJID=%%G
IF [%PDJID%] == [] GOTO QUERY
SET PDJID=
GOTO LOOP

:QUERY
set INPUT=
set /P INPUT=Which version of pdia would you like to use? (1=master, 2=2018xval): %=%
IF /I "%INPUT%"=="1" GOTO MASTER
IF /I "%INPUT%"=="2" GOTO 2018XVAL
GOTO QUERY

:MASTER
SET PDJTAG=master
GOTO RUN

:2018XVAL
SET PDJTAG=2018xval
GOTO RUN

:RUN
SET /A PORT = %NUM% + 8887
docker pull pdia/docked-jupyter:%PDJTAG%
docker run -p %PORT%:8888 -h CONTAINER -d -it --rm --name pdj-%NUM%-%PDJTAG% -v "%cd%":/home/jovyan/work pdia/docked-jupyter:%PDJTAG% jupyter notebook --NotebookApp.token='pdia'
timeout /t 5
start http://localhost:%PORT%/?token=pdia
timeout /t -1
docker stop pdj-%NUM%-%PDJTAG%
```
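The second script's closing comment describes a bash until-loop that relaunches the run whenever the Python process crashes; combined with the skip-finished and skip-errored logic, each relaunch only works on the remaining booklets. A rough Python equivalent of that loop (the script name run_grade8.py is taken from the comment and is an assumption about how the file is saved locally):

```python
import subprocess
import sys
import time

# relaunch until the run exits cleanly; finished and errored booklets are skipped on each pass
while subprocess.call([sys.executable, "run_grade8.py", "Grade8"]) != 0:
    sys.stderr.write("Program crashed. Respawning..\n")
    time.sleep(1)
```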