Skip to content

Instantly share code, notes, and snippets.

@mchampaneri
Created December 7, 2018 05:29
Show Gist options
  • Save mchampaneri/e0dea76c48d23dda8553f2523dff7a2e to your computer and use it in GitHub Desktop.
export data from datastore to bigquery
# The managed dump script is already provided
# by Google to manage Datastore dumps.
# I have just extended the same script to load
# the dump into BigQuery as soon as the dump operation
# has completed successfully.
import datetime
import httplib
import json
import logging
import webapp2
import urllib
import time
from google.appengine.api import taskqueue
from google.appengine.api import app_identity
from google.appengine.api import urlfetch
class Export(webapp2.RequestHandler):
    """Trigger a managed Datastore export, poll it to completion, then
    enqueue a task that loads the dump from Cloud Storage into BigQuery.

    Query parameters:
        output_url_prefix: required; a ``gs://`` URL (bare bucket, or
            bucket plus prefix) under which export files are written.
        kind: repeatable; entity kinds to export.
        namespace_id: repeatable; namespaces to export.
    """

    def get(self):
        # OAuth token scoped for the Datastore Admin API calls below.
        access_token, _ = app_identity.get_access_token(
            'https://www.googleapis.com/auth/cloud-platform')
        app_id = app_identity.get_application_id()
        # Hour-granularity timestamp used to namespace each dump.
        # BUG FIX: the original used '%h', which strftime treats as the
        # abbreviated month name (alias of %b), not the hour; '%H' is
        # the intended 24-hour field.
        timestamp = datetime.datetime.now().strftime('%Y%m%d%H')

        output_url_prefix = self.request.get('output_url_prefix')
        assert output_url_prefix and output_url_prefix.startswith('gs://')
        if '/' not in output_url_prefix[5:]:
            # Only a bucket name has been provided - no prefix or trailing slash
            output_url_prefix += '/' + timestamp
        else:
            output_url_prefix += timestamp

        entity_filter = {
            'kinds': self.request.get_all('kind'),
            'namespace_ids': self.request.get_all('namespace_id'),
        }
        request = {
            'project_id': app_id,
            'output_url_prefix': output_url_prefix,
            'entity_filter': entity_filter,
        }
        headers = {
            'Content-Type': 'application/json',
            'Authorization': 'Bearer ' + access_token,
        }
        url = 'https://datastore.googleapis.com/v1/projects/%s:export' % app_id
        try:
            result = urlfetch.fetch(
                url=url,
                payload=json.dumps(request),
                method=urlfetch.POST,
                deadline=60,
                headers=headers)
            if result.status_code == httplib.OK:
                # BUG FIX: parse the export response to obtain the
                # long-running operation name; the original referenced an
                # undefined variable `jsonObj` here, so polling raised
                # NameError before ever reaching the Datastore API.
                operation = json.loads(result.content)
                # Poll the operations endpoint up to 5 times, 30 seconds
                # apart; if the export has not succeeded by then it is
                # treated as failed and no BigQuery load is enqueued.
                for _ in range(5):
                    time.sleep(30)
                    poll_url = ('https://datastore.googleapis.com/v1/%s'
                                % operation["name"])
                    poll_result = urlfetch.fetch(
                        url=poll_url,
                        method=urlfetch.GET,
                        deadline=60,
                        headers=headers)
                    status = json.loads(poll_result.content)
                    logging.warning(poll_result.content)
                    if status["metadata"]["common"]["state"] == "SUCCESSFUL":
                        # The dump is in Cloud Storage; hand off to the
                        # worker task that loads it into BigQuery.
                        taskqueue.add(
                            url='/tasks/load_asf_to_bigquery',
                            target='worker',
                            method='post',
                        )
                        break
            elif result.status_code >= 500:
                logging.error(result.content)
            else:
                logging.warning(result.content)
            # Mirror the export API's status code back to the caller.
            self.response.status_int = result.status_code
        except urlfetch.Error:
            logging.exception('Failed to initiate export.')
            self.response.status_int = httplib.INTERNAL_SERVER_ERROR
# WSGI entry point: route the cron-triggered export URL to its handler.
# debug=True surfaces handler stack traces in the response.
_routes = [
    ('/cloud-datastore-export', Export),
]
app = webapp2.WSGIApplication(_routes, debug=True)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment