Skip to content

Instantly share code, notes, and snippets.

@iMilnb
Last active January 18, 2024 08:08
Show Gist options
  • Save iMilnb/27726a5004c0d4dc3dba3de01c65c575 to your computer and use it in GitHub Desktop.
Save iMilnb/27726a5004c0d4dc3dba3de01c65c575 to your computer and use it in GitHub Desktop.

Revisions

  1. iMilnb revised this gist Jun 22, 2017. 1 changed file with 1 addition and 1 deletion.
    2 changes: 1 addition & 1 deletion cwl2es.js
    Original file line number Diff line number Diff line change
    @@ -1,7 +1,7 @@
    // v1.1.2
    //
    // this lambda is the one automatically created by AWS
    // when creating a CWL to ES stream using the AWS CLI.
    // when creating a CWL to ES stream using the AWS Console.
    // I just added the `endpoint` variable handling.
    //
    var https = require('https');
  2. iMilnb created this gist Jun 21, 2017.
    14 changes: 14 additions & 0 deletions README.md
    Original file line number Diff line number Diff line change
    @@ -0,0 +1,14 @@
    ### Rationale

    This snippet is a sample showing how to implement CloudWatch Logs streaming to ElasticSearch using `terraform`.
    I wrote this `gist` because I didn't find a clear, end-to-end example of how to achieve this task. In particular,
    I understood the `resource "aws_lambda_permission" "cloudwatch_allow"` part by reading a couple of bug reports plus
    this [stackoverflow post](https://stackoverflow.com/questions/38407660/terraform-configuring-cloudwatch-log-subscription-delivery-to-lambda).

    The `js` file is actually the _Lambda_ function automatically created by _AWS_ when creating this pipeline through the
    web console. I only added handling of an `endpoint` variable so it is configurable from `terraform`.

    ### Usage

    Create a `cwl2eslambda.zip` file containing `cwl2es.js` at the root level.
    Invoke `terraform plan` to check for basic errors, then `terraform apply`.
    254 changes: 254 additions & 0 deletions cwl2es.js
    Original file line number Diff line number Diff line change
    @@ -0,0 +1,254 @@
    // v1.1.2
    //
    // this lambda is the one automatically created by AWS
    // when creating a CWL to ES stream using the AWS CLI.
    // I just added the `endpoint` variable handling.
    //
    var https = require('https');
    var zlib = require('zlib');
    var crypto = require('crypto');

    const ENV = process.env;

    var endpoint = ENV.es_endpoint;

    exports.handler = function(input, context) {
    // decode input from base64
    var zippedInput = new Buffer(input.awslogs.data, 'base64');

    // decompress the input
    zlib.gunzip(zippedInput, function(error, buffer) {
    if (error) { context.fail(error); return; }

    // parse the input from JSON
    var awslogsData = JSON.parse(buffer.toString('utf8'));

    // transform the input to Elasticsearch documents
    var elasticsearchBulkData = transform(awslogsData);

    // skip control messages
    if (!elasticsearchBulkData) {
    console.log('Received a control message');
    context.succeed('Control message handled successfully');
    return;
    }

    // post documents to the Amazon Elasticsearch Service
    post(elasticsearchBulkData, function(error, success, statusCode, failedItems) {
    console.log('Response: ' + JSON.stringify({
    "statusCode": statusCode
    }));

    if (error) {
    console.log('Error: ' + JSON.stringify(error, null, 2));

    if (failedItems && failedItems.length > 0) {
    console.log("Failed Items: " +
    JSON.stringify(failedItems, null, 2));
    }

    context.fail(JSON.stringify(error));
    } else {
    console.log('Success: ' + JSON.stringify(success));
    context.succeed('Success');
    }
    });
    });
    };

    // Convert a decoded CloudWatch Logs payload into an Elasticsearch
    // bulk-request body (newline-delimited action/document JSON pairs).
    // Returns null for control messages so the caller can skip them.
    function transform(payload) {
        if (payload.messageType === 'CONTROL_MESSAGE') {
            return null;
        }

        var bulkRequestBody = '';

        payload.logEvents.forEach(function(logEvent) {
            // `1 *` coerces the timestamp to a number (ms since epoch)
            var timestamp = new Date(1 * logEvent.timestamp);

            // index name format: cwl-YYYY.MM.DD
            var indexName = [
                'cwl-' + timestamp.getUTCFullYear(),              // year
                ('0' + (timestamp.getUTCMonth() + 1)).slice(-2),  // month
                ('0' + timestamp.getUTCDate()).slice(-2)          // day
            ].join('.');

            var source = buildSource(logEvent.message, logEvent.extractedFields);
            source['@id'] = logEvent.id;
            // reuse `timestamp` instead of constructing a second identical Date
            source['@timestamp'] = timestamp.toISOString();
            source['@message'] = logEvent.message;
            source['@owner'] = payload.owner;
            source['@log_group'] = payload.logGroup;
            source['@log_stream'] = payload.logStream;

            var action = { "index": {} };
            action.index._index = indexName;
            action.index._type = payload.logGroup;
            action.index._id = logEvent.id;

            bulkRequestBody += [
                JSON.stringify(action),
                JSON.stringify(source),
            ].join('\n') + '\n';
        });
        return bulkRequestBody;
    }

    // Build the ES document body for one log event.
    //
    // When the subscription supplied extractedFields, index each field:
    // numeric values as numbers, values containing JSON additionally under a
    // '$'-prefixed key as a parsed object. Otherwise, if the raw message
    // contains JSON, return that parsed; else return an empty object.
    function buildSource(message, extractedFields) {
        if (extractedFields) {
            var source = {};

            for (var key in extractedFields) {
                if (extractedFields.hasOwnProperty(key) && extractedFields[key]) {
                    var value = extractedFields[key];

                    if (isNumeric(value)) {
                        source[key] = 1 * value;
                        continue;
                    }

                    // `jsonSubString` was an implicit global in the original;
                    // declared locally here (same runtime behavior, no leak)
                    var jsonSubString = extractJson(value);
                    if (jsonSubString !== null) {
                        source['$' + key] = JSON.parse(jsonSubString);
                    }

                    source[key] = value;
                }
            }
            return source;
        }

        var jsonSubString = extractJson(message);
        if (jsonSubString !== null) {
            return JSON.parse(jsonSubString);
        }

        return {};
    }

    // Return the substring of `message` starting at the first '{' when that
    // substring parses as JSON; otherwise return null.
    function extractJson(message) {
        var start = message.indexOf('{');
        if (start === -1) {
            return null;
        }
        var candidate = message.substring(start);
        if (isValidJson(candidate)) {
            return candidate;
        }
        return null;
    }

    // True when `message` is parseable as JSON, false otherwise.
    function isValidJson(message) {
        try {
            JSON.parse(message);
            return true;
        } catch (ignored) {
            return false;
        }
    }

    // True when `n` coerces to a finite number (accepts numeric strings).
    function isNumeric(n) {
        var parsed = parseFloat(n);
        return !isNaN(parsed) && isFinite(n);
    }

    // POST the bulk `body` to the module-level `endpoint` (via buildRequest,
    // which signs the request) and report the outcome through
    // callback(error, success, statusCode, failedItems).
    function post(body, callback) {
    var requestParams = buildRequest(endpoint, body);

    var request = https.request(requestParams, function(response) {
    // accumulate the full response body before parsing it
    var responseBody = '';
    response.on('data', function(chunk) {
    responseBody += chunk;
    });
    response.on('end', function() {
    var info = JSON.parse(responseBody);
    var failedItems;
    var success;

    // on any 2xx response, count the per-item failures (item status >= 300)
    if (response.statusCode >= 200 && response.statusCode < 299) {
    failedItems = info.items.filter(function(x) {
    return x.index.status >= 300;
    });

    success = {
    "attemptedItems": info.items.length,
    "successfulItems": info.items.length - failedItems.length,
    "failedItems": failedItems.length
    };
    }

    // NOTE(review): the error check uses `!== 200` while the success block
    // above accepts any 2xx, so e.g. a 201 would produce both `success` and
    // `error` — presumably inherited from the AWS blueprint; confirm before
    // tightening either condition.
    var error = response.statusCode !== 200 || info.errors === true ? {
    "statusCode": response.statusCode,
    "responseBody": responseBody
    } : null;

    callback(error, success, response.statusCode, failedItems);
    });
    }).on('error', function(e) {
    // transport-level failure: only the error argument is populated
    callback(e);
    });
    request.end(requestParams.body);
    }

    // Build an https.request() options object for a POST to `endpoint`/_bulk,
    // signed with AWS Signature Version 4 using the Lambda execution role's
    // credentials from the environment.
    function buildRequest(endpoint, body) {
    // assumes a hostname like "<domain>.<region>.<service>.amazonaws.com";
    // NOTE(review): if the endpoint does not match, endpointParts is null and
    // the next line throws — TODO confirm endpoints are always this shape.
    var endpointParts = endpoint.match(/^([^\.]+)\.?([^\.]*)\.?([^\.]*)\.amazonaws\.com$/);
    var region = endpointParts[2];
    var service = endpointParts[3];
    // SigV4 timestamp: ISO-8601 basic format (no separators, no millis)
    var datetime = (new Date()).toISOString().replace(/[:\-]|\.\d{3}/g, '');
    var date = datetime.substr(0, 8);
    // SigV4 key derivation chain: secret -> date -> region -> service -> signing
    var kDate = hmac('AWS4' + process.env.AWS_SECRET_ACCESS_KEY, date);
    var kRegion = hmac(kDate, region);
    var kService = hmac(kRegion, service);
    var kSigning = hmac(kService, 'aws4_request');

    var request = {
    host: endpoint,
    method: 'POST',
    path: '/_bulk',
    body: body,
    headers: {
    'Content-Type': 'application/json',
    'Host': endpoint,
    'Content-Length': Buffer.byteLength(body),
    'X-Amz-Security-Token': process.env.AWS_SESSION_TOKEN,
    'X-Amz-Date': datetime
    }
    };

    // canonical headers: lowercase names, sorted, one "name:value" per line
    var canonicalHeaders = Object.keys(request.headers)
    .sort(function(a, b) { return a.toLowerCase() < b.toLowerCase() ? -1 :
    1; })
    .map(function(k) { return k.toLowerCase() + ':' + request.headers[k]; })
    .join('\n');

    // signed headers list: lowercase names, sorted, ';'-separated
    var signedHeaders = Object.keys(request.headers)
    .map(function(k) { return k.toLowerCase(); })
    .sort()
    .join(';');

    // canonical request: method, path, (empty) query string, headers,
    // signed-header list and the payload hash
    var canonicalString = [
    request.method,
    request.path, '',
    canonicalHeaders, '',
    signedHeaders,
    hash(request.body, 'hex'),
    ].join('\n');

    var credentialString = [ date, region, service, 'aws4_request' ].join('/');

    var stringToSign = [
    'AWS4-HMAC-SHA256',
    datetime,
    credentialString,
    hash(canonicalString, 'hex')
    ] .join('\n');

    // final Authorization header combining credential scope and signature
    request.headers.Authorization = [
    'AWS4-HMAC-SHA256 Credential=' + process.env.AWS_ACCESS_KEY_ID + '/' + credentialString,
    'SignedHeaders=' + signedHeaders,
    'Signature=' + hmac(kSigning, stringToSign, 'hex')
    ].join(', ');

    return request;
    }

    // HMAC-SHA256 of `str` (utf8) keyed with `key`, digested as `encoding`
    // (e.g. 'hex'); returns a Buffer when `encoding` is undefined.
    function hmac(key, str, encoding) {
        var mac = crypto.createHmac('sha256', key);
        mac.update(str, 'utf8');
        return mac.digest(encoding);
    }

    // SHA-256 digest of `str` (utf8) in the requested `encoding`
    // (e.g. 'hex'); returns a Buffer when `encoding` is undefined.
    function hash(str, encoding) {
        var digest = crypto.createHash('sha256');
        digest.update(str, 'utf8');
        return digest.digest(encoding);
    }
    91 changes: 91 additions & 0 deletions main.tf
    Original file line number Diff line number Diff line change
    @@ -0,0 +1,91 @@
    # Elasticsearch domain endpoint the Lambda posts to; passed to the
    # function as the `es_endpoint` environment variable.
    variable "es_endpoint" {
    type = "string"
    default = "elasticsearch.endpoint.es.amazonaws.com"
    }

    # Regional CloudWatch Logs service endpoint, used below as the principal
    # allowed to invoke the Lambda.
    variable "cwl_endpoint" {
    type = "string"
    default = "logs.eu-central-1.amazonaws.com"
    }

    # CloudWatch log group whose events are streamed to Elasticsearch.
    resource "aws_cloudwatch_log_group" "syslog-loggroup" {
    name = "syslog"
    retention_in_days = 14
    }

    # Execution role for the Lambda function.
    # The trust policy lets only the Lambda service assume it.
    resource "aws_iam_role" "lambda_elasticsearch_execution_role" {
    name = "lambda_elasticsearch_execution_role"
    assume_role_policy = <<EOF
    {
    "Version": "2012-10-17",
    "Statement": [
    {
    "Action": "sts:AssumeRole",
    "Principal": {
    "Service": "lambda.amazonaws.com"
    },
    "Effect": "Allow"
    }
    ]
    }
    EOF
    }

    # Inline policy for the execution role: lets the Lambda write its own
    # logs to CloudWatch and POST bulk documents to any Elasticsearch domain
    # (es:ESHttpPost on arn:aws:es:*:*:*).
    resource "aws_iam_role_policy" "lambda_elasticsearch_execution_policy" {
    name = "lambda_elasticsearch_execution_policy"
    role = "${aws_iam_role.lambda_elasticsearch_execution_role.id}"
    policy = <<EOF
    {
    "Version": "2012-10-17",
    "Statement": [
    {
    "Effect": "Allow",
    "Action": [
    "logs:CreateLogGroup",
    "logs:CreateLogStream",
    "logs:PutLogEvents"
    ],
    "Resource": [
    "arn:aws:logs:*:*:*"
    ]
    },
    {
    "Effect": "Allow",
    "Action": "es:ESHttpPost",
    "Resource": "arn:aws:es:*:*:*"
    }
    ]
    }
    EOF
    }

    # Lambda function that streams CloudWatch Logs events to Elasticsearch.
    resource "aws_lambda_function" "cwl_stream_lambda" {
    filename = "cwl2eslambda.zip"
    function_name = "LogsToElasticsearch"
    role = "${aws_iam_role.lambda_elasticsearch_execution_role.arn}"
    # Node.js handlers are named "<file without .js>.<export>". The zip
    # contains cwl2es.js exporting `handler`, so "exports.handler" would not
    # resolve at invoke time.
    handler = "cwl2es.handler"
    source_code_hash = "${base64sha256(file("cwl2eslambda.zip"))}"
    # NOTE(review): nodejs4.3 has been retired by AWS; new deployments must
    # choose a currently supported Node.js runtime.
    runtime = "nodejs4.3"

    environment {
    variables = {
    es_endpoint = "${var.es_endpoint}"
    }
    }
    }

    # Allow the CloudWatch Logs service (region-specific endpoint from
    # var.cwl_endpoint) to invoke the Lambda, restricted to events coming
    # from the syslog log group.
    resource "aws_lambda_permission" "cloudwatch_allow" {
    statement_id = "cloudwatch_allow"
    action = "lambda:InvokeFunction"
    function_name = "${aws_lambda_function.cwl_stream_lambda.arn}"
    principal = "${var.cwl_endpoint}"
    source_arn = "${aws_cloudwatch_log_group.syslog-loggroup.arn}"
    }

    # Subscription filter: stream every event (empty filter_pattern) from the
    # syslog log group to the Lambda. depends_on forces the invoke permission
    # to be created first, otherwise creating the filter can fail.
    resource "aws_cloudwatch_log_subscription_filter" "cloudwatch_logs_to_es" {
    depends_on = ["aws_lambda_permission.cloudwatch_allow"]
    name = "cloudwatch_logs_to_elasticsearch"
    log_group_name = "${aws_cloudwatch_log_group.syslog-loggroup.name}"
    filter_pattern = ""
    destination_arn = "${aws_lambda_function.cwl_stream_lambda.arn}"
    }