Last active
March 2, 2025 19:20
-
-
Save michiel/23da8df0ee4340eb9ca0be97ff670277 to your computer and use it in GitHub Desktop.
Deep clone Jira epic
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from atlassian import Jira | |
# Jira configuration. The values below are placeholders: replace them before
# running, and avoid committing a real API token to version control.
JIRA_URL = "https://your-jira-instance.atlassian.net"
JIRA_USER = "[email protected]"
JIRA_API_TOKEN = "your-api-token"

# Shared Jira client used by every function in this module.
jira = Jira(url=JIRA_URL, username=JIRA_USER, password=JIRA_API_TOKEN, cloud=True)

# For debugging - reverse mapping from field ID to field name.
FIELD_ID_TO_NAME = {}  # Will be populated in get_all_fields

# Field IDs that are never copied onto a cloned issue.
# "issuelinks" is spelled in lower case to match the key that
# clone_issue_links reads from the issue payload; the previous "issueLinks"
# spelling could never match it.
EXCLUDED_FIELDS = {
    "comment", "attachment", "issuelinks", "worklog", "timespent", "subtasks",
    "votes", "watches", "aggregatetimeoriginalestimate", "aggregatetimeestimate",
    "aggregateprogress", "progress", "timetracking", "thumbnail", "environment",
    "duedate", "lastViewed", "resolutiondate", "created", "updated", "status",
}

# Field IDs rejected by Jira during issue creation; filled in at runtime by
# create_issue_with_retries and honoured by filter_relevant_fields.
DYNAMICALLY_EXCLUDED_FIELDS = set()

# Field IDs whose required payload shape was learned from Jira error messages.
ARRAY_FIELDS = set()  # Fields that must be arrays
OBJECT_FIELDS = set()  # Fields that must be objects
def get_editable_fields(issue_key):
    """Return the set of field IDs listed in the issue's edit metadata."""
    edit_meta = jira.get(f"rest/api/2/issue/{issue_key}/editmeta")
    editable = edit_meta['fields']
    # Iterating a dict yields its keys, so this collects the field IDs.
    return set(editable)
def get_all_fields():
    """Fetch every Jira field and return a ``{field_name: field_id}`` mapping.

    As a side effect the module-level ``FIELD_ID_TO_NAME`` dictionary is
    rebuilt with the reverse ``{field_id: field_name}`` mapping, which the
    debug helpers use to print readable field names.
    """
    global FIELD_ID_TO_NAME
    FIELD_ID_TO_NAME = {}

    ids_by_name = {}
    for entry in jira.get("rest/api/2/field"):
        display_name = entry['name']
        identifier = entry['id']
        ids_by_name[display_name] = identifier
        FIELD_ID_TO_NAME[identifier] = display_name
    return ids_by_name
def safe_dict_value(obj, key, default=None):
    """Look up ``key`` in ``obj``, falling back to ``default``.

    ``default`` is returned when ``obj`` is not a dictionary or does not
    contain ``key``; a stored value of ``None`` is returned as-is.
    """
    if not isinstance(obj, dict):
        return default
    return obj[key] if key in obj else default
def sanitize_field_value(field_name, field_value):
    """Normalise a field value using heuristics based on the field's name.

    Args:
        field_name: Display name of the field; matched case-insensitively.
        field_value: Raw value taken from the source issue.

    Returns:
        * ``None`` when ``field_value`` is ``None`` or an empty dict.
        * ``str(field_value)`` when the name contains "operation".
        * An ``int`` when the name contains one of the numeric hints
          ("id", "count", "number", "estimate", "spent") and the value is a
          string made only of decimal digits.
        * ``{"name": ...}`` for a dict that has a "name" but none of
          "key", "id" or "value".
        * Otherwise ``field_value`` unchanged.
    """
    if field_value is None:
        return None

    lowered_name = field_name.lower()

    # Handle operations - must be strings
    if "operation" in lowered_name:
        return str(field_value)

    # Handle numeric fields.
    # isdecimal() is used instead of isdigit(): isdigit() also accepts
    # characters such as superscript digits ("²") that int() cannot parse,
    # which made this function raise ValueError.
    numeric_hints = ("id", "count", "number", "estimate", "spent")
    if any(hint in lowered_name for hint in numeric_hints):
        if isinstance(field_value, str) and field_value.isdecimal():
            return int(field_value)

    # Handle dictionary with missing id/value
    if isinstance(field_value, dict):
        has_identifier = any(k in field_value for k in ("key", "id", "value"))
        if not has_identifier:
            if "name" in field_value:
                return {"name": field_value["name"]}
            if not field_value:
                return None  # Empty dict should be null

    return field_value
