diff --git a/sops/__init__.py b/sops/__init__.py index c358e13a9..5b3e82e28 100644 --- a/sops/__init__.py +++ b/sops/__init__.py @@ -36,6 +36,13 @@ else: import json from collections import OrderedDict +try: + from collections.abc import MutableMapping + from collections.abc import MutableSequence +except ImportError: + from collections import MutableMapping + from collections import MutableSequence + if sys.version_info[0] == 3: raw_input = input @@ -564,7 +571,10 @@ def verify_or_create_sops_branch(tree, kms_arns=None, pgp_fps=None, tree['sops']['version'] = VERSION tree['sops']['unencrypted_suffix'] = UNENCRYPTED_SUFFIX - if 'kms' in tree['sops'] and isinstance(tree['sops']['kms'], list): + if ( + 'kms' in tree['sops'] + and isinstance(tree['sops']['kms'], MutableSequence) + ): # check that we have at least one ARN to work with for entry in tree['sops']['kms']: if (entry and 'arn' in entry and entry['arn'] != "" and @@ -572,7 +582,10 @@ def verify_or_create_sops_branch(tree, kms_arns=None, pgp_fps=None, return tree, need_new_data_key # if we're here, no data key was found in the kms entries - if 'pgp' in tree['sops'] and isinstance(tree['sops']['pgp'], list): + if ( + 'pgp' in tree['sops'] + and isinstance(tree['sops']['pgp'], MutableSequence) + ): # check that we have at least one fingerprint to work with for entry in tree['sops']['pgp']: if (entry and 'fp' in entry and entry['fp'] != "" and @@ -653,7 +666,7 @@ def update_master_keys(tree, key): with them, and store the new encrypted values. """ if 'kms' in tree['sops']: - if not isinstance(tree['sops']['kms'], list): + if not isinstance(tree['sops']['kms'], MutableSequence): panic("invalid KMS format in SOPS branch, must be a list") i = -1 for entry in tree['sops']['kms']: @@ -667,7 +680,7 @@ def update_master_keys(tree, key): tree['sops']['kms'][i] = updated if 'pgp' in tree['sops']: - if not isinstance(tree['sops']['pgp'], list): + if not isinstance(tree['sops']['pgp'], MutableSequence): panic("invalid PGP format in SOPS branch, must be a list") i = -1 for entry in tree['sops']['pgp']: @@ -815,11 +828,11 @@ def walk_and_decrypt(branch, key, aad=b'', stash=None, digest=None, if stash: stash[k] = {'has_stash': True} nstash = stash[k] - if isinstance(v, dict): + if isinstance(v, MutableMapping): branch[k] = walk_and_decrypt(v, key, aad=caad, stash=nstash, digest=digest, isRoot=False, unencrypted=unencrypted_branch) - elif isinstance(v, list): + elif isinstance(v, MutableSequence): branch[k] = walk_list_and_decrypt(v, key, aad=caad, stash=nstash, digest=digest, unencrypted=unencrypted_branch) @@ -857,11 +870,11 @@ def walk_list_and_decrypt(branch, key, aad=b'', stash=None, digest=None, if stash: stash[i] = {'has_stash': True} nstash = stash[i] - if isinstance(v, dict): + if isinstance(v, MutableMapping): kl.append(walk_and_decrypt(v, key, aad=aad, stash=nstash, digest=digest, isRoot=False, unencrypted=unencrypted)) - elif isinstance(v, list): + elif isinstance(v, MutableSequence): kl.append(walk_list_and_decrypt(v, key, aad=aad, stash=nstash, digest=digest, unencrypted=unencrypted)) @@ -950,12 +963,12 @@ def walk_and_encrypt(branch, key, aad=b'', stash=None, nstash = dict() if stash and k in stash: nstash = stash[k] - if isinstance(v, dict): + if isinstance(v, MutableMapping): # recursively walk the tree branch[k] = walk_and_encrypt(v, key, aad=caad, stash=nstash, digest=digest, isRoot=False, unencrypted=unencrypted_branch) - elif isinstance(v, list): + elif isinstance(v, MutableSequence): branch[k] = walk_list_and_encrypt(v, key, aad=caad, stash=nstash, digest=digest, unencrypted=unencrypted_branch) @@ -985,11 +998,11 @@ def walk_list_and_encrypt(branch, key, aad=b'', stash=None, digest=None, for i, v in enumerate(list(branch)): if stash and i in stash: nstash = stash[i] - if isinstance(v, dict): + if isinstance(v, MutableMapping): kl.append(walk_and_encrypt(v, key, aad=aad, stash=nstash, digest=digest, isRoot=False, unencrypted=unencrypted)) - elif isinstance(v, list): + elif isinstance(v, MutableSequence): kl.append(walk_list_and_encrypt(v, key, aad=aad, stash=nstash, digest=digest, unencrypted=unencrypted)) @@ -1134,6 +1147,14 @@ def get_key_from_kms(tree): entry['arn']) continue context = entry['context'] if 'context' in entry else {} + if ( + not isinstance(context, dict) + and isinstance(context, MutableMapping) + ): + # ruamel.yaml's CommentedMap no longer subclasses dict as of + # 0.15.52, but botocore only accepts dict subclasses, so unpack the + # context as an unordered, uncommented dict. + context = dict(**context) try: kms_response = kms.decrypt(CiphertextBlob=b64decode(enc), EncryptionContext=context) @@ -1292,7 +1313,10 @@ def write_file(tree, path=None, filetype=None): fd = tempfile.NamedTemporaryFile(suffix="."+filetype, delete=False) path = fd.name - if not isinstance(tree, dict) and not isinstance(tree, list): + if ( + not isinstance(tree, MutableMapping) + and not isinstance(tree, MutableSequence) + ): if path == 'stdout': sys.stdout.write(tree.encode('utf-8')) else: