Revisions
jiffyclub revised this gist
Jun 18, 2015. 1 changed file with 2 additions and 10 deletions.
@@ -6,17 +6,9 @@
"""
import time

from pyspark import SparkContext, SparkConf
from pyspark.sql import SQLContext

spark_conf = (
    SparkConf()
    .setAppName('SparkRunDemo')

@@ -32,8 +24,8 @@
sc = SparkContext(conf=spark_conf)
sql = SQLContext(sc)

hh_spark = sql.read.parquet('households.parquet')
p_spark = sql.read.parquet('persons.parquet')

t1 = time.time()
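For context, this revision drops the pandas HDF5 load from the Spark script and reads the pre-converted parquet files instead, so the benchmark no longer pays the pandas-to-Spark serialization cost on every run. A minimal sketch of the same load step in the post-2.0 Spark API (a modernization for reference, not part of the gist; SparkSession replaces the SparkContext/SQLContext pair used above):

from pyspark.sql import SparkSession

# Sketch only: modern equivalent of the SQLContext-based parquet load above.
spark = SparkSession.builder.appName('SparkRunDemo').getOrCreate()
hh_spark = spark.read.parquet('households.parquet')
p_spark = spark.read.parquet('persons.parquet')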
jiffyclub revised this gist
Jun 18, 2015. 1 changed file with 40 additions and 0 deletions.
@@ -0,0 +1,40 @@
"""
Convert Pandas DFs in an HDFStore to parquet files
for better compatibility with Spark.

Run from the command line with:

spark-submit --driver-memory 4g --master 'local[*]' hdf5_to_parquet.py

"""
import pandas as pd

from pyspark import SparkContext, SparkConf
from pyspark.sql import SQLContext

store = pd.HDFStore('/Users/jiffyclub/synth/spark-demo/mtc_asim.h5')
persons = store['persons'].reset_index()
households = store['households'].reset_index()
store.close()

spark_conf = (
    SparkConf()
    .setAppName('SparkRunDemo')
    # .setMaster('local[*]')
    # .set('spark.driver.memory', '8g')
    .set('spark.executor.memory', '8g')
    .set('spark.python.worker.memory', '8g')
    .set('spark.storage.memoryFraction', 0.2)
    .set('spark.logConf', True))

print spark_conf.toDebugString()

sc = SparkContext(conf=spark_conf)
sql = SQLContext(sc)

hh_spark = sql.createDataFrame(households)
p_spark = sql.createDataFrame(persons)

hh_spark.write.parquet('households.parquet')
p_spark.write.parquet('persons.parquet')
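As an aside, this converter predates pandas' native parquet support; with pandas 0.21+ and pyarrow installed, the same conversion could be done without Spark at all. A hedged sketch, assuming the same HDF5 store path as the gist:

import pandas as pd

# Sketch only: pandas >= 0.21 writes parquet directly via pyarrow, no Spark needed.
store = pd.HDFStore('/Users/jiffyclub/synth/spark-demo/mtc_asim.h5')
store['households'].reset_index().to_parquet('households.parquet')
store['persons'].reset_index().to_parquet('persons.parquet')
store.close()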
jiffyclub revised this gist
Jun 17, 2015. 1 changed file with 6 additions and 0 deletions.
@@ -1,3 +1,9 @@
"""
Ran from the command line with:

time spark-submit --driver-memory 4g --master 'local[*]' spark_run.py 2> spark.log

"""
import time

import pandas as pd
jiffyclub created this gist
Jun 17, 2015.
@@ -0,0 +1,32 @@
import time

import pandas as pd

store = pd.HDFStore('/Users/jiffyclub/synth/spark-demo/mtc_asim.h5')
persons = store['persons']
households = store['households']
store.close()

t1 = time.time()

persons = persons.merge(households, left_on='household_id', right_index=True)

t2 = time.time()
print 'time to merge: {}'.format(t2 - t1)

persons = persons.query('age >= 18 and income >= 10000')
assert len(persons) > 0, 'no people left after query'

t3 = time.time()
print 'time to filter: {}'.format(t3 - t2)

income_by_sex = persons.groupby('sex').income.mean()

t4 = time.time()
print 'time to groupby agg: {}'.format(t4 - t3)

print 'total time: {}'.format(t4 - t1)
print income_by_sex

@@ -0,0 +1,54 @@
import time

import pandas as pd

from pyspark import SparkContext, SparkConf
from pyspark.sql import SQLContext

store = pd.HDFStore('/Users/jiffyclub/synth/spark-demo/mtc_asim.h5')
persons = store['persons'].reset_index()
households = store['households'].reset_index()
store.close()

spark_conf = (
    SparkConf()
    .setAppName('SparkRunDemo')
    # .setMaster('local[*]')
    # .set('spark.driver.memory', '8g')
    .set('spark.executor.memory', '8g')
    .set('spark.python.worker.memory', '8g')
    .set('spark.storage.memoryFraction', 0.2)
    .set('spark.logConf', True))

print spark_conf.toDebugString()

sc = SparkContext(conf=spark_conf)
sql = SQLContext(sc)

hh_spark = sql.createDataFrame(households)
p_spark = sql.createDataFrame(persons)

t1 = time.time()

merged = hh_spark.join(p_spark, hh_spark.HHID == p_spark.household_id)

t2 = time.time()
print 'time to merge: {}'.format(t2 - t1)

# filtered = merged.filter((merged.age <= 18) & (merged.income >= 100000))
filtered = merged.filter('age >= 18 and income >= 10000')

t3 = time.time()
print 'time to filter: {}'.format(t3 - t2)

income_by_sex = filtered.groupby('sex').agg({'income': 'mean'})

t4 = time.time()
print 'time to groupby agg: {}'.format(t4 - t3)

print income_by_sex.collect()

t5 = time.time()
print 'time to collect: {}'.format(t5 - t4)

print 'total time: {}'.format(t5 - t1)
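One detail worth flagging in the Spark script: the commented-out column-expression filter does not match the string filter that replaced it (age <= 18 vs. age >= 18, and income >= 100000 vs. income >= 10000), so it reads like an abandoned earlier attempt. A column-expression form that actually matches the string filter would be the following sketch, using the standard pyspark.sql.functions module (not part of the gist):

from pyspark.sql import functions as F

# Sketch only: column-expression equivalent of
# merged.filter('age >= 18 and income >= 10000').
filtered = merged.filter((F.col('age') >= 18) & (F.col('income') >= 10000))

The pandas and Spark pipelines also join on different-looking keys (right_index=True vs. hh_spark.HHID) because the Spark version calls reset_index(), which evidently turns the households index into an HHID column.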