Last active
September 24, 2022 16:05
-
-
Save jiffyclub/905bf5e8bf17ec59ab8f to your computer and use it in GitHub Desktop.
Do the same thing in Spark and Pandas
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
""" | |
Convert Pandas DFs in an HDFStore to parquet files for better compatibility | |
with Spark. | |
Run from the command line with: | |
spark-submit --driver-memory 4g --master 'local[*]' hdf5_to_parquet.py | |
""" | |
import pandas as pd | |
from pyspark import SparkContext, SparkConf | |
from pyspark.sql import SQLContext | |
store = pd.HDFStore('/Users/jiffyclub/synth/spark-demo/mtc_asim.h5') | |
persons = store['persons'].reset_index() | |
households = store['households'].reset_index() | |
store.close() | |
spark_conf = ( | |
SparkConf() | |
.setAppName('SparkRunDemo') | |
# .setMaster('local[*]') | |
# .set('spark.driver.memory', '8g') | |
.set('spark.executor.memory', '8g') | |
.set('spark.python.worker.memory', '8g') | |
.set('spark.storage.memoryFraction', 0.2) | |
.set('spark.logConf', True)) | |
print spark_conf.toDebugString() | |
sc = SparkContext(conf=spark_conf) | |
sql = SQLContext(sc) | |
hh_spark = sql.createDataFrame(households) | |
p_spark = sql.createDataFrame(persons) | |
hh_spark.write.parquet('households.parquet') | |
p_spark.write.parquet('persons.parquet') |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
"""
Time the merge / filter / groupby-agg pipeline on the HDFStore data using
plain Pandas (the baseline the Spark version is compared against).
"""
import time

import pandas as pd

# Context manager guarantees the HDF5 file handle is closed even on error.
with pd.HDFStore('/Users/jiffyclub/synth/spark-demo/mtc_asim.h5') as store:
    persons = store['persons']
    households = store['households']

t1 = time.time()
# Attach household attributes to each person: households is indexed by
# household id, so join persons.household_id against that index.
persons = persons.merge(households, left_on='household_id', right_index=True)
t2 = time.time()
print('time to merge: {}'.format(t2 - t1))

persons = persons.query('age >= 18 and income >= 10000')
# Explicit raise instead of assert: asserts are stripped under `python -O`,
# which would let an empty result slip through silently.
if persons.empty:
    raise ValueError('no people left after query')
t3 = time.time()
print('time to filter: {}'.format(t3 - t2))

income_by_sex = persons.groupby('sex').income.mean()
t4 = time.time()
print('time to groupby agg: {}'.format(t4 - t3))

print('total time: {}'.format(t4 - t1))
print(income_by_sex)
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
""" | |
Ran from the command line with: | |
time spark-submit --driver-memory 4g --master 'local[*]' spark_run.py 2> spark.log | |
""" | |
import time | |
from pyspark import SparkContext, SparkConf | |
from pyspark.sql import SQLContext | |
spark_conf = ( | |
SparkConf() | |
.setAppName('SparkRunDemo') | |
# .setMaster('local[*]') | |
# .set('spark.driver.memory', '8g') | |
.set('spark.executor.memory', '8g') | |
.set('spark.python.worker.memory', '8g') | |
.set('spark.storage.memoryFraction', 0.2) | |
.set('spark.logConf', True)) | |
print spark_conf.toDebugString() | |
sc = SparkContext(conf=spark_conf) | |
sql = SQLContext(sc) | |
hh_spark = sql.read.parquet('households.parquet') | |
p_spark = sql.read.parquet('persons.parquet') | |
t1 = time.time() | |
merged = hh_spark.join(p_spark, hh_spark.HHID == p_spark.household_id) | |
t2 = time.time() | |
print 'time to merge: {}'.format(t2 - t1) | |
# filtered = merged.filter((merged.age <= 18) & (merged.income >= 100000)) | |
filtered = merged.filter('age >= 18 and income >= 10000') | |
t3 = time.time() | |
print 'time to filter: {}'.format(t3 - t2) | |
income_by_sex = filtered.groupby('sex').agg({'income': 'mean'}) | |
t4 = time.time() | |
print 'time to groupby agg: {}'.format(t4 - t3) | |
print income_by_sex.collect() | |
t5 = time.time() | |
print 'time to collect: {}'.format(t5 - t4) | |
print 'total time: {}'.format(t5 - t1) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment