1
0
mirror of https://github.com/projectatomic/atomic.git synced 2026-02-06 12:45:57 +01:00
Files
atomic/Atomic/atomic.py
Brent Baude 2b744bfdbb Honor proxy usage
If HTTP[S]_PROXY is defined, honor it in python requests usage
as well as pass it on to skopeo.

If http[s]_proxy is defined in atomic.conf, use it; however, environment
variables will override these if defined.

Added --insecure to Atomic push so the user can override the logic
(or lack thereof) around deducing if a registry is insecure.  Also
needed for integration tests.

Closes: #964
Approved by: rhatdan
2017-04-13 16:13:07 +00:00

612 lines
20 KiB
Python

import os
import sys
import json
import pipes
from .client import AtomicDocker
from .syscontainers import SystemContainers
import requests
from . import util
import re
from Atomic.backends._docker_errors import NoDockerDaemon, DockerObjectNotFound
from docker.errors import NotFound
from .discovery import RegistryInspect, RegistryInspectError
def find_repo_tag(d, Id, image_name):
def image_in_repotags(image_name, repotags):
if image_name in repotags:
return image_name
for repotag in repotags:
if repotag.startswith("{}:".format(image_name)):
return image_name
return None
if not find_repo_tag.images:
find_repo_tag.images = d.images()
for image in find_repo_tag.images:
repo_tag = image_in_repotags(image_name, image['RepoTags'])
if repo_tag is not None:
return repo_tag
if Id == image["Id"]:
return image["RepoTags"][0]
return ""
find_repo_tag.images = None
class Atomic(object):
results = '/var/lib/atomic'
skull = (u"\u2620").encode('utf-8')
def __init__(self):
self.d = AtomicDocker()
self.args = None
self.command = None
self.name = None
self.image = None
self.spc = False
self.system = False
self.setvalues = None
self.inspect = None
self.backend = None
self.user = None
self.force = False
self._images = []
self.containers = False
self.images_cache = []
self.images_all_cache = []
self.active_containers = []
self.docker_cmd = None
self.debug = False
self.is_python2 = (int(sys.version[0])) < 3
self.useTTY = True
self.syscontainers = SystemContainers()
self.run_opts = None
self.atomic_config = util.get_atomic_config()
self.local_tokens = {}
util.set_proxy()
def __enter__(self):
return self
def __exit__(self, typ, value, traceback):
try:
self.d.close()
except NoDockerDaemon:
pass
def docker_binary(self):
if not self.docker_cmd:
self.docker_cmd = util.default_docker()
return self.docker_cmd
def get_label(self, label, image=None):
inspect = self._inspect_image(image)
cfg = inspect.get("Config", None)
if cfg:
labels = cfg.get("Labels", [])
if labels and label in labels:
return labels[label]
return ""
def force_delete_containers(self):
if self._inspect_image():
image = self.image
if self.image.find(":") == -1:
image += ":latest"
for c in self.get_containers():
if c["Image"] == image:
self.d.remove_container(c["Id"], force=True)
def pull(self):
prevstatus = ""
for line in self.d.pull(self.image, stream=True):
bar = json.loads(line)
status = bar['status']
if prevstatus != status:
util.write_out(status, "")
if 'id' not in bar:
continue
if status == "Downloading":
util.write_out(bar['progress'] + " ")
elif status == "Extracting":
util.write_out("Extracting: " + bar['id'])
elif status == "Pull complete":
pass
elif status.startswith("Pulling"):
util.write_out("Pulling: " + bar['id'])
prevstatus = status
util.write_out("")
def set_args(self, args):
self.args = args
try:
self.image = args.image
except (NameError, AttributeError):
pass
try:
self.command = args.command
except (NameError, AttributeError):
pass
try:
self.spc = args.spc
except (NameError, AttributeError):
pass
try:
self.system = args.system
except (NameError, AttributeError):
pass
try:
self.name = args.name
except (NameError, AttributeError):
pass
try:
self.force = args.force
except (NameError, AttributeError):
pass
try:
self.user = args.user
except (NameError, AttributeError):
pass
if not self.name and self.image is not None:
self.name = self.image.split("/")[-1].split(":")[0]
if self.spc:
self.name = self.name + "-spc"
if self.system or self.user:
self.name = self.syscontainers.get_default_system_name(self.image)
try:
if not self.name and args.container:
self.name = args.container
except (NameError, AttributeError):
pass
self.syscontainers.set_args(self.args)
def _getconfig(self, key, default=None):
assert self.inspect is not None
cfg = self.inspect.get("Config")
if cfg is None:
return default
val = cfg.get(key, default)
if val is None:
return default
return val
def _get_cmd(self):
return self._getconfig("Cmd", ["/bin/sh"])
def _get_labels(self):
return self._getconfig("Labels", [])
def _inspect_image(self, image=None):
image = image or self.image
try:
if self.syscontainers.has_image(image):
return self.syscontainers.inspect_system_image(image)
return self.d.inspect_image(image)
except (NotFound, requests.exceptions.ConnectionError):
pass
return None
def _inspect_container(self, name=None):
if name is None:
name = self.name
try:
return self.d.inspect_container(name)
except (NotFound, requests.exceptions.ConnectionError):
pass
return None
def _get_args(self, label):
labels = self._get_labels()
for l in [label, label.lower(), label.capitalize(), label.upper()]:
if l in labels:
return labels[l].split()
return None
#def ps -> Atomic/ps.py
#def run -> Atomic/run.py
def quote(self, args):
return list(map(pipes.quote, args))
def cmd_env(self):
newenv = dict(os.environ)
newenv['NAME'] = self.name or ""
newenv['IMAGE'] = self.image or ""
if hasattr(self.args, 'opt1') and self.args.opt1:
newenv['OPT1'] = os.path.expandvars(self.args.opt1)
if hasattr(self.args, 'opt2') and self.args.opt2:
newenv['OPT2'] = os.path.expandvars(self.args.opt2)
if hasattr(self.args, 'opt3') and self.args.opt3:
newenv['OPT3'] = os.path.expandvars(self.args.opt3)
if not hasattr(self.args, 'PWD'):
newenv['PWD'] = os.getcwd()
default_uid = "0"
with open("/proc/self/loginuid") as f:
val = f.readline()
if int(val) <= 2147483647:
default_uid = val
if "SUDO_UID" not in newenv:
newenv["SUDO_UID"] = default_uid
if 'SUDO_GID' not in newenv:
newenv["SUDO_GID"] = default_uid
if self.run_opts is not None:
newenv["RUN_OPTS"] = self.run_opts
return newenv
def gen_cmd(self, cargs):
args = []
for c in cargs:
if c == "IMAGE":
args.append(self.image)
continue
if c == "IMAGE=IMAGE":
args.append("IMAGE=%s" % self.image)
continue
if c == "NAME=NAME":
args.append("NAME=%s" % self.name)
continue
if c == "NAME":
args.append(self.name)
continue
args.append(c)
if self.is_python2:
return " ".join([x.decode('utf-8') for x in args])
else:
return " ".join(args)
def get_fq_name(self, image_info):
if not image_info['RepoTags']:
return None
if len(image_info['RepoTags']) > 1:
if self.image in image_info['RepoTags']:
return self.image
possibles = []
for i in image_info['RepoTags']:
try:
possibles.append(self.get_fq_image_name(i))
except RegistryInspectError:
possibles.append(None)
if all(x==possibles[0] for x in possibles):
return possibles[0]
raise ValueError("\n{} is tagged with multiple repositories. "
"Try adding a tag to your input.\n".format(self.image))
return image_info['RepoTags'][0]
def is_iid(self):
for i in self.get_images():
if i['Id'].startswith(self.image):
return True
return False
def _no_such_image(self):
raise ValueError("Could not find any image matching '{}'"
.format(self.args.image))
def is_dangling(self, image):
if image == "<none>":
return True
return False
def _container_exists(self, name):
try:
return self.syscontainers.get_checkout(name) or self._inspect_container(name)
except ValueError:
return None
def help(self):
if os.path.exists("/usr/bin/rpm-ostree"):
return _('Atomic Management Tool')
else:
return _('Atomic Container Tool')
def _get_layer(self, image):
def get_label(label):
return self.get_label(label, image["Id"])
image = self._inspect_image(image)
if not image:
raise ValueError("Image '%s' does not exist" % self.image)
version = ("%s-%s-%s" % (get_label("Name"), get_label("Version"),
get_label("Release"))).strip("-")
if 'Parent' in image:
parent = image['Parent']
else:
parent = ""
return({"Id": image['Id'], "Name": get_label("Name"),
"Version": version, "RepoTags": image['RepoTags'],
"Parent": parent})
def get_layers(self):
layers = []
layer = self._get_layer(self.image)
layers.append(layer)
while layer["Parent"] != "":
layer = self._get_layer(layer["Parent"])
layers.append(layer)
return layers
def display(self, cmd):
util.write_out(cmd)
def sub_env_strings(self, in_string):
"""
Performs substitutions on an input string based on defined
environment variables.
:param in_string: string to perform the subs on
:return: string
"""
# Perform variable subs
in_string = util.expandvars(in_string, environ=self.cmd_env())
# Replace undefined variables with blank
in_string = re.sub(r'\$\{?\S*\}?', '', in_string)
# Solve whitespacing
in_string = " ".join(in_string.split())
return in_string
def ping(self):
'''
Check if the docker daemon is running; if not, exit with
message and return code 1
'''
try:
self.d.ping()
except requests.exceptions.ConnectionError:
raise NoDockerDaemon()
def _is_container(self, identifier, active=False):
'''
Checks is the identifier is a container ID or container name. If
it is, returns the full container ID. Else it will return an
AtomicError. Takes optional keyword active, which signifies
that you want to only deal with active containers.
'''
if active:
active_cons = self.get_active_containers()
active_con_ids = [x['Id'] for x in active_cons]
cons = active_cons
else:
cons = self.get_containers()
# First check if the container exists by whatever
# identifier was given
self.inspect = self._inspect_container(name=identifier)
if self.inspect is not None:
# Inspect found a match
if not active:
return self.inspect['Id']
else:
# Check if the container is active
if self.inspect['Id'] in active_con_ids:
return self.inspect['Id']
err_append = "Refine your search to narrow results."
# The identifier might be a partial name?
con_ids = []
for con in cons:
for name in con['Names']:
if name.startswith("/{0}".format(identifier)):
con_ids.append(con['Id'])
break
# More than one match was found
if len(con_ids) > 1:
raise AtomicError("Multiple matches were found for {0}. {1}"
.format(identifier, err_append))
# No matches were found
elif len(con_ids) < 1:
active_err = '' if not active else 'active '
error_msg = "Unable to find {0}container '{1}'".format(active_err,
identifier)
raise AtomicError(error_msg)
else:
self.inspect = self._inspect_container(con_ids[0])
return con_ids[0]
def _is_image(self, identifier):
'''
Checks is the identifier is a image ID or a matches an image name.
If it finds a match, it returns the full image ID. Else it will
return an AtomicError.
'''
err_append = "Refine your search to narrow results."
image_info = self.get_images()
inspect = self._inspect_image(image=identifier)
if inspect is not None:
self.inspect = inspect
return inspect['Id']
name_search = util.image_by_name(identifier, images=image_info)
if len(name_search) > 0:
if len(name_search) > 1:
tmp_image = dict((x['Id'], x['RepoTags']) for x in image_info)
repo_tags = []
for name in name_search:
for repo_tag in tmp_image.get(name['Id']):
if repo_tag.find(identifier) > -1:
repo_tags.append(repo_tag)
raise ValueError("Found more than one image possibly "
"matching '{0}'. They are:\n {1} \n{2}"
.format(identifier, "\n ".join(repo_tags),
err_append))
return name_search[0]['Id']
# No dice
raise AtomicError
def is_duplicate_image(self, image):
try:
if self.syscontainers.has_image(image) and self.d.inspect_image(image):
return True
return False
except (NotFound, requests.exceptions.ConnectionError):
pass
return False
def get_input_id(self, identifier):
'''
Determine if the input "identifier" is valid. Return the container or
image ID when true and raise a ValueError when not
'''
try:
return self._is_image(identifier)
except AtomicError:
pass
try:
return self._is_container(identifier)
except AtomicError:
pass
if self.syscontainers.has_image(identifier):
return identifier
if self.syscontainers.get_checkout(identifier):
return identifier
raise DockerObjectNotFound(identifier)
def _get_docker_images(self, get_all=False):
try:
images = self.d.images(all=get_all)
for i in images:
i["ImageType"] = "Docker"
i["ImageId"] = i["Id"]
except requests.exceptions.ConnectionError:
raise NoDockerDaemon()
return images
def get_images(self, get_all=False):
'''
Wrapper function that should be used instead of querying docker
multiple times for a list of images.
'''
if get_all:
if len(self.images_all_cache) == 0:
self.images_all_cache = self._get_docker_images(get_all=True) + self.syscontainers.get_system_images(get_all=True)
return self.images_all_cache
else:
if len(self.images_cache) == 0:
self.images_cache = self._get_docker_images() + self.syscontainers.get_system_images()
return self.images_cache
def get_containers(self):
'''
Wrapper function that should be used instead of querying docker
multiple times for a list of containers
'''
if not self.containers:
self.containers = self.d.containers(all=True)
return self.containers + self.syscontainers.get_containers()
def get_active_containers(self, refresh=False):
'''
Wrapper function for obtaining active containers. Should be used
instead of direct queries to docker
'''
if len(self.active_containers) == 0 or refresh:
self.active_containers = self.d.containers(all=False)
return self.active_containers
def set_debug(self):
if self.args.debug:
self.debug = True
def get_all_vulnerable_info(self):
"""
Read and parse the /var/lib/atomic/scan_summary.json object.
"""
try:
return json.loads(open(os.path.join(self.results, "scan_summary.json"), "r").read())
except (IOError, ValueError):
return {}
def get_vulnerable_ids(self):
"""
Reads in /var/lib/atomic/scan_summary.json and returns a list of all
the uuids that are vulnerable
:return:
"""
try:
summary_results = json.loads(open(os.path.join(self.results, "scan_summary.json"), "r").read())
vuln_ids = []
for uuid in summary_results.keys():
if 'Vulnerable' in summary_results[uuid]:
if summary_results[uuid]['Vulnerable']:
vuln_ids.append(uuid)
return vuln_ids
except IOError:
return []
def get_local_tokens(self):
if len(self.local_tokens) == 0:
self.local_tokens = self.load_local_tokens()
return self.local_tokens
@staticmethod
def load_local_tokens():
tokens = {}
token_file_name = os.path.expanduser('~/.docker/config.json')
if not os.path.exists(token_file_name):
return {}
with open(token_file_name) as token_file:
token_data = json.load(token_file)
try:
for registry in token_data['auths']:
reg_key = registry
if registry == 'https://index.docker.io/v1/':
reg_key = 'docker.io'
tokens[reg_key] = token_data['auths'][registry]['auth']
except KeyError:
# Just return a blank dict
pass
return tokens
def get_fq_image_name(self, input_image):
registry, repo, image, tag, digest = util.Decompose(input_image).all
if not image:
raise ValueError('Error parsing input: "{}" invalid'.format(input_image))
if all([True if x else False for x in [registry, image, tag]]):
img = registry
if repo:
img += "/{}".format(repo)
img += "/{}:{}".format(image, tag)
return img
if not registry:
ri = RegistryInspect(registry, repo, image, tag, digest=digest, debug=self.args.debug, orig_input=self.image)
return ri.find_image_on_registry()
class AtomicError(Exception):
pass