const CORES = require('os').cpus().length;

const path = require('path');
const fork = require('child_process').fork;

module.exports = class MapReduce {
	constructor(map, reduce, callback, config = {}) {
		if (typeof map !== 'string' || typeof reduce !== 'string' || typeof callback !== 'function') {
			throw new Error('Constructor signature not matching actual parameters.');
		}

		this.options = {
            // THREADS:
			//
			// Number of threads we want to use and also number of Map/Reduce nodes that will be created.

            threads: config.threads || CORES,

			// KEYS PER REDUCE NODE:
			//
			// Number of slices of the intermediate result that each Reduce node will get.
            // 0 means each Reduce will get # KEYS / # NODES slices of the intermediate result.

            slicesPerReduceNode: config.slicesPerReduceNode && -config.slicesPerReduceNode || 0,
		};

		// TODO: Add underscore

		this.callback = callback;

        this.workers = {};
        this.workersAvailable = [];

		this.dataChunks = {};
		this.dataChunksAvailable = [];
        this.workersToDataChunks = {};
        this.dataChunksCount = 0;

		this.intermediateGroupedResult = {};
		this.intermediateGroupedResultsAvailable = [];
        this.workersToIntermediateGroupedResults = {};

		this.result = {};

		this.isMapFinished = false;
		this.isReduceFinished = false;
		this._distributeWork = this._distributeMap.bind(this); // TODO: Is this bind needed?

		// Init communication events:

        for(let i = 0; i < this.options.threads; ++i) {
            const worker = fork(path.join(__dirname, 'process.js'), [map, reduce]);

            worker.on('message', result => {
                if (result === 'READY') {
                    this.workersAvailable.push(worker.pid);

                    this._distributeWork();
                } else if (this.isMapFinished) {
                    this._onReduceFinished(worker, result);
                } else {
                    this._onMapFinished(worker, result);
                }
            });

            worker.on('exit', (code, signal) => {
                if (signal) {
                    // console.log(`worker was killed by signal: ${signal}`);
                } else if (code !== 0) {
                    // console.log(`worker exited with error code: ${code}`);
                } else {
                    // console.log('worker success!');
                }
            });

            this.workers[worker.pid] = worker;
        }
	}
	
	push(data) {
		if (this.isMapFinished) {
			throw new Error(`Can't push more data as the ${ this.isReduceFinished ? 'Reduce' : 'Map' } phase is already finished.`);
		}
		
		data = Array.isArray(data) ? data : [data];
		
		data.forEach(dataChunk => {
			const index = this.dataChunksCount++;

			this.dataChunks[index] = dataChunk;
			this.dataChunksAvailable.push(index);
		});
		
		this._distributeWork();
	}

    _distributeMap() {
		if (this.dataChunksAvailable.length === 0 || this.workersAvailable.length === 0) {
			// console.log('No data or nodes available...');
			
			return;
		}
		
		const dataChunkIndex = this.dataChunksAvailable.pop();
		const dataChunk = this.dataChunks[dataChunkIndex];

		const workerIndex = this.workersAvailable.pop();
		const worker = this.workers[workerIndex];
		
		worker.send({
            map: true,
			key: dataChunkIndex,
			value: dataChunk,
		});
		
		this.workersToDataChunks[workerIndex] = dataChunkIndex;

		this._distributeMap();
	}

	_distributeReduce() {
		// console.log(Object.keys(this.intermediateGroupedResult).length);

		// TODO: Maybe the nodes could do that after the map, if the data is big enough!

		/*

		for (const key in this.intermediateGroupedResult) {
            this.intermediateGroupedResult[key] = this.intermediateGroupedResult[key].reduce((a, b) => a + b, 0)
		}

		console.log(this.intermediateGroupedResult);

		throw new Error('BYE');

		*/

        if (this.intermediateGroupedResultsAvailable.length === 0 || this.workersAvailable.length === 0) {
            // console.log('No intermediate results or nodes available...');

            return;
        }

        // console.log('Remaining Reduces = ' + this.intermediateGroupedResultsAvailable.length);

        const intermediateResultKeys = this.intermediateGroupedResultsAvailable.splice(this.options.slicesPerReduceNode);
        const intermediateResultsData = intermediateResultKeys
	        .map(intermediateResultKey => [intermediateResultKey, this.intermediateGroupedResult[intermediateResultKey]]);

        const workerIndex = this.workersAvailable.pop();
        const worker = this.workers[workerIndex];

        worker.send({
            map: false,
            slices: intermediateResultsData,
        });

        this.workersToIntermediateGroupedResults[workerIndex] = intermediateResultKeys;

        this._distributeReduce();
	}
	
	_groupMapResults(mapResult, groupedResult = {}) {
		for (const tuple of mapResult) {
			const key = tuple[0];
			const value = tuple[1];
			const group = groupedResult[key] || [];

			group.push(value);

			groupedResult[key] = group;
		}
		
		return groupedResult;
	}

	_onMapFinished(worker, mapResult) {
		const workerId = worker.pid;
		const dataChunkIndex = this.workersToDataChunks[workerId];

        delete this.workersToDataChunks[workerId];
		delete this.dataChunks[dataChunkIndex];

		this.workersAvailable.push(workerId);

        this._groupMapResults(mapResult, this.intermediateGroupedResult);

		if (this.isMapFinished = Object.keys(this.dataChunks).length === 0) {
            this._distributeWork = this._distributeReduce.bind(this); // TODO: Is this bind needed?

            const intermediateGroupedResultsAvailable = this.intermediateGroupedResultsAvailable = Object.keys(this.intermediateGroupedResult);

			if (this.options.slicesPerReduceNode === 0) {
                this.options.slicesPerReduceNode = - Math.ceil(intermediateGroupedResultsAvailable.length / this.options.threads);
			}
        } else {
			this.callback(false);
		}

        this._distributeWork();
	}

	_onReduceFinished(worker, reduceResult) {
        const workerId = worker.pid;
        const intermediateResultKeys = this.workersToIntermediateGroupedResults[workerId];

        delete this.workersToIntermediateGroupedResults[workerId];

        // TODO: Is this efficient?

        intermediateResultKeys.map(key => delete this.intermediateGroupedResult[key]);

        this.workersAvailable.push(workerId);

        // console.log(reduceResult);

        Object.assign(this.result, reduceResult);

        // TODO: Should we check this.intermediateGroupedResultsAvailable instead=

        if (this.isReduceFinished = Object.keys(this.intermediateGroupedResult).length === 0) {
            this.callback(true, this.result);

	        for (const pid in this.workers) {
                this.workers[pid].kill();
	        }
        } else {
            this._distributeWork();
        }
	}
};