Processing Scan Results Provided By GuardDuty S3 Malware Scanning

Vinayak Pandey
Sep 5, 2024


References:

https://medium.com/aws-tip/s3-malware-scanning-using-guardduty-936b4436c3a4

https://docs.aws.amazon.com/guardduty/latest/ug/monitor-with-eventbridge-s3-malware-protection.html

In this post, we’ll see how to process the scan results produced by GuardDuty Malware Protection for S3.

Step 1: Create a bucket and follow https://aws.amazon.com/blogs/aws/introducing-amazon-guardduty-malware-protection-for-amazon-s3/ to set up Malware Protection for S3. You can specify a prefix so that only objects uploaded under that prefix are scanned. This walkthrough uploads files to an upload/ prefix.

Step 2: Create an SQS queue to store scan results.
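If you create the queue outside the console, keep in mind that EventBridge can only deliver to it when the queue's access policy allows it. Here is a minimal sketch of such a policy. The ARNs are hypothetical placeholders and `build_queue_policy` is just an illustrative helper name, not part of any AWS SDK.

```python
import json


def build_queue_policy(queue_arn, rule_arn):
    """Return an SQS access policy that lets one EventBridge rule send messages."""
    return {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Sid": "AllowEventBridgeSendMessage",
                "Effect": "Allow",
                "Principal": {"Service": "events.amazonaws.com"},
                "Action": "sqs:SendMessage",
                "Resource": queue_arn,
                # Restrict delivery to the single rule that forwards scan results
                "Condition": {"ArnEquals": {"aws:SourceArn": rule_arn}},
            }
        ],
    }


# Hypothetical ARNs, for illustration only
policy = build_queue_policy(
    "arn:aws:sqs:us-east-1:111122223333:scan-results",
    "arn:aws:events:us-east-1:111122223333:rule/scan-results-rule",
)
policy_json = json.dumps(policy)
```

The resulting JSON string can be set as the queue's Policy attribute, for example with boto3's sqs.set_queue_attributes(QueueUrl=..., Attributes={'Policy': policy_json}).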

Step 3: Create an EventBridge rule with the following event pattern. Fields inside the event body are matched by nesting them under "detail":

{
  "source": ["aws.guardduty"],
  "detail-type": ["GuardDuty Malware Protection Object Scan Result"],
  "detail": {
    "s3ObjectDetails": {
      "bucketName": ["<YOUR_BUCKET_NAME>"]
    }
  }
}

Then set the SQS queue as the rule's target.
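The same rule can also be created from code. Below is a minimal sketch, assuming a boto3 EventBridge client (boto3.client('events')) is passed in. The function names, the rule name, and the target Id "scan-results-queue" are illustrative choices of mine, not values required by AWS.

```python
import json


def build_event_pattern(bucket_name):
    """Event pattern matching GuardDuty scan results for one bucket."""
    return {
        "source": ["aws.guardduty"],
        "detail-type": ["GuardDuty Malware Protection Object Scan Result"],
        # Fields of the event body are matched by nesting under "detail"
        "detail": {"s3ObjectDetails": {"bucketName": [bucket_name]}},
    }


def create_scan_result_rule(events_client, rule_name, bucket_name, queue_arn):
    """Create the rule, attach the queue as its target, and return the rule ARN."""
    rule = events_client.put_rule(
        Name=rule_name,
        EventPattern=json.dumps(build_event_pattern(bucket_name)),
        State="ENABLED",
    )
    events_client.put_targets(
        Rule=rule_name,
        Targets=[{"Id": "scan-results-queue", "Arn": queue_arn}],
    )
    return rule["RuleArn"]
```

A call would look like create_scan_result_rule(boto3.client('events'), 'scan-results-rule', '<YOUR_BUCKET_NAME>', '<SQS_QUEUE_ARN>'). Passing the client in keeps the function easy to try out with a stub before pointing it at a real account.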

Step 4: Create a Python script with the following code. The script polls the SQS queue. If an object's scan result is NO_THREATS_FOUND, it moves the object to the processed/ folder; if the result is THREATS_FOUND, it deletes the object.

import json
import time

import boto3
from botocore.exceptions import ClientError

# Replace with your queue URL
QUEUE_URL = '<SQS_QUEUE_URL>'

# Replace with the region that hosts your queue and bucket
REGION = 'us-east-1'

# Initialize SQS and S3 clients
sqs = boto3.client('sqs', region_name=REGION)
s3 = boto3.client('s3', region_name=REGION)


def process_message(message):
    """
    Process one scan result message from the queue.
    Clean objects are moved to processed/ and infected objects are deleted.
    S3 errors (ClientError) propagate so the caller can leave the message for a retry.
    """
    try:
        # Parse the message body, which holds the EventBridge event as JSON
        body = json.loads(message['Body'])

        # Extract the object location and the scan result from the event
        bucketName = body['detail']['s3ObjectDetails']['bucketName']
        sourceKey = body['detail']['s3ObjectDetails']['objectKey']
        scanResult = body['detail']['scanResultDetails']['scanResultStatus']
    except json.JSONDecodeError:
        print("Failed to parse message body as JSON")
        return
    except KeyError as e:
        print(f"Missing expected key in message body: {e}")
        return

    print(f"Scan completed for {sourceKey} with result {scanResult}")

    if scanResult == "THREATS_FOUND":
        # Infected object: delete it
        s3.delete_object(Bucket=bucketName, Key=sourceKey)
        print(f"Deleted infected object {sourceKey}")
        return

    if scanResult != "NO_THREATS_FOUND":
        # Any other status (for example a failed scan): leave the object in place
        print(f"No action taken for {sourceKey}")
        return

    if sourceKey.startswith('processed/'):
        # Already moved; skipping avoids copying an object onto itself
        print(f"{sourceKey} is already under processed/, skipping")
        return

    # Replace the first path segment (the upload prefix) with processed/.
    # Keys without a '/' are placed directly under processed/.
    destinationKey = 'processed/' + sourceKey.split('/', 1)[-1]

    try:
        # Check that the object still exists in the source path
        s3.head_object(Bucket=bucketName, Key=sourceKey)
    except ClientError as e:
        if e.response['Error']['Code'] == '404':
            print(f"Object {sourceKey} does not exist in the source path.")
            return
        raise

    # Copy the object to the new location, then delete the original
    copy_source = {'Bucket': bucketName, 'Key': sourceKey}
    s3.copy_object(CopySource=copy_source, Bucket=bucketName, Key=destinationKey)
    print(f"Copied {sourceKey} to {destinationKey}")
    s3.delete_object(Bucket=bucketName, Key=sourceKey)
    print(f"Deleted original {sourceKey}")


def poll_queue():
    while True:
        try:
            # Receive messages from the queue
            response = sqs.receive_message(
                QueueUrl=QUEUE_URL,
                MaxNumberOfMessages=10,  # Max is 10
                WaitTimeSeconds=10,      # Long polling
                VisibilityTimeout=30     # Time the message remains hidden after being read
            )
        except ClientError as e:
            print(f"Error receiving messages: {e}")
            time.sleep(5)
            continue

        messages = response.get('Messages', [])
        if not messages:
            print("No messages in queue...")

        for message in messages:
            try:
                process_message(message)
            except ClientError as e:
                # Keep the message so it is retried after the visibility timeout
                print(f"Failed to process message {message['MessageId']}: {e}")
                continue

            # Delete the message after successful processing
            try:
                sqs.delete_message(
                    QueueUrl=QUEUE_URL,
                    ReceiptHandle=message['ReceiptHandle']
                )
                print(f"Deleted message: {message['MessageId']}")
            except ClientError as e:
                print(f"Failed to delete message: {e}")

        # Delay before polling again (adjust as needed)
        time.sleep(5)


if __name__ == "__main__":
    poll_queue()
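To see what the script reads from each message without touching AWS, here is a small offline sketch. The sample body is a trimmed, hypothetical event containing only the fields the script uses (real events carry more), and `plan_action` is an illustrative helper, not part of the script itself.

```python
import json

# Trimmed, hypothetical message body with only the fields the script reads
SAMPLE_BODY = json.dumps({
    "detail-type": "GuardDuty Malware Protection Object Scan Result",
    "source": "aws.guardduty",
    "detail": {
        "s3ObjectDetails": {"bucketName": "my-bucket", "objectKey": "upload/report.pdf"},
        "scanResultDetails": {"scanResultStatus": "NO_THREATS_FOUND"},
    },
})


def plan_action(body_json):
    """Return (action, source_key, destination_key) for one message body."""
    detail = json.loads(body_json)["detail"]
    source_key = detail["s3ObjectDetails"]["objectKey"]
    status = detail["scanResultDetails"]["scanResultStatus"]
    if status == "NO_THREATS_FOUND":
        # Swap the first path segment for processed/
        return "move", source_key, "processed/" + source_key.split("/", 1)[-1]
    if status == "THREATS_FOUND":
        return "delete", source_key, None
    # Any other status: take no action
    return "skip", source_key, None


print(plan_action(SAMPLE_BODY))
# prints ('move', 'upload/report.pdf', 'processed/report.pdf')
```

Separating the decision from the S3 calls like this makes the key mapping easy to check before running the poller against a real bucket.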

Step 5: Run the script and upload some files to the upload/ folder. Once a scan completes with no threats found, the file is moved to the processed/ folder.
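If you would rather confirm the move from code than from the console, the following sketch polls the bucket until the processed copy appears. It assumes a boto3 S3 client is passed in; `wait_for_processed`, its timeout, and its polling interval are hypothetical choices for illustration.

```python
import time


def wait_for_processed(s3_client, bucket, key, timeout=120, interval=5):
    """Poll until `key` exists in `bucket`; return False if `timeout` seconds pass first."""
    deadline = time.monotonic() + timeout
    while True:
        # The exact key sorts first among all keys that share it as a prefix
        response = s3_client.list_objects_v2(Bucket=bucket, Prefix=key, MaxKeys=1)
        if any(obj["Key"] == key for obj in response.get("Contents", [])):
            return True
        if time.monotonic() >= deadline:
            return False
        time.sleep(interval)
```

For example, after uploading upload/sample.txt you could call wait_for_processed(boto3.client('s3'), '<YOUR_BUCKET_NAME>', 'processed/sample.txt').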
