mirror of
https://github.com/projectatomic/atomic.git
synced 2026-02-06 03:45:28 +01:00
Added the ability to define custom docker args in the plugin configuration files for things like bind mounting dirs from the host to the scanner image. When parsing the atomic scan json files, we can now handle 'Vulnerabilities' or 'Results' for keys. Added a --debug switch to the base atomic command to allow for more specifics when an unwanted exception is raised. Fixed minor bug where if no image/container is provided, the scanner would still execute.
316 lines
13 KiB
Python
316 lines
13 KiB
Python
from . import Atomic
|
|
from . import util
|
|
from datetime import datetime
|
|
import os
|
|
from shutil import rmtree
|
|
import json
|
|
import sys
|
|
|
|
|
|
class Scan(Atomic):
|
|
"""
|
|
Scan class that can generically work any scanner
|
|
"""
|
|
DEBUG = False
|
|
results = '/var/lib/atomic'
|
|
|
|
def __init__(self):
|
|
super(Scan, self).__init__()
|
|
self.scan_dir = None
|
|
self.rootfs_paths = []
|
|
self.cur_time = datetime.now().strftime('%Y-%m-%d-%H-%M-%S-%f')
|
|
self.chroot_dir = '/run/atomic/{}'.format(self.cur_time)
|
|
self.results_dir = None
|
|
self.scan_content = {}
|
|
self.atomic_config = util.get_atomic_config()
|
|
self.scanners = util.get_scanners()
|
|
|
|
def scan(self):
|
|
def get_scan_info(scanner, scan_type):
|
|
for i in self.scanners:
|
|
if i['scanner_name'] == scanner:
|
|
for x in i['scans']:
|
|
if x['name'] == scan_type:
|
|
return i['image_name'], x['args'], i.get('custom_args')
|
|
|
|
if self.args.list:
|
|
self.print_scan_list()
|
|
|
|
scan_type = self.get_scan_type()
|
|
# Load the atomic config file and check scanner settings
|
|
yaml_error = "The image name or scanner arguments for '{}' is not " \
|
|
"defined in /etc/atomic.conf".format(self.args.scanner)
|
|
|
|
scanner_image_name, scanner_args, custom_args = get_scan_info(self.args.scanner, scan_type)
|
|
|
|
if not isinstance(scanner_args, list):
|
|
raise ValueError("The scanner arguments for {} must be in list"
|
|
" ([]) form.".format(self.args.scanner))
|
|
|
|
if None in [scanner_image_name, scanner_args]:
|
|
raise ValueError(yaml_error)
|
|
|
|
self.results_dir = os.path.join(self.results, self.args.scanner, self.cur_time)
|
|
scan_list = self._get_scan_list()
|
|
for i in scan_list:
|
|
self.scan_content[i['Id']] = i.get('input')
|
|
|
|
# mount all the rootfs
|
|
self._mount_scan_rootfs(scan_list)
|
|
|
|
docker_args = ['docker', 'run', '-it', '--rm', '-v', '/etc/localtime:/etc/localtime',
|
|
'-v', '{}:{}'.format(self.chroot_dir, '/scanin'), '-v',
|
|
'{}:{}'.format(self.results_dir, '/scanout')]
|
|
|
|
# Assemble the cmd line for the scan
|
|
scan_cmd = docker_args
|
|
if custom_args is not None:
|
|
scan_cmd = scan_cmd + custom_args
|
|
scan_cmd = scan_cmd + [scanner_image_name] + scanner_args
|
|
|
|
# Show the command being run
|
|
util.writeOut(" ".join(scan_cmd))
|
|
|
|
# do the scan
|
|
util.check_call(scan_cmd)
|
|
|
|
# umount all the rootfs
|
|
self._umount_rootfs_in_dir()
|
|
|
|
# output results
|
|
self.output_results()
|
|
|
|
# record environment
|
|
self.record_environment()
|
|
|
|
def _get_scan_list(self):
|
|
|
|
def gen_images():
|
|
slist = []
|
|
for image in self.get_images():
|
|
image['input'] = image['Id']
|
|
slist.append(image)
|
|
return slist
|
|
|
|
def gen_containers():
|
|
slist = []
|
|
for con in self.get_containers():
|
|
con['input'] = con['Id']
|
|
slist.append(con)
|
|
return slist
|
|
|
|
if self.args.images:
|
|
scan_list = gen_images()
|
|
elif self.args.containers:
|
|
scan_list = gen_containers()
|
|
elif self.args.all:
|
|
scan_list = gen_containers() + gen_images()
|
|
else:
|
|
scan_list = []
|
|
images = self.get_images()
|
|
containers = self.get_containers()
|
|
for scan_input in self.args.scan_targets:
|
|
docker_object = (next((item for item in containers
|
|
if item['Id'] == self.get_input_id(scan_input)), None))
|
|
docker_object = docker_object if docker_object is not None \
|
|
else (next((item for item in images if item['Id'] == self.get_input_id(scan_input)), None))
|
|
docker_object['input'] = scan_input
|
|
scan_list.append(docker_object)
|
|
if len(scan_list) < 1:
|
|
raise ValueError("You must provide at least one container or image for atomic "
|
|
"scan. See 'atomic scan --help' for more information")
|
|
|
|
return scan_list
|
|
|
|
def _mount_scan_rootfs(self, scan_list):
|
|
if not os.path.exists(self.chroot_dir):
|
|
os.makedirs(self.chroot_dir)
|
|
if self.DEBUG:
|
|
util.writeOut("Created {}".format(self.chroot_dir))
|
|
for docker_object in scan_list:
|
|
mount_path = os.path.join(self.chroot_dir, docker_object['Id'])
|
|
os.mkdir(mount_path)
|
|
if self.DEBUG:
|
|
util.writeOut("Created {}".format(mount_path))
|
|
self.mount(mountpoint=mount_path, image=docker_object['Id'])
|
|
if self.DEBUG:
|
|
util.writeOut("Mounted {} to {}".format(docker_object, mount_path))
|
|
|
|
def _umount_rootfs_in_dir(self):
|
|
for _dir in self.get_rootfs_paths():
|
|
rootfs_dir = os.path.join(self.chroot_dir, _dir)
|
|
self.unmount(rootfs_dir)
|
|
|
|
# Clean up temporary containers
|
|
|
|
if not self.DEBUG:
|
|
# Remove the temporary container dirs
|
|
rmtree(rootfs_dir)
|
|
else:
|
|
util.writeOut("Unmounted {}".format(rootfs_dir))
|
|
if not self.DEBUG:
|
|
rmtree(self.chroot_dir)
|
|
|
|
def get_rootfs_paths(self):
|
|
"""
|
|
Returns the list of rootfs paths (not fully qualified); if defined,
|
|
returns self.rootfs_paths, else defines and returns it
|
|
:return: list
|
|
"""
|
|
def _get_rootfs_paths():
|
|
return next(os.walk(self.chroot_dir))[1]
|
|
|
|
if len(self.rootfs_paths) < 1:
|
|
self.rootfs_paths = _get_rootfs_paths()
|
|
return self.rootfs_paths
|
|
|
|
def output_results(self):
|
|
"""
|
|
Write results of the scan to stdout
|
|
:return: None
|
|
"""
|
|
json_files = self._get_json_files()
|
|
for json_file in json_files:
|
|
json_results = json.load(open(json_file))
|
|
uuid = os.path.basename(json_results['UUID'])
|
|
name1 = self._get_input_name_for_id(uuid)
|
|
if not self._is_iid(uuid):
|
|
name2 = uuid[:15]
|
|
else:
|
|
# Containers do not have repo names
|
|
if uuid not in [x['Id'] for x in self.get_containers()]:
|
|
name2 = self._get_repo_names(uuid)
|
|
else:
|
|
name2 = uuid[:15]
|
|
util.writeOut("\n{} ({})\n".format(name1, name2))
|
|
if json_results['Successful'].upper() == "FALSE":
|
|
util.writeOut("{}{} is not supported for this scan."
|
|
.format(' ' * 5, self._get_input_name_for_id(uuid)))
|
|
elif 'Vulnerabilities' in json_results and len(json_results['Vulnerabilities']) > 0:
|
|
util.writeOut("The following issues were found:\n")
|
|
for vul in json_results['Vulnerabilities']:
|
|
if 'Title' in vul:
|
|
util.writeOut("{}{}".format(' ' * 5, vul['Title']))
|
|
if 'Severity' in vul:
|
|
util.writeOut("{}Severity: {}".format(' ' * 5, vul['Severity']))
|
|
if 'Custom' in vul.keys() and len(vul['Custom']) > 0:
|
|
custom_field = vul['Custom']
|
|
self._output_custom(custom_field, 7)
|
|
util.writeOut("")
|
|
elif 'Results' in json_results and len(json_results['Results']) > 0:
|
|
util.writeOut("The following results were found:\n")
|
|
for result in json_results['Results']:
|
|
if 'Custom' in result.keys() and len(result['Custom']) > 0:
|
|
custom_field = result['Custom']
|
|
self._output_custom(custom_field, 7)
|
|
util.writeOut("")
|
|
else:
|
|
util.writeOut("{} passed the scan".format(self._get_input_name_for_id(uuid)))
|
|
util.writeOut("\nFiles associated with this scan are in {}.\n".format(self.results_dir))
|
|
|
|
def _output_custom(self, value, indent):
|
|
space = ' ' * indent
|
|
next_indent = indent + 2
|
|
if isinstance(value, dict):
|
|
for x in value:
|
|
if isinstance(value[x], dict):
|
|
util.writeOut("{}{}:".format(space, x))
|
|
self._output_custom(value[x], next_indent)
|
|
elif isinstance(value[x], list):
|
|
util.writeOut("{}{}:".format(space, x))
|
|
self._output_custom(value[x], next_indent)
|
|
else:
|
|
util.writeOut("{}{}: {}".format(space, x, value[x]))
|
|
elif isinstance(value, list):
|
|
for x in value:
|
|
if isinstance(x, dict):
|
|
self._output_custom(x, next_indent)
|
|
elif isinstance(x, list):
|
|
self._output_custom(x, next_indent)
|
|
else:
|
|
util.writeOut('{}{}'.format(space, x))
|
|
|
|
def _get_json_files(self):
|
|
json_files = []
|
|
for files in os.walk(self.results_dir):
|
|
for jfile in files[2]:
|
|
if jfile == 'json':
|
|
json_files.append(os.path.join(files[0], jfile))
|
|
return json_files
|
|
|
|
def _get_input_name_for_id(self, iid):
|
|
return self.scan_content[iid]
|
|
|
|
def _is_iid(self, input_name):
|
|
if input_name.startswith(self.scan_content[input_name]):
|
|
return True
|
|
return False
|
|
|
|
def _get_repo_names(self, docker_id):
|
|
_match = next((x for x in self.get_images() if x['Id'] == docker_id), None)
|
|
if _match is None:
|
|
_match = next((x for x in self.get_containers() if x['Id'] == docker_id), None)
|
|
if'<none>' in _match['RepoTags'][0]:
|
|
return docker_id[:15]
|
|
else:
|
|
return ', '.join(_match['RepoTags'])
|
|
|
|
def record_environment(self):
|
|
"""
|
|
Grabs a "snapshot" the results of docker info and inspect results for
|
|
all images and containers. Write it to results_dir/environment.json
|
|
:return: None
|
|
"""
|
|
|
|
environment = {}
|
|
environment['info'] = self.d.info()
|
|
environment['images'] = []
|
|
for iid in [x['Id'] for x in self.get_images()]:
|
|
environment['images'].append(self._inspect_image(image=iid))
|
|
|
|
environment['containers'] = []
|
|
for cid in [x['Id'] for x in self.get_containers()]:
|
|
environment['containers'].append(self._inspect_container(name=cid))
|
|
|
|
with open(os.path.join(self.results_dir, 'environment.json'), 'w') as f:
|
|
json.dump(environment, f, indent=4, separators=(',', ': '))
|
|
|
|
def get_scan_type(self):
|
|
default_scan_type = None
|
|
scan_types = []
|
|
for i in self.scanners:
|
|
if i['scanner_name'] == self.args.scanner:
|
|
default_scan_type = i.get('default_scan')
|
|
if self.args.scan_type is None and default_scan_type is None:
|
|
raise ValueError("No scan type was given and there is no "
|
|
"default scan type defined for '{}'".format(self.args.scanner))
|
|
for x in i['scans']:
|
|
scan_types.append(x['name'])
|
|
if self.args.scan_type is None:
|
|
return default_scan_type
|
|
elif self.args.scan_type in scan_types:
|
|
return self.args.scan_type
|
|
else:
|
|
raise ValueError("Unable to find the scan type '{}' for '{}'.".
|
|
format(self.args.scan_type, self.args.scanner))
|
|
|
|
def print_scan_list(self):
|
|
default_scanner = self.atomic_config.get('default_scanner')
|
|
if default_scanner is None:
|
|
default_scanner = ''
|
|
for scanner in self.scanners:
|
|
scanner_name = scanner['scanner_name']
|
|
df = '* ' if scanner_name == default_scanner else ''
|
|
default_scan_type = scanner.get('default_scan')
|
|
if default_scan_type is None:
|
|
raise ValueError("Invalid configuration file: At least one scan type must be "
|
|
"declared as the default for {}.".format(scanner_name))
|
|
util.writeOut("Scanner: {} {}".format(scanner_name, df))
|
|
util.writeOut("{}Image Name: {}".format(" " * 2, scanner['image_name']))
|
|
for scan_type in scanner['scans']:
|
|
df = '* ' if default_scan_type == scan_type['name'] else ''
|
|
util.writeOut("{}Scan type: {} {}".format(" " * 5, scan_type['name'], df))
|
|
util.writeOut("{}Description: {}\n".format(" " * 5, scan_type['description']))
|
|
util.writeOut("\n* denotes defaults")
|
|
sys.exit(0)
|