Processing Scan Results Provided By GuardDuty S3 Malware Scanning
Reference: https://medium.com/aws-tip/s3-malware-scanning-using-guardduty-936b4436c3a4
https://docs.aws.amazon.com/guardduty/latest/ug/monitor-with-eventbridge-s3-malware-protection.html
In this post, we'll see how to process the scan results produced by Amazon GuardDuty Malware Protection for S3.
Step 1: Create a bucket and follow https://aws.amazon.com/blogs/aws/introducing-amazon-guardduty-malware-protection-for-amazon-s3/ to set up Malware Protection for S3 on it. You can specify a prefix so that only objects uploaded under that prefix are scanned.
Step 2: Create an SQS queue to store scan results.
Step 3: Create an EventBridge rule with the following event pattern:
{
  "detail-type": ["GuardDuty Malware Protection Object Scan Result"],
  "source": ["aws.guardduty"],
  "detail": {
    "s3ObjectDetails": {
      "bucketName": ["<YOUR_BUCKET_NAME>"]
    }
  }
}
Then set the SQS queue from Step 2 as the rule's target.
Step 4: Create a Python script with the following code. The script polls the SQS queue; if an object's scan result is NO_THREATS_FOUND, it moves the object to the processed/ folder, and if the result is THREATS_FOUND, it deletes the object.
import boto3
import time
import json
from botocore.exceptions import ClientError
# URL of the SQS queue that receives the scan-result events.
# Replace the placeholder with your own queue URL.
QUEUE_URL = '<SQS_QUEUE_URL>'

# Both clients are created for the same region; change it in this one place.
_REGION = 'us-east-1'

# SQS client used to receive and delete messages
sqs = boto3.client('sqs', region_name=_REGION)
# S3 client used to copy and delete the scanned objects
s3 = boto3.client('s3', region_name=_REGION)
def process_message(message):
    """
    Act on one SQS message that carries a GuardDuty S3 malware scan result.

    The message body is parsed as JSON and the bucket name, object key and
    scan status are read from its ``detail`` section.

    * ``NO_THREATS_FOUND``: the object is copied to ``processed/<rest of key>``
      in the same bucket and the original is then deleted.
    * ``THREATS_FOUND``: the object is deleted.
    * any other status: the object is left untouched.

    A body that is not valid JSON, or that lacks one of the expected keys,
    is reported and skipped without raising.

    Args:
        message: SQS message dict; only ``message['Body']`` is read.

    Raises:
        Exception: if a clean object no longer exists at its source key.
        ClientError: if an S3 call fails for any other reason.
    """
    # Only the parsing can legitimately hit these two errors, so the try
    # block is kept to exactly that.
    try:
        body = json.loads(message['Body'])
        object_details = body['detail']['s3ObjectDetails']
        bucket_name = object_details['bucketName']
        source_key = object_details['objectKey']
        scan_result = body['detail']['scanResultDetails']['scanResultStatus']
    except json.JSONDecodeError:
        print("Failed to parse message body as JSON")
        return
    except KeyError as e:
        print(f"Missing expected key in message body: {e}")
        return

    if scan_result == "THREATS_FOUND":
        print(f"Scan completed for {source_key} with result THREATS_FOUND.File will be deleted")
        s3.delete_object(Bucket=bucket_name, Key=source_key)
        print(f"Deleted original {source_key}")
        return

    if scan_result != "NO_THREATS_FOUND":
        # The scan did not produce a verdict (NOTE(review): presumably statuses
        # such as UNSUPPORTED, ACCESS_DENIED or FAILED - confirm against the
        # GuardDuty docs). Never delete an object that was not copied first.
        print(f"Scan completed for {source_key} with result {scan_result}. File left in place")
        return

    print(f"Scan completed for {source_key} with result NO_THREATS_FOUND")

    # Swap the first path segment for 'processed/'. A key without any '/'
    # has no segment to drop, so it is placed under 'processed/' as is.
    _, separator, remainder = source_key.partition('/')
    destination_key = 'processed/' + (remainder if separator else source_key)

    if destination_key == source_key:
        # Already under processed/: copying onto itself and then deleting
        # the "original" would lose the object.
        print(f"{source_key} is already in its processed location. Nothing to do")
        return

    # Confirm the object is still there before trying to move it.
    try:
        s3.head_object(Bucket=bucket_name, Key=source_key)
    except ClientError as e:
        if e.response['Error']['Code'] == '404':
            raise Exception(f"Object {source_key} does not exist in the source path.") from e
        print(f"An error occurred: {e}")
        raise

    copy_source = {'Bucket': bucket_name, 'Key': source_key}
    try:
        s3.copy_object(CopySource=copy_source, Bucket=bucket_name, Key=destination_key)
    except ClientError as e:
        print(f"Failed to copy {source_key}: {e}")
        raise
    print(f"Copied {source_key} to {destination_key}")

    # The copy succeeded, so removing the original completes the move.
    s3.delete_object(Bucket=bucket_name, Key=source_key)
    print(f"Deleted original {source_key}")
def poll_queue():
    """
    Poll the SQS queue forever and hand each message to process_message().

    A message is deleted from the queue only after process_message() returns
    without raising. If processing raises, the failure is reported, the
    message is left on the queue (it becomes visible again once its
    visibility timeout expires) and the rest of the batch is still handled.
    """
    while True:
        try:
            # Receive messages from the queue
            response = sqs.receive_message(
                QueueUrl=QUEUE_URL,
                MaxNumberOfMessages=10,  # Max is 10
                WaitTimeSeconds=10,      # Long polling
                VisibilityTimeout=30     # Time the message remains hidden after being read
            )
        except ClientError as e:
            print(f"Error receiving messages: {e}")
        else:
            messages = response.get('Messages', [])
            if not messages:
                print("No messages in queue...")
            for message in messages:
                # Top-level boundary: one bad message must not stop the
                # poller or hide the other messages of the batch.
                try:
                    process_message(message)
                except Exception as e:
                    # NOTE(review): with no redrive policy / dead-letter queue
                    # a message that always fails will be retried indefinitely.
                    print(f"Failed to process message {message.get('MessageId')}: {e}")
                    continue
                # Delete the message only after successful processing
                try:
                    sqs.delete_message(
                        QueueUrl=QUEUE_URL,
                        ReceiptHandle=message['ReceiptHandle']
                    )
                    print(f"Deleted message: {message['MessageId']}")
                except ClientError as e:
                    print(f"Failed to delete message: {e}")
        # Delay before polling again (adjust as needed)
        time.sleep(5)
# Script entry point: start polling when the file is run directly.
if __name__ == "__main__":
    try:
        poll_queue()
    except KeyboardInterrupt:
        # poll_queue() never returns; exit quietly on Ctrl+C instead of
        # ending with a traceback.
        print("Polling stopped")
Step 5: Run the script and upload some files to the upload/ folder of the bucket. Once a scan completes and no threats are found, the file is moved to the processed/ folder.