1
0
mirror of https://github.com/projectatomic/atomic.git synced 2026-02-05 18:45:01 +01:00
Files
atomic/tests/unit/test_trust.py
Aaron Weitekamp ddc72a61e8 Inline pubkeys in policy.json
Closes: #853
Approved by: rhatdan
2017-02-06 22:41:40 +00:00

218 lines
9.2 KiB
Python

import unittest
import json
import yaml
import os
import sys
import shutil
from contextlib import contextmanager
from Atomic.trust import Trust
import Atomic.util as util
FIXTURE_DIR = os.path.join(os.path.dirname(__file__), "fixtures")
REGISTRIESD = "etc/containers/registries.d"
TEST_POLICY = os.path.join(os.path.join(FIXTURE_DIR, "etc/containers"), "policy.json")
def _new_enough():
py_version = sys.version_info
if (py_version.major, py_version.minor, py_version.micro) >= (2, 7, 6):
return True
return False
new_enough = _new_enough()
class TestAtomicTrust(unittest.TestCase):
class Args():
def __init__(self):
self.sigstoretype = "atomic"
self.registry = "docker.io"
self.pubkeys = []
self.pubkeysfile = [os.path.join(FIXTURE_DIR, "key1.pub")]
self.sigstore = "https://sigstore.example.com/sigs"
self.trust_type = "signedBy"
self.keytype = "GPGKeys"
self.assumeyes = True
self.json = False
self.debug = False
self.save = None
self.raw = False
def test_sigstoretype_map_web(self):
testobj = Trust()
self.assertEqual(testobj.get_sigstore_type_map("web"), "docker")
def test_sigstoretype_map_local(self):
testobj = Trust()
self.assertEqual(testobj.get_sigstore_type_map("local"), "dir")
def test_setup_default_policy(self):
args = self.Args()
args.sigstoretype = "web"
testobj = Trust()
testobj.set_args(args)
with open(os.path.join(FIXTURE_DIR, "default_policy.json"), 'r') as default:
policy_default = json.load(default)
policy_default = testobj.check_policy(policy_default, "docker")
policy_expected = {"default": [{"type": "insecureAcceptAnything" }], "transports": {"docker": {}, "docker-daemon": {"": [{"type": "insecureAcceptAnything"}]}}}
self.assertDictEqual(policy_default, policy_expected)
def test_new_registry_sigstore(self):
testobj = Trust(policy_filename = TEST_POLICY)
testobj.atomic_config = util.get_atomic_config(atomic_config = os.path.join(FIXTURE_DIR, "atomic.conf"))
testobj.modify_registry_config("docker.io", "docker", "https://sigstore.example.com/sigs")
with open(os.path.join(FIXTURE_DIR, "configs/docker.io.yaml"), 'r') as f:
conf_expected = yaml.load(f)
with open(os.path.join(FIXTURE_DIR, "etc/containers/registries.d/docker.io.yaml"), 'r') as f:
conf_modified = yaml.load(f)
self.assertEqual(conf_expected, conf_modified)
def test_update_registry_sigstore(self):
testobj = Trust(policy_filename = TEST_POLICY)
testobj.atomic_config = util.get_atomic_config(atomic_config = os.path.join(FIXTURE_DIR, "atomic.conf"))
testobj.modify_registry_config("docker.io", "docker", "https://sigstore.example.com/update")
with open(os.path.join(FIXTURE_DIR, "configs/docker.io.updated.yaml"), 'r') as f:
conf_expected = yaml.load(f)
with open(os.path.join(FIXTURE_DIR, "etc/containers/registries.d/docker.io.yaml"), 'r') as f:
conf_modified = yaml.load(f)
self.assertEqual(conf_expected, conf_modified)
def test_add_repo_sigstore(self):
testobj = Trust(policy_filename = TEST_POLICY)
testobj.atomic_config = util.get_atomic_config(atomic_config = os.path.join(FIXTURE_DIR, "atomic.conf"))
testobj.modify_registry_config("docker.io/repo", "docker", "https://sigstore.acme.com/sigs")
with open(os.path.join(FIXTURE_DIR, "configs/docker.io-repo.yaml"), 'r') as f:
conf_expected = yaml.load(f)
with open(os.path.join(FIXTURE_DIR, "etc/containers/registries.d/docker.io-repo.yaml"), 'r') as f:
conf_modified = yaml.load(f)
self.assertEqual(conf_expected, conf_modified)
@unittest.skipUnless(new_enough, "Requires 2.7.6 or newer")
def test_add_trust_keys(self):
args = self.Args()
args.sigstore = None
testobj = Trust(policy_filename = TEST_POLICY)
testobj.atomic_config = util.get_atomic_config(atomic_config = os.path.join(FIXTURE_DIR, "atomic.conf"))
testobj.set_args(args)
testobj.add()
with open(testobj.policy_filename, 'r') as f:
d = json.load(f)
self.assertEqual(d["transports"]["atomic"]["docker.io"][0]["keyPath"],
os.path.join(FIXTURE_DIR, "key1.pub"))
@unittest.skipUnless(new_enough, "Requires 2.7.6 or newer")
def test_modify_trust_2_keys(self):
args = self.Args()
args.sigstore = None
args.pubkeysfile = [os.path.join(FIXTURE_DIR, "key1.pub"), os.path.join(FIXTURE_DIR, "key2.pub")]
testobj = Trust(policy_filename = TEST_POLICY)
testobj.atomic_config = util.get_atomic_config(atomic_config = os.path.join(FIXTURE_DIR, "atomic.conf"))
testobj.set_args(args)
testobj.add()
with open(testobj.policy_filename, 'r') as f:
d = json.load(f)
self.assertEqual(d["transports"]["atomic"]["docker.io"][1]["keyPath"],
os.path.join(FIXTURE_DIR, "key2.pub"))
@unittest.skipUnless(new_enough, "Requires 2.7.6 or newer")
def test_add_reject_type(self):
args = self.Args()
args.trust_type = "reject"
args.sigstoretype = "web"
args.pubkeys = []
args.registry = "registry.example.com/foo"
testobj = Trust(policy_filename = TEST_POLICY)
testobj.atomic_config = util.get_atomic_config(atomic_config = os.path.join(FIXTURE_DIR, "atomic.conf"))
testobj.set_args(args)
testobj.add()
with open(testobj.policy_filename, 'r') as f:
d = json.load(f)
self.assertEqual(d["transports"]["docker"][args.registry][0]["type"],
args.trust_type)
@unittest.skipUnless(new_enough, "Requires 2.7.6 or newer")
def test_delete_trust(self):
args = self.Args()
args.pubkeys = []
args.sigstoretype = "web"
args.registry = "registry.example.com/foo"
args.pubkeys = None
testobj = Trust(policy_filename = TEST_POLICY)
testobj.atomic_config = util.get_atomic_config(atomic_config = os.path.join(FIXTURE_DIR, "atomic.conf"))
testobj.set_args(args)
testobj.delete()
with open(testobj.policy_filename, 'r') as f:
d = json.load(f)
self.assertNotIn(args.registry, d["transports"]["docker"])
@contextmanager
def captured_output(self):
"""
Grab stdout/stderr for testing
"""
is_python2 = False
# StringIO is challenging to support on both python 2&3
if int(sys.version_info[0]) < 3:
is_python2 = True
import StringIO # pylint: disable=F0401
else:
from io import StringIO
if is_python2:
new_out, new_err = StringIO.StringIO(), StringIO.StringIO() # pylint: disable=E1101
else:
new_out, new_err = StringIO(), StringIO()
old_out, old_err = sys.stdout, sys.stderr
try:
sys.stdout, sys.stderr = new_out, new_err
yield sys.stdout, sys.stderr
finally:
sys.stdout, sys.stderr = old_out, old_err
def test_trust_show(self):
args = self.Args()
testobj = Trust(policy_filename = os.path.join(FIXTURE_DIR, "show_policy.json"))
testobj.atomic_config = util.get_atomic_config(atomic_config = os.path.join(FIXTURE_DIR, "atomic.conf"))
testobj.set_args(args)
with self.captured_output() as (out, _):
testobj.show()
with open(os.path.join(FIXTURE_DIR, "show_policy.output"), 'r') as f:
expected = f.read()
actual = out.getvalue()
self.assertEqual(expected, actual)
def test_trust_gpg_email_id(self):
args = self.Args()
testobj = Trust(policy_filename = os.path.join(FIXTURE_DIR, "show_policy.json"))
testobj.atomic_config = util.get_atomic_config(atomic_config = os.path.join(FIXTURE_DIR, "atomic.conf"))
testobj.set_args(args)
actual = testobj.get_gpg_id(args.pubkeysfile)
self.assertEqual("security@redhat.com", actual)
def test_trust_gpg_noemail_id(self):
args = self.Args()
args.pubkeys = [os.path.join(FIXTURE_DIR, "key1.pub"), os.path.join(FIXTURE_DIR, "key2.pub")]
testobj = Trust(policy_filename = os.path.join(FIXTURE_DIR, "show_policy.json"))
testobj.atomic_config = util.get_atomic_config(atomic_config = os.path.join(FIXTURE_DIR, "atomic.conf"))
testobj.set_args(args)
actual = testobj.get_gpg_id(args.pubkeys)
self.assertEqual("security@redhat.com,Billy Bob", actual)
def tearDown(self):
test_artifacts = ["docker.io-repo.yaml", "docker.io.yaml", "registry.example.com-foo.yaml"]
for test_artifact in test_artifacts:
f = os.path.join(os.path.join(FIXTURE_DIR, REGISTRIESD), test_artifact)
if os.path.isfile(f):
os.remove(f)
@classmethod
def tearDownClass(cls):
"""
reset test policy.json
"""
shutil.copyfile(os.path.join(FIXTURE_DIR, "default_policy.json"), TEST_POLICY)
if __name__ == '__main__':
unittest.main()