1
0
mirror of https://github.com/projectatomic/atomic.git synced 2026-02-06 21:45:24 +01:00
Files
atomic/Atomic/trust.py
Jonathan Lebon d2b6a875d1 Atomic/trust.py: handle older gpg2 outputs
1. Decode gpg2 output as UTF-8.
2. Use startswith() and require ':' to be more strict.
3. Handle older gpg2 outputs in which the User-ID is part of the 'pub'
   line.

Closes: #724
Approved by: rhatdan
2016-10-26 18:46:48 +00:00

446 lines
20 KiB
Python

from . import util
from . import Atomic
import os
import argparse
import json
import yaml
import requests
import collections
# Transport names that Trust.show_json looks up, in this order, under the
# "transports" key of the policy when it builds its table.
TRANSPORT_TYPES = ["docker", "atomic", "dir"]
def cli(subparser):
    """
    Register the "trust" command and its add/default/delete/show sub-commands.

    Each sub-command is bound through set_defaults() to the Trust class
    (``_class``) and to the name of the Trust method to run (``func``).

    :param subparser: object exposing add_parser(); the "trust" parser is
                      attached to it
    """
    # atomic trust
    key_dir = util.get_atomic_config_item(['pubkeys_dir'])

    trust_parser = subparser.add_parser(
        "trust",
        help=_("Manage system container trust policy"),
        epilog="Manages the trust policy of the host system. "
               "Trust policy describes a registry scope "
               "that must be signed by public keys.")

    registry_help="""Registry to manage trust policy for, REGISTRY[/REPOSITORY].
Trust policy allows for nested scope.
Example: registry.example.com defines trust for entire registry.
Example: registry.example.com/acme defines trust for specific repository."""
    sigstore_help="""Signature server type (default: web)
local: local file
web: remote web server
atomic: openshift-based atomic registry"""

    # "--sigstoretype" is declared identically for "add" and "delete".
    sigstoretype_options = dict(dest="sigstoretype", default="web",
                                choices=['atomic', 'local', 'web'],
                                help=sigstore_help)

    # Options of the "add" sub-command, kept on a help-less parent parser.
    shared = argparse.ArgumentParser(add_help=False)
    shared.add_argument(
        "-k", "--pubkeys", nargs='?', default=[],
        action="append", dest="pubkeys",
        help=_("Absolute path of installed public key(s) to trust for TARGET. "
               "May used multiple times to define multiple public keys. "
               "File(s) must exist before using this command. "
               "Default directory is %s" % key_dir))
    shared.add_argument("--sigstoretype", **sigstoretype_options)
    shared.add_argument(
        "-t", "--type", dest="trust_type", default="signedBy",
        choices=['insecureAcceptAnything', 'reject', 'signedBy'],
        help="Trust type (default: signedBy)")
    shared.add_argument(
        "--keytype", dest="keytype", default="GPGKeys",
        help="Public key type (default: GPGKeys)")
    shared.add_argument(
        "-s", "--sigstore", dest="sigstore",
        help=_("URL and path of remote signature server, "
               "https://sigstore.example.com/signatures. "
               "Ignored with 'atomic' sigstoretype."))
    shared.add_argument("registry", help=registry_help)

    commands = trust_parser.add_subparsers()

    add_cmd = commands.add_parser(
        "add", parents=[shared],
        help="Add a new trust policy for a registry")
    add_cmd.set_defaults(_class=Trust, func="add")

    default_cmd = commands.add_parser(
        "default",
        help="Modify default trust policy")
    default_cmd.add_argument(
        "default_policy", choices=["reject", "accept"],
        help="Default policy action")
    default_cmd.set_defaults(_class=Trust, func="modify_default")

    delete_cmd = commands.add_parser(
        "delete",
        help="Delete a trust policy for a registry")
    delete_cmd.add_argument("registry", help=registry_help)
    delete_cmd.add_argument("--sigstoretype", **sigstoretype_options)
    # store_false: passing the flag sets args.save to False.
    delete_cmd.add_argument(
        "--save-sigstore", dest="save", default=True, action="store_false",
        help="Do not remove the registry sigstore configuration")
    delete_cmd.set_defaults(_class=Trust, func="delete")

    show_cmd = commands.add_parser(
        "show",
        help="Display trust policy for the system")
    show_cmd.add_argument('--raw', action='store_true', help="Output raw policy file")
    show_cmd.add_argument('-j', '--json', action='store_true', help="Output as json")
    show_cmd.set_defaults(_class=Trust, func="show")
class Trust(Atomic):
def __init__(self, policy_filename="/etc/containers/policy.json"):
    """
    Prepare the trust manager.

    :param policy_filename: policy file to operate on; the TRUST_POLICY
                            environment variable, when set, takes precedence
    """
    super(Trust, self).__init__()
    self.atomic_config = util.get_atomic_config()
    self.policy_filename = os.getenv('TRUST_POLICY', policy_filename)
def add(self, registry=None, pubkeys=None, sigstore=None, sigstoretype=None, keytype=None, trust_type=None):
    """
    Add trust to policy.json file and optionally update registries.d registry configuration

    Arguments left as None are read from the parsed command line (self.args).
    Every input is validated before the policy file is opened, so a rejected
    request neither creates an empty policy.json nor rewrites an existing one.

    :param sigstoretype: string, human-readable sigstore type, one of "atomic", "web", "local"
    :param registry: the registry[/repo] to add policy for
    :param pubkeys: list of pubkeys to add with trust_type "signedBy"
    :param keytype: string, "GPGKeys"
    :param trust_type: string, one of "signedBy", "insecureAcceptAnything", "reject"
    :param sigstore: string, URL of signature server
    :raises ValueError: unknown sigstore type, missing public key file, no key
                        given for "signedBy", or a sigstore given together
                        with the "atomic" sigstore type
    """
    if registry is None:
        registry = self.args.registry
    if pubkeys is None:
        pubkeys = self.args.pubkeys
    if sigstoretype is None:
        sigstoretype = self.args.sigstoretype
    if keytype is None:
        keytype = self.args.keytype
    if trust_type is None:
        trust_type = self.args.trust_type
    if sigstore is None:
        sigstore = self.args.sigstore
    sstype = self.get_sigstore_type_map(sigstoretype)

    # Build and validate the policy entry first; nothing on disk is touched
    # until the request is known to be acceptable.
    payload = []
    for k in pubkeys:
        # "-k" given without a value yields None, which os.path.exists()
        # rejects with TypeError on Python 3.
        if not k or not os.path.exists(k):
            raise ValueError("The public key file %s was not found. This file must exist to proceed." % k)
        payload.append({ "type": trust_type, "keyType": keytype, "keyPath": k })
    if trust_type == "signedBy":
        if not pubkeys:
            raise ValueError("At least one public key must be defined for type 'signedBy'")
    else:
        payload.append({ "type": trust_type })
    if sigstore and sstype == "atomic":
        raise ValueError("Sigstore cannot be defined for sigstoretype 'atomic'")

    mode = "r+" if os.path.exists(self.policy_filename) else "w+"
    with open(self.policy_filename, mode) as policy_file:
        if mode == "r+":
            policy = self.check_policy(json.load(policy_file), sstype)
            if registry in policy["transports"][sstype] and not self.args.assumeyes:
                confirm = util.input("Trust policy already defined for %s:%s\nDo you want to overwrite? (y/N) " % (sigstoretype, registry))
                if "y" not in confirm.lower():
                    exit(0)
        else:
            policy = {"transports": {sstype: {}}}
        policy["transports"][sstype][registry] = payload
        # Rewrite in place: rewind, dump, then drop any leftover tail of a
        # previously longer file.
        policy_file.seek(0)
        json.dump(policy, policy_file, indent=4)
        policy_file.truncate()
    if sigstore:
        self.modify_registry_config(registry, sstype, sigstore)
def delete(self):
    """
    Drop the policy entry for self.args.registry from the policy file and,
    when self.args.save is true, also call modify_registry_config() for that
    registry without a sigstore.

    :raises ValueError: no policy entry exists for the registry under the
                        selected transport
    """
    sigstoretype = self.args.sigstoretype
    sstype = self.get_sigstore_type_map(sigstoretype)
    with open(self.policy_filename, 'r+') as policy_file:
        policy = json.load(policy_file)
        try:
            policy["transports"][sstype].pop(self.args.registry)
        except KeyError:
            raise ValueError("Could not find trust policy defined for %s transport %s" %
                             (self.args.sigstoretype, self.args.registry))
        # Rewrite in place and cut off whatever remains of the old content.
        policy_file.seek(0)
        json.dump(policy, policy_file, indent=4)
        policy_file.truncate()
    if not self.args.save:
        return
    self.modify_registry_config(self.args.registry, self.get_sigstore_type_map(self.args.sigstoretype))
def modify_default(self):
    """
    Set the "default" requirement of the policy file according to
    self.args.default_policy ("accept" or "reject").

    The policy file is created when it does not exist yet; an existing
    "default" list only has the type of its first element replaced.
    """
    already_there = os.path.exists(self.policy_filename)
    with open(self.policy_filename, "r+" if already_there else "w+") as policy_file:
        policy = json.load(policy_file) if already_there else {}
        wanted = { "accept": "insecureAcceptAnything", "reject": "reject" }[self.args.default_policy]
        if "default" not in policy:
            policy["default"] = [ { "type": wanted }]
        else:
            policy["default"][0]["type"] = wanted
        # Rewrite in place and cut off whatever remains of the old content.
        policy_file.seek(0)
        json.dump(policy, policy_file, indent=4)
        policy_file.truncate()
def install_pubkey(self, key_name, key_url):
    """
    Installs remote public key to system config directory

    The key is downloaded only when no file of that name exists in the
    configured public key directory.

    :param key_name: id of key used as filename
    :param key_url: download URI of public key
    :return: path the key is installed at; the same path is returned when
             the download failed, in which case the file does not exist
    """
    pubkeys_dir = util.get_atomic_config_item(['pubkeys_dir'], util.get_atomic_config())
    # NOTE(review): discover_sigstore() passes a label of a remote image as
    # key_name; consider rejecting names containing path separators.
    pubkey_file = "%s/%s" % (pubkeys_dir, key_name)
    if not os.path.exists(pubkeys_dir):
        os.mkdir(pubkeys_dir)
    if os.path.exists(pubkey_file):
        util.write_out("Public key %s already installed at %s" % (key_name, pubkey_file))
        return pubkey_file
    r = requests.get(key_url)
    if r.status_code == 200:
        # Response.content is bytes: a text-mode file would raise TypeError
        # on Python 3, so the key is written in binary mode.
        with open(pubkey_file, 'wb') as pubfile:
            pubfile.write(r.content)
        util.write_out("Installed public key %s" % pubkey_file)
    else:
        util.write_out("WARNING: Could not download public key using URL %s." % key_url)
        util.write_out("Download the public key manually and install as %s" % pubkey_file)
    return pubkey_file
def check_policy(self, policy, sstype):
    """
    Make sure policy["transports"][sstype] exists, creating empty
    dictionaries for whichever level is missing.

    :param policy: policy data (modified in place)
    :param sstype: sigstore type
    :return: the same policy object
    """
    policy.setdefault("transports", {}).setdefault(sstype, {})
    return policy
def modify_registry_config(self, registry, sstype, sigstore=None):
    """
    Pick the registries.d file and open mode for a registry, then delegate
    the actual edit to write_registry_config_file().

    :param registry: a registry name
    :param sstype: mapped sigstore type
    :param sigstore: a file:/// or https:// URL. Optional
    """
    confdir = util.get_atomic_config_item(["registry_confdir"], self.atomic_config)
    known_configs, _unused = util.get_registry_configs(confdir)
    matched = util.have_match_registry(registry, known_configs)
    # Default target: <confdir>/<registry with "/" replaced by "-">.yaml
    target = "%s.yaml" % os.path.join(confdir, registry.replace("/", "-"))
    open_mode = "r+"
    if matched and registry in known_configs:
        current = known_configs[registry]
        # A different sigstore is written to the file the entry came from.
        if sigstore != current["sigstore"]:
            target = current["filename"]
    elif not os.path.exists(target):
        # Nothing configured and no file yet: let open() create it.
        open_mode = "w+"
    self.write_registry_config_file(target, registry, sstype, sigstore, mode=open_mode)
def write_registry_config_file(self, reg_file, registry, sstype="docker", sigstore=None, mode="r+"):
    """
    Utility method to modify existing registry config file

    With a sigstore, the file content is replaced by a single
    {sstype: {registry: {"sigstore": sigstore}}} mapping.  Without one, the
    registry entry is removed; a missing entry or an empty file is
    tolerated instead of raising.

    :param reg_file: registry filename
    :param registry: registry name
    :param sstype: mapped sigstore type
    :param sigstore: sigstore server. If None the sigstore section is deleted.
    :param mode: file open mode
    """
    with open(reg_file, mode) as f:
        # safe_load: the file only needs plain YAML types, and yaml.load()
        # without an explicit Loader is rejected by recent PyYAML releases.
        d = yaml.safe_load(f)
        if not sigstore:
            # An empty or freshly created file loads as None.
            if not isinstance(d, dict):
                d = {}
            entries = d.get(sstype)
            if isinstance(entries, dict):
                entries.pop(registry, None)
        else:
            d = { sstype: { str(registry): { "sigstore": str(sigstore) } } }
        # Rewrite in place and cut off whatever remains of the old content.
        f.seek(0)
        yaml.dump(d, f, default_flow_style=False)
        f.truncate()
def get_sigstore_type_map(self, sigstore_type):
    """
    Translate a sigstore type name into the transport name used as key
    in the policy file.

    :param sigstore_type: one of web,local,atomic (docker and dir map to themselves)
    :return: skopeo trust policy type string
    :raises ValueError: the name is not a known sigstore type
    """
    transports = { "docker": "docker", "web": "docker", "local": "dir", "dir": "dir", "atomic": "atomic" }
    transport = transports.get(sigstore_type)
    if transport is None:
        raise ValueError("Invalid sigstore type %s" % sigstore_type)
    return transport
def discover_sigstore(self, pull_image):
    """
    Check for registry/repo/sigstore metadata image
    prompt user for trust on first use workflow

    Metadata is looked up for the "registry/<first repo component>" scope
    and, failing that, for the bare registry scope; a scope that already
    appears in the registries.d configuration is not queried.  When the
    labels validate and the user accepts, the public key is installed and a
    "signedBy" policy is added for the scope.

    :param pull_image: image being pulled, used to discover matching sigstore meta image
    :return: True when discovery is disabled in the atomic configuration;
             otherwise nothing is returned explicitly
    """
    if not util.get_atomic_config_item(['discover_sigstores'], util.get_atomic_config()):
        return True
    registry, repo, _, _, _ = util.Decompose(pull_image).all
    # Only the first path component of the repository is used below.
    repo = repo.split('/')
    registry_config_path = util.get_atomic_config_item(["registry_confdir"], self.atomic_config)
    registry_configs, _ = util.get_registry_configs(registry_config_path)
    sigstore_labels = False
    scope = '/'.join([registry, repo[0]])
    # NOTE(review): the nesting of the fallback checks was reconstructed from
    # a copy without indentation -- confirm against the upstream file.
    if not scope in registry_configs:
        sigstore_labels = self.get_sigstore_image_metadata(registry, repo[0])
        if not sigstore_labels:
            # Nothing at repository level: fall back to the whole registry.
            scope = registry
            if not scope in registry_configs:
                sigstore_labels = self.get_sigstore_image_metadata(registry)
    if self._validate_sigstore_labels(sigstore_labels):
        if self.prompt_trust(sigstore_labels):
            pubkey_path = self.install_pubkey(sigstore_labels['pubkey-id'], sigstore_labels['pubkey-url'])
            # "sigstore-type" is an optional label; "web" is used without it.
            explicit_sigstoretype = "web"
            if "sigstore-type" in sigstore_labels:
                explicit_sigstoretype = sigstore_labels['sigstore-type']
            self.add(registry=scope, trust_type="signedBy", sigstoretype=explicit_sigstoretype, keytype="GPGKeys", pubkeys=[pubkey_path], sigstore=sigstore_labels['sigstore-url'])
def get_sigstore_image_metadata(self, registry, repo=None):
    """
    Inspect the sigstore metadata image of a registry (or of one of its
    repositories) and hand back its labels.

    :param registry: registry string
    :param repo: repo string
    :return dict of labels or False
    """
    _img = util.get_atomic_config_item(['sigstore_metadata_image'], util.get_atomic_config())
    path_parts = [registry, repo, _img] if repo else [registry, _img]
    sigstoreimage = '/'.join(path_parts)
    try:
        inspected = util.skopeo_inspect("docker://" + sigstoreimage, args=None)
    except ValueError:
        # A ValueError from the inspection is reported as "no metadata".
        return False
    return inspected['Labels'] if inspected else False
def _validate_sigstore_labels(self, labels):
    """
    Validate sigstore metadata.
    If there's a missing key or something we don't want to perform any automatic trust policy configuration

    Every required key has to be present; checking stops at the first
    missing one.

    :param labels: unvalidated labels. Should be either dict or False
    :return: True if labels are valid
    """
    if not labels:
        return False
    # sigstore-type is optional
    expected_keys = ("pubkey-id", "pubkey-fingerprint", "pubkey-url", "sigstore-url")
    return all(k in labels for k in expected_keys)
def prompt_trust(self, labels):
    """
    Show the key id, fingerprint and download URL found in the labels and
    ask whether trust should be configured; the question is skipped when
    self.args.assumeyes is set.

    :param labels: dict of metadata labels defining sigstore trust
    :return: True if user accepts
    """
    util.write_out("ID: " + labels['pubkey-id'])
    util.write_out("Fingerprint: " + labels['pubkey-fingerprint'])
    util.write_out("Public key download URL: %s" % labels['pubkey-url'])
    if self.args.assumeyes:
        return True
    answer = util.input("Do you want to add trust policy for this registry? (y/N) ")
    # Any answer containing the letter "y" counts as consent.
    return "y" in answer.lower()
def _get_policy(self):
    """
    Load the policy file; when it does not exist, create it with an
    "insecureAcceptAnything" default and return that default.

    :return: policy data
    """
    if os.path.exists(self.policy_filename):
        with open(self.policy_filename, "r+") as policy_file:
            return json.load(policy_file)
    initial_policy = { "default": [{ "type": "insecureAcceptAnything" }] }
    with open(self.policy_filename, "w+") as policy_file:
        policy_file.seek(0)
        json.dump(initial_policy, policy_file, indent=4)
        policy_file.truncate()
    return initial_policy
def show_json(self, policy=None):
    """
    Flatten the policy into a table keyed by registry scope.

    Each row holds the type of the scope's first requirement, the keyPath /
    keyData values of all its requirements and the sigstore of the matching
    registries.d entry ("" when there is none).  The policy default, when
    present, is listed under "* (default)".

    :param policy: policy data; loaded through _get_policy() when empty
    :return: OrderedDict of rows sorted by scope
    """
    if not policy:
        policy = self._get_policy()
    confdir = util.get_atomic_config_item(["registry_confdir"], self.atomic_config)
    known_configs, _unused = util.get_registry_configs(confdir)
    rows = {}
    if "transports" in policy:
        transports = policy["transports"]
        for transport in TRANSPORT_TYPES:
            if transport not in transports:
                continue
            for scope, requirements in transports[transport].items():
                key_refs = []
                row = { "type": requirements[0]["type"], "keys": key_refs }
                rows[scope] = row
                for requirement in requirements:
                    for field in ("keyPath", "keyData"):
                        if field in requirement:
                            key_refs.append(requirement[field])
                matched = util.have_match_registry(scope, known_configs)
                row["sigstore"] = matched["sigstore"] if matched else ""
    if "default" in policy:
        rows["* (default)"] = { "type": policy["default"][0]["type"], "keys": None, "sigstore": "" }
    return collections.OrderedDict(sorted(rows.items()))
def show(self):
    """
    Print the trust policy: the raw policy as JSON with --raw, the
    flattened table as JSON with --json, otherwise one formatted line per
    scope.  _get_policy() creates policy.json first when it is missing.
    """
    policy = self._get_policy()
    if self.args.raw:
        util.output_json(policy)
        return
    table = self.show_json(policy)
    if self.args.json:
        util.output_json(table)
        return
    row_format = '{0:<35} {1:<6} {2:<29} {3}'
    for scope, row in table.items():
        trust = self.trusttype_map(row["type"])
        identities = self.get_gpg_id(row["keys"])
        util.write_out(row_format.format(scope, trust, identities, row["sigstore"]))
def get_gpg_id(self, keys):
    """
    Return GPG identity, either bracketed <email> or ID string
    comma separated if more than one key
    see gpg2 parsing documentation:
    http://git.gnupg.org/cgi-bin/gitweb.cgi?p=gnupg.git;a=blob_plain;f=doc/DETAILS

    An empty string (never None) is returned when no identity is found, so
    the result can always be used in string formatting.

    :param keys: list of gpg key file paths (keyPath) and/or inline key payload (keyData)
    :return: comma-separated string of key ids or empty string
    """
    if not keys:
        return ""
    identities = []
    for key in keys:
        if not os.path.exists(key):
            # extend when we support parsing inline 'keyData' base64 encoded key
            return ""
        cmd = ["gpg2", "--with-colons", key]
        try:
            results = util.check_output(cmd).decode('utf-8')
        except util.FileNotFound:
            results = ""
        for line in results.split('\n'):
            if not line.startswith(("uid:", "pub:")):
                continue
            fields = line.split(':')
            # The User-ID is the 10th field; skip records too short to have it.
            if len(fields) < 10:
                continue
            uid = fields[9]
            if not uid:  # Newer gpg2 versions output dedicated 'uid'
                continue  # lines and leave it blank on 'pub' lines.
            # bracketed email, or the whole User-ID when there is none
            bracketed = uid.partition('<')[-1].rpartition('>')[0]
            identities.append(bracketed or uid)
    return ",".join(identities)
def trusttype_map(self, trust_type):
    """
    Translate a policy requirement type into the short word used in the
    table printed by show().

    :param trust_type: one of insecureAcceptAnything, signedBy, reject
    :return: "accept", "signed" or "reject"
    :raises ValueError: the type is not one of the three known values
    """
    short_names = { "insecureAcceptAnything": "accept", "signedBy": "signed", "reject": "reject" }
    short_name = short_names.get(trust_type)
    if short_name is None:
        raise ValueError("Invalid trust type %s" % trust_type)
    return short_name