Skip to content

Instantly share code, notes, and snippets.

@ImDevinC
Created May 11, 2023 06:49
Show Gist options
  • Select an option

  • Save ImDevinC/69a2c5e952d0aac7e8b0b1179723414c to your computer and use it in GitHub Desktop.

Select an option

Save ImDevinC/69a2c5e952d0aac7e8b0b1179723414c to your computer and use it in GitHub Desktop.

Revisions

  1. ImDevinC created this gist May 11, 2023.
    76 changes: 76 additions & 0 deletions deploy.tf
    Original file line number Diff line number Diff line change
    @@ -0,0 +1,76 @@
    locals {
    consumer_package_name = "message-consumer.zip"
    consumer_package_path = "${path.module}/dist/${local.consumer_package_name}"
    }

    resource "aws_lambda_function" "message_consumer" {
    s3_bucket = aws_s3_bucket_object.consumer_s3_source.bucket
    s3_key = aws_s3_bucket_object.consumer_s3_source.id
    s3_object_version = aws_s3_bucket_object.consumer_s3_source.version_id
    function_name = "apm-message-consumer"
    role = aws_iam_role.consumer.arn
    handler = "src/simple.handler"
    runtime = "nodejs16.x"
    source_code_hash = filesha256(local.consumer_package_path)
    timeout = 30
    environment {
    variables = {
    NODE_OPTIONS = "--require src/lambda-wrapper"
    SQS_URL = aws_sqs_queue.publisher.url
    }
    }
    vpc_config {
    subnet_ids = local.subnet_ids
    security_group_ids = local.security_group_ids
    }
    }

    resource "aws_iam_role" "consumer" {
    name = "apm-message-consumer-role"
    assume_role_policy = data.aws_iam_policy_document.assume_role.json
    }

    resource "aws_iam_policy" "consumer" {
    name = "apm-message-consumer-policy"
    policy = data.aws_iam_policy_document.consumer_policy.json
    }

    resource "aws_iam_role_policy_attachment" "consumer_attach" {
    role = aws_iam_role.consumer.name
    policy_arn = aws_iam_policy.consumer.arn
    }

    resource "aws_iam_role_policy_attachment" "consumer_AWSLambdaBasicExecutionRole" {
    role = aws_iam_role.consumer.name
    policy_arn = "arn:aws:iam::aws:policy/service-role/AWSLambdaBasicExecutionRole"
    }

    resource "aws_iam_role_policy_attachment" "consumer_AWSLambdaVPCAccessExecutionRole" {
    role = aws_iam_role.consumer.name
    policy_arn = "arn:aws:iam::aws:policy/service-role/AWSLambdaVPCAccessExecutionRole"
    }

    data "aws_iam_policy_document" "consumer_policy" {
    statement {
    effect = "Allow"
    actions = [
    "sqs:ReceiveMessage",
    "sqs:DeleteMessage",
    "sqs:GetQueueAttributes"
    ]
    resources = [aws_sqs_queue.publisher.arn]
    }
    }

    resource "aws_s3_bucket_object" "consumer_s3_source" {
    bucket = "${var.deploy_config.environment}-lambdas"
    key = "${var.deploy_config.project}/${local.consumer_package_name}"
    source = local.consumer_package_path
    etag = md5(local.consumer_package_path)
    }

    resource "aws_lambda_event_source_mapping" "consumer_mapping" {
    event_source_arn = aws_sqs_queue.publisher.arn
    enabled = true
    function_name = aws_lambda_function.message_consumer.arn
    }
    49 changes: 49 additions & 0 deletions lambda-wrapper.ts
    Original file line number Diff line number Diff line change
    @@ -0,0 +1,49 @@
    import { diag, DiagConsoleLogger, DiagLogLevel } from "@opentelemetry/api";
    import { getNodeAutoInstrumentations } from "@opentelemetry/auto-instrumentations-node";
    import { OTLPTraceExporter } from "@opentelemetry/exporter-trace-otlp-grpc";
    import { registerInstrumentations } from "@opentelemetry/instrumentation";
    import { awsLambdaDetector } from "@opentelemetry/resource-detector-aws";
    import {
    detectResourcesSync,
    envDetector,
    processDetector,
    } from "@opentelemetry/resources";
    import { BatchSpanProcessor } from "@opentelemetry/sdk-trace-base";
    import { NodeTracerProvider } from "@opentelemetry/sdk-trace-node";

    const initTracing = () => {
    diag.setLogger(new DiagConsoleLogger(), DiagLogLevel.DEBUG);
    const defaultCollector = "https://o11y.ops.internal.dev:4317"

    const resource = detectResourcesSync({
    detectors: [awsLambdaDetector, envDetector, processDetector],
    });

    const provider = new NodeTracerProvider({
    resource: resource,
    });

    const exporter = new OTLPTraceExporter({
    url: defaultCollector,
    });

    provider.addSpanProcessor(new BatchSpanProcessor(exporter));

    registerInstrumentations({
    tracerProvider: provider,
    instrumentations: [
    getNodeAutoInstrumentations({
    "@opentelemetry/instrumentation-aws-lambda": {
    disableAwsContextPropagation: true,
    },
    "@opentelemetry/instrumentation-fs": {
    enabled: false, // This is very noisy, and at least on lambda isn't very helpful
    },
    }),
    ],
    });

    provider.register();
    };

    initTracing();
    34 changes: 34 additions & 0 deletions simple.ts
    Original file line number Diff line number Diff line change
    @@ -0,0 +1,34 @@
    import { isSpanContextValid, SpanOptions, trace } from "@opentelemetry/api";
    import { parseTraceParent } from "@opentelemetry/core";
    import { Context, SQSEvent } from "aws-lambda";

    const tracer = trace.getTracerProvider().getTracer("message-consumer");

    const doWork = async (accountId: string) => {
    const span = tracer.startSpan("dowork");
    console.log("doing event", accountId);
    span.setAttribute("accountId", accountId);
    await new Promise((resolve) => setTimeout(resolve, 5000));
    span.end();
    };

    export const handler = async (event: SQSEvent, context: Context) => {
    for (const record of event.Records) {
    const traceParent =
    record.messageAttributes["traceparent"]?.stringValue ?? "";
    console.log("traceparent", traceParent);
    const parentCtx = parseTraceParent(traceParent);
    console.log("parentctx", JSON.stringify(parentCtx));
    const options: SpanOptions = {};
    if (parentCtx && isSpanContextValid(parentCtx)) {
    options.links = [{ context: parentCtx }];
    }
    console.log("options", JSON.stringify(options));
    await tracer.startActiveSpan("handler", options, async (span) => {
    await doWork(record.body);
    console.log("spanContext", JSON.stringify(span.spanContext()));
    span.end();
    });
    console.log("done");
    }
    };