@jojosati
Created September 21, 2021 09:47
MongoDB connector for Data Studio - aggregation cache (configService.gs)
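Before the code, a hedged sketch of the config object this service expects. The keys are inferred from the fn.get() calls below (mlabUrl, collection, query, dateRangeField); the example values and the "field:daysTtz" reading of dateRangeField are illustrative assumptions, not part of the gist.

// Illustrative only: assumed shape of the `config` argument (typically the
// connector's request.configParams); the values below are made up.
var exampleConfig = {
  mlabUrl: 'mydb',                             // db name, or a full https:// endpoint
  collection: 'orders',                        // collection to aggregate
  query: '[{"$match": {"status": "paid"}}]',   // pipeline Array or $match Object, evaluated by fn.eval
  dateRangeField: 'createdAt:90T7'             // assumed "<field>[:<days>T<tz>]" lower-bound convention
};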
function configService(config) {
  // External helpers (escapeRegExp, getProperty, calcJSONDate, newObj, MD5, showError)
  // are assumed to be defined elsewhere in the connector project.
  var fn = {};

  fn.get = function (key) {
    return config[key] || '';
  }

  fn.eval = function (key) {
    var exp = fn.get(key);
    if (exp)
      return eval('(' + exp + ')');
  }

  // read an inline option embedded in a config value as /** okey: value **/
  fn.opt = function (key, okey) {
    var exp = fn.get(key);
    if (exp) {
      var re = new RegExp('\\/\\*{2,}\\s*' + okey + '\\s*\\:\\s*(.*?)\\s*\\*{2,}\\/', 'i');
      var m = exp.match(re);
      if (m)
        return m[1];
    }
  }

  // split a comma- or newline-separated config value into a list
  fn.list = function (key) {
    return fn.get(key).split(/(?:\s*[\,\n]\s*)+/).filter(function (v) { return v; });
  }
  fn.pipeline = function (dateRange, sample, pfilter) {
    var query = fn.eval('query');
    // query can be an Array (pipeline) or an Object ($match stage)
    var pipeline = query || [];
    if (pipeline && !pipeline.map) {
      // convert the object to a $match stage in the pipeline
      pipeline = [{$match: pipeline}];
    }
    if (pfilter) {
      var pqry = {};
      for (var q in pfilter) {
        var re = pfilter[q];
        if (!/^\^|\.\*|\$$/.test(re))
          re = '^' + escapeRegExp(re);
        pqry[q.replace(/__/g, '.')] = {$regex: re};
      }
      // insert the filter stage after any leading $match stages
      for (var i = 0; i < pipeline.length; i++) {
        if (!pipeline[i].$match)
          break;
      }
      pipeline.splice(i, 0, {$match: pqry});
    }
    // https://developers.google.com/datastudio/connector/date-range#getdata_behavior_when_daterangerequired_is_true
    if (dateRange && dateRange.startDate) {
      var dateRangeField = fn.get('dateRangeField').split(':');
      // lower-bound support for dateRangeField
      var startDate = dateRange.startDate;
      var endDate = dateRange.endDate;
      if (!sample && dateRangeField[1]) {
        var dtz = dateRangeField[1].split(/TZ?/i);
        var days = +(dtz[0] || 0); // days allowed
        var tz = +(dtz[1] || getProperty('timezoneOffset') || 7); // timezone, default +7 (Bangkok)
        var lowDate = calcJSONDate(days, tz).split('T')[0];
        if (lowDate > startDate)
          startDate = lowDate;
      }
      if (startDate > endDate)
        return null;
      var sameDate = (startDate === endDate);
      var dateRangeQuery = sameDate ? startDate : {$gte: startDate, $lte: endDate};
      pipeline.unshift({$match: newObj(dateRangeField[0].replace(/__/g, '.'), dateRangeQuery)});
    }
    return pipeline;
  }
  fn.cache = function (key, options) {
    options = options || {};
    var chunk = options.chunk;
    var url = fn.get('mlabUrl');
    var collection = fn.get('collection');
    var entry = [key, collection, url].join('/');
    var PSIGN = '~~~~~~';
    var cache = CacheService.getScriptCache();

    var genKeys = function (start, end) {
      var keys = [];
      while (start <= end) {
        keys.push(entry + start);
        start += 1;
      }
      return keys;
    }

    var _getChunk = function () {
      var value = '';
      var idx = 10;
      var firstEntry = entry + '1';
      var values = cache.getAll(genKeys(1, idx));
      if (values[firstEntry]) {
        // the first chunk is prefixed with '<chunkCount>' + PSIGN
        var chunkCount = +(values[firstEntry].split(PSIGN)[0]);
        values[firstEntry] = values[firstEntry].replace(new RegExp('^\\d+' + PSIGN), '');
        if (chunkCount > idx) {
          var avalues = cache.getAll(genKeys(idx + 1, chunkCount));
          Object.assign(values, avalues);
        }
        var keys = genKeys(1, chunkCount);
        // abort and clear the partial entries if any chunk is missing
        for (var i = 0; i < keys.length; i++) {
          if (!values[keys[i]]) {
            cache.removeAll([entry].concat(keys));
            return undefined;
          }
          value += values[keys[i]];
        }
        // if (chunkCount > 1)
        //   console.log(key + ' read chunk count', chunkCount)
      }
      try {
        return value && JSON.parse(value);
      } catch (err) {
        _remove();
        throw(err);
      }
    }
    var _get = function () {
      if (chunk)
        return _getChunk();
      var value = cache.get(entry);
      try {
        return value && JSON.parse(value);
      } catch (err) {
        _remove();
        throw(err);
      }
    }

    var _putChunk = function (value, expire) {
      var strValue = JSON.stringify(value);
      // CacheService limits each key to 100KB, so split into ~80KB chunks
      var chunk = {};
      var chunkSize = 80 * 1024;
      var idx = 0;
      while (strValue.length) {
        idx += 1;
        chunk[entry + idx] = strValue.slice(0, chunkSize);
        strValue = strValue.slice(chunkSize);
      }
      // if (idx > 1)
      //   console.log(key + ' write chunk count', idx)
      if (options.verbose)
        console.log(key + ' cached, chunk=' + idx + (expire ? (' expire=' + expire + 's') : ''));
      var firstEntry = entry + 1;
      chunk[firstEntry] = '' + idx + PSIGN + chunk[firstEntry];
      var args = [chunk];
      if (expire != undefined)
        args.push(expire);
      return cache.putAll.apply(cache, args);
    }

    var _put = function (value, expire) {
      if (chunk)
        return _putChunk(value, expire);
      var strValue = JSON.stringify(value);
      var args = [entry, strValue];
      if (expire != undefined)
        args.push(expire);
      return cache.put.apply(cache, args);
    }

    var _removeChunk = function () {
      return cache.removeAll(genKeys(1, 100));
    }

    var _remove = function () {
      if (chunk)
        return _removeChunk();
      return cache.remove(entry);
    }
    // run `process` once across concurrent executions: losers wait for the
    // winner's cached result instead of re-running the aggregation
    var _race = function (process, raceopts) {
      raceopts = Object.assign({}, options, raceopts);
      var expire = raceopts.expire || 10 * 60; // default expire: 10 minutes
      var tsEntry = 'ts_' + entry;
      var procEntry = 'proc_' + entry;
      var state = cache.get(procEntry);
      var bidState = function () {
        var ts = Date.now();
        cache.put(tsEntry, '' + ts);
        Utilities.sleep(100); // 100ms concurrency check; other charts may request the same data at the same time
        if (ts != +cache.get(tsEntry)) {
          return 'loser';
        }
      }
      if (!raceopts.force) {
        if (!state) {
          state = bidState();
          if (state)
            console.log(key + ', wait for other\'s processing');
        }
        var retries = 0;
        var result;
        var timeout = Date.now() + ((raceopts.raceWait || 60) * 1000); // default raceWait: 1 minute
        while (state) {
          result = _get();
          if (result) {
            if (retries)
              console.log(key + ' from cache after retry=' + retries);
            return result;
          }
          if (cache.get(procEntry) != 'processing') {
            state = bidState();
            if (!state)
              break;
          }
          if (Date.now() > timeout)
            break;
          retries += 1;
          Utilities.sleep(200);
        }
      }
      cache.put(procEntry, 'processing');
      // long process
      result = process();
      _put(result, expire);
      cache.put(procEntry, 'finished', expire);
      return result;
    }

    return {get: _get, put: _put, remove: _remove, race: _race};
  }
  fn.dataCache = function (pipeline, options) {
    var query = fn.eval('query');
    // cache only pipeline (Array) queries
    if (!query || !query.map)
      return null;
    var key = JSON.stringify(pipeline);
    return fn.cache(MD5(key), Object.assign({chunk: true, raceWait: 5 * 60}, options));
  }
  fn.fetchData = function (pipeline, batchSize) {
    // no pipeline - no result
    if (!pipeline)
      return [];
    // https://docs.mlab.com/data-api/#commands
    // https://mongodb.github.io/node-mongodb-native/3.3/api/Collection.html#aggregate
    var url = fn.get('mlabUrl');
    if (!/^https?\:\/\//.test(url)) {
      // convention: the config value holds only the db name; the full URL is stored in script properties
      var propUrl = getProperty('mlabUrl');
      url = propUrl.replace('{db}', url);
    }
    if (/\{apiKey\}/.test(url)) {
      var apiKey = getProperty('apiKey');
      url = url.replace('{apiKey}', apiKey);
    }
    var payload = {
      aggregate: fn.get('collection'),
      pipeline: pipeline,
      options: {readPreference: 'secondaryPreferred'},
    };
    var cursor = {};
    if (batchSize)
      cursor.batchSize = batchSize;
    var options = {
      method: 'post',
      contentType: 'application/json',
      payload: JSON.stringify(payload),
    };
    // https://developers.google.com/apps-script/reference/url-fetch/url-fetch-app#fetchurl,-params
    // returns an HTTPResponse - https://developers.google.com/apps-script/reference/url-fetch/http-response.html
    var result = JSON.parse(UrlFetchApp.fetch(url, options).getContentText());
    if (result.errmsg && result.code === 9) {
      // aggregate error: "The 'cursor' option is required"
      // retry with an explicit cursor and allowDiskUse
      payload.cursor = cursor;
      payload.allowDiskUse = true;
      options.payload = JSON.stringify(payload);
      result = JSON.parse(UrlFetchApp.fetch(url, options).getContentText());
    }
    // error handling - https://developers.google.com/datastudio/connector/error-handling#user-facing-errors
    if (!result.ok && result.errmsg) {
      console.log(result.errmsg, options);
      showError(result.errmsg);
    }
    // mLab Data API result
    if (result.cursor)
      return result.cursor.firstBatch;
    // mserver result
    return result.values || [];
  }

  return fn;
}
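A minimal usage sketch, assuming the surrounding connector wires configService into getData. exampleGetData, the sampleExtraction handling, and the option values are hypothetical; only configService and its pipeline/dataCache/fetchData methods come from the gist.

// Hypothetical caller, for illustration only.
function exampleGetData(request) {
  var cfg = configService(request.configParams);
  var sample = request.scriptParams && request.scriptParams.sampleExtraction;
  var pipeline = cfg.pipeline(request.dateRange, sample);
  var cache = cfg.dataCache(pipeline, {verbose: true}); // null unless query is a pipeline Array
  if (!cache)
    return cfg.fetchData(pipeline);
  // race(): the first concurrent execution runs the aggregation; the others
  // wait and read the cached result instead of hitting MongoDB again.
  return cache.race(function () {
    return cfg.fetchData(pipeline);
  });
}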