console.log('Loading event'); var CLOUDSEARCH_ENDPOINT = < INSERT HERE > var async = require('async'); var jpath = require('json-path') var zlib = require('zlib'); var aws = require('aws-sdk'); var s3 = new aws.S3({ apiVersion: '2006-03-01' }); var csd = new aws.CloudSearchDomain({ endpoint: CLOUDSEARCH_ENDPOINT, apiVersion: '2013-01-01' }); //These mappings use json-path //https://github.com/flitbit/json-path MAPPING = { "aws_region": "#/awsRegion", "error_message": "#/errorMessage", "event_id": "#/eventID", "event_name": "#/eventName", "event_source": "#/eventSource", "event_time": "#/eventTime", "source_ip_address": "#/sourceIPAddress", "user_agent": "#/userAgent", "user_identity_type": "#/userIdentity/type", "user_identity_arn": "#/userIdentity/arn", "user_identity_account_id": "#/userIdentity/accountId", "user_identity_user_name": "#/userIdentity/userName", } //http://docs.aws.amazon.com/cloudsearch/latest/developerguide/preparing-data.html function create_cs_request(id, fields) { request = {}; request['type'] = 'add'; request['id'] = id; request['fields'] = fields return request; } function get_s3_gz_json(bucket, key, cb) { async.waterfall([ //get json.gz function(callback) { s3.getObject({ Bucket: bucket, Key: key }, function(err, data) { console.log("Finished collecting S3 Object"); callback(err, data.Body); }); }, //gunzip the s3 object function(gz_json, callback) { zlib.gunzip(gz_json, function(err, dezipped) { var json_string = dezipped.toString('utf-8'); var json = JSON.parse(json_string); callback(err, json); }); }, //get the records function(json, callback) { records = jpath.resolve(json, "#/Records[*]") console.log("Found the following records", records); callback(null, records); }, ], function(err, result) { cb(err, result) }); } function download_records(records, callback) { async.concat(records, function(item, cb) { fields = {}; for (var prop in MAPPING) { ct_field_name = MAPPING[prop]; ct_field_value = jpath.resolve(item, ct_field_name)[0] //jpath always returns a list! fields[prop] = ct_field_value; } cs_request = create_cs_request(fields["event_id"], fields); console.log("created request", cs_request); cb(null, cs_request); }, function(err, record_requests) { callback(err, record_requests); }); }; function send_record_requests(requests, callback) { console.log("Publishing the following documents", requests); var params = { contentType: 'application/json', documents: JSON.stringify(requests) } csd.uploadDocuments(params, function(err, data) { callback(err); }); }; exports.handler = function(event, context) { console.log('Received event:'); console.log(JSON.stringify(event, null, ' ')); // Get the object from the event and show its content type var bucket = event.Records[0].s3.bucket.name; var key = event.Records[0].s3.object.key; var perform_task = async.compose(send_record_requests, download_records, get_s3_gz_json); perform_task(bucket, key, function(err, result) { if (err) { context.done("Error performing task: " + err); } else { context.done(null, ''); } }); };