Enforcing string type in JSON records
import json

from pyspark.sql.functions import col, from_json
from pyspark.sql.types import StructType
def is_leaf(data):
    '''
    Description:
        This function checks whether a given field in the schema is a leaf.
        Types not considered leaves: struct, array
    :param data: [type: dict] a dictionary containing metadata about a field
    :return leaf: [type: bool] indicates whether the given field is a leaf
    '''
    try:
        if isinstance(data['type'], str):
            # simple types ('string', 'long', ...) are leaves; 'struct' is not
            leaf = data['type'] != 'struct'
        else:
            # complex types are dicts; only 'map' is treated as a leaf here
            leaf = data['type']['type'] == 'map'
    except (KeyError, TypeError):
        leaf = False
    return leaf
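
# Hypothetical examples of the field metadata dicts this operates on, in the shape
# produced by StructType.json() (field names are made up for illustration):
#   {'name': 'id',   'type': 'long', 'nullable': True, 'metadata': {}}                  -> leaf
#   {'name': 'addr', 'type': {'type': 'struct', 'fields': [...]}, 'nullable': True}     -> not a leaf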
def enforce_stringtype(json_dict):
    '''
    Description:
        This function recursively traverses the schema metadata
        and changes the type of every leaf field to string.
    :param json_dict: [type: dict or list] metadata about a field, or a list of field metadata dicts
    '''
    if is_leaf(json_dict):
        # change type to string
        json_dict['type'] = 'string'
        return
    else:
        if isinstance(json_dict, list):
            # a list of field metadata dicts: recurse into each one
            for item in json_dict:
                enforce_stringtype(item)
        elif isinstance(json_dict, dict):
            if isinstance(json_dict['type'], str):
                # type is the string 'struct' (e.g. the top-level schema dict): recurse into its fields
                enforce_stringtype(json_dict['fields'])
            else:
                if json_dict['type']['type'] == 'array':
                    if isinstance(json_dict['type']['elementType'], dict):
                        # array of structs: recurse into the element's fields
                        enforce_stringtype(json_dict['type']['elementType']['fields'])
                    else:
                        # array of a primitive type: collapse the whole field to string
                        json_dict['type'] = 'string'
                        return
                elif json_dict['type']['type'] == 'struct':
                    # nested struct: recurse into its fields
                    enforce_stringtype(json_dict['type']['fields'])
# df: a DataFrame that already holds the raw JSON strings in a column named "json"
# infer the schema by reading the JSON strings
json_df = spark.read.json(df.rdd.map(lambda row: row.json))
# get the inferred schema
json_schema = json_df.schema
# get the schema metadata as a dictionary
json_dict = json.loads(json_schema.json())
# enforce the string type on every leaf field
enforce_stringtype(json_dict['fields'])
# convert the metadata back to a StructType
enforced_schema = StructType.fromJson(json_dict)
# parse the json column with the string-enforced schema
final_df = df.withColumn('type_safe_json', from_json(col('json'), enforced_schema)).select('type_safe_json')
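
# For illustration only: a minimal sketch, using a made-up schema metadata dict, of what
# enforce_stringtype does to the metadata before it is turned back into a StructType.
sample = {
    'type': 'struct',
    'fields': [
        {'name': 'id', 'type': 'long', 'nullable': True, 'metadata': {}},
        {'name': 'tags', 'type': {'type': 'array', 'elementType': 'long', 'containsNull': True},
         'nullable': True, 'metadata': {}}
    ]
}
enforce_stringtype(sample['fields'])
# 'id' becomes a string field; the primitive array 'tags' is collapsed to a single string field,
# so the resulting StructType has both 'id' and 'tags' as StringType
print(StructType.fromJson(sample))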