1
0
mirror of https://github.com/projectatomic/atomic.git synced 2026-02-05 18:45:01 +01:00
Files
atomic/Atomic/verify.py
Brent Baude d67849a602 Use fully-qualified image name in info() and verify()
We now use the fully qualified image name (if not provided)
when dealing with atomic info and verify.  This is because
skopeo requires a fq image name to do remote inspection. As
an upside, it also makes atomic a little more user friendly
as well.

tests/integration/test_info.sh

Temporarly disabled a test due to an issue with skopeo
2016-03-21 09:05:28 -05:00

302 lines
12 KiB
Python

from . import util
from . import Atomic
import os
from docker.errors import NotFound
from operator import itemgetter
from .atomic import AtomicError
class Verify(Atomic):
DEBUG = False
def verify(self):
"""
Primary def for atomic verify
:return: None
"""
def fix_layers(layers):
"""
Takes the input of layers (get_layers()) and adds a key
and value for index. Also checks if the Tag value is not
blank but name is, puts tag into name.
:param layers:
:return: updated list of layers
"""
for layer in layers:
layer['index'] = layers.index(layer)
if layer['Tag'] is not "" and layer['Name'] is "":
layer['Name'] = layer['Tag']
return layers
# Check if the input is an image id associated with more than one
# repotag. If so, error out.
if self.is_iid(self.image):
self.get_fq_name(self._inspect_image())
# The input is not an image id
else:
try:
iid = self._is_image(self.image)
self.image = self.get_fq_name(self._inspect_image(iid))
except AtomicError:
self._no_such_image()
layers = fix_layers(self.get_layers())
if self.DEBUG:
for l in layers:
util.output_json(l)
uniq_names = list(set(x['Name'] for x in layers if x['Name'] != ''))
base_images = self.get_tagged_images(uniq_names, layers)
if self.DEBUG:
for b in base_images:
util.output_json(b)
if self.args.verbose:
self._print_verify_verbose(base_images, self.image)
# Do we have any layers that are not up to date?
elif not all([False for x in base_images if x['local_nvr'] != x['latest_nvr']]):
self._print_verify(base_images, self.image)
else:
# Check if any of the base_images do not have versioning information
versions = [x['local_nvr'] for x in base_images] + [x['latest_nvr'] for x in base_images]
if 'Version unavailable' in versions:
util.writeOut("\nWARNING: One or more of the image layers does not have")
util.writeOut("{}versioning information. Printing each image layer".format(" " * 9))
util.writeOut("{}verbosely.".format(" " * 9))
self._print_verify_verbose(base_images, self.image)
else:
# Didn't detect any version differences, do nothing
pass
def get_tagged_images(self, names, layers):
"""
Returns a dict with image names and its tag name.
:param names:
:param layers:
:return: list of sorted dicts (by index)
"""
base_images = []
for name in names:
remote = False
_match = next((x for x in layers if x['Name'] == name and x['Tag'] is not ''), None)
local_nvr = ""
if _match is not None:
if self.is_repo_from_local_registry(_match['Id']):
local_nvr = latest_version = _match['Version']
remote = True
else:
local_nvr = self.get_local_latest_version(name)
latest_version = self.get_latest_remote_version(_match['Tag'], name)
no_version = (latest_version == "")
iid = _match["Id"]
tag = _match['Tag']
_index = self.get_index(name, layers, iid)
else:
_index = self.get_index(name, layers)
layer = layers[_index]
if layer["Version"] is not "" and layer['Name'] is not "":
iid = layer['Id']
local_nvr = layer['Version']
no_version = False
image = self.d.inspect_image(iid)
labels = image.get('Config', []).get('Labels', [])
if 'Authoritative_Registry' in labels and 'Name' in labels:
tag = os.path.join(labels['Authoritative_Registry'], labels['Name'])
if self.is_repo_from_local_registry(iid):
# Inspect again by tag in case the image isnt the latest
try:
latest_version = self.d.inspect_image(tag)['Version']
except NotFound:
latest_version = layer['Version']
else:
# Do a remote inspect of images
latest_version = self.get_latest_remote_version(tag)
remote = True
else:
tag = "Unknown"
try:
latest_version = self.get_latest_remote_version(name)
except NotFound:
latest_version = "Unknown"
else:
iid = "Unknown"
latest_version = self.get_local_latest_version(name)
local_nvr = name
tag = "Unknown"
remote = False
no_version = True
base_images.append({'iid': iid,
'name': name,
'local_nvr': local_nvr,
'latest_nvr': latest_version,
'remote': remote,
'no_version': no_version,
'tag': tag,
'index': _index
})
return sorted(base_images, key=itemgetter('index'))
def is_repo_from_local_registry(self, input_repo):
"""
Determine is a given repo comes from a local-only registry
:param input_repo: str repository name
:return: bool
"""
# We need to check if there are other images with the
# the same IID because the input might not be fully
# qualified
iid = self.d.inspect_image(input_repo)['Id']
# Get a flat list of repo names associated with the iid
similar = [_repo for repos in [x['RepoTags'] for x in self.d.images()
if x['Id'] == iid] for _repo in repos]
results = []
for repo_ in similar:
(reg, repo, tag) = util._decompose(repo_)
results.append(self.is_registry_local(reg))
return False if not all(results) else True
def is_registry_local(self, registry):
"""
Determine if a given registry is local only
:param registry: str registry name
:return: bool
"""
return False if registry in self.get_registries() else True
def get_registries(self):
"""
Gets the names of the registries per /etc/sysconfig/conf
:return: a list of the registries
"""
registries = []
docker_info = self.d.info()
if 'RegistryConfig' not in docker_info:
raise ValueError("This docker version does not export its registries.")
for _index in docker_info['RegistryConfig']['IndexConfigs']:
registries.append(_index)
return registries
@staticmethod
def _print_verify(base_images, image):
"""
The standard non-verbose print for atomic verify
:param base_images:
:param image:
:return: None
"""
util.writeOut("\n{} contains images or layers that have updates:".format(image))
for _image in base_images:
local = _image['local_nvr']
latest = _image['latest_nvr']
if local != latest:
util.writeOut("\n{0} '{1}' has an update to '{2}'"
.format(" " * 5, local, latest))
util.writeOut("\n")
@staticmethod
def _print_verify_verbose(base_images, image):
"""
Implements a verbose printout of layers. Can be called with
atomic verify -v or if we detect some layer does not have
versioning information.
:param base_images:
:param image:
:return: None
"""
def max_name(base_images):
no_version_match = [len(x['tag']) + len(x['local_nvr']) + 5 for x in base_images if x['no_version']]
return max([len(x['local_nvr']) for x in base_images] + no_version_match)
_max = max_name(base_images)
_max_name = 30 if _max < 30 else _max
three_col = " {0:" + \
str(_max_name) + "} {1:" + \
str(_max_name) + "} {2:1}"
util.writeOut("\n{} contains the following images:\n".format(image))
util.writeOut(three_col.format("Local Version", "Latest Version", ""))
util.writeOut(three_col.format("-------------", "--------------", ""))
for _image in base_images:
local = _image['local_nvr']
latest = _image['latest_nvr']
if _image['no_version']:
tag = _image['tag']
local = "{0} ({1})".format(tag, local)
latest = "{0} ({1})".format(tag, latest)
remote = "*" if local != latest else ""
util.writeOut(three_col.format(local, latest, remote))
util.writeOut("\n * = version difference\n")
@staticmethod
def get_index(name, layers, _id="0"):
"""
Adds indexs to the base_image dict and returns sorted
:param name:
:param layers:
:param _id:
:return: sorted list of base_images
"""
try:
try:
_match = (x for x in layers if x["Id"] == _id).__next__()
except:
_match = (x for x in layers if x["Id"] == _id).next()
except StopIteration:
# We were unable to associate IDs due to the local image being set
# to intermediate by docker bc it is outdated. Therefore we find
# the first instance by name for the index
try:
_match = (x for x in layers if x["Name"] == name).__next__()
except:
_match = (x for x in layers if x["Name"] == name).next()
return _match['index']
def get_local_latest_version(self, name):
"""
Obtain the latest version of a local image
:param name:
:return: str of vnr
"""
images = self.get_images()
for image in images:
if 'Labels' in image and image['Labels'] is not None:
if self.pull_label(image, 'Name') == name:
return self.assemble_nvr(image)
else:
continue
return "{}-Version unavailable".format(name)
def get_latest_remote_version(self, tag, name=None):
r_inspect = util.skopeo(tag)
if 'Labels' in r_inspect['Config'] \
and r_inspect['Config']['Labels'] is not None:
latest_version = self.assemble_nvr(r_inspect['Config'], image_name=name)
else:
latest_version = "Version unavailable"
return latest_version
def assemble_nvr(self, image, image_name=None):
"""
Simple formatting def for NVR
:param image:
:return: str
"""
name = self.pull_label(image, 'Name')
version = self.pull_label(image, 'Version')
release = self.pull_label(image, 'Release')
nvr = "%s-%s-%s" % (name, version, release)
if any(True for x in [name, version, release] if x is None):
return "{}-Version unavailable".format(image_name)
else:
return nvr
@staticmethod
def get_local_version(name, layers):
for layer in layers:
if layer['Name'] is name:
return layer['Version'] if 'Version' in layer \
else "Version unavailable"
@staticmethod
def pull_label(image, key):
if key in image["Labels"]:
return image['Labels'][key]