pyspark
class pyspark.sql.DataFrame
people = spark.read.parquet("...")
department = spark.read.parquet("...")
people.filter(people.age > 30) \
    .join(department, people.deptId == department.id) \
    .groupBy(department.name, "gender") \
    .agg({"salary": "avg", "age": "max"})
# agg -> aggregate on entire dataframe without groups
df.agg({"age": "max"}).collect()
from pyspark.sql import functions as F
df.agg(F.min(df.age)).collect()
# persists dataframe with default storage level (MEMORY_AND_DISK). Useful when:
# 1) reusing in an iterative loop (ML algos); 2) reusing an RDD multiple times in a single application, job, or notebook;
# 3) the upfront cost to regenerate the RDD is big (read from HDFS, or built by a complex chain of map(), filter(), ...)
df.cache()
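# to pick a storage level explicitly, use persist(); cache() is shorthand for
# persist(StorageLevel.MEMORY_AND_DISK). Illustrative sketch:
from pyspark import StorageLevel
df.persist(StorageLevel.MEMORY_ONLY)
df.unpersist()  # release the cached data when no longer needed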
# returns new dataframe that has exactly numPartitions partitions
df1 = df.coalesce(numPartitions=1)
df1.rdd.getNumPartitions()  # output: 1
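# note: coalesce() only reduces the partition count (it avoids a shuffle);
# to increase partitions use repartition(), which performs a full shuffle:
df.repartition(10).rdd.getNumPartitions()  # output: 10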
df.collect() # returns all records as list of Row
df.count() # number of rows in dataframe
df.describe() # basic statistics for numeric and string columns
df.distinct().count() # distinct returns a new dataframe containing distinct rows in this dataframe
# drop() returns new df that drops specified col
df.drop('age').collect()
df.drop(df.age).collect()
df.join(df2, df.name == df2.name, 'inner').drop(df.name).collect()
=> [Row(age=5, height=85, name='Bob')]
df.join(df2, df.name == df2.name, 'inner').drop(df2.name).collect()
=> [Row(age=5, name='Bob', height=85)]
df.join(df2, 'name', 'inner').drop('age', 'height').collect()
=> [Row(name='Bob')]
df.dropDuplicates() # returns new df with duplicate rows removed, optionally only considering certain cols
from pyspark.sql import Row
df = sc.parallelize([
    Row(name='Alice', age=5, height=80),
    Row(name='Alice', age=5, height=80),
    Row(name='Alice', age=10, height=80)]).toDF()
df.dropDuplicates().show() # rows are duplicates only if name, age, and height all match
df.dropDuplicates(['name', 'height']).show() # rows are duplicates if name and height match
df.dropna(how='any', thresh=None, subset=None) # subset: optional list of col names to consider
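# e.g. keep rows with at least 2 non-null values, or rows where 'age' is not null
# (df.dropna() and df.na.drop() are aliases):
df.dropna(thresh=2).show()
df.dropna(subset=['age']).show()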
df.dtypes # return list of tuples (colname, datatype)
=> [('age', 'int'), ('name', 'string')]
df.na # returns a DataFrameNaFunctions for handling missing values
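# DataFrameNaFunctions also handles filling nulls -- with a constant or per-column
# values (illustrative):
df.na.fill(0).show()                              # fill nulls in numeric cols with 0
df.na.fill({'age': 50, 'name': 'unknown'}).show() # per-column fill values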
# sort() and orderBy() are aliases
# sort(*cols, **kwargs) -> returns new df sorted by the specified columns
df.sort(df.age.desc()).collect()
df.sort("age", ascending=False).collect()
df.orderBy(df.age.desc()).collect()
from pyspark.sql.functions import *
df.sort(asc("age")).collect()
df.orderBy(desc("age"), "name").collect()
df.orderBy(["age","name"], ascending=[0,1]).collect()
df.printSchema()
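# prints the schema in tree form; for the (age, name) df above the output looks like:
# root
#  |-- age: integer (nullable = true)
#  |-- name: string (nullable = true)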
# replace(to_replace, value, subset). subset - optional list of col names to consider.
# replaces to_replace with value, except when to_replace is a dict (then the dict's mapping is used and value is ignored)
df4.na.replace(10, 20).show()
df4.na.replace('Alice', None).show()
df4.na.replace({'Alice': None}).show()
df4.na.replace(['Alice','Bob'],['A','B'],'name').show()
# take(num) -> returns first num rows as a list of Row
df.take(2)
# toDF(*cols) -> returns new DataFrame with the specified new col names
df.toDF('f1', 'f2').collect()
=> [Row(f1=2, f2='Alice'), Row(f1=5, f2='Bob')]
# write => DataFrameWriter -> interface for saving content of a non-streaming DataFrame to external storage
# writeStream => DataStreamWriter -> interface for saving a streaming DataFrame to external storage
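# minimal writeStream sketch (assumes streaming_df came from spark.readStream;
# the console sink and output mode here are illustrative):
query = streaming_df.writeStream.format('console').outputMode('append').start()
query.awaitTermination()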
class pyspark.sql.DataFrameWriter(df)
# bucketBy(numBuckets, col, *cols) -> buckets output by given cols. If specified, output is laid out on the file system
# similar to Hive's bucketing scheme. Extra cols are positional varargs, so they cannot be passed by keyword.
(df.write.format('parquet')
    .bucketBy(100, 'year', 'month')
    .mode("overwrite")
    .saveAsTable('bucketed_table'))
import os, tempfile
df.write.format('json').save(os.path.join(tempfile.mkdtemp(), 'data'))
jdbc(url, table, mode=None, properties=None) # saves content of DataFrame to external db via JDBC
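# illustrative jdbc save (the URL, table name, and credentials are hypothetical):
df.write.jdbc(url='jdbc:postgresql://localhost:5432/mydb', table='people',
              mode='append', properties={'user': 'spark', 'password': 'secret'})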
df.write.parquet(os.path.join(tempfile.mkdtemp(), 'data'))
# parquet(path, mode=None, partitionBy=None, compression=None)
# compression: one of 'snappy', 'gzip', 'lzo' (None falls back to spark.sql.parquet.compression.codec)
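# e.g. partition the output by a column and pick a codec explicitly
# ('year' must be a column of df; the temp path mirrors the example above):
df.write.parquet(os.path.join(tempfile.mkdtemp(), 'data'),
                 mode='overwrite', partitionBy='year', compression='snappy')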