AWS Cloudwatch

PacketAI's CloudWatch collector (lambda function for AWS) sends logs to PacketAI for automated anomaly detection. Here is the code for the lambda function:

import https from 'https';
import zlib from 'zlib';

const PAI_REQUEST_TIMEOUT_MS = 10000;

const PAI_HTTP_URL = "vector-ingester-logpatterns.packetai.co";
const PAI_HTTP_PORT = 443;
const PAI_CLUSTER_NAME = "aws";
const PAI_API_KEY = "YOUR_API_KEY_HERE";
const X_PAI_TOKEN = "YOUR_PAI_TOKEN";


// entry point
export const handler = async (event) => {
    const payload = Buffer.from(event.awslogs.data, 'base64');
    function trimCharacters(appName) {
        return appName.toLowerCase().replace(/[^a-z0-9_-]/g, "").substring(0, 48);
    }
    function getAppName(logGroupName, logStreamName) {
        let appName = 'default';
        // /aws/eks/myekscluster/cluster
        const logparts = logGroupName.split('/');
        let matched;
        // remove empty first element
        if (logparts[0] === '') {
            logparts.splice(0, 1);
        }
        if (logparts.length > 2 && logparts[0] == 'aws') {
            switch (logparts[1]) {
                case 'eks':
                    matched = logStreamName.toLowerCase().match(/(.*)-([0-9a-f]*)$/);
                    appName = matched != null && matched.length > 2 ? matched[1] : logparts[2];
                    break;
                case 'ecs':
                    matched = logStreamName.toLowerCase().match("/(.*)-([0-9a-f]+)$/");
                    appName = matched != null && matched.length > 2 ? matched[1] : logStreamName.toLowerCase();
                    break;
                case 'rds':
                    // /aws/rds/instance/mariadb-test1/error
                    appName = logparts.length > 3 ? logparts[3] : 'default';
                    break;
                case 'lambda':
                    // /aws/lambda/deleteOldSnapshots
                    // logparts = ['aws', 'lambda', 'deleteOldSnapshots']
                    appName = logparts.length > 2 ? logparts[2] : 'default';
                    break;
                case 'kinesisfirehose':
                    // /aws/kinesisfirehose/deleteOldSnapshots
                    // logparts = ['aws', 'kinesisfirehose', 'deleteOldSnapshots']
                    appName = logparts.length > 2 ? logparts[2] : 'default';
                    break;
                case 'codebuild':
                    // /aws/codebuild/dev
                    // logparts = ['aws', 'codebuild', 'dev']
                    appName = logparts.length > 2 ? logparts[2] : 'default';
                    break;
                default:
                    appName = logparts.slice(1).join('-');
                    break;
            }
        } else {
            appName = logparts.join('-');
        }
        return trimCharacters(appName);
    }
    function parseEvent(logEvent, logGroupName, logStreamName) {
        const appName = getAppName(logGroupName, logStreamName);
        return {
            // remove '\\n' character at the end of the event
            message: logEvent.message.trim(),
            logGroupName,
            logStreamName,
            'packetai.cluster_name': trimCharacters(PAI_CLUSTER_NAME),
            'packetai.app_name': appName,
            '@timestamp': new Date(logEvent.timestamp).toISOString()
        };
    }
    function postEventsToPacketAI(parsedEvents) {
        let eventList = [];
        parsedEvents.map((events) => {
            return eventList.push(events);
        });
        eventList = JSON.stringify(eventList, null, 0)
        try {
            const options = {
                hostname: PAI_HTTP_URL,
                port: PAI_HTTP_PORT,
                path: `/fluent/log`,
                method: 'POST',
                headers: {
                    'Content-Type': 'application/json',
                    'Content-Length': eventList.length,
                    'X-PAI-TOKEN': X_PAI_TOKEN,
                    'X-PAI-IID': PAI_API_KEY,
                    timeout: PAI_REQUEST_TIMEOUT_MS,
                }
            };
            const req = https.request(options, (res) => {
                res.on('data', (data) => {
                    if (res.statusCode >= 200 && res.statusCode <= 299) {
                        return;
                    }
                });
                res.on('end', () => {
                    console.log('No more data in response.');
                    return;
                });
            })
                .on('error', (err) => {
                    console.log('problem with request:', err.toString());
                    return;
                })
                .on('timeout', (err) => {
                    console.log('request timedout:', err.toString());
                    req.destroy();
                    return;
                });
            req.write(eventList);
            req.end();
        } catch (ex) {
            console.log(ex.message);
            throw `${ex.message}`;
        }
    }
    zlib.gunzip(payload, (error, result) => {
        if (error) {
            throw `${error}`;
        } else {
            const resultParsed = JSON.parse(result.toString('ascii'));
            const parsedEvents = resultParsed.logEvents.map((logEvent) => {
                const event = parseEvent(logEvent, resultParsed.logGroup, resultParsed.logStream)
                return event;
            });
            postEventsToPacketAI(parsedEvents);
        }
    });
};

Preparation

  1. If you have an existing Lambda function associated with the log group to be set up, you must go to AWS CloudWatch page and delete the existing subscription filter, otherwise you will get this error message: “An error occurred when creating the trigger: The log group host-log already has an enabled subscription filter associated with it.”

  2. If you do not have an existing role with Lambda execution permission, you should got to AWS IAM service to create a role for running Lambda functions.

Installation

  1. Create a new lambda function with the code above

    To create a new Lambda function

    1. Select “Author from scratch”

    2. Provide the following base information:

      • Function Name: packetai-cloudwatch

      • Runtime: Node.js.18.x

    3. Click on “Create function”

  2. Click on Designer and click on “Add a trigger”. Type “CloudWatch Logs” and choose your log group.

Configuration

No additional configuration is required

Last updated