# Source code for ecr_scan_reporter.repos_scanner

# SPDX-License-Identifier: MPL-2.0
# Copyright 2020-2021 John Mille<john@compose-x.io>

"""Main module."""

import re
import uuid
from json import dumps

from boto3 import session

from ecr_scan_reporter.common import chunked_iterable

"""
Module to trigger ECR Repositories scans
"""

# Catch-all pattern: ``^.*$`` matches any single-line string, so every repository name passes it.
DEFAULT_REGEXP = re.compile(r"^.*$")


def list_ecr_repos(repos=None, next_token=None, ecr_session=None):
    """
    Function to retrieve all the ECR repositories, following pagination until exhausted.

    :param list repos: Optional list the repositories are appended to. It is extended in place
        and returned; a new list is created when omitted.
    :param str next_token: Optional pagination token to start listing from.
    :param boto3.session.Session ecr_session: Session used to create the ECR client.
        A default session is created when omitted.
    :return: The accumulated ``repositories`` entries of every ``describe_repositories`` page.
    :rtype: list
    """
    if repos is None:
        repos = []
    if not ecr_session:
        ecr_session = session.Session()
    # A single client serves every page, and pages are walked iteratively so that a registry
    # with many pages cannot exhaust the interpreter's recursion limit.
    client = ecr_session.client("ecr")
    while True:
        if next_token:
            res = client.describe_repositories(nextToken=next_token)
        else:
            res = client.describe_repositories()
        repos += res["repositories"]
        # A missing or empty token means the last page was reached.
        next_token = res.get("nextToken")
        if not next_token:
            return repos
def filter_repos_from_regexp(repos_list, repos_names_filter=None):
    """
    Function to filter repositories based on their name and a regular expression.

    :param list repos_list: Repositories to filter. Each item is either a dict with a
        ``repositoryName`` key, or the repository name itself as a str.
    :param str repos_names_filter: Regular expression applied with ``match`` (anchored at the
        start of the name). When empty or not a str, ``DEFAULT_REGEXP`` is used instead.
    :return: The names of the repositories matching the expression, in input order.
    :rtype: list[str]
    :raises KeyError: When a dict item has no ``repositoryName`` key.
    :raises TypeError: When an item is neither a dict nor a str.
    """
    if repos_names_filter and isinstance(repos_names_filter, str):
        repos_filter = re.compile(repos_names_filter)
    else:
        repos_filter = DEFAULT_REGEXP
    filtered_repos = []
    for repo in repos_list:
        if isinstance(repo, dict):
            if "repositoryName" not in repo:
                # Include the offending item so the caller can tell which entry is malformed.
                raise KeyError(f"Missing repositoryName key from repository definition: {repo!r}")
            repo_name = repo["repositoryName"]
        elif isinstance(repo, str):
            repo_name = repo
        else:
            raise TypeError(f"The repo list must be a list of dicts or str. Got {type(repo)}")
        if repos_filter.match(repo_name):
            filtered_repos.append(repo_name)
    return filtered_repos
def _json_default(value):
    """Serialize date-like values (anything exposing ``isoformat``) that json cannot encode natively."""
    if hasattr(value, "isoformat"):
        return value.isoformat()
    # Anything else stays an error rather than being silently stringified.
    raise TypeError(f"Object of type {type(value).__name__} is not JSON serializable")


def job_dispatcher(queue_url, repos, sqs_session=None):
    """
    Sends a new job in SQS to distribute the images listing and scan for a given repository.

    One message is sent per repository; its body is the JSON serialization of the repository dict.
    Date-like values (such as the ``createdAt`` datetime boto3 returns for ECR repositories) are
    serialized in ISO 8601 format, since ``json`` cannot encode them natively.

    :param str queue_url: URL of the SQS queue the jobs are sent to.
    :param list[dict] repos: Repositories to dispatch. Each dict must have a ``repositoryName`` key.
    :param boto3.session.Session sqs_session: Session used to create the SQS client.
        A default session is created when omitted.
    :return: None
    :raises TypeError: When a repository holds a value that cannot be serialized to JSON.
    """
    if not sqs_session:
        sqs_session = session.Session()
    client = sqs_session.client("sqs")
    for repo in repos:
        print(f"Sending job for {repo['repositoryName']}")
        client.send_message(QueueUrl=queue_url, MessageBody=dumps(repo, default=_json_default))