1
0
mirror of https://github.com/projectatomic/atomic.git synced 2026-02-06 21:45:24 +01:00
Files
atomic/Atomic/images.py
Brent Baude 802836f43d WIP refactor
2016-11-07 13:15:53 -06:00

275 lines
12 KiB
Python

from Atomic import Atomic
from Atomic import util
from Atomic import info
from Atomic import verify
from Atomic import help as Help
from Atomic.mount import Mount
from Atomic.delete import Delete
import os
import sys
import json
import math
import shutil
import tempfile
import time
import argparse
def convert_size(size):
if size > 0:
size_name = ("B", "KB", "MB", "GB", "TB", "PB", "EB", "ZB", "YB")
i = int(math.floor(math.log(size, 1000)))
p = math.pow(1000, i)
s = round(size/p, 2) # pylint: disable=round-builtin,old-division
if s > 0:
return '%s %s' % (s, size_name[i])
return '0B'
def cli(subparser):
# atomic images
imagesp = subparser.add_parser("images",
help=_("operate on images"))
images_subparser = imagesp.add_subparsers(title='images subcommands',
description="operate on images",
help='additional help')
# atomic images delete
delete_parser = images_subparser.add_parser("delete",
help=_("mark image for deletion"),
epilog="Marks image. registry garbage-collection "
"when invoked will recover used disk space")
delete_parser.set_defaults(_class=Delete, func='delete_image')
deletegroup = delete_parser.add_mutually_exclusive_group()
deletegroup.add_argument("-f", "--force", default=False, dest="force",
action="store_true",
help=_("Force removal of local images, even if containers based on it exist. Default is False"))
deletegroup.add_argument("--remote", default=False, dest="remote",
action="store_true",
help=_("Delete image from remote repository"))
deletegroup.add_argument("--storage", default="", dest="storage",
help=_("Specify the storage from which to delete the image from. "
"If not specified and there are images with the same name in "
"different storages, you will be propted to specify."))
delete_parser.add_argument("delete_targets", nargs=argparse.ONE_OR_MORE,
help=_("container image(s)"))
if util.gomtree_available():
generate_parser = images_subparser.add_parser("generate",
help=_("generate image 'manifests' if missing"))
generate_parser.set_defaults(_class=Images, func='generate_validation_manifest')
Help.cli(images_subparser)
info.cli(images_subparser)
# atomic images list
list_parser = images_subparser.add_parser("list",
help=_("list container images on your system"),
epilog="atomic images by default will list all installed "
"container images on your system.")
list_parser.set_defaults(_class=Images, func='display_all_image_info')
list_parser.add_argument("-a", "--all", dest="all", default=False,
action="store_true",
help=_("Show all images, including intermediate images"))
list_parser.add_argument("-f", "--filter", dest="filter", metavar="FILTER",
action="append",
help=_("Filter output based on VARIABLE=VALUE format"))
list_parser.add_argument("-n", "--noheading", dest="heading", default=True,
action="store_false",
help=_("do not print heading when listing the images"))
list_parser.add_argument("--no-trunc", dest="truncate", default=True,
action="store_false",
help=_("Don't truncate output"))
list_parser.add_argument("-q", "--quiet", dest="quiet", default=False,
action="store_true",
help=_("Only display image IDs"))
list_parser.add_argument("--json", action='store_true',dest="json", default=False,
help=_("print in a machine parseable form"))
prune_parser = images_subparser.add_parser("prune",
help=_("delete unused 'dangling' images"),
epilog="Using the prune command, "
"will free up disk space deleting unused "
"'dangling' images")
prune_parser.set_defaults(_class=Delete, func='prune_images')
verify.cli(images_subparser)
info.cli_version(images_subparser)
class Images(Atomic):
def __init__(self):
super(Images, self).__init__()
def display_all_image_info(self):
def get_col_lengths(_images):
'''
Determine the max length of the repository and tag names
:param _images:
:return: a set with len of repository and tag
If there are no images, return 1, 1
'''
repo_tags = [[i["repo"], i["tag"]] for i in _images]
if repo_tags:
return max([len(x[0]) for x in repo_tags]) + 2,\
max([len(x[1]) for x in repo_tags]) + 2
else:
return 1, 1
_images = self.images()
if self.args.json:
json.dump(_images, sys.stdout)
return
if len(_images) >= 0:
_max_repo, _max_tag = get_col_lengths(_images)
if self.args.truncate:
_max_id = 14
else:
_max_id = 65
col_out = "{0:2} {1:" + str(_max_repo) + "} {2:" + str(_max_tag) + \
"} {3:" + str(_max_id) + "} {4:18} {5:14} {6:10}"
if self.args.heading and not self.args.quiet:
util.write_out(col_out.format(" ",
"REPOSITORY",
"TAG",
"IMAGE ID",
"CREATED",
"VIRTUAL SIZE",
"TYPE"))
for image in _images:
if self.args.filter:
image_info = {"repo" : image['repo'], "tag" : image['tag'], "id" : image['id'],
"created" : image['created'], "size" : image['virtual_size'], "type" : image['type'],
"dangling": "{}".format(image['is_dangling'])}
if not self._filter_include_image(image_info):
continue
if self.args.quiet:
util.write_out(image['id'])
else:
indicator = ""
if image["is_dangling"]:
indicator += "*"
elif image["used_image"]:
indicator += ">"
if image["vulnerable"]:
space = " " if len(indicator) < 1 else ""
if util.is_python2:
indicator = indicator + self.skull + space
else:
indicator = indicator + str(self.skull, "utf-8") + space
util.write_out(col_out.format(indicator, image['repo'], image['tag'], image['id'], image['created'], image['virtual_size'], image['type']))
util.write_out("")
return
def images(self):
_images = self.get_images(get_all=self.args.all)
all_image_info = []
if len(_images) >= 0:
vuln_ids = self.get_vulnerable_ids()
all_vuln_info = json.loads(self.get_all_vulnerable_info())
used_image_ids = [x['ImageID'] for x in self.get_containers()]
for image in _images:
image_dict = dict()
if not image["RepoTags"]:
continue
if ':' in image["RepoTags"][0]:
repo, tag = image["RepoTags"][0].rsplit(":", 1)
else:
repo, tag = image["RepoTags"][0], ""
if "Created" in image:
created = time.strftime("%F %H:%M", time.localtime(image["Created"]))
else:
created = ""
if "VirtualSize" in image:
virtual_size = convert_size(image["VirtualSize"])
else:
virtual_size = ""
image_dict["is_dangling"] = self.is_dangling(repo)
image_dict["used_image"] = image["Id"] in used_image_ids
image_dict["vulnerable"] = image["Id"] in vuln_ids
image_id = image["Id"][:12] if self.args.truncate else image["Id"]
image_type = image['ImageType']
image_dict["repo"] = repo
image_dict["tag"] = tag
image_dict["id"] = image_id
image_dict["created"] = created
image_dict["virtual_size"] = virtual_size
image_dict["type"] = image_type
image_dict["image_id"] = image["ImageId"]
if image_dict["vulnerable"]:
image_dict["vuln_info"] = all_vuln_info[image["Id"]]
else:
image_dict["vuln_info"] = dict()
all_image_info.append(image_dict)
return all_image_info
def generate_validation_manifest(self):
"""
Generates a gomtree validation manifest for a non-system image and stores it in
ATOMIC_VAR_LIB
:param:
:return: None
"""
_images = self.get_images(get_all=True)
for image in _images:
atomic_var_lib = util.ATOMIC_VAR_LIB
if not image["RepoTags"]:
continue
iid = image["RepoTags"][0]
if image["ImageType"] == "system":
continue
if iid == "<none>:<none>" or iid == "<none>":
continue
if os.path.exists(os.path.join(atomic_var_lib, "gomtree-manifests/%s.mtree" % iid)):
continue
manifestname = os.path.join(atomic_var_lib, "gomtree-manifests/%s.mtree" % iid)
dname = os.path.dirname(manifestname)
if not os.path.exists(dname):
os.makedirs(dname)
tmpdir = tempfile.mkdtemp()
m = Mount()
m.args = []
m.image = iid
m.mountpoint = tmpdir
m.mount()
r = util.generate_validation_manifest(img_rootfs=tmpdir, keywords="type,uid,gid,mode,size,sha256digest")
m.unmount()
with open(manifestname,"w",0) as f:
f.write(r.stdout)
shutil.rmtree(tmpdir)
def _filter_include_image(self, image_info):
filterables = ["repo", "tag", "id", "created", "size", "type", "dangling"]
for i in self.args.filter:
var, value = str(i).split("=")
var = var.lower()
if var == "repository":
var = "repo"
if var == "image":
var = "id"
if var not in filterables: # Default to allowing all images through for non-existing filterable
continue
if value not in image_info[var].lower():
return False
return True