Last active
January 10, 2019 21:53
-
-
Save jredl-va/3cbe2327e7532e6ec3c514b4e55bffab to your computer and use it in GitHub Desktop.
Simple Cloud SQL Admin API
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
""" | |
API wrapper for cloud sql admin operations | |
""" | |
import httplib | |
import json | |
import logging | |
from google.appengine.api import app_identity, urlfetch | |
from google.appengine.ext.deferred import deferred | |
class CloudSQLApi(object): | |
""" | |
Wrapper for the cloud sql API | |
""" | |
@classmethod | |
def get_access_token(cls): | |
""" | |
Return an access token for the cloud datastore api | |
""" | |
access_token, _ = app_identity.get_access_token(['https://www.googleapis.com/auth/sqlservice.admin', | |
'https://www.googleapis.com/auth/cloud-platform']) | |
return access_token | |
@classmethod | |
def get_operation(cls, operation): | |
""" | |
:param operation: Instance operation ID. | |
""" | |
headers = { | |
'Content-Type': 'application/json', | |
'Authorization': u'Bearer {token}'.format(token=cls.get_access_token()) | |
} | |
url = u'https://www.googleapis.com/sql/v1beta4/projects/{project}/operations/{operation}'.format( | |
project=app_identity.get_application_id(), | |
operation=operation) | |
result = cls.fetch(url, urlfetch.GET, headers=headers) | |
return Operation(result) | |
@classmethod | |
def export_table(cls, database, table, columns, file_path): | |
""" | |
Exports data from a Cloud SQL instance to a Cloud Storage bucket as a SQL dump or CSV file. | |
:param database: The database name to query | |
:param table: A cloud sql table | |
:param columns: The columns within the sql table | |
:param file_path: The cloud storage path | |
""" | |
if file_exists(file_path): | |
delete_file(file_path) | |
export_query = u"SELECT {columns} FROM {table}".format( | |
table=table, | |
columns=', '.join(columns) | |
) | |
request = { | |
'exportContext': { | |
'fileType': 'CSV', | |
'uri': file_path, | |
'databases': [database], | |
'csvExportOptions': { | |
'selectQuery': [export_query] | |
} | |
}, | |
} | |
headers = { | |
'Content-Type': 'application/json', | |
'Authorization': u'Bearer {token}'.format(token=cls.get_access_token()) | |
} | |
url = u'https://www.googleapis.com/sql/v1beta4/projects/{project}/instances/{project}/export'.format( | |
project=app_identity.get_application_id()) | |
result = cls.fetch(url, urlfetch.POST, payload=request, headers=headers) | |
return Operation(result) | |
@classmethod | |
def fetch(cls, url, method, payload=None, headers=None): | |
""" | |
:param url: The url to fetch | |
:param method: GET or POST | |
:param payload: The payload of the request | |
:param headers: The headers of the request | |
:return: The result content of the fetch | |
""" | |
if payload is None: | |
payload = {} | |
if headers is None: | |
headers = {} | |
try: | |
result = urlfetch.fetch( | |
url=url, | |
payload=json.dumps(payload), | |
method=method, | |
deadline=60, | |
headers=headers) | |
if result.status_code == httplib.OK: | |
logging.info(result.content) | |
return result.content | |
elif result.status_code >= 500: | |
logging.error(result.content) | |
raise deferred.SingularTaskFailure() | |
else: | |
logging.warning(result.content) | |
raise deferred.SingularTaskFailure() | |
except Exception as e: | |
logging.exception('Backup operation failed: %s', url) | |
raise deferred.SingularTaskFailure(e) | |
class Operation(object): | |
""" | |
Represents information about database instance operations such as create, delete, and restart, or any type of long | |
running backend work, such as backups, creating and updating databases. Operations resources are created in response | |
to operations that were initiated; you never create them directly. | |
https://cloud.google.com/sql/docs/mysql/admin-api/v1beta4/operations | |
""" | |
def __init__(self, operation): | |
self.operation = {} | |
if operation: | |
self.operation = json.loads(operation) | |
@property | |
def name(self): | |
""" | |
The name of the operation | |
""" | |
return self.operation.get('name') | |
@property | |
def status(self): | |
""" | |
Whether or not the operation is still in progress | |
""" | |
return self.operation.get('status') | |
@property | |
def error(self): | |
""" | |
The error result of the operation in case of failure or cancellation. | |
""" | |
return self.operation.get('error') | |
@property | |
def export_context_uri(self): | |
""" | |
The context for export operation, if applicable. | |
""" | |
return self.operation.get('exportContext', {}).get('uri') |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment