-
-
Save ptgamr/4d1d07b770321a138c91 to your computer and use it in GitHub Desktop.
| 'use strict'; | |
| const fs = Promise.promisifyAll(require('fs')); | |
| const path = require('path'); | |
| const crypto = require('crypto'); | |
| const CronJob = require('cron').CronJob; | |
| module.exports = class FlowUploader { | |
| constructor(tempDir, uploadDir, maxFileSize, fileParameterName) { | |
| this.tempDir = tempDir || './tmp'; | |
| this.uploadDir = uploadDir || './uploads'; | |
| this.maxFileSize = maxFileSize; | |
| this.fileParameterName = fileParameterName || 'file'; | |
| try { | |
| fs.mkdirSync(this.tempDir); | |
| } catch (e) {} | |
| //run clean up job five minutes after midnight, every day | |
| new CronJob('5 0 * * *', () => { this._cleanUnfinishedChunks(); }, null, true, 'Europe/Zurich'); | |
| } | |
| chunkExists(req) { | |
| let chunkNumber = req.query.flowChunkNumber, | |
| chunkSize = req.query.flowChunkSize, | |
| totalSize = req.query.flowTotalSize, | |
| identifier = req.query.flowIdentifier, | |
| fileName = req.query.flowFilename; | |
| let validation = this._isValidRequest(chunkNumber, chunkSize, totalSize, identifier, fileName); | |
| if (validation !== 'VALID') { | |
| return Promise.reject(validation); | |
| } | |
| let chunkFilename = this._getChunkFilename(chunkNumber, identifier); | |
| return fs.statAsync(chunkFilename); | |
| } | |
| saveChunk(req) { | |
| let fields = req.body, | |
| files = req.files; | |
| let chunkNumber = Number(fields.flowChunkNumber), | |
| chunkSize = Number(fields.flowChunkSize), | |
| totalSize = Number(fields.flowTotalSize), | |
| identifier = fields.flowIdentifier, | |
| fileName = fields.flowFilename; | |
| if (!files[this.fileParameterName] || !files[this.fileParameterName].size) { | |
| return Promise.reject('INVALID_FLOW_REQUEST'); | |
| } | |
| let validation = this._isValidRequest(chunkNumber, chunkSize, totalSize, identifier, fileName, files[this.fileParameterName].size); | |
| if (validation !== 'VALID') { | |
| return Promise.reject(validation); | |
| } | |
| let chunkFilename = this._getChunkFilename(chunkNumber, identifier); | |
| return fs.renameAsync(files[this.fileParameterName].path, chunkFilename) | |
| .then(() => { | |
| let numberOfChunks = this._getNumberOfChunks(totalSize, chunkSize); | |
| if (chunkNumber !== numberOfChunks) { | |
| return 'PARTLY_DONE'; | |
| } | |
| let chunkFileNames = []; | |
| for (let i = 1; i <= numberOfChunks; i++) { | |
| chunkFileNames.push(this._getChunkFilename(i, identifier)); | |
| } | |
| return Promise.map( | |
| chunkFileNames, | |
| chunkFileName => fs.statAsync(chunkFileName), | |
| {concurency: 2} | |
| ).then(() => this._writeToUploadDir(numberOfChunks, identifier, fileName)) | |
| .then(filename => filename, () => 'ERROR_VERIFY_CHUNK'); | |
| }); | |
| } | |
| download(fileName) { | |
| let downloadPath = this._getDownloadPath(fileName); | |
| return fs.statAsync(downloadPath).then(() => { | |
| return fs.createReadStream(downloadPath); | |
| }); | |
| } | |
| _isValidRequest(chunkNumber, chunkSize, totalSize, identifier, fileName, fileSize) { | |
| identifier = this._cleanIdentifier(identifier); | |
| if (!chunkNumber || !chunkSize || !totalSize || !identifier || !fileName) { | |
| return 'INVALID_FLOW_REQUEST'; | |
| } | |
| let numberOfChunks = this._getNumberOfChunks(totalSize, chunkSize); | |
| if (chunkNumber > numberOfChunks) { | |
| return 'INVALID_CHUNK_NUMBER'; | |
| } | |
| if (this.maxFileSize && totalSize > this.maxFileSize) { | |
| return 'INVALID_FILE_SIZE'; | |
| } | |
| if (typeof fileSize !== 'undefined') { | |
| if (chunkNumber < numberOfChunks && fileSize !== chunkSize) { | |
| console.log('>>>>.', typeof fileSize, typeof chunkSize); | |
| return 'INVALID_FILESIZE_CHUNKSIZE_MISMATCH'; | |
| } | |
| if (numberOfChunks > 1 && chunkNumber === numberOfChunks && fileSize !== ((totalSize % chunkSize) + parseInt(chunkSize))) { | |
| return 'INVALID_LAST_CHUNK'; | |
| } | |
| if (numberOfChunks === 1 && fileSize !== totalSize) { | |
| return 'INVALID_SINGLE_CHUNK'; | |
| } | |
| } | |
| return 'VALID'; | |
| } | |
| _getNumberOfChunks(totalSize, chunkSize) { | |
| return Math.max(Math.floor(totalSize/chunkSize), 1); | |
| } | |
| _cleanIdentifier(identifier) { | |
| return identifier.replace(/[^0-9A-Za-z_-]/g, ''); | |
| } | |
| _getChunkFilename(chunkNumber, identifier) { | |
| identifier = this._cleanIdentifier(identifier); | |
| let hash = crypto.createHash('sha1').update(identifier).digest('hex'); | |
| return path.resolve(this.tempDir, `./${identifier}-${hash}.${chunkNumber}`); | |
| } | |
| _getDownloadPath(fileName) { | |
| return path.resolve(this.uploadDir, `./${fileName}`); | |
| } | |
| _writeToUploadDir(numberOfChunks, identifier, fileName) { | |
| let hash = crypto.createHash('sha1').update(identifier).digest('hex'); | |
| let writeDir = path.resolve(this.uploadDir, `./${identifier}-${hash}${path.extname(fileName)}`); | |
| let writableStream = fs.createWriteStream(writeDir); | |
| let chunkFileNames = []; | |
| for (let i = 1; i <= numberOfChunks; i++) { | |
| chunkFileNames.push(this._getChunkFilename(i, identifier)); | |
| } | |
| return Promise.each( | |
| chunkFileNames, | |
| chunkFileName => { | |
| return new Promise(resolve => { | |
| let sourceStream = fs.createReadStream(chunkFileName); | |
| sourceStream.pipe(writableStream, { | |
| end: false | |
| }); | |
| sourceStream.on('end', function() { | |
| fs.unlink(chunkFileName); | |
| resolve(); | |
| }); | |
| }); | |
| } | |
| ).then(() => { | |
| writableStream.end(); | |
| return path.basename(writeDir); | |
| }); | |
| } | |
| _cleanUnfinishedChunks() { | |
| let now = new Date().getTime(); | |
| let oneDay = 24 * 60 * 60 * 1000; | |
| fs.readdirAsync(this.tempDir) | |
| .map(fileName => { | |
| let filePath = path.resolve(this.tempDir, `./${fileName}`); | |
| return fs.statAsync(filePath).then(stat => { | |
| return { | |
| filePath: filePath, | |
| stat: stat | |
| }; | |
| }); | |
| }, {concurency: 2}) | |
| .filter(fileStat => { | |
| let modifiedTime = fileStat.stat.ctime.getTime(); | |
| return (now - modifiedTime) >= oneDay; | |
| }) | |
| .each(fileStat => fs.unlinkAsync(fileStat.filePath)); | |
| } | |
| }; |
| // Handle status checks on chunks through Flow.js | |
router.get('/flow', (req, res) => {
  // 200 tells the client the chunk is already stored; 204 tells it the chunk
  // is missing (or the request was invalid) and has to be uploaded.
  const respondWith = statusCode => () => res.status(statusCode).send();

  return uploader.chunkExists(req).then(respondWith(200), respondWith(204));
});
| // Handle uploads through Flow.js | |
router.post('/flow', multipartMiddleware, (req, res) => {
  // A resolved save is answered with 200 and its status string; a rejected
  // one is answered with 400 and the rejection value.
  const onSaved = status => res.status(200).send(status);
  const onRejected = err => res.status(400).send(err);

  return uploader.saveChunk(req).then(onSaved, onRejected);
});
@ptgamr, can you please provide a full example?
Thanks
Are you sure the _cleanUnfinishedChunks method works? You're calling 'map' on a 'Promise', it seems. I'm assuming you intended to call fs.readdirSync instead of fs.readdirAsync? Edit: Oops, I see this is perfectly valid bluebird functionality ;)
and: concurency (line 78) is spelled wrong: should be concurrency
(index):166 fileProgress FlowFile FlowChunk
(index):166 progress
(index):166 fileProgress FlowFile FlowChunk
(index):166 progress
(index):166 fileSuccess FlowFile_lastProgressCallback: 1467522223840_prevProgress: 1_prevUploadedSize: 11230731averageSpeed: 0bytes: nullchunks: Array[10]currentSpeed: 0error: falsefile: FileflowObj: Flowname: "Jess....: Object ERROR_VERIFY_CHUNK FlowChunk
(index):166 complete
I'm getting a specific ERROR_VERIFY_CHUNK on the FlowFile_lastProgressCallback call. From the upload dir I can see that the file pieces are still present and not combined in any way... Is there anything I'm doing wrong? I used this code on the server with no changes.
@LaurensRietveld is this working for you?
//...
let chunkFileNames = [];
for (let i = 1; i <= numberOfChunks; i++) {
chunkFileNames.push(this._getChunkFilename(i, identifier));
}
return Promise.map(
chunkFileNames,
chunkFileName => fs.statAsync(chunkFileName),
{concurrency: 2}
).then(() => this._writeToUploadDir(numberOfChunks, identifier, fileName))
.then(filename => filename, () => 'ERROR_VERIFY_CHUNK');
//...mine is throwing that error
Aaaaaaaaaaaaaah... totally ok, missing uploads dir, fixed and its all working magic
Hey pal,
I'm trying to use your code to upload a file, but every time I try, I get a 400 error. I've added some console.log calls and it appears the "renameAsync" function is never called. Any thoughts?