mirror of
https://github.com/projectatomic/atomic.git
synced 2026-02-07 06:44:52 +01:00
Improve scan out and common input parsing
When an atomic scan is called without --detail, we print the name
of the container or image if that is provided. Prior, we only
printed the first 12 characters of the image or container ID. We
also no dynamically size the left-most column of the output, which
can accomodate various lengths of the image or container names.
Created a common input parser for when the user input. So if a
user inputs a partial name, we attempt to discern the actual
container or image being referenced. The were three definitions
added; _is_container, _is_image, and a wrapper function for both
called get_input_id. If either function can make a match to the
input identifier, it returns the image or container ID.
And lastly, to reduce the number of calls (queries) into docker,
there are now stub functions that should be called instead.
These stub functions are self-explainatory by their names, which
are: get_images, get_containers, and get_active_containers.
This commit is contained in:
120
Atomic/atomic.py
120
Atomic/atomic.py
@@ -96,6 +96,7 @@ class Atomic(object):
|
||||
self._images = []
|
||||
self.containers = False
|
||||
self.images_cache = None
|
||||
self.active_containers = False
|
||||
|
||||
def writeOut(self, output, lf="\n"):
|
||||
sys.stdout.flush()
|
||||
@@ -334,9 +335,11 @@ class Atomic(object):
|
||||
str(e))
|
||||
return None
|
||||
|
||||
def _inspect_container(self):
|
||||
def _inspect_container(self, name=None):
|
||||
if name is None:
|
||||
name = self.name
|
||||
try:
|
||||
return self.d.inspect_container(self.name)
|
||||
return self.d.inspect_container(name)
|
||||
except docker.errors.APIError:
|
||||
pass
|
||||
except requests.exceptions.ConnectionError as e:
|
||||
@@ -433,6 +436,7 @@ class Atomic(object):
|
||||
BUS_NAME = "org.OpenSCAP.daemon"
|
||||
OBJECT_PATH = "/OpenSCAP/daemon"
|
||||
INTERFACE = "org.OpenSCAP.daemon.Interface"
|
||||
input_resolve = {}
|
||||
|
||||
if self.args.images:
|
||||
scan_list = self._get_all_image_ids()
|
||||
@@ -445,7 +449,9 @@ class Atomic(object):
|
||||
else:
|
||||
scan_list = []
|
||||
for scan_input in self.args.scan_targets:
|
||||
scan_list.append(self.get_input_id(scan_input))
|
||||
docker_id = self.get_input_id(scan_input)
|
||||
input_resolve[docker_id] = scan_input
|
||||
scan_list.append(docker_id)
|
||||
util.writeOut("\nScanning...\n")
|
||||
bus = dbus.SystemBus()
|
||||
try:
|
||||
@@ -464,19 +470,20 @@ class Atomic(object):
|
||||
|
||||
else:
|
||||
if not self.args.detail:
|
||||
clean = util.print_scan_summary(scan_return)
|
||||
clean = util.print_scan_summary(scan_return, input_resolve)
|
||||
else:
|
||||
clean = util.print_detail_scan_summary(scan_return)
|
||||
clean = util.print_detail_scan_summary(scan_return,
|
||||
input_resolve)
|
||||
if not clean:
|
||||
sys.exit(1)
|
||||
|
||||
def stop(self):
|
||||
self.inspect = self._inspect_container()
|
||||
if self.inspect is None:
|
||||
self.inspect = self._inspect_image()
|
||||
if self.inspect is None:
|
||||
raise ValueError("Container/Image '%s' does not exists" %
|
||||
self.name)
|
||||
try:
|
||||
cid = self._is_container(self.name, active=True)
|
||||
self.name = cid
|
||||
except AtomicError as error:
|
||||
util.writeOut(error)
|
||||
sys.exit(1)
|
||||
|
||||
args = self._get_args("STOP")
|
||||
if args:
|
||||
@@ -877,34 +884,55 @@ class Atomic(object):
|
||||
sys.stderr.write("\nUnable to communicate with docker daemon\n")
|
||||
sys.exit(1)
|
||||
|
||||
def _is_container(self, identifier):
|
||||
def _is_container(self, identifier, active=False):
|
||||
'''
|
||||
Checks is the identifier is a container ID or container name. If
|
||||
it is, returns the full container ID. Else it will return an
|
||||
AtomicError
|
||||
AtomicError. Takes optional keyword active, which signifies
|
||||
that you want to only deal with active containers.
|
||||
'''
|
||||
if active:
|
||||
active_cons = self.get_active_containers()
|
||||
active_con_ids = [x['Id'] for x in active_cons]
|
||||
cons = active_cons
|
||||
else:
|
||||
cons = self.get_containers()
|
||||
|
||||
# First check if the container exists by whatever
|
||||
# identifier was given
|
||||
self.inspect = self._inspect_container(name=identifier)
|
||||
if self.inspect is not None:
|
||||
# Inspect found a match
|
||||
if not active:
|
||||
return self.inspect['Id']
|
||||
else:
|
||||
# Check if the container is active
|
||||
if self.inspect['Id'] in active_con_ids:
|
||||
return self.inspect['Id']
|
||||
|
||||
err_append = "Refine your search to narrow results."
|
||||
cons = self.get_containers()
|
||||
cids = [x['Id'] for x in cons]
|
||||
con_index = [i for i, j in enumerate(cids) if j.startswith(identifier)]
|
||||
|
||||
if len(con_index) > 0:
|
||||
if len(con_index) > 1:
|
||||
CIDS = []
|
||||
for index in con_index:
|
||||
CIDS.append(cids[index])
|
||||
raise ValueError("Found multiple container IDs ({0}) that "
|
||||
" might match '{1}'. {2}"
|
||||
.format(" ".join(CIDS), identifier,
|
||||
err_append))
|
||||
return cids[con_index[0]]
|
||||
|
||||
# The identifier might be a partial name?
|
||||
con_ids = []
|
||||
for con in cons:
|
||||
if "/{0}".format(identifier) in con['Names']:
|
||||
return con['Id']
|
||||
for name in con['Names']:
|
||||
if name.startswith("/{0}".format(identifier)):
|
||||
con_ids.append(con['Id'])
|
||||
break
|
||||
|
||||
# No dice
|
||||
raise AtomicError
|
||||
# More than one match was found
|
||||
if len(con_ids) > 1:
|
||||
raise AtomicError("Multiple matches were found for {0}. {1}"
|
||||
.format(identifier, err_append))
|
||||
# No matches were found
|
||||
elif len(con_ids) < 1:
|
||||
active_err = '' if not active else 'active '
|
||||
error_msg = "Unable to find {0}container '{1}'".format(active_err,
|
||||
identifier)
|
||||
raise AtomicError(error_msg)
|
||||
else:
|
||||
self.inspect = self._inspect_container(con_ids[0])
|
||||
return con_ids[0]
|
||||
|
||||
def _is_image(self, identifier):
|
||||
'''
|
||||
@@ -914,20 +942,12 @@ class Atomic(object):
|
||||
'''
|
||||
err_append = "Refine your search to narrow results."
|
||||
image_info = self.get_images()
|
||||
iids = [x['Id'] for x in image_info]
|
||||
image_index = [i for i, j in enumerate(iids)
|
||||
if j.startswith(identifier)]
|
||||
|
||||
if len(image_index) > 0:
|
||||
if len(image_index) > 1:
|
||||
IDS = []
|
||||
for index in image_index:
|
||||
IDS.append(iids[index])
|
||||
raise ValueError("Found multiple image IDs ({0}) that might "
|
||||
"match '{1}'. {2}".format(" ".join(IDS),
|
||||
identifier,
|
||||
err_append))
|
||||
return iids[image_index[0]]
|
||||
inspect = self._inspect_image(image=identifier)
|
||||
|
||||
if inspect is not None:
|
||||
self.inspect = inspect
|
||||
return inspect['Id']
|
||||
|
||||
name_search = util.image_by_name(identifier, images=image_info)
|
||||
if len(name_search) > 0:
|
||||
@@ -943,7 +963,6 @@ class Atomic(object):
|
||||
.format(identifier, "\n ".join(repo_tags),
|
||||
err_append))
|
||||
return name_search[0]['Id']
|
||||
|
||||
# No dice
|
||||
raise AtomicError
|
||||
|
||||
@@ -979,8 +998,19 @@ class Atomic(object):
|
||||
'''
|
||||
if not self.containers:
|
||||
self.containers = self.d.containers(all=True)
|
||||
|
||||
return self.containers
|
||||
|
||||
def get_active_containers(self):
|
||||
'''
|
||||
Wrapper function for obtaining active containers. Should be used
|
||||
instead of direct queries to docker
|
||||
'''
|
||||
if not self.active_containers:
|
||||
self.active_containers = self.d.containers(all=False)
|
||||
|
||||
return self.active_containers
|
||||
|
||||
|
||||
class AtomicError(Exception):
|
||||
pass
|
||||
|
||||
@@ -33,6 +33,7 @@ def image_by_name(img_name, images=None):
|
||||
return reg, repo, tag
|
||||
|
||||
i_reg, i_rep, i_tag = _decompose(img_name)
|
||||
|
||||
# Correct for bash-style matching expressions.
|
||||
if not i_reg:
|
||||
i_reg = '*'
|
||||
@@ -53,6 +54,11 @@ def image_by_name(img_name, images=None):
|
||||
and matches(tag, i_tag):
|
||||
valid_images.append(i)
|
||||
break
|
||||
# Some repo after decompose end up with the img_name
|
||||
# at the end. i.e. rhel7/rsyslog
|
||||
if rep.endswith(img_name):
|
||||
valid_images.append(i)
|
||||
break
|
||||
return valid_images
|
||||
|
||||
|
||||
@@ -87,23 +93,52 @@ def output_json(json_data):
|
||||
writeOut(json.dumps(json_data, indent=4, separators=(',', ': ')))
|
||||
|
||||
|
||||
def print_scan_summary(json_data):
|
||||
def print_scan_summary(json_data, names=None):
|
||||
'''
|
||||
Print a summary of the data returned from a
|
||||
CVE scan.
|
||||
'''
|
||||
max_col_width = 50
|
||||
min_width = 15
|
||||
|
||||
def _max_width(data):
|
||||
max_name = 0
|
||||
for name in data:
|
||||
max_name = len(data[name]) if len(data[name]) > max_name \
|
||||
else max_name
|
||||
# If the max name length is less that max_width
|
||||
if max_name < min_width:
|
||||
max_name = min_width
|
||||
|
||||
# If the man name is greater than the max col leng
|
||||
# we wish to use
|
||||
if max_name > max_col_width:
|
||||
max_name = max_col_width
|
||||
|
||||
return max_name
|
||||
|
||||
clean = True
|
||||
template = "{0:15} {1:5} {2:5} {3:5} {4:5}"
|
||||
|
||||
if len(names) > 0:
|
||||
max_width = _max_width(names)
|
||||
template = "{0:" + str(max_width) + "} {1:5} {2:5} {3:5} {4:5}"
|
||||
sevs = ['critical', 'important', 'moderate', 'low']
|
||||
writeOut(template.format("Container/Image", "Cri", "Imp", "Med", "Low"))
|
||||
writeOut(template.format("---------------", "---", "---", "---", "---"))
|
||||
writeOut(template.format("-" * max_width, "---", "---", "---", "---"))
|
||||
res_summary = json_data['results_summary']
|
||||
for image in res_summary.keys():
|
||||
image_res = res_summary[image]
|
||||
if 'msg' in image_res.keys():
|
||||
tmp_tuple = (image_res['msg'], "", "", "", "")
|
||||
else:
|
||||
tmp_tuple = tuple([image[:12]] +
|
||||
if len(names) < 1:
|
||||
image_name = image[:max_width]
|
||||
else:
|
||||
image_name = names[image][-max_width:]
|
||||
if len(image_name) == max_col_width:
|
||||
image_name = '...' + image_name[-(len(image_name)-3):]
|
||||
|
||||
tmp_tuple = tuple([image_name] +
|
||||
[str(image_res[sev]) for sev in sevs])
|
||||
sev_results = [image_res[sev] for sev in
|
||||
sevs if image_res[sev] > 0]
|
||||
@@ -114,7 +149,7 @@ def print_scan_summary(json_data):
|
||||
return clean
|
||||
|
||||
|
||||
def print_detail_scan_summary(json_data):
|
||||
def print_detail_scan_summary(json_data, names=None):
|
||||
'''
|
||||
Print a detailed summary of the data returned from
|
||||
a CVE scan.
|
||||
@@ -129,8 +164,9 @@ def print_detail_scan_summary(json_data):
|
||||
writeOut("")
|
||||
writeOut(image[:12])
|
||||
if not image_res['isRHEL']:
|
||||
writeOut(image_template.format(
|
||||
"Result", "Not based on Red Hat Enterprise Linux"))
|
||||
writeOut(image_template.format("Result",
|
||||
"Not based on Red Hat"
|
||||
"Enterprise Linux"))
|
||||
continue
|
||||
else:
|
||||
writeOut(image_template.format("OS", image_res['os'].rstrip()))
|
||||
@@ -140,14 +176,14 @@ def print_detail_scan_summary(json_data):
|
||||
if sev in scan_results:
|
||||
clean = False
|
||||
writeOut(image_template.format(sev,
|
||||
str(scan_results[sev]['num'])))
|
||||
str(scan_results[sev]['num'])))
|
||||
for cve in scan_results[sev]['cves']:
|
||||
writeOut(cve_template.format("CVE", cve['cve_title']))
|
||||
writeOut(cve_template.format("CVE URL",
|
||||
cve['cve_ref_url']))
|
||||
cve['cve_ref_url']))
|
||||
writeOut(cve_template.format("RHSA ID",
|
||||
cve['rhsa_ref_id']))
|
||||
cve['rhsa_ref_id']))
|
||||
writeOut(cve_template.format("RHSA URL",
|
||||
cve['rhsa_ref_url']))
|
||||
cve['rhsa_ref_url']))
|
||||
writeOut("")
|
||||
return clean
|
||||
|
||||
Reference in New Issue
Block a user