MongoDB connector for Data Studio - aggregation cache (configService.gs)
function configService(config) {
  var fn = {};

  // Return a config value by key, or '' when missing.
  fn.get = function (key) {
    return config[key] || '';
  }
  // Evaluate a config value as a JavaScript expression,
  // e.g. a query object or a pipeline array.
  fn.eval = function (key) {
    var exp = fn.get(key);
    if (exp)
      return eval('(' + exp + ')');
  }
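  // Example (illustrative, not part of the original gist): with
  //   config.query = '{status: "active"}'
  // fn.eval('query') returns the object {status: 'active'}; with
  //   config.query = '[{$match: {status: "active"}}]'
  // it returns a one-stage pipeline array.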
  // Read an inline option embedded in a config value as a
  // '/** name: value **/' directive.
  fn.opt = function (key, okey) {
    var exp = fn.get(key);
    if (exp) {
      var re = new RegExp('\\/\\*{2,}\\s*' + okey + '\\s*\\:\\s*(.*?)\\s*\\*{2,}\\/', 'i');
      var m = exp.match(re);
      if (m)
        return m[1];
    }
  }
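  // Example (illustrative): with
  //   config.query = '[{$match: {a: 1}}] /** batchSize: 500 **/'
  // fn.opt('query', 'batchSize') returns the string '500'.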
  // Split a config value on commas/newlines into a list, dropping empty items.
  fn.list = function (key) {
    return fn.get(key).split(/(?:\s*[\,\n]\s*)+/).filter(function (v) { return v });
  }
  // Build the aggregation pipeline from config, an optional Data Studio date
  // range, an optional sampling flag, and optional extra field filters (pfilter).
  fn.pipeline = function (dateRange, sample, pfilter) {
    var query = fn.eval('query');
    // query can be an Array (full pipeline) or an Object (a bare $match stage)
    var pipeline = query || [];
    if (pipeline && !pipeline.map) {
      // convert a bare object into a $match stage
      pipeline = [{$match: pipeline}];
    }
    if (pfilter) {
      var pqry = {};
      for (var q in pfilter) {
        var re = pfilter[q];
        // treat the value as a literal prefix unless it already looks like a regex
        if (!/^\^|\.\*|\$$/.test(re))
          re = '^' + escapeRegExp(re); // escapeRegExp is defined elsewhere in this connector
        // '__' in a filter key stands for '.' (nested field path)
        pqry[q.replace(/__/g, '.')] = {$regex: re};
      }
      // insert the filter right after any leading $match stages
      for (var i = 0; i < pipeline.length; i++) {
        if (!pipeline[i].$match)
          break;
      }
      pipeline.splice(i, 0, {$match: pqry});
    }
    // https://developers.google.com/datastudio/connector/date-range#getdata_behavior_when_daterangerequired_is_true
    if (dateRange && dateRange.startDate) {
      // 'dateRangeField' may carry a suffix: '<field>:<days>T<tz>'
      var dateRangeField = fn.get('dateRangeField').split(':');
      // lower-bound support: limit how far back the date range may reach
      var startDate = dateRange.startDate;
      var endDate = dateRange.endDate;
      if (!sample && dateRangeField[1]) {
        var dtz = dateRangeField[1].split(/TZ?/i);
        var days = +(dtz[0] || 0); // number of days allowed
        var tz = +(dtz[1] || getProperty('timezoneOffset') || 7); // timezone, default +7 (Bangkok)
        var lowDate = calcJSONDate(days, tz).split('T')[0];
        if (lowDate > startDate)
          startDate = lowDate;
      }
      if (startDate > endDate)
        return null;
      var sameDate = (startDate === endDate);
      var dateRangeQuery = sameDate ? startDate : {$gte: startDate, $lte: endDate};
      pipeline.unshift({$match: newObj(dateRangeField[0].replace(/__/g, '.'), dateRangeQuery)});
    }
    return pipeline;
  }
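  // Example (a sketch under assumed config, not from the original gist): with
  //   config.query = '{type: "order"}', config.dateRangeField = 'date'
  // and dateRange = {startDate: '2021-09-01', endDate: '2021-09-07'},
  // fn.pipeline(dateRange) yields:
  //   [{$match: {date: {$gte: '2021-09-01', $lte: '2021-09-07'}}},
  //    {$match: {type: 'order'}}]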
  // Script-cache wrapper keyed by (key, collection, url). Values larger than
  // the CacheService per-key limit can be split across numbered chunk entries.
  fn.cache = function (key, options) {
    options = options || {};
    var chunk = options.chunk;
    var url = fn.get('mlabUrl');
    var collection = fn.get('collection');
    var entry = [key, collection, url].join('/');
    var PSIGN = '~~~~~~'; // separates the chunk count from the first chunk's payload
    var cache = CacheService.getScriptCache();
    // Build the list of chunk keys entry1..entryN.
    var genKeys = function (start, end) {
      var keys = [];
      while (start <= end) {
        keys.push(entry + start);
        start += 1;
      }
      return keys;
    }
    var _getChunk = function () {
      var value = '';
      var idx = 10; // first read fetches up to 10 chunks in one getAll call
      var firstEntry = entry + '1';
      var values = cache.getAll(genKeys(1, idx));
      if (values[firstEntry]) {
        // the first chunk is prefixed with '<count>' + PSIGN
        var chunkCount = +(values[firstEntry].split(PSIGN)[0]);
        values[firstEntry] = values[firstEntry].replace(new RegExp('^\\d+' + PSIGN), '');
        if (chunkCount > idx) {
          var avalues = cache.getAll(genKeys(idx + 1, chunkCount));
          Object.assign(values, avalues);
        }
        var keys = genKeys(1, chunkCount);
        // if any chunk expired, drop the whole entry and report a miss
        for (var i = 0; i < keys.length; i++) {
          if (!values[keys[i]]) {
            cache.removeAll([entry].concat(keys));
            return undefined;
          }
          value += values[keys[i]];
        }
        // if (chunkCount > 1)
        //   console.log(key + ' read chunk count', chunkCount)
      }
      try {
        return value && JSON.parse(value);
      } catch (err) {
        _remove();
        throw err;
      }
    }
    var _get = function () {
      if (chunk)
        return _getChunk();
      var value = cache.get(entry);
      try {
        return value && JSON.parse(value);
      } catch (err) {
        _remove();
        throw err;
      }
    }
    var _putChunk = function (value, expire) {
      var strValue = JSON.stringify(value);
      // CacheService limits each cached value to ~100KB; use 80KB chunks for headroom
      var chunks = {};
      var chunkSize = 80 * 1024;
      var idx = 0;
      while (strValue.length) {
        idx += 1;
        chunks[entry + idx] = strValue.slice(0, chunkSize);
        strValue = strValue.slice(chunkSize);
      }
      // if (idx > 1)
      //   console.log(key + ' write chunk count', idx)
      if (options.verbose)
        console.log(key + ' cached, chunk=' + idx + (expire ? (' expire=' + expire + 's') : ''));
      // prefix the first chunk with the total chunk count
      var firstEntry = entry + 1;
      chunks[firstEntry] = '' + idx + PSIGN + chunks[firstEntry];
      var args = [chunks];
      if (expire != undefined)
        args.push(expire);
      return cache.putAll.apply(cache, args);
    }
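    // Example (illustrative): a ~200KB JSON string is stored as three entries,
    //   entry1 = '3~~~~~~' + first 80KB, entry2 = next 80KB, entry3 = the rest,
    // and _getChunk reassembles them using the count prefix read from entry1.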
    var _put = function (value, expire) {
      if (chunk)
        return _putChunk(value, expire);
      var strValue = JSON.stringify(value);
      var args = [entry, strValue];
      if (expire != undefined)
        args.push(expire);
      return cache.put.apply(cache, args);
    }
    // Remove the entry; in chunk mode, clear up to 100 possible chunk keys.
    var _removeChunk = function () {
      return cache.removeAll(genKeys(1, 100));
    }
    var _remove = function () {
      if (chunk)
        return _removeChunk();
      return cache.remove(entry);
    }
    // Run `process` at most once across concurrent executions: callers bid via
    // a timestamp entry, and losers poll the cache until the winner publishes.
    var _race = function (process, raceopts) {
      raceopts = Object.assign({}, options, raceopts);
      var expire = raceopts.expire || 10 * 60; // default expire: 10 minutes
      var tsEntry = 'ts_' + entry;
      var procEntry = 'proc_' + entry;
      var state = cache.get(procEntry);
      // Write a timestamp and re-read it after a short sleep; if another caller
      // overwrote it in the meantime, this caller lost the bid and should wait.
      var bidState = function () {
        var ts = Date.now();
        cache.put(tsEntry, '' + ts);
        Utilities.sleep(100); // 100ms concurrency check; other charts may sample the same data at the same time
        if (ts != +cache.get(tsEntry)) {
          return 'loser';
        }
      }
      if (!raceopts.force) {
        if (!state) {
          state = bidState();
          if (state)
            console.log(key + ', waiting for another caller\'s processing');
        }
        var retries = 0;
        var result;
        var timeout = Date.now() + ((raceopts.raceWait || 60) * 1000); // default raceWait: 1 minute
        while (state) {
          result = _get();
          if (result) {
            if (retries)
              console.log(key + ' from cache after retry=' + retries);
            return result;
          }
          if (cache.get(procEntry) != 'processing') {
            state = bidState();
            if (!state)
              break;
          }
          if (Date.now() > timeout)
            break;
          retries += 1;
          Utilities.sleep(200);
        }
      }
      cache.put(procEntry, 'processing');
      // long process
      result = process();
      _put(result, expire);
      cache.put(procEntry, 'finished', expire);
      return result;
    }
    return {get: _get, put: _put, remove: _remove, race: _race};
  }
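  // Example (a sketch, not from the original gist):
  //   var c = fn.cache('sample', {chunk: true});
  //   var rows = c.race(function () {
  //     // only the winning concurrent caller runs this; others poll the cache
  //     return fn.fetchData(fn.pipeline(null, true), 100);
  //   });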
  // Cache wrapper for aggregation results, keyed by an MD5 of the pipeline.
  // Only full pipeline queries are cached; a bare $match object is not.
  fn.dataCache = function (pipeline, options) {
    var query = fn.eval('query');
    if (!query || !query.map)
      return null;
    var key = JSON.stringify(pipeline);
    return fn.cache(MD5(key), Object.assign({chunk: true, raceWait: 5 * 60}, options)); // MD5 is defined elsewhere in this connector
  }
  // POST the aggregation to the data API endpoint and return the result rows.
  fn.fetchData = function (pipeline, batchSize) {
    // no pipeline - no result
    if (!pipeline)
      return [];
    // https://docs.mlab.com/data-api/#commands
    // https://mongodb.github.io/node-mongodb-native/3.3/api/Collection.html#aggregate
    var url = fn.get('mlabUrl');
    if (!/^https?\:\/\//.test(url)) {
      // convention: the config value holds only the db name; the full URL template lives in script properties
      var propUrl = getProperty('mlabUrl');
      url = propUrl.replace('{db}', url);
    }
    if (/\{apiKey\}/.test(url)) {
      var apiKey = getProperty('apiKey');
      url = url.replace('{apiKey}', apiKey);
    }
    var payload = {
      aggregate: fn.get('collection'),
      pipeline: pipeline,
      options: {readPreference: 'secondaryPreferred'},
    };
    var cursor = {};
    if (batchSize)
      cursor.batchSize = batchSize;
    var options = {
      method: 'post',
      contentType: 'application/json',
      payload: JSON.stringify(payload),
    }
    // https://developers.google.com/apps-script/reference/url-fetch/url-fetch-app#fetchurl,-params
    // returns HTTPResponse - https://developers.google.com/apps-script/reference/url-fetch/http-response.html
    var result = JSON.parse(UrlFetchApp.fetch(url, options).getContentText());
    if (result.errmsg && result.code === 9) {
      // aggregate error: "The 'cursor' option is required" - retry with a cursor and allowDiskUse
      payload.cursor = cursor;
      payload.allowDiskUse = true;
      options.payload = JSON.stringify(payload);
      result = JSON.parse(UrlFetchApp.fetch(url, options).getContentText());
    }
    // error handling - https://developers.google.com/datastudio/connector/error-handling#user-facing-errors
    if (!result.ok && result.errmsg) {
      console.log(result.errmsg, options);
      showError(result.errmsg); // showError is defined elsewhere in this connector
    }
    // mlab-style result
    if (result.cursor)
      return result.cursor.firstBatch;
    // mserver-style result
    return result.values || [];
  }
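  // Response shapes handled above (illustrative): an mlab-style reply returns
  //   {ok: 1, cursor: {firstBatch: [/* rows */]}}
  // while an mserver-style reply returns
  //   {ok: 1, values: [/* rows */]}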
  return fn;
}
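
Usage sketch (illustrative, not part of the gist): a minimal getData flow wiring the pieces together, assuming the standard Data Studio connector request shape and the helper functions (escapeRegExp, getProperty, calcJSONDate, newObj, MD5, showError) defined elsewhere in this connector.

function getData(request) {
  var cfg = configService(request.configParams);
  var sample = request.scriptParams && request.scriptParams.sampleExtraction;
  var pipeline = cfg.pipeline(request.dateRange, sample);
  var cache = cfg.dataCache(pipeline);
  var rows = cache
    ? cache.race(function () { return cfg.fetchData(pipeline); })
    : cfg.fetchData(pipeline);
  // ...map rows onto the requested schema and return them...
}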