Execution of AutoFlatten
from pyspark.sql.functions import col, explode_outer
from copy import deepcopy
from autoflatten import AutoFlatten

# assumes an active SparkSession named `spark`
s3_path = 's3://mybucket/orders/'
df = spark.read.orc(s3_path)

# parse the raw JSON strings in the `json` column and infer their schema
json_df = spark.read.json(df.rdd.map(lambda row: row.json))
json_schema = json_df.schema

# compute the flattening plan from the inferred schema
af = AutoFlatten(json_schema)
af.compute()

df1 = json_df

# track which field paths have already been selected
visited = set([f'.{column}' for column in df1.columns])
cols_to_select = df1.columns
for rest_col in af.rest:
    if rest_col not in visited:
        cols_to_select += [rest_col[1:]] if af.all_fields[rest_col] not in df1.columns else [col(rest_col[1:]).alias('-'.join(rest_col[1:].split('.')[-2:]))]
        visited.add(rest_col)

df1 = df1.select(cols_to_select)

if af.order:
    for key in af.order:
        column = key.split('.')[-1]
        if af.bottom_to_top[key]:
            '''
            values for the column exist in the bottom_to_top dict if it is an array type
            '''
            df1 = df1.select('*', explode_outer(col(column)).alias(f"{column}_exploded")).drop(column)
            # grab all paths to columns after the explode
            cols_in_array_col = set(map(lambda x: f'{key}.{x}', df1.select(f'{column}_exploded.*').columns))
            # retrieve unvisited columns
            cols_to_select_set = cols_in_array_col.difference(visited)
            # alias any column whose name or path would collide with an existing column
            cols_to_select_list = list(map(lambda x: f"{column}_exploded.{x.split('.')[-1]}" if (x.split('.')[-1] not in df1.columns and f"{column}-{x.split('.')[-1]}" not in df1.columns) else col(f"{column}_exploded.{x.split('.')[-1]}").alias(f"{column}-{x.split('.')[-1]}"), list(cols_to_select_set)))
            # update the visited set
            visited.update(cols_to_select_set)
            df1 = df1.select(df1.columns + cols_to_select_list).drop(f"{column}_exploded")
        else:
            '''
            values for the column do not exist in the bottom_to_top dict if it is a struct type
            '''
            # grab all paths to columns after opening the struct
            cols_in_array_col = set(map(lambda x: f'{key}.{x}', df1.selectExpr(f'{column}.*').columns))
            # retrieve unvisited columns
            cols_to_select_set = cols_in_array_col.difference(visited)
            # alias any column whose name or path would collide with an existing column
            cols_to_select_list = list(map(lambda x: f"{column}.{x.split('.')[-1]}" if (x.split('.')[-1] not in df1.columns and f"{column}-{x.split('.')[-1]}" not in df1.columns) else col(f"{column}.{x.split('.')[-1]}").alias(f"{column}-{x.split('.')[-1]}"), list(cols_to_select_set)))
            # update the visited set
            visited.update(cols_to_select_set)
            df1 = df1.select(df1.columns + cols_to_select_list).drop(f"{column}")

final_df = df1.select(list(set(af.all_fields.values())) + [x for x in df1.columns if '-' in x])
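
A quick way to sanity-check the flattened result, assuming the script above has run in an active Spark session:

final_df.printSchema()            # every nested field should now appear as a top-level column
final_df.show(5, truncate=False)  # hyphenated column names mark fields that were aliased to avoid name collisions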
Hello,
Thanks a lot for reaching out.
If there are JSON records with different schemas, then spark.read.json computes a union of those schemas when it reads them.
Also, if the schemas differ, it would be advisable to first split the records up, since they might not be contextually the same, and then flatten each subset individually (a rough sketch follows below).
Thanks and regards,
Shreyas
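
As a rough sketch of that splitting approach, assuming an active SparkSession named spark; the file path and the split conditions (on glossary.GlossDiv vs. glossary.GlossCDiv) are hypothetical, based on the two example records quoted below:

from pyspark.sql.functions import col
from autoflatten import AutoFlatten

# reading both records from one line-delimited file yields a single unioned schema:
# both GlossDiv and GlossCDiv appear, and whichever branch a record lacks is null
raw_df = spark.read.json('s3://mybucket/mixed_glossary.json')  # hypothetical path
raw_df.printSchema()

# split by whichever branch is populated, then flatten each subset on its own
gloss_div_df = raw_df.where(col('glossary.GlossDiv').isNotNull())
gloss_cdiv_df = raw_df.where(col('glossary.GlossCDiv').isNotNull())

for subset_df in (gloss_div_df, gloss_cdiv_df):
    af = AutoFlatten(subset_df.schema)
    af.compute()
    # ...then apply the same flattening loop as in the script above to subset_df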
On Sat, Apr 9, 2022, 13:17, tooptoop4 wrote:
@shreyasms17 how do you handle a single JSON file with 2 lines having differing schemas?
{"glossary":{"title":"example glossary","GlossDiv":{"title":"S","GlossList":{"GlossEntry":{"ID":"SGML","SoortAs":"SGML","GlossTerm":"Standard Generalized Markup Language","Acronym":"SGML","Abbrev":"ISO 8879:1986","GlossDef":{"para":"A meta-markup language, used to create markup languages such as DocBook.","GlossSeeAlso":["GML","XML"]},"GlossSee":"markup"}}}}}
{"glossary":{"title":"exaddmple glossary","GlossCDiv":{"title":"S","GlossList":{"GlossEntry":{"ID":"SGML","SortAs":"SGML","GlossTerm":"Standard Generalized Markup Language","Acronym":"SGML","Abbrev":"ISO 8879:1986","GlossDef":{"para":"A meta-markup language, used to create markup languages such as DocBook.","GlossSeeAlso":["GML","XML"]},"GlossSee":"markup"}}}}}