def debug_field_value(field_id, value):
    """Format ``field_id`` and ``value`` as a single line for debug output.

    The line contains the field ID, its display name from
    ``FIELD_ID_TO_NAME`` ("Unknown Field" when absent), the value rendered
    with ``str()`` and cut to 100 characters, and the value's type name.
    """
    label = FIELD_ID_TO_NAME.get(field_id, "Unknown Field")
    kind = type(value).__name__
    rendered = str(value)
    if len(rendered) > 100:
        # Keep 97 characters plus an ellipsis so the result is 100 long.
        rendered = rendered[:97] + "..."
    return f"{field_id} ({label}) = {rendered} [Type: {kind}]"
def filter_relevant_fields(issue_fields, editable_fields, field_mapping):
    """Build the payload of fields to copy from a source issue.

    Args:
        issue_fields: ``fields`` mapping of the source issue.
        editable_fields: Field IDs that may be set (see get_editable_fields).
        field_mapping: ``{field_name: field_id}`` as built by get_all_fields.

    Returns:
        A new dict keyed by field ID. The five core fields (summary,
        description, issuetype, project, priority) are copied whenever they
        are present. Every other field is copied only if it is editable, not
        excluded (statically or dynamically), not ``None``, and survives
        ``sanitize_field_value``; its value is then reshaped according to the
        first matching rule below.
    """
    picked = {}

    # Core fields are copied without an editability check.
    for core_key in {"summary", "description", "issuetype", "project", "priority"}:
        if core_key not in issue_fields:
            continue
        core_value = issue_fields[core_key]
        if core_key == "issuetype" and isinstance(core_value, dict):
            # Reduce the issue type to its name; without a name it is dropped.
            if "name" in core_value:
                picked[core_key] = {"name": core_value["name"]}
            continue
        picked[core_key] = core_value

    def has_value(entry):
        """True for a dict that carries a "value" key."""
        return isinstance(entry, dict) and "value" in entry

    def shrink(entry):
        """Reduce a list item to the identifier it carries, if any."""
        if isinstance(entry, dict):
            if 'id' in entry:
                return entry['id']
            if 'key' in entry:
                return {"key": entry['key']}
            if 'value' in entry:
                return entry['value']
        return entry

    for label, fid in field_mapping.items():
        eligible = (
            fid in issue_fields
            and fid in editable_fields
            and fid not in EXCLUDED_FIELDS
            and fid not in DYNAMICALLY_EXCLUDED_FIELDS
        )
        if not eligible or issue_fields[fid] is None:
            continue

        cleaned = sanitize_field_value(label, issue_fields[fid])
        if cleaned is None:
            continue

        is_list = isinstance(cleaned, list)
        is_dict = isinstance(cleaned, dict)
        label_lc = label.lower()

        # Rule 1: the field is already known to require an array.
        if fid in ARRAY_FIELDS:
            picked[fid] = cleaned if is_list else [cleaned]
            continue

        # Rule 2: the field is already known to require an object.
        if fid in OBJECT_FIELDS:
            if is_list and cleaned and all(has_value(e) for e in cleaned):
                # A list of value objects collapses to its first value.
                picked[fid] = {"value": cleaned[0]["value"]}
            elif is_dict:
                picked[fid] = cleaned
            else:
                picked[fid] = {"value": str(cleaned)}
            continue

        # Rule 3: sprint fields are sent as a list of sprint IDs.
        if "sprint" in label_lc:
            if is_list:
                sprint_ids = []
                for sprint in cleaned:
                    if isinstance(sprint, dict) and 'id' in sprint:
                        sprint_ids.append(sprint['id'])
                    elif isinstance(sprint, (int, str)) and str(sprint).isdigit():
                        sprint_ids.append(int(sprint))
                if sprint_ids:
                    picked[fid] = sprint_ids
            elif isinstance(cleaned, (int, str)) and str(cleaned).isdigit():
                picked[fid] = [int(cleaned)]
            # Anything else cannot be turned into sprint IDs and is skipped.
            continue

        # Rule 4: operation fields are sent as strings.
        if "operation" in label_lc:
            picked[fid] = str(cleaned)
            continue

        # Rule 5: parent is sent as {"key": ...}.
        if label_lc == "parent" or "parent" in fid.lower():
            if is_dict and "key" in cleaned:
                picked[fid] = {"key": cleaned["key"]}
            elif isinstance(cleaned, str):
                picked[fid] = {"key": cleaned}
            continue

        # Rule 6: Epic Link is sent as a plain key string.
        if "epic" in label_lc and "link" in label_lc:
            if is_dict and "key" in cleaned:
                picked[fid] = cleaned["key"]
            else:
                picked[fid] = str(cleaned)
            continue

        # Rule 7: lists.
        if is_list:
            if len(cleaned) == 1 and has_value(cleaned[0]):
                # A single value object is sent as the object itself.
                picked[fid] = {"value": cleaned[0]["value"]}
                continue
            converted = [shrink(item) for item in cleaned]
            if converted:  # Empty lists are not sent.
                picked[fid] = converted
            continue

        # Rule 8: objects are reduced to their identifying member.
        if is_dict:
            if 'id' in cleaned:
                if "customfield_" in fid:
                    picked[fid] = cleaned['id']
                else:
                    picked[fid] = {"id": cleaned['id']}
            elif 'key' in cleaned:
                picked[fid] = {"key": cleaned['key']}
            elif 'name' in cleaned:
                picked[fid] = {"name": cleaned['name']}
            elif 'value' in cleaned or cleaned:
                # Value objects and any other non-empty dict pass through.
                picked[fid] = cleaned
            continue

        # Rule 9: primitive values (string, number, ...) pass through.
        picked[fid] = cleaned

    return picked
def create_issue_with_retries(issue_fields):
    """Create a Jira issue, repairing or dropping rejected fields between attempts.

    The function works on a shallow copy of ``issue_fields``. After every
    failed ``jira.issue_create`` call the error text is inspected to guess
    which field was rejected. That field is then either reshaped (wrapped in
    a list, collapsed to a ``{"value": ...}`` object, cast to ``str``) or
    removed from the payload and recorded in ``DYNAMICALLY_EXCLUDED_FIELDS``.
    Applied shape conversions are recorded in ``ARRAY_FIELDS`` and
    ``OBJECT_FIELDS``.

    Args:
        issue_fields: Mapping of field ID to value for the new issue.

    Returns:
        The result of the first successful ``jira.issue_create`` call.

    Raises:
        Exception: When every field has been removed from the payload, or
            after ``max_retries`` (15) failed attempts. The creation error
            itself is re-raised when no fallback strategy can make progress.
    """
    retry_fields = issue_fields.copy()
    max_retries = 15 # Prevent infinite loops
    retry_count = 0
    # Print all fields for debugging
    print("\n🔍 DEBUG: All fields being sent to Jira:")
    for field_id, value in sorted(retry_fields.items()):
        print(f" - {debug_field_value(field_id, value)}")
    # Ensure required fields are present and in correct format
    if "issuetype" not in retry_fields or not isinstance(retry_fields["issuetype"], dict):
        if "issuetype" in retry_fields:
            # Try to convert to proper format
            if isinstance(retry_fields["issuetype"], str):
                retry_fields["issuetype"] = {"name": retry_fields["issuetype"]}
            else:
                # If we can't fix it, remove it (will cause error but prevent unexpected behavior)
                del retry_fields["issuetype"]
    # Ensure project is correctly formatted
    if "project" in retry_fields and not isinstance(retry_fields["project"], dict):
        if isinstance(retry_fields["project"], str):
            # An all-digit string is treated as a project ID, anything else as a project key
            if retry_fields["project"].isdigit():
                retry_fields["project"] = {"id": retry_fields["project"]}
            else:
                retry_fields["project"] = {"key": retry_fields["project"]}
    print(f"🔄 Creating issue with {len(retry_fields)} fields")
    while retry_count < max_retries:
        try:
            result = jira.issue_create(retry_fields) # Attempt issue creation
            print(f"✅ Successfully created issue with {len(retry_fields)} fields")
            return result
        except Exception as e:
            retry_count += 1
            error_msg = str(e)
            print(f"\n⚠️ ERROR CREATING ISSUE (attempt {retry_count}/{max_retries}):")
            print(f"Error message: {error_msg}")
            failed_field = None
            # Error patterns and their expected types
            # NOTE(review): only the pattern text is used below; the expected type is never read
            type_errors = [
                ("Number value expected", "number"),
                ("Operation value must be a string", "string"),
                ("String value expected", "string"),
                ("Array value expected", "array"),
                ("Object value expected", "object"),
                ("Boolean value expected", "boolean"),
                ("Could not find valid 'id' or 'value'", "object_with_id"),
                ("data was not an array", "array"), # New error pattern
                ("cannot be set", None),
                ("Field '", None), # Generic field error pattern
                ("is not on the appropriate screen", None),
                ("Field value is required", None)
            ]
            # Check all fields mentioned in the error message
            # (plain substring match of each field ID against the error text)
            suspicious_fields = []
            for field_id in retry_fields.keys():
                if field_id in error_msg:
                    suspicious_fields.append(field_id)
            if suspicious_fields:
                print("🔍 Fields mentioned in error message:")
                for field_id in suspicious_fields:
                    print(f" - {debug_field_value(field_id, retry_fields[field_id])}")
                # Use the first suspicious field as our failed field
                failed_field = suspicious_fields[0]
                print(f"📍 Selecting first suspicious field: {failed_field}")
            else:
                # Look for patterns in the error message
                matched_pattern = None
                for error_pattern, expected_type in type_errors:
                    if error_pattern in error_msg:
                        matched_pattern = error_pattern
                        print(f"📍 Error pattern matched: {error_pattern}")
                        # Special case handling
                        if "Parent Option object" in error_msg:
                            parent_fields = []
                            for field_id, value in retry_fields.items():
                                # Check if this might be a parent-related field
                                field_name = FIELD_ID_TO_NAME.get(field_id, "").lower()
                                if (field_id == "parent" or
                                        "parent" in field_name or
                                        "epic" in field_name):
                                    parent_fields.append(field_id)
                                    print(f" - Potential parent field: {debug_field_value(field_id, value)}")
                            if parent_fields:
                                failed_field = parent_fields[0]
                            else:
                                # Look for any fields with empty dictionary values
                                for field_id, value in retry_fields.items():
                                    if isinstance(value, dict) and not value:
                                        failed_field = field_id
                                        break
                        # If no specific field identified yet, look for customfields in the error
                        if not failed_field:
                            for field_id in retry_fields.keys():
                                if field_id.startswith("customfield_") and field_id in error_msg:
                                    failed_field = field_id
                                    break
                        # If still no field identified, scan for empty dictionaries or complex structures
                        if not failed_field and ("object" in error_msg.lower() or "value" in error_msg.lower()):
                            for field_id, value in retry_fields.items():
                                if isinstance(value, dict):
                                    # Empty dict or dict without required keys
                                    if not value or not any(k in value for k in ['id', 'key', 'value', 'name']):
                                        failed_field = field_id
                                        break
                        break # Stop checking patterns once we find a match
                if matched_pattern:
                    print(f"🔍 Looking for problematic fields based on pattern: {matched_pattern}")
                    # Try type-specific checks based on the pattern
                    if "Operation value must be a string" in error_msg:
                        # Look specifically for operation fields that aren't strings
                        for field_id, value in retry_fields.items():
                            field_name = FIELD_ID_TO_NAME.get(field_id, "").lower()
                            if "operation" in field_name and not isinstance(value, str):
                                print(f" - Found non-string operation field: {debug_field_value(field_id, value)}")
                                failed_field = field_id
                                break
                    # Check for array errors
                    elif "data was not an array" in error_msg:
                        # Look for fields that should be arrays but aren't
                        for field_id, value in retry_fields.items():
                            if field_id in ARRAY_FIELDS and not isinstance(value, list):
                                print(f" - Found non-array field that should be array: {debug_field_value(field_id, value)}")
                                failed_field = field_id
                                break
                        # If no known field found, look for any field that's not an array but might need to be
                        if not failed_field:
                            for field_id, value in retry_fields.items():
                                if not isinstance(value, list) and isinstance(value, (dict, str, int, float, bool)):
                                    if field_id not in OBJECT_FIELDS: # Don't convert known object fields
                                        print(f" - Potential non-array field: {debug_field_value(field_id, value)}")
                                        failed_field = field_id
                                        break
                # If we found a failed field, print it
                if failed_field:
                    print(f"📍 Identified problematic field: {debug_field_value(failed_field, retry_fields[failed_field])}")
            # If we found a problematic field, attempt to fix it or exclude it
            if failed_field and failed_field in retry_fields:
                fixed = False
                # Special fix for "Could not find valid 'id' or 'value' in the Parent Option object"
                if "Could not find valid 'id' or 'value'" in error_msg:
                    value = retry_fields[failed_field]
                    if isinstance(value, list) and len(value) == 1 and isinstance(value[0], dict) and "value" in value[0]:
                        # Convert array with single value object to direct value object
                        retry_fields[failed_field] = {"value": value[0]["value"]}
                        print(f"🔄 Converting array with value object to direct value object: {failed_field}")
                        OBJECT_FIELDS.add(failed_field) # Remember this needs to be an object
                        fixed = True
                    elif isinstance(value, list) and len(value) > 0:
                        # Try other conversions for lists
                        if all(isinstance(item, dict) and "value" in item for item in value):
                            # If all items have 'value', use the first one
                            retry_fields[failed_field] = {"value": value[0]["value"]}
                            print(f"🔄 Using first value from list of values: {failed_field}")
                            OBJECT_FIELDS.add(failed_field)
                            fixed = True
                # Fix for "data was not an array"
                elif "data was not an array" in error_msg:
                    value = retry_fields[failed_field]
                    if not isinstance(value, list):
                        # Convert to array
                        if isinstance(value, dict) and "value" in value:
                            # If it's a value object, wrap it with array
                            retry_fields[failed_field] = [value]
                            print(f"🔄 Converting value object to array: {failed_field}")
                            ARRAY_FIELDS.add(failed_field) # Remember this needs to be an array
                            fixed = True
                        elif isinstance(value, (str, int, float, bool, dict)):
                            # Convert primitives to array
                            retry_fields[failed_field] = [value]
                            print(f"🔄 Converting value to array: {failed_field}")
                            ARRAY_FIELDS.add(failed_field)
                            fixed = True
                # Regular type conversion fixes based on error patterns
                if not fixed and "Operation value must be a string" in error_msg:
                    # Convert to string
                    try:
                        retry_fields[failed_field] = str(retry_fields[failed_field])
                        print(f"🔄 Converting to string: {failed_field}")
                        fixed = True
                    except Exception as e:
                        print(f"⚠️ Failed to convert to string: {str(e)}")
                # Fix for parent field
                if not fixed and "Parent Option object" in error_msg:
                    if failed_field == "parent" and isinstance(retry_fields[failed_field], dict):
                        if "key" in retry_fields[failed_field]:
                            key = retry_fields[failed_field]["key"]
                            # Try just using the key string
                            retry_fields[failed_field] = key
                            print(f"🔄 Simplifying parent field to key string: {key}")
                            fixed = True
                # If we couldn't fix it, exclude it
                # (the exclusion is global, so filter_relevant_fields skips this field afterwards)
                if not fixed:
                    print(f"⚠️ Excluding problematic field: {failed_field}")
                    DYNAMICALLY_EXCLUDED_FIELDS.add(failed_field)
                    del retry_fields[failed_field]
            else:
                # No specific field identified, fallback strategies
                print("⚠️ No specific field identified, trying fallback strategies...")
                # 1. Check for parent-related fields if the error suggests parent issues
                if "parent" in error_msg.lower() or "option" in error_msg.lower():
                    parent_fields = [field for field in retry_fields.keys()
                                     if field == "parent" or
                                     "parent" in FIELD_ID_TO_NAME.get(field, "").lower() or
                                     "epic" in FIELD_ID_TO_NAME.get(field, "").lower()]
                    if parent_fields:
                        field_to_remove = parent_fields[0]
                        print(f"⚠️ Removing parent-related field: {field_to_remove}")
                        DYNAMICALLY_EXCLUDED_FIELDS.add(field_to_remove)
                        del retry_fields[field_to_remove]
                    else:
                        print("⚠️ No parent fields found despite parent-related error")
                # 2. Check for array-related errors
                elif "data was not an array" in error_msg:
                    # Look for a field we can try to convert to an array
                    # (iterates over a copy because retry_fields is modified inside the loop)
                    for field_id, value in list(retry_fields.items()):
                        if not isinstance(value, list) and field_id not in OBJECT_FIELDS:
                            # Try converting to array
                            retry_fields[field_id] = [value] if value is not None else []
                            print(f"🔄 Converting to array (fallback): {field_id}")
                            ARRAY_FIELDS.add(field_id)
                            break
                # 3. If object-related errors, remove the most complex field
                elif any(term in error_msg.lower() for term in ["object", "array", "expected"]):
                    # Find the most complex field
                    complex_fields = []
                    for field_id, value in retry_fields.items():
                        if isinstance(value, (dict, list)):
                            complex_fields.append((field_id, len(str(value))))
                    if complex_fields:
                        # Sort by complexity (string length as a simple metric)
                        complex_fields.sort(key=lambda x: x[1], reverse=True)
                        field_to_remove = complex_fields[0][0]
                        print(f"⚠️ Removing most complex field: {debug_field_value(field_to_remove, retry_fields[field_to_remove])}")
                        DYNAMICALLY_EXCLUDED_FIELDS.add(field_to_remove)
                        del retry_fields[field_to_remove]
                    else:
                        print("⚠️ No complex fields found despite object-related error")
                # 4. Last resort: if we're near max retries, remove a custom field
                elif retry_count > max_retries // 2:
                    custom_fields = [f for f in retry_fields.keys() if f.startswith("customfield_")]
                    if custom_fields:
                        field_to_remove = custom_fields[0]
                        print(f"⚠️ Last resort: removing custom field: {debug_field_value(field_to_remove, retry_fields[field_to_remove])}")
                        DYNAMICALLY_EXCLUDED_FIELDS.add(field_to_remove)
                        del retry_fields[field_to_remove]
                    else:
                        print("⚠️ No custom fields left to remove")
                        raise # Re-raise if we can't make progress
                else:
                    # We don't know what to fix, give up for this iteration
                    print("⚠️ Could not identify a field to fix or remove, will try again with different strategies")
                    # Only re-raise if we're near the max retries
                    if retry_count > max_retries - 3:
                        raise
            # Break if no more fields to retry with
            if not retry_fields:
                print("❌ All fields have been excluded, cannot create issue")
                raise Exception("Failed to create issue: all fields have been excluded")
    # If we hit max retries
    raise Exception(f"Failed to create issue after {max_retries} retries")
def clone_attachments(old_issue_key, new_issue_key):
    """Copy every attachment of ``old_issue_key`` onto ``new_issue_key``.

    Each attachment is downloaded into a temporary directory and uploaded
    from there, so nothing is left behind in the working directory and a
    remote file name can never be written outside that directory. A failed
    download is reported and skipped; the remaining attachments are still
    processed.

    Args:
        old_issue_key: Key of the issue whose attachments are read.
        new_issue_key: Key of the issue that receives the copies.
    """
    import os
    import tempfile

    source_issue = jira.get(f"rest/api/2/issue/{old_issue_key}")
    attachments = source_issue['fields'].get("attachment") or []

    with tempfile.TemporaryDirectory() as workdir:
        for attachment in attachments:
            url = attachment['content']
            filename = attachment['filename']
            print(f"📎 Cloning attachment {filename} from {old_issue_key} to {new_issue_key}")

            # The file name comes from the server: keep only its last path
            # component so it cannot escape the temporary directory.
            local_path = os.path.join(workdir, os.path.basename(filename) or "attachment")

            # Download the file; the context manager releases the streamed
            # connection even when the download is skipped or fails.
            # NOTE(review): the session is created with username/password
            # above - confirm this Bearer header is actually needed.
            with jira.session.get(
                url,
                headers={"Authorization": f"Bearer {JIRA_API_TOKEN}"},
                stream=True,
            ) as response:
                if response.status_code != 200:
                    print(f"❌ Failed to download attachment {filename} from {old_issue_key}")
                    continue
                with open(local_path, "wb") as file:
                    for chunk in response.iter_content(1024):
                        file.write(chunk)

            # Upload only after the local file has been closed and flushed.
            jira.issue_add_attachment(new_issue_key, local_path)
def clone_issue_links(old_issue_key, new_issue_key):
    """Recreate the issue links of ``old_issue_key`` on ``new_issue_key``.

    Every link (blocks, relates to, etc.) keeps its type and its direction:
    when the other issue was on the inward side of the original link it
    stays on the inward side of the copy, and likewise for the outward side.
    Links that name neither an inward nor an outward issue are skipped.

    Args:
        old_issue_key: Key of the issue whose links are read.
        new_issue_key: Key of the issue that takes the old issue's place in
            each recreated link.
    """
    issue_data = jira.get(f"rest/api/2/issue/{old_issue_key}")
    issue_links = issue_data['fields'].get('issuelinks') or []

    for link in issue_links:
        link_type = link['type']['name']
        inward_issue = (link.get('inwardIssue') or {}).get('key')
        outward_issue = (link.get('outwardIssue') or {}).get('key')

        if inward_issue:
            target = inward_issue
            inward_key, outward_key = target, new_issue_key
        elif outward_issue:
            target = outward_issue
            inward_key, outward_key = new_issue_key, target
        else:
            continue  # Skip if no valid link found

        print(f"🔗 Cloning issue link {link_type} from {old_issue_key} to {new_issue_key} -> {target}")

        # Create the link with an explicit payload so the direction of the
        # original link is preserved.
        jira.create_issue_link({
            "type": {"name": link_type},
            "inwardIssue": {"key": inward_key},
            "outwardIssue": {"key": outward_key},
        })
def _find_epic_link_field(field_mapping):
    """Return the ID of the first field named like "Epic Link", or None."""
    return next(
        (
            field_id
            for field_name, field_id in field_mapping.items()
            if "epic" in field_name.lower() and "link" in field_name.lower()
        ),
        None,
    )


def _fetch_epic_children(epic_key, page_size=50):
    """Return all issues whose Epic Link is ``epic_key``, following pagination."""
    jql_query = f'"Epic Link" = {epic_key}'
    children = []
    start = 0
    while True:
        page = jira.jql(jql_query, start=start, limit=page_size)
        issues = page.get('issues') or []
        children.extend(issues)
        start += len(issues)
        total = page.get('total')
        # Without a reported total there is no safe way to tell whether
        # another page exists, so stop after the page just read.
        if not issues or total is None or start >= total:
            return children


def clone_epic_and_issues(original_epic_key):
    """Clone an epic, its child issues, their attachments and issue links.

    The epic is cloned first (summary and Epic Name prefixed with
    "Cloned - "), then every issue returned by the ``"Epic Link" = <epic>``
    search is cloned and attached to the new epic. Issue links of the
    children are recreated once all children exist.

    Args:
        original_epic_key: Key of the epic to clone.
    """
    field_mapping = get_all_fields()
    editable_fields = get_editable_fields(original_epic_key)
    # The Epic Link field does not change between issues; look it up once.
    epic_link_field = _find_epic_link_field(field_mapping)

    # Get original epic details
    original_epic = jira.issue(original_epic_key)
    epic_fields = original_epic['fields']

    # Clone the Epic
    new_epic_fields = filter_relevant_fields(epic_fields, editable_fields, field_mapping)
    new_epic_fields["summary"] = f"Cloned - {epic_fields['summary']}"
    new_epic_fields["issuetype"] = {"name": "Epic"}  # Ensure correct issue type

    # Handle Epic Name field (common custom field)
    if "Epic Name" in field_mapping and field_mapping["Epic Name"] in editable_fields:
        new_epic_fields[field_mapping["Epic Name"]] = f"Cloned - {epic_fields.get(field_mapping['Epic Name'], '')}"

    # Create the new epic with retry logic
    new_epic = create_issue_with_retries(new_epic_fields)
    new_epic_key = new_epic['key']
    print(f"✅ Cloned Epic: {original_epic_key} -> {new_epic_key}")

    # Clone Attachments (Requires API upload)
    clone_attachments(original_epic_key, new_epic_key)

    # Find child issues linked to the original epic. All result pages are
    # read so that children beyond the first page are not silently dropped.
    child_issues = _fetch_epic_children(original_epic_key)

    # Clone each child issue
    new_issues = {}
    for issue in child_issues:
        issue_key = issue['key']
        issue_fields = issue['fields']
        editable_fields = get_editable_fields(issue_key)  # Get editable fields for each issue type
        new_issue_fields = filter_relevant_fields(issue_fields, editable_fields, field_mapping)
        new_issue_fields["summary"] = f"Cloned - {issue_fields['summary']}"

        # A parent copied from the source issue still points at the original
        # hierarchy; drop it so the clone is attached only to the new epic.
        new_issue_fields.pop("parent", None)

        issue_type = new_issue_fields.get("issuetype")
        type_name = issue_type.get("name", "") if isinstance(issue_type, dict) else ""
        is_subtask = "sub" in str(type_name).lower()

        if is_subtask or epic_link_field is None:
            # Sub-tasks, and instances without an Epic Link field, use "parent"
            new_issue_fields["parent"] = {"key": new_epic_key}
        else:
            # Regular issues are attached through the Epic Link field
            new_issue_fields[epic_link_field] = new_epic_key

        # Create the new issue with retry logic
        new_issue = create_issue_with_retries(new_issue_fields)
        new_issues[issue_key] = new_issue['key']
        print(f"✅ Cloned Issue: {issue_key} -> {new_issue['key']}")

        # Clone Attachments
        clone_attachments(issue_key, new_issue['key'])

    # Clone Issue Links after issues are created
    for old_issue, new_issue in new_issues.items():
        clone_issue_links(old_issue, new_issue)
# Script entry point: clone one epic together with its child issues.
if __name__ == "__main__":
    # "ABC-1" is a placeholder; replace it with the key of the epic to clone.
    original_epic_key = "ABC-1"
    clone_epic_and_issues(original_epic_key)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment