Forked from isaacharrisholt/force_airbyte_sync.py
Created
January 9, 2023 00:15
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from typing import List | |
import requests | |
API_ROOT = 'http://localhost:8000/api/v1' # This is the default for Airbyte | |
def get_workspaces() -> List[str]:
    """Return the IDs of all workspaces reported by the Airbyte API.

    Sends a POST request to ``{API_ROOT}/workspaces/list`` and collects the
    ``workspaceId`` of every entry under the ``workspaces`` key of the JSON
    response.

    Returns:
        The workspace IDs, in the order the API returned them.

    Raises:
        requests.HTTPError: If the API answers with an error status code.
    """
    response = requests.post(f'{API_ROOT}/workspaces/list')
    # Either handle this yourself, or use a tool like Sentry for logging.
    response.raise_for_status()
    # The original read from a misspelled name (`repsonse`), which raised
    # NameError on every call; read from the actual response object.
    return [
        workspace['workspaceId']
        for workspace in response.json()['workspaces']
    ]
def get_connections_for_workspace(workspace_id: str) -> List[str]:
    """Return the IDs of the active connections in one workspace.

    Sends a POST request to ``{API_ROOT}/connections/list`` with the workspace
    ID in the JSON body, then keeps only the entries whose ``status`` is
    ``'active'``.

    Args:
        workspace_id: ID of the workspace whose connections are listed.

    Returns:
        The ``connectionId`` of every active connection, in response order.

    Raises:
        requests.HTTPError: If the API answers with an error status code.
    """
    url = f'{API_ROOT}/connections/list'
    payload = {'workspaceId': workspace_id}
    response = requests.post(url, json=payload)
    response.raise_for_status()

    active_ids = []
    for entry in response.json()['connections']:
        # Skip non-active connections so they can still be disabled in the UI.
        if entry['status'] != 'active':
            continue
        active_ids.append(entry['connectionId'])
    return active_ids
def trigger_connection_sync(connection_id: str) -> dict:
    """Request a sync for a single connection and return the API's reply.

    Sends a POST request to ``{API_ROOT}/connections/sync`` with the
    connection ID in the JSON body.

    Args:
        connection_id: ID of the connection to sync.

    Returns:
        The decoded JSON body of the response.

    Raises:
        requests.HTTPError: If the API answers with an error status code.
    """
    url = f'{API_ROOT}/connections/sync'
    payload = {'connectionId': connection_id}
    sync_response = requests.post(url, json=payload)
    sync_response.raise_for_status()
    return sync_response.json()
if __name__ == '__main__':
    # Collect the active connections of every workspace first, and only then
    # start triggering syncs, one request per connection.
    workspaces = get_workspaces()
    connections = [
        connection_id
        for workspace_id in workspaces
        for connection_id in get_connections_for_workspace(workspace_id)
    ]
    for connection_id in connections:
        trigger_connection_sync(connection_id)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment