Get Notified About ECR And Lambda Inspector Scan Findings

Vinayak Pandey
2 min readMar 5, 2024

Reference: https://docs.aws.amazon.com/inspector/latest/user/findings-managing-automating-responses.html

Recently we had a requirement to get notified about ECR and Lambda inspector scan findings. We could have created an eventbridge rule like this

{
"source": ["aws.inspector2"],
"detail-type": ["Inspector2 Finding"],
"detail": {
"severity": ["HIGH", "CRITICAL"],
"status": ["ACTIVE"]
}
}

but this rule will also get triggered for inspector findings for other services like EC2. We also wanted to have some flexibility to exempt some resources used for testing and ignore some CVEs.

For this requirement, we created 2 Lambda functions using Python 3.12 runtime, one for ECR scan results and another for Lambda scan results.

ECR Scan Result Lambda Code:

import boto3
import json
ecr_repo_list=['repoA','repB','repoC']
jmespath_expression = 'sort_by(imageDetails, &to_string(imagePushedAt))[-1].imageTags'
client = boto3.client('ecr',region_name='ap-southeast-1')
sns_client = boto3.client('sns',region_name='ap-southeast-1')
paginator = client.get_paginator('describe_images')

def lambda_handler(event, context):
for ecr_repo_name in ecr_repo_list:
high_severity_vulns=0
critical_severity_vulns=0
iterator = paginator.paginate(repositoryName=ecr_repo_name,maxResults=1000,)
filter_iterator = iterator.search(jmespath_expression)
latest_image_tag = list(filter_iterator)[0]
latest_image_scan_result = client.describe_image_scan_findings(repositoryName=ecr_repo_name,imageId={'imageTag': latest_image_tag})
vuln_status=latest_image_scan_result['imageScanFindings']['findingSeverityCounts']
if 'HIGH' in vuln_status:
high_severity_vulns=vuln_status['HIGH']
if 'CRITICAL' in vuln_status:
critical_severity_vulns=vuln_status['CRITICAL']
if(high_severity_vulns > 0 or critical_severity_vulns > 0):
message = {"version": "1.0","source": "custom","content": {
"description": ecr_repo_name+":"+latest_image_tag+" image contains "+str(critical_severity_vulns)+" critical and "+str( high_severity_vulns)+" high severity vulnerabilities"}}
response = sns_client.publish(TargetArn=<SNS_ARN>,
Message=json.dumps({'default': json.dumps(message)}),
MessageStructure='json')

Change values for ecr_repo_list and <SNS_ARN> accordingly.

Here we are fetching inspector findings for latest image and sending the results to SNS.

Our message format is compatible with AWS Chatbot custom notification schema(Refer to https://docs.aws.amazon.com/chatbot/latest/adminguide/custom-notifs.html#sample-custom-notifs) so that these results can be sent to Slack using AWS Chatbot.

IAM Permisson For Lambda: AWSLambdaBasicExecutionRole along with following permission

IAM Policy
{
"Version": "2012-10-17",
"Statement": [
{
"Sid": "VisualEditor0",
"Effect": "Allow",
"Action": [
"ecr:DescribeImageScanFindings",
"inspector2:ListCoverage",
"ecr:DescribeImages",
"inspector2:ListFindings"
],
"Resource": "*"
},
{
"Sid": "VisualEditor1",
"Effect": "Allow",
"Action": "sns:Publish",
"Resource": <SNS_ARN>
}
]
}

Lambda Scan Result Lambda Code:

import json
import boto3

sns_client = boto3.client('sns',region_name='ap-southeast-1')

exempted_cves=['CWE-319','CWE-117','CWE-93']
exempted_functions=['functionA','functionB']

def lambda_inspector_findings(region):
inspector = boto3.client('inspector2',region_name=region)
paginator = inspector.get_paginator('list_findings')
page_iterator = paginator.paginate(filterCriteria={
'findingStatus': [
{
'comparison': 'EQUALS',
'value': 'ACTIVE'
},
],
'resourceType': [
{
'comparison': 'EQUALS',
'value': 'AWS_LAMBDA_FUNCTION'
},
],
})

for data in page_iterator:
for finding in data['findings']:
suppress_finding=False
for exempted_cve in exempted_cves:
if exempted_cve in finding['title']:
suppress_finding=True
break
if suppress_finding==False:
for resource in finding['resources']:
if resource['details']['awsLambdaFunction']['functionName'] not in exempted_functions:
message = {"version": "1.0","source": "custom","content": {
"description": finding['title'] +" vulnerability found in Lambda function: "+resource['id']}}
sns_client.publish(TargetArn=<SNS_ARN>,
Message=json.dumps({'default': json.dumps(message)}),
MessageStructure='json')

def lambda_handler(event, context):
lambda_inspector_findings("us-east-1")
lambda_inspector_findings("ap-southeast-1")

Change values for exempted_cves, exempted_functions and <SNS_ARN> accordingly.

IAM Permisson For Lambda: AWSLambdaBasicExecutionRole along with following permission

IAM Policy
{
"Version": "2012-10-17",
"Statement": [
{
"Sid": "VisualEditor0",
"Effect": "Allow",
"Action": [
"inspector2:ListCoverage",
"inspector2:ListFindings"
],
"Resource": "*"
},
{
"Sid": "VisualEditor1",
"Effect": "Allow",
"Action": "sns:Publish",
"Resource": <SNS_ARN>
}
]
}

Deploy the functions and create an eventbridge rule to trigger them on daily/weekly basis.

--

--