mirror of
https://github.com/ansible/tower-cli.git
synced 2026-02-06 09:47:55 +01:00
381 lines
15 KiB
Python
381 lines
15 KiB
Python
# Copyright 2017, Ansible by Red Hat
|
|
#
|
|
# Licensed under the Apache License, Version 2.0 (the "License");
|
|
# you may not use this file except in compliance with the License.
|
|
# You may obtain a copy of the License at
|
|
#
|
|
# http://www.apache.org/licenses/LICENSE-2.0
|
|
#
|
|
# Unless required by applicable law or agreed to in writing, software
|
|
# distributed under the License is distributed on an "AS IS" BASIS,
|
|
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
# See the License for the specific language governing permissions and
|
|
# limitations under the License.
|
|
|
|
from __future__ import absolute_import, division
|
|
|
|
import functools
|
|
import inspect
|
|
import json
|
|
import yaml
|
|
import math
|
|
import re
|
|
from copy import copy
|
|
|
|
import six
|
|
|
|
import click
|
|
|
|
from tower_cli.conf import settings, with_global_options
|
|
from tower_cli.utils import parser, debug, secho
|
|
from tower_cli.cli.action import ActionSubcommand
|
|
from tower_cli.exceptions import MultipleRelatedError
|
|
from tower_cli.cli.types import StructuredInput
|
|
|
|
|
|
class ResSubcommand(click.MultiCommand):
|
|
"""A subcommand that implements all command methods on the
|
|
Resource.
|
|
"""
|
|
def __init__(self, resource, *args, **kwargs):
|
|
self.resource = resource
|
|
self.resource_name = getattr(
|
|
resource, 'resource_name',
|
|
resource.__module__.split('.')[-1]
|
|
)
|
|
self.resource_name = self.resource_name.replace('_', ' ')
|
|
super(ResSubcommand, self).__init__(
|
|
*args,
|
|
help=self.resource.cli_help,
|
|
**kwargs
|
|
)
|
|
|
|
def list_commands(self, ctx):
|
|
"""Return a list of all methods decorated with the
|
|
@resources.command decorator.
|
|
"""
|
|
return self.resource.commands
|
|
|
|
def _auto_help_text(self, help_text):
|
|
"""Given a method with a docstring, convert the docstring
|
|
to more CLI appropriate wording, and also disambiguate the
|
|
word "object" on the base class docstrings.
|
|
"""
|
|
# Delete API docs if there are any.
|
|
api_doc_delimiter = '=====API DOCS====='
|
|
begin_api_doc = help_text.find(api_doc_delimiter)
|
|
if begin_api_doc >= 0:
|
|
end_api_doc = help_text.rfind(api_doc_delimiter) + len(api_doc_delimiter)
|
|
help_text = help_text[:begin_api_doc] + help_text[end_api_doc:]
|
|
# Convert the word "object" to the appropriate type of
|
|
# object being modified (e.g. user, organization).
|
|
an_prefix = ('a', 'e', 'i', 'o')
|
|
if not self.resource_name.lower().startswith(an_prefix):
|
|
help_text = help_text.replace('an object',
|
|
'a %s' % self.resource_name)
|
|
if self.resource_name.lower().endswith('y'):
|
|
help_text = help_text.replace(
|
|
'objects',
|
|
'%sies' % self.resource_name[:-1],
|
|
)
|
|
help_text = help_text.replace('object', self.resource_name)
|
|
|
|
# Convert some common Python terms to their CLI equivalents.
|
|
help_text = help_text.replace('keyword argument', 'option')
|
|
help_text = help_text.replace('raise an exception',
|
|
'abort with an error')
|
|
|
|
# Convert keyword arguments specified in docstrings enclosed
|
|
# by backticks to switches.
|
|
for match in re.findall(r'`([\w_]+)`', help_text):
|
|
option = '--%s' % match.replace('_', '-')
|
|
help_text = help_text.replace('`%s`' % match, option)
|
|
|
|
# Done; return the new help text.
|
|
return help_text
|
|
|
|
def _echo_method(self, method):
|
|
"""Given a method, return a method that runs the internal
|
|
method and echos the result.
|
|
"""
|
|
@functools.wraps(method)
|
|
def func(*args, **kwargs):
|
|
# Echo warning if this method is deprecated.
|
|
if getattr(method, 'deprecated', False):
|
|
debug.log('This method is deprecated in Tower 3.0.', header='warning')
|
|
|
|
result = method(*args, **kwargs)
|
|
|
|
# If this was a request that could result in a modification
|
|
# of data, print it in Ansible coloring.
|
|
color_info = {}
|
|
if isinstance(result, dict) and 'changed' in result:
|
|
if result['changed']:
|
|
color_info['fg'] = 'yellow'
|
|
else:
|
|
color_info['fg'] = 'green'
|
|
|
|
# Piece together the result into the proper format.
|
|
format = getattr(self, '_format_%s' % (getattr(method, 'format_freezer', None) or settings.format))
|
|
output = format(result)
|
|
|
|
# Perform the echo.
|
|
secho(output, **color_info)
|
|
return func
|
|
|
|
def _format_json(self, payload):
|
|
"""Convert the payload into a JSON string with proper
|
|
indentation and return it.
|
|
"""
|
|
return json.dumps(payload, indent=2)
|
|
|
|
def _format_yaml(self, payload):
|
|
"""Convert the payload into a YAML string with proper
|
|
indentation and return it.
|
|
"""
|
|
return parser.ordered_dump(payload, Dumper=yaml.SafeDumper,
|
|
default_flow_style=False)
|
|
|
|
def _format_id(self, payload):
|
|
"""Echos only the id"""
|
|
if 'id' in payload:
|
|
return str(payload['id'])
|
|
if 'results' in payload:
|
|
return ' '.join([six.text_type(item['id']) for item in payload['results']])
|
|
raise MultipleRelatedError('Could not serialize output with id format.')
|
|
|
|
@staticmethod
|
|
def get_print_value(data, col):
|
|
value = data.get(col, 'N/A')
|
|
is_bool = isinstance(value, bool)
|
|
value = six.text_type(value)
|
|
if is_bool:
|
|
value = value.lower()
|
|
return value
|
|
|
|
def _format_human(self, payload):
|
|
"""Convert the payload into an ASCII table suitable for
|
|
printing on screen and return it.
|
|
"""
|
|
page = None
|
|
total_pages = None
|
|
|
|
# What are the columns we will show?
|
|
columns = [field.name for field in self.resource.fields
|
|
if field.display or settings.description_on and
|
|
field.name == 'description']
|
|
columns.insert(0, 'id')
|
|
|
|
# Save a dictionary-by-name of fields for later use
|
|
fields_by_name = {}
|
|
for field in self.resource.fields:
|
|
fields_by_name[field.name] = field
|
|
|
|
# Sanity check: If there is a "changed" key in our payload
|
|
# and little else, we print a short message and not a table.
|
|
# this specifically applies to deletion
|
|
if 'changed' in payload and 'id' not in payload:
|
|
return 'OK. (changed: {0})'.format(
|
|
six.text_type(payload['changed']).lower(),
|
|
)
|
|
|
|
# Sanity check: If there is no ID and no results, then this
|
|
# is unusual output; keep our table formatting, but plow
|
|
# over the columns-as-keys stuff above.
|
|
# this originally applied to launch/status/update methods
|
|
# but it may become deprecated
|
|
if 'id' not in payload and 'results' not in payload:
|
|
columns = [i for i in payload.keys()]
|
|
|
|
# Get our raw rows into a standard format.
|
|
if 'results' in payload:
|
|
raw_rows = payload['results']
|
|
if payload.get('count', 0) > len(payload['results']):
|
|
prev = payload.get('previous', 0) or 0
|
|
page = prev + 1
|
|
count = payload['count']
|
|
if payload.get('next', None):
|
|
total_pages = math.ceil(count / len(raw_rows))
|
|
else:
|
|
total_pages = page
|
|
else:
|
|
raw_rows = [payload]
|
|
|
|
# If we have no rows to display, return this information
|
|
# and don't do any further processing.
|
|
if not raw_rows:
|
|
return 'No records found.'
|
|
|
|
# Determine the width for each column.
|
|
widths = {}
|
|
for col in columns:
|
|
widths[col] = max(
|
|
len(col),
|
|
*[len(self.get_print_value(i, col)) for i in raw_rows]
|
|
)
|
|
fd = fields_by_name.get(col, None)
|
|
if fd is not None and fd.col_width is not None:
|
|
widths[col] = fd.col_width
|
|
|
|
# It's possible that the column widths will exceed our terminal
|
|
# width; if so, reduce column widths accordingly.
|
|
# TODO: Write this.
|
|
|
|
# Put together the divider row.
|
|
# This is easy and straightforward: it's simply a table divider
|
|
# using the widths calculated.
|
|
divider_row = ''
|
|
for col in columns:
|
|
divider_row += '=' * widths[col] + ' '
|
|
divider_row.rstrip()
|
|
|
|
# Put together the header row.
|
|
# This is also easy and straightforward; simply center the
|
|
# headers (which str.format does for us!).
|
|
header_row = ''
|
|
for col in columns:
|
|
header_row += ('{0:^%d}' % widths[col]).format(col) + ' '
|
|
header_row.rstrip()
|
|
|
|
# Piece together each row of data.
|
|
data_rows = []
|
|
for raw_row in raw_rows:
|
|
data_row = ''
|
|
for col in columns:
|
|
template = six.text_type('{0:%d}') % widths[col]
|
|
value = self.get_print_value(raw_row, col)
|
|
# Right-align certain native data types
|
|
if isinstance(raw_row.get(col, 'N/A'), (bool, int)):
|
|
template = template.replace('{0:', '{0:>')
|
|
# Truncate the cell entry if exceeds manually
|
|
# specified column width limit
|
|
fd = fields_by_name.get(col, None)
|
|
if fd is not None and fd.col_width is not None:
|
|
str_value = template.format(value or '')
|
|
if len(str_value) > fd.col_width:
|
|
value = str_value[:fd.col_width]
|
|
data_row += template.format(value or '') + ' '
|
|
data_rows.append(data_row.rstrip())
|
|
|
|
# Result the resulting table.
|
|
response = '\n'.join((
|
|
divider_row, header_row, divider_row,
|
|
'\n'.join(data_rows),
|
|
divider_row,
|
|
))
|
|
# Don't print page numbers for 1 page results
|
|
if page and total_pages != 1:
|
|
response += '(Page %d of %d.)' % (page, total_pages)
|
|
if payload.get('changed', False):
|
|
response = 'Resource changed.\n' + response
|
|
return response
|
|
|
|
def get_command(self, ctx, name):
|
|
"""Retrieve the appropriate method from the Resource,
|
|
decorate it as a click command, and return that method.
|
|
"""
|
|
# Sanity check: Does a method exist corresponding to this
|
|
# command? If not, None is returned for click to raise
|
|
# exception.
|
|
if not hasattr(self.resource, name):
|
|
return None
|
|
|
|
# Get the method.
|
|
method = getattr(self.resource, name)
|
|
|
|
# Get any attributes that were given at command-declaration
|
|
# time.
|
|
attrs = getattr(method, '_cli_command_attrs', {})
|
|
|
|
# If the help message comes from the docstring, then
|
|
# convert it into a message specifically for this resource.
|
|
help_text = inspect.getdoc(method)
|
|
attrs['help'] = self._auto_help_text(help_text or '')
|
|
|
|
# On some methods, we ignore the defaults, which are intended
|
|
# for writing and not reading; process this.
|
|
ignore_defaults = attrs.pop('ignore_defaults', False)
|
|
|
|
# Wrap the method, such that it outputs its final return
|
|
# value rather than returning it.
|
|
new_method = self._echo_method(method)
|
|
|
|
# Soft copy the "__click_params__", if any exist.
|
|
# This is the internal holding method that the click library
|
|
# uses to store @click.option and @click.argument directives
|
|
# before the method is converted into a command.
|
|
#
|
|
# Because self._echo_method uses @functools.wraps, this is
|
|
# actually preserved; the purpose of copying it over is
|
|
# so we can get our resource fields at the top of the help;
|
|
# the easiest way to do this is to load them in before the
|
|
# conversion takes place. (This is a happy result of Armin's
|
|
# work to get around Python's processing decorators
|
|
# bottom-to-top.)
|
|
click_params = getattr(method, '__click_params__', [])
|
|
new_method.__click_params__ = copy(click_params)
|
|
new_method = with_global_options(new_method)
|
|
|
|
# Write options based on the fields available on this resource.
|
|
fao = attrs.pop('use_fields_as_options', True)
|
|
if fao:
|
|
for field in reversed(self.resource.fields):
|
|
if not field.is_option:
|
|
continue
|
|
|
|
# If we got an iterable rather than a boolean,
|
|
# then it is a list of fields to use; check for
|
|
# presence in that list.
|
|
if not isinstance(fao, bool) and field.name not in fao:
|
|
continue
|
|
|
|
# Create the initial arguments based on the
|
|
# option value. If we have a different key to use
|
|
# (which is what gets routed to the Tower API),
|
|
# ensure that is the first argument.
|
|
args = [field.option]
|
|
if field.key:
|
|
args.insert(0, field.key)
|
|
|
|
# short name aliases for common flags
|
|
short_fields = {
|
|
'name': 'n',
|
|
'description': 'd',
|
|
'inventory': 'i',
|
|
'extra_vars': 'e'
|
|
}
|
|
if field.name in short_fields:
|
|
args.append('-'+short_fields[field.name])
|
|
|
|
# Apply the option to the method.
|
|
option_help = field.help
|
|
if isinstance(field.type, StructuredInput):
|
|
option_help += ' Use @ to get JSON or YAML from a file.'
|
|
if field.required:
|
|
option_help = '[REQUIRED] ' + option_help
|
|
elif field.read_only:
|
|
option_help = '[READ ONLY] ' + option_help
|
|
option_help = '[FIELD]' + option_help
|
|
click.option(
|
|
*args,
|
|
default=field.default if not ignore_defaults else None,
|
|
help=option_help,
|
|
type=field.type,
|
|
show_default=field.show_default,
|
|
multiple=field.multiple,
|
|
is_eager=False
|
|
)(new_method)
|
|
|
|
# Make a click Command instance using this method
|
|
# as the callback, and return it.
|
|
cmd = click.command(name=name, cls=ActionSubcommand, **attrs)(new_method)
|
|
|
|
# If this method has a `pk` positional argument,
|
|
# then add a click argument for it.
|
|
code = six.get_function_code(method)
|
|
if 'pk' in code.co_varnames:
|
|
click.argument('pk', nargs=1, required=False, type=str, metavar='[ID]')(cmd)
|
|
|
|
# Done; return the command.
|
|
return cmd
|