AWS Lambda to filter CloudTrail management logs in S3 (before Athena)
filter.js
const { chain } = require('stream-chain');
const { parser } = require('stream-json');
const { pick } = require('stream-json/filters/Pick');
const { streamValues } = require('stream-json/streamers/StreamValues');
const { disassembler } = require('stream-json/Disassembler');
const { stringer } = require('stream-json/Stringer');
const ST = require('stream-template');
const aws = require('aws-sdk');
const s3 = new aws.S3();
const fs = require('fs');
const zlib = require('zlib');
const defaultFilterOpts = {
  arrayPath: '^Records',
  eventSource: 'sts.amazonaws.com',
  eventName: 'AssumeRoleWithWebIdentity'
};
const processJSON = ({ fileIn, filterOpts = defaultFilterOpts }, callback) => {
  const { arrayPath, eventSource, eventName } = filterOpts;
  // assembling a processing pipe:
  const recordsOut = chain([
    fileIn,
    // gunzip the potentially huge file using small memory
    zlib.createGunzip(),
    // we have to parse JSON
    parser(),
    // we'll pick values from the array
    pick({ filter: new RegExp(`${arrayPath}\\.\\d+`) }),
    // assemble them to JS values and stream them
    streamValues({
      objectFilter: ({ key, stack, current }) => {
        if (key === null && stack.length === 0) {
          // let's filter them according to our criteria
          if (current.hasOwnProperty('eventSource') && current.eventSource !== eventSource) return false;
          if (current.hasOwnProperty('eventName')) return current.eventName === eventName;
        }
        // otherwise return undefined: not decided yet, keep assembling the object
      }
    }),
    // disassemble a stream of objects back to tokens
    disassembler(),
    // normalize disassembler output
    pick({ filter: 'value' }),
    // back to JSON as an array of objects
    stringer({ makeArray: true })
  ], { writableObjectMode: false, readableObjectMode: false });
  // it is always good to keep an eye on errors
  recordsOut.on('error', callback || console.error);
  // add back the JSON wrapper
  const jsonOut = ST`{"Records":${recordsOut}}`;
  // gzip the potentially huge file using small memory
  const gzip = zlib.createGzip();
  // If the Readable stream emits an error during processing, the Writable destination is not closed automatically.
  // If an error occurs, each stream must be closed manually in order to prevent memory leaks.
  jsonOut.on('error', err => gzip.destroy(err));
  // return the gzip stream (with backpressure support)
  return jsonOut.pipe(gzip);
  // Pipe the result to stdout so we can see it
  // jsonOut.pipe(process.stdout);
};
// Cannot use an async handler, as the callback is needed inside nested stream callbacks
exports.handler = function(event, context, callback) {
  const evt = event.Records[0];
  const Bucket = evt.s3.bucket.name;
  const Key = evt.s3.object.key;
  const VersionId = evt.s3.object.versionId;
  const params = { Bucket, Key, VersionId };
  s3.headObject(params).promise().then((prevMeta) => {
    // drop response-only fields that are not valid putObject parameters
    delete prevMeta.LastModified;
    delete prevMeta.ContentLength;
    delete prevMeta.ETag;
    delete prevMeta.AcceptRanges;
    delete prevMeta.VersionId;
    const fileIn = s3.getObject(params).createReadStream();
    const stream = processJSON({ fileIn }, callback);
    // upload() is used instead of putObject(), as it accepts a Body stream of unknown length
    const uploadParams = { Bucket, ...prevMeta, Key: `assumeRole${Key}`, Body: stream };
    return s3.upload(uploadParams).promise();
  }).then(data => callback(null, data), callback);
};
// For local testing only: guarded so it does not run when the module is required by the Lambda runtime
if (require.main === module) {
  const filename = '080428581103_CloudTrail_us-east-1_20201028T0825Z_kN5ZaldemkDIwXsL.json.gz';
  const fileIn = fs.createReadStream(filename);
  const fileOut = fs.createWriteStream('out-' + filename);
  processJSON({ fileIn }).pipe(fileOut).on('error', console.error);
}
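
The handler itself can also be exercised locally with a synthetic S3 event. A minimal sketch, assuming AWS credentials are configured; the bucket name and object key below are hypothetical placeholders (the './filter' path matches the "main" entry in package.json):

const { handler } = require('./filter');

// Hypothetical bucket and key; substitute real values from your trail bucket
const fakeEvent = {
  Records: [{
    s3: {
      bucket: { name: 'my-cloudtrail-bucket' },
      object: { key: 'AWSLogs/123456789012/CloudTrail/us-east-1/2020/10/28/example.json.gz' }
    }
  }]
};

handler(fakeEvent, {}, (err, data) => err ? console.error(err) : console.log(data));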
package.json
{
  "name": "cloudtrail-filter",
  "version": "1.0.0",
  "description": "",
  "main": "filter.js",
  "scripts": {
    "start": "node .",
    "test": "echo \"Error: no test specified\" && exit 1"
  },
  "author": "",
  "license": "ISC",
  "dependencies": {
    "aws-sdk": "^2.782.0",
    "stream-json": "^1.7.1",
    "stream-template": "^0.0.10"
  }
}
This Lambda uses the excellent stream-json library. JSON.parse() is not used, as it would crash if the size of the log file exceeds the memory available to the AWS Lambda function.
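
As a minimal sketch of the difference (the file name is hypothetical), counting the records in a gzipped CloudTrail log this way holds only one record in memory at a time, whereas JSON.parse() would need the entire decompressed document:

const { chain } = require('stream-chain');
const { parser } = require('stream-json');
const { pick } = require('stream-json/filters/Pick');
const { streamValues } = require('stream-json/streamers/StreamValues');
const fs = require('fs');
const zlib = require('zlib');

// trail.json.gz is a hypothetical input file
const pipeline = chain([
  fs.createReadStream('trail.json.gz'),
  zlib.createGunzip(),
  parser(),
  pick({ filter: /^Records\.\d+/ }),
  streamValues()
]);

let count = 0;
pipeline.on('data', () => ++count);
pipeline.on('end', () => console.log(`${count} records`));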
All management events are delivered to S3 every 5 minutes by CloudTrail (free of charge if you set up a trail that delivers a single copy of management events in each region). Athena scans these log files, which may contain management events that are not relevant to us and which grow in size over time. This Lambda S3 trigger keeps only the management events that need to be retained long-term, reducing processing costs in Athena (which charges for the number of bytes scanned). A sketch of wiring up the trigger follows.
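
A minimal sketch using the same aws-sdk; the bucket name and function ARN are hypothetical, and it assumes the Lambda's resource policy already allows s3.amazonaws.com to invoke it:

const aws = require('aws-sdk');
const s3 = new aws.S3();

s3.putBucketNotificationConfiguration({
  Bucket: 'my-cloudtrail-bucket', // hypothetical bucket
  NotificationConfiguration: {
    LambdaFunctionConfigurations: [{
      LambdaFunctionArn: 'arn:aws:lambda:us-east-1:123456789012:function:cloudtrail-filter', // hypothetical ARN
      Events: ['s3:ObjectCreated:*'],
      // match only gzipped CloudTrail logs under AWSLogs/, not the Lambda's own
      // "assumeRole..." output keys (which avoids retriggering on our own writes)
      Filter: { Key: { FilterRules: [
        { Name: 'prefix', Value: 'AWSLogs/' },
        { Name: 'suffix', Value: '.json.gz' }
      ] } }
    }]
  }
}).promise().then(console.log, console.error);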