
@garyfeng
Last active July 17, 2018 03:37

Revisions

  1. garyfeng revised this gist Jul 17, 2018. 2 changed files with 2 additions and 2 deletions.
    2 changes: 1 addition & 1 deletion 2017Writing_descFeature_g4.py
    @@ -474,7 +474,7 @@ def processBooklet_dask(filename,
      today=time.strftime("%Y%m%d_%H", time.localtime()) # timestamp as 20170810_22 for date and hour (24h) to run the script

      # garyfeng: to resume from a run:
    - today = "20180709_21"
    + # today = "20180709_21"
      ####

      grade = sys.argv[1]
    2 changes: 1 addition & 1 deletion 2017Writing_descFeature_g8.py
    @@ -472,7 +472,7 @@ def processBooklet_dask(filename,

      # paths
      today=time.strftime("%Y%m%d_%H", time.localtime()) # timestamp as 20170810_22 for date and hour (24h) to run the script
    - today = "20180711_04"
    + # today = "20180711_04"
      grade = sys.argv[1]

      inputCSVPath = "{}/".format(grade)
  2. garyfeng created this gist Jul 17, 2018.
    536 changes: 536 additions & 0 deletions 2017Writing_descFeature_g4.py
    @@ -0,0 +1,536 @@
    # Functions for extracting the descriptive features for the 2017 operational analysis
    # Changes from make2017FinalFeatures.py:
    # - min

    #############

    import StringIO
    import os
    import traceback

    from pdia.durSinceBlockStart import *
    from pdia.writing2017.addFeatureTextChange import addTextChangeVars
    from pdia.writing2017.addKeyPressVars import addKeyPressVars
    from pdia.writing2017.burstFeatures import *
    from pdia.writing2017.editingFeatures import mapEditingFeatures, reduceEditingFeatures, reduceDeleteFeatures
    from pdia.writing2017.featuresConfig2017 import featureConfig2017
    from pdia.writing2017.reduceFeatureInitialKeypress import reduceFeatureInitialKeypress
    from pdia.writing2017.getData import getData
    from pdia.writing2017.reducePauseFeatures import reducePauseFeatures
    from pdia.writing2017.addWordTokens import *
    # Configs
    # parser config
    from pdia.writing2017.reduceWordBasedFeatures import reduceWordBasedFeatures


    # 2016 default configuration

    # 2017 data config


    # Read data
    def getData2017(filename, featureConfig=featureConfig2017):
    """
    Simply a wrap of getData with the 2017 config
    :param filename: the file name to process
    :param featureConfig: using the 2017 configuration
    :return: the parsed df
    """
    return getData(filename, featureConfig=featureConfig)
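
    # A minimal usage sketch (the file name below is hypothetical; the real inputs are the
    # per-booklet "*_ObservableData.csv" files globbed in the __main__ block at the bottom):
    #
    #   df = getData2017("Grade4/Writing_Grade4_0000000000_ObservableData.csv")
    #   df[["BookletNumber", "BlockCode", "Label"]].head()
    #
    # The parsed, event-level frame is what mapStep()/reduceStep() below operate on.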


    def mapStep(df, feaConfig, verbose=False):
    """
    MAP step: creating keystroke level features, adding columns
    :param df: the data frame for a booklet, contain potentially multiple blocks
    :param feaConfig: the configuration for data import/parsing
    :param verbose: if True, saves the interim data
    """

    # asserts
    if df is None:
    logger.error("MapStep: input df is None; quitting")
    return None

    if not any([(k in df.columns) for k in feaConfig["byVars"]]):
    # keyword missing
    return None

    studentID = df["BookletNumber"].unique()[0]

    # ##### MAP ####
    # to handle the feature functions in the featureMap object
    # ##############
    def mapBlock(d):
    # return None if no keystroke log is available
    if d.loc[d.Label == "Pilot Observables", :].shape[0] == 0:
    # print("mapBlock: No Observable data for the block")
    logger.debug("mapBlock: No Observable data for the block")
    return None
    d = durSinceBlockStart(d) if d is not None else None
    #d = addKeyPressVars(d) if d is not None else None
    #d = addTextChangeVars(d) if d is not None else None
    d = addFeatureIKI(d) if d is not None else None
    d = addWordTokens(d) if d is not None else None
    # garyfeng 2018-07-09: changing default minJumpDistance from 2 to 5
    d = mapEditingFeatures(d, verbose=False, minTextChangeEvents=5, minJumpDistance=5) if d is not None else None
    return d

    try:
    # the following groupby().apply() is causing occasional python crashes
    # df = df \
    # .groupby(feaConfig["byVars"]) \
    # .apply(mapBlock)
    # taking a stupid method here
    tmp=[]
    for b in df["BlockCode"].unique():
    tmp.append(df.loc[df.BlockCode == b, :].pipe(mapBlock))
    df = pd.concat(tmp)
    except Exception as e:
    logger.error("Error in mapStep")
    logger.exception(e)
    exc_buffer = StringIO.StringIO()
    traceback.print_exc(file=exc_buffer)
    logger.error('Uncaught exception in worker process:\n%s', exc_buffer.getvalue())

    df.to_csv("Error_{}_MapStep.csv".format(studentID), encoding="utf-8")

    return

    # saving
    if verbose:
    outputFileName = "{}_mapStep.csv".format(
    df["BookletNumber"].unique()[0]
    )
    # drop duplicated columns (from the MultiIndex) before saving
    df.loc[:, ~df.columns.duplicated()].to_csv(outputFileName, encoding="utf-8")

    # simplified for human reading
    outputFileName = "{}_mapStep_simplified.csv".format(
    df["BookletNumber"].unique()[0]
    )
    rowsToKeep = df.keystrokeEvent.notnull() & ~df.keystrokeEvent.isin(["Keypress"])
    df.loc[rowsToKeep, "textLenReconText"] = df.loc[rowsToKeep, "reconstructedText"].str.len()
    colsToKeep = ['BookletNumber', 'BlockCode', 'AccessionNumber', 'rowCount',
    'keystrokeEvent', 'keyName', 'durSinceBlockStart', 'IKI',
    'reconCursorPosition', 'textLength', "textLenReconText",
    'textContext', 'intendedWord', 'currentToken',
    # 'interWord', 'wtf', 'isAtWordBoundary', 'isWordInitial',
    'intraWord',
    'focalWordNum', 'interWordRunNumber', 'interClauseRunNumber', 'isJump', 'isReplace',
    'reconstructedText']
    # to get rid of duplicated columns, remove the multiple index first
    df.loc[rowsToKeep, colsToKeep]\
    .to_csv(outputFileName, encoding="utf-8")

    return df

    # note we are not catching exceptions here, to save time.
    # errors are caught at the highest level

    def reduceStep(df, feaConfig, verbose=False):
    """
    REDUCE step: taking the df after the MAP step, and reduce to features, one block a row.
    :param df: the df passed from the mapStep
    :param feaConfig: the configuration file with parameters setting the byVars
    :param verbose: to be passed to reduce functions to save interim data frame if True
    :return: a Pandas data frame, with # of rows as blocks, and features as columns
    """

    # asserts
    if df is None:
    logger.error("ReduceStep: input df is None; quitting")
    return None

    if not any([(k in df.columns) for k in feaConfig["byVars"]]):
    # keyword missing
    return None

    studentID = df["BookletNumber"].unique()[0]

    # #### Reduce ####
    # here we begin to summarize the feature columns
    # ################

    # This is obviously a waste of time to repeat some feature steps in these
    # will deal with this later. For now, this is pleasing to the eyes
    try:
    dfFeaInitialKeypress = df.groupby(feaConfig["byVars"]).apply(
    lambda d: reduceFeatureInitialKeypress(d, verbose=verbose)
    ).reset_index()
    #print dfFeaInitialKeypress
    except Exception as e:
    logger.error("Error in reduceStep: reduceFeatureInitialKeypress")
    logger.exception(e)
    exc_buffer = StringIO.StringIO()
    traceback.print_exc(file=exc_buffer)
    logger.error('Uncaught exception in worker process:\n%s', exc_buffer.getvalue())

    df.to_csv("Error_{}_ReduceStep.csv".format(studentID), encoding="utf-8")

    return

    try:
    dfFeaWordBased = df.groupby(feaConfig["byVars"]).apply(
    lambda d: reduceWordBasedFeatures(d, verbose=verbose)
    ).reset_index()
    #print dfFeaWordBased
    except Exception as e:
    logger.error("Error in reduceStep: reduceWordBasedFeatures")
    logger.exception(e)
    exc_buffer = StringIO.StringIO()
    traceback.print_exc(file=exc_buffer)
    logger.error('Uncaught exception in worker process:\n%s', exc_buffer.getvalue())

    df.to_csv("Error_{}_ReduceStep.csv".format(studentID), encoding="utf-8")

    return

    try:
    dfFeaPauses = df.groupby(feaConfig["byVars"]).apply(
    lambda d: reducePauseFeatures(d, verbose=verbose)
    ).reset_index()
    #print dfFeaPauses
    except Exception as e:
    logger.error("Error in reduceStep: reducePauseFeatures")
    logger.exception(e)
    exc_buffer = StringIO.StringIO()
    traceback.print_exc(file=exc_buffer)
    logger.error('Uncaught exception in worker process:\n%s', exc_buffer.getvalue())

    df.to_csv("Error_{}_ReduceStep.csv".format(studentID), encoding="utf-8")

    return

    try:
    # garyfeng 2018-07-09: changing minRunLength to 1 for deletions to get sum of time before deletions
    dfFeaDelete = df.groupby(feaConfig["byVars"]).apply(
    lambda d: reduceDeleteFeatures(d, verbose=verbose, minRunLength = 1)
    ).reset_index()
    #print dfFeaDelete
    except Exception as e:
    logger.error("Error in reduceStep: reduceDeleteFeatures")
    logger.exception(e)
    exc_buffer = StringIO.StringIO()
    traceback.print_exc(file=exc_buffer)
    logger.error('Uncaught exception in worker process:\n%s', exc_buffer.getvalue())

    df.to_csv("Error_{}_ReduceStep.csv".format(studentID), encoding="utf-8")

    return

    try:
    dfFeaEditing = df.groupby(feaConfig["byVars"]).apply(
    lambda d: reduceEditingFeatures(d, verbose=verbose)
    ).reset_index()
    #print dfFeaEditing
    except Exception as e:
    logger.error("Error in reduceStep: reduceEditingFeatures")
    logger.exception(e)
    exc_buffer = StringIO.StringIO()
    traceback.print_exc(file=exc_buffer)
    logger.error('Uncaught exception in worker process:\n%s', exc_buffer.getvalue())

    df.to_csv("Error_{}_ReduceStep.csv".format(studentID), encoding="utf-8")

    return

    try:
    nDiscrepancyMarkers = df.groupby(feaConfig["byVars"]).apply(
    lambda d: d\
    .loc[d.reconstructedText.notnull()]\
    .reconstructedText.iloc[-1].count("`")
    ).rename("flagDiscrepancyMarkers").reset_index()
    except Exception as e:
    logger.error("Error in reduceStep: nDiscrepancyMarkers")
    logger.exception(e)
    exc_buffer = StringIO.StringIO()
    traceback.print_exc(file=exc_buffer)
    logger.error('Uncaught exception in worker process:\n%s', exc_buffer.getvalue())

    df.to_csv("Error_{}_ReduceStep.csv".format(studentID), encoding="utf-8")

    return

    try:
    adminEventList = feaConfig['adminEventList']
    nAdminRaiseHandEvents = df.groupby(feaConfig["byVars"]).apply(
    lambda d: d\
    .loc[(d.Label.isin(adminEventList)) | (d.AccessionNumber == "RaiseHand")] \
    .shape[0]
    ).rename("flagAdminRaiseHandEvents").reset_index()
    except Exception as e:
    logger.error("Error in reduceStep: nAdminRaiseHandEvents")
    logger.exception(e)
    exc_buffer = StringIO.StringIO()
    traceback.print_exc(file=exc_buffer)
    logger.error('Uncaught exception in worker process:\n%s', exc_buffer.getvalue())

    df.to_csv("Error_{}_ReduceStep.csv".format(studentID), encoding="utf-8")

    return

    try:
    dfSummary = pd.concat([dfFeaInitialKeypress, dfFeaWordBased,
    dfFeaPauses, dfFeaDelete, dfFeaEditing,
    nDiscrepancyMarkers, nAdminRaiseHandEvents], axis=1)
    except Exception as e:
    logger.error("Error in reduceStep: merging all features")
    logger.exception(e)
    exc_buffer = StringIO.StringIO()
    traceback.print_exc(file=exc_buffer)
    logger.error('Uncaught exception in worker process:\n%s', exc_buffer.getvalue())

    df.to_csv("Error_{}_ReduceStep.csv".format(studentID), encoding="utf-8")

    return

    return dfSummary
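
    # Note on the concat above: each reduced frame keeps its own copy of the byVars
    # columns after reset_index(), so dfSummary ends up with duplicated columns
    # (e.g. BookletNumber, BlockCode). These are dropped at save time via
    # df.loc[:, ~df.columns.duplicated()] in processBooklet()/processBooklet_dask() below.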

    def processBooklet(filename,
    featureConfig,
    verbose=False,
    outputFeaturePath = ".",
    featureSetName = "finalFeatures", ):
    """
    Process a single booklet CSV file. Steps involving reading/QCing data, map, reduce, saving.
    :param filename: full path to the CSV file
    :param featureConfig: the dict with config info
    :param verbose: if true, save intermediate data frames to the current directory
    :param outputFeaturePath: output path
    :param featureSetName: name of the final feature set; will be the last part of the output csv file name
    :return: none
    """

    # output file path and name
    outputFeatureFileName = os.path.join(outputFeaturePath,
    os.path.splitext(os.path.split(filename)[1])[0] + "_" + featureSetName +".csv")
    # debug
    logger.info("Processing %s", filename)
    #############
    # Get Data
    try:
    df = getData(filename, featureConfig=featureConfig)
    except:
    df = None
    if df is None:
    logger.error("processBooklet: getData failed for %s", filename)
    return

    studentID = df["BookletNumber"].unique()[0]

    #############
    # Map
    #logger.info("Map %s", filename)
    try:
    df = mapStep(df, verbose=verbose, feaConfig=featureConfig)
    except Exception as e:
    logger.error("Error in mapStep: %s", filename)
    logger.exception(e)
    exc_buffer = StringIO.StringIO()
    traceback.print_exc(file=exc_buffer)
    logger.error('Uncaught exception in worker process:\n%s', exc_buffer.getvalue())
    return

    if df is None:
    logger.error("processBooklet: mapStep failed for %s", filename)
    return


    #############
    # Reduce
    #logger.info("Reduce %s", filename)
    try:
    df = reduceStep(df, verbose=verbose, feaConfig=featureConfig)
    except Exception as e:
    logger.error("Error in reduceStep: %s", filename)
    logger.exception(e)
    exc_buffer = StringIO.StringIO()
    traceback.print_exc(file=exc_buffer)
    logger.error('Uncaught exception in worker process:\n%s', exc_buffer.getvalue())
    return

    if df is None:
    logger.error("processBooklet: reduceStep failed for %s", filename)
    return

    #############
    # Save Data
    # debug
    logger.info("Saving %s", filename)
    try:
    # first drop duplicated rows (occasionally there are some)
    # then drop duplicated columns (from the MultiIndex) by masking out duplicated column names
    df \
    .loc[:, ~df.columns.duplicated()]\
    .drop_duplicates() \
    .to_csv(outputFeatureFileName, encoding='utf-8')

    except Exception as e:
    logger.error("Error writing to_csv: %s", outputFeatureFileName)
    logger.exception(e)
    exc_buffer = StringIO.StringIO()
    traceback.print_exc(file=exc_buffer)
    logger.error('Uncaught exception in worker process:\n%s', exc_buffer.getvalue())
    return

    logger.info("Done. Output= %s", outputFeatureFileName)
    return


    def processBooklet_dask(filename,
    featureConfig,
    verbose=False,
    outputFeaturePath = ".",
    featureSetName = "finalFeatures"):
    """
    processing a writing CSV file, for dask parallel processing. We remove any logger reference here.
    :param filename: full path to the CSV file
    :param featureConfig: the dict with config info
    :param verbose: if true, save intermediate data frames to the current directory
    :param outputFeaturePath: output path
    :param featureSetName: name of the final feature set; will be the last part of the output csv file name
    :return: none
    """

    # output file path and name
    outputFeatureFileName = os.path.join(outputFeaturePath,
    os.path.splitext(os.path.split(filename)[1])[0] + "_" + featureSetName +".csv")
    #############
    # Get Data
    try:
    df = getData(filename, featureConfig=featureConfig)
    except:
    return

    if df is None:
    logger.error("processBooklet: getData failed for %s", filename)
    return

    #############
    # Map
    try:
    df = mapStep(df, verbose=verbose, feaConfig=featureConfig)
    except:
    return

    if df is None:
    #logger.error("processBooklet: mapStep failed for %s", filename)
    return

    #############
    # Reduce
    try:
    df = reduceStep(df, verbose=verbose, feaConfig=featureConfig)
    except:
    return

    if df is None:
    return

    #############
    # Save Data
    try:
    # first drop duplicated rows (occasionally there are some)
    # then drop duplicated columns (from the MultiIndex) by masking out duplicated column names
    df \
    .loc[:, ~df.columns.duplicated()]\
    .drop_duplicates() \
    .to_csv(outputFeatureFileName, encoding='utf-8')
    except Exception as e:
    return
    return



    import sys

    if __name__ == '__main__':

    if len(sys.argv) == 1:
    print "\nUsage: python Writing2017_descReportingFeatures.py Grade4, to process Grade4 or Grade8\n"
    exit()
    if sys.argv[1] not in ["Grade4", "Grade8", "test"]:
    print "\nUsage: python Writing2017_descReportingFeatures.py Grade4, to process Grade4 or Grade8\n"
    exit()


    import glob
    from pdia import *
    from pdia.writing2017.make2017Features import *
    import dask.bag as db
    from distributed import Client
    import datetime
    import time

    # paths
    today=time.strftime("%Y%m%d_%H", time.localtime()) # timestamp as 20170810_22 for date and hour (24h) to run the script

    # garyfeng: to resume from a run:
    today = "20180709_21"
    ####

    grade = sys.argv[1]

    inputCSVPath = "{}/".format(grade)
    outputFeaturePath = "{}_descFeatures_{}/".format(grade, today)
    if not os.path.exists(outputFeaturePath):
    os.makedirs(outputFeaturePath)
    featureSetName = "descFeature{}".format(today)
    print "input folder: {}".format(inputCSVPath)
    print "output folder: {}".format(outputFeaturePath)
    print "featureSetName: {}".format(featureSetName)

    #########
    # getting the files to process
    print "======= Scanning for CSV files ============"
    print datetime.datetime.now()
    fileList = glob.glob(os.path.join(inputCSVPath, "*_ObservableData.csv"))
    if len(fileList)==0:
    print "\nNo CSV files found in the directory\n"
    exit()

    ##########
    # garyfeng: to resume by ignoring ones with output already.
    finishedFiles = glob.glob(os.path.join(outputFeaturePath, "*_{}.csv".format(featureSetName)))
    finishedFiles = [f.replace(outputFeaturePath, inputCSVPath).replace("_"+featureSetName, "") for f in finishedFiles]
    fileList = list(set(fileList) - set(finishedFiles))
    ##########
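
    # Example of the name mapping above (hypothetical booklet number and assumed input naming):
    # a finished output such as
    #   Grade4_descFeatures_20180709_21/Writing_Grade4_0000000000_ObservableData_descFeature20180709_21.csv
    # is rewritten back to its presumed input
    #   Grade4/Writing_Grade4_0000000000_ObservableData.csv
    # so booklets that already have output are removed from fileList.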

    print "Total input CSV files: %i" % len(fileList)
    print datetime.datetime.now()


    import gc
    def processIt(filename):
    processBooklet_dask(filename,
    featureConfig=featureConfig2017,
    verbose=False,
    outputFeaturePath=outputFeaturePath,
    featureSetName=featureSetName)
    gc.collect()
    return


    print "======= Begin Processing ============"
    print datetime.datetime.now()
    print "====================================="
    # test with 1 file
    # processFile(fileList[0])

    # Using distributed clients
    client = Client()
    # run parallel with dask
    db.from_sequence(fileList).map(processIt).compute()

    print "======== End Processing ==========="
    print datetime.datetime.now()
    print "==================================="

    540 changes: 540 additions & 0 deletions 2017Writing_descFeature_g8.py
    @@ -0,0 +1,540 @@
    # Functions for extracting the descriptive features for the 2017 operational analysis
    # Changes from make2017FinalFeatures.py:
    # - min

    #############

    import StringIO
    import os
    import traceback

    from pdia.durSinceBlockStart import *
    from pdia.writing2017.addFeatureTextChange import addTextChangeVars
    from pdia.writing2017.addKeyPressVars import addKeyPressVars
    from pdia.writing2017.burstFeatures import *
    from pdia.writing2017.editingFeatures import mapEditingFeatures, reduceEditingFeatures, reduceDeleteFeatures
    from pdia.writing2017.featuresConfig2017 import featureConfig2017
    from pdia.writing2017.reduceFeatureInitialKeypress import reduceFeatureInitialKeypress
    from pdia.writing2017.getData import getData
    from pdia.writing2017.reducePauseFeatures import reducePauseFeatures
    from pdia.writing2017.addWordTokens import *
    # Configs
    # parser config
    from pdia.writing2017.reduceWordBasedFeatures import reduceWordBasedFeatures


    # 2016 default configuration

    # 2017 data config


    # Read data
    def getData2017(filename, featureConfig=featureConfig2017):
    """
    Simply a wrap of getData with the 2017 config
    :param filename: the file name to process
    :param featureConfig: using the 2017 configuration
    :return: the parsed df
    """
    return getData(filename, featureConfig=featureConfig)


    def mapStep(df, feaConfig, verbose=False):
    """
    MAP step: creating keystroke level features, adding columns
    :param df: the data frame for a booklet, contain potentially multiple blocks
    :param feaConfig: the configuration for data import/parsing
    :param verbose: if True, saves the interim data
    """

    # asserts
    if df is None:
    logger.error("MapStep: input df is None; quitting")
    return None

    if not any([(k in df.columns) for k in feaConfig["byVars"]]):
    # keyword missing
    return None

    studentID = df["BookletNumber"].unique()[0]

    # ##### MAP ####
    # to handle the feature functions in the featureMap object
    # ##############
    def mapBlock(d):
    # return None if no keystroke log is available
    if d.loc[d.Label == "Pilot Observables", :].shape[0] == 0:
    # print("mapBlock: No Observable data for the block")
    logger.debug("mapBlock: No Observable data for the block")
    return None
    d = durSinceBlockStart(d) if d is not None else None
    #d = addKeyPressVars(d) if d is not None else None
    #d = addTextChangeVars(d) if d is not None else None
    d = addFeatureIKI(d) if d is not None else None
    d = addWordTokens(d) if d is not None else None
    # garyfeng 2018-07-09: changing default minJumpDistance from 2 to 5
    d = mapEditingFeatures(d, verbose=False, minTextChangeEvents=5, minJumpDistance=5) if d is not None else None
    return d

    try:
    # the following groupby().apply() is causing occasional python crashes
    # df = df \
    # .groupby(feaConfig["byVars"]) \
    # .apply(mapBlock)
    # taking a stupid method here
    tmp=[]
    for b in df["BlockCode"].unique():
    tmp.append(df.loc[df.BlockCode == b, :].pipe(mapBlock))
    df = pd.concat(tmp)
    except Exception as e:
    logger.error("Error in mapStep")
    logger.exception(e)
    exc_buffer = StringIO.StringIO()
    traceback.print_exc(file=exc_buffer)
    logger.error('Uncaught exception in worker process:\n%s', exc_buffer.getvalue())

    df.to_csv("Error_{}_MapStep.csv".format(studentID), encoding="utf-8")

    return

    # saving
    if verbose:
    outputFileName = "{}_mapStep.csv".format(
    df["BookletNumber"].unique()[0]
    )
    # drop duplicated columns (from the MultiIndex) before saving
    df.loc[:, ~df.columns.duplicated()].to_csv(outputFileName, encoding="utf-8")

    # simplified for human reading
    outputFileName = "{}_mapStep_simplified.csv".format(
    df["BookletNumber"].unique()[0]
    )
    rowsToKeep = df.keystrokeEvent.notnull() & ~df.keystrokeEvent.isin(["Keypress"])
    df.loc[rowsToKeep, "textLenReconText"] = df.loc[rowsToKeep, "reconstructedText"].str.len()
    colsToKeep = ['BookletNumber', 'BlockCode', 'AccessionNumber', 'rowCount',
    'keystrokeEvent', 'keyName', 'durSinceBlockStart', 'IKI',
    'reconCursorPosition', 'textLength', "textLenReconText",
    'textContext', 'intendedWord', 'currentToken',
    # 'interWord', 'wtf', 'isAtWordBoundary', 'isWordInitial',
    'intraWord',
    'focalWordNum', 'interWordRunNumber', 'interClauseRunNumber', 'isJump', 'isReplace',
    'reconstructedText']
    # to get rid of duplicated columns, remove the multiple index first
    df.loc[rowsToKeep, colsToKeep]\
    .to_csv(outputFileName, encoding="utf-8")

    return df

    # note we are not catching exceptions here, to save time.
    # errors are caught at the highest level

    def reduceStep(df, feaConfig, verbose=False):
    """
    REDUCE step: taking the df after the MAP step, and reduce to features, one block a row.
    :param df: the df passed from the mapStep
    :param feaConfig: the configuration file with parameters setting the byVars
    :param verbose: to be passed to reduce functions to save interim data frame if True
    :return: a Pandas data frame, with # of rows as blocks, and features as columns
    """

    # asserts
    if df is None:
    logger.error("ReduceStep: input df is None; quitting")
    return None

    if not any([(k in df.columns) for k in feaConfig["byVars"]]):
    # keyword missing
    return None

    studentID = df["BookletNumber"].unique()[0]

    # #### Reduce ####
    # here we begin to summarize the feature columns
    # ################

    # This is obviously a waste of time to repeat some feature steps in these
    # will deal with this later. For now, this is pleasing to the eyes
    try:
    dfFeaInitialKeypress = df.groupby(feaConfig["byVars"]).apply(
    lambda d: reduceFeatureInitialKeypress(d, verbose=verbose)
    ).reset_index()
    #print dfFeaInitialKeypress
    except Exception as e:
    logger.error("Error in reduceStep: reduceFeatureInitialKeypress")
    logger.exception(e)
    exc_buffer = StringIO.StringIO()
    traceback.print_exc(file=exc_buffer)
    logger.error('Uncaught exception in worker process:\n%s', exc_buffer.getvalue())

    df.to_csv("Error_{}_ReduceStep.csv".format(studentID), encoding="utf-8")

    return

    try:
    dfFeaWordBased = df.groupby(feaConfig["byVars"]).apply(
    lambda d: reduceWordBasedFeatures(d, verbose=verbose)
    ).reset_index()
    #print dfFeaWordBased
    except Exception as e:
    logger.error("Error in reduceStep: reduceWordBasedFeatures")
    logger.exception(e)
    exc_buffer = StringIO.StringIO()
    traceback.print_exc(file=exc_buffer)
    logger.error('Uncaught exception in worker process:\n%s', exc_buffer.getvalue())

    df.to_csv("Error_{}_ReduceStep.csv".format(studentID), encoding="utf-8")

    return

    try:
    dfFeaPauses = df.groupby(feaConfig["byVars"]).apply(
    lambda d: reducePauseFeatures(d, verbose=verbose)
    ).reset_index()
    #print dfFeaPauses
    except Exception as e:
    logger.error("Error in reduceStep: reducePauseFeatures")
    logger.exception(e)
    exc_buffer = StringIO.StringIO()
    traceback.print_exc(file=exc_buffer)
    logger.error('Uncaught exception in worker process:\n%s', exc_buffer.getvalue())

    df.to_csv("Error_{}_ReduceStep.csv".format(studentID), encoding="utf-8")

    return

    try:
    # garyfeng 2018-07-09: changing minRunLength to 1 for deletions to get sum of time before deletions
    dfFeaDelete = df.groupby(feaConfig["byVars"]).apply(
    lambda d: reduceDeleteFeatures(d, verbose=verbose, minRunLength = 1)
    ).reset_index()
    #print dfFeaDelete
    except Exception as e:
    logger.error("Error in reduceStep: reduceDeleteFeatures")
    logger.exception(e)
    exc_buffer = StringIO.StringIO()
    traceback.print_exc(file=exc_buffer)
    logger.error('Uncaught exception in worker process:\n%s', exc_buffer.getvalue())

    df.to_csv("Error_{}_ReduceStep.csv".format(studentID), encoding="utf-8")

    return

    try:
    dfFeaEditing = df.groupby(feaConfig["byVars"]).apply(
    lambda d: reduceEditingFeatures(d, verbose=verbose)
    ).reset_index()
    #print dfFeaEditing
    except Exception as e:
    logger.error("Error in reduceStep: reduceEditingFeatures")
    logger.exception(e)
    exc_buffer = StringIO.StringIO()
    traceback.print_exc(file=exc_buffer)
    logger.error('Uncaught exception in worker process:\n%s', exc_buffer.getvalue())

    df.to_csv("Error_{}_ReduceStep.csv".format(studentID), encoding="utf-8")

    return

    try:
    nDiscrepancyMarkers = df.groupby(feaConfig["byVars"]).apply(
    lambda d: d\
    .loc[d.reconstructedText.notnull()]\
    .reconstructedText.iloc[-1].count("`")
    ).rename("flagDiscrepancyMarkers").reset_index()
    except Exception as e:
    logger.error("Error in reduceStep: nDiscrepancyMarkers")
    logger.exception(e)
    exc_buffer = StringIO.StringIO()
    traceback.print_exc(file=exc_buffer)
    logger.error('Uncaught exception in worker process:\n%s', exc_buffer.getvalue())

    df.to_csv("Error_{}_ReduceStep.csv".format(studentID), encoding="utf-8")

    return

    try:
    adminEventList = feaConfig['adminEventList']
    nAdminRaiseHandEvents = df.groupby(feaConfig["byVars"]).apply(
    lambda d: d\
    .loc[(d.Label.isin(adminEventList)) | (d.AccessionNumber == "RaiseHand")] \
    .shape[0]
    ).rename("flagAdminRaiseHandEvents").reset_index()
    except Exception as e:
    logger.error("Error in reduceStep: nAdminRaiseHandEvents")
    logger.exception(e)
    exc_buffer = StringIO.StringIO()
    traceback.print_exc(file=exc_buffer)
    logger.error('Uncaught exception in worker process:\n%s', exc_buffer.getvalue())

    df.to_csv("Error_{}_ReduceStep.csv".format(studentID), encoding="utf-8")

    return

    try:
    dfSummary = pd.concat([dfFeaInitialKeypress, dfFeaWordBased,
    dfFeaPauses, dfFeaDelete, dfFeaEditing,
    nDiscrepancyMarkers, nAdminRaiseHandEvents], axis=1)
    except Exception as e:
    logger.error("Error in reduceStep: merging all features")
    logger.exception(e)
    exc_buffer = StringIO.StringIO()
    traceback.print_exc(file=exc_buffer)
    logger.error('Uncaught exception in worker process:\n%s', exc_buffer.getvalue())

    df.to_csv("Error_{}_ReduceStep.csv".format(studentID), encoding="utf-8")

    return

    return dfSummary

    def processBooklet(filename,
    featureConfig,
    verbose=False,
    outputFeaturePath = ".",
    featureSetName = "finalFeatures", ):
    """
    Process a single booklet CSV file. Steps involving reading/QCing data, map, reduce, saving.
    :param filename: full path to the CSV file
    :param featureConfig: the dict with config info
    :param verbose: if true, save intermediate data frames to the current directory
    :param outputFeaturePath: output path
    :param featureSetName: name of the final feature set; will be the last part of the output csv file name
    :return: none
    """

    # output file path and name
    outputFeatureFileName = os.path.join(outputFeaturePath,
    os.path.splitext(os.path.split(filename)[1])[0] + "_" + featureSetName +".csv")
    # debug
    logger.info("Processing %s", filename)
    #############
    # Get Data
    try:
    df = getData(filename, featureConfig=featureConfig)
    except:
    df = None
    if df is None:
    logger.error("processBooklet: getData failed for %s", filename)
    return

    studentID = df["BookletNumber"].unique()[0]

    #############
    # Map
    #logger.info("Map %s", filename)
    try:
    df = mapStep(df, verbose=verbose, feaConfig=featureConfig)
    except Exception as e:
    logger.error("Error in mapStep: %s", filename)
    logger.exception(e)
    exc_buffer = StringIO.StringIO()
    traceback.print_exc(file=exc_buffer)
    logger.error('Uncaught exception in worker process:\n%s', exc_buffer.getvalue())
    return

    if df is None:
    logger.error("processBooklet: mapStep failed for %s", filename)
    return


    #############
    # Reduce
    #logger.info("Reduce %s", filename)
    try:
    df = reduceStep(df, verbose=verbose, feaConfig=featureConfig)
    except Exception as e:
    logger.error("Error in reduceStep: %s", filename)
    logger.exception(e)
    exc_buffer = StringIO.StringIO()
    traceback.print_exc(file=exc_buffer)
    logger.error('Uncaught exception in worker process:\n%s', exc_buffer.getvalue())
    return

    if df is None:
    logger.error("processBooklet: reduceStep failed for %s", filename)
    return

    #############
    # Save Data
    # debug
    logger.info("Saving %s", filename)
    try:
    # first drop duplicated rows (occasionally there are some)
    # then drop duplicated columns (from the MultiIndex) by masking out duplicated column names
    df \
    .loc[:, ~df.columns.duplicated()]\
    .drop_duplicates() \
    .to_csv(outputFeatureFileName, encoding='utf-8')

    except Exception as e:
    logger.error("Error writing to_csv: %s", outputFeatureFileName)
    logger.exception(e)
    exc_buffer = StringIO.StringIO()
    traceback.print_exc(file=exc_buffer)
    logger.error('Uncaught exception in worker process:\n%s', exc_buffer.getvalue())
    return

    logger.info("Done. Output= %s", outputFeatureFileName)
    return


    def processBooklet_dask(filename,
    featureConfig,
    verbose=False,
    outputFeaturePath = ".",
    featureSetName = "finalFeatures"):
    """
    processing a writing CSV file, for dask parallel processing. We remove any logger reference here.
    :param filename: full path to the CSV file
    :param featureConfig: the dict with config info
    :param verbose: if true, save intermediate data frames to the current directory
    :param outputFeaturePath: output path
    :param featureSetName: name of the final feature set; will be the last part of the output csv file name
    :return: none
    """

    # output file path and name
    outputFeatureFileName = os.path.join(outputFeaturePath,
    os.path.splitext(os.path.split(filename)[1])[0] + "_" + featureSetName +".csv")
    #############
    # Get Data
    try:
    df = getData(filename, featureConfig=featureConfig)
    except:
    return

    if df is None:
    logger.error("processBooklet: getData failed for %s", filename)
    return

    #############
    # Map
    try:
    df = mapStep(df, verbose=verbose, feaConfig=featureConfig)
    except:
    return

    if df is None:
    #logger.error("processBooklet: mapStep failed for %s", filename)
    return

    #############
    # Reduce
    try:
    df = reduceStep(df, verbose=verbose, feaConfig=featureConfig)
    except:
    return

    if df is None:
    return

    #############
    # Save Data
    try:
    # first drop duplicated rows (occasionally there are some)
    # then drop duplicated columns (from the MultiIndex) by masking out duplicated column names
    df \
    .loc[:, ~df.columns.duplicated()]\
    .drop_duplicates() \
    .to_csv(outputFeatureFileName, encoding='utf-8')
    except Exception as e:
    return
    return



    import sys

    if __name__ == '__main__':

    if len(sys.argv) == 1:
    print "\nUsage: python Writing2017_descReportingFeatures.py Grade4, to process Grade4 or Grade8\n"
    exit()
    if sys.argv[1] not in ["Grade4", "Grade8", "test"]:
    print "\nUsage: python Writing2017_descReportingFeatures.py Grade4, to process Grade4 or Grade8\n"
    exit()


    import glob
    from pdia import *
    from pdia.writing2017.make2017Features import *
    import dask.bag as db
    from distributed import Client
    import datetime
    import time

    # paths
    today=time.strftime("%Y%m%d_%H", time.localtime()) # timestamp as 20170810_22 for date and hour (24h) to run the script
    today = "20180711_04"
    grade = sys.argv[1]

    inputCSVPath = "{}/".format(grade)
    outputFeaturePath = "{}_descFeatures_{}/".format(grade, today)
    if not os.path.exists(outputFeaturePath):
    os.makedirs(outputFeaturePath)
    featureSetName = "descFeature{}".format(today)
    print "input folder: {}".format(inputCSVPath)
    print "output folder: {}".format(outputFeaturePath)
    print "featureSetName: {}".format(featureSetName)

    #########
    # getting the files to process
    print "======= Scanning for CSV files ============"
    print datetime.datetime.now()
    fileList = glob.glob(os.path.join(inputCSVPath, "*_ObservableData.csv"))
    if len(fileList)==0:
    print "\nNo CSV files found in the directory\n"
    exit()

    ##########
    # garyfeng: to resume by ignoring ones with output already.
    finishedFiles = glob.glob(os.path.join(outputFeaturePath, "*_{}.csv".format(featureSetName)))
    finishedFiles = [f.replace(outputFeaturePath, inputCSVPath).replace("_"+featureSetName, "") for f in finishedFiles]
    fileList = list(set(fileList) - set(finishedFiles))
    # error files should not be repeated
    finishedFiles = glob.glob("Error_*.csv")
    finishedFiles = ["{}/Writing_Grade8_{}_ObservableData.csv".format(grade, f.split("_")[1]) for f in finishedFiles]
    fileList = list(set(fileList) - set(finishedFiles))
    ##########
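
    # Example of the error-file mapping above (hypothetical booklet number): a crash
    # artifact such as Error_0000000000_ReduceStep.csv gives f.split("_")[1] == "0000000000",
    # which is mapped to Grade8/Writing_Grade8_0000000000_ObservableData.csv and removed
    # from fileList, so booklets that previously crashed are not retried.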

    print "Total input CSV files: %i" % len(fileList)
    print datetime.datetime.now()

    import gc
    def processIt(filename):
    processBooklet_dask(filename,
    featureConfig=featureConfig2017,
    verbose=False,
    outputFeaturePath=outputFeaturePath,
    featureSetName=featureSetName)
    gc.collect()
    return


    print "======= Begin Processing ============"
    print datetime.datetime.now()
    print "====================================="
    # test with 1 file
    # processFile(fileList[0])

    # Using distributed clients
    client = Client()
    # run parallel with dask
    db.from_sequence(fileList).map(processIt).compute()

    print "======== End Processing ==========="
    print datetime.datetime.now()
    print "==================================="

    # To restart
    # until python run_grade8.py Grade8; do
    # echo "Program crashed with exit code $?. Respawning.." >&2
    # sleep 1
    # done
    32 changes: 32 additions & 0 deletions pdj-windows.bat
    @@ -0,0 +1,32 @@
    REM Launch a pdia Jupyter notebook container (docked-jupyter) on the next free port.
    SET NUM=0

    REM Find the first slot N for which no container named pdj-N is running.
    :LOOP
    SET /A NUM = %NUM% + 1
    FOR /f "usebackq" %%G in (`docker ps -q -f "name=pdj-%NUM%"`) DO set PDJID=%%G
    IF [%PDJID%] == [] GOTO QUERY
    SET PDJID=
    GOTO LOOP

    REM Ask which pdia image tag to use.
    :QUERY
    set INPUT=
    set /P INPUT=Which version of pdia would you like to use? (1=master, 2=2018xval): %=%
    IF /I "%INPUT%"=="1" GOTO MASTER
    IF /I "%INPUT%"=="2" GOTO 2018XVAL
    GOTO QUERY

    :MASTER
    SET PDJTAG=master
    GOTO RUN

    :2018XVAL
    SET PDJTAG=2018xval
    GOTO RUN

    REM Pull the image, start Jupyter on port 8888+N with the current folder mounted,
    REM open it in the browser, then stop the container after a key press.
    :RUN
    SET /A PORT = %NUM% + 8887
    docker pull pdia/docked-jupyter:%PDJTAG%
    docker run -p %PORT%:8888 -h CONTAINER -d -it --rm --name pdj-%NUM%-%PDJTAG% -v "%cd%":/home/jovyan/work pdia/docked-jupyter:%PDJTAG% jupyter notebook --NotebookApp.token='pdia'
    timeout /t 5
    start http://localhost:%PORT%/?token=pdia
    timeout /t -1
    docker stop pdj-%NUM%-%PDJTAG%
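
For reference, with the first free slot (NUM=1) and the master tag chosen at the prompt, the batch file above boils down to roughly the following commands (PORT = 1 + 8887 = 8888):

    docker pull pdia/docked-jupyter:master
    docker run -p 8888:8888 -h CONTAINER -d -it --rm --name pdj-1-master -v "%cd%":/home/jovyan/work pdia/docked-jupyter:master jupyter notebook --NotebookApp.token='pdia'
    start http://localhost:8888/?token=pdia
    docker stop pdj-1-master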