Export data from Cloud Datastore to BigQuery
# Google already provides this managed export script
# to dump Cloud Datastore entities; I have just
# extended it to load the dump into BigQuery as soon
# as the export operation finishes successfully.
import datetime
import httplib
import json
import logging
import time

import webapp2

from google.appengine.api import app_identity
from google.appengine.api import taskqueue
from google.appengine.api import urlfetch


class Export(webapp2.RequestHandler):
    def get(self):
        access_token, _ = app_identity.get_access_token(
            'https://www.googleapis.com/auth/cloud-platform')
        app_id = app_identity.get_application_id()
        timestamp = datetime.datetime.now().strftime('%Y%m%d%H')

        output_url_prefix = self.request.get('output_url_prefix')
        assert output_url_prefix and output_url_prefix.startswith('gs://')
        if '/' not in output_url_prefix[5:]:
            # Only a bucket name has been provided - no prefix or trailing slash
            output_url_prefix += '/' + timestamp
        else:
            output_url_prefix += timestamp

        entity_filter = {
            'kinds': self.request.get_all('kind'),
            'namespace_ids': self.request.get_all('namespace_id')
        }
        request = {
            'project_id': app_id,
            'output_url_prefix': output_url_prefix,
            'entity_filter': entity_filter
        }
        headers = {
            'Content-Type': 'application/json',
            'Authorization': 'Bearer ' + access_token
        }

        url = 'https://datastore.googleapis.com/v1/projects/%s:export' % app_id
        try:
            result = urlfetch.fetch(
                url=url,
                payload=json.dumps(request),
                method=urlfetch.POST,
                deadline=60,
                headers=headers)
            if result.status_code == httplib.OK:
                # The export call returns a long-running operation;
                # its "name" is what we poll below
                jsonObj = json.loads(result.content)
                # complete flags whether the export
                # operation is done or not
                complete = 0
                # i controls the iterations: if the export
                # has not succeeded within 5 polls, we
                # consider it failed
                i = 0
                # Poll the Datastore operations endpoint
                # for updates on our export operation
                while complete == 0 and i < 5:
                    i = i + 1
                    time.sleep(30)
                    url2 = 'https://datastore.googleapis.com/v1/%s' % jsonObj["name"]
                    result2 = urlfetch.fetch(
                        url=url2,
                        method=urlfetch.GET,
                        deadline=60,
                        headers=headers)
                    jsonObj2 = json.loads(result2.content)
                    logging.warning(result2.content)
                    if jsonObj2["metadata"]["common"]["state"] == "SUCCESSFUL":
                        complete = 1
                        # Queue my task URL: I have defined a task
                        # handler that loads the data into BigQuery
                        # by reading it from Cloud Storage (a sketch
                        # of such a handler follows the script)
                        taskqueue.add(
                            url='/tasks/load_asf_to_bigquery',
                            target='worker',
                            method='POST',
                        )
                        break
            elif result.status_code >= 500:
                logging.error(result.content)
            else:
                logging.warning(result.content)
            self.response.status_int = result.status_code
        except urlfetch.Error:
            logging.exception('Failed to initiate export.')
            self.response.status_int = httplib.INTERNAL_SERVER_ERROR


app = webapp2.WSGIApplication(
    [
        ('/cloud-datastore-export', Export),
    ], debug=True)
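
The gist enqueues /tasks/load_asf_to_bigquery but does not include that handler. Below is a minimal sketch of what it could look like, assuming the BigQuery v2 REST jobs endpoint and a load job with sourceFormat DATASTORE_BACKUP, which reads the .export_metadata file the export writes for each kind. The bucket, export prefix, kind name ("Asf", guessed from the task URL), dataset, and table IDs are all placeholders, not values from the original script.

# Hypothetical sketch of the BigQuery load task handler referenced
# above. Every YOUR_* name (and the kind "Asf") is a placeholder.
import json
import logging

import webapp2

from google.appengine.api import app_identity
from google.appengine.api import urlfetch


class LoadToBigQuery(webapp2.RequestHandler):
    def post(self):
        access_token, _ = app_identity.get_access_token(
            'https://www.googleapis.com/auth/cloud-platform')
        app_id = app_identity.get_application_id()
        # A Datastore export writes one .export_metadata file per kind;
        # BigQuery loads it directly with sourceFormat DATASTORE_BACKUP
        source_uri = ('gs://YOUR_BUCKET/YOUR_EXPORT_PREFIX/'
                      'all_namespaces/kind_Asf/'
                      'all_namespaces_kind_Asf.export_metadata')
        job = {
            'configuration': {
                'load': {
                    'sourceUris': [source_uri],
                    'sourceFormat': 'DATASTORE_BACKUP',
                    'destinationTable': {
                        'projectId': app_id,
                        'datasetId': 'YOUR_DATASET',
                        'tableId': 'asf',
                    },
                    # Overwrite the table on every run
                    'writeDisposition': 'WRITE_TRUNCATE',
                }
            }
        }
        result = urlfetch.fetch(
            url='https://www.googleapis.com/bigquery/v2/projects/%s/jobs' % app_id,
            payload=json.dumps(job),
            method=urlfetch.POST,
            deadline=60,
            headers={'Content-Type': 'application/json',
                     'Authorization': 'Bearer ' + access_token})
        logging.info(result.content)
        self.response.status_int = result.status_code


app = webapp2.WSGIApplication([
    ('/tasks/load_asf_to_bigquery', LoadToBigQuery),
])

Since taskqueue.add targets the 'worker' service, a handler like this would live in that service's own module with its own app.yaml.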
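
The script also does not show how /cloud-datastore-export gets triggered. Google's managed-export sample, which this gist extends, drives it from App Engine cron; that also accommodates the 30-second polling sleeps, since cron and task queue requests get a longer deadline than interactive ones. A hypothetical cron.yaml entry, with the bucket and kind as placeholders:

cron:
- description: daily Cloud Datastore export to GCS
  url: /cloud-datastore-export?output_url_prefix=gs://YOUR_BUCKET&kind=Asf
  schedule: every 24 hours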