Created
September 3, 2024 14:10
-
-
Save jalada/8d6e1597aad2267aea0f11647bcef867 to your computer and use it in GitHub Desktop.
Import a Heroku DB credential into 1Password
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3 | |
import subprocess | |
import re | |
import json | |
import urllib.parse | |
from prompt_toolkit import prompt | |
from prompt_toolkit.completion import Completer, Completion, FuzzyCompleter | |
def run_subprocess(command):
    """Run an external command and return what it wrote to standard output.

    Args:
        command: Argument list passed straight to ``subprocess.run``.

    Returns:
        The captured standard output as text. The exit status is not
        inspected and standard error is not captured.
    """
    return subprocess.run(command, text=True, stdout=subprocess.PIPE).stdout
class ItemCompleter(Completer):
    """Completer that proposes the title of every item it was given.

    No filtering is done here: each call yields one completion per item.
    """

    def __init__(self, items):
        # Each item is indexed with the 'title' key when completions are built.
        self.items = items

    def get_completions(self, document, complete_event):
        """Yield one ``Completion`` per item, replacing all of the typed text."""
        typed_length = len(document.text)
        for entry in self.items:
            title = entry['title']
            yield Completion(title, start_position=-typed_length, display=title)
def get_1password_accounts():
    """Return the parsed JSON output of ``op account list --format=json``."""
    raw_listing = run_subprocess(["op", "account", "list", "--format=json"])
    return json.loads(raw_listing)
def prompt_user_account_selection(accounts):
    """Show a numbered list of accounts and return the chosen account's UUID.

    Args:
        accounts: Non-empty sequence of mappings with 'email', 'url' and
            'account_uuid' keys.

    Returns:
        The 'account_uuid' value of the account the user picked.

    Raises:
        ValueError: If *accounts* is empty, since there is nothing to pick.
    """
    if not accounts:
        raise ValueError("No accounts to choose from.")

    # Display available accounts to the user
    print("Available 1Password accounts:")
    for index, account in enumerate(accounts, start=1):
        print(f"{index}. {account['email']} ({account['url']})")

    # Keep asking until the answer is a number inside the displayed range.
    # Without the range check, "0" or a negative number would silently pick
    # an entry counted from the end of the list.
    while True:
        selection = input("Enter the number of the account you want to use: ")
        try:
            selected_index = int(selection.strip())
        except ValueError:
            selected_index = 0  # falls outside 1..len, so we re-prompt
        if 1 <= selected_index <= len(accounts):
            return accounts[selected_index - 1]['account_uuid']
        print(f"Please enter a number between 1 and {len(accounts)}.")
def get_1password_vaults(account_shorthand):
    """Return the parsed JSON vault list for the given 1Password account.

    Args:
        account_shorthand: Value passed to ``op --account``.
    """
    vault_command = ["op", "--account", account_shorthand, "vault", "list", "--format=json"]
    raw_listing = run_subprocess(vault_command)
    return json.loads(raw_listing)
def prompt_user_vault_selection(vaults):
    """Show a numbered list of vaults and return the chosen vault's ID.

    Args:
        vaults: Non-empty sequence of mappings with 'name' and 'id' keys.

    Returns:
        The 'id' value of the vault the user picked.

    Raises:
        ValueError: If *vaults* is empty, since there is nothing to pick.
    """
    if not vaults:
        raise ValueError("No vaults to choose from.")

    # Display available vaults to the user
    print("Available vaults:")
    for index, vault in enumerate(vaults, start=1):
        print(f"{index}. {vault['name']} (ID: {vault['id']})")

    # Re-prompt on anything that is not a number in the displayed range;
    # otherwise "0" or a negative number would index from the end of the list.
    while True:
        selection = input("Enter the number of the vault you want to use: ")
        try:
            selected_index = int(selection.strip())
        except ValueError:
            selected_index = 0  # falls outside 1..len, so we re-prompt
        if 1 <= selected_index <= len(vaults):
            return vaults[selected_index - 1]['id']
        print(f"Please enter a number between 1 and {len(vaults)}.")
