1
0
mirror of https://github.com/ansible/tower-cli.git synced 2026-02-06 18:47:28 +01:00
Files
tower-cli/tower_cli/utils/parser.py
2019-03-21 21:07:24 +05:30

177 lines
6.6 KiB
Python

# Copyright 2015, Ansible, Inc.
# Alan Rominger <arominger@ansible.com>
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import yaml
import json
import ast
import shlex
import sys
import six
from tower_cli import exceptions as exc
from tower_cli.utils import debug
from tower_cli.utils.data_structures import OrderedDict
def parse_kv(var_string):
    """Parse a string of shell-style ``key=value`` tokens into a dictionary.

    Similar to the Ansible function of the same name, but not as fully
    featured as the corresponding Ansible code.

    Args:
        var_string: text such as ``'a=1 b="two words"'``.  ``None`` gives
            an empty dictionary.  A byte string is decoded as UTF-8 and
            any other non-string object is converted with ``str()``.

    Returns:
        dict: one entry per token.  A value is converted with
        ``ast.literal_eval`` when possible (for instance "5" -> 5) and
        kept as a string otherwise.

    Raises:
        ValueError: if a token has no ``=``, if the key or the value of a
            token is empty, or if ``shlex`` cannot split the string.
    """
    return_dict = {}

    # The output is used to update dictionaries, so return an empty one
    # when there are no values in the input.
    if var_string is None:
        return return_dict

    # On Python 3 a byte string is not a ``str``; decode it rather than
    # letting str() below turn it into its "b'...'" representation.
    # On Python 2 ``bytes`` is ``str``, so this branch is never taken.
    if isinstance(var_string, bytes) and not isinstance(var_string, str):
        var_string = var_string.decode('UTF-8')

    # Python 2.6 / shlex has problems handling unicode, this is a fix
    fix_encoding_26 = (
        sys.version_info < (2, 7) and '\x00' in shlex.split(u'a')[0]
    )

    # Also hedge against Click library giving non-string type
    is_unicode = False
    if fix_encoding_26 or not isinstance(var_string, str):
        if isinstance(var_string, six.text_type):
            var_string = var_string.encode('UTF-8')
            is_unicode = True
        else:
            var_string = str(var_string)

    # Use shlex library to split string by quotes, whitespace, etc.
    for token in shlex.split(var_string):
        # Second part of fix to avoid passing shlex unicode in py2.6
        if is_unicode:
            token = token.decode('UTF-8')
        if fix_encoding_26:
            token = six.text_type(token)

        # Only the first '=' separates the key from the value
        key, separator, value = token.partition('=')
        if not separator:
            # scenario where --extra-vars=42, will throw error
            raise ValueError(
                'expected a key=value pair but got %r' % (token,))
        # If '=' are unbalanced, then stop and warn user
        if not key or not value:
            raise ValueError(
                'missing key or value in %r' % (token,))

        # If possible, convert into python data type, for instance "5"->5
        try:
            return_dict[key] = ast.literal_eval(value)
        except Exception:
            return_dict[key] = value

    return return_dict
def string_to_dict(var_string, allow_kv=True, require_dict=True):
    """Return the data described by a YAML, JSON or key=value string.

    The string is first loaded with the safe YAML loader, which also
    accepts JSON.  If loading fails, or if ``require_dict`` is set and
    the result is not a dictionary, the string is parsed with
    ``parse_kv`` instead, provided that ``allow_kv`` is set.

    Args:
        var_string: the text to parse.
        allow_kv (bool): whether to fall back to the key=value syntax.
        require_dict (bool): whether a YAML/JSON result has to be a
            dictionary in order to be accepted.

    Returns:
        The loaded YAML/JSON data, or the dictionary built by
        ``parse_kv`` when the fallback is used.

    Raises:
        exc.TowerCLIError: if YAML/JSON loading gave no acceptable result
            and the key=value fallback is disallowed or fails as well.
    """
    try:
        # Accept all JSON and YAML
        return_dict = yaml.load(var_string, Loader=yaml.SafeLoader)
    except (AttributeError, yaml.YAMLError):
        accepted = False
    else:
        # Explicit check rather than ``assert`` so that the validation
        # is still performed when Python runs with -O.
        accepted = not require_dict or isinstance(return_dict, dict)
    if accepted:
        return return_dict

    message = ('failed to parse some of the extra '
               'variables.\nvariables: \n%s')
    if not allow_kv:
        raise exc.TowerCLIError(message % var_string)

    # YAML/JSON gave nothing usable, parse by key=value syntax
    try:
        return parse_kv(var_string)
    except Exception:
        raise exc.TowerCLIError(message % var_string)
def process_extra_vars(extra_vars_list, force_json=True):
    """Merge several extra-variables sources into one JSON or YAML string.

    Every entry of ``extra_vars_list`` is either inline text or, when it
    starts with ``@``, the path of a file whose content is read.  File
    content is parsed as YAML/JSON only; inline text may also use the
    key=value syntax.  The resulting dictionaries are merged in order,
    so a later entry overrides an earlier one for the keys they share.

    Args:
        extra_vars_list: iterable of strings as described above.
        force_json (bool): if True, always output json.  If False, the
            texts are returned as YAML (entries containing a line that
            starts with ``#`` are kept verbatim, the others are
            re-dumped) when the combined text loads as a dictionary,
            and as JSON otherwise.

    Returns:
        str: the combined variables, or an empty string when the JSON
        form is used and there are no variables.
    """
    # Read from all the different sources and put into dictionary
    extra_vars = {}
    yaml_chunks = []
    for extra_vars_opt in extra_vars_list:
        # Load file content if necessary
        from_file = extra_vars_opt.startswith("@")
        if from_file:
            with open(extra_vars_opt[1:], 'r') as f:
                extra_vars_opt = f.read()

        # File content is converted conservatively (no key=value syntax),
        # inline text liberally.
        opt_dict = string_to_dict(extra_vars_opt, allow_kv=not from_file)

        # Rolling YAML-based string combination: text with comment lines
        # is kept as written, anything else is re-dumped.
        if any(line.startswith("#") for line in extra_vars_opt.split('\n')):
            yaml_chunks.append(extra_vars_opt)
        elif extra_vars_opt != "":
            yaml_chunks.append(
                yaml.dump(opt_dict, default_flow_style=False))

        # Combine dictionary with cumulative dictionary
        extra_vars.update(opt_dict)

    # Return contents in form of a string
    if not force_json:
        extra_vars_yaml = "".join(chunk + "\n" for chunk in yaml_chunks)
        # Conditions to verify it is safe to return rolling YAML string
        try:
            try_dict = yaml.load(extra_vars_yaml, Loader=yaml.SafeLoader)
        except Exception:
            # Best effort: any loading problem means falling back to JSON
            try_dict = None
        # Explicit check rather than ``assert`` so that it survives -O
        if isinstance(try_dict, dict):
            debug.log('Using unprocessed YAML', header='decision', nl=2)
            return extra_vars_yaml.rstrip()
        debug.log('Failed YAML parsing, defaulting to JSON',
                  header='decision', nl=2)

    if not extra_vars:
        return ""
    return json.dumps(extra_vars, ensure_ascii=False)
def ordered_dump(data, Dumper=yaml.Dumper, **kws):
    """Dump ``data`` with PyYAML, representing OrderedDict as a mapping.

    A private subclass of ``Dumper`` is given a representer for
    ``OrderedDict`` and is then used for the dump, so ``Dumper`` itself
    is not the class the representer is registered on.

    Args:
        data: the data structure to be dumped, which is supposed to
            contain OrderedDict.
        Dumper: the yaml serializer class to be extended and used.
        kws: extra key-value arguments to be passed to yaml.dump.

    Returns:
        The result of ``yaml.dump`` called with no stream, i.e. the
        dumped data as a string.
    """
    def represent_ordered_mapping(dumper, mapping):
        # Emit the pairs under the default mapping tag; passing the item
        # pairs presumably keeps the order of the OrderedDict.
        mapping_tag = yaml.resolver.BaseResolver.DEFAULT_MAPPING_TAG
        return dumper.represent_mapping(mapping_tag, mapping.items())

    class OrderedDumper(Dumper):
        pass

    OrderedDumper.add_representer(OrderedDict, represent_ordered_mapping)
    return yaml.dump(data, stream=None, Dumper=OrderedDumper, **kws)