from pyspark.sql.functions import col, explode_outer, from_json, lit, concat
from pyspark.sql.types import StructType, ArrayType

def get_json_df(input_df, primary_partition_column, json_column_name, spark_session):
    '''
    Description:
    This function provides the schema of the json records and the dataframe to be used for flattening
    :param input_df: [type: pyspark.sql.dataframe.DataFrame] input dataframe
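The preview cuts off inside the docstring. A minimal sketch of what a function with this signature and description could do, assuming the JSON arrives as a string column whose schema has to be inferred (the body and return values are assumptions, not the author's implementation):

from pyspark.sql.functions import col, from_json

def get_json_df(input_df, primary_partition_column, json_column_name, spark_session):
    # Sketch: infer the schema of the JSON strings by reading them as a JSON dataset
    json_schema = spark_session.read.json(
        input_df.rdd.map(lambda row: row[json_column_name])).schema
    # Parse the JSON string column into a typed struct column named 'json',
    # keeping the partition column alongside it for later selects
    json_df = input_df.select(
        col(primary_partition_column),
        from_json(col(json_column_name), json_schema).alias('json'))
    return json_schema, json_df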
from pyspark.sql.types import *
from pyspark.sql.functions import col, from_json

def is_leaf(data):
    '''
    Description:
    This function checks if a particular field in the schema is a leaf or not.
    Types not considered as a leaf: struct, array
    :param data: [type: dict] a dictionary containing metadata about a field
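For context, the "dictionary containing metadata about a field" is presumably the jsonValue() form of a Spark schema field; a small illustrative example (the schema and field names here are made up):

from pyspark.sql.types import StructType, StructField, StringType, ArrayType

schema = StructType([
    StructField('id', StringType()),
    StructField('tags', ArrayType(StringType()))])

for field in schema.jsonValue()['fields']:
    # A leaf field carries a plain string type, e.g.
    #   {'name': 'id', 'type': 'string', ...}
    # while struct/array fields carry a nested dict, e.g.
    #   {'name': 'tags', 'type': {'type': 'array', ...}, ...}
    print(field['name'], field['type'])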
from pyspark.sql.functions import col, explode_outer
from pyspark.sql.types import *
from copy import deepcopy
from autoflatten import AutoFlatten
from collections import Counter

s3_path = 's3://mybucket/orders/'
df = spark.read.orc(s3_path)
json_df = spark.read.json(df.rdd.map(lambda row: row.json))
json_schema = json_df.schema
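Reading the JSON strings back through spark.read.json is done only to infer a schema; a likely next step (a sketch, with the column name 'json' assumed from the lambda above) is to parse the string column with that schema so the nested fields become addressable:

from pyspark.sql.functions import from_json

# Replace the raw JSON string column with a typed struct column
parsed_df = df.withColumn('json', from_json(col('json'), json_schema))
parsed_df.printSchema()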
from pyspark.sql.functions import col, explode_outer
from copy import deepcopy
from autoflatten import AutoFlatten

s3_path = 's3://mybucket/orders/'
df = spark.read.orc(s3_path)
json_df = spark.read.json(df.rdd.map(lambda row: row.json))
json_schema = json_df.schema
af = AutoFlatten(json_schema)
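After constructing the object, the natural next call is compute(); the two attributes inspected below are the ones explicitly assigned in the compute() preview further down this page (other attributes, such as the explode order, are not shown here because they would be assumptions):

af.compute()

# rest and structure are assigned inside compute() (see the compute()
# preview below); printing them is a cheap sanity check.
print(af.rest)        # leaf fields that do not sit under any array column
print(af.structure)   # hierarchical mapping of the array columns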
def compute(self):
    '''
    Description:
    This function performs the required computation and gathers all the resources
    needed for the subsequent steps of selecting and exploding fields
    '''
    self.unnest_dict(self.fields_in_json, '')
    all_cols_in_explode_cols = set(filter(lambda x: x.startswith(tuple(self.cols_to_explode)), self.all_fields.keys()))
    self.rest = set(self.all_fields.keys()).difference(all_cols_in_explode_cols)
    self.structure = self.get_structure([f"json{x}" for x in list(self.cols_to_explode)])
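The preview stops right after structure is built. Given the other method signatures on this page, a plausible (assumed, not confirmed) continuation of compute() simply wires the remaining helpers together:

    # ...inside compute(), continuing the body above (a sketch):
    self.order = self.extract_order(self.structure)
    self.bottom_to_top = self.get_bottom_to_top(self.order, all_cols_in_explode_cols)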
def get_bottom_to_top(self, order, all_cols_in_explode_cols):
    '''
    Description:
    This function gets the mutually exclusive leaf fields in every array type column
    :param order: [type: list] contains the fields in the order in which the array explode has to take place
    :param all_cols_in_explode_cols: [type: set] contains all fields in array type fields
    :return bottom_to_top: [type: dict] contains the list of mutually exclusive leaf fields for every
                           array type / struct type (parent to array type) field
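The body is cut off here; a simplified sketch of the grouping the docstring describes, i.e. assigning each leaf field under an array column to its closest enclosing array column (the data shapes and the function below are assumptions, not the author's code):

def get_bottom_to_top_sketch(order, all_cols_in_explode_cols):
    # Walk the array columns from the most deeply nested one upwards and
    # claim every not-yet-claimed leaf field that sits underneath it, so
    # each leaf ends up in exactly one bucket ("mutually exclusive").
    bottom_to_top = {}
    claimed = set()
    for array_col in reversed(order):
        fields = [f for f in all_cols_in_explode_cols
                  if f.startswith(array_col) and f not in claimed]
        bottom_to_top[array_col] = fields
        claimed.update(fields)
    return bottom_to_top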
def extract_order(self, structure):
    '''
    Description:
    This function does a BFS traversal to obtain the order in which
    the array type fields are to be exploded
    :param structure: [type: dict] contains the hierarchical mapping for the array fields
    :return order: [type: list] contains the fields in the order in which the array explode has to take place
    '''
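A BFS over the nested structure dictionary is straightforward to sketch; assuming structure is a nested dict of column paths (as built by get_structure below), the explode order is simply the level order, so parents are always visited before their children:

from collections import deque

def extract_order_sketch(structure):
    # Level-order (BFS) traversal of the nested dict; depending on how the
    # structure is keyed, the root key ('json') may need to be dropped.
    order = []
    queue = deque(structure.items())
    while queue:
        key, children = queue.popleft()
        order.append(key)
        queue.extend(children.items())
    return order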
def get_structure(self, col_list):
    '''
    Description:
    This function builds the structure used to traverse to the array fields in the schema
    :param col_list: [type: list] contains the list of fields that are to be exploded
    :return structure: [type: dict] contains the hierarchical mapping for the array fields
    '''
    structure = {'json': {}}
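The rest of the body is not shown; one simple way to build such a hierarchical mapping from dotted paths, keyed here by cumulative paths (an illustrative sketch; the author's keying scheme may differ):

def get_structure_sketch(col_list):
    # Build a nested dict such as {'json': {'json.a': {'json.a.b': {}}}}
    # from a path like 'json.a.b', creating intermediate levels as needed.
    structure = {'json': {}}
    for path in col_list:
        parts = path.split('.')
        node = structure['json']
        for i in range(2, len(parts) + 1):
            node = node.setdefault('.'.join(parts[:i]), {})
    return structure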
def unnest_dict(self, json, cur_path):
    '''
    Description:
    This function recursively unnests the dictionaries in the json schema
    and maps the hierarchical path of a field to its column name when it encounters a leaf node
    :param json: [type: dict] contains metadata about the field
    :param cur_path: [type: str] contains the hierarchical path to the field, with each parent separated by a '.'
    '''
    if self.is_leaf(json):
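The recursion itself is cut off; a sketch of the pattern the docstring describes, assuming the same jsonValue()-style field dictionaries as in the earlier example (the bookkeeping containers are named after the attributes seen in the compute() preview, but the bodies are assumptions):

def unnest_dict_sketch(field, cur_path, all_fields, cols_to_explode):
    # 'field' is one entry of schema.jsonValue()['fields']; cur_path is the
    # dotted path of its parent ('' at the top level).
    path = f"{cur_path}.{field['name']}"
    if not isinstance(field['type'], dict):          # leaf: record its path
        all_fields[path] = field['name']
    elif field['type']['type'] == 'struct':          # struct: recurse into children
        for child in field['type']['fields']:
            unnest_dict_sketch(child, path, all_fields, cols_to_explode)
    elif field['type']['type'] == 'array':           # array: mark for exploding
        cols_to_explode.add(path)
        element = field['type']['elementType']
        if isinstance(element, dict) and element['type'] == 'struct':
            for child in element['fields']:          # still record the leaves inside
                unnest_dict_sketch(child, path, all_fields, cols_to_explode)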
def is_leaf(self, data):
    '''
    Description:
    This function checks if a particular field in the schema is a leaf or not.
    Types not considered as a leaf: struct, array
    :param data: [type: dict] a dictionary containing metadata about a field
    :return leaf: [type: bool] indicates whether a given field is a leaf or not
    '''
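A minimal sketch of such a check against jsonValue()-style field metadata (an assumption about the data shape, consistent with the example earlier on this page):

def is_leaf_sketch(data):
    # Atomic types appear as plain strings (e.g. 'string', 'long'), while
    # struct and array types appear as nested dicts, so treat anything
    # whose type is a struct/array dict as a non-leaf.
    field_type = data.get('type')
    if isinstance(field_type, dict) and field_type.get('type') in ('struct', 'array'):
        return False
    return True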