Last active
March 25, 2025 19:34
-
-
Save tomups/c25d0070e4af49bfc0c9140919dff1e7 to your computer and use it in GitHub Desktop.
JSONPath to JSONata converter for AWS Step Functions
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3 | |
"""
JSONPath to JSONata Converter for AWS Step Functions

This script helps with the initial conversion of AWS Step Functions state machines
from JSONPath to the JSONata query language. It performs basic transformations, but
complex expressions may need manual adjustment.

Usage:
    python jsonpath_to_jsonata_converter.py input_file.json output_file.json
"""
import json | |
import re | |
import sys | |
from copy import deepcopy | |
def convert_jsonpath_to_jsonata(state_machine):
    """Return a JSONata flavoured copy of a JSONPath state machine definition.

    The argument itself is never modified: a deep copy is made, tagged with
    ``"QueryLanguage": "JSONata"`` at the top level, and every entry of its
    ``"States"`` mapping (if any) is rewritten in place through ``convert_state``.
    """
    converted = deepcopy(state_machine)
    converted["QueryLanguage"] = "JSONata"

    top_level_states = converted.get("States", {})
    for name in top_level_states:
        convert_state(name, top_level_states[name])

    return converted
def convert_state(state_name, state):
    """Convert one state, and every state nested inside it, to JSONata in place.

    Args:
        state_name: Name of the state in its parent ``States`` mapping. It is
            accepted for symmetry with the recursion but not otherwise used.
        state: The state definition (a dict); it is modified in place.

    Task, Choice, Parallel and Map states are handed to their dedicated
    converter; other state types are left as they are. Afterwards the function
    recurses into nested workflows found under ``Branches`` (Parallel),
    ``Iterator`` (legacy Map) and ``ItemProcessor`` (current Map), so that the
    states of a Map's sub-workflow are converted whichever field name is used.
    """
    state_type = state.get("Type")
    if state_type == "Task":
        convert_task_state(state)
    elif state_type == "Choice":
        convert_choice_state(state)
    elif state_type == "Parallel":
        convert_parallel_state(state)
    elif state_type == "Map":
        convert_map_state(state)

    # A container is either a list of sub-workflows (Branches) or a single
    # sub-workflow dict (Iterator / ItemProcessor); normalise to a list.
    for container_key in ("Branches", "Iterator", "ItemProcessor"):
        container = state.get(container_key)
        if isinstance(container, list):
            sub_workflows = container
        elif isinstance(container, dict):
            sub_workflows = [container]
        else:
            continue

        for sub_workflow in sub_workflows:
            for nested_name, nested_state in sub_workflow.get("States", {}).items():
                convert_state(nested_name, nested_state)
def convert_task_state(state):
    """Rewrite a Task state in place so that it uses JSONata fields.

    ``Parameters`` is replaced by ``Arguments`` (a converted deep copy), the
    ``ResultPath`` / ``OutputPath`` / ``InputPath`` fields are processed in that
    order by their handlers, and any ``ResultSelector`` is dropped.
    """
    if "Parameters" in state:
        arguments = deepcopy(state["Parameters"])
        state["Arguments"] = arguments
        process_nested_json_structure(arguments)
        del state["Parameters"]

    # Order matters: a later handler may overwrite the Output set by an earlier one.
    for handle_path_field in (handle_result_path, handle_output_path, handle_input_path):
        handle_path_field(state)

    # ResultSelector is discarded without conversion.
    state.pop("ResultSelector", None)
def handle_result_path(state):
    """Replace a JSONPath ``ResultPath`` with JSONata ``Output`` / ``Assign`` fields.

    The ``ResultPath`` key is always removed from ``state``. What replaces it
    depends on its value:

    * ``None`` -- in JSONPath this discards the result and forwards the state
      input, so ``Output`` is set to ``{% $states.input %}``. Simply deleting
      the field would make the state forward its result instead.
    * ``"$.name"`` (a single top-level field) -- the result is stored in the
      workflow variable ``name`` through ``Assign`` and ``Output`` merges the
      result into the input under ``name``, which is what ``ResultPath`` did.
    * anything else (``"$"``, nested paths, ...) -- nothing is added; nested
      paths still need manual conversion.

    Args:
        state: State definition dict, modified in place.
    """
    if "ResultPath" not in state:
        return

    result_path = state.pop("ResultPath")

    if result_path is None:
        state["Output"] = "{% $states.input %}"
        return

    if not isinstance(result_path, str):
        return

    match = re.match(r'\$\.([a-zA-Z0-9_]+)$', result_path)
    if match is None:
        return

    var_name = match.group(1)
    state.setdefault("Assign", {})[var_name] = "{% $states.result %}"
    # Keep the original input and add the result under the ResultPath field.
    state["Output"] = (
        f"{{% $merge([$states.input, {{'{var_name}': $states.result}}]) %}}"
    )
def handle_output_path(state):
    """Swap a JSONPath ``OutputPath`` for a JSONata ``Output`` field, in place.

    ``"$"`` becomes ``{% $states.result %}`` and a single top-level field such
    as ``"$.field"`` becomes ``{% $states.result.field %}``. Any other value
    produces no ``Output``. The ``OutputPath`` key is removed in every case.
    """
    if "OutputPath" not in state:
        return

    output_path = state["OutputPath"]
    if output_path == "$":
        state["Output"] = "{% $states.result %}"
    elif output_path:
        # Only a single identifier after "$." is recognised.
        field_match = re.match(r'\$\.([a-zA-Z0-9_]+)$', output_path)
        if field_match is not None:
            state["Output"] = "{% $states.result." + field_match.group(1) + " %}"

    del state["OutputPath"]
def handle_input_path(state):
    """Drop the ``InputPath`` field from ``state`` if it is present.

    No replacement is generated: the converter relies on JSONata expressions
    referencing input fields directly.
    """
    state.pop("InputPath", None)
def convert_choice_state(state):
    """Convert the rules of a Choice state from JSONPath comparisons to JSONata.

    For every rule in ``state["Choices"]`` that has a ``Variable`` and a
    supported comparison, the ``Variable`` and comparison keys are removed and
    a single ``Condition`` expression is added. Supported comparisons are
    ``String*`` and ``Numeric*`` with ``Equals``, ``LessThan``,
    ``GreaterThan``, ``LessThanEquals`` and ``GreaterThanEquals``,
    ``BooleanEquals``, and the ``...Path`` variant of each of them.

    A rule is left completely untouched (so it can be converted by hand) when
    its ``Variable`` is not a recognised path, when it has no supported
    comparison (``IsPresent``, ``Timestamp*``, ``StringMatches``, ...), or when
    a comparison value cannot be rendered. Rules without a ``Variable``
    (``And`` / ``Or`` / ``Not``) are not processed.

    Args:
        state: Choice state definition dict, modified in place.
    """
    for choice in state.get("Choices") or []:
        if not isinstance(choice, dict) or "Variable" not in choice:
            continue

        left = _choice_path_to_jsonata(choice["Variable"])
        if left is None:
            continue

        comparisons = {}
        convertible = True
        for key, value in choice.items():
            parsed = _parse_choice_operator(key)
            if parsed is None:
                continue
            kind, symbol, is_path = parsed
            if is_path:
                right = _choice_path_to_jsonata(value)
            else:
                right = _choice_literal_to_jsonata(kind, value)
            if right is None:
                convertible = False
                break
            comparisons[key] = f"{left} {symbol} {right}"

        # Only strip the JSONPath fields once a replacement is guaranteed;
        # otherwise the rule would be left with neither form.
        if not convertible or not comparisons:
            continue

        for key in comparisons:
            del choice[key]
        del choice["Variable"]
        choice["Condition"] = "{% " + " and ".join(comparisons.values()) + " %}"


# Comparison suffix of a Choice operator name -> JSONata operator.
_CHOICE_OPERATOR_SYMBOLS = {
    "Equals": "=",
    "LessThan": "<",
    "GreaterThan": ">",
    "LessThanEquals": "<=",
    "GreaterThanEquals": ">=",
}


def _parse_choice_operator(name):
    """Split an operator name into ``(kind, symbol, is_path)``, or return None."""
    is_path = name.endswith("Path")
    base = name[:-4] if is_path else name
    for kind in ("String", "Numeric", "Boolean"):
        if base.startswith(kind):
            symbol = _CHOICE_OPERATOR_SYMBOLS.get(base[len(kind):])
            # Booleans only support equality.
            if symbol is None or (kind == "Boolean" and symbol != "="):
                return None
            return kind, symbol, is_path
    return None


def _choice_path_to_jsonata(path):
    """Translate ``$``, ``$.x`` or ``$$.x`` into a ``$states`` reference, else None."""
    if not isinstance(path, str):
        return None
    if path == "$":
        return "$states.input"
    if path.startswith("$$.") and len(path) > 3:
        return "$states.context." + path[3:]
    if path.startswith("$.") and len(path) > 2:
        return "$states.input." + path[2:]
    return None


def _choice_literal_to_jsonata(kind, value):
    """Render a comparison literal as JSONata source text, or None if unsuitable."""
    if kind == "String":
        if not isinstance(value, str):
            return None
        if "'" not in value and "\\" not in value and value.isprintable():
            return f"'{value}'"
        # JSONata accepts JSON-style double-quoted strings; fall back to them
        # whenever the text would need escaping inside single quotes.
        return json.dumps(value, ensure_ascii=False)
    if kind == "Boolean":
        return str(value).lower() if isinstance(value, bool) else None
    # Numeric: bool is an int subclass in Python but not a number here.
    if isinstance(value, bool) or not isinstance(value, (int, float)):
        return None
    return str(value)
def convert_parallel_state(state):
    """Rewrite the path fields of a Parallel state in place.

    ``ResultPath``, ``OutputPath`` and ``InputPath`` are processed, in that
    order, by their respective handlers. Branches are not touched here.
    """
    for handle_path_field in (handle_result_path, handle_output_path, handle_input_path):
        handle_path_field(state)
def convert_map_state(state):
    """Convert the JSONPath fields of a Map state to JSONata, in place.

    * ``ItemsPath`` of the form ``$.field`` becomes
      ``Items: {% $states.input.field %}``; the ``ItemsPath`` key is removed
      whether or not it could be translated.
    * ``ResultPath``, ``OutputPath`` and ``InputPath`` go through their handlers.
    * ``ItemSelector`` keeps its name and has its ``key.$`` entries converted.
      It must not be written to ``ItemProcessor``: that field holds the Map's
      sub-workflow, which would be overwritten.

    Args:
        state: Map state definition dict, modified in place.
    """
    if "ItemsPath" in state:
        match = re.match(r'\$\.(.+)$', state["ItemsPath"])
        if match:
            state["Items"] = f"{{% $states.input.{match.group(1)} %}}"
        del state["ItemsPath"]

    handle_result_path(state)
    handle_output_path(state)
    handle_input_path(state)

    if "ItemSelector" in state:
        process_nested_json_structure(state["ItemSelector"])
def process_nested_json_structure(obj):
    """Recursively convert JSONPath payload entries to JSONata, in place.

    Every dict entry whose key ends in ``.$`` and whose value is a string is
    replaced by an entry without the suffix whose value is a ``{% ... %}``
    expression, e.g. ``"field.$": "$.value"`` becomes
    ``"field": "{% $states.input.value %}"``. Converted entries are appended
    after the remaining keys.

    Recognised values are ``$`` / ``$.path`` (state input), ``$$`` /
    ``$$.path`` (context object) and the intrinsic calls ``States.Format``,
    ``States.ArrayPartition``, ``States.ArrayRange``, ``States.Hash``,
    ``States.UUID()`` and ``States.MathRandom``. Anything else -- including a
    recognised intrinsic whose arguments cannot be parsed -- becomes a
    ``/* MANUAL CONVERSION NEEDED: ... */`` placeholder, so no entry is ever
    dropped silently. A ``.$`` key with a non-string value is kept unchanged.

    Args:
        obj: A dict or list to process; any other value is ignored.
    """
    if isinstance(obj, list):
        for item in obj:
            process_nested_json_structure(item)
        return
    if not isinstance(obj, dict):
        return

    converted = {}
    for key, value in list(obj.items()):
        if key.endswith('.$') and isinstance(value, str):
            del obj[key]
            converted[key[:-2]] = _convert_jsonpath_value(value)
        else:
            process_nested_json_structure(value)
    obj.update(converted)


def _convert_jsonpath_value(value):
    """Wrap the JSONata translation of ``value`` (or a placeholder) in ``{% %}``."""
    expression = _jsonpath_operand_to_jsonata(value)
    if expression is None:
        expression = _convert_intrinsic_call(value)
    if expression is None:
        expression = f"/* MANUAL CONVERSION NEEDED: {value} */"
    return "{% " + expression + " %}"


def _jsonpath_operand_to_jsonata(operand):
    """Translate a bare input/context path to a ``$states`` reference, else None."""
    if operand == "$":
        return "$states.input"
    if operand == "$$":
        return "$states.context"
    if operand.startswith("$$."):
        return "$states.context." + operand[3:]
    if operand.startswith("$."):
        return "$states.input." + operand[2:]
    return None


def _convert_intrinsic_call(value):
    """Translate a supported ``States.*`` call to JSONata, or return None."""
    if value.startswith('States.Format('):
        return _convert_states_format(value)
    if value.startswith('States.ArrayPartition('):
        match = re.match(r'States\.ArrayPartition\(\$\.(.+?)\s*,\s*(\d+)\)', value)
        if match:
            return f"$partition($states.input.{match.group(1)}, {match.group(2)})"
    elif value.startswith('States.ArrayRange('):
        match = re.match(r'States\.ArrayRange\((\d+)\s*,\s*(\d+)\s*,\s*(\d+)\)', value)
        if match:
            return f"$range({match.group(1)}, {match.group(2)}, {match.group(3)})"
    elif value.startswith('States.Hash('):
        match = re.match(r'States\.Hash\(\$\.(.+?)\s*,\s*[\'"](.+?)[\'"]\)', value)
        if match:
            return f"$hash($states.input.{match.group(1)}, '{match.group(2)}')"
    elif value.startswith('States.UUID()'):
        return "$uuid()"
    elif value.startswith('States.MathRandom()'):
        return "$random()"
    elif value.startswith('States.MathRandom('):
        match = re.match(r'States\.MathRandom\((\d+)\)', value)
        if match:
            return f"$random({match.group(1)})"
    return None


def _convert_states_format(value):
    """Turn ``States.Format('a{}b', x, ...)`` into an ``&`` concatenation, or None.

    Each ``{}`` placeholder is paired with the argument at the same position;
    a mismatch between placeholders and arguments yields None.
    """
    call = value.rstrip()
    if not call.endswith(')'):
        return None

    arguments = _split_intrinsic_arguments(call[len('States.Format('):-1])
    template_arg, substitutions = arguments[0], arguments[1:]
    if (len(template_arg) < 2 or template_arg[0] not in "'\""
            or template_arg[-1] != template_arg[0]):
        return None

    template = template_arg[1:-1]
    if "\\" in template:
        # Escaped characters in the template are not interpreted here.
        return None

    pieces = re.split(r'(\{\})', template)
    if pieces.count('{}') != len(substitutions):
        return None

    remaining = iter(substitutions)
    terms = []
    for piece in pieces:
        if piece == '{}':
            argument = next(remaining)
            if argument.startswith('States.'):
                # Nested intrinsic calls are left for manual conversion.
                return None
            translated = _jsonpath_operand_to_jsonata(argument)
            terms.append(argument if translated is None else translated)
        elif piece:
            if "'" in piece:
                terms.append(json.dumps(piece, ensure_ascii=False))
            else:
                terms.append(f"'{piece}'")
    return " & ".join(terms) if terms else "''"


def _split_intrinsic_arguments(text):
    """Split an argument list on commas outside quotes and brackets."""
    arguments = []
    current = []
    depth = 0
    quote = None
    escaped = False
    for char in text:
        if quote is not None:
            current.append(char)
            if escaped:
                escaped = False
            elif char == "\\":
                escaped = True
            elif char == quote:
                quote = None
            continue
        if char in "'\"":
            quote = char
        elif char in "([{":
            depth += 1
        elif char in ")]}":
            depth -= 1
        elif char == "," and depth == 0:
            arguments.append("".join(current).strip())
            current = []
            continue
        current.append(char)
    arguments.append("".join(current).strip())
    return arguments
def main():
    """Command-line entry point: convert one state machine definition file.

    Expects exactly two command-line arguments, the input and output file
    paths. With any other argument count the usage line is printed and the
    process exits with status 1. Otherwise the input JSON is loaded, converted
    with ``convert_jsonpath_to_jsonata`` and written to the output path with
    two-space indentation.

    Both files are opened as UTF-8 so that reading does not depend on the
    platform's default encoding.
    """
    if len(sys.argv) != 3:
        print("Usage: python jsonpath_to_jsonata_converter.py input_file.json output_file.json")
        sys.exit(1)

    input_file, output_file = sys.argv[1], sys.argv[2]

    with open(input_file, 'r', encoding='utf-8') as f:
        state_machine = json.load(f)

    converted = convert_jsonpath_to_jsonata(state_machine)

    with open(output_file, 'w', encoding='utf-8') as f:
        json.dump(converted, f, indent=2)

    print(f"Conversion completed. Output written to {output_file}")
    print("Note: Manual review and adjustments may be necessary for complex expressions.")


# Run the converter only when the file is executed as a script.
if __name__ == "__main__":
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment