Skip to content

Instantly share code, notes, and snippets.

@tomups
Last active March 25, 2025 19:34
Show Gist options
  • Save tomups/c25d0070e4af49bfc0c9140919dff1e7 to your computer and use it in GitHub Desktop.
Save tomups/c25d0070e4af49bfc0c9140919dff1e7 to your computer and use it in GitHub Desktop.
JSONPath to JSONata converter for AWS Step Functions
#!/usr/bin/env python3
"""
JSONPath to JSONata Converter for AWS Step Functions
This script helps with the initial conversion of AWS Step Functions state machines
from JSONPath to JSONata query language. It performs basic transformations but
complex expressions may need manual adjustment.
Usage:
python jsonpath_to_jsonata_converter.py input_file.json output_file.json
"""
import json
import re
import sys
from copy import deepcopy
def convert_jsonpath_to_jsonata(state_machine):
    """Return a JSONata version of *state_machine*, leaving the input untouched.

    The definition is deep-copied, tagged with a top-level
    ``"QueryLanguage": "JSONata"`` entry, and every state found under its
    top-level ``"States"`` mapping is handed to ``convert_state``.
    """
    converted = deepcopy(state_machine)
    converted["QueryLanguage"] = "JSONata"

    # A definition without "States" simply has nothing to walk.
    top_level_states = converted.get("States", {})
    for name in top_level_states:
        convert_state(name, top_level_states[name])

    return converted
def convert_state(state_name, state):
    """Convert a single state, and every state nested inside it, in place.

    Args:
        state_name: Name of the state inside its enclosing ``States``
            mapping. Not used by the conversion itself; kept so callers can
            pass the (name, definition) pair they iterate over.
        state: The state definition dict. It is modified in place.

    The state is first dispatched on its ``Type`` to the matching
    type-specific converter (Task, Choice, Parallel, Map); other types are
    left as they are. Afterwards any nested workflows are walked
    recursively.
    """
    state_type = state.get("Type")
    if state_type == "Task":
        convert_task_state(state)
    elif state_type == "Choice":
        convert_choice_state(state)
    elif state_type == "Parallel":
        convert_parallel_state(state)
    elif state_type == "Map":
        convert_map_state(state)

    # Nested workflows live under "Branches" (Parallel: a list of workflows),
    # "Iterator" (Map, legacy name) or "ItemProcessor" (Map, current name).
    # "ItemProcessor" must be included, otherwise the states of a modern Map
    # state would be left in JSONPath form.
    for container_key in ("Branches", "Iterator", "ItemProcessor"):
        container = state.get(container_key)
        if isinstance(container, list):
            workflows = container
        elif isinstance(container, dict):
            workflows = [container]
        else:
            continue

        for workflow in workflows:
            if not isinstance(workflow, dict):
                continue
            for nested_name, nested_state in workflow.get("States", {}).items():
                convert_state(nested_name, nested_state)
def convert_task_state(state):
    """Rewrite a Task state in place for JSONata.

    ``Parameters`` is replaced by ``Arguments`` (a deep copy whose ``.$``
    entries are converted by ``process_nested_json_structure``), the
    ResultPath / OutputPath / InputPath fields are run through their
    handlers, and any ``ResultSelector`` is removed.
    """
    if "Parameters" in state:
        # Work on a private copy so the object that was stored under
        # "Parameters" is never mutated.
        arguments = deepcopy(state.pop("Parameters"))
        state["Arguments"] = arguments
        process_nested_json_structure(arguments)

    # Path fields, handled in the same order as before: result, output, input.
    handle_result_path(state)
    handle_output_path(state)
    handle_input_path(state)

    # ResultSelector is dropped without a replacement.
    state.pop("ResultSelector", None)
def handle_result_path(state):
    """Replace a JSONPath ``ResultPath`` field with JSONata fields, in place.

    Args:
        state: The state definition dict. ``ResultPath`` is always removed
            from it when present.

    Handling by value:

    * ``None`` (JSON ``null``): in JSONPath mode this discards the result
      and passes the state input through, so ``Output`` is set to
      ``{% $states.input %}``. Simply dropping the field would make the
      state emit its result instead.
    * ``"$.name"`` (a single top-level field): the result is stored in the
      variable ``name`` through ``Assign`` and ``Output`` is set to
      ``{% $states.result %}``.
      NOTE(review): the output is the bare result, not the input merged
      with the result under ``name`` - adjust manually if later states
      read other input fields.
    * anything else (``"$"``, nested paths, empty string): the field is
      removed and nothing is added.
    """
    if "ResultPath" not in state:
        return

    result_path = state.pop("ResultPath")

    if result_path is None:
        # "ResultPath": null  ->  keep the original input as the output.
        state["Output"] = "{% $states.input %}"
        return

    if not isinstance(result_path, str) or not result_path:
        return

    # Extract the variable name from the path (e.g., "$.result" -> "result")
    match = re.match(r'\$\.([a-zA-Z0-9_]+)$', result_path)
    if match:
        var_name = match.group(1)
        state.setdefault("Assign", {})[var_name] = "{% $states.result %}"
        # Keep Output field to pass through the result
        state["Output"] = "{% $states.result %}"
