1
0
mirror of https://github.com/projectatomic/atomic.git synced 2026-02-06 21:45:24 +01:00
Files
atomic/Atomic/scan.py
baude 9a5497f93d Atomic/scan.py: Add ability to remediate
In the case of some scanners, they generate a script to remediate the
images|containers they have scanned.  We needed to provide a hook
for this ability. We now read the scanner's configuration file looking
for a "remediation_script" key, whose value should be a fq path to the
remediation script.  The remediation script should be delivered by
the scanning image via atomic install.

As of now, we pass the id of the scanned object and its results directory
as named arguments to the remediation script.

Signed-off-by: baude <bbaude@redhat.com>

Closes: #1090
Approved by: baude
2017-09-22 15:53:57 +00:00

539 lines
23 KiB
Python

from . import Atomic
from . import mount
from . import util
from datetime import datetime
import os
from shutil import rmtree
import json
import sys
from Atomic.backendutils import BackendUtils
def cli(subparser):
    """Register the 'atomic scan' subcommand and its arguments on subparser."""
    # atomic scan
    scanners = util.get_scanners()
    scanp = subparser.add_parser(
        "scan", help=_("scan an image or container for CVEs or configuration compliance"),
        epilog="atomic scan <input> scans a container or image for CVEs or configuration compliance")
    # The CLI driver instantiates _class and invokes the method named by func.
    scanp.set_defaults(_class=Scan, func='scan')
    # Target selectors (--rootfs/--all/--images/--containers) are mutually exclusive.
    scan_group = scanp.add_mutually_exclusive_group()
    scanp.add_argument("scan_targets", nargs='*', help=_("container image"))
    # Valid --scanner values are restricted to the configured scanner names.
    scanp.add_argument("--scanner", choices=[x['scanner_name'] for x in scanners], default=None, help=_("define the intended scanner"))
    scanp.add_argument("--scan_type", default=None, help=_("define the intended scan type"))
    scanp.add_argument("--list", action='store_true', default=False, help=_("List available scanners"))
    scanp.add_argument("--remediate", action='store_true', default=False, help=_("Perform remediation of scanned object"))
    scanp.add_argument("--scanner_args", default=None, help=_("Specify arguments to be passed to the scanner"))
    # --verbose and --json may not be combined.
    disp_group = scanp.add_mutually_exclusive_group()
    disp_group.add_argument("--verbose", action='store_true', default=False, help=_("Show more output from scanning container"))
    disp_group.add_argument("--json", action='store_true', default=False, help=_("Output results in JSON format"))
    scan_group.add_argument("--rootfs", nargs='?', default=[], action='append', help=_("Rootfs path to scan"))
    scan_group.add_argument("--all", default=False, action='store_true', help=_("scan all images (excluding intermediate layers) and containers"))
    scan_group.add_argument("--images", default=False, action='store_true', help=_("scan all images (excluding intermediate layers)"))
    scan_group.add_argument("--containers", default=False, action='store_true', help=_("scan all containers"))
class Scan(Atomic):
"""
Scan class that can generically work any scanner
"""
def __init__(self):
    """Initialize per-run scan state from the atomic config and scanner registry."""
    super(Scan, self).__init__()
    self.scan_dir = None
    # Image/container objects selected for scanning
    self.scan_list = []
    self.rootfs_paths = []
    # Timestamp making the input/output directory names unique per run
    self.cur_time = datetime.now().strftime('%Y-%m-%d-%H-%M-%S-%f')
    # Directory under which scan targets are mounted for the scanner container
    self.chroot_dir = '/run/atomic/{}'.format(self.cur_time)
    # Per-run results directory; set in scan() once the scanner is known
    self.results_dir = None
    self.scan_content = {}
    self.atomic_config = util.get_atomic_config()
    # Scanner definitions as returned by util.get_scanners()
    self.scanners = util.get_scanners()
    self.debug = False
    # Maps each --rootfs path to its flattened bind-directory name
    self.rootfs_mappings = {}
    # Name of the scanner selected for this run (resolved in scan())
    self.scanner = None
    # Maps mounted path -> scanned object id (or bind dir for --rootfs scans)
    self.mount_paths = {}
def get_scanners_list(self):
    """Return the list of configured scanner definitions."""
    return self.scanners
def scan(self):
    """
    Entry point for 'atomic scan'.

    Resolves the scanner and scan type, mounts the scan targets (or
    bind-mounts --rootfs paths), runs the scanner container, outputs and
    persists the results, and optionally runs the scanner's remediation
    script.

    :return: 0 on success, 1 when any result reports Successful != TRUE,
             or a JSON string of all results when not attached to a TTY
    :raises ValueError: on configuration problems (unknown scanner,
                        missing image/args, missing remediation script)
    """
    def get_scan_info(scanner, scan_type):
        # Return (image_name, args, custom_args) for the scanner/scan_type
        # pair; returns None implicitly when nothing matches.
        for i in self.scanners:
            if i['scanner_name'] == scanner:
                for x in i['scans']:
                    if x['name'] == scan_type:
                        return i['image_name'], x['args'], i.get('custom_args')

    def set_scanner():
        # If no scanners are defined
        if len(self.scanners) == 0:
            raise ValueError("No scanners are configured for your system.")
        if self.args.scanner:
            self.scanner = self.args.scanner
        elif len(self.scanners) == 1:
            self.scanner = self.scanners[0]['scanner_name']
        # Use default
        else:
            self.scanner = self.atomic_config['default_scanner']
        if self.scanner is None:
            raise ValueError("You must specify a scanner (--scanner) or set a default in /etc/atomic.conf")
        # Check if the scanner name is valid
        if self.scanner not in [x['scanner_name'] for x in self.scanners]:
            raise ValueError("Unknown scanner '{}' defined in {}".format(self.scanner, util.ATOMIC_CONF))

    def get_additional_args():
        # Turn "--scanner_args k=v,flag" into ['--k', 'v', '--flag'].
        if self.args.scanner_args is None:
            return []
        additional_args = []
        for pair in self.args.scanner_args.split(","):
            # _sep (not "_") so the gettext-style "_" is not shadowed
            key, _sep, value = pair.partition("=")
            additional_args.append("--" + key.strip())
            if value:
                additional_args.append(value.strip())
        return additional_args

    if self.args.debug:
        self.debug = True
    # Set debug bool
    self.set_debug()
    # Show list of scanners and exit
    if self.args.list:
        self.print_scan_list()
    set_scanner()
    scan_type = self.get_scan_type()
    # Load the atomic config file and check scanner settings
    yaml_error = "The image name or scanner arguments for '{}' is not " \
                 "defined in /etc/atomic.conf".format(self.scanner)
    scanner_image_name, scanner_args, custom_args = get_scan_info(self.scanner, scan_type)
    if not isinstance(scanner_args, list):
        raise ValueError("The scanner arguments for {} must be in list"
                         " ([]) form.".format(self.scanner))
    if None in [scanner_image_name, scanner_args]:
        raise ValueError(yaml_error)
    self.results_dir = os.path.join(self.results, self.scanner, self.cur_time)
    # Get remediation script
    if self.args.remediate:
        remediation_script = None
        for i in self.scanners:
            # BUGFIX: only honor the remediation script of the SELECTED
            # scanner; previously the first configured scanner's entry was
            # taken unconditionally (loop body ended with a bare break).
            if i['scanner_name'] == self.scanner:
                remediation_script = i.get("remediation_script", None)
                break
        if remediation_script is None:
            raise ValueError("{} has no remediation script.".format(self.scanner))
    # Create the input directory
    if not os.path.exists(self.chroot_dir):
        os.makedirs(self.chroot_dir)
        if self.debug:
            util.write_out("Created {}".format(self.chroot_dir))
    if len(self.args.rootfs) == 0:
        self.scan_list = self._get_scan_list()
        security_args = []
    else:
        # Check to make sure all the chroots provided are legit
        for _path in self.args.rootfs:
            if not os.path.exists(_path):
                raise ValueError("The path {} does not exist".format(_path))
        self.setup_rootfs_dirs()
        # Have to disable SELinux checking of scanning not docker images
        security_args = ["--security-opt", "label:disable"]
    # Create the output directory
    os.makedirs(self.results_dir)
    docker_args = ['docker', 'run', '-t', '--rm', '-v', '/etc/localtime:/etc/localtime',
                   '-v', '{}:{}'.format(self.chroot_dir, '/scanin'), '-v',
                   '{}:{}:rw,Z'.format(self.results_dir, '/scanout')]
    # Assemble the cmd line for the scan
    scan_cmd = docker_args + security_args
    if custom_args is not None:
        scan_cmd = scan_cmd + custom_args
    scan_cmd = scan_cmd + [scanner_image_name] + scanner_args + get_additional_args()
    scan_cmd = self.sub_env_strings(" ".join(scan_cmd))
    # Show the command being run
    if self.useTTY and not self.args.json:
        util.write_out(scan_cmd)
    # Show stdout from container if --debug or --verbose
    stdout = None if (self.args.verbose or self.args.debug) else open(os.devnull, 'w')
    try:
        if len(self.args.rootfs) == 0:
            # mount all the rootfs
            self._mount_scan_rootfs(self.scan_list)
        if self.debug:
            util.write_out("Creating the output dir at {}".format(self.results_dir))
        # do the scan
        util.check_call(scan_cmd, stdout=stdout, env=self.cmd_env())
        # output results
        if self.useTTY:
            self.output_results()
        # record environment
        self.record_environment()
        self.write_persistent_data()
    finally:
        # unmount all the rootfs
        self._unmount_rootfs_in_dir()
    if not self.useTTY:
        return (json.dumps(self.get_scan_data()))
    for json_file in self._get_json_files():
        # Close result files promptly instead of leaking the handle
        with open(json_file) as f:
            json_results = json.load(f)
        if json_results['Successful'].upper() != 'TRUE':
            return 1
    # Perform remediation
    if self.args.remediate:
        for i in self.scan_list:
            self.remediate(remediation_script, i.id, self.results_dir)
    return 0
def _get_scan_list(self):
    """
    Build the list of objects to scan from --images/--containers/--all
    or from the positional scan targets.

    :return: list of backend image/container objects
    :raises ValueError: when a named target is neither a local container
                        nor a local image
    """
    beu = BackendUtils()
    if self.args.images:
        return beu.get_images()
    if self.args.containers:
        return beu.get_containers()
    if self.args.all:
        return beu.get_images() + beu.get_containers()
    targets = []
    for target_name in self.args.scan_targets:
        try:
            # get_backend_and_container throws a ValueError when it cannot find anything
            _backend, scan_obj = beu.get_backend_and_container_obj(target_name)
        except ValueError:
            try:
                # get_backend_and_image throws a ValueError when it cannot find anything
                _backend, scan_obj = beu.get_backend_and_image_obj(target_name)
            except ValueError:
                raise ValueError("Unable to locate the container or image '{}' locally. Check the "
                                 "input name for typos or pull the image first.".format(target_name))
        targets.append(scan_obj)
    return targets
def _get_roots_path_from_bind_name(self, in_bind_name):
    """
    Map a result file located inside a bind-mount directory back to the
    original --rootfs path; returns None when nothing matches.
    """
    wanted = os.path.basename(os.path.split(in_bind_name)[0])
    return next((orig_path for orig_path, bound in self.rootfs_mappings.items()
                 if bound == wanted), None)
def get_scan_data(self):
    """
    Load and return the parsed JSON content of every result file the
    scan produced.

    :return: list of dicts, one per result file
    """
    results = []
    for json_file in self._get_json_files():
        # Use a context manager so file handles are closed promptly
        # (previously handles were left open until garbage collection).
        with open(json_file) as f:
            results.append(json.load(f))
    return results
def _mount_scan_rootfs(self, scan_list):
    """
    Mount each object's rootfs under chroot_dir so the scanner container
    can read it through the /scanin volume.

    :param scan_list: image/container objects to mount
    :return: None
    """
    for docker_object in scan_list:
        mount_path = os.path.join(self.chroot_dir, docker_object.id.replace("/", "_"))
        os.mkdir(mount_path)
        if self.debug:
            util.write_out("Created {}".format(mount_path))
        new_mount_path = self.mount(mountpoint=mount_path, image=docker_object.id)
        # The mount helper may choose a different (storage-preferred)
        # mountpoint; if so, replace our directory with a symlink to it so
        # the expected path under chroot_dir still resolves.
        if new_mount_path != mount_path:
            rmtree(mount_path)
            os.symlink(new_mount_path, mount_path)
        # Record which object is mounted where for result lookup and unmount.
        self.mount_paths[new_mount_path.rstrip('/')] = docker_object.id
        if self.debug:
            util.write_out("Mounted {} to {}".format(docker_object.id, mount_path))
def _unmount_rootfs_in_dir(self):
    """
    Undo every mount created for this scan and remove the temporary
    chroot directory (kept for inspection when --debug is on).

    :return: None
    """
    for rootfs_dir in self.get_rootfs_paths():
        if len(self.args.rootfs) == 0:
            # Image/container mounts: best-effort unmount — failures are
            # deliberately ignored (the mount may already be gone).
            try:
                self.unmount(rootfs_dir)
            except (ValueError, mount.MountError):
                pass
        else:
            # Clean up bind mounts if the chroot feature is used
            mcmd = ['umount', rootfs_dir]
            util.check_call(mcmd)
        # Clean up temporary containers
        if not self.debug:
            # Remove the temporary container dirs
            rmtree(rootfs_dir)
        else:
            util.write_out("Unmounted {}".format(rootfs_dir))
    if not self.debug:
        rmtree(self.chroot_dir)
def get_rootfs_paths(self):
    """
    Returns the list of rootfs paths (not fully qualified)
    :return: list
    """
    # BUGFIX: return an actual list as the docstring promises; the dict
    # view returned previously is lazy and tied to mount_paths.
    return list(self.mount_paths.keys())
def output_results(self):
    """
    Write results of the scan to stdout
    :return: None
    """
    json_files = self._get_json_files()
    for json_file in json_files:
        json_results = json.load(open(json_file))
        if self.args.json:
            util.output_json(json_results)
        else:
            # Map the result back to what was scanned: for docker objects,
            # match the result's UUID against the recorded mount paths; for
            # --rootfs scans, recover the original path from the bind name.
            uuid = next(self.mount_paths[x] for x in self.mount_paths.keys() if
                        os.path.basename(json_results['UUID'].rstrip('/')) in x) if len(self.args.rootfs) == 0 \
                else self._get_roots_path_from_bind_name(json_file)
            # name1: the name the user typed (or the path when several
            # --rootfs targets were given); name2: a secondary identifier.
            name1 = uuid if len(self.args.rootfs) > 1 else self._get_input_name_for_id(uuid)
            if len(self.args.rootfs) == 0 and not self._is_iid(uuid):
                name2 = uuid[:15]
            else:
                # Containers do not have repo names
                if len(self.args.rootfs) == 0 and uuid not in [x['Id'] for x in self.get_containers()]:
                    name2 = self._get_repo_names(uuid)
                else:
                    name2 = uuid[:15]
            util.write_out("\n{} ({})\n".format(name1, name2))
            if json_results['Successful'].upper() == "TRUE":
                # Scanner-specific free-form output, if present
                if 'Custom' in json_results:
                    self._output_custom(json_results['Custom'], 3)
                if 'Vulnerabilities' in json_results and len(json_results['Vulnerabilities']) > 0:
                    util.write_out("The following issues were found:\n")
                    for vul in json_results['Vulnerabilities']:
                        if 'Title' in vul:
                            util.write_out("{}{}".format(' ' * 5, vul['Title']))
                        if 'Severity' in vul:
                            util.write_out("{}Severity: {}".format(' ' * 5, vul['Severity']))
                        if 'Custom' in vul.keys() and len(vul['Custom']) > 0:
                            custom_field = vul['Custom']
                            self._output_custom(custom_field, 7)
                        util.write_out("")
                elif 'Results' in json_results and len(json_results['Results']) > 0:
                    util.write_out("The following results were found:\n")
                    for result in json_results['Results']:
                        if 'Custom' in result.keys() and len(result['Custom']) > 0:
                            custom_field = result['Custom']
                            self._output_custom(custom_field, 7)
                        util.write_out("")
                else:
                    util.write_out("{} passed the scan".format(self._get_input_name_for_id(uuid)))
            else:
                util.write_out("{}{} is not supported for this scan."
                               .format(' ' * 5, self._get_input_name_for_id(uuid)))
    if not self.args.json:
        util.write_out("\nFiles associated with this scan are in {}.\n".format(self.results_dir))
def _output_custom(self, value, indent):
    """
    Recursively pretty-print a scanner 'Custom' field (arbitrarily
    nested dicts and lists), indenting two extra spaces per level.
    """
    pad = ' ' * indent
    deeper = indent + 2
    if isinstance(value, dict):
        for key in value:
            item = value[key]
            if isinstance(item, (dict, list)):
                util.write_out("{}{}:".format(pad, key))
                self._output_custom(item, deeper)
            else:
                util.write_out("{}{}: {}".format(pad, key, item))
    elif isinstance(value, list):
        for item in value:
            if isinstance(item, (dict, list)):
                self._output_custom(item, deeper)
            else:
                util.write_out('{}{}'.format(pad, item))
def _get_json_files(self):
    """
    Walk the results directory and return the paths of all result files,
    which scanners write with the literal filename 'json'.
    """
    found = []
    for dirpath, _dirs, filenames in os.walk(self.results_dir):
        found.extend(os.path.join(dirpath, name)
                     for name in filenames if name == 'json')
    return found
def _get_object_by_id(self, iid):
    """
    Return the scan_list object whose id equals iid.
    Raises StopIteration when no such object exists.
    """
    matches = (obj for obj in self.scan_list if obj.id == iid)
    return next(matches)
def _get_input_name_for_id(self, iid):
    """
    Return the name the user originally supplied for iid; for --rootfs
    scans the id is already the path the user typed.
    """
    if len(self.args.rootfs) > 0:
        return iid
    return self._get_object_by_id(iid).input_name
def _is_iid(self, input_name):
    """
    True when input_name is itself the object's id, i.e. the user typed
    an id rather than a name. Raises StopIteration when no scan_list
    entry has that id.
    """
    match = next(obj for obj in self.scan_list if obj.id == input_name)
    return match.id == match.input_name
def _get_repo_names(self, docker_id):
    """
    Return a human-friendly name for docker_id: its repo tags when
    available, otherwise the truncated id.
    """
    _match = next((x for x in self.get_images() if x['Id'] == docker_id), None)
    if _match is None:
        _match = next((x for x in self.get_containers() if x['Id'] == docker_id), None)
    # BUGFIX: an id matching neither images nor containers previously
    # crashed with a TypeError (subscripting None); fall back to the
    # truncated id instead, as for untagged images.
    if _match is None or '<none>' in _match['RepoTags'][0]:
        return docker_id[:15]
    return ', '.join(_match['RepoTags'])
def record_environment(self):
    """
    Snapshot the docker environment — `docker info` plus the inspect
    output of every image and container — and write it to
    results_dir/environment.json.

    :return: None
    """
    environment = {
        'info': self.d.info(),
        'images': [self._inspect_image(image=img['Id'])
                   for img in self.get_images()],
        'containers': [self._inspect_container(name=con['Id'])
                       for con in self.get_containers()],
    }
    with open(os.path.join(self.results_dir, 'environment.json'), 'w') as f:
        json.dump(environment, f, indent=4, separators=(',', ': '))
def get_scan_type(self):
    """
    Determine which scan type to run: the --scan_type argument when
    given, otherwise the selected scanner's configured default_scan.

    :return: scan type name (str)
    :raises ValueError: when no scan type can be determined, or the
                        requested/default type is not defined for the
                        selected scanner
    """
    default_scan_type = None
    scan_types = []
    for i in self.scanners:
        if i['scanner_name'] == self.scanner:
            default_scan_type = i.get('default_scan')
            if self.args.scan_type is None and default_scan_type is None:
                # BUGFIX: report the RESOLVED scanner name; self.args.scanner
                # is None when the scanner came from the config default, which
                # produced "... defined for 'None'".
                raise ValueError("No scan type was given and there is no "
                                 "default scan type defined for '{}'".format(self.scanner))
            for x in i['scans']:
                scan_types.append(x['name'])
            if default_scan_type not in scan_types:
                raise ValueError("The default scan type '{}' is not defined as a valid scan type for "
                                 "the scanner '{}'".format(default_scan_type, self.scanner))
    if self.args.scan_type is None:
        return default_scan_type
    elif self.args.scan_type in scan_types:
        return self.args.scan_type
    else:
        raise ValueError("Unable to find the scan type '{}' for '{}'.".
                         format(self.args.scan_type, self.scanner))
def print_scan_list(self):
    """
    Print every configured scanner with its scan types, marking the
    defaults with '* ', then exit the process.
    """
    if not self.scanners:
        util.write_out("There are no scanners configured for this system.")
        sys.exit(0)
    default_scanner = self.atomic_config['default_scanner']
    if default_scanner is None:
        default_scanner = ''
    for scanner in self.scanners:
        name = scanner['scanner_name']
        marker = '* ' if name == default_scanner else ''
        default_scan_type = scanner.get('default_scan')
        if default_scan_type is None:
            raise ValueError("Invalid configuration file: At least one scan type must be "
                             "declared as the default for {}.".format(name))
        util.write_out("Scanner: {} {}".format(name, marker))
        util.write_out("{}Image Name: {}".format(" " * 2, scanner['image_name']))
        for scan_type in scanner['scans']:
            marker = '* ' if default_scan_type == scan_type['name'] else ''
            util.write_out("{}Scan type: {} {}".format(" " * 5, scan_type['name'], marker))
            util.write_out("{}Description: {}\n".format(" " * 5, scan_type['description']))
    util.write_out("\n* denotes defaults")
    sys.exit(0)
def mount(self, mountpoint, image):
    """
    Mount image's rootfs at mountpoint (shared) and return the actual
    mountpoint chosen by the mount helper, which may differ.
    """
    mnt = mount.Mount()
    mnt.set_args(self.args)
    mnt.mountpoint = mountpoint
    mnt.image = image
    mnt.shared = True
    mnt.mount(best_mountpoint_for_storage=True)
    return mnt.mountpoint
def unmount(self, mountpoint):
    """Unmount a rootfs previously mounted at mountpoint."""
    mnt = mount.Mount()
    mnt.set_args(self.args)
    mnt.mountpoint = mountpoint
    mnt.unmount()
def setup_rootfs_dirs(self):
    """
    Bind-mount each --rootfs path read-only into the chroot scan
    directory so the scanner container sees it under /scanin.
    """
    for _dir in self.args.rootfs:
        # Flatten the path into a single directory name: "/var/lib" -> "_var_lib"
        bind_dir = _dir.replace("/", "_")
        chroot_scan_dir = os.path.join(self.chroot_dir, bind_dir)
        os.mkdir(chroot_scan_dir)
        mcmd = ['mount', '-o', 'ro,bind', _dir, chroot_scan_dir]
        # NOTE(review): maps the bind dir to itself, unlike
        # _mount_scan_rootfs which maps path -> object id; only the keys
        # appear to be used (unmount/cleanup) in the --rootfs case —
        # confirm before changing.
        self.mount_paths[chroot_scan_dir] = chroot_scan_dir
        util.check_call(mcmd)
        self.rootfs_mappings[_dir] = bind_dir
def get_persist_data(self, json_results, json_file):
    """
    Reduce one scan result to the summary record stored in
    scan_summary.json.

    :param json_results: parsed scanner output (dict)
    :param json_file: path of the file json_results was loaded from
    :return: summary dict, or {} when the scan was unsuccessful
    """
    # BUGFIX: .get('Successful') returns None when the key is missing and
    # crashed on .upper(); treat a missing key as an unsuccessful scan.
    if str(json_results.get('Successful', 'FALSE')).upper() == "FALSE":
        return {}
    persist = {
        'UUID': json_results['UUID'].replace("/scanin/", ""),
        'Scanner': json_results['Scanner'],
        'Time': json_results['Time'],
        'Scan Type': json_results['Scan Type'],
    }
    # Vulnerable when the scanner listed vulnerabilities or flagged it so.
    persist['Vulnerable'] = bool(json_results.get('Vulnerabilities')) \
        or json_results.get('Vulnerable') is True
    persist['json_file'] = json_file
    return persist
def write_persistent_data(self):
    """
    Merge this scan's results into <results>/scan_summary.json, keyed by
    object id (or rootfs path for --rootfs scans), pruning entries whose
    image or container no longer exists.

    :return: None
    """
    new_data = dict()
    for json_file in self._get_json_files():
        # BUGFIX: close result and summary files promptly instead of
        # leaking handles via json.load(open(...)).
        with open(json_file) as f:
            json_results = json.load(f)
        uuid = os.path.basename(json_results['UUID']) if len(self.args.rootfs) == 0 \
            else self._get_roots_path_from_bind_name(json_file)
        new_data[uuid] = self.get_persist_data(json_results, json_file)
    summary_file = os.path.join(self.results, "scan_summary.json")
    if not os.path.exists(summary_file):
        persistent_data = new_data
    else:
        with open(summary_file, "r") as f:
            persistent_data = json.load(f)
    iids = [x['Id'] for x in self.get_images()]
    cids = [x['Id'] for x in self.get_containers()]
    # Only overwrite existing entries with non-empty (successful) results.
    for uuid in new_data.keys():
        if len(new_data[uuid]) > 0:
            persistent_data[uuid] = new_data[uuid]
    # Clean up old data
    for uuid in list(persistent_data):
        if uuid not in iids and uuid not in cids:
            del persistent_data[uuid]
    with open(summary_file, 'w') as f:
        json.dump(persistent_data, f, indent=4)
def remediate(self, script, iid, results_dir):
    """
    Run the scanner's remediation script for one scanned object, passing
    the object id and its results directory as named arguments.
    """
    cmd = [sys.executable, script, '--id', iid, '--results_dir', results_dir]
    util.check_call(cmd)