Skip to content

Instantly share code, notes, and snippets.

@vmuriart
Forked from jiffyclub/hdf_to_parquet.py
Created May 18, 2017 00:31
Show Gist options
  • Select an option

  • Save vmuriart/43f3754cae58c014300bc68b11d09409 to your computer and use it in GitHub Desktop.

Select an option

Save vmuriart/43f3754cae58c014300bc68b11d09409 to your computer and use it in GitHub Desktop.
Run the same merge, filter, and group-by benchmark in both pandas and Spark.
import time
import pandas as pd
# Benchmark a merge -> filter -> groupby/mean pipeline in pandas.
# pandas evaluates eagerly, so each timed interval below covers the real work
# of that stage.

# HDF5 file holding the 'persons' and 'households' tables.
HDF_PATH = '/Users/jiffyclub/synth/spark-demo/mtc_asim.h5'

# Open read-only: HDFStore's default mode ('a') would silently create an empty
# file if the path is wrong. The context manager closes the store even if a
# table lookup raises.
with pd.HDFStore(HDF_PATH, mode='r') as store:
    persons = store['persons']
    households = store['households']

t1 = time.time()
# Join each person's 'household_id' against the households index, attaching
# the household columns to every person row.
persons = persons.merge(households, left_on='household_id', right_index=True)
t2 = time.time()
print('time to merge: {}'.format(t2 - t1))

persons = persons.query('age >= 18 and income >= 10000')
# Explicit check instead of `assert`: asserts are stripped under `python -O`,
# and an empty frame would make the group-by result meaningless.
if persons.empty:
    raise RuntimeError('no people left after query')
t3 = time.time()
print('time to filter: {}'.format(t3 - t2))

income_by_sex = persons.groupby('sex').income.mean()
t4 = time.time()
print('time to groupby agg: {}'.format(t4 - t3))
print('total time: {}'.format(t4 - t1))
print(income_by_sex)
"""
Run the pandas merge/filter/groupby benchmark on Spark DataFrames.

Ran from the command line with:
time spark-submit --driver-memory 4g --master 'local[*]' spark_run.py 2> spark.log

The master URL and driver memory are supplied on that command line rather
than through the SparkConf built below.
"""
import time
import pandas as pd
from pyspark import SparkContext, SparkConf
from pyspark.sql import SQLContext

# HDF5 file holding the 'persons' and 'households' tables.
HDF_PATH = '/Users/jiffyclub/synth/spark-demo/mtc_asim.h5'

# Open read-only: HDFStore's default mode ('a') would silently create an empty
# file if the path is wrong. The context manager closes the store even if a
# table lookup raises.
with pd.HDFStore(HDF_PATH, mode='r') as store:
    # Move each index into a regular column so it survives conversion to a
    # Spark DataFrame. Presumably the households index is named 'HHID', which
    # the join below relies on -- confirm against the data file.
    persons = store['persons'].reset_index()
    households = store['households'].reset_index()

spark_conf = (
    SparkConf()
    .setAppName('SparkRunDemo')
    # Master and driver memory come from spark-submit (see docstring above).
    # NOTE(review): in local mode executors run inside the driver JVM, so this
    # executor memory setting likely has no effect there -- confirm.
    .set('spark.executor.memory', '8g')
    .set('spark.python.worker.memory', '8g')
    # NOTE(review): spark.storage.memoryFraction is a legacy setting that
    # Spark 1.6+ only reads when spark.memory.useLegacyMode is enabled.
    .set('spark.storage.memoryFraction', 0.2)
    .set('spark.logConf', True))
print(spark_conf.toDebugString())

sc = SparkContext(conf=spark_conf)
try:
    sql = SQLContext(sc)
    hh_spark = sql.createDataFrame(households)
    p_spark = sql.createDataFrame(persons)

    # Spark evaluates lazily. join, filter, and groupby/agg only build a query
    # plan, so the merge, filter and groupby times below measure plan
    # construction. All real computation happens inside collect(), so
    # 'total time' is the figure comparable with the pandas run.
    t1 = time.time()
    merged = hh_spark.join(p_spark, hh_spark.HHID == p_spark.household_id)
    t2 = time.time()
    print('time to merge: {}'.format(t2 - t1))

    filtered = merged.filter('age >= 18 and income >= 10000')
    t3 = time.time()
    print('time to filter: {}'.format(t3 - t2))

    income_by_sex = filtered.groupby('sex').agg({'income': 'mean'})
    t4 = time.time()
    print('time to groupby agg: {}'.format(t4 - t3))

    # collect() is the action that triggers the whole join/filter/agg job.
    print(income_by_sex.collect())
    t5 = time.time()
    print('time to collect: {}'.format(t5 - t4))
    print('total time: {}'.format(t5 - t1))
finally:
    # Shut down the SparkContext (and its JVM-side resources) even if a
    # stage fails.
    sc.stop()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment