Created March 2, 2022 15:07
from pyspark.sql.functions import col, explode_outer, from_json, lit, concat
from pyspark.sql.types import StructType, ArrayType


def get_json_df(input_df, primary_partition_column, json_column_name, spark_session):
    '''
    Description:
    This function provides the schema of the JSON records and the dataframe to be used for flattening
    :param input_df: [type: pyspark.sql.dataframe.DataFrame] input dataframe
    :param primary_partition_column: [type: string] name of the primary partition column
    :param json_column_name: [type: string] name of the column with the JSON string
    :param spark_session: SparkSession object
    :return df: dataframe to be used for flattening
    '''
    input_df = input_df if primary_partition_column is None else input_df.drop(primary_partition_column)
    # wrap every JSON record in an outer struct keyed by 'transformedJSON'
    df1 = input_df.withColumn(
        'transformed_json',
        concat(lit('{"transformedJSON" :'), input_df[json_column_name], lit('}'))
    )
    # infer the schema of the wrapped JSON records
    json_df = spark_session.read.json(df1.rdd.map(lambda row: row.transformed_json))
    json_schema = json_df.schema
    # parse the JSON string with the inferred schema and surface the outer struct
    df = (
        df1.drop(json_column_name)
        .withColumn(json_column_name, from_json(col('transformed_json'), json_schema))
        .drop('transformed_json')
        .select(f'{json_column_name}.*', '*')
        .drop(json_column_name)
    )
    return df
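
# Illustrative sketch (not part of the original gist): how get_json_df is
# typically called. The column names 'id' and 'payload' and the sample
# record are assumptions made for this example only.
def _demo_get_json_df(spark_session):
    raw = spark_session.createDataFrame(
        [(1, '{"user": {"name": "a"}, "tags": ["x", "y"]}')],
        ['id', 'payload']
    )
    # no primary partition column in this toy input, hence None
    json_df = get_json_df(raw, None, 'payload', spark_session)
    # json_df keeps the non-JSON columns ('id') and adds a single
    # 'transformedJSON' struct column holding the parsed record
    json_df.printSchema()
    return json_df
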
def execute_autoflatten(df, json_column_name):
    '''
    Description:
    This function executes the core autoflattening operation
    :param df: [type: pyspark.sql.dataframe.DataFrame] dataframe to be used for flattening
    :param json_column_name: [type: string] name of the column with the JSON string
    :return df: DataFrame containing flattened records
    '''
    # collect all fields of StructType or ArrayType into the nested_fields dictionary
    nested_fields = dict([
        (field.name, field.dataType)
        for field in df.schema.fields
        if isinstance(field.dataType, (ArrayType, StructType))
    ])
    # repeat until no StructType or ArrayType fields remain
    while nested_fields:
        # pick one nested column to process
        column_name = list(nested_fields.keys())[0]
        # if the field is a StructType, select every child field and alias it
        # with the complete path to that child ('parent>child')
        if isinstance(nested_fields[column_name], StructType):
            unnested = [
                col(column_name + '.' + child).alias(column_name + '>' + child)
                for child in [n.name for n in nested_fields[column_name]]
            ]
            df = df.select('*', *unnested).drop(column_name)
        # else, if the field is an ArrayType, explode it into one row per element
        elif isinstance(nested_fields[column_name], ArrayType):
            df = df.withColumn(column_name, explode_outer(column_name))
        # df has changed, so rebuild nested_fields from the new schema
        nested_fields = dict([
            (field.name, field.dataType)
            for field in df.schema.fields
            if isinstance(field.dataType, (ArrayType, StructType))
        ])
    # rename the 'transformedJSON' prefix back to the original JSON column name
    # so every flattened column retains the complete path to its field
    for df_col_name in df.columns:
        df = df.withColumnRenamed(df_col_name, df_col_name.replace('transformedJSON', json_column_name))
    return df
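
# Illustrative sketch (not part of the original gist): what
# execute_autoflatten does to the demo dataframe built above. Struct fields
# become 'parent>child' columns and arrays are exploded into rows, so the
# single sample record with a two-element 'tags' array yields two rows.
def _demo_execute_autoflatten(spark_session):
    json_df = _demo_get_json_df(spark_session)
    flat = execute_autoflatten(json_df, 'payload')
    # expected columns for the sample record: 'id', 'payload>user>name'
    # and 'payload>tags' ('transformedJSON' is renamed back to 'payload')
    flat.show(truncate=False)
    return flat
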
def execute(input_df, primary_partition_column, json_column_name, spark_session):
    '''
    Description:
    This function executes the flattening of the JSON records in the dataframe
    :param input_df: [type: pyspark.sql.dataframe.DataFrame] input dataframe
    :param primary_partition_column: [type: string] name of the primary partition column
    :param json_column_name: [type: string] name of the column with the JSON string
    :param spark_session: SparkSession object
    :return unstd_df: flattened dataframe with unstandardized column names
    '''
    json_df = get_json_df(input_df, primary_partition_column, json_column_name, spark_session)
    unstd_df = execute_autoflatten(json_df, json_column_name)
    return unstd_df
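
# Illustrative sketch (not part of the original gist): end-to-end usage of
# execute(), plus one possible way to standardize the '>'-delimited column
# names its docstring mentions. standardize_columns is an assumption for
# this example, not part of the gist.
def standardize_columns(df):
    # replace '>' with '_' so names are safe for sinks (e.g. an RDBMS)
    # that reject '>' in identifiers
    for name in df.columns:
        df = df.withColumnRenamed(name, name.replace('>', '_'))
    return df

def _demo_execute(spark_session, raw_df):
    unstd_df = execute(raw_df, None, 'payload', spark_session)
    return standardize_columns(unstd_df)
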
What is json_column_name? Can someone please provide some additional information?
Hi @shreyasms17, if I am loading the final structure into an RDBMS, what would be the primary key? How should updates/deletes be handled?