Source code for ecr_scan_reporter.services_scanner
# SPDX-License-Identifier: MPL-2.0
# Copyright 2020-2021 John Mille<john@compose-x.io>
import re
from boto3.session import Session
from compose_x_common.aws import get_assume_role_session
from compose_x_common.aws.ecr import PRIVATE_ECR_URI_RE
from compose_x_common.compose_x_common import keyisset
[docs]def list_all_task_definitions(definitions=None, next_token=None, ecs_session=None):
"""
Simple recursive function to list all the task definitions into an account+region.
:param list definitions:
:param str next_token:
:param boto3.session.Session ecs_session:
:return: list of active task definitions
:rtype: list
"""
if ecs_session is None:
ecs_session = Session()
client = ecs_session.client("ecs")
if definitions is None:
definitions = []
if next_token:
defs_r = client.list_task_definitions(nextToken=next_token, status='ACTIVE')
else:
defs_r = client.list_task_definitions(status='ACTIVE')
definitions += defs_r["taskDefinitionArns"]
if keyisset("nextToken", defs_r):
return list_all_task_definitions(definitions, defs_r["nextToken"], ecs_session)
return definitions
[docs]def list_container_definitions_images(task_definition, ecs_session=None):
"""
Simple function to list the images of a given task definition
:param str task_definition:
:param boto3.session.Session ecs_session:
:return: list of images
:rtype: list
"""
if ecs_session is None:
ecs_session = Session()
client = ecs_session.client("ecs")
task_def = client.describe_task_definition(
taskDefinition=task_definition,
include=[
'TAGS',
],
)["taskDefinition"]
images = [container["image"] for container in task_def["containerDefinitions"]]
return images
[docs]def build_services_images_registries(roles=None, lambda_session=None):
if lambda_session is None:
lambda_session = Session()
if roles and isinstance(roles, str):
roles = roles.split(",")
elif roles and not isinstance(roles, list):
raise TypeError("roles must either be comma delimited list string or a list of IAM roles")
images = []
if not roles:
task_definitions = list_all_task_definitions(ecs_session=lambda_session)
for task_def in task_definitions:
images += list_container_definitions_images(task_def, ecs_session=lambda_session)
else:
for role_arn in roles:
ecs_session = get_assume_role_session(lambda_session, role_arn, session_name="ECRScanner")
task_definitions = list_all_task_definitions(ecs_session=ecs_session)
for task_def in task_definitions:
images += list_container_definitions_images(task_def, ecs_session=ecs_session)
registries = transform_image_description(images)
return registries
[docs]def handle_ecs_discovery(roles=None, lambda_session=None):
if lambda_session is None:
lambda_session = Session()
registries = build_services_images_registries(roles, lambda_session)
account = lambda_session.client("sts").get_caller_identity()["Account"]
jobs = []
for registry_id, registry in registries.items():
if registry_id != account:
print(
f"The registry {registry_id} is in a different account. Not performing image scan. Skipping images",
registry.keys(),
)
continue
for repo_name, images in registry.items():
images_ids = []
for digest in images["imageDigests"]:
images_ids.append({"imageDigest": digest})
for tag in images["imageTags"]:
images_ids.append({"imageTag": tag})
jobs.append({"repositoryName": repo_name, "images": images_ids, "registryId": registry_id})
return jobs