def list_heroku_databases(app_name):
    """Return the app's add-ons whose service name is 'heroku-postgresql'.

    Args:
        app_name: Heroku app passed to ``heroku addons --app``.
    """
    addons_json = run_subprocess(["heroku", "addons", "--app", app_name, "--json"])
    postgres_addons = []
    for addon in json.loads(addons_json):
        if addon['addon_service']['name'] == 'heroku-postgresql':
            postgres_addons.append(addon)
    return postgres_addons
def list_heroku_database_credentials(app_name, database_name):
    """Return the raw text output of ``heroku pg:credentials`` for a database.

    Args:
        app_name: Heroku app that owns the database.
        database_name: Database name passed to ``pg:credentials``.
    """
    credentials_command = ["heroku", "pg:credentials", database_name, "--app", app_name]
    return run_subprocess(credentials_command)
def parse_credentials_list(credentials_output):
    """Extract the names of active credentials from ``heroku pg:credentials`` output.

    A line is treated as a credential row when its first column is followed
    by the standalone word ``active`` in a later column. Matching whole
    columns, rather than searching the line for the substring, avoids
    picking up ``inactive`` rows and lines that merely mention a name
    containing "active".

    NOTE(review): assumes the state is printed as its own
    whitespace-separated column after the credential name; confirm against
    the Heroku CLI version in use.

    Args:
        credentials_output: Text printed by the Heroku CLI.

    Returns:
        List of credential names, in the order they appear.
    """
    credentials = []
    for line in credentials_output.splitlines():
        columns = line.split()
        # columns[1:] is empty for blank or single-column lines, so they are skipped.
        if "active" in columns[1:]:
            credentials.append(columns[0])
    return credentials
def fetch_connection_url(app_name, database_name, credential_name):
    """Return the raw text output of ``heroku pg:credentials:url`` for one credential.

    Args:
        app_name: Heroku app that owns the database.
        database_name: Database the credential belongs to.
        credential_name: Credential passed via ``--name``.
    """
    url_command = [
        "heroku", "pg:credentials:url", database_name,
        "--name", credential_name,
        "--app", app_name,
    ]
    return run_subprocess(url_command)
def parse_connection_url(connection_info):
    """Pull the Postgres connection details out of Heroku CLI output.

    Looks for a ``Connection URL:`` label followed by a ``postgres://`` (or
    ``postgresql://``) URL and splits that URL with ``urllib.parse`` rather
    than a hand-written pattern. As a result:

    * trailing whitespace or a query string never leaks into the database name,
    * percent-encoded characters in the user, password and database are decoded,
    * the host name is returned lower-cased (``urlsplit`` normalises it).

    Args:
        connection_info: Text printed by ``heroku pg:credentials:url``.

    Returns:
        A dict with string values under 'username', 'password', 'hostname',
        'port' and 'database', or None when no complete URL is found.
    """
    match = re.search(r'Connection URL:\s*(postgres(?:ql)?://\S+)', connection_info)
    if not match:
        return None

    parts = urllib.parse.urlsplit(match.group(1))
    try:
        port = parts.port
    except ValueError:
        # urlsplit raises when the port is not a valid number.
        return None

    database = urllib.parse.unquote(parts.path.lstrip('/'))
    # Every component is required, as it was with the original pattern.
    if None in (parts.username, parts.password, parts.hostname, port) or not database:
        return None

    return {
        'username': urllib.parse.unquote(parts.username),
        'password': urllib.parse.unquote(parts.password),
        'hostname': parts.hostname,
        'port': str(port),  # kept as a string, as callers interpolate it into CLI arguments
        'database': database,
    }
def prompt_user_database_selection(databases):
    """Show a numbered list of databases and return the chosen database's name.

    Args:
        databases: Non-empty sequence of mappings with a 'name' key and an
            optional 'attachments' list of mappings that each have a 'name'.

    Returns:
        The 'name' value of the database the user picked.

    Raises:
        ValueError: If *databases* is empty, since there is nothing to pick.
    """
    if not databases:
        raise ValueError("No databases to choose from.")

    # Display available databases to the user
    print("Available PostgreSQL databases:")
    for index, db in enumerate(databases, start=1):
        attachment_names = [attachment['name'] for attachment in db.get('attachments', [])]
        attachment_list = ', '.join(attachment_names) if attachment_names else 'No attachments'
        print(f"{index}. {db['name']} ({attachment_list})")

    # Re-prompt on anything that is not a number in the displayed range;
    # otherwise "0" or a negative number would index from the end of the list.
    while True:
        selection = input("Enter the number of the database you want to use: ")
        try:
            selected_index = int(selection.strip())
        except ValueError:
            selected_index = 0  # falls outside 1..len, so we re-prompt
        if 1 <= selected_index <= len(databases):
            return databases[selected_index - 1]['name']
        print(f"Please enter a number between 1 and {len(databases)}.")
def prompt_user_credential_selection(credentials):
    """Show a numbered list of credential names and return the chosen one.

    Args:
        credentials: Non-empty sequence of credential names.

    Returns:
        The element of *credentials* the user picked.

    Raises:
        ValueError: If *credentials* is empty, since there is nothing to pick.
    """
    if not credentials:
        raise ValueError("No credentials to choose from.")

    # Display available credentials to the user
    print("Available credentials:")
    for index, credential in enumerate(credentials, start=1):
        print(f"{index}. {credential}")

    # Re-prompt on anything that is not a number in the displayed range;
    # otherwise "0" or a negative number would index from the end of the list.
    while True:
        selection = input("Enter the number of the credentials you want to use: ")
        try:
            selected_index = int(selection.strip())
        except ValueError:
            selected_index = 0  # falls outside 1..len, so we re-prompt
        if 1 <= selected_index <= len(credentials):
            return credentials[selected_index - 1]
        print(f"Please enter a number between 1 and {len(credentials)}.")
def create_1password_entry(account_shorthand, vault, title, parsed_db):
    """Create a DATABASE item in 1Password holding the given connection details.

    Args:
        account_shorthand: Value passed to ``op --account``.
        vault: Vault passed to ``--vault``.
        title: Title for the new item.
        parsed_db: Mapping with 'username', 'password', 'hostname', 'port'
            and 'database' keys.

    Raises:
        subprocess.CalledProcessError: If ``op`` exits with a non-zero
            status, so a failed creation is not mistaken for success.
    """
    # NOTE(review): the password travels as a command-line argument and may
    # be visible to other local processes while ``op`` runs.
    subprocess.run(
        [
            "op", "--account", account_shorthand, "item", "create",
            "--vault", vault,
            "--category", "DATABASE",
            f"--title={title}",
            f"username[STRING]={parsed_db['username']}",
            f"password[CONCEALED]={parsed_db['password']}",
            f"server[STRING]={parsed_db['hostname']}",
            f"port[STRING]={parsed_db['port']}",
            f"database[STRING]={parsed_db['database']}",
        ],
        check=True,
    )
def update_1password_item(account_shorthand, item_id, parsed_db):
    """Overwrite the connection fields of an existing 1Password item.

    Args:
        account_shorthand: Value passed to ``op --account``.
        item_id: Identifier of the item to edit.
        parsed_db: Mapping with 'username', 'password', 'hostname', 'port'
            and 'database' keys.

    Raises:
        subprocess.CalledProcessError: If ``op`` exits with a non-zero
            status, so a failed edit is not mistaken for success.
    """
    # NOTE(review): the password travels as a command-line argument and may
    # be visible to other local processes while ``op`` runs.
    subprocess.run(
        [
            "op", "--account", account_shorthand, "item", "edit", item_id,
            f"username[STRING]={parsed_db['username']}",
            f"password[CONCEALED]={parsed_db['password']}",
            f"server[STRING]={parsed_db['hostname']}",
            f"port[STRING]={parsed_db['port']}",
            f"database[STRING]={parsed_db['database']}",
        ],
        check=True,
    )
def list_1password_items(account_shorthand, vault):
    """Return the parsed JSON list of items stored in the given vault.

    Args:
        account_shorthand: Value passed to ``op --account``.
        vault: Vault passed to ``--vault``.
    """
    list_command = [
        "op", "--account", account_shorthand,
        "item", "list",
        "--vault", vault,
        "--format=json",
    ]
    raw_listing = run_subprocess(list_command)
    return json.loads(raw_listing)
def prompt_user_item_selection(items):
    """Let the user pick an existing item by title, with fuzzy completion.

    Args:
        items: Sequence of mappings with 'title' and 'id' keys.

    Returns:
        The 'id' of the first item whose title equals the entered text, or
        None when the input is blank or matches no title.
    """
    fuzzy_completer = FuzzyCompleter(ItemCompleter(items))
    selected_title = prompt(
        'Search and select the item to update (or press Enter to create a new item): ',
        completer=fuzzy_completer,
    )
    if not selected_title.strip():
        return None
    # The comparison uses the text exactly as entered (not stripped).
    return next(
        (entry['id'] for entry in items if entry['title'] == selected_title),
        None,
    )
def main():
    """Interactively copy one Heroku Postgres credential into 1Password.

    Flow: ask for the Heroku app, pick a 1Password account and vault, pick
    a database and one of its credentials, then either update an existing
    vault item or create a new one with the connection details. Returns
    early with a message whenever a step yields nothing usable.
    """
    # Stray whitespace would otherwise be passed verbatim to the Heroku CLI.
    app_name = input("Enter the Heroku app name: ").strip()
    if not app_name:
        print("No Heroku app name entered.")
        return

    # Fetch the available 1Password accounts
    accounts = get_1password_accounts()
    if not accounts:
        print("No 1Password accounts found.")
        return

    # Prompt the user to select an account
    selected_account = prompt_user_account_selection(accounts)

    # Fetch the available vaults in the selected account
    vaults = get_1password_vaults(selected_account)
    if not vaults:
        print("No vaults found in the selected 1Password account.")
        return

    # Prompt the user to select a vault
    selected_vault = prompt_user_vault_selection(vaults)

    # List all items in the selected vault
    items = list_1password_items(selected_account, selected_vault)

    # List available databases in the app
    databases = list_heroku_databases(app_name)
    if not databases:
        print("No PostgreSQL databases found in the app.")
        return

    # Prompt the user to select a database
    selected_database = prompt_user_database_selection(databases)

    # List and parse available credentials for the selected database
    credentials_output = list_heroku_database_credentials(app_name, selected_database)
    credentials = parse_credentials_list(credentials_output)
    if not credentials:
        print(f"No credentials found for the database {selected_database}.")
        return

    # Prompt the user to select which credentials to use
    selected_credential = prompt_user_credential_selection(credentials)

    # Fetch and parse the connection URL for the selected credential
    connection_info = fetch_connection_url(app_name, selected_database, selected_credential)
    parsed_db = parse_connection_url(connection_info)
    if not parsed_db:
        print(f"Failed to parse connection information for the credential {selected_credential}.")
        return

    # Prompt the user to update an existing item or create a new one
    item_id = prompt_user_item_selection(items)
    if item_id:
        # Update the existing item
        print(f"Updating item {item_id}...")
        update_1password_item(selected_account, item_id, parsed_db)
        # Report the action that actually ran instead of a generic "created".
        print(f"Finished updating 1Password item {item_id} for {selected_database} in {app_name}.")
    else:
        # Create a new item
        new_title = input(f"Enter the title for the new item for {selected_database}: ")
        create_1password_entry(selected_account, selected_vault, new_title, parsed_db)
        print(f"Finished creating 1Password item '{new_title}' for {selected_database} in {app_name}.")
# Start the interactive import only when this file is executed as a script.
if __name__ == "__main__":
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment