from atlassian import Jira # Jira Configuration JIRA_URL = "https://your-jira-instance.atlassian.net" JIRA_USER = "your-email@example.com" JIRA_API_TOKEN = "your-api-token" # Initialize Jira connection jira = Jira(url=JIRA_URL, username=JIRA_USER, password=JIRA_API_TOKEN, cloud=True) # For debugging - reverse mapping from field ID to field name FIELD_ID_TO_NAME = {} # Will be populated in get_all_fields EXCLUDED_FIELDS = { "comment", "attachment", "issueLinks", "worklog", "timespent", "subtasks", "votes", "watches", "aggregatetimeoriginalestimate", "aggregatetimeestimate", "aggregateprogress", "progress", "timetracking", "thumbnail", "environment", "duedate", "lastViewed", "resolutiondate", "created", "updated", "status" } DYNAMICALLY_EXCLUDED_FIELDS = set() # Tracks fields rejected by Jira # Track fields that need specific formats ARRAY_FIELDS = set() # Fields that must be arrays OBJECT_FIELDS = set() # Fields that must be objects def get_editable_fields(issue_key): """ Returns a set of fields that can be set for the given issue type """ meta = jira.get(f"rest/api/2/issue/{issue_key}/editmeta") return set(meta['fields'].keys()) def get_all_fields(): """ Retrieve all Jira fields and return a mapping {field_name: field_id} """ fields = jira.get("rest/api/2/field") name_to_id = {} # Also populate the reverse mapping for debugging global FIELD_ID_TO_NAME FIELD_ID_TO_NAME = {} for field in fields: name_to_id[field['name']] = field['id'] FIELD_ID_TO_NAME[field['id']] = field['name'] return name_to_id def safe_dict_value(obj, key, default=None): """Safely get a value from a dictionary without raising KeyError""" if isinstance(obj, dict) and key in obj: return obj[key] return default def sanitize_field_value(field_name, field_value): """Sanitize a field value based on field name patterns""" if field_value is None: return None # Handle operations - must be strings if "operation" in field_name.lower(): return str(field_value) # Handle numeric fields if any(term in field_name.lower() for term in ["id", "count", "number", "estimate", "spent"]): if isinstance(field_value, str) and field_value.isdigit(): return int(field_value) # Handle dictionary with missing id/value if isinstance(field_value, dict): # Ensure objects have required fields if "key" not in field_value and "id" not in field_value and "value" not in field_value: if "name" in field_value: return {"name": field_value["name"]} elif len(field_value) == 0: return None # Empty dict should be null return field_value def debug_field_value(field_id, value): """Return a debug string for a field value""" field_name = FIELD_ID_TO_NAME.get(field_id, "Unknown Field") value_type = type(value).__name__ value_str = str(value) # Truncate long values if len(value_str) > 100: value_str = value_str[:97] + "..." return f"{field_id} ({field_name}) = {value_str} [Type: {value_type}]" def filter_relevant_fields(issue_fields, editable_fields, field_mapping): """ Extracts relevant fields ensuring only editable ones are kept """ allowed_keys = {"summary", "description", "issuetype", "project", "priority"} relevant_fields = {} # Process standard fields for key in allowed_keys: if key in issue_fields: # Handle special case for issuetype if key == "issuetype" and isinstance(issue_fields[key], dict): if "name" in issue_fields[key]: relevant_fields[key] = {"name": issue_fields[key]["name"]} else: relevant_fields[key] = issue_fields[key] # Add custom fields only if they are editable and not excluded for field_name, field_id in field_mapping.items(): if ( field_id in issue_fields and field_id in editable_fields and field_id not in EXCLUDED_FIELDS and field_id not in DYNAMICALLY_EXCLUDED_FIELDS ): # Skip if the value is None if issue_fields[field_id] is None: continue # Apply general sanitization first based on field name patterns sanitized_value = sanitize_field_value(field_name, issue_fields[field_id]) if sanitized_value is None: continue # Check if we already know the format requirement for this field if field_id in ARRAY_FIELDS: # This field needs to be an array if not isinstance(sanitized_value, list): # Convert to array relevant_fields[field_id] = [sanitized_value] else: relevant_fields[field_id] = sanitized_value continue if field_id in OBJECT_FIELDS: # This field needs to be an object if isinstance(sanitized_value, list) and len(sanitized_value) == 1 and isinstance(sanitized_value[0], dict) and "value" in sanitized_value[0]: # Convert array with single value to object relevant_fields[field_id] = {"value": sanitized_value[0]["value"]} elif isinstance(sanitized_value, list) and len(sanitized_value) > 0 and all(isinstance(item, dict) and "value" in item for item in sanitized_value): # Take first value from list of values relevant_fields[field_id] = {"value": sanitized_value[0]["value"]} elif isinstance(sanitized_value, dict): relevant_fields[field_id] = sanitized_value else: # For non-dict values, wrap in a value object relevant_fields[field_id] = {"value": str(sanitized_value)} continue # Special handling for Sprint field if field_name == "Sprint" or "sprint" in field_name.lower(): # Sprint requires numeric IDs if isinstance(sanitized_value, list): sprint_ids = [] for sprint in sanitized_value: if isinstance(sprint, dict) and 'id' in sprint: sprint_ids.append(sprint['id']) elif isinstance(sprint, (int, str)) and str(sprint).isdigit(): sprint_ids.append(int(sprint)) if sprint_ids: relevant_fields[field_id] = sprint_ids elif isinstance(sanitized_value, (int, str)) and str(sanitized_value).isdigit(): # Handle case where it's a single sprint ID relevant_fields[field_id] = [int(sanitized_value)] else: # Skip this field if we can't extract proper sprint IDs continue # Handle operation fields that require string values elif field_name.startswith("Operation") or "operation" in field_name.lower(): if sanitized_value is not None: # Ensure the value is a string relevant_fields[field_id] = str(sanitized_value) # Handle parent field specially elif field_name.lower() == "parent" or "parent" in field_id.lower(): if isinstance(sanitized_value, dict) and "key" in sanitized_value: relevant_fields[field_id] = {"key": sanitized_value["key"]} elif isinstance(sanitized_value, str): # If it's just a string, assume it's a key relevant_fields[field_id] = {"key": sanitized_value} # Handle Epic Link field specially elif "epic" in field_name.lower() and "link" in field_name.lower(): if isinstance(sanitized_value, dict) and "key" in sanitized_value: relevant_fields[field_id] = sanitized_value["key"] else: # For Epic Link, we usually just need the key string relevant_fields[field_id] = str(sanitized_value) # Handle array/list fields that contain a single value/object with "value" elif isinstance(sanitized_value, list) and len(sanitized_value) == 1 and isinstance(sanitized_value[0], dict) and "value" in sanitized_value[0]: # CRITICAL FIX: For array with single value object, use object directly instead of array relevant_fields[field_id] = {"value": sanitized_value[0]["value"]} # Handle array/list fields elif isinstance(sanitized_value, list): # Process each item in the list to ensure proper formatting processed_list = [] for item in sanitized_value: if isinstance(item, dict) and 'id' in item: # For objects with IDs, send just the ID which is often what's needed processed_list.append(item['id']) elif isinstance(item, dict) and 'key' in item: # For objects with keys, send just the key processed_list.append({"key": item['key']}) elif isinstance(item, dict) and 'value' in item: # For objects with values, send just the value processed_list.append(item['value']) else: # For simple values or other objects, keep as is processed_list.append(item) if processed_list: # Only add if not empty relevant_fields[field_id] = processed_list # Handle object fields with ID or key elif isinstance(sanitized_value, dict): if 'id' in sanitized_value: # Some fields expect just the ID (like custom fields) if "customfield_" in field_id: relevant_fields[field_id] = sanitized_value['id'] else: # Others expect the whole object relevant_fields[field_id] = {"id": sanitized_value['id']} elif 'key' in sanitized_value: # Fields with keys usually expect the object relevant_fields[field_id] = {"key": sanitized_value['key']} elif 'name' in sanitized_value: # Fields with names usually expect the object relevant_fields[field_id] = {"name": sanitized_value['name']} elif 'value' in sanitized_value: # For value objects, sometimes just the value is needed relevant_fields[field_id] = sanitized_value else: # Only add if the dict is not empty if sanitized_value: relevant_fields[field_id] = sanitized_value else: # For primitive values (string, number, etc.) relevant_fields[field_id] = sanitized_value return relevant_fields def create_issue_with_retries(issue_fields): """ Creates an issue and dynamically excludes failing fields if necessary """ retry_fields = issue_fields.copy() max_retries = 15 # Prevent infinite loops retry_count = 0 # Print all fields for debugging print("\nš DEBUG: All fields being sent to Jira:") for field_id, value in sorted(retry_fields.items()): print(f" - {debug_field_value(field_id, value)}") # Ensure required fields are present and in correct format if "issuetype" not in retry_fields or not isinstance(retry_fields["issuetype"], dict): if "issuetype" in retry_fields: # Try to convert to proper format if isinstance(retry_fields["issuetype"], str): retry_fields["issuetype"] = {"name": retry_fields["issuetype"]} else: # If we can't fix it, remove it (will cause error but prevent unexpected behavior) del retry_fields["issuetype"] # Ensure project is correctly formatted if "project" in retry_fields and not isinstance(retry_fields["project"], dict): if isinstance(retry_fields["project"], str): if retry_fields["project"].isdigit(): retry_fields["project"] = {"id": retry_fields["project"]} else: retry_fields["project"] = {"key": retry_fields["project"]} print(f"š Creating issue with {len(retry_fields)} fields") while retry_count < max_retries: try: result = jira.issue_create(retry_fields) # Attempt issue creation print(f"ā Successfully created issue with {len(retry_fields)} fields") return result except Exception as e: retry_count += 1 error_msg = str(e) print(f"\nā ļø ERROR CREATING ISSUE (attempt {retry_count}/{max_retries}):") print(f"Error message: {error_msg}") failed_field = None # Error patterns and their expected types type_errors = [ ("Number value expected", "number"), ("Operation value must be a string", "string"), ("String value expected", "string"), ("Array value expected", "array"), ("Object value expected", "object"), ("Boolean value expected", "boolean"), ("Could not find valid 'id' or 'value'", "object_with_id"), ("data was not an array", "array"), # New error pattern ("cannot be set", None), ("Field '", None), # Generic field error pattern ("is not on the appropriate screen", None), ("Field value is required", None) ] # Check all fields mentioned in the error message suspicious_fields = [] for field_id in retry_fields.keys(): if field_id in error_msg: suspicious_fields.append(field_id) if suspicious_fields: print("š Fields mentioned in error message:") for field_id in suspicious_fields: print(f" - {debug_field_value(field_id, retry_fields[field_id])}") # Use the first suspicious field as our failed field failed_field = suspicious_fields[0] print(f"š Selecting first suspicious field: {failed_field}") else: # Look for patterns in the error message matched_pattern = None for error_pattern, expected_type in type_errors: if error_pattern in error_msg: matched_pattern = error_pattern print(f"š Error pattern matched: {error_pattern}") # Special case handling if "Parent Option object" in error_msg: parent_fields = [] for field_id, value in retry_fields.items(): # Check if this might be a parent-related field field_name = FIELD_ID_TO_NAME.get(field_id, "").lower() if (field_id == "parent" or "parent" in field_name or "epic" in field_name): parent_fields.append(field_id) print(f" - Potential parent field: {debug_field_value(field_id, value)}") if parent_fields: failed_field = parent_fields[0] else: # Look for any fields with empty dictionary values for field_id, value in retry_fields.items(): if isinstance(value, dict) and not value: failed_field = field_id break # If no specific field identified yet, look for customfields in the error if not failed_field: for field_id in retry_fields.keys(): if field_id.startswith("customfield_") and field_id in error_msg: failed_field = field_id break # If still no field identified, scan for empty dictionaries or complex structures if not failed_field and ("object" in error_msg.lower() or "value" in error_msg.lower()): for field_id, value in retry_fields.items(): if isinstance(value, dict): # Empty dict or dict without required keys if not value or not any(k in value for k in ['id', 'key', 'value', 'name']): failed_field = field_id break break # Stop checking patterns once we find a match if matched_pattern: print(f"š Looking for problematic fields based on pattern: {matched_pattern}") # Try type-specific checks based on the pattern if "Operation value must be a string" in error_msg: # Look specifically for operation fields that aren't strings for field_id, value in retry_fields.items(): field_name = FIELD_ID_TO_NAME.get(field_id, "").lower() if "operation" in field_name and not isinstance(value, str): print(f" - Found non-string operation field: {debug_field_value(field_id, value)}") failed_field = field_id break # Check for array errors elif "data was not an array" in error_msg: # Look for fields that should be arrays but aren't for field_id, value in retry_fields.items(): if field_id in ARRAY_FIELDS and not isinstance(value, list): print(f" - Found non-array field that should be array: {debug_field_value(field_id, value)}") failed_field = field_id break # If no known field found, look for any field that's not an array but might need to be if not failed_field: for field_id, value in retry_fields.items(): if not isinstance(value, list) and isinstance(value, (dict, str, int, float, bool)): if field_id not in OBJECT_FIELDS: # Don't convert known object fields print(f" - Potential non-array field: {debug_field_value(field_id, value)}") failed_field = field_id break # If we found a failed field, print it if failed_field: print(f"š Identified problematic field: {debug_field_value(failed_field, retry_fields[failed_field])}") # If we found a problematic field, attempt to fix it or exclude it if failed_field and failed_field in retry_fields: fixed = False # Special fix for "Could not find valid 'id' or 'value' in the Parent Option object" if "Could not find valid 'id' or 'value'" in error_msg: value = retry_fields[failed_field] if isinstance(value, list) and len(value) == 1 and isinstance(value[0], dict) and "value" in value[0]: # Convert array with single value object to direct value object retry_fields[failed_field] = {"value": value[0]["value"]} print(f"š Converting array with value object to direct value object: {failed_field}") OBJECT_FIELDS.add(failed_field) # Remember this needs to be an object fixed = True elif isinstance(value, list) and len(value) > 0: # Try other conversions for lists if all(isinstance(item, dict) and "value" in item for item in value): # If all items have 'value', use the first one retry_fields[failed_field] = {"value": value[0]["value"]} print(f"š Using first value from list of values: {failed_field}") OBJECT_FIELDS.add(failed_field) fixed = True # Fix for "data was not an array" elif "data was not an array" in error_msg: value = retry_fields[failed_field] if not isinstance(value, list): # Convert to array if isinstance(value, dict) and "value" in value: # If it's a value object, wrap it with array retry_fields[failed_field] = [value] print(f"š Converting value object to array: {failed_field}") ARRAY_FIELDS.add(failed_field) # Remember this needs to be an array fixed = True elif isinstance(value, (str, int, float, bool, dict)): # Convert primitives to array retry_fields[failed_field] = [value] print(f"š Converting value to array: {failed_field}") ARRAY_FIELDS.add(failed_field) fixed = True # Regular type conversion fixes based on error patterns if not fixed and "Operation value must be a string" in error_msg: # Convert to string try: retry_fields[failed_field] = str(retry_fields[failed_field]) print(f"š Converting to string: {failed_field}") fixed = True except Exception as e: print(f"ā ļø Failed to convert to string: {str(e)}") # Fix for parent field if not fixed and "Parent Option object" in error_msg: if failed_field == "parent" and isinstance(retry_fields[failed_field], dict): if "key" in retry_fields[failed_field]: key = retry_fields[failed_field]["key"] # Try just using the key string retry_fields[failed_field] = key print(f"š Simplifying parent field to key string: {key}") fixed = True # If we couldn't fix it, exclude it if not fixed: print(f"ā ļø Excluding problematic field: {failed_field}") DYNAMICALLY_EXCLUDED_FIELDS.add(failed_field) del retry_fields[failed_field] else: # No specific field identified, fallback strategies print("ā ļø No specific field identified, trying fallback strategies...") # 1. Check for parent-related fields if the error suggests parent issues if "parent" in error_msg.lower() or "option" in error_msg.lower(): parent_fields = [field for field in retry_fields.keys() if field == "parent" or "parent" in FIELD_ID_TO_NAME.get(field, "").lower() or "epic" in FIELD_ID_TO_NAME.get(field, "").lower()] if parent_fields: field_to_remove = parent_fields[0] print(f"ā ļø Removing parent-related field: {field_to_remove}") DYNAMICALLY_EXCLUDED_FIELDS.add(field_to_remove) del retry_fields[field_to_remove] else: print("ā ļø No parent fields found despite parent-related error") # 2. Check for array-related errors elif "data was not an array" in error_msg: # Look for a field we can try to convert to an array for field_id, value in list(retry_fields.items()): if not isinstance(value, list) and field_id not in OBJECT_FIELDS: # Try converting to array retry_fields[field_id] = [value] if value is not None else [] print(f"š Converting to array (fallback): {field_id}") ARRAY_FIELDS.add(field_id) break # 3. If object-related errors, remove the most complex field elif any(term in error_msg.lower() for term in ["object", "array", "expected"]): # Find the most complex field complex_fields = [] for field_id, value in retry_fields.items(): if isinstance(value, (dict, list)): complex_fields.append((field_id, len(str(value)))) if complex_fields: # Sort by complexity (string length as a simple metric) complex_fields.sort(key=lambda x: x[1], reverse=True) field_to_remove = complex_fields[0][0] print(f"ā ļø Removing most complex field: {debug_field_value(field_to_remove, retry_fields[field_to_remove])}") DYNAMICALLY_EXCLUDED_FIELDS.add(field_to_remove) del retry_fields[field_to_remove] else: print("ā ļø No complex fields found despite object-related error") # 4. Last resort: if we're near max retries, remove a custom field elif retry_count > max_retries // 2: custom_fields = [f for f in retry_fields.keys() if f.startswith("customfield_")] if custom_fields: field_to_remove = custom_fields[0] print(f"ā ļø Last resort: removing custom field: {debug_field_value(field_to_remove, retry_fields[field_to_remove])}") DYNAMICALLY_EXCLUDED_FIELDS.add(field_to_remove) del retry_fields[field_to_remove] else: print("ā ļø No custom fields left to remove") raise # Re-raise if we can't make progress else: # We don't know what to fix, give up for this iteration print("ā ļø Could not identify a field to fix or remove, will try again with different strategies") # Only re-raise if we're near the max retries if retry_count > max_retries - 3: raise # Break if no more fields to retry with if not retry_fields: print("ā All fields have been excluded, cannot create issue") raise Exception("Failed to create issue: all fields have been excluded") # If we hit max retries raise Exception(f"Failed to create issue after {max_retries} retries") def clone_attachments(old_issue_key, new_issue_key): """ Clones attachments by downloading and re-uploading them """ attachments = jira.get(f"rest/api/2/issue/{old_issue_key}")['fields'].get("attachment", []) for attachment in attachments: url = attachment['content'] filename = attachment['filename'] print(f"š Cloning attachment {filename} from {old_issue_key} to {new_issue_key}") # Download the file response = jira.session.get(url, headers={"Authorization": f"Bearer {JIRA_API_TOKEN}"}, stream=True) if response.status_code == 200: with open(filename, "wb") as file: for chunk in response.iter_content(1024): file.write(chunk) # Upload the file to the new issue jira.issue_add_attachment(new_issue_key, filename) else: print(f"ā Failed to download attachment {filename} from {old_issue_key}") def clone_issue_links(old_issue_key, new_issue_key): """ Clones issue links (blocks, relates to, etc.) """ issue_data = jira.get(f"rest/api/2/issue/{old_issue_key}") issue_links = issue_data['fields'].get('issuelinks', []) for link in issue_links: link_type = link['type']['name'] inward_issue = link.get('inwardIssue', {}).get('key') outward_issue = link.get('outwardIssue', {}).get('key') if inward_issue: target = inward_issue direction = "inward" elif outward_issue: target = outward_issue direction = "outward" else: continue # Skip if no valid link found print(f"š Cloning issue link {link_type} from {old_issue_key} to {new_issue_key} -> {target}") # Create the link jira.issue_link(link_type, new_issue_key, target) def clone_epic_and_issues(original_epic_key): """ Clones an epic and all its child issues """ field_mapping = get_all_fields() editable_fields = get_editable_fields(original_epic_key) # Get original epic details original_epic = jira.issue(original_epic_key) epic_fields = original_epic['fields'] # Clone the Epic new_epic_fields = filter_relevant_fields(epic_fields, editable_fields, field_mapping) new_epic_fields["summary"] = f"Cloned - {epic_fields['summary']}" new_epic_fields["issuetype"] = {"name": "Epic"} # Ensure correct issue type # Handle Epic Name field (common custom field) if "Epic Name" in field_mapping and field_mapping["Epic Name"] in editable_fields: new_epic_fields[field_mapping["Epic Name"]] = f"Cloned - {epic_fields.get(field_mapping['Epic Name'], '')}" # Create the new epic with retry logic new_epic = create_issue_with_retries(new_epic_fields) new_epic_key = new_epic['key'] print(f"ā Cloned Epic: {original_epic_key} -> {new_epic_key}") # Clone Attachments (Requires API upload) clone_attachments(original_epic_key, new_epic_key) # Find child issues linked to the original epic jql_query = f'"Epic Link" = {original_epic_key}' child_issues = jira.jql(jql_query)['issues'] # Clone each child issue new_issues = {} for issue in child_issues: issue_key = issue['key'] issue_fields = issue['fields'] editable_fields = get_editable_fields(issue_key) # Get editable fields for each issue type new_issue_fields = filter_relevant_fields(issue_fields, editable_fields, field_mapping) new_issue_fields["summary"] = f"Cloned - {issue_fields['summary']}" # Handle parent field - try different formats based on issue type try: # First try to check if this is a sub-task if "issuetype" in new_issue_fields and isinstance(new_issue_fields["issuetype"], dict) and "name" in new_issue_fields["issuetype"]: issue_type_name = new_issue_fields["issuetype"]["name"].lower() # Handle sub-tasks differently from stories/tasks under epics if "sub" in issue_type_name or "subtask" in issue_type_name: # Sub-tasks use "parent" with object containing key new_issue_fields["parent"] = {"key": new_epic_key} else: # For regular issues under epic, try to use the Epic Link field epic_link_found = False for field_name, field_id in field_mapping.items(): if "epic" in field_name.lower() and "link" in field_name.lower(): new_issue_fields[field_id] = new_epic_key epic_link_found = True break # If no Epic Link field was found, fall back to parent if not epic_link_found: new_issue_fields["parent"] = {"key": new_epic_key} else: # Default case - try Epic Link if available, else use parent epic_link_field = next((field_id for field_name, field_id in field_mapping.items() if "epic" in field_name.lower() and "link" in field_name.lower()), None) if epic_link_field: new_issue_fields[epic_link_field] = new_epic_key else: new_issue_fields["parent"] = {"key": new_epic_key} except Exception as e: print(f"ā ļø Issue setting parent/epic relationship: {str(e)}") # Fall back to the most common approach new_issue_fields["parent"] = {"key": new_epic_key} # Create the new issue with retry logic new_issue = create_issue_with_retries(new_issue_fields) new_issues[issue_key] = new_issue['key'] print(f"ā Cloned Issue: {issue_key} -> {new_issue['key']}") # Clone Attachments clone_attachments(issue_key, new_issue['key']) # Clone Issue Links after issues are created for old_issue, new_issue in new_issues.items(): clone_issue_links(old_issue, new_issue) if __name__ == "__main__": original_epic_key = "ABC-1" clone_epic_and_issues(original_epic_key)