Last active
December 1, 2023 21:39
-
-
Save ImIOImI/328aadacde5d939a61383ecadc1d2a08 to your computer and use it in GitHub Desktop.
Search All K8s Manifests for A String
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3
import argparse
import os
import subprocess
def get_resource_types():
    """Return the resource type names reported by ``kubectl api-resources``.

    Runs ``kubectl api-resources -o name`` and returns one entry per
    non-blank line of its output.

    Returns:
        list[str]: Resource type names exactly as kubectl printed them.
        Empty when kubectl prints nothing.

    Raises:
        subprocess.CalledProcessError: If kubectl exits with a non-zero status.
    """
    cmd = ["kubectl", "api-resources", "-o", "name"]
    output = subprocess.check_output(cmd, text=True)
    # splitlines() plus the blank-line filter yields [] for empty output;
    # strip().split('\n') would yield [''], i.e. one bogus, empty resource type.
    return [line.strip() for line in output.splitlines() if line.strip()]
def _list_objects(resource):
    """Return ``(namespace, name)`` for every object of *resource*, cluster-wide.

    ``namespace`` is ``None`` for objects that have no namespace.
    """
    cmd = [
        "kubectl", "get", resource, "--all-namespaces",
        "-o", "custom-columns=:metadata.namespace,:metadata.name",
        "--no-headers",
    ]
    listing = subprocess.check_output(cmd, text=True)
    objects = []
    for line in listing.splitlines():
        fields = line.split()
        if len(fields) != 2:
            # Blank or unexpected line (e.g. nothing of this type exists).
            continue
        namespace, name = fields
        # kubectl prints "<none>" in a custom column whose field is absent,
        # which is what cluster-scoped objects show for metadata.namespace.
        objects.append((None if namespace == "<none>" else namespace, name))
    return objects


def _fetch_manifest(resource, name, namespace):
    """Return the YAML manifest of a single object as printed by kubectl."""
    cmd = ["kubectl", "get", resource, name, "-o", "yaml"]
    if namespace is not None:
        cmd += ["--namespace", namespace]
    return subprocess.check_output(cmd, text=True).strip()


def search_and_save_manifests(resource_types, search_string):
    """Save the manifest of every object whose YAML contains *search_string*.

    For each resource type, one bulk ``kubectl get ... --all-namespaces -o yaml``
    is searched first; this saves a LOT of kubectl calls because only resource
    types with at least one hit are then fetched object by object. Matching
    objects are written to ``results/<search_string>/<resource>/<name>.yaml``;
    namespaced objects get an extra ``<namespace>/`` folder before the file
    name so that equally named objects in different namespaces do not
    overwrite each other.

    Args:
        resource_types: Iterable of resource type names accepted by
            ``kubectl get`` (must support ``len()``).
        search_string: Plain substring to look for in the YAML text.

    Returns:
        None. Progress and kubectl errors are printed; a failing kubectl call
        skips only the resource type or object it concerned.
    """
    os.makedirs("results", exist_ok=True)
    total_resource_types = len(resource_types)

    for type_count, resource in enumerate(resource_types):
        print(f"Processing resource type {resource}: {type_count + 1} of {total_resource_types}")

        cmd = ["kubectl", "get", resource, "--all-namespaces", "-o", "yaml"]
        try:
            all_type_manifests = subprocess.check_output(cmd, text=True).strip().split("\n---\n")
        except subprocess.CalledProcessError as e:
            # Typically a resource type that cannot be listed; nothing to search.
            print(f"Error: {e}")
            continue

        matching = [
            idx for idx, type_manifest in enumerate(all_type_manifests)
            if search_string in type_manifest
        ]
        if not matching:
            continue
        # Index of the (last) bulk chunk containing the string; it is only
        # recorded in the header comment of the saved files.
        idx = matching[-1]

        # The bulk search covered all namespaces, so the per-object pass must
        # list and fetch across all namespaces too; otherwise hits outside the
        # current namespace would never be saved.
        try:
            objects = _list_objects(resource)
        except subprocess.CalledProcessError as e:
            print(f"Error getting manifests for resource {resource}: {e}")
            continue

        for namespace, name in objects:
            try:
                manifest = _fetch_manifest(resource, name, namespace)
            except subprocess.CalledProcessError as e:
                # E.g. the object was deleted since it was listed; keep going.
                print(f"Error getting manifests for resource {resource}: {e}")
                continue
            print(f"Manifest for {resource} {name} yaml len: {len(manifest)}")

            if search_string not in manifest:
                continue

            # Built with an f-string on purpose: os.path.join would discard
            # "results" if search_string started with a path separator.
            results_folder = f"results/{search_string}/{resource}"
            if namespace is not None:
                results_folder = os.path.join(results_folder, namespace)
            os.makedirs(results_folder, exist_ok=True)

            file_name = f"{name}.yaml"
            with open(os.path.join(results_folder, file_name), "w") as file:
                print(f"########### Writing manifest for resource {resource} {name} to file")
                file.write(f"# {resource} {name} Manifest {idx}\n")
                file.write(manifest)
def main():
    """Parse the command line and run the search.

    Expects one positional argument, the string to search for, then looks it
    up in the manifests of every resource type returned by
    ``get_resource_types()``.
    """
    parser = argparse.ArgumentParser(description="Search and save Kubernetes manifests containing a specific string.")
    parser.add_argument("search_string", help="String to search for in manifests.")
    args = parser.parse_args()

    resource_types = get_resource_types()
    search_and_save_manifests(resource_types, args.search_string)
# Run the search only when executed as a script, not when imported.
if __name__ == "__main__":
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
This is a weapon of last resort. May God have mercy on your soul if it comes to this.