Parquet generation via JSON-Schema
// file: helpers/parquet.ts
import {
  FieldDefinition,
  ParquetType,
  SchemaDefinition,
} from '@dsnp/parquetjs/dist/lib/declare'
import { JSONSchema4 } from 'json-schema'
// Helpers that build parquetjs field definitions. createField is a thin typed
// wrapper; the other helpers preset the Parquet type and default optionality.
export function createStringField({
  optional = true,
}: {
  optional?: boolean
}): Partial<FieldDefinition> {
  return createField({ type: 'UTF8', optional })
}

export function createBooleanField({
  optional = true,
}: {
  optional?: boolean
}): Partial<FieldDefinition> {
  return createField({ type: 'BOOLEAN', optional })
}

export function createIntField({
  optional = true,
}: {
  optional?: boolean
}): Partial<FieldDefinition> {
  return createField({ type: 'INT64', optional })
}

export function createFloatField({
  optional = true,
}: {
  optional?: boolean
}): Partial<FieldDefinition> {
  return createField({ type: 'FLOAT', optional })
}

export function createDecimalField({
  precision = 3,
  optional = true,
}: {
  precision?: number
  optional?: boolean
}): Partial<FieldDefinition> {
  return createField({ type: 'DECIMAL', precision, optional })
}

export function createTimestampField({
  optional = true,
}: {
  optional?: boolean
}): Partial<FieldDefinition> {
  return createField({ type: 'TIMESTAMP_MILLIS', optional })
}

// Arrays and repeated structs use the three-level LIST layout
// (field -> list -> element) so third-party readers interpret them correctly.
export function createRepeatableStructField({
  fields,
}: {
  fields: { [fieldName: string]: FieldDefinition }
}): Partial<FieldDefinition> {
  return {
    optional: true,
    type: 'LIST',
    fields: {
      list: {
        optional: false,
        repeated: true,
        fields: {
          element: {
            optional: true,
            repeated: false,
            fields: fields,
          },
        },
      },
    },
  }
}

export function createStructField({
  fields,
}: {
  fields: { [fieldName: string]: FieldDefinition }
}): Partial<FieldDefinition> {
  return {
    optional: true,
    fields: fields,
  }
}

export function createArrayField({
  type,
  optional = true,
}: {
  type?: ParquetType
  optional?: boolean
}): Partial<FieldDefinition> {
  return createField({
    optional,
    type: 'LIST',
    fields: {
      list: {
        optional: false,
        repeated: true,
        fields: {
          element: {
            type,
            optional: true,
          },
        },
      },
    },
  })
}

export function createField(
  definition: FieldDefinition,
): Partial<FieldDefinition> {
  return definition
}
// Converts a JSON Schema (draft 4) object definition into a parquetjs
// SchemaDefinition. Array fields are expected in tuple form
// (items: [{ ... }]), as used in the test schema below.
export function createFromJsonSchema({
  definition,
}: {
  definition: JSONSchema4
}): SchemaDefinition {
  const schema: SchemaDefinition = {}

  for (const [fieldName, fieldValue] of Object.entries<JSONSchema4>(
    definition.properties || {},
  )) {
    switch (fieldValue.type) {
      case 'string':
        schema[fieldName] = createStringField({})
        break
      case 'integer':
      case 'number':
        schema[fieldName] = createIntField({})
        break
      case 'boolean':
        schema[fieldName] = createBooleanField({})
        break
      case 'array':
        switch ((fieldValue.items as JSONSchema4[])[0].type) {
          case 'string':
            schema[fieldName] = createArrayField({ type: 'UTF8' })
            break
          case 'integer':
          case 'number':
            schema[fieldName] = createArrayField({ type: 'INT64' })
            break
          case 'boolean':
            schema[fieldName] = createArrayField({ type: 'BOOLEAN' })
            break
          case 'object':
            schema[fieldName] = createRepeatableStructField({
              fields: createFromJsonSchema({
                definition: (fieldValue.items as JSONSchema4[])[0],
              }),
            })
            break
        }
        break
      case 'object':
        schema[fieldName] = {
          fields: createFromJsonSchema({ definition: fieldValue }),
        }
        break
      default:
        console.warn(
          `Unable to find a definition for field "${fieldName}" with JSON Schema type "${fieldValue.type}".`,
        )
    }
  }

  return schema
}
// Maps a single field to a [columnName, sqlType] tuple. Objects are rendered
// as ROW(...) from their sub-fields, arrays wrap their element type in
// array(...). An object that is itself an array element is handled by calling
// this twice (see the 'array' case in generateSqlSchema).
function convertTypeToSql({
  fieldName,
  type,
  isArray = false,
  isObject = false,
  fields = [],
}: {
  fieldName: string
  type: string
  isArray?: boolean
  isObject?: boolean
  fields?: [string, string][]
}) {
  if (!isObject && !isArray) {
    return [fieldName, type]
  } else if (!isObject && isArray) {
    return [fieldName, `array(${type})`]
  } else if (isObject && !isArray) {
    const parsedSubFields = fields.map(
      (fieldDefinition) => `${fieldDefinition[0]} ${fieldDefinition[1]}`,
    )
    return [fieldName, `ROW(${parsedSubFields.join(', ')})`]
  } else {
    throw new Error(
      `The current combination of isArray and isObject for field "${fieldName}" is not allowed.`,
    )
  }
}

// Converts the properties of a JSON Schema into a list of
// [columnName, sqlType] tuples, e.g. for a Trino/Presto CREATE TABLE statement.
export function generateSqlSchema({
  definition,
}: {
  definition: { [key: string]: any }
}): any[] {
  const sqlSchema = []

  for (const [fieldName, fieldValue] of Object.entries<{ [key: string]: any }>(
    definition,
  )) {
    switch (fieldValue.type) {
      case 'string':
        sqlSchema.push(convertTypeToSql({ fieldName, type: 'varchar' }))
        break
      case 'date-time':
        // Note: this matches a custom type value; standard JSON Schema models
        // date-times as type 'string' with format 'date-time'.
        sqlSchema.push(convertTypeToSql({ fieldName, type: 'timestamp(3)' }))
        break
      case 'integer':
      case 'number':
        sqlSchema.push(convertTypeToSql({ fieldName, type: 'bigint' }))
        break
      case 'boolean':
        sqlSchema.push(convertTypeToSql({ fieldName, type: 'boolean' }))
        break
      case 'array': {
        // items can be a single schema or a tuple; normalize to an array.
        const arrayItems = !Array.isArray(fieldValue.items)
          ? [fieldValue.items]
          : fieldValue.items

        switch (arrayItems[0].type) {
          case 'string':
            sqlSchema.push(
              convertTypeToSql({ fieldName, type: 'varchar', isArray: true }),
            )
            break
          case 'integer':
          case 'number':
            sqlSchema.push(
              convertTypeToSql({ fieldName, type: 'bigint', isArray: true }),
            )
            break
          case 'boolean':
            sqlSchema.push(
              convertTypeToSql({ fieldName, type: 'boolean', isArray: true }),
            )
            break
          case 'object': {
            // Build the ROW(...) type for the element first,
            // then wrap it in array(...).
            const tmpFields = generateSqlSchema({
              definition: arrayItems[0].properties,
            })
            const tmpSchema = convertTypeToSql({
              fieldName,
              type: 'ROW',
              isObject: true,
              fields: tmpFields,
            })

            sqlSchema.push(
              convertTypeToSql({
                fieldName,
                type: tmpSchema[1],
                isArray: true,
              }),
            )
            break
          }
        }
        break
      }
      case 'object': {
        const tmpFields = generateSqlSchema({
          definition: fieldValue.properties,
        })

        sqlSchema.push(
          convertTypeToSql({
            fieldName,
            type: 'ROW',
            isObject: true,
            fields: tmpFields,
          }),
        )
        break
      }
      default:
        console.warn(
          `Unable to find a definition for field "${fieldName}" with JSON Schema type "${fieldValue.type}".`,
        )
    }
  }

  return sqlSchema
}
// Builds the map-transform mapping for an array field: values are nested under
// list[].element so they match the LIST layout produced by createArrayField
// and createRepeatableStructField.
function parquetArrayTransformer({
  fieldName,
  definition,
}: {
  fieldName: string
  definition: any
}) {
  return [
    `${fieldName}`,
    {
      'list[]': {
        element:
          definition.type === 'object'
            ? parquetTransformer({
                definition: definition.properties,
              })
            : {},
      },
    },
  ]
}

// Generates a map-transform definition from the JSON Schema properties so the
// input rows match the nested structure expected by the generated Parquet schema.
export function parquetTransformer({ definition }: { definition: any }) {
  const res: any = {}

  for (const [fieldName, fieldDefinition] of Object.entries<JSONSchema4>(
    definition,
  )) {
    if (fieldDefinition.type !== 'object' && fieldDefinition.type !== 'array') {
      res[fieldName] = fieldName
    }

    if (fieldDefinition.type === 'object') {
      res[fieldName] = [
        fieldName,
        parquetTransformer({
          definition: fieldDefinition.properties,
        }),
      ]
    }

    if (fieldDefinition.type === 'array') {
      res[fieldName] = parquetArrayTransformer({
        fieldName,
        definition: (fieldDefinition.items as JSONSchema4[])[0],
      })
    }
  }

  return res
}
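
For reference, a minimal sketch of what createFromJsonSchema returns for a small, hypothetical schema (the relative import path and field names are illustrative only); the nested list/element layout mirrors the LIST structure built by createArrayField:

import { JSONSchema4 } from 'json-schema'
import { createFromJsonSchema } from './helpers/parquet'

// Hypothetical two-field schema, purely for illustration.
const demoSchema: JSONSchema4 = {
  type: 'object',
  properties: {
    name: { type: 'string' },
    tags: { type: 'array', items: [{ type: 'string' }] },
  },
}

const schema = createFromJsonSchema({ definition: demoSchema })

// Expected result, based on the helpers above:
// {
//   name: { type: 'UTF8', optional: true },
//   tags: {
//     optional: true,
//     type: 'LIST',
//     fields: {
//       list: {
//         optional: false,
//         repeated: true,
//         fields: { element: { type: 'UTF8', optional: true } },
//       },
//     },
//   },
// }
console.log(JSON.stringify(schema, null, 2))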
package.json
{
  "dependencies": {
    "map-transform": "0.4.1",
    "@dsnp/parquetjs": "1.2.3"
  },
  "devDependencies": {
    "@types/json-schema": "7.0.11"
  }
}
// file: test-schema.ts
export default {
  type: 'object',
  properties: {
    string_field: {
      type: 'string',
    },
    int_field: {
      type: 'integer',
    },
    array_field: {
      type: 'array',
      items: [
        {
          type: 'string',
        },
      ],
      additionalItems: false,
    },
    timestamp_field: {
      type: 'string',
    },
    obj_field: {
      type: 'object',
      properties: {
        sub1: {
          type: 'string',
        },
        sub2: {
          type: 'string',
        },
      },
      additionalProperties: false,
    },
    struct_field: {
      type: 'array',
      items: [
        {
          type: 'object',
          properties: {
            sub3: {
              type: 'string',
            },
            sub4: {
              type: 'string',
            },
            sub5: {
              type: 'object',
              properties: {
                sub6: {
                  type: 'string',
                },
                sub7: {
                  type: 'string',
                },
              },
              additionalProperties: false,
            },
            sub8: {
              type: 'array',
              items: [
                {
                  type: 'string',
                },
                {
                  type: 'string',
                },
              ],
              additionalItems: false,
            },
          },
          additionalProperties: false,
        },
      ],
      additionalItems: false,
    },
  },
  additionalProperties: false,
}
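
The generateSqlSchema helper is not exercised in the write-up below, so here is a hedged sketch of how it could be applied to this test schema to derive Trino-style column types (the relative import paths are assumptions):

import jsonSchema from './test-schema'
import { generateSqlSchema } from './helpers/parquet'

// Each entry is a [columnName, sqlType] pair, e.g.
//   ['array_field', 'array(varchar)']
//   ['obj_field', 'ROW(sub1 varchar, sub2 varchar)']
//   ['struct_field', 'array(ROW(sub3 varchar, sub4 varchar, ...))']
// Note: timestamp_field comes out as varchar because the schema declares it
// as a plain string rather than the custom 'date-time' type handled above.
const columns = generateSqlSchema({ definition: jsonSchema.properties })

// Assemble a column list, e.g. for a CREATE TABLE statement on Trino.
const columnList = columns
  .map(([name, sqlType]) => `${name} ${sqlType}`)
  .join(',\n  ')

console.log(`CREATE TABLE example_table (\n  ${columnList}\n)`)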
// Usage: build the Parquet schema from the JSON Schema, transform the input
// rows with map-transform, and write the Parquet file.
import jsonSchema from './test-schema'
import mapTransform from 'map-transform'
import parquetjs from '@dsnp/parquetjs'
import { JSONSchema4 } from 'json-schema'
import { createFromJsonSchema, parquetTransformer } from './helpers/parquet'
import path from 'path'

const userInput = [
  {
    string_field: 'string_field_value',
    int_field: 343,
    array_field: ['ele1', 'ele2'],
    timestamp_field: '2016-07-20T17:30:15+05:30',
    obj_field: {
      sub1: 'sub1val',
      sub2: 'sub2val',
    },
    struct_field: [
      {
        sub3: 'sub3val1',
        sub4: 'sub4val1',
        sub5: {
          sub6: null,
          sub7: 'sub7val',
        },
        sub8: ['ele1', 'ele2'],
      },
    ],
  },
]

// Generate a valid `map-transform` definition
// based on the given JSON Schema
const transformerDefinition = [
  'data[]',
  {
    $iterate: true,
    ...parquetTransformer({
      definition: jsonSchema.properties,
    }),
  },
]

// Generate a schema for our Parquet file
const generatedParquetSchema = createFromJsonSchema({
  definition: jsonSchema as JSONSchema4,
})
const parquetSchema = new parquetjs.ParquetSchema(generatedParquetSchema)

// Transform the given user input into the structure needed by
// `writer.appendRow`. This is required if you use simple arrays or complex
// structures; otherwise the resulting Parquet file may not be readable by
// third-party tools. We have tested it with:
// * the PyCharm Parquet Viewer plugin (v3.0.0)
//   https://github.com/benwatson528/intellij-avro-parquet-plugin/releases/tag/v3.0.0
// * our Trino cluster
const parquetData = mapTransform(transformerDefinition)({
  data: userInput,
})

const parquetFilePath = path.resolve('test_parquet.parquet')

const writer = await parquetjs.ParquetWriter.openFile(
  parquetSchema,
  parquetFilePath,
)

for (const input of parquetData) {
  await writer.appendRow({
    ...input,
  })
}

await writer.close()
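
To sanity-check the generated file, it can be read back with the parquetjs reader API; a short sketch (the records mirror the transformed LIST structure, e.g. array_field.list[].element):

import parquetjs from '@dsnp/parquetjs'
import path from 'path'

// Open the file written above and print every row.
const reader = await parquetjs.ParquetReader.openFile(
  path.resolve('test_parquet.parquet'),
)
const cursor = reader.getCursor()

let record
while ((record = await cursor.next())) {
  console.log(record)
}

await reader.close()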