@vmuriart
Forked from jiffyclub/hdf_to_parquet.py
Created May 18, 2017 00:31
Revisions

  1. @jiffyclub revised this gist Jun 18, 2015. 1 changed file with 2 additions and 10 deletions.
    12 changes: 2 additions & 10 deletions spark_run.py
    @@ -6,17 +6,9 @@
     """
     import time
     
    -import pandas as pd
     from pyspark import SparkContext, SparkConf
     from pyspark.sql import SQLContext
     
    -store = pd.HDFStore('/Users/jiffyclub/synth/spark-demo/mtc_asim.h5')
    -
    -persons = store['persons'].reset_index()
    -households = store['households'].reset_index()
    -
    -store.close()
    -
     spark_conf = (
         SparkConf()
         .setAppName('SparkRunDemo')
    @@ -32,8 +24,8 @@
     sc = SparkContext(conf=spark_conf)
     sql = SQLContext(sc)
     
    -hh_spark = sql.createDataFrame(households)
    -p_spark = sql.createDataFrame(persons)
    +hh_spark = sql.read.parquet('households.parquet')
    +p_spark = sql.read.parquet('persons.parquet')
     
     t1 = time.time()
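
    This revision swaps the in-process pandas-to-Spark conversion for a direct
    Parquet read. On Spark 2.x and later the same read goes through
    SparkSession, which replaced SQLContext as the entry point; a minimal
    sketch under that assumption (the paths are the files written out by
    hdf_to_parquet.py below):

        # Sketch only, not part of the original gist: SparkSession is the
        # Spark 2.x+ entry point that replaced SQLContext.
        from pyspark.sql import SparkSession

        spark = SparkSession.builder.appName('SparkRunDemo').getOrCreate()

        # The same Parquet files that hdf_to_parquet.py writes out.
        hh_spark = spark.read.parquet('households.parquet')
        p_spark = spark.read.parquet('persons.parquet')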

  2. @jiffyclub revised this gist Jun 18, 2015. 1 changed file with 40 additions and 0 deletions.
    40 changes: 40 additions & 0 deletions hdf_to_parquet.py
    @@ -0,0 +1,40 @@
    +"""
    +Convert Pandas DFs in an HDFStore to parquet files for better compatibility
    +with Spark.
    +
    +Run from the command line with:
    +
    +spark-submit --driver-memory 4g --master 'local[*]' hdf_to_parquet.py
    +"""
    +import pandas as pd
    +from pyspark import SparkContext, SparkConf
    +from pyspark.sql import SQLContext
    +
    +store = pd.HDFStore('/Users/jiffyclub/synth/spark-demo/mtc_asim.h5')
    +
    +persons = store['persons'].reset_index()
    +households = store['households'].reset_index()
    +
    +store.close()
    +
    +spark_conf = (
    +    SparkConf()
    +    .setAppName('SparkRunDemo')
    +    # .setMaster('local[*]')
    +    # .set('spark.driver.memory', '8g')
    +    .set('spark.executor.memory', '8g')
    +    .set('spark.python.worker.memory', '8g')
    +    .set('spark.storage.memoryFraction', 0.2)
    +    .set('spark.logConf', True))
    +
    +print spark_conf.toDebugString()
    +
    +sc = SparkContext(conf=spark_conf)
    +sql = SQLContext(sc)
    +
    +hh_spark = sql.createDataFrame(households)
    +p_spark = sql.createDataFrame(persons)
    +
    +hh_spark.write.parquet('households.parquet')
    +p_spark.write.parquet('persons.parquet')
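
    Spinning up Spark just to do the conversion is one option; with a recent
    pandas and pyarrow (or fastparquet) installed, the same HDF5-to-Parquet
    step needs no Spark at all. A minimal sketch under that assumption,
    reusing the gist's store path and keys:

        import pandas as pd

        # Sketch only, not part of the gist: DataFrame.to_parquet requires
        # pandas >= 0.21 with pyarrow or fastparquet available.
        with pd.HDFStore('/Users/jiffyclub/synth/spark-demo/mtc_asim.h5', mode='r') as store:
            persons = store['persons'].reset_index()
            households = store['households'].reset_index()

        persons.to_parquet('persons.parquet')
        households.to_parquet('households.parquet')
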
  3. @jiffyclub revised this gist Jun 17, 2015. 1 changed file with 6 additions and 0 deletions.
    6 changes: 6 additions & 0 deletions spark_run.py
    @@ -1,3 +1,9 @@
    +"""
    +Ran from the command line with:
    +
    +time spark-submit --driver-memory 4g --master 'local[*]' spark_run.py 2> spark.log
    +
    +"""
     import time
     
     import pandas as pd
  4. @jiffyclub created this gist Jun 17, 2015.
    32 changes: 32 additions & 0 deletions pandas_run.py
    @@ -0,0 +1,32 @@
    +import time
    +
    +import pandas as pd
    +
    +store = pd.HDFStore('/Users/jiffyclub/synth/spark-demo/mtc_asim.h5')
    +
    +persons = store['persons']
    +households = store['households']
    +
    +store.close()
    +
    +t1 = time.time()
    +
    +persons = persons.merge(households, left_on='household_id', right_index=True)
    +
    +t2 = time.time()
    +print 'time to merge: {}'.format(t2 - t1)
    +
    +persons = persons.query('age >= 18 and income >= 10000')
    +assert len(persons) > 0, 'no people left after query'
    +
    +t3 = time.time()
    +print 'time to filter: {}'.format(t3 - t2)
    +
    +income_by_sex = persons.groupby('sex').income.mean()
    +
    +t4 = time.time()
    +print 'time to groupby agg: {}'.format(t4 - t3)
    +
    +print 'total time: {}'.format(t4 - t1)
    +
    +print income_by_sex
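
    The repeated time.time() bookkeeping above could be factored into a small
    context manager; a sketch (the timed helper is ours, not the gist's, and
    time.perf_counter is the more precise interval clock on Python 3):

        import time
        from contextlib import contextmanager

        @contextmanager
        def timed(label):
            # Hypothetical helper, not part of the gist: prints wall-clock
            # time for the wrapped block.
            start = time.perf_counter()
            yield
            print('time to {}: {:.3f}s'.format(label, time.perf_counter() - start))

        # Usage against the same pipeline, e.g.:
        # with timed('merge'):
        #     persons = persons.merge(households, left_on='household_id', right_index=True)
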
    54 changes: 54 additions & 0 deletions spark_run.py
    @@ -0,0 +1,54 @@
    +import time
    +
    +import pandas as pd
    +from pyspark import SparkContext, SparkConf
    +from pyspark.sql import SQLContext
    +
    +store = pd.HDFStore('/Users/jiffyclub/synth/spark-demo/mtc_asim.h5')
    +
    +persons = store['persons'].reset_index()
    +households = store['households'].reset_index()
    +
    +store.close()
    +
    +spark_conf = (
    +    SparkConf()
    +    .setAppName('SparkRunDemo')
    +    # .setMaster('local[*]')
    +    # .set('spark.driver.memory', '8g')
    +    .set('spark.executor.memory', '8g')
    +    .set('spark.python.worker.memory', '8g')
    +    .set('spark.storage.memoryFraction', 0.2)
    +    .set('spark.logConf', True))
    +
    +print spark_conf.toDebugString()
    +
    +sc = SparkContext(conf=spark_conf)
    +sql = SQLContext(sc)
    +
    +hh_spark = sql.createDataFrame(households)
    +p_spark = sql.createDataFrame(persons)
    +
    +t1 = time.time()
    +
    +merged = hh_spark.join(p_spark, hh_spark.HHID == p_spark.household_id)
    +
    +t2 = time.time()
    +print 'time to merge: {}'.format(t2 - t1)
    +
    +# filtered = merged.filter((merged.age <= 18) & (merged.income >= 100000))
    +filtered = merged.filter('age >= 18 and income >= 10000')
    +
    +t3 = time.time()
    +print 'time to filter: {}'.format(t3 - t2)
    +
    +income_by_sex = filtered.groupby('sex').agg({'income': 'mean'})
    +
    +t4 = time.time()
    +print 'time to groupby agg: {}'.format(t4 - t3)
    +
    +print income_by_sex.collect()
    +
    +t5 = time.time()
    +print 'time to collect: {}'.format(t5 - t4)
    +print 'total time: {}'.format(t5 - t1)
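
    For reference, both scripts are Python 2 / Spark 1.x vintage: print is a
    statement, SQLContext is the entry point, and spark.storage.memoryFraction
    was retired by Spark 1.6's unified memory manager. A hedged sketch of the
    same join/filter/aggregate pipeline on a modern PySpark, assuming the
    Parquet files from the conversion step:

        # Sketch only: a modern (Spark 2.x+) equivalent, not the original code.
        from pyspark.sql import SparkSession
        from pyspark.sql import functions as F

        spark = (SparkSession.builder
                 .appName('SparkRunDemo')
                 .config('spark.executor.memory', '8g')
                 .getOrCreate())

        hh_spark = spark.read.parquet('households.parquet')
        p_spark = spark.read.parquet('persons.parquet')

        merged = hh_spark.join(p_spark, hh_spark.HHID == p_spark.household_id)
        filtered = merged.filter('age >= 18 AND income >= 10000')
        income_by_sex = filtered.groupBy('sex').agg(F.mean('income').alias('mean_income'))

        print(income_by_sex.collect())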