def handle_output_path(state):
    """Swap a JSONPath ``OutputPath`` field for a JSONata ``Output``, in place.

    ``"$"`` becomes ``{% $states.result %}`` and a single top-level field
    such as ``"$.field"`` becomes ``{% $states.result.field %}``. Any other
    value produces no ``Output``. The ``OutputPath`` key itself is removed
    whenever it is present.
    """
    if "OutputPath" not in state:
        return

    output_path = state.pop("OutputPath")

    if output_path == "$":
        state["Output"] = "{% $states.result %}"
        return

    if not output_path:
        # Empty / null OutputPath: nothing to translate.
        return

    # Only "$.<identifier>" is recognised (e.g., "$.field" -> "field").
    found = re.match(r'\$\.([a-zA-Z0-9_]+)$', output_path)
    if found:
        field = found.group(1)
        state["Output"] = f"{{% $states.result.{field} %}}"
def handle_input_path(state):
    """Remove the ``InputPath`` field from *state*, if it has one.

    No replacement field is written: the converter relies on JSONata
    expressions referencing the input fields they need directly.
    """
    state.pop("InputPath", None)
def convert_choice_state(state):
    """Convert the rules of a Choice state to JSONata ``Condition`` fields.

    Args:
        state: The Choice state definition dict; its ``Choices`` entries are
            modified in place.

    Each rule that can be translated has its JSONPath fields (``Variable``
    plus the comparison operator, or an ``And`` / ``Or`` / ``Not``
    combinator) replaced by a single ``Condition`` holding a
    ``{% ... %}`` expression. A rule that cannot be translated (for example
    ``IsPresent``, ``StringMatches`` or the ``Timestamp*`` operators) is
    left completely untouched, so nothing is lost and it can be converted
    by hand.
    """
    for choice in state.get("Choices") or []:
        converted = _choice_rule_to_jsonata(choice)
        if converted is None:
            continue
        expression, consumed_keys = converted
        # Remove the old comparison fields only once a replacement exists.
        for key in consumed_keys:
            del choice[key]
        choice["Condition"] = f"{{% {expression} %}}"


# JSONPath comparison operators that map onto a JSONata infix operator:
# name -> (JSONata operator, kind of literal operand). Every entry also has
# a "<name>Path" variant whose operand is a "$.path" reference.
_CHOICE_COMPARISONS = {
    "StringEquals": ("=", "string"),
    "StringLessThan": ("<", "string"),
    "StringGreaterThan": (">", "string"),
    "StringLessThanEquals": ("<=", "string"),
    "StringGreaterThanEquals": (">=", "string"),
    "NumericEquals": ("=", "number"),
    "NumericLessThan": ("<", "number"),
    "NumericGreaterThan": (">", "number"),
    "NumericLessThanEquals": ("<=", "number"),
    "NumericGreaterThanEquals": (">=", "number"),
    "BooleanEquals": ("=", "boolean"),
}


def _choice_input_reference(path):
    """Return ``$states.input.<rest>`` for a ``"$.<rest>"`` path, else None."""
    if not isinstance(path, str):
        return None
    match = re.match(r'\$\.(.+)$', path)
    if not match:
        return None
    return f"$states.input.{match.group(1)}"


def _choice_literal(kind, value):
    """Render a literal comparison operand as JSONata source text."""
    if kind == "string":
        text = str(value)
        if "'" in text or "\\" in text:
            # A quote or backslash would break the single-quoted form; a
            # JSON string literal is a valid double-quoted JSONata string.
            return json.dumps(text)
        return f"'{text}'"
    if kind == "boolean":
        # Python True/False -> JSONata true/false
        return str(value).lower()
    return f"{value}"


def _choice_rule_to_jsonata(rule):
    """Translate one choice rule.

    Returns ``(expression, consumed_keys)`` where *expression* is JSONata
    source without the ``{% %}`` delimiters and *consumed_keys* are the keys
    of *rule* that the expression replaces, or ``None`` when the rule cannot
    be translated.
    """
    if not isinstance(rule, dict):
        return None

    # Boolean combinators: every sub-rule must be translatable.
    for combinator, joiner in (("And", " and "), ("Or", " or ")):
        if combinator in rule:
            sub_rules = rule[combinator]
            if not isinstance(sub_rules, list) or not sub_rules:
                return None
            parts = [_choice_rule_to_jsonata(sub_rule) for sub_rule in sub_rules]
            if any(part is None for part in parts):
                return None
            return joiner.join(f"({part[0]})" for part in parts), [combinator]

    if "Not" in rule:
        inner = _choice_rule_to_jsonata(rule["Not"])
        if inner is None:
            return None
        return f"$not({inner[0]})", ["Not"]

    # Plain comparison: "$.status" -> $states.input.status
    left = _choice_input_reference(rule.get("Variable"))
    if left is None:
        return None

    expression = None
    consumed = []
    for key, operand in list(rule.items()):
        is_path = key.endswith("Path")
        base_op = key[:-4] if is_path else key
        if base_op not in _CHOICE_COMPARISONS:
            continue
        jsonata_op, kind = _CHOICE_COMPARISONS[base_op]
        if is_path:
            right = _choice_input_reference(operand)
        else:
            right = _choice_literal(kind, operand)
        if right is None:
            # Unparsable path operand: leave the key in place.
            continue
        expression = f"{left} {jsonata_op} {right}"
        consumed.append(key)

    if expression is None:
        return None
    consumed.append("Variable")
    return expression, consumed
def convert_parallel_state(state):
    """Rewrite the path fields of a Parallel state in place.

    Runs the ResultPath, OutputPath and InputPath handlers, in that order.
    The branches themselves are not touched here.
    """
    for handler in (handle_result_path, handle_output_path, handle_input_path):
        handler(state)
def convert_map_state(state):
    """Convert a Map state from JSONPath to JSONata, in place.

    Args:
        state: The Map state definition dict.

    * ``ItemsPath`` is replaced by ``Items``: ``"$"`` becomes
      ``{% $states.input %}`` and ``"$.field"`` becomes
      ``{% $states.input.field %}``. Any other value is removed without a
      replacement.
    * ResultPath / OutputPath / InputPath are run through their handlers.
    * ``ItemSelector`` keeps its name - it is still the selector field in
      JSONata mode - and only its ``.$`` entries are converted. It must not
      be written to ``ItemProcessor``: that field holds the Map's nested
      workflow and would be overwritten.
    """
    # Handle ItemsPath conversion to Items
    if "ItemsPath" in state:
        items_path = state.pop("ItemsPath")
        if items_path == "$":
            # The whole state input is the array to iterate over.
            state["Items"] = "{% $states.input %}"
        elif isinstance(items_path, str):
            # Convert from $.items to $states.input.items
            match = re.match(r'\$\.(.+)$', items_path)
            if match:
                state["Items"] = f"{{% $states.input.{match.group(1)} %}}"

    # Handle path fields
    handle_result_path(state)
    handle_output_path(state)
    handle_input_path(state)

    # Convert the selector's JSONPath entries on a private copy, then store
    # it back under the same key.
    if "ItemSelector" in state:
        selector = deepcopy(state["ItemSelector"])
        process_nested_json_structure(selector)
        state["ItemSelector"] = selector
def process_nested_json_structure(obj):
    """Recursively convert JSONPath ``.$`` entries of a JSON structure in place.

    E.g., convert ``"field.$": "$.value"`` to
    ``"field": "{% $states.input.value %}"``.

    Args:
        obj: A dict or list (anything else is ignored). Dicts are walked
            key by key, lists item by item.

    Every key ending in ``.$`` is replaced by the same key without the
    suffix. When the value cannot be translated automatically (unsupported
    intrinsic function, arguments that do not match the expected shape, or
    a non-string value) the entry is kept as a
    ``{% /* MANUAL CONVERSION NEEDED: ... */ %}`` placeholder instead of
    being dropped, so no field silently disappears from the output.
    """
    if isinstance(obj, dict):
        converted = {}
        for key, value in list(obj.items()):
            if key.endswith('.$'):
                expression = None
                if isinstance(value, str):
                    expression = _convert_jsonpath_value(value)
                if expression is None:
                    # Keep unconvertible expressions as placeholders for
                    # manual conversion.
                    expression = f"{{% /* MANUAL CONVERSION NEEDED: {value} */ %}}"
                converted[key[:-2]] = expression  # Remove .$ suffix
                del obj[key]
            elif isinstance(value, (dict, list)):
                # Recursively process nested structures
                process_nested_json_structure(value)
        # Add the converted keys after all old .$ keys are gone.
        obj.update(converted)
    elif isinstance(obj, list):
        for item in obj:
            if isinstance(item, (dict, list)):
                process_nested_json_structure(item)


def _convert_jsonpath_value(value):
    """Return the ``{% ... %}`` form of one JSONPath value, or None."""
    if value == '$':
        return "{% $states.input %}"
    if value == '$$':
        return "{% $states.context %}"
    if value.startswith('$.'):
        # Simple path reference: $.value -> $states.input.value
        return f"{{% $states.input.{value[2:]} %}}"
    if value.startswith('$$.'):
        # Context reference: $$.Execution.StartTime -> $states.context.Execution.StartTime
        return f"{{% $states.context.{value[3:]} %}}"
    if value.startswith('States.Format('):
        return _convert_states_format(value)
    if value.startswith('States.ArrayPartition('):
        # States.ArrayPartition($.array, 2) -> $partition($states.input.array, 2)
        match = re.match(r'States\.ArrayPartition\(\$\.(.+?)\s*,\s*(\d+)\)', value)
        if match:
            return f"{{% $partition($states.input.{match.group(1)}, {match.group(2)}) %}}"
        return None
    if value.startswith('States.ArrayRange('):
        # States.ArrayRange(1, 10, 2) -> $range(1, 10, 2)
        match = re.match(r'States\.ArrayRange\((\d+)\s*,\s*(\d+)\s*,\s*(\d+)\)', value)
        if match:
            start, end, step = match.groups()
            return f"{{% $range({start}, {end}, {step}) %}}"
        return None
    if value.startswith('States.Hash('):
        # States.Hash($.data, "SHA-256") -> $hash($states.input.data, 'SHA-256')
        match = re.match(r'States\.Hash\(\$\.(.+?)\s*,\s*[\'"](.+?)[\'"]\)', value)
        if match:
            return f"{{% $hash($states.input.{match.group(1)}, '{match.group(2)}') %}}"
        return None
    if value.startswith('States.UUID()'):
        # States.UUID() -> $uuid()
        return "{% $uuid() %}"
    if value.startswith('States.MathRandom()'):
        # States.MathRandom() -> $random()
        return "{% $random() %}"
    if value.startswith('States.MathRandom('):
        # States.MathRandom(123) -> $random(123)
        match = re.match(r'States\.MathRandom\((\d+)\)', value)
        if match:
            return f"{{% $random({match.group(1)}) %}}"
        return None
    return None


def _format_argument(argument):
    """Translate one ``States.Format`` argument to JSONata source text."""
    if argument.startswith('$.'):
        return f"$states.input.{argument[2:]}"
    if argument.startswith('$$.'):
        return f"$states.context.{argument[3:]}"
    return argument


def _convert_states_format(value):
    """Turn ``States.Format('<template>', args...)`` into ``&`` concatenation.

    e.g., States.Format('{}/file.py', $.path) -> $states.input.path & '/file.py'

    Returns None when the call does not match the expected shape, or when
    several arguments are given that cannot be paired safely with the
    template's ``{}`` placeholders.
    """
    match = re.match(r'States\.Format\([\'"](.+?)[\'"]\s*,\s*(.+)\)', value)
    if not match:
        return None
    template = match.group(1)
    raw_arguments = match.group(2)
    placeholder_count = template.count('{}')

    if ',' in raw_arguments:
        # Several arguments: only plain path arguments can be split on
        # commas reliably, and there must be one per placeholder.
        arguments = [argument.strip() for argument in raw_arguments.split(',')]
        all_paths = all(argument.startswith(('$.', '$$.')) for argument in arguments)
        if len(arguments) != placeholder_count or not all_paths:
            return None
    else:
        # Single argument: it fills every placeholder.
        arguments = [raw_arguments] * max(placeholder_count, 1)

    pieces = []
    next_argument = 0
    for part in re.split(r'(\{\})', template):
        if part == '{}':
            pieces.append(_format_argument(arguments[next_argument]))
            next_argument += 1
        elif part:  # Skip empty parts
            pieces.append(f"'{part}'")

    # Join with ampersand for concatenation
    concatenated = " & ".join(pieces)
    return f"{{% {concatenated} %}}"
def main():
    """Command-line entry point: convert one state machine definition file.

    Expects exactly two command-line arguments, the input and the output
    JSON file. Prints a usage line and exits with status 1 otherwise.

    Both files are opened explicitly as UTF-8, the standard encoding for
    JSON, instead of relying on the platform's default encoding.
    """
    if len(sys.argv) != 3:
        print("Usage: python jsonpath_to_jsonata_converter.py input_file.json output_file.json")
        sys.exit(1)

    input_file, output_file = sys.argv[1], sys.argv[2]

    with open(input_file, 'r', encoding='utf-8') as f:
        state_machine = json.load(f)

    converted = convert_jsonpath_to_jsonata(state_machine)

    with open(output_file, 'w', encoding='utf-8') as f:
        json.dump(converted, f, indent=2)

    print(f"Conversion completed. Output written to {output_file}")
    print("Note: Manual review and adjustments may be necessary for complex expressions.")


# Run the converter only when executed as a script, not when imported.
if __name__ == "__main__":
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment