Skip to content

Instantly share code, notes, and snippets.

@jojosati
Created September 21, 2021 09:45
Show Gist options
Save jojosati/9137a8df017ea8ed526ed30d5c665de1 to your computer and use it in GitHub Desktop.
MongoDB connector for Data Studio - aggregation cache (getData.gs)
// ==== getData ====
// https://developers.google.com/datastudio/connector/build#fetch_and_return_data_with_getdata

/**
 * Data Studio `getData` entry point: builds a MongoDB aggregation pipeline
 * from the request and connector config, reads it in $skip/$limit pages and
 * returns `{schema, rows, filtersApplied}`.
 *
 * Two modes, chosen by the configured `query`:
 *  - cached: `query` evaluates to a pipeline array. All schema fields are
 *    fetched (plus a RECCOUNT column set to 1 on every document when the
 *    schema has none). Outside sample extraction the fetch goes through
 *    `cs.dataCache(...).race` (expiry from the `query` `expire` option,
 *    default 60 * 60), and the result is then narrowed to the requested fields.
 *  - live: any other `query`. Dimension filters become a `$match`, requested
 *    number fields are summed with `$group`, and a final `$project` shapes the
 *    output on every call.
 *
 * @param {Object} request Data Studio getData request
 *   (https://developers.google.com/datastudio/connector/reference#request_3)
 * @returns {{schema: Object[], rows: {values: Array}[], filtersApplied: boolean}}
 */
function getData(request) {
  var rconfig = request.configParams;
  var cs = configService(rconfig);
  // https://developers.google.com/datastudio/connector/reference#scriptparams
  var isSample = request.scriptParams && request.scriptParams.sampleExtraction;
  // only a pipeline query (array) is cached; anything else runs live
  var liveQuery = !Array.isArray(cs.eval('query'));
  var limits = getDataRowLimits_(cs.get('rowsMax'));
  var rowsMax = limits.rowsMax;
  var batchSize = limits.batchSize;

  var pipeline = cs.pipeline(request.dateRange, isSample, getDataPartitionFilter_(rconfig));
  var schemaFields = refreshRawSchema(request);
  var hasRecCount = schemaFields.some(function (fld) { return fld.name === RECCOUNT; });

  var schema;
  var fieldNames; // only set in live mode; passed to schemaFromSample
  var filtersApplied = false;
  var dataCache;

  if (!liveQuery) {
    // copy, so appending RECCOUNT below does not touch refreshRawSchema's array
    schema = schemaFields.slice();
    if (schema.length && schemaFromConfig(cs.list('schemaFields')).length) {
      // a schema is configured: keep only its fields (as dotted document paths)
      var keep = {};
      schema.forEach(function (fld) { keep[fld.name.replace(/__/g, '.')] = true; });
      pipeline.push({$project: keep});
    }
    if (!hasRecCount) {
      // every document gets RECCOUNT = 1
      pipeline.push({$addFields: newObj(RECCOUNT, 1)});
      schema.push({name: RECCOUNT, dataType: 'NUMBER'});
    }
    if (!isSample)
      dataCache = cs.dataCache(pipeline, {verbose: true});
  } else {
    var live = getDataLiveStages_(request, cs, pipeline, schemaFields, hasRecCount);
    schema = live.schema;
    fieldNames = live.fieldNames;
    filtersApplied = live.filtersApplied;
  }

  // Reads the pipeline page by page until a short page or rowsMax is reached.
  var fetchRows = function () {
    var rows = [];
    var pageSize = isSample ? 10 : batchSize;
    var batchCount = 0;
    var readMore = true;
    while (readMore) {
      var stages = pipeline.slice();
      if (rows.length)
        stages.push({$skip: rows.length});
      // never request more rows than rowsMax still allows (0 = unlimited)
      stages.push({$limit: rowsMax ? Math.min(pageSize, rowsMax - rows.length) : pageSize});
      var docs;
      try {
        docs = cs.fetchData(stages, batchSize) || [];
      } catch (e) {
        console.log(e);
        showError('getData fail.', e);
        break;
      }
      if (docs.length) {
        if (!schema.length)
          schema = schemaFromSample(docs.slice(0, 100), fieldNames);
        docs.forEach(function (doc) {
          rows.push({values: schema.map(function (scf) { return objResolve(doc, scf.name); })});
        });
      }
      batchCount += 1;
      // a page shorter than batchSize means the result is exhausted
      readMore = docs.length >= batchSize && (!rowsMax || rows.length < rowsMax);
    }
    console.log(JSON.stringify(pipeline));
    console.log('collection=' + cs.get('collection') + ' rows=' + rows.length + ' batchSize=' + batchSize + ' batchCount=' + batchCount);
    return {schema: schema, rows: rows, filtersApplied: filtersApplied};
  };

  if (dataCache) {
    var expire = +(cs.opt('query', 'expire') || (60 * 60));
    // NOTE(review): assumes race() returns a cached result or runs fetchRows
    var result = dataCache.race(fetchRows, {expire: expire});
    if (result)
      getDataSelectFields_(result, request.fields);
    return result;
  }
  return fetchRows();
}

/**
 * Parses the `rowsMax` config value, written `"<rowsMax>[:<batchSize>]"`.
 *  - rowsMax defaults to 100000; 0 (or a non-numeric / non-positive value)
 *    means unlimited.
 *  - batchSize defaults to rowsMax / 5 rounded up to a multiple of 100, capped
 *    at 20000. It is also 20000 when rowsMax is unlimited, so a `$limit: 0`
 *    stage is never produced.
 * @param {*} value raw config value (may be missing)
 * @returns {{rowsMax: number, batchSize: number}} non-negative integers
 */
function getDataRowLimits_(value) {
  var parts = String(value == null ? '' : value).split(':');
  var rowsMax = Math.floor(+(parts[0] || 100000));
  if (!(rowsMax > 0))
    rowsMax = 0;
  var batchSize = Math.floor(+(parts[1] || 0));
  if (!(batchSize > 0)) {
    batchSize = rowsMax ? Math.ceil(rowsMax / 500) * 100 : 20000;
    if (batchSize > 20000)
      batchSize = 20000;
  }
  return {rowsMax: rowsMax, batchSize: batchSize};
}

/**
 * Builds the partition filter from the comma-separated `pkey` config param.
 * Each listed field path takes its value from the config param named after the
 * path with `.` replaced by `__`. Fields without a truthy value are skipped.
 * @returns {Object|undefined} `{path: value}`, or undefined when nothing applies
 */
function getDataPartitionFilter_(rconfig) {
  var pfilter;
  if (!rconfig.pkey)
    return pfilter;
  rconfig.pkey.split(/\s*,\s*/).forEach(function (path) {
    var value = rconfig[path.replace(/\./g, '__')];
    if (value) {
      pfilter = pfilter || {};
      pfilter[path] = value;
    }
  });
  return pfilter;
}

/**
 * Appends the live-query stages to `pipeline` (mutated in place):
 *  - a `$match` from the request's dimension filters;
 *  - a `$group` summing the requested number fields per combination of the
 *    requested other fields, when any number field is requested;
 *  - a final `$project`.
 *
 * Number fields are the configured `numberFields`, or else the NUMBER-typed
 * schema fields. RECCOUNT is always counted as one.
 * @returns {{schema: Object[], fieldNames: string[], filtersApplied: boolean}}
 *   schema fields that are projected (in schemaFields order), the projected names,
 *   and whether dimension filters were applied.
 */
function getDataLiveStages_(request, cs, pipeline, schemaFields, hasRecCount) {
  var filtersApplied = false;
  // https://developers.google.com/datastudio/connector/reference#dimensionsfilters
  // https://developers.google.com/datastudio/connector/reference#filteroperator
  if (request.dimensionsFilters && request.dimensionsFilters.length) {
    filtersApplied = true;
    var match = filterQuery(request.dimensionsFilters);
    if (match)
      pipeline.push({$match: match});
  }

  var projection = {};
  request.fields.forEach(function (fld) {
    // filter-only fields are handled by $match and need not be returned
    if (!(fld.forFilterOnly && filtersApplied))
      projection[fld.name] = true;
  });
  var fieldNames = Object.keys(projection);
  var schema = schemaFields.filter(function (scf) { return fieldNames.indexOf(scf.name) !== -1; });

  // copy, so appending RECCOUNT never alters the configured list
  var numberFields = cs.list('numberFields').slice();
  if (!numberFields.length) {
    numberFields = schemaFields
      .filter(function (scf) { return scf.dataType === 'NUMBER'; })
      .map(function (scf) { return scf.name; });
  }
  if (numberFields.indexOf(RECCOUNT) === -1)
    numberFields.push(RECCOUNT);

  var isNumber = function (n) { return numberFields.indexOf(n) !== -1; };
  // `a__b` field names map to the document path `a.b`
  var toPath = function (n) { return '$' + n.replace(/__/g, '.'); };
  var grpFields = fieldNames.filter(function (n) { return !isNumber(n); });
  var sumFields = fieldNames.filter(isNumber);

  if (sumFields.length) {
    var group = {};
    sumFields.forEach(function (n) {
      // without a stored RECCOUNT column, sum 1 per document instead
      group[n] = {$sum: (n === RECCOUNT && !hasRecCount) ? 1 : toPath(n)};
    });
    var groupId = {};
    grpFields.forEach(function (n) { groupId[n] = toPath(n); });
    group._id = grpFields.length ? groupId : null;
    pipeline.push({$group: group});
    // the grouped values come back inside _id
    grpFields.forEach(function (n) { projection[n] = '$_id.' + n; });
    projection._id = false;
  } else {
    // no aggregation: read each field from its (possibly nested) document path
    grpFields.forEach(function (n) { projection[n] = toPath(n); });
  }
  pipeline.push({$project: projection});
  return {schema: schema, fieldNames: fieldNames, filtersApplied: filtersApplied};
}

/**
 * Narrows a getData result in place to the requested fields. It drops schema
 * entries whose name is not requested, and the matching value from every row.
 * @returns {Object} the same `result` object
 */
function getDataSelectFields_(result, fields) {
  var names = fields.map(function (fld) { return fld.name; });
  var keep = result.schema.map(function (fld) { return names.indexOf(fld.name) !== -1; });
  var pick = function (v, i) { return keep[i]; };
  result.schema = result.schema.filter(pick);
  result.rows.forEach(function (row) { row.values = row.values.filter(pick); });
  return result;
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment