AWS Lambdas are a really cool way to remove the need for dedicated hardware when running things like scheduled operations. The challenge we had was to find the latest 2 files in a large bucket which matched specific key prefixes. This is easy enough on smaller buckets, but a single listObjectsV2 call returns at most 1000 items. What do you do if you need to scan more?
The following example shows how you can achieve this. You need to fill in a couple of parts:
- the bucket name: this._bucket = "TODO:bucket-name";
- the filename / folder prefix: "folder/filename_prefix"
- the file suffixes: this._priceSets = ["Full", "Delta"];
What’s really neat with Lambdas is that you can pass in parameters from the test event, e.g.:
{
  "accountAlias": "accountName",
  "environment": "qa",
  "region": "eu-west-1",
  "snsArn": "arn:aws:sns:eu-west-1:###:Channel name"
}
When this runs it will fire off SNS alerts if it finds the files to be out of date.
The key bit is the recursive calls in GetLatestFiles, which finally trigger the callback from the parent function (i.e. the promise in GetLatestFileForType).
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 |
"use strict"; const AWS = require("aws-sdk"); var Demo; (function (Demo) { class LambdaFunctionDemo { constructor() { this._priceSets = ["Full", "Delta"]; this._items = {}; this._s3 = new AWS.S3(); this._sns = new AWS.SNS(); } Run(event, finishCallback) { this._environment = event.environment; this._bucket = "TODO:bucket-name"; this._snsArn = event.snsArn; var promises = []; for (var priceSet of this._priceSets) { promises.push(this.InspectRoutePriceDelta(priceSet)); } Promise.all(promises) .then((results) => { console.log(results); finishCallback(null, results); }); } InspectRoutePriceDelta(priceSet) { this._items[priceSet] = []; return new Promise((resolve, reject) => { this.GetLatestFileForType(priceSet) .then((priceFile) => { if (this.CheckDataStatus(priceFile.LastModified, priceSet, priceFile.Key)) { this.FireFileStaleAlert(priceFile.Key, "Delta file"); resolve(`Data Stale: ${priceSet}.`); } else { resolve(`Data Fresh: ${priceSet}.`); } }) .catch(() => { this.FireFileMissingAlert(priceSet, "Delta file"); resolve(`Data Not Found: ${priceSet}`); }); }); } GetLatestFileForType(priceSet) { console.log("GetLatestFileForType: " + priceSet); return new Promise((resolve, reject) => { this.GetLatestFiles("folder/filename_prefix" + priceSet, null, priceSet, resolve); }); } GetLatestFiles(prefix, continuationToken, priceSet, callback) { console.log("GetLatestFiles: " + this._items[priceSet].length + " " + priceSet); return new Promise((resolve, reject) => { this.GetLatestFile(prefix, continuationToken) .then((data) => { data.Contents.forEach((a) => this._items[priceSet].push(a)); if (data.IsTruncated) { this.GetLatestFiles(prefix, data.NextContinuationToken, priceSet, callback) .then((recursiveData) => { }); } else { this._items[priceSet].sort((a, b) => (b.LastModified).getTime() - (a.LastModified).getTime()); console.log("there are: " + this._items[priceSet].length + " items for: " + priceSet); callback(this._items[priceSet][0]); } }); }); } GetLatestFile(prefix, 
continuationToken) { console.log(`Scanning folder for file: ${prefix} with token: ${continuationToken}`); var params = { Bucket: this._bucket, Prefix: prefix, ContinuationToken: continuationToken }; return this._s3.listObjectsV2(params).promise(); } CheckDataStatus(fileTimeStamp, dataType, key) { console.log("Validating " + dataType + " :" + key); var currentTimeStamp = new Date(); var timeDifferenceSeconds = (currentTimeStamp.getTime() - fileTimeStamp.getTime()) / 1000; console.log("TimeDifferenceSeconds = " + timeDifferenceSeconds); if (dataType == "Full") { return timeDifferenceSeconds > (25 * 60 * 60); } else { return timeDifferenceSeconds > (10 * 60); } } FireFileStaleAlert(fileKey, dataType) { console.error(fileKey + " on " + this._environment + " is stale in " + this._bucket); var snsParams = { Message: "ERROR: " + fileKey + " on " + this._environment + " is stale in " + this._bucket, Subject: this._environment + " " + dataType + " Data Stale", TopicArn: this._snsArn }; this.FireAlert(snsParams); } FireFileMissingAlert(fileKey, dataType) { console.error(fileKey + " on " + this._environment + " is missing in " + this._bucket); var snsParams = { Message: "ERROR: " + fileKey + " on " + this._environment + " is missing in " + this._bucket, Subject: this._environment + " " + dataType + " Data Missing", TopicArn: this._snsArn }; this.FireAlert(snsParams); } FireAlert(snsParams) { this._sns.publish(snsParams, (error, data) => { if (error) { console.error("Failed to send SNS message: " + error, error.stack); } else { console.log("Successfully sent SNS message"); } }); } } Demo.LambdaFunctionDemo = LambdaFunctionDemo; })(Demo || (Demo = {})); exports.handler = (event, context, finishCallback) => { var processor = new Demo.LambdaFunctionDemo(); processor.Run(event, finishCallback); }; |