1
0
mirror of https://github.com/projectatomic/atomic.git synced 2026-02-05 09:44:59 +01:00
Files
atomic/atomic_dbus_client.py
baude 72a805e1ee Add verification options to dbus
There were three unused verification options normally available to the
atomic CLI but absent in the dbus API: no_validate, verbose, and
storage.  These were added to the dbus client and server.

Signed-off-by: baude <bbaude@redhat.com>

Closes: #1086
Approved by: rhatdan
2017-08-30 17:58:58 +00:00

273 lines
11 KiB
Python
Executable File

#! /usr/bin/python3 -Es
import sys
import dbus
import time
from Atomic import util
from slip.dbus import polkit
import dbus.service
import dbus.mainloop.glib
ATOMIC_CONFIG = util.get_atomic_config()
_storage = ATOMIC_CONFIG.get('default_storage', "docker")
class AtomicDBus (object):
def __init__(self):
self.bus = dbus.SystemBus()
self.dbus_object = self.bus.get_object("org.atomic", "/org/atomic/object")
@polkit.enable_proxy
def ContainersList(self):
return self.dbus_object.ContainersList(dbus_interface="org.atomic")
@polkit.enable_proxy
def ContainersDelete(self, containers, containers_all=False, force=False, storage=_storage):
if not isinstance(containers, (list, tuple)):
containers = [ containers ]
return self.dbus_object.ContainersDelete(containers, containers_all, force, storage, dbus_interface="org.atomic" )
@polkit.enable_proxy
def ContainersTrim(self):
return self.dbus_object.ContainersTrim(dbus_interface="org.atomic", timeout = 2147400)
@polkit.enable_proxy
def Diff(self, first, second, rpms=False, no_files=False, names_only=False, diff_keywords=None, metadata=False):
if not diff_keywords:
diff_keywords = []
assert(isinstance(diff_keywords, list))
if diff_keywords is None:
diff_keywords = []
return self.dbus_object.Diff(first, second, rpms, no_files, names_only, diff_keywords, metadata, dbus_interface="org.atomic", timeout = 2147400)
@polkit.enable_proxy
def Stop(self, name):
return self.dbus_object.Stop(name, dbus_interface="org.atomic", timeout = 2147400)
@polkit.enable_proxy
def StorageExport(self, graph, export_location, force):
self.dbus_object.StorageExport(graph, export_location, force, dbus_interface="org.atomic")
@polkit.enable_proxy
def StorageImport(self, graph, import_location):
self.dbus_object.StorageImport(graph, import_location, dbus_interface="org.atomic")
@polkit.enable_proxy
def StorageModify(self, devices, driver):
self.dbus_object.StorageModify(devices, driver, dbus_interface="org.atomic")
@polkit.enable_proxy
def StorageReset(self):
self.dbus_object.StorageReset(dbus_interface="org.atomic")
@polkit.enable_proxy
def AsyncScan(self, scan_targets, scanner, scan_type, rootfs, _all, images, containers):
token = self.dbus_object.ScheduleScan(scan_targets, scanner, scan_type, rootfs, _all, images, containers, dbus_interface="org.atomic", timeout = 2147400)
while(True):
ret = self.dbus_object.GetScanResults(token, dbus_interface="org.atomic")
if ret:
break
time.sleep(1)
return ret
@polkit.enable_proxy
def ImagesDelete(self, images, force=False, remote=False, storage='docker'):
if not isinstance(images, (list, tuple)):
images = [ images ]
return self.dbus_object.ImagesDelete(images, force, remote, storage, dbus_interface="org.atomic")
@polkit.enable_proxy
def ImagesTag(self, src, target, storage='docker'):
return self.dbus_object.ImagesTag(src, target, storage, dbus_interface="org.atomic")
@polkit.enable_proxy
def ImagesHelp(self, image):
return self.dbus_object.ImagesHelp(image, dbus_interface="org.atomic")
@polkit.enable_proxy
def ImagesInfo(self, image, remote=False):
return self.dbus_object.ImagesInfo(image, remote, dbus_interface="org.atomic")
@polkit.enable_proxy
def ImagesList(self):
return self.dbus_object.ImagesList(dbus_interface="org.atomic")
@polkit.enable_proxy
def ImagesPrune(self):
return self.dbus_object.ImagesPrune(dbus_interface="org.atomic")
@polkit.enable_proxy
def ImagePush(self, image, pulp, satellite, verify_ssl, url, username, password, activation_key, repo_id,
registry_type, sign_by, gnupghome, insecure, anonymous):
return self.dbus_object.ImagePush(image, pulp, satellite, verify_ssl, url, username, password,
activation_key, repo_id, registry_type, sign_by, gnupghome,
insecure, anonymous)
# The ImagesPull method will pull the specified image
@polkit.enable_proxy
def ImagePull(self, image, storage="docker", reg_type=""):
return self.dbus_object.ImagePull(image, storage, reg_type, dbus_interface="org.atomic", timeout = 2147400)
def ImageUpdate(self, image, force=False):
return self.dbus_object.ImageUpdate(image, force, dbus_interface="org.atomic")
@polkit.enable_proxy
def ImageVersion(self, image, recurse=False):
return self.dbus_object.ImageVersion(image, recurse, dbus_interface="org.atomic")
@polkit.enable_proxy
def Install(self, image, name='', system=False, remote=False, storage=_storage, user=False, system_package='auto', setvalues=''):
if not name:
name = image
if not setvalues:
setvalues = []
if not isinstance(setvalues, (list, tuple)):
setvalues = [ setvalues ]
return self.dbus_object.Install(image, name, system, remote, storage, user, system_package, setvalues, dbus_interface="org.atomic", timeout = 2147400)
@polkit.enable_proxy
def MountImage(self, src, dest, options="", live=False, shared=False):
ret = self.dbus_object.MountImage(src, dest, options, live, shared, dbus_interface="org.atomic")
return ret
# The Run method will create and run a container on the specified image
@polkit.enable_proxy
def Run(self, image, name=None, spc=False, detach=False, ignore=False, command=None):
if not name:
name = image
if not command:
command = []
if not isinstance(command, (list, tuple)):
command = [ command ]
return self.dbus_object.Run(image, name, spc, detach, ignore, command, dbus_interface="org.atomic", timeout = 2147400)
@polkit.enable_proxy
def Scan(self, scan_targets, scanner, scan_type, rootfs, _all, images, containers):
return self.dbus_object.Scan(scan_targets, scanner, scan_type, rootfs, _all, images, containers, dbus_interface="org.atomic", timeout = 2147400)
@polkit.enable_proxy
def ScanList(self):
ret = self.dbus_object.ScanList(dbus_interface="org.atomic")
return ret
@polkit.enable_proxy
def Sign(self, images, sign_by, signature_path = "", gnupghome=""):
if not isinstance(images, (list, tuple)):
images = [ images ]
return self.dbus_object.Sign(images, sign_by, signature_path, gnupghome, dbus_interface="org.atomic")
@polkit.enable_proxy
def Top(self, containers=None, options=""):
if not containers:
containers=[]
if not isinstance(containers, (list, tuple)):
containers = [ containers ]
return self.dbus_object.Top(containers, options, dbus_interface="org.atomic")
@polkit.enable_proxy
def TrustAdd(self, registry, trusttype="reject", pubkeys="", keytype="GPGKeys", sigstore="", sigstoretype="web"):
return self.dbus_object.TrustAdd(registry, trusttype, pubkeys, keytype, sigstore, sigstoretype, dbus_interface="org.atomic")
@polkit.enable_proxy
def TrustDefaultPolicy(self, policy):
return self.dbus_object.TrustDefaultPolicy(policy, dbus_interface="org.atomic")
@polkit.enable_proxy
def TrustDelete(self, registry, trusttype="None"):
return self.dbus_object.TrustDelete(registry, trusttype, dbus_interface="org.atomic")
@polkit.enable_proxy
def TrustShow(self):
return self.dbus_object.TrustShow(dbus_interface="org.atomic")
@polkit.enable_proxy
def Uninstall(self, image, name=None, force=False, storage=None, ignore=False, extra_args=None):
if not name:
name = image
if not extra_args:
extra_args = []
if not storage:
storage = ''
if not isinstance(extra_args, (list, tuple)):
extra_args = [ extra_args ]
return self.dbus_object.Uninstall(image, name, force, storage, ignore, extra_args, dbus_interface="org.atomic", timeout = 2147400)
@polkit.enable_proxy
def UnmountImage(self, dest):
ret = self.dbus_object.UnmountImage(dest, dbus_interface="org.atomic")
return ret
@polkit.enable_proxy
def Verify(self, image, no_validate, verbose, storage):
ret = self.dbus_object.Verify(image, no_validate, verbose, storage, dbus_interface="org.atomic")
return ret
@polkit.enable_proxy
def vulnerable(self):
return self.dbus_object.VulnerableInfo(dbus_interface="org.atomic")
@polkit.enable_proxy
def GetScanResultsById(self, iid):
return self.dbus_object.GetScanResultsById(iid, dbus_interface="org.atomic")
#For outputting the list of scanners
def print_scan_list(all_scanners):
if len(all_scanners) == 0:
util.write_out("There are no scanners configured for this system.")
sys.exit(0)
default_scanner = (util.get_atomic_config())['default_scanner']
if default_scanner is None:
default_scanner = ''
for scanner in all_scanners:
scanner_name = scanner['scanner_name']
df = '* ' if scanner_name == default_scanner else ''
default_scan_type = scanner.get('default_scan')
if default_scan_type is None:
raise ValueError("Invalid configuration file: At least one scan type must be "
"declared as the default for {}.".format(scanner_name))
util.write_out("Scanner: {} {}".format(scanner_name, df))
util.write_out("{}Image Name: {}".format(" " * 2, scanner['image_name']))
for scan_type in scanner['scans']:
df = '* ' if default_scan_type == scan_type['name'] else ''
util.write_out("{}Scan type: {} {}".format(" " * 5, scan_type['name'], df))
util.write_out("{}Description: {}\n".format(" " * 5, scan_type['description']))
util.write_out("\n* denotes defaults")
sys.exit(0)
def is_number(var):
try:
int(var)
return True
except ValueError:
return False
def convert_str(val):
if val in ("True", "False"):
return val
if is_number(val):
return val
if val[0] == "[":
return val
return "\"%s\"" % val
if __name__ == "__main__":
dbus_proxy = AtomicDBus()
cmd="dbus_proxy.%s(" % sys.argv[1]
if len(sys.argv[2:]) > 0:
cmd+=convert_str(sys.argv[2])
for i in sys.argv[3:]:
cmd+=","
cmd+=convert_str(i)
cmd+=")"
print(cmd)
try:
s = eval(cmd) # pylint: disable=eval-used
if s:
print(s)
except dbus.exceptions.DBusException as e:
print(e.get_dbus_message())
sys.exit(-1)