PacketAI's CloudWatch collector (lambda function for AWS) sends logs to PacketAI for automated anomaly detection. Here is the code for the lambda function:
Copy import https from 'https' ;
import zlib from 'zlib' ;
const PAI_REQUEST_TIMEOUT_MS = 10000 ;
const PAI_HTTP_URL = "vector-ingester-logpatterns.packetai.co" ;
const PAI_HTTP_PORT = 443 ;
const PAI_CLUSTER_NAME = "aws" ;
const PAI_API_KEY = "YOUR_API_KEY_HERE" ;
const X_PAI_TOKEN = "YOUR_PAI_TOKEN" ;
// entry point
export const handler = async (event) => {
const payload = Buffer .from ( event . awslogs .data , 'base64' );
function trimCharacters (appName) {
return appName .toLowerCase () .replace ( /[ ^ a-z0-9_-]/ g , "" ) .substring ( 0 , 48 );
}
function getAppName (logGroupName , logStreamName) {
let appName = 'default' ;
// /aws/eks/myekscluster/cluster
const logparts = logGroupName .split ( '/' );
let matched;
// remove empty first element
if (logparts[ 0 ] === '' ) {
logparts .splice ( 0 , 1 );
}
if ( logparts . length > 2 && logparts[ 0 ] == 'aws' ) {
switch (logparts[ 1 ]) {
case 'eks' :
matched = logStreamName .toLowerCase () .match ( /(. * )-([0-9a-f] * ) $ / );
appName = matched != null && matched . length > 2 ? matched[ 1 ] : logparts[ 2 ];
break ;
case 'ecs' :
matched = logStreamName .toLowerCase () .match ( "/(.*)-([0-9a-f]+)$/" );
appName = matched != null && matched . length > 2 ? matched[ 1 ] : logStreamName .toLowerCase ();
break ;
case 'rds' :
// /aws/rds/instance/mariadb-test1/error
appName = logparts . length > 3 ? logparts[ 3 ] : 'default' ;
break ;
case 'lambda' :
// /aws/lambda/deleteOldSnapshots
// logparts = ['aws', 'lambda', 'deleteOldSnapshots']
appName = logparts . length > 2 ? logparts[ 2 ] : 'default' ;
break ;
case 'kinesisfirehose' :
// /aws/kinesisfirehose/deleteOldSnapshots
// logparts = ['aws', 'kinesisfirehose', 'deleteOldSnapshots']
appName = logparts . length > 2 ? logparts[ 2 ] : 'default' ;
break ;
case 'codebuild' :
// /aws/codebuild/dev
// logparts = ['aws', 'codebuild', 'dev']
appName = logparts . length > 2 ? logparts[ 2 ] : 'default' ;
break ;
default :
appName = logparts .slice ( 1 ) .join ( '-' );
break ;
}
} else {
appName = logparts .join ( '-' );
}
return trimCharacters (appName);
}
function parseEvent (logEvent , logGroupName , logStreamName) {
const appName = getAppName (logGroupName , logStreamName);
return {
// remove '\\n' character at the end of the event
message : logEvent . message .trim () ,
logGroupName ,
logStreamName ,
'packetai.cluster_name' : trimCharacters ( PAI_CLUSTER_NAME ) ,
'packetai.app_name' : appName ,
'@timestamp' : new Date ( logEvent .timestamp) .toISOString ()
};
}
function postEventsToPacketAI (parsedEvents) {
let eventList = [];
parsedEvents .map ((events) => {
return eventList .push (events);
});
eventList = JSON .stringify (eventList , null , 0 )
try {
const options = {
hostname : PAI_HTTP_URL ,
port : PAI_HTTP_PORT ,
path : `/fluent/log` ,
method : 'POST' ,
headers : {
'Content-Type' : 'application/json' ,
'Content-Length' : eventList . length ,
'X-PAI-TOKEN' : X_PAI_TOKEN ,
'X-PAI-IID' : PAI_API_KEY ,
timeout : PAI_REQUEST_TIMEOUT_MS ,
}
};
const req = https .request (options , (res) => {
res .on ( 'data' , (data) => {
if ( res .statusCode >= 200 && res .statusCode <= 299 ) {
return ;
}
});
res .on ( 'end' , () => {
console .log ( 'No more data in response.' );
return ;
});
})
.on ( 'error' , (err) => {
console .log ( 'problem with request:' , err .toString ());
return ;
})
.on ( 'timeout' , (err) => {
console .log ( 'request timedout:' , err .toString ());
req .destroy ();
return ;
});
req .write (eventList);
req .end ();
} catch (ex) {
console .log ( ex .message);
throw ` ${ ex .message } ` ;
}
}
zlib .gunzip (payload , (error , result) => {
if (error) {
throw ` ${ error } ` ;
} else {
const resultParsed = JSON .parse ( result .toString ( 'ascii' ));
const parsedEvents = resultParsed . logEvents .map ((logEvent) => {
const event = parseEvent (logEvent , resultParsed .logGroup , resultParsed .logStream)
return event;
});
postEventsToPacketAI (parsedEvents);
}
});
};