1
0
mirror of https://github.com/projectatomic/atomic.git synced 2026-02-05 18:45:01 +01:00
Files
atomic/atomic_dbus.py
Dan Walsh 3fd6c15297 Add dbus support for atomic stop
Closes: #682
Approved by: rhatdan
2016-10-08 09:56:20 +00:00

687 lines
24 KiB
Python
Executable File

#!/usr/bin/python -Es
import dbus
import time
import threading
import dbus.service
import dbus.mainloop.glib
import json
from gi.repository import GLib # pylint: disable=no-name-in-module
import slip.dbus.service
from Atomic import Atomic
from Atomic.containers import Containers
from Atomic.delete import Delete
from Atomic.diff import Diff
from Atomic.help import AtomicHelp
from Atomic.host import Host
from Atomic.info import Info
from Atomic.install import Install
from Atomic.images import Images
from Atomic.mount import Mount
from Atomic.pull import Pull
from Atomic.run import Run
from Atomic.scan import Scan
from Atomic.stop import Stop
from Atomic.sign import Sign
from Atomic.storage import Storage
from Atomic.top import Top
import Atomic.trust as Trust
from Atomic.uninstall import Uninstall
from Atomic.verify import Verify
class atomic_dbus(slip.dbus.service.Object):
    """D-Bus service object exporting atomic commands on the "org.atomic" interface.

    Every exported method builds a fresh ``Args`` attribute bag, fills in the
    values received over D-Bus, hands it to the matching Atomic command object
    through ``set_args()`` and then invokes that command.  Read-only methods
    require the "org.atomic.read" polkit action, everything else requires
    "org.atomic.readwrite".
    """

    default_polkit_auth_required = "org.atomic.readwrite"

    class Args(object):
        """Default argument bag handed to the command objects via set_args().

        Each exported method creates one instance and overrides only the
        attributes that are relevant for the command it is about to run.
        """

        def __init__(self):
            self.display = False
            self.quiet = True
            self.assumeyes = True
            self.remote = False
            self.reboot = False
            self.force = False
            self.image = None
            self.system = False
            self.spc = False
            self.storage = None
            self.reg_type = None
            self.recurse = False
            self.debug = False
            self.devices = None
            self.driver = None
            self.graph = None
            self.import_location = None
            self.export_location = None
            self.compares = []
            self.command = []
            self.json = True
            self.no_files = False
            self.name = None
            self.user = None
            self.names_only = False
            self.rpms = False
            self.verbose = False
            self.setvalues = []
            self.scan_targets = []
            self.scanner = None
            self.scan_type = None
            self.list = False
            self.rootfs = []
            self.images = []
            self.delete_targets = []
            self.containers = []
            self.all = False
            self.container = False
            self.prune = False
            self.heading = False
            self.truncate = False
            self.registry = None
            self.pubkeys = []
            self.keytype = None
            self.trust_type = None
            self.sigstoretype = None
            self.sigstore = None
            self.default_policy = None
            self.mountpoint = None
            self.options = None
            self.optional = None
            self.os = None
            self.revision = None
            self.downgrade = False
            self.rebase = False
            self.live = False
            self.shared = False
            self.sign_by = None
            self.signature_path = None
            self.args = []
            self.diff = False
            self.hotfix = False
            self.refspec = None
            self.preview = False
            self.gnupghome = None

    def __init__(self, *p, **k):
        """Initialise the service object and start the background scan scheduler.

        Positional and keyword arguments are forwarded unchanged to
        ``slip.dbus.service.Object``.
        """
        super(atomic_dbus, self).__init__(*p, **k)
        self.atomic = Atomic()
        self.tasks = []
        self.tasks_lock = threading.Lock()
        self.last_token = 0
        # Everything the scheduler thread touches must exist before the
        # thread is started.
        self.results = dict()
        self.results_lock = threading.Lock()
        self.scheduler_thread = threading.Thread(target=self.Scheduler)
        self.scheduler_thread.daemon = True
        self.scheduler_thread.start()

    def Scheduler(self):
        """Run queued scans forever, one per iteration, storing results by token.

        Executed in the daemon thread created by ``__init__``.  Each pass pops
        at most one ``(token, scan)`` pair from ``self.tasks``, runs the scan,
        records the result in ``self.results`` and then sleeps for one second.
        """
        while True:
            current_task = None
            with self.tasks_lock:
                if self.tasks:
                    current_task = self.tasks.pop(0)
            if current_task is not None:
                token, scan = current_task
                # The scan runs outside of both locks so that D-Bus callers
                # can keep queueing tasks and polling for results meanwhile.
                result = scan.scan()
                with self.results_lock:
                    self.results[token] = result
            time.sleep(1)

    def AllocateToken(self):
        """Return the next scan token (a monotonically increasing integer)."""
        with self.tasks_lock:
            self.last_token += 1
            return self.last_token

    # atomic diff section
    @slip.dbus.polkit.require_auth("org.atomic.read")
    @dbus.service.method("org.atomic", in_signature='ssbbb', out_signature='s')
    def Diff(self, src, dest, rpms, no_files, names_only):
        """Show differences between two container images, file diff or RPMS.

        ``src`` and ``dest`` are the two items to compare; ``rpms``,
        ``no_files`` and ``names_only`` are passed through as the options of
        the same name.  Returns the result of ``Diff.diff()``.
        """
        diff = Diff()
        args = self.Args()
        args.compares = [src, dest]
        args.verbose = True
        args.no_files = no_files
        args.names_only = names_only
        args.rpms = rpms
        diff.set_args(args)
        return diff.diff()

    # atomic host section
    @slip.dbus.polkit.require_auth("org.atomic.readwrite")
    @dbus.service.method("org.atomic", in_signature='b', out_signature='')
    def HostRollBack(self, reboot=False):
        """Switch to the alternate installed tree at next boot."""
        host = Host()
        args = self.Args()
        args.reboot = reboot
        host.set_args(args)
        return host.host_rollback()

    @slip.dbus.polkit.require_auth("org.atomic.read")
    @dbus.service.method("org.atomic", in_signature='', out_signature='s')
    def HostStatus(self):
        """List information about all deployments (JSON output requested)."""
        host = Host()
        args = self.Args()
        args.json = True
        host.set_args(args)
        return host.host_status()

    @slip.dbus.polkit.require_auth("org.atomic.readwrite")
    @dbus.service.method("org.atomic", in_signature='bbs', out_signature='')
    def HostUpgrade(self, reboot=False, downgrade=False, os=None):
        """Upgrade to the latest Atomic tree if one is available."""
        host = Host()
        args = self.Args()
        args.reboot = reboot
        args.downgrade = downgrade
        args.os = os
        host.set_args(args)
        return host.host_upgrade()

    @slip.dbus.polkit.require_auth("org.atomic.read")
    @dbus.service.method("org.atomic", in_signature='s', out_signature='s')
    def HostUpgradeDiff(self, os=None):
        """Check for upgrades and print the package diff only."""
        host = Host()
        args = self.Args()
        args.diff = True
        args.os = os
        host.set_args(args)
        return host.host_upgrade()

    @slip.dbus.polkit.require_auth("org.atomic.readwrite")
    # 'os' is a string here just like in the other Host* methods, so the
    # D-Bus signature is 'ss' rather than 'sb'.
    @dbus.service.method("org.atomic", in_signature='ss', out_signature='')
    def HostRebase(self, refspec, os=None):
        """Download and deploy a new origin refspec."""
        host = Host()
        args = self.Args()
        args.refspec = refspec
        args.os = os
        host.set_args(args)
        return host.host_rebase()

    @slip.dbus.polkit.require_auth("org.atomic.readwrite")
    @dbus.service.method("org.atomic", in_signature='sbs', out_signature='')
    def HostDeploy(self, revision, reboot, os):
        """Deploy the given revision, passing ``reboot`` and ``os`` through."""
        host = Host()
        args = self.Args()
        args.revision = revision
        args.reboot = reboot
        args.os = os
        host.set_args(args)
        return host.host_deploy()

    @slip.dbus.polkit.require_auth("org.atomic.read")
    @dbus.service.method("org.atomic", in_signature='ss', out_signature='s')
    def HostDeployPreview(self, revision, os=None):
        """Run a host deploy of ``revision`` with the preview flag set."""
        host = Host()
        args = self.Args()
        args.revision = revision
        args.preview = True
        args.os = os
        host.set_args(args)
        return host.host_deploy()

    @slip.dbus.polkit.require_auth("org.atomic.readwrite")
    @dbus.service.method("org.atomic", in_signature='b', out_signature='')
    def HostUnlock(self, hotfix=False):
        """Make the current deployment mutable (for development or a hotfix)."""
        host = Host()
        args = self.Args()
        args.hotfix = hotfix
        host.set_args(args)
        # Dispatch to the unlock command; host_install() belongs to
        # HostInstall and does not look at args.hotfix.
        return host.host_unlock()

    @slip.dbus.polkit.require_auth("org.atomic.readwrite")
    @dbus.service.method("org.atomic", in_signature='as', out_signature='')
    def HostInstall(self, rpms):
        """Install (layered) RPM packages given as a list of names."""
        host = Host()
        args = self.Args()
        args.args = rpms
        host.set_args(args)
        return host.host_install()

    @slip.dbus.polkit.require_auth("org.atomic.readwrite")
    @dbus.service.method("org.atomic", in_signature='as', out_signature='')
    def HostUninstall(self, rpms):
        """Remove layered RPM packages given as a list of names."""
        host = Host()
        args = self.Args()
        args.args = rpms
        host.set_args(args)
        return host.host_uninstall()

    # atomic containers section
    @slip.dbus.polkit.require_auth("org.atomic.read")
    @dbus.service.method("org.atomic", in_signature='', out_signature='s')
    def ContainersList(self):
        """List all containers on the system as a JSON string."""
        c = Containers()
        args = self.Args()
        args.all = True
        c.set_args(args)
        return json.dumps(c.ps())

    @slip.dbus.polkit.require_auth("org.atomic.readwrite")
    @dbus.service.method("org.atomic", in_signature='asbb', out_signature='')
    def ContainersDelete(self, containers, all_containers, force):
        """Delete one or more containers on the system."""
        c = Containers()
        args = self.Args()
        args.containers = containers
        args.force = force
        args.all = all_containers
        c.set_args(args)
        return c.delete()

    @slip.dbus.polkit.require_auth("org.atomic.readwrite")
    @dbus.service.method("org.atomic", in_signature='', out_signature='')
    def ContainersTrim(self):
        """Discard unused blocks (fstrim) on rootfs of running containers."""
        c = Containers()
        return c.fstrim()

    # atomic images section
    @slip.dbus.polkit.require_auth("org.atomic.read")
    @dbus.service.method("org.atomic", in_signature='s', out_signature='s')
    def ImagesHelp(self, image):
        """Display help associated with the image."""
        h = AtomicHelp()
        args = self.Args()
        args.image = image
        h.set_args(args)
        return h.help()

    @slip.dbus.polkit.require_auth("org.atomic.read")
    @dbus.service.method("org.atomic", in_signature='sb', out_signature='s')
    def ImagesInfo(self, image, remote):
        """Display label information about an image."""
        i = Info()
        args = self.Args()
        args.image = image
        args.remote = remote
        i.set_args(args)
        return i.info()

    @slip.dbus.polkit.require_auth("org.atomic.read")
    @dbus.service.method("org.atomic", in_signature='', out_signature='s')
    def ImagesList(self):
        """List all installed container images on the system as a JSON string."""
        images = Images()
        args = self.Args()
        args.all = True
        images.set_args(args)
        i = images.images()
        return json.dumps(i)

    @slip.dbus.polkit.require_auth("org.atomic.readwrite")
    @dbus.service.method("org.atomic", in_signature='asbb', out_signature='')
    def ImagesDelete(self, images, force, remote):
        """Delete one or more images on the system."""
        i = Delete()
        args = self.Args()
        args.delete_targets = images
        args.remote = remote
        args.force = force
        i.set_args(args)
        return i.delete_image()

    @slip.dbus.polkit.require_auth("org.atomic.readwrite")
    @dbus.service.method("org.atomic", in_signature='', out_signature='')
    def ImagesPrune(self):
        """Delete unused 'dangling' images."""
        d = Delete()
        return d.prune_images()

    @slip.dbus.polkit.require_auth("org.atomic.readwrite")
    @dbus.service.method("org.atomic", in_signature='sss', out_signature='')
    def ImagePull(self, image, storage, reg_type):
        """Pull the specified image.

        An empty ``reg_type`` string means "not given" and leaves the default
        (``None``) in place.
        """
        p = Pull()
        args = self.Args()
        args.image = image
        args.storage = storage
        if reg_type != "":
            args.reg_type = reg_type
        p.set_args(args)
        return p.pull_image()

    @slip.dbus.polkit.require_auth("org.atomic.readwrite")
    @dbus.service.method("org.atomic", in_signature='sb', out_signature='')
    def ImagesUpdate(self, image, force):
        """Download the latest container image."""
        args = self.Args()
        args.image = image
        args.force = force
        self.atomic.set_args(args)
        self.atomic.update()

    @slip.dbus.polkit.require_auth("org.atomic.read")
    @dbus.service.method("org.atomic", in_signature='', out_signature='s')
    def VulnerableInfo(self):
        """Report whether or not installed container images are vulnerable."""
        args = self.Args()
        self.atomic.set_args(args)
        return self.atomic.get_all_vulnerable_info()

    # atomic install section
    @slip.dbus.polkit.require_auth("org.atomic.readwrite")
    @dbus.service.method("org.atomic", in_signature='sssbsasas', out_signature='')
    def Install(self, image, name, user, system, remote, setvalues, extra_args):
        """Install the specified image."""
        i = Install()
        args = self.Args()
        args.image = image
        args.name = name
        args.user = user
        args.system = system
        args.remote = remote
        args.setvalues = setvalues
        args.args = extra_args
        i.set_args(args)
        return i.install()

    # atomic mount section
    @slip.dbus.polkit.require_auth("org.atomic.readwrite")
    @dbus.service.method("org.atomic", in_signature='sssbb', out_signature='')
    def MountImage(self, image, mountpoint, options, live, shared):
        """Mount ``image`` at ``mountpoint`` with the given mount options."""
        mount = Mount()
        mount.image = image
        mount.mountpoint = mountpoint
        args = self.Args()
        # NOTE(review): image/mountpoint are mirrored into args as well in
        # case Mount reads them from its args rather than from its own
        # attributes -- confirm against Atomic.mount.
        args.image = image
        args.mountpoint = mountpoint
        args.options = options
        args.live = live
        args.shared = shared
        # The arguments must reach the Mount object that performs the mount;
        # handing them to self.atomic would leave `mount` without them.
        mount.set_args(args)
        return mount.mount()

    @slip.dbus.polkit.require_auth("org.atomic.readwrite")
    @dbus.service.method("org.atomic", in_signature='s', out_signature='')
    def UnmountImage(self, mountpoint):
        """Unmount whatever is mounted at ``mountpoint``."""
        mount = Mount()
        mount.mountpoint = mountpoint
        return mount.unmount()

    # atomic run section
    @slip.dbus.polkit.require_auth("org.atomic.readwrite")
    @dbus.service.method("org.atomic", in_signature='ssbs', out_signature='')
    def Run(self, image, name, spc, command):
        """Run the specified image."""
        r = Run()
        args = self.Args()
        args.image = image
        args.name = name
        args.spc = spc
        args.command = command
        r.set_args(args)
        return r.run()

    # atomic scan section
    @slip.dbus.polkit.require_auth("org.atomic.read")
    @dbus.service.method("org.atomic", in_signature='',
                         out_signature='s')
    def ScanList(self):
        """Return a list of all scanners."""
        scan_list = Scan()
        args = self.Args()
        scan_list.set_args(args)
        return scan_list.get_scanners_list()

    def _ScanSetup(self, scan_targets, scanner, scan_type, rootfs, _all, images, containers):
        """Create and configure the Scan object shared by Scan and ScheduleScan.

        Empty ``scan_targets``, ``scanner`` and ``scan_type`` values leave the
        corresponding ``Args`` defaults untouched.  Returns the configured
        ``Scan`` instance without running it.
        """
        scan = Scan()
        args = self.Args()
        scan.useTTY = False
        if scan_targets:
            args.scan_targets = scan_targets
        if scanner:
            args.scanner = scanner
        if scan_type:
            args.scan_type = scan_type
        args.rootfs = rootfs
        args.all = _all
        args.images = images
        args.containers = containers
        scan.set_args(args)
        return scan

    @slip.dbus.polkit.require_auth("org.atomic.read")
    @dbus.service.method("org.atomic", in_signature='asssasbbb',
                         out_signature='s')
    def Scan(self, scan_targets, scanner, scan_type, rootfs, _all, images, containers):
        """Run a scan synchronously and return its result as a string."""
        return self._ScanSetup(scan_targets, scanner, scan_type, rootfs, _all, images, containers).scan()

    @slip.dbus.polkit.require_auth("org.atomic.read")
    @dbus.service.method("org.atomic", in_signature='asssasbbb',
                         out_signature='x')
    def ScheduleScan(self, scan_targets, scanner, scan_type, rootfs, _all, images, containers):
        """Queue a scan for the scheduler thread and return its token.

        The token is later passed to ``GetScanResults`` to fetch the outcome.
        """
        scan = self._ScanSetup(scan_targets, scanner, scan_type, rootfs, _all, images, containers)
        token = self.AllocateToken()
        with self.tasks_lock:
            self.tasks.append((token, scan))
        return token

    @slip.dbus.polkit.require_auth("org.atomic.read")
    @dbus.service.method("org.atomic", in_signature='x',
                         out_signature='s')
    def GetScanResults(self, token):
        """Return the result for ``token``, or "" if it is not ready yet.

        A result is handed out once: it is removed from the result table when
        it is returned.
        """
        with self.results_lock:
            return self.results.pop(token, "")

    # atomic sign section
    @slip.dbus.polkit.require_auth("org.atomic.read")
    @dbus.service.method("org.atomic", in_signature='assss', out_signature='s')
    def Sign(self, images, sign_by, signature_path, gnupghome):
        """Create a signature for images which can be used later to verify them."""
        sign = Sign()
        args = self.Args()
        args.images = images
        args.sign_by = sign_by
        args.signature_path = signature_path
        args.gnupghome = gnupghome
        sign.set_args(args)
        return sign.sign()

    # atomic stop section
    @slip.dbus.polkit.require_auth("org.atomic.readwrite")
    @dbus.service.method("org.atomic", in_signature='ssas', out_signature='')
    def Stop(self, image, name, extra_args):
        """Stop the specified image."""
        i = Stop()
        args = self.Args()
        args.image = image
        args.name = name
        args.args = extra_args
        i.set_args(args)
        return i.stop()

    # atomic storage section
    @slip.dbus.polkit.require_auth("org.atomic.readwrite")
    @dbus.service.method("org.atomic", in_signature='', out_signature='')
    def StorageReset(self):
        """Delete all containers and images from the system.

        Resets storage to its initial configuration.
        """
        storage = Storage()
        # No arguments are passed for storage_reset function
        args = self.Args()
        storage.set_args(args)
        storage.reset()

    @slip.dbus.polkit.require_auth("org.atomic.readwrite")
    @dbus.service.method("org.atomic", in_signature='ss', out_signature='')
    def StorageImport(self, graph, import_location):
        """Import all containers and their contents from a filesystem directory."""
        storage = Storage()
        args = self.Args()
        args.graph = graph
        args.import_location = import_location
        storage.set_args(args)
        storage.Import()

    @slip.dbus.polkit.require_auth("org.atomic.readwrite")
    @dbus.service.method("org.atomic", in_signature='ssb', out_signature='')
    def StorageExport(self, graph="/var/lib/docker", export_location="/var/lib/atomic/migrate", force = False):
        """Export all containers and their contents into a filesystem directory."""
        storage = Storage()
        args = self.Args()
        args.graph = graph
        args.export_location = export_location
        args.force = force
        storage.set_args(args)
        storage.Export()

    @slip.dbus.polkit.require_auth("org.atomic.readwrite")
    @dbus.service.method("org.atomic", in_signature='asv', out_signature='')
    def StorageModify(self, devices=None, driver = None):
        """Modify the default storage setup.

        A false-y ``devices`` value is normalised to an empty list.
        """
        storage = Storage()
        args = self.Args()
        args.devices = devices if devices else []
        args.driver = driver
        storage.set_args(args)
        storage.modify()

    # atomic top section
    @slip.dbus.polkit.require_auth("org.atomic.read")
    @dbus.service.method("org.atomic", in_signature='ass', out_signature='s')
    def Top(self, containers, optional):
        """Return the output of ``Top.json()`` for the given containers."""
        top = Top()
        args = self.Args()
        args.containers = containers
        args.optional = optional
        top.set_args(args)
        return top.json()

    # atomic trust section
    @slip.dbus.polkit.require_auth("org.atomic.read")
    @dbus.service.method("org.atomic", in_signature='', out_signature='s')
    def TrustShow(self):
        """Display the system trust policy as a JSON string."""
        trust = Trust.Trust()
        args = self.Args()
        trust.set_args(args)
        return json.dumps(trust.show())

    @slip.dbus.polkit.require_auth("org.atomic.readwrite")
    @dbus.service.method("org.atomic", in_signature='ssassss', out_signature='')
    def TrustAdd(self, registry, trusttype, pubkeys, keytype, sigstore, sigstoretype):
        """Add public key trust to a specific registry repository."""
        trust = Trust.Trust()
        args = self.Args()
        args.registry = registry
        args.pubkeys = pubkeys
        args.keytype = keytype
        args.trust_type = trusttype
        args.sigstoretype = sigstoretype
        args.sigstore = sigstore
        trust.set_args(args)
        trust.add()

    @slip.dbus.polkit.require_auth("org.atomic.readwrite")
    @dbus.service.method("org.atomic", in_signature='ss', out_signature='')
    def TrustDelete(self, registry, sigstoretype):
        """Remove public key trust from a specific registry repository."""
        trust = Trust.Trust()
        args = self.Args()
        args.sigstoretype = sigstoretype
        args.registry = registry
        trust.set_args(args)
        trust.delete()

    @slip.dbus.polkit.require_auth("org.atomic.readwrite")
    @dbus.service.method("org.atomic", in_signature='s', out_signature='')
    def TrustDefaultPolicy(self, default_policy):
        """Set the default container image trust for the system."""
        trust = Trust.Trust()
        args = self.Args()
        args.default_policy = default_policy
        trust.set_args(args)
        return trust.modify_default()

    # atomic uninstall section
    @slip.dbus.polkit.require_auth("org.atomic.readwrite")
    @dbus.service.method("org.atomic", in_signature='ssbas', out_signature='')
    def Uninstall(self, image, name, force, extra_args):
        """Uninstall the specified image."""
        i = Uninstall()
        args = self.Args()
        args.image = image
        args.name = name
        args.force = force
        args.args = extra_args
        i.set_args(args)
        return i.uninstall()

    # atomic upload section

    # atomic verify section
    @slip.dbus.polkit.require_auth("org.atomic.read")
    @dbus.service.method("org.atomic", in_signature='as', out_signature='s')
    def Verify(self, images):
        """Verify each image and report whether or not it should be updated.

        Returns a JSON list with one ``{"Image": ..., "Verification": ...}``
        entry per requested image.
        """
        verifications = []
        verify = Verify()
        verify.useTTY = False
        for image in images:
            args = self.Args()
            args.image = image
            verify.set_args(args)
            verifications.append({"Image": image,
                                  "Verification": verify.verify()}) #pylint: disable=no-member
        return json.dumps(verifications)

    # atomic version section
    @slip.dbus.polkit.require_auth("org.atomic.read")
    @dbus.service.method("org.atomic", in_signature='asb',
                         out_signature='aa{sv}')
    def ImagesVersion(self, images, recurse=False):
        """Return version information for each of the given images.

        The result is a list with one ``{"Image": ..., "Version": ...}``
        dictionary per requested image.
        """
        versions = []
        for image in images:
            args = self.Args()
            args.image = image
            args.recurse = recurse
            self.atomic.set_args(args)
            versions.append({"Image": image,
                             "Version": self.atomic.version()})
        return versions
if __name__ == "__main__":
    # Service entry point: claim "org.atomic" on the system bus, export the
    # service object and run the GLib main loop.
    mainloop = GLib.MainLoop()
    # The GLib integration has to be the default before the bus connection
    # is created.
    dbus.mainloop.glib.DBusGMainLoop(set_as_default=True)
    system_bus = dbus.SystemBus()
    # Keep references for the lifetime of the process: dbus-python releases
    # the well-known name as soon as the BusName object becomes unreferenced.
    bus_name = dbus.service.BusName("org.atomic", system_bus)
    service_object = atomic_dbus(system_bus, "/org/atomic/object")
    slip.dbus.service.set_mainloop(mainloop)
    mainloop.run()