# Functions for extracting the descriptive features for the 2017 operational analysis
# Changes from make2017FinalFeatures.py:
# - min
#############

import StringIO
import os
import traceback

import pandas as pd

from pdia.durSinceBlockStart import *
from pdia.writing2017.addFeatureTextChange import addTextChangeVars
from pdia.writing2017.addKeyPressVars import addKeyPressVars
from pdia.writing2017.burstFeatures import *
from pdia.writing2017.editingFeatures import mapEditingFeatures, reduceEditingFeatures, reduceDeleteFeatures
from pdia.writing2017.featuresConfig2017 import featureConfig2017
from pdia.writing2017.reduceFeatureInitialKeypress import reduceFeatureInitialKeypress
from pdia.writing2017.getData import getData
from pdia.writing2017.reducePauseFeatures import reducePauseFeatures
from pdia.writing2017.addWordTokens import *

# Configs
# parser config
from pdia.writing2017.reduceWordBasedFeatures import reduceWordBasedFeatures

# 2016 default configuration
# 2017 data config


# Read data
def getData2017(filename, featureConfig=featureConfig2017):
    """
    Simply a wrap of getData with the 2017 config

    :param filename: the file name to process
    :param featureConfig: using the 2017 configuration
    :return: the parsed df
    """
    return getData(filename, featureConfig=featureConfig)


def mapStep(df, feaConfig, verbose=False):
    """
    MAP step: creating keystroke-level features, adding columns

    :param df: the data frame for a booklet, containing potentially multiple blocks
    :param feaConfig: the configuration for data import/parsing
    :param verbose: if True, saves the interim data
    :return: the data frame with keystroke-level feature columns added, or None on error
    """
    # asserts
    if df is None:
        logger.error("MapStep: input df is None; quitting")
        return None
    if not any([(k in df.columns) for k in feaConfig["byVars"]]):
        # keyword missing
        return None

    studentID = df["BookletNumber"].unique()[0]

    # ##### MAP ####
    # to handle the feature functions in the featureMap object
    # ##############

    def mapBlock(d):
        # return None if no keystroke log is available
        if d.loc[d.Label == "Pilot Observables", :].shape[0] == 0:
            # print("mapBlock: No Observable data for the block")
            logger.debug("mapBlock: No Observable data for the block")
            return None
        d = durSinceBlockStart(d) if d is not None else None
        # d = addKeyPressVars(d) if d is not None else None
        # d = addTextChangeVars(d) if d is not None else None
        d = addFeatureIKI(d) if d is not None else None
        d = addWordTokens(d) if d is not None else None
        # garyfeng 2018-07-09: changing default minJumpDistance from 2 to 5
        d = mapEditingFeatures(d, verbose=False, minTextChangeEvents=5, minJumpDistance=5) if d is not None else None
        return d

    try:
        # the following groupby().apply() is causing occasional python crashes
        # df = df \
        #     .groupby(feaConfig["byVars"]) \
        #     .apply(mapBlock)
        # so we fall back to an explicit loop over blocks instead
        tmp = []
        for b in df["BlockCode"].unique():
            tmp.append(df.loc[df.BlockCode == b, :].pipe(mapBlock))
        df = pd.concat(tmp)
    except Exception as e:
        logger.error("Error in mapStep")
        logger.exception(e)
        exc_buffer = StringIO.StringIO()
        traceback.print_exc(file=exc_buffer)
        logger.error('Uncaught exception in worker process:\n%s', exc_buffer.getvalue())
        df.to_csv("Error_{}_MapStep.csv".format(studentID), encoding="utf-8")
        return

    # saving
    if verbose:
        outputFileName = "{}_mapStep.csv".format(
            df["BookletNumber"].unique()[0]
        )
        # remove duplicated columns before saving
        df.loc[:, ~df.columns.duplicated()].to_csv(outputFileName, encoding="utf-8")

        # simplified for human reading
        outputFileName = "{}_mapStep_simplified.csv".format(
            df["BookletNumber"].unique()[0]
        )
        rowsToKeep = df.keystrokeEvent.notnull() & ~df.keystrokeEvent.isin(["Keypress"])
        df.loc[rowsToKeep, "textLenReconText"] = df.loc[rowsToKeep, "reconstructedText"].str.len()
        colsToKeep = ['BookletNumber', 'BlockCode', 'AccessionNumber', 'rowCount',
                      'keystrokeEvent', 'keyName', 'durSinceBlockStart', 'IKI',
                      'reconCursorPosition', 'textLength', "textLenReconText",
                      'textContext', 'intendedWord', 'currentToken',
                      # 'interWord', 'wtf',
                      'isAtWordBoundary', 'isWordInitial', 'intraWord',
                      'focalWordNum', 'interWordRunNumber', 'interClauseRunNumber',
                      'isJump', 'isReplace', 'reconstructedText']
        # to get rid of duplicated columns, remove the multiple index first
        df.loc[rowsToKeep, colsToKeep]\
            .to_csv(outputFileName, encoding="utf-8")

    return df

# note we are not catching exceptions here, to save time;
# errors are caught at the highest level
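# Illustrative usage of the map step (a minimal sketch; the booklet CSV name
# below is hypothetical and not part of the pipeline):
#
#   df = getData2017("G4_booklet_ObservableData.csv")
#   dfKeys = mapStep(df, feaConfig=featureConfig2017, verbose=True)
#
# mapStep returns the keystroke-level frame with the added feature columns
# (IKI, word tokens, editing features); it is normally followed by reduceStep().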
def reduceStep(df, feaConfig, verbose=False):
    """
    REDUCE step: taking the df after the MAP step and reducing it to features, one block per row.

    :param df: the df passed from the mapStep
    :param feaConfig: the configuration file with parameters setting the byVars
    :param verbose: to be passed to reduce functions to save interim data frames if True
    :return: a Pandas data frame with one row per block and features as columns
    """
    # asserts
    if df is None:
        logger.error("ReduceStep: input df is None; quitting")
        return None
    if not any([(k in df.columns) for k in feaConfig["byVars"]]):
        # keyword missing
        return None

    studentID = df["BookletNumber"].unique()[0]

    # #### Reduce ####
    # here we begin to summarize the feature columns
    # ################

    # This is obviously a waste of time, repeating some feature steps in these;
    # will deal with this later. For now, this is pleasing to the eyes.

    try:
        dfFeaInitialKeypress = df.groupby(feaConfig["byVars"]).apply(
            lambda d: reduceFeatureInitialKeypress(d, verbose=verbose)
        ).reset_index()
        # print dfFeaInitialKeypress
    except Exception as e:
        logger.error("Error in reduceStep: reduceFeatureInitialKeypress")
        logger.exception(e)
        exc_buffer = StringIO.StringIO()
        traceback.print_exc(file=exc_buffer)
        logger.error('Uncaught exception in worker process:\n%s', exc_buffer.getvalue())
        df.to_csv("Error_{}_ReduceStep.csv".format(studentID), encoding="utf-8")
        return

    try:
        dfFeaWordBased = df.groupby(feaConfig["byVars"]).apply(
            lambda d: reduceWordBasedFeatures(d, verbose=verbose)
        ).reset_index()
        # print dfFeaWordBased
    except Exception as e:
        logger.error("Error in reduceStep: reduceWordBasedFeatures")
        logger.exception(e)
        exc_buffer = StringIO.StringIO()
        traceback.print_exc(file=exc_buffer)
        logger.error('Uncaught exception in worker process:\n%s', exc_buffer.getvalue())
        df.to_csv("Error_{}_ReduceStep.csv".format(studentID), encoding="utf-8")
        return

    try:
        dfFeaPauses = df.groupby(feaConfig["byVars"]).apply(
            lambda d: reducePauseFeatures(d, verbose=verbose)
        ).reset_index()
        # print dfFeaPauses
    except Exception as e:
        logger.error("Error in reduceStep: reducePauseFeatures")
        logger.exception(e)
        exc_buffer = StringIO.StringIO()
        traceback.print_exc(file=exc_buffer)
        logger.error('Uncaught exception in worker process:\n%s', exc_buffer.getvalue())
        df.to_csv("Error_{}_ReduceStep.csv".format(studentID), encoding="utf-8")
        return

    try:
        # garyfeng 2018-07-09: changing minRunLength to 1 for deletions to get sum of time before deletions
        dfFeaDelete = df.groupby(feaConfig["byVars"]).apply(
            lambda d: reduceDeleteFeatures(d, verbose=verbose, minRunLength=1)
        ).reset_index()
        # print dfFeaDelete
    except Exception as e:
        logger.error("Error in reduceStep: reduceDeleteFeatures")
        logger.exception(e)
        exc_buffer = StringIO.StringIO()
        traceback.print_exc(file=exc_buffer)
        logger.error('Uncaught exception in worker process:\n%s', exc_buffer.getvalue())
        df.to_csv("Error_{}_ReduceStep.csv".format(studentID), encoding="utf-8")
        return

    try:
        dfFeaEditing = df.groupby(feaConfig["byVars"]).apply(
            lambda d: reduceEditingFeatures(d, verbose=verbose)
        ).reset_index()
        # print dfFeaEditing
    except Exception as e:
        logger.error("Error in reduceStep: reduceEditingFeatures")
        logger.exception(e)
        exc_buffer = StringIO.StringIO()
        traceback.print_exc(file=exc_buffer)
        logger.error('Uncaught exception in worker process:\n%s', exc_buffer.getvalue())
        df.to_csv("Error_{}_ReduceStep.csv".format(studentID), encoding="utf-8")
        return

    try:
        nDiscrepancyMarkers = df.groupby(feaConfig["byVars"]).apply(
            lambda d: d\
                .loc[d.reconstructedText.notnull()]\
                .reconstructedText.iloc[-1].count("`")
        ).rename("flagDiscrepancyMarkers").reset_index()
    except Exception as e:
        logger.error("Error in reduceStep: flagDiscrepancyMarkers")
        logger.exception(e)
        exc_buffer = StringIO.StringIO()
        traceback.print_exc(file=exc_buffer)
        logger.error('Uncaught exception in worker process:\n%s', exc_buffer.getvalue())
        df.to_csv("Error_{}_ReduceStep.csv".format(studentID), encoding="utf-8")
        return

    try:
        adminEventList = feaConfig['adminEventList']
        nAdminRaiseHandEvents = df.groupby(feaConfig["byVars"]).apply(
            lambda d: d\
                .loc[(d.Label.isin(adminEventList)) | (d.AccessionNumber == "RaiseHand")] \
                .shape[0]
        ).rename("flagAdminRaiseHandEvents").reset_index()
    except Exception as e:
        logger.error("Error in reduceStep: flagAdminRaiseHandEvents")
        logger.exception(e)
        exc_buffer = StringIO.StringIO()
        traceback.print_exc(file=exc_buffer)
        logger.error('Uncaught exception in worker process:\n%s', exc_buffer.getvalue())
        df.to_csv("Error_{}_ReduceStep.csv".format(studentID), encoding="utf-8")
        return

    try:
        dfSummary = pd.concat([dfFeaInitialKeypress, dfFeaWordBased, dfFeaPauses,
                               dfFeaDelete, dfFeaEditing,
                               nDiscrepancyMarkers, nAdminRaiseHandEvents], axis=1)
    except Exception as e:
        logger.error("Error in reduceStep: merging all features")
        logger.exception(e)
        exc_buffer = StringIO.StringIO()
        traceback.print_exc(file=exc_buffer)
        logger.error('Uncaught exception in worker process:\n%s', exc_buffer.getvalue())
        df.to_csv("Error_{}_ReduceStep.csv".format(studentID), encoding="utf-8")
        return

    return dfSummary
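# Illustrative usage of the reduce step (a minimal sketch; the file name is
# hypothetical):
#
#   df = getData2017("G4_booklet_ObservableData.csv")
#   df = mapStep(df, feaConfig=featureConfig2017)
#   dfSummary = reduceStep(df, feaConfig=featureConfig2017)
#
# dfSummary has one row per group defined by featureConfig2017["byVars"]
# (i.e., per block), with the initial-keypress, word-based, pause, deletion,
# editing, and flag features as columns.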
def processBooklet(filename, featureConfig,
                   verbose=False,
                   outputFeaturePath=".",
                   featureSetName="finalFeatures",
                   ):
    """
    Process a single booklet CSV file. Steps involve reading/QCing data, map, reduce, and saving.

    :param filename: full path to the CSV file
    :param featureConfig: the dict with config info
    :param verbose: if True, save intermediate data frames to the current directory
    :param outputFeaturePath: output path
    :param featureSetName: name of the final feature set; will be the last part of the output csv file name
    :return: None
    """
    # output file path and name
    outputFeatureFileName = os.path.join(outputFeaturePath,
                                         os.path.splitext(os.path.split(filename)[1])[0]
                                         + "_" + featureSetName + ".csv")

    # debug
    logger.info("Processing %s", filename)

    #############
    # Get Data
    try:
        df = getData(filename, featureConfig=featureConfig)
    except:
        df = None
    if df is None:
        logger.error("processBooklet: getData failed for %s", filename)
        return

    studentID = df["BookletNumber"].unique()[0]

    #############
    # Map
    # logger.info("Map %s", filename)
    try:
        df = mapStep(df, verbose=verbose, feaConfig=featureConfig)
    except Exception as e:
        logger.error("Error in mapStep: %s", filename)
        logger.exception(e)
        exc_buffer = StringIO.StringIO()
        traceback.print_exc(file=exc_buffer)
        logger.error('Uncaught exception in worker process:\n%s', exc_buffer.getvalue())
        return
    if df is None:
        logger.error("processBooklet: mapStep failed for %s", filename)
        return

    #############
    # Reduce
    # logger.info("Reduce %s", filename)
    try:
        df = reduceStep(df, verbose=verbose, feaConfig=featureConfig)
    except Exception as e:
        logger.error("Error in reduceStep: %s", filename)
        logger.exception(e)
        exc_buffer = StringIO.StringIO()
        traceback.print_exc(file=exc_buffer)
        logger.error('Uncaught exception in worker process:\n%s', exc_buffer.getvalue())
        return
    if df is None:
        logger.error("processBooklet: reduceStep failed for %s", filename)
        return

    #############
    # Save Data

    # debug
    logger.info("Saving %s", filename)
    try:
        # first drop duplicated rows (occasionally there will be some),
        # then drop duplicated columns (from the MultiIndex)
        df \
            .loc[:, ~df.columns.duplicated()]\
            .drop_duplicates() \
            .to_csv(outputFeatureFileName, encoding='utf-8')
    except Exception as e:
        logger.error("Error writing to_csv: %s", outputFeatureFileName)
        logger.exception(e)
        exc_buffer = StringIO.StringIO()
        traceback.print_exc(file=exc_buffer)
        logger.error('Uncaught exception in worker process:\n%s', exc_buffer.getvalue())
        return

    logger.info("Done. Output= %s", outputFeatureFileName)
    return
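# Illustrative single-file usage (a minimal sketch; the paths are hypothetical):
#
#   processBooklet("Grade4/G4_booklet_ObservableData.csv",
#                  featureConfig=featureConfig2017,
#                  outputFeaturePath="Grade4_descFeatures",
#                  featureSetName="finalFeatures")
#
# This would write Grade4_descFeatures/G4_booklet_ObservableData_finalFeatures.csv.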
def processBooklet_dask(filename, featureConfig,
                        verbose=False,
                        outputFeaturePath=".",
                        featureSetName="finalFeatures"):
    """
    Process a writing CSV file, for dask parallel processing; logger references are removed here.

    :param filename: full path to the CSV file
    :param featureConfig: the dict with config info
    :param verbose: if True, save intermediate data frames to the current directory
    :param outputFeaturePath: output path
    :param featureSetName: name of the final feature set; will be the last part of the output csv file name
    :return: None
    """
    # output file path and name
    outputFeatureFileName = os.path.join(outputFeaturePath,
                                         os.path.splitext(os.path.split(filename)[1])[0]
                                         + "_" + featureSetName + ".csv")

    #############
    # Get Data
    try:
        df = getData(filename, featureConfig=featureConfig)
    except:
        return
    if df is None:
        # logger.error("processBooklet: getData failed for %s", filename)
        return

    #############
    # Map
    try:
        df = mapStep(df, verbose=verbose, feaConfig=featureConfig)
    except:
        return
    if df is None:
        # logger.error("processBooklet: mapStep failed for %s", filename)
        return

    #############
    # Reduce
    try:
        df = reduceStep(df, verbose=verbose, feaConfig=featureConfig)
    except:
        return
    if df is None:
        return

    #############
    # Save Data
    try:
        # first drop duplicated rows (occasionally there will be some),
        # then drop duplicated columns (from the MultiIndex)
        df \
            .loc[:, ~df.columns.duplicated()]\
            .drop_duplicates() \
            .to_csv(outputFeatureFileName, encoding='utf-8')
    except Exception as e:
        return

    return
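# Illustrative parallel usage (a minimal sketch mirroring the __main__ block
# below; the file list is hypothetical):
#
#   import dask.bag as db
#   from distributed import Client
#
#   client = Client()
#   files = ["Grade4/G4_booklet_ObservableData.csv"]
#   db.from_sequence(files).map(
#       lambda f: processBooklet_dask(f, featureConfig=featureConfig2017)
#   ).compute()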
import sys

if __name__ == '__main__':

    if len(sys.argv) == 1:
        print "\nUsage: python Writing2017_descReportingFeatures.py Grade4, to process Grade4 or Grade8\n"
        exit()
    if sys.argv[1] not in ["Grade4", "Grade8", "test"]:
        print "\nUsage: python Writing2017_descReportingFeatures.py Grade4, to process Grade4 or Grade8\n"
        exit()

    import glob
    from pdia import *
    from pdia.writing2017.make2017Features import *
    import dask.bag as db
    from distributed import Client
    import datetime
    import time

    # paths
    # timestamp as 20170810_22 for date and hour (24h) to run the script
    today = time.strftime("%Y%m%d_%H", time.localtime())
    # garyfeng: to resume from a run:
    # today = "20180709_21"
    ####

    grade = sys.argv[1]
    inputCSVPath = "{}/".format(grade)
    outputFeaturePath = "{}_descFeatures_{}/".format(grade, today)
    if not os.path.exists(outputFeaturePath):
        os.makedirs(outputFeaturePath)
    featureSetName = "descFeature{}".format(today)

    print "input folder: {}".format(inputCSVPath)
    print "output folder: {}".format(outputFeaturePath)
    print "featureSetName: {}".format(featureSetName)

    #########
    # getting the files to process
    print "======= Scanning for CSV files ============"
    print datetime.datetime.now()

    fileList = glob.glob(os.path.join(inputCSVPath, "*_ObservableData.csv"))
    if len(fileList) == 0:
        print "\nNo CSV files found in the directory\n"
        exit()

    ##########
    # garyfeng: to resume by ignoring ones with output already.
    finishedFiles = glob.glob(os.path.join(outputFeaturePath, "*_{}.csv".format(featureSetName)))
    finishedFiles = [f.replace(outputFeaturePath, inputCSVPath).replace("_" + featureSetName, "")
                     for f in finishedFiles]
    fileList = list(set(fileList) - set(finishedFiles))
    ##########

    print "Total input CSV files: %i" % len(fileList)
    print datetime.datetime.now()

    import gc

    def processIt(filename):
        processBooklet_dask(filename,
                            featureConfig=featureConfig2017,
                            verbose=False,
                            outputFeaturePath=outputFeaturePath,
                            featureSetName=featureSetName)
        gc.collect()
        return

    print "======= Begin Processing ============"
    print datetime.datetime.now()
    print "====================================="

    # test with 1 file
    # processIt(fileList[0])

    # Using distributed clients
    client = Client()
    # run parallel with dask
    db.from_sequence(fileList).map(processIt).compute()

    print "======== End Processing ==========="
    print datetime.datetime.now()
    print "==================================="