const CORES = require('os').cpus().length; const path = require('path'); const fork = require('child_process').fork; module.exports = class MapReduce { constructor(map, reduce, callback, config = {}) { if (typeof map !== 'string' || typeof reduce !== 'string' || typeof callback !== 'function') { throw new Error('Constructor signature not matching actual parameters.'); } this.options = { // THREADS: // // Number of threads we want to use and also number of Map/Reduce nodes that will be created. threads: config.threads || CORES, // KEYS PER REDUCE NODE: // // Number of slices of the intermediate result that each Reduce node will get. // 0 means each Reduce will get # KEYS / # NODES slices of the intermediate result. slicesPerReduceNode: config.slicesPerReduceNode && -config.slicesPerReduceNode || 0, }; // TODO: Add underscore this.callback = callback; this.workers = {}; this.workersAvailable = []; this.dataChunks = {}; this.dataChunksAvailable = []; this.workersToDataChunks = {}; this.dataChunksCount = 0; this.intermediateGroupedResult = {}; this.intermediateGroupedResultsAvailable = []; this.workersToIntermediateGroupedResults = {}; this.result = {}; this.isMapFinished = false; this.isReduceFinished = false; this._distributeWork = this._distributeMap.bind(this); // TODO: Is this bind needed? // Init communication events: for(let i = 0; i < this.options.threads; ++i) { const worker = fork(path.join(__dirname, 'process.js'), [map, reduce]); worker.on('message', result => { if (result === 'READY') { this.workersAvailable.push(worker.pid); this._distributeWork(); } else if (this.isMapFinished) { this._onReduceFinished(worker, result); } else { this._onMapFinished(worker, result); } }); worker.on('exit', (code, signal) => { if (signal) { // console.log(`worker was killed by signal: ${signal}`); } else if (code !== 0) { // console.log(`worker exited with error code: ${code}`); } else { // console.log('worker success!'); } }); this.workers[worker.pid] = worker; } } push(data) { if (this.isMapFinished) { throw new Error(`Can't push more data as the ${ this.isReduceFinished ? 'Reduce' : 'Map' } phase is already finished.`); } data = Array.isArray(data) ? data : [data]; data.forEach(dataChunk => { const index = this.dataChunksCount++; this.dataChunks[index] = dataChunk; this.dataChunksAvailable.push(index); }); this._distributeWork(); } _distributeMap() { if (this.dataChunksAvailable.length === 0 || this.workersAvailable.length === 0) { // console.log('No data or nodes available...'); return; } const dataChunkIndex = this.dataChunksAvailable.pop(); const dataChunk = this.dataChunks[dataChunkIndex]; const workerIndex = this.workersAvailable.pop(); const worker = this.workers[workerIndex]; worker.send({ map: true, key: dataChunkIndex, value: dataChunk, }); this.workersToDataChunks[workerIndex] = dataChunkIndex; this._distributeMap(); } _distributeReduce() { // console.log(Object.keys(this.intermediateGroupedResult).length); // TODO: Maybe the nodes could do that after the map, if the data is big enough! /* for (const key in this.intermediateGroupedResult) { this.intermediateGroupedResult[key] = this.intermediateGroupedResult[key].reduce((a, b) => a + b, 0) } console.log(this.intermediateGroupedResult); throw new Error('BYE'); */ if (this.intermediateGroupedResultsAvailable.length === 0 || this.workersAvailable.length === 0) { // console.log('No intermediate results or nodes available...'); return; } // console.log('Remaining Reduces = ' + this.intermediateGroupedResultsAvailable.length); const intermediateResultKeys = this.intermediateGroupedResultsAvailable.splice(this.options.slicesPerReduceNode); const intermediateResultsData = intermediateResultKeys .map(intermediateResultKey => [intermediateResultKey, this.intermediateGroupedResult[intermediateResultKey]]); const workerIndex = this.workersAvailable.pop(); const worker = this.workers[workerIndex]; worker.send({ map: false, slices: intermediateResultsData, }); this.workersToIntermediateGroupedResults[workerIndex] = intermediateResultKeys; this._distributeReduce(); } _groupMapResults(mapResult, groupedResult = {}) { for (const tuple of mapResult) { const key = tuple[0]; const value = tuple[1]; const group = groupedResult[key] || []; group.push(value); groupedResult[key] = group; } return groupedResult; } _onMapFinished(worker, mapResult) { const workerId = worker.pid; const dataChunkIndex = this.workersToDataChunks[workerId]; delete this.workersToDataChunks[workerId]; delete this.dataChunks[dataChunkIndex]; this.workersAvailable.push(workerId); this._groupMapResults(mapResult, this.intermediateGroupedResult); if (this.isMapFinished = Object.keys(this.dataChunks).length === 0) { this._distributeWork = this._distributeReduce.bind(this); // TODO: Is this bind needed? const intermediateGroupedResultsAvailable = this.intermediateGroupedResultsAvailable = Object.keys(this.intermediateGroupedResult); if (this.options.slicesPerReduceNode === 0) { this.options.slicesPerReduceNode = - Math.ceil(intermediateGroupedResultsAvailable.length / this.options.threads); } } else { this.callback(false); } this._distributeWork(); } _onReduceFinished(worker, reduceResult) { const workerId = worker.pid; const intermediateResultKeys = this.workersToIntermediateGroupedResults[workerId]; delete this.workersToIntermediateGroupedResults[workerId]; // TODO: Is this efficient? intermediateResultKeys.map(key => delete this.intermediateGroupedResult[key]); this.workersAvailable.push(workerId); // console.log(reduceResult); Object.assign(this.result, reduceResult); // TODO: Should we check this.intermediateGroupedResultsAvailable instead= if (this.isReduceFinished = Object.keys(this.intermediateGroupedResult).length === 0) { this.callback(true, this.result); for (const pid in this.workers) { this.workers[pid].kill(); } } else { this._distributeWork(); } } };