Created
May 27, 2020 10:09
-
-
Save vlasky/f6317edf979ec6fa9b580e9bc82f6de1 to your computer and use it in GitHub Desktop.
Meteor demo code for running jobs synchronized across multiple processes. It is based on percolate:synced-cron, but modified to use MySQL. Files must be placed in the Meteor server folder and all required atmosphere & NPM dependencies installed.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// A package for running jobs synchronized across multiple processes | |
// Based on percolate:synced-cron, but modified to use MySQL | |
// https://github.com/percolatestudio/meteor-synced-cron | |
/*You must create a cron_history table in your MySQL database with the following schema: | |
CREATE TABLE `cron_history` ( | |
`id` int(10) unsigned NOT NULL AUTO_INCREMENT, | |
`intendedAt` timestamp NOT NULL ON UPDATE CURRENT_TIMESTAMP DEFAULT CURRENT_TIMESTAMP, | |
`name` varchar(255) COLLATE utf8mb4_unicode_ci NOT NULL, | |
`startedAt` timestamp NULL DEFAULT NULL, | |
`finishedAt` timestamp NULL DEFAULT NULL, | |
`result` varchar(255) COLLATE utf8mb4_unicode_ci DEFAULT NULL, | |
`error` varchar(255) COLLATE utf8mb4_unicode_ci DEFAULT NULL, | |
PRIMARY KEY (`id`), | |
UNIQUE KEY `intendedAtname` (`intendedAt`,`name`), | |
KEY `startedAt` (`startedAt`) | |
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci; | |
*/ | |
import { check } from 'meteor/check'; | |
import _ from 'lodash'; | |
import Later from 'later'; | |
SyncedCron = { | |
_entries: {}, | |
running: false, | |
options: { | |
log: true,//Log job run details to console | |
logger: null, | |
utc: false,//Default to using localTime | |
//TTL in seconds for history records in table to expire | |
//Not supported in MySQL. You will have to implement you | |
//own mechanism to clean expired records from the history table | |
//collectionTTL: 172800 | |
}, | |
config: function(opts) { | |
this.options = _.extend({}, this.options, opts); | |
} | |
} | |
/* | |
Logger factory function. Takes a prefix string and options object | |
and uses an injected `logger` if provided, else falls back to | |
Meteor's `Log` package. | |
Will send a log object to the injected logger, on the following form: | |
message: String | |
level: String (info, warn, error, debug) | |
tag: 'SyncedCron' | |
*/ | |
function createLogger(prefix) { | |
check(prefix, String); | |
// Return noop if logging is disabled. | |
if(SyncedCron.options.log === false) { | |
return function() {}; | |
} | |
return function(level, message) { | |
check(level, Match.OneOf('info', 'error', 'warn', 'debug')); | |
check(message, String); | |
var logger = SyncedCron.options && SyncedCron.options.logger; | |
if(logger && _.isFunction(logger)) { | |
logger({ | |
level: level, | |
message: message, | |
tag: prefix | |
}); | |
} else { | |
Log[level]({ message: prefix + ': ' + message }); | |
} | |
} | |
} | |
var log; | |
Meteor.startup(function() { | |
var options = SyncedCron.options; | |
log = createLogger('SyncedCron'); | |
['info', 'warn', 'error', 'debug'].forEach(function(level) { | |
log[level] = _.partial(log, level); | |
}); | |
// Don't allow TTL less than 5 minutes so we don't break synchronization | |
var minTTL = 300; | |
// Use UTC or localtime for evaluating schedules | |
if (options.utc) | |
Later.date.UTC(); | |
else | |
Later.date.localTime(); | |
}); | |
var scheduleEntry = function(entry) { | |
var schedule = entry.schedule(Later.parse); | |
entry._timer = | |
SyncedCron._laterSetInterval(SyncedCron._entryWrapper(entry), schedule); | |
log.info('Scheduled "' + entry.name + '" next run @' | |
+ Later.schedule(schedule).next(1)); | |
} | |
// add a scheduled job | |
// SyncedCron.add({ | |
// name: String, // *required* unique name of the job | |
// schedule: function(laterParser) {},// *required* when to run the job | |
// job: function() {}, // *required* the code to run | |
// }); | |
SyncedCron.add = function(entry) { | |
check(entry.name, String); | |
check(entry.schedule, Function); | |
check(entry.job, Function); | |
check(entry.persist, Match.Optional(Boolean)); | |
if (entry.persist === undefined) { | |
entry.persist = true; | |
} | |
// check | |
if (!this._entries[entry.name]) { | |
this._entries[entry.name] = entry; | |
// If cron is already running, start directly. | |
if (this.running) { | |
scheduleEntry(entry); | |
} | |
} | |
} | |
// Start processing added jobs | |
SyncedCron.start = function() { | |
var self = this; | |
Meteor.startup(function() { | |
// Schedule each job with later.js | |
_.each(self._entries, function(entry) { | |
scheduleEntry(entry); | |
}); | |
self.running = true; | |
}); | |
} | |
// Return the next scheduled date of the first matching entry or undefined | |
SyncedCron.nextScheduledAtDate = function(jobName) { | |
var entry = this._entries[jobName]; | |
if (entry) | |
return Later.schedule(entry.schedule(Later.parse)).next(1); | |
} | |
// Remove and stop the entry referenced by jobName | |
SyncedCron.remove = function(jobName) { | |
var entry = this._entries[jobName]; | |
if (entry) { | |
if (entry._timer) | |
entry._timer.clear(); | |
delete this._entries[jobName]; | |
log.info('Removed "' + entry.name + '"'); | |
} | |
} | |
// Pause processing, but do not remove jobs so that the start method will | |
// restart existing jobs | |
SyncedCron.pause = function() { | |
if (this.running) { | |
_.each(this._entries, function(entry) { | |
entry._timer.clear(); | |
}); | |
this.running = false; | |
} | |
} | |
// Stop processing and remove ALL jobs | |
SyncedCron.stop = function() { | |
_.each(this._entries, function(entry, name) { | |
SyncedCron.remove(name); | |
}); | |
this.running = false; | |
} | |
// The meat of our logic. Checks if the specified has already run. If not, | |
// records that it's running the job, runs it, and records the output | |
SyncedCron._entryWrapper = function(entry) { | |
var self = this; | |
return function(intendedAt) { | |
intendedAt = new Date(intendedAt.getTime()); | |
intendedAt.setMilliseconds(0); | |
var jobHistory; | |
if (entry.persist) { | |
jobHistory = { | |
intendedAt: intendedAt, | |
name: entry.name, | |
startedAt: new Date() | |
}; | |
// If we have a dup key error, another instance has already tried to run | |
// this job. | |
try | |
{ | |
result = qLiveDb( "INSERT INTO cron_history (intendedAt, name, startedAt) VALUES (?,?,?)", | |
[jobHistory.intendedAt.toISOString().slice(0, 19).replace('T', ' '), jobHistory.name, jobHistory.startedAt.toISOString().slice(0, 19).replace('T', ' ')]); | |
jobHistory._id = result.insertId; | |
} | |
catch(e) | |
{ | |
if (e.code == "ER_DUP_ENTRY") | |
{ | |
console.log("Cant insert into cron_history due to duplicate entry: "+entry.name +" intendedAt: "+jobHistory.intendedAt); | |
//Reset the intendedAt time to be ten seconds from now, as we may have failed the query above, and will get into a deadlock condition. | |
intendedAt = new Date().getTime()+10000; | |
return; | |
} | |
else | |
{ | |
console.log("Exception in SyncedCron: "+JSON.stringify({"name":entry.name, "e: ":e.message})); | |
return; | |
//throw; | |
} | |
}; | |
} | |
// run and record the job | |
try | |
{ | |
log.info('Starting "' + entry.name + '".'); | |
var output = entry.job(intendedAt,entry.name); // <- Run the actual job | |
output = (output == null) ? "" : output; | |
log.info('Finished "' + entry.name + '".'); | |
if(entry.persist) { | |
qLiveDb( "UPDATE cron_history SET finishedAt = ?, result = ? WHERE id = ?", | |
[new Date().toISOString().slice(0, 19).replace('T', ' '), output, jobHistory._id] | |
); | |
} | |
} | |
catch(e) | |
{ | |
log.info('Exception "' + entry.name +'" ' + e.stack); | |
if (entry.persist) { | |
qLiveDb( "UPDATE cron_history SET finishedAt = ?, error = ? WHERE id = ?", | |
[new Date().toISOString().slice(0, 19).replace('T', ' '), e.stack, jobHistory._id] | |
); | |
} | |
} | |
}; | |
} | |
// for tests | |
SyncedCron._reset = function() { | |
this._entries = {}; | |
qLiveDb("TRUNCATE cron_history"); | |
this.running = false; | |
} | |
// --------------------------------------------------------------------------- | |
// The following two functions are lifted from the later.js package, however | |
// I've made the following changes: | |
// - Use Meteor.setTimeout and Meteor.clearTimeout | |
// - Added an 'intendedAt' parameter to the callback fn that specifies the precise | |
// time the callback function *should* be run (so we can co-ordinate jobs) | |
// between multiple, potentially laggy and unsynced machines | |
// From: https://github.com/bunkat/later/blob/master/src/core/setinterval.js | |
SyncedCron._laterSetInterval = function(fn, sched) { | |
var t = SyncedCron._laterSetTimeout(scheduleTimeout, sched), | |
done = false; | |
/** | |
* Executes the specified function and then sets the timeout for the next | |
* interval. | |
*/ | |
function scheduleTimeout(intendedAt) { | |
if(!done) { | |
fn(intendedAt); | |
t = SyncedCron._laterSetTimeout(scheduleTimeout, sched); | |
} | |
} | |
return { | |
/** | |
* Clears the timeout. | |
*/ | |
clear: function() { | |
done = true; | |
t.clear(); | |
} | |
}; | |
}; | |
// From: https://github.com/bunkat/later/blob/master/src/core/settimeout.js | |
SyncedCron._laterSetTimeout = function(fn, sched) { | |
var s = Later.schedule(sched), t; | |
scheduleTimeout(); | |
/** | |
* Schedules the timeout to occur. If the next occurrence is greater than the | |
* max supported delay (2147483647 ms) than we delay for that amount before | |
* attempting to schedule the timeout again. | |
*/ | |
function scheduleTimeout() { | |
var now = Date.now(), | |
next = s.next(2, now); | |
// don't schedlue another occurence if no more exist synced-cron#41 | |
if (! next[0]) | |
return; | |
var diff = next[0].getTime() - now, | |
intendedAt = next[0]; | |
// minimum time to fire is one second, use next occurrence instead | |
if(diff < 1000) { | |
diff = next[1].getTime() - now; | |
intendedAt = next[1]; | |
} | |
if(diff < 2147483647) { | |
t = Meteor.setTimeout(function() { fn(intendedAt); }, diff); | |
} | |
else { | |
t = Meteor.setTimeout(scheduleTimeout, 2147483647); | |
} | |
} | |
return { | |
/** | |
* Clears the timeout. | |
*/ | |
clear: function() { | |
Meteor.clearTimeout(t); | |
} | |
}; | |
}; | |
// --------------------------------------------------------------------------- |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import mysql from 'mysql2/promise'; | |
const pool = Promise.await(mysql.createPool({ | |
host: 'localhost', | |
user: 'root', | |
database: 'test', | |
waitForConnections: true, | |
connectionLimit: 10, | |
queueLimit: 0 | |
})); | |
qLiveDb = (query, args) => { const [rows, fields] = Promise.await( pool.execute(query, args)); return rows; } |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import { Meteor } from 'meteor/meteor'; | |
SyncedCron.config({log: false}); | |
Meteor.startup(() => { | |
//Example SyncedCron job that runs at 7am every day | |
SyncedCron.add({ | |
name: 'wakeupAlarmClock', | |
schedule: function(parser) { | |
return parser.text('at 7:00am'); | |
}, | |
job: function() { | |
console.log("Time to wake up!"); | |
return true; | |
} | |
}); | |
console.log("Starting SyncedCron"); | |
SyncedCron.start(); | |
}); |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment