PacketAI's CloudWatch collector (lambda function for AWS) sends logs to PacketAI for automated anomaly detection. Here is the code for the lambda function:
import https from 'https';
import zlib from 'zlib';
// Time budget, in milliseconds, for the HTTPS request that uploads a batch.
const PAI_REQUEST_TIMEOUT_MS = 10000;
// Host name (no scheme, no path) and port of the PacketAI ingestion endpoint.
const PAI_HTTP_URL = "vector-ingester-logpatterns.packetai.co";
const PAI_HTTP_PORT = 443;
// Value reported in the `packetai.cluster_name` field of every record.
// Can be overridden with the PAI_CLUSTER_NAME environment variable.
const PAI_CLUSTER_NAME = process.env.PAI_CLUSTER_NAME || "aws";
// Credentials sent as the `X-PAI-IID` and `X-PAI-TOKEN` request headers.
// Prefer supplying them through the function's environment variables so the
// secrets do not have to live in the source; the literals are placeholders
// that are used only when the corresponding variable is unset or empty.
const PAI_API_KEY = process.env.PAI_API_KEY || "YOUR_API_KEY_HERE";
const X_PAI_TOKEN = process.env.X_PAI_TOKEN || "YOUR_PAI_TOKEN";
// entry point
/**
 * AWS Lambda entry point for a CloudWatch Logs subscription.
 *
 * Decodes the base64 + gzip payload found in `event.awslogs.data`, converts
 * every log event into a PacketAI record and POSTs the whole batch to
 * PacketAI. Every asynchronous step is awaited, so the returned promise
 * settles only after the HTTP exchange has finished (or failed).
 *
 * @param {{awslogs: {data: string}}} event - CloudWatch Logs subscription event.
 * @returns {Promise<void>} Resolves once the batch was accepted, or when the
 *   payload contained no log events.
 * @throws {TypeError} If `event.awslogs.data` is missing or not a string.
 * @throws {Error} If the payload cannot be decompressed or parsed, or if the
 *   upload fails (transport error, timeout, or non-2xx response).
 */
export const handler = async (event) => {
  if (!event || !event.awslogs || typeof event.awslogs.data !== 'string') {
    throw new TypeError('Expected a CloudWatch Logs event with a base64 string in awslogs.data');
  }

  const payload = Buffer.from(event.awslogs.data, 'base64');
  const inflated = await gunzipPayload(payload);

  // The payload is UTF-8 JSON; decoding it as ASCII would corrupt any
  // non-ASCII character in the log messages.
  const batch = JSON.parse(inflated.toString('utf8'));
  const logEvents = Array.isArray(batch.logEvents) ? batch.logEvents : [];
  const parsedEvents = logEvents.map((logEvent) => parseEvent(logEvent, batch.logGroup, batch.logStream));

  // Nothing to forward: do not POST an empty array.
  if (parsedEvents.length === 0) {
    return;
  }

  await postEventsToPacketAI(parsedEvents);
};

/**
 * Promise wrapper around `zlib.gunzip`.
 *
 * @param {Buffer} payload - Gzip-compressed bytes.
 * @returns {Promise<Buffer>} The decompressed bytes; rejects with the zlib error.
 */
function gunzipPayload(payload) {
  return new Promise((resolve, reject) => {
    zlib.gunzip(payload, (error, result) => {
      if (error) {
        reject(error);
      } else {
        resolve(result);
      }
    });
  });
}

/**
 * Normalises a name: lower-case, only `a-z`, `0-9`, `_` and `-`, at most
 * 48 characters.
 *
 * @param {string} appName - Raw name.
 * @returns {string} The sanitised name (possibly empty).
 */
function trimCharacters(appName) {
  return appName.toLowerCase().replace(/[^a-z0-9_-]/g, "").substring(0, 48);
}

/**
 * Derives the PacketAI application name from a log group / log stream pair.
 *
 * Groups shaped like `/aws/<service>/<name>/...` get service-specific
 * handling; any other group name is turned into its path segments joined
 * with `-`.
 *
 * @param {string} logGroupName - e.g. `/aws/eks/myekscluster/cluster`.
 * @param {string} logStreamName - Only consulted for `eks` and `ecs` groups.
 * @returns {string} Sanitised application name, `'default'` if nothing usable remains.
 */
function getAppName(logGroupName, logStreamName) {
  const logparts = logGroupName.split('/');
  // A leading '/' produces an empty first segment; drop it.
  if (logparts[0] === '') {
    logparts.shift();
  }

  let appName;
  if (logparts.length > 2 && logparts[0] === 'aws') {
    switch (logparts[1]) {
      case 'eks': {
        // Strip a trailing "-<hex>" suffix from the stream name.
        const matched = logStreamName.toLowerCase().match(/(.*)-([0-9a-f]*)$/);
        appName = matched ? matched[1] : logparts[2];
        break;
      }
      case 'ecs': {
        const streamName = logStreamName.toLowerCase();
        // Must be a regex literal: a quoted "/.../" string is compiled with
        // the slashes as literal characters and can never match.
        const matched = streamName.match(/(.*)-([0-9a-f]+)$/);
        appName = matched ? matched[1] : streamName;
        break;
      }
      case 'rds':
        // /aws/rds/instance/mariadb-test1/error
        appName = logparts.length > 3 ? logparts[3] : 'default';
        break;
      case 'lambda':
      case 'kinesisfirehose':
      case 'codebuild':
        // /aws/lambda/deleteOldSnapshots -> ['aws', 'lambda', 'deleteOldSnapshots']
        appName = logparts[2];
        break;
      default:
        appName = logparts.slice(1).join('-');
        break;
    }
  } else {
    appName = logparts.join('-');
  }

  // Sanitising can strip every character; fall back to the default name then.
  return trimCharacters(appName) || 'default';
}

/**
 * Converts one CloudWatch log event into the record sent to PacketAI.
 *
 * @param {{message: string, timestamp: number}} logEvent - Raw log event.
 * @param {string} logGroupName - Log group the event belongs to.
 * @param {string} logStreamName - Log stream the event belongs to.
 * @returns {object} Record with the trimmed message, group/stream names,
 *   `packetai.cluster_name`, `packetai.app_name` and an ISO-8601 `@timestamp`.
 */
function parseEvent(logEvent, logGroupName, logStreamName) {
  return {
    // trim() also removes the newline at the end of the event
    message: logEvent.message.trim(),
    logGroupName,
    logStreamName,
    'packetai.cluster_name': trimCharacters(PAI_CLUSTER_NAME),
    'packetai.app_name': getAppName(logGroupName, logStreamName),
    '@timestamp': new Date(logEvent.timestamp).toISOString(),
  };
}

/**
 * POSTs a batch of records to PacketAI as a JSON array.
 *
 * @param {object[]} parsedEvents - Records produced by `parseEvent`.
 * @returns {Promise<number>} Resolves with the HTTP status code once a 2xx
 *   response has been fully received; rejects on transport error, timeout or
 *   any other status code.
 */
function postEventsToPacketAI(parsedEvents) {
  const body = JSON.stringify(parsedEvents);
  const options = {
    hostname: PAI_HTTP_URL,
    port: PAI_HTTP_PORT,
    path: '/fluent/log',
    method: 'POST',
    // Request option, not an HTTP header: enables the 'timeout' event below.
    timeout: PAI_REQUEST_TIMEOUT_MS,
    headers: {
      'Content-Type': 'application/json',
      // Content-Length counts bytes, which differs from the string length
      // as soon as the body contains non-ASCII characters.
      'Content-Length': Buffer.byteLength(body),
      'X-PAI-TOKEN': X_PAI_TOKEN,
      'X-PAI-IID': PAI_API_KEY,
    },
  };

  return new Promise((resolve, reject) => {
    const req = https.request(options, (res) => {
      res.on('error', reject);
      res.on('end', () => {
        console.log('No more data in response.');
        if (res.statusCode >= 200 && res.statusCode <= 299) {
          resolve(res.statusCode);
        } else {
          reject(new Error(`PacketAI responded with HTTP status ${res.statusCode}`));
        }
      });
      // The response body is not used, but it must be consumed for 'end' to fire.
      res.resume();
    });

    req.on('error', (err) => {
      console.log('problem with request:', err.toString());
      reject(err);
    });
    req.on('timeout', () => {
      // 'timeout' carries no argument and does not abort the request by
      // itself; destroying it with an error routes through the 'error' handler.
      req.destroy(new Error(`request timed out after ${PAI_REQUEST_TIMEOUT_MS} ms`));
    });

    req.write(body);
    req.end();
  });
}