Last active
January 18, 2024 08:08
-
-
Save iMilnb/27726a5004c0d4dc3dba3de01c65c575 to your computer and use it in GitHub Desktop.
Revisions
-
iMilnb revised this gist
Jun 22, 2017 . 1 changed file with 1 addition and 1 deletion.There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -1,7 +1,7 @@ // v1.1.2 // // this lambda is the one automatically created by AWS // when creating a CWL to ES stream using the AWS Console. // I just added the `endpoint` variable handling. // var https = require('https'); -
iMilnb created this gist
Jun 21, 2017 .There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -0,0 +1,14 @@ ### Rationale This snippet is a sample showing how to implement CloudWatch Logs streaming to ElasticSearch using `terraform`. I wrote this `gist` because I didn't found a clear, end-to-end example on how to achieve this task. In particular, I understood the `resource "aws_lambda_permission" "cloudwatch_allow"` part by reading a couple of bug reports plus this [stackoverflow post](https://stackoverflow.com/questions/38407660/terraform-configuring-cloudwatch-log-subscription-delivery-to-lambda). The `js` file is actually the _Lambda_ function automatically created by _AWS_ when creating this pipeline through the web console. I only added a `endpoint` variable handling so it is configurable from `terraform`. ### Usage Create a `cwl2eslambda.zip` file containing `cwl2es.js` at the root level. Invoke `terraform plan` to check for basic errors, then `terraform apply`. This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -0,0 +1,254 @@ // v1.1.2 // // this lambda is the one automatically created by AWS // when creating a CWL to ES stream using the AWS CLI. // I just added the `endpoint` variable handling. // var https = require('https'); var zlib = require('zlib'); var crypto = require('crypto'); const ENV = process.env; var endpoint = ENV.es_endpoint; exports.handler = function(input, context) { // decode input from base64 var zippedInput = new Buffer(input.awslogs.data, 'base64'); // decompress the input zlib.gunzip(zippedInput, function(error, buffer) { if (error) { context.fail(error); return; } // parse the input from JSON var awslogsData = JSON.parse(buffer.toString('utf8')); // transform the input to Elasticsearch documents var elasticsearchBulkData = transform(awslogsData); // skip control messages if (!elasticsearchBulkData) { console.log('Received a control message'); context.succeed('Control message handled successfully'); return; } // post documents to the Amazon Elasticsearch Service post(elasticsearchBulkData, function(error, success, statusCode, failedItems) { console.log('Response: ' + JSON.stringify({ "statusCode": statusCode })); if (error) { console.log('Error: ' + JSON.stringify(error, null, 2)); if (failedItems && failedItems.length > 0) { console.log("Failed Items: " + JSON.stringify(failedItems, null, 2)); } context.fail(JSON.stringify(error)); } else { console.log('Success: ' + JSON.stringify(success)); context.succeed('Success'); } }); }); }; function transform(payload) { if (payload.messageType === 'CONTROL_MESSAGE') { return null; } var bulkRequestBody = ''; payload.logEvents.forEach(function(logEvent) { var timestamp = new Date(1 * logEvent.timestamp); // index name format: cwl-YYYY.MM.DD var indexName = [ 'cwl-' + timestamp.getUTCFullYear(), // year ('0' + (timestamp.getUTCMonth() + 1)).slice(-2), // month ('0' + timestamp.getUTCDate()).slice(-2) // day ].join('.'); var source = buildSource(logEvent.message, logEvent.extractedFields); source['@id'] = logEvent.id; source['@timestamp'] = new Date(1 * logEvent.timestamp).toISOString(); source['@message'] = logEvent.message; source['@owner'] = payload.owner; source['@log_group'] = payload.logGroup; source['@log_stream'] = payload.logStream; var action = { "index": {} }; action.index._index = indexName; action.index._type = payload.logGroup; action.index._id = logEvent.id; bulkRequestBody += [ JSON.stringify(action), JSON.stringify(source), ].join('\n') + '\n'; }); return bulkRequestBody; } function buildSource(message, extractedFields) { if (extractedFields) { var source = {}; for (var key in extractedFields) { if (extractedFields.hasOwnProperty(key) && extractedFields[key]) { var value = extractedFields[key]; if (isNumeric(value)) { source[key] = 1 * value; continue; } jsonSubString = extractJson(value); if (jsonSubString !== null) { source['$' + key] = JSON.parse(jsonSubString); } source[key] = value; } } return source; } jsonSubString = extractJson(message); if (jsonSubString !== null) { return JSON.parse(jsonSubString); } return {}; } function extractJson(message) { var jsonStart = message.indexOf('{'); if (jsonStart < 0) return null; var jsonSubString = message.substring(jsonStart); return isValidJson(jsonSubString) ? jsonSubString : null; } function isValidJson(message) { try { JSON.parse(message); } catch (e) { return false; } return true; } function isNumeric(n) { return !isNaN(parseFloat(n)) && isFinite(n); } function post(body, callback) { var requestParams = buildRequest(endpoint, body); var request = https.request(requestParams, function(response) { var responseBody = ''; response.on('data', function(chunk) { responseBody += chunk; }); response.on('end', function() { var info = JSON.parse(responseBody); var failedItems; var success; if (response.statusCode >= 200 && response.statusCode < 299) { failedItems = info.items.filter(function(x) { return x.index.status >= 300; }); success = { "attemptedItems": info.items.length, "successfulItems": info.items.length - failedItems.length, "failedItems": failedItems.length }; } var error = response.statusCode !== 200 || info.errors === true ? { "statusCode": response.statusCode, "responseBody": responseBody } : null; callback(error, success, response.statusCode, failedItems); }); }).on('error', function(e) { callback(e); }); request.end(requestParams.body); } function buildRequest(endpoint, body) { var endpointParts = endpoint.match(/^([^\.]+)\.?([^\.]*)\.?([^\.]*)\.amazonaws\.com$/); var region = endpointParts[2]; var service = endpointParts[3]; var datetime = (new Date()).toISOString().replace(/[:\-]|\.\d{3}/g, ''); var date = datetime.substr(0, 8); var kDate = hmac('AWS4' + process.env.AWS_SECRET_ACCESS_KEY, date); var kRegion = hmac(kDate, region); var kService = hmac(kRegion, service); var kSigning = hmac(kService, 'aws4_request'); var request = { host: endpoint, method: 'POST', path: '/_bulk', body: body, headers: { 'Content-Type': 'application/json', 'Host': endpoint, 'Content-Length': Buffer.byteLength(body), 'X-Amz-Security-Token': process.env.AWS_SESSION_TOKEN, 'X-Amz-Date': datetime } }; var canonicalHeaders = Object.keys(request.headers) .sort(function(a, b) { return a.toLowerCase() < b.toLowerCase() ? -1 : 1; }) .map(function(k) { return k.toLowerCase() + ':' + request.headers[k]; }) .join('\n'); var signedHeaders = Object.keys(request.headers) .map(function(k) { return k.toLowerCase(); }) .sort() .join(';'); var canonicalString = [ request.method, request.path, '', canonicalHeaders, '', signedHeaders, hash(request.body, 'hex'), ].join('\n'); var credentialString = [ date, region, service, 'aws4_request' ].join('/'); var stringToSign = [ 'AWS4-HMAC-SHA256', datetime, credentialString, hash(canonicalString, 'hex') ] .join('\n'); request.headers.Authorization = [ 'AWS4-HMAC-SHA256 Credential=' + process.env.AWS_ACCESS_KEY_ID + '/' + credentialString, 'SignedHeaders=' + signedHeaders, 'Signature=' + hmac(kSigning, stringToSign, 'hex') ].join(', '); return request; } function hmac(key, str, encoding) { return crypto.createHmac('sha256', key).update(str, 'utf8').digest(encoding); } function hash(str, encoding) { return crypto.createHash('sha256').update(str, 'utf8').digest(encoding); } This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -0,0 +1,91 @@ variable "es_endpoint" { type = "string" default = "elasticsearch.endpoint.es.amazonaws.com" } variable "cwl_endpoint" { type = "string" default = "logs.eu-central-1.amazonaws.com" } resource "aws_cloudwatch_log_group" "syslog-loggroup" { name = "syslog" retention_in_days = 14 } resource "aws_iam_role" "lambda_elasticsearch_execution_role" { name = "lambda_elasticsearch_execution_role" assume_role_policy = <<EOF { "Version": "2012-10-17", "Statement": [ { "Action": "sts:AssumeRole", "Principal": { "Service": "lambda.amazonaws.com" }, "Effect": "Allow" } ] } EOF } resource "aws_iam_role_policy" "lambda_elasticsearch_execution_policy" { name = "lambda_elasticsearch_execution_policy" role = "${aws_iam_role.lambda_elasticsearch_execution_role.id}" policy = <<EOF { "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": [ "logs:CreateLogGroup", "logs:CreateLogStream", "logs:PutLogEvents" ], "Resource": [ "arn:aws:logs:*:*:*" ] }, { "Effect": "Allow", "Action": "es:ESHttpPost", "Resource": "arn:aws:es:*:*:*" } ] } EOF } resource "aws_lambda_function" "cwl_stream_lambda" { filename = "cwl2eslambda.zip" function_name = "LogsToElasticsearch" role = "${aws_iam_role.lambda_elasticsearch_execution_role.arn}" handler = "exports.handler" source_code_hash = "${base64sha256(file("cwl2eslambda.zip"))}" runtime = "nodejs4.3" environment { variables = { es_endpoint = "${var.es_endpoint}" } } } resource "aws_lambda_permission" "cloudwatch_allow" { statement_id = "cloudwatch_allow" action = "lambda:InvokeFunction" function_name = "${aws_lambda_function.cwl_stream_lambda.arn}" principal = "${var.cwl_endpoint}" source_arn = "${aws_cloudwatch_log_group.syslog-loggroup.arn}" } resource "aws_cloudwatch_log_subscription_filter" "cloudwatch_logs_to_es" { depends_on = ["aws_lambda_permission.cloudwatch_allow"] name = "cloudwatch_logs_to_elasticsearch" log_group_name = "${aws_cloudwatch_log_group.syslog-loggroup.name}" filter_pattern = "" destination_arn = "${aws_lambda_function.cwl_stream_lambda.arn}" }