Skip to content

Instantly share code, notes, and snippets.

@johnlpage
Created June 2, 2026 10:23
Show Gist options
  • Select an option

  • Save johnlpage/591a25468592235e5c5a7aa4950d1802 to your computer and use it in GitHub Desktop.

Select an option

Save johnlpage/591a25468592235e5c5a7aa4950d1802 to your computer and use it in GitHub Desktop.
db = db.getSiblingDB("payments")
db.createCollection("card", {
validator: {
$jsonSchema: {
bsonType: "object",
required: ["cardId", "balance", "limit", "recentTx"],
additionalProperties: false,
properties: {
_id: { bsonType: "objectId" },
cardId: { bsonType: "string" }, // Can go in _id
balance: { bsonType: "decimal" },
error: { bsonType: "string", maxLength: 0 },
suspended: { bsonType: "bool" },
limit: { bsonType: "decimal" },
expiryDate: { bsonType: "date" },
"recentTx": {
bsonType: "array",
items: {
bsonType: "date"
}
}
}
}
}
});
db.card.createIndex(
{ cardId: 1 },
{ unique: true }
);
db.createCollection("ledger", {
validator: {
$jsonSchema: {
bsonType: "object",
required: ["cardId", "value", "vendorId", "txnDate", "paymentId"],
additionalProperties: false,
properties: {
_id: { bsonType: "objectId" },
paymentId: { bsonType: "binData" }, //Can go in _id
error: { bsonType: "string", maxLength: 0 },
cardId: { bsonType: "string" },
value: { bsonType: "decimal" },
vendorId: { bsonType: "string" },
txnDate: { bsonType: "date" }
}
}
}
});
db.ledger.createIndex(
{ paymentId: 1 },
{ unique: true }
);
db.createCollection("vendor", {
validator: {
$jsonSchema: {
bsonType: "object",
required: ["vendorId", "recentTxn", "maxTxnPerHour"],
additionalProperties: false,
properties: {
_id: { bsonType: "objectId" },
vendorId: { bsonType: "string" }, // Can go in _id
error: { bsonType: "string", maxLength: 0 },
recentTxn: {
bsonType: "array",
items: {
bsonType: "date"
}
},
maxTxnPerHour: { bsonType: "int" }
}
}
}
});
db.vendor.createIndex(
{ vendorId: 1 },
{ unique: true }
);
// Add a Card and Vendor for testing uf they arent there already
// - this is outside the transaction for simplicity but could be inside if we wanted to test the rollback of these as well
db.card.replaceOne({ cardId: "1234-5678-9101-1123" },
{
cardId: "1234-5678-9101-1123", balance: new Decimal128("0"), limit: new Decimal128("5000"),
recentTx: [], expiryDate: new Date(Date.now() + 1000 * 60 * 60 * 24)
}, { upsert: true })
db.card.replaceOne({ cardId: "1234-5678-9101-1124" },
{
cardId: "1234-5678-9101-1124", balance: new Decimal128("0"), limit: new Decimal128("5000"),
recentTx: [], suspended: true, expiryDate: new Date(Date.now() + 1000 * 60 * 60 * 24)
}, { upsert: true })
db.vendor.replaceOne({ vendorId: "91121021" },
{ vendorId: "91121021", recentTxn: [], maxTxnPerHour: 10 }, { upsert: true })
const dupPaymentId = new UUID();
const payments = [{
cardId: "1234-5678-9101-1123",
value: new Decimal128("1337"),
vendorId: "91121021",
txnDate: new Date(),
paymentId: dupPaymentId
}, {
cardId: "1234-5678-9101-1123",
value: new Decimal128("1337"),
vendorId: "91121021",
txnDate: new Date(),
paymentId: dupPaymentId //Duplicate paymentId to trigger error
},
{
cardId: "1234-5678-9101-1123",
value: new Decimal128("20000"), // Exceeds limit to trigger error
vendorId: "91121021",
txnDate: new Date(),
paymentId: new UUID()
}, {
cardId: "1234-5678-9101-1124",
value: new Decimal128("100"), // Card is suspended
vendorId: "91121021",
txnDate: new Date(),
paymentId: new UUID()
},
{
cardId: "1234-5678-9101-1123",
value: new Decimal128("1337"),
vendorId: "91121121",
txnDate: new Date(),
paymentId: new UUID()
},
]
const session = db.getMongo().startSession({
readPreference:
{ mode: "primary" }
});
const transactionOptions = {
readConcern: { level: "snapshot" },
writeConcern: { w: "majority" }
};
for (let payment of payments) {
try {
session.withTransaction( () => { txnFunction(payment, session); },
transactionOptions);
print("Payment processed.");
} catch (e) {
print("Payment failed: " + (e.message || e));
}
}
session.endSession();
function txnFunction(payment, session) {
const adminDb = session.getDatabase("admin");
const nsInfo = [{ ns: "payments.card" }, { ns: "payments.ledger" }, { ns: "payments.vendor" }]
const [CARD, LEDGER, VENDOR] = [0, 1, 2]; //Offsets in nsInfo
const ordered = true; //False will run in parallel but wont stop on first error
// Record in the Ledger - this is a transaction so will be roll back on fail
const insertLedgerEntry = {
insert: LEDGER,
document: payment
}
// Update the Card, but, if the balance + new txn value exceeds the limit
// , we want to fail the transaction and roll back the ledger entry.
// We can do that by setting an error message field which is not in the schema
// to trigger a validation error - we also want to do this if the card is
// suspended or expired, or if we have too many transactions in the last
// hour for rate limiting purposes. We also want to add the transaction
// date to an array of recent transactions for rate limiting purposes,
// and remove any transactions from that array that are more than an hour old.
const updateCardBalance = {
update: CARD,
filter: { cardId: payment.cardId },
upsert: true, // This will force an error vs a NO-OP
updateMods: [
{ $set: { error: "$$REMOVE" } }, // Clear any existing error
// If this is an upsert it will be missing expiryDate so $replaceWith a skeleton
{ $replaceWith: { $cond: [{ $not: "$expiryDate" }, { cardId: payment.cardId, balance: 0, limit: new NumberLong("0"), recentTx: [] }, "$$ROOT"] } },
// Set an error if the card is suspended
{ $set: { error: { $cond: ["$suspended", "card suspended", "$error"] } } },
// If it expired
{ $set: { error: { $cond: [{ $lt: ["$expiryDate", payment.txnDate] }, "card expired", "$error"] } } },
//Set the new balance
{ $set: { balance: { $add: ["$balance", payment.value] } } },
// Set error if we go over the limit
{ $set: { error: { $cond: [{ $gt: [{ $add: ["$balance", payment.value] }, "$limit"] }, "exceeds credit limit", "$error"] } } },
// Add this txn to recentTx for rate limiting purposes
{ $set: { recentTx: { $concatArrays: ["$recentTx", [payment.txnDate]] } } },
// Remove all transactions > 1 hour old from recentTx
{ $set: { recentTx: { $filter: { input: "$recentTx", as: "tx", cond: { $gt: ["$$tx", { $subtract: [payment.txnDate, 1000 * 60 * 60] }] } } } } },
// Raise an error if we have more than 5 transactions in the last hour for rate limiting purposes
{ $set: { error: { $cond: [{ $gt: [{ $size: { $ifNull: ["$recentTx", []] } }, 5] }, "too many recent transactions on card", "$error"] } } },
// If the card didnt exist we end up here as part of upsert but with no expiryDate field
// - so we can use that to trigger a "card not found" error
],
}
const verifyVendor = {
update: VENDOR,
filter: { vendorId: payment.vendorId },
upsert: true, // This will force an error vs a NO-OP
updateMods: [
{ $set: { error: "$$REMOVE" } }, // Clear any existing error
{ $replaceWith: { $cond: [{ $not: "$_id" }, { vendorId: payment.vendorId, maxTxnPerHour: 0, recentTxn: [] }, "$$ROOT"] } },
// Set an error if we have too many transactions in the last hour for rate limiting purposes. We also want to add the transaction date to an array of recent transactions for rate limiting purposes, and remove any transactions from that array that are more than an hour old.
{ $set: { recentTxn: { $concatArrays: ["$recentTxn", [payment.txnDate]] } } },
{ $set: { recentTxn: { $filter: { input: "$recentTxn", as: "tx", cond: { $gt: ["$$tx", { $subtract: [payment.txnDate, 1000 * 60 * 60] }] } } } } },
{ $set: { error: { $cond: [{ $gt: [{ $size: { $ifNull: ["$recentTxn", []] } }, "$maxTxnPerHour"] }, "too many recent transactions for vendor", "$error"] } } },
{ $set: { error: { $cond: [{ $not: "$_id" }, "vendor not found", "$error"] } } },
],
}
const ops = [insertLedgerEntry, updateCardBalance, verifyVendor]
// using runCommand (needed as mongosh doesnt let you pass a session
// to bulkWrite like drivers do - means we need to throw on any error
// ourselves to avoid auto retry.
let res = adminDb.runCommand({ bulkWrite: 1, nsInfo, ordered, ops });
if (res.ok !== 1) {
throw new Error(`bulkWrite command failed: ${tojson(res)}`);
}
if ((res.nErrors ?? 0) > 0) {
throw new Error(`bulkWrite write error: ${extractErrorConsideredValues(res)}`);
}
return res;
}
function extractErrorConsideredValues(mongoResponse) {
const extractedValues = [];
const firstBatch = mongoResponse?.cursor?.firstBatch || {};
// Iterate through each item in the firstBatch object
for (const item of Object.values(firstBatch)) {
if (item.errmsg) {
extractedValues.push(item.errmsg);
}
const schemaRules = item?.errInfo?.details?.schemaRulesNotSatisfied || {};
for (const rule of Object.values(schemaRules)) {
const properties = rule?.propertiesNotSatisfied || {};
for (const prop of Object.values(properties)) {
// Check if the property name is 'error'
if (prop?.propertyName === 'error' && prop?.details) {
for (const detail of Object.values(prop.details)) {
if (detail?.consideredValue !== undefined) {
extractedValues.push(detail.consideredValue);
}
}
}
}
}
}
return extractedValues;
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment