Created September 21, 2021 09:45
-
-
Save jojosati/9137a8df017ea8ed526ed30d5c665de1 to your computer and use it in GitHub Desktop.
MongoDB connector for Data Studio - aggregation cache (getData.gs)
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// ==== getData ====
// https://developers.google.com/datastudio/connector/build#fetch_and_return_data_with_getdata
/**
 * Data Studio `getData` entry point: runs the configured MongoDB aggregation
 * and returns the requested rows together with their schema.
 *
 * Two modes, selected by the configured `query`:
 *  - pipeline query (an array, detected via `.map`): the full result is cached
 *    through `cs.dataCache` (except for sample extractions) and the requested
 *    fields are picked out of it afterwards;
 *  - otherwise a "live" query: the request's dimension filters plus a
 *    `$group`/`$project` stage are appended and the query runs on every call.
 *
 * The `rowsMax` config value has the form "rowsMax[:batchSize]". rowsMax
 * defaults to 100000 and 0 means "no limit"; batchSize defaults to roughly
 * rowsMax / 5 (rounded up to a multiple of 100, at most 20000).
 *
 * @param {Object} request getData request
 *   (https://developers.google.com/datastudio/connector/reference#request_3)
 * @return {Object} `{schema, rows, filtersApplied}`; in cached mode the value
 *   produced by `dataCache.race`, narrowed to the requested fields (returned
 *   unchanged when falsy).
 */
function getData(request) {
  var MAX_BATCH_SIZE = 20000;

  // Parse the "rowsMax[:batchSize]" option. The returned batch size is always
  // positive, because a MongoDB `$limit` stage requires a positive integer.
  function parseRowsMax(spec) {
    var parts = String(spec == null ? '' : spec).split(':');
    var maxRows = +(parts[0] || 100000);
    var size = +(parts[1] || 0);
    if (!(size > 0)) {
      size = maxRows > 0
        ? Math.min(Math.ceil(maxRows / 500) * 100, MAX_BATCH_SIZE)
        : MAX_BATCH_SIZE;
    }
    return {rowsMax: maxRows, batchSize: size};
  }

  // Build the filter handed to cs.pipeline() from the comma-separated `pkey`
  // config param: for each listed field path, the value is read from the config
  // param of the same name with '.' replaced by '__'. Paths with a falsy value
  // are skipped; returns undefined when no path has a value.
  function pkeyFilter(config) {
    var filter;
    if (!config || !config.pkey)
      return filter;
    config.pkey.split(/\s*\,\s*/).forEach(function (path) {
      var value = config[path.replace(/\./g, '__')];
      if (value) {
        filter = filter || {};
        filter[path] = value;
      }
    });
    return filter;
  }

  // Reduce a cached full result to the fields of this request (keeping the
  // cached column order) without modifying the cached object itself.
  function selectRequestedFields(result) {
    if (!result)
      return result;
    var names = request.fields.map(function (fld) { return fld.name; });
    var keep = [];
    result.schema.forEach(function (fld, i) {
      if (names.indexOf(fld.name) !== -1)
        keep.push(i);
    });
    var pick = function (values) {
      return keep.map(function (i) { return values[i]; });
    };
    var selected = {};
    Object.keys(result).forEach(function (key) { selected[key] = result[key]; });
    selected.schema = pick(result.schema);
    selected.rows = result.rows.map(function (row) { return {values: pick(row.values)}; });
    return selected;
  }

  var rconfig = request.configParams;
  var cs = configService(rconfig);
  // https://developers.google.com/datastudio/connector/reference#scriptparams
  var isSample = request.scriptParams && request.scriptParams.sampleExtraction;
  var query = cs.eval('query');
  // should cache only pipeline query (array)
  var liveQuery = (!query || !query.map);
  var filtersApplied = false;
  var limits = parseRowsMax(cs.get('rowsMax'));
  var rowsMax = limits.rowsMax;
  var batchSize = limits.batchSize;
  var dataCache = null;
  var schema = [];
  // set only in live-query mode; passed to schemaFromSample() when no schema is known
  var fieldNames;
  var pipeline = cs.pipeline(request.dateRange, isSample, pkeyFilter(rconfig));
  var schemaFields = refreshRawSchema(request);
  var hasRecCount = !!schemaFields.find(function (fld) { return fld.name === RECCOUNT; });

  if (!liveQuery) {
    // copy, so adding RECCOUNT below does not modify refreshRawSchema()'s array
    schema = schemaFields.slice();
    // project the known schema fields only when schemaFields are configured
    if (schema.length && schemaFromConfig(cs.list('schemaFields')).length) {
      var cacheProjection = {};
      schema.forEach(function (fld) {
        cacheProjection[fld.name.replace(/__/g, '.')] = true;
      });
      pipeline.push({$project: cacheProjection});
    }
    if (!hasRecCount) {
      pipeline.push({$addFields: newObj(RECCOUNT, 1)});
      schema.push({name: RECCOUNT, dataType: 'NUMBER'});
    }
    if (!isSample)
      dataCache = cs.dataCache(pipeline, {verbose: true});
  }
  else {
    // live query (aggregate each dimensions trigger)
    // https://developers.google.com/datastudio/connector/reference#dimensionsfilters
    // https://developers.google.com/datastudio/connector/reference#filteroperator
    if (request.dimensionsFilters && request.dimensionsFilters.length) {
      filtersApplied = true;
      var qry = filterQuery(request.dimensionsFilters);
      if (qry) {
        pipeline.push({$match: qry});
      }
    }
    var projection = {};
    request.fields.forEach(function (fld) {
      // filter-only fields are not returned once the filters are applied here
      if (fld.forFilterOnly && filtersApplied)
        return;
      projection[fld.name] = true;
    });
    fieldNames = Object.keys(projection);
    schema = schemaFields.filter(function (scf) { return fieldNames.indexOf(scf.name) !== -1; });

    // optimize using group stage: numeric fields are summed, every other
    // requested field becomes part of the group key
    var numberfields = cs.list('numberFields');
    if (!numberfields.length) {
      numberfields = schemaFields
        .filter(function (scf) { return scf.dataType === 'NUMBER'; })
        .map(function (scf) { return scf.name; });
    }
    if (numberfields.indexOf(RECCOUNT) === -1)
      numberfields = numberfields.concat([RECCOUNT]); // copy: keep cs.list()'s array intact

    var grpfields = fieldNames.filter(function (n) { return numberfields.indexOf(n) === -1; });
    var sumfields = fieldNames.filter(function (n) { return numberfields.indexOf(n) !== -1; });
    if (sumfields.length) {
      var grpId = grpfields.reduce(function (obj, n) {
        obj[n] = '$' + n.replace(/__/g, '.');
        return obj;
      }, {});
      var grpqry = sumfields.reduce(function (obj, n) {
        // without a stored RECCOUNT field, count documents by summing 1
        obj[n] = {$sum: (n === RECCOUNT && !hasRecCount) ? 1 : ('$' + n.replace(/__/g, '.'))};
        return obj;
      }, {});
      grpqry._id = grpfields.length ? grpId : null;
      pipeline.push({$group: grpqry});
      // rewrite projection: lift the group keys back out of _id
      grpfields.forEach(function (n) {
        projection[n] = '$_id.' + n;
      });
      projection['_id'] = false;
    }
    else {
      grpfields.forEach(function (n) { projection[n] = 1; });
    }
    pipeline.push({$project: projection});
  }

  // Page through the aggregation with $skip/$limit until a short batch comes
  // back or rowsMax rows have been read (0 = unlimited). A sample extraction
  // reads a single batch of at most 10 rows.
  var fetchRows = function () {
    var rows = [];
    var batchCount = 0;
    var readMore = true;
    while (readMore) {
      var limit = isSample ? 10 : batchSize;
      if (rowsMax > 0)
        limit = Math.min(limit, rowsMax - rows.length); // never read past rowsMax
      var batchPipeline = pipeline.concat([]);
      if (rows.length)
        batchPipeline.push({$skip: rows.length});
      batchPipeline.push({$limit: limit});
      var docs = [];
      try {
        docs = cs.fetchData(batchPipeline, batchSize) || [];
      }
      catch (e) {
        console.log(e);
        showError('getData fail.', e);
        break;
      }
      if (docs.length) {
        if (!schema.length)
          schema = schemaFromSample(docs.slice(0, 100), fieldNames);
        docs.forEach(function (doc) {
          rows.push({values: schema.map(function (scf) { return objResolve(doc, scf.name); })});
        });
      }
      batchCount += 1;
      // a batch shorter than its $limit means there is nothing left to read
      readMore = !isSample && docs.length >= limit && (!rowsMax || rows.length < rowsMax);
    }
    console.log(JSON.stringify(pipeline));
    console.log('collection=' + cs.get('collection') + ' rows=' + rows.length + ' batchSize=' + batchSize + ' batchCount=' + batchCount);
    return {schema: schema, rows: rows, filtersApplied: filtersApplied};
  };

  if (dataCache) {
    var expire = +(cs.opt('query', 'expire') || (60 * 60));
    return selectRequestedFields(dataCache.race(fetchRows, {expire: expire}));
  }
  return fetchRows();
}
Sign up for free to join this conversation on GitHub.
Already have an account? Sign in to comment.