Last active
July 22, 2019 00:52
-
-
Save btmills/5229749 to your computer and use it in GitHub Desktop.
Use a connection pool to rapidly insert a large number of records into a RethinkDB database.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
var r = require('rethinkdb'); | |
// How many connections to open in the pool.
// On the author's machine 2547 failed with "(libuv) Failed to create kqueue (24)";
// errno 24 is presumably EMFILE (too many open files), so the usable ceiling
// depends on the OS file-descriptor limit — TODO confirm on the target system.
var CONNECTIONS = 2546;
// Total number of rows to insert.
var COUNT = 10000;
// Database to use for the test.
var DB = 'test';
// Table to use for the test. It must already exist; nothing visible in this
// script creates it.
var TABLE = 'insertTest';
// Connection options passed to r.connect() for every pooled connection.
var CONNINFO = {
  host: 'localhost',
  port: 28015
};
// Minimal FIFO queue: an array plus a moving read cursor, so dequeue never
// shifts the array. Adapted from
// http://tomswitzer.net/2011/02/super-simple-javascript-queue/
// Returns a plain object, so it works both with and without `new`.
var Queue = function () {
  var items = []; // slots [head, items.length) hold the queued values
  var head = 0;   // index of the next value to hand out

  function size() {
    return items.length - head;
  }

  return {
    // Append val to the back of the queue.
    enqueue: function (val) {
      items.push(val);
    },
    // Remove and return the front value, or undefined when the queue is empty.
    dequeue: function () {
      if (size() <= 0) {
        return undefined;
      }
      var front = items[head];
      delete items[head]; // drop the stored reference
      head += 1;
      return front;
    },
    // Number of values currently queued.
    length: size
  };
};
// Fixed-size pool of RethinkDB connections, so each insert reuses an open
// connection instead of calling r.connect() per request.
// Idle connections are kept on a stack (push/pop). Callers that ask for a
// connection while none is idle wait in a FIFO queue and are served as
// connections are returned.
var Pool = (function () {
  // Callbacks waiting for a connection, served in arrival order.
  var waiting = new Queue();
  // Idle connections, used as a stack.
  var idle = [];

  // Open connections one at a time until the pool holds `size` of them.
  // Must be called before get().
  // callback is function (err); err is null once the pool is full. On error,
  // the connections opened so far stay in the pool, so close() can release them.
  var init = function (size, callback) {
    if (idle.length >= size) { // Pool has been filled
      callback(null);
      return;
    }
    r.connect(CONNINFO, function (err, conn) {
      if (err) return callback(err);
      idle.push(conn);
      init(size, callback);
    });
  };

  // Get a connection from the pool.
  // callback is function (conn). It runs synchronously if a connection is
  // idle; otherwise it runs later, when done() hands a connection over.
  var get = function (callback) {
    if (idle.length > 0) {
      callback(idle.pop());
    } else {
      waiting.enqueue(callback);
    }
  };

  // Return a connection to the pool.
  // For purposes of experimentation, conn is assumed to be valid; real-world
  // use would need to check it (e.g. that it is still open).
  // The connection goes to the oldest waiting callback if there is one,
  // otherwise back onto the idle stack.
  var done = function (conn) {
    if (waiting.length() > 0) {
      // Dequeue now, not inside the deferred function. Suppose done() runs
      // twice before the tick fires while only one callback is waiting. A
      // deferred dequeue would then hand the second connection to `undefined`
      // (TypeError), and that connection would never return to the pool.
      var next = waiting.dequeue();
      // process.nextTick defers the hand-off until after done() returns,
      // rather than running the waiter re-entrantly inside it.
      process.nextTick(function () {
        next(conn);
      });
    } else {
      idle.push(conn);
    }
  };

  // Close every idle connection. Connections currently checked out via get()
  // are not closed.
  var close = function () {
    while (idle.length > 0) {
      idle.pop().close();
    }
  };

  return {
    init: init,
    get: get,
    done: done,
    close: close
  };
})();
// Insert `count` rows, { number: 1 } .. { number: count }, each on a pooled
// connection. All requests are queued at once via Pool.get(); Pool hands out
// connections as they become idle.
// Starts the 'Insert' console timer; the completion handler is expected to
// stop it.
// callback is function (err, iteration) and is called once per row. err is set
// when the query itself failed, or when RethinkDB reported a write error for
// the row in the insert result.
var insert = function (count, callback) {
  console.time('Insert');
  for (var i = 1; i <= count; i++) {
    // Capture this iteration's i for the asynchronous callbacks below.
    (function (i) {
      Pool.get(function (conn) {
        r.db(DB)
          .table(TABLE)
          .insert({ number: i })
          .run(conn, function (err, res) {
            // Return the connection first so queued inserts can proceed.
            Pool.done(conn);
            var failure = err;
            // run() can succeed while the write itself was rejected; RethinkDB
            // reports that through the result's `errors` / `first_error` fields.
            if (!failure && res && res.errors > 0) {
              failure = new Error(res.first_error || ('insert reported ' + res.errors + ' error(s)'));
            }
            if (failure) {
              console.error('Error inserting #%d', i, failure);
              return callback(failure, i);
            }
            callback(null, i);
          });
      });
    })(i);
  }
};
// Per-row completion handler for insert(). It keeps running success and failure
// counts across calls and logs progress after every row. Once COUNT rows have
// been accounted for, it calls callback(success count, fail count).
var inserted = (function () {
  var okCount = 0;
  var failCount = 0;

  return function (err, row, callback) {
    // `row` is accepted to match insert()'s callback shape but is not used here.
    if (!err) {
      okCount += 1;
    } else {
      failCount += 1;
    }
    var processed = okCount + failCount;
    console.log('Processed %d of %d rows', processed, COUNT);
    if (processed === COUNT) {
      callback(okCount, failCount);
    }
  };
})();
// Final step once every row is accounted for: stop the 'Insert' timer, print a
// summary line, and close the pool's idle connections.
// success / fail are the counts passed in by inserted().
var done = function (success, fail) {
  var attempted = success + fail;
  console.timeEnd('Insert');
  console.log(
    'Attempted %d of %d inserts with %d successes and %d failures.',
    attempted,
    COUNT,
    success,
    fail
  );
  Pool.close();
};
// Entry point: fill the pool, then launch every insert. Each row's outcome is
// forwarded to inserted(), which calls done() once all rows are accounted for.
Pool.init(CONNECTIONS, function (poolErr) {
  if (poolErr) {
    console.error('Could not fill connection pool', poolErr);
    // Some connections may have opened before the failure; release them.
    return Pool.close();
  }
  // Pass errors through untouched; inserted() does the counting.
  var forwardToTally = function (rowErr, row) {
    inserted(rowErr, row, done);
  };
  insert(COUNT, forwardToTally);
});
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment