mirror of
https://github.com/containers/crun.git
synced 2026-02-06 00:46:06 +01:00
Improve error logging: - Add test environment context (uid, rootless, cgroup_v2, cgroup_manager) when tests fail, helping diagnose environment-specific issues Add new command tests: - test_state_created_container: Test state on created but not started container - test_state_stopped_container: Test state on stopped container - test_features_command: Test features command returns valid JSON - test_ps_json_format: Test ps with JSON format - test_delete_force: Test force delete on running container - test_start_command: Test start on created container - test_version_command: Test version output - test_help_command: Test help output Add new exec tests: - test_exec_cwd: Test exec with working directory - test_exec_process_json: Test exec with process.json file - test_exec_detach: Test exec with detach option - test_exec_multiple: Test multiple exec calls - test_exec_exit_code: Test exit code propagation 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com> Signed-off-by: Giuseppe Scrivano <gscrivan@redhat.com>
710 lines
24 KiB
Python
Executable File
710 lines
24 KiB
Python
Executable File
#!/bin/env python3
|
|
# crun - OCI runtime written in C
|
|
#
|
|
# Copyright (C) 2017, 2018, 2019 Giuseppe Scrivano <giuseppe@scrivano.org>
|
|
# crun is free software; you can redistribute it and/or modify
|
|
# it under the terms of the GNU General Public License as published by
|
|
# the Free Software Foundation; either version 2 of the License, or
|
|
# (at your option) any later version.
|
|
#
|
|
# crun is distributed in the hope that it will be useful,
|
|
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
# GNU General Public License for more details.
|
|
#
|
|
# You should have received a copy of the GNU General Public License
|
|
# along with crun. If not, see <http://www.gnu.org/licenses/>.
|
|
|
|
import json
|
|
import os
|
|
import re
|
|
import shutil
|
|
import subprocess
|
|
import tempfile
|
|
from tests_utils import *
|
|
import time
|
|
|
|
def test_exec():
|
|
conf = base_config()
|
|
conf['process']['args'] = ['/init', 'pause']
|
|
add_all_namespaces(conf)
|
|
cid = None
|
|
try:
|
|
_, cid = run_and_get_output(conf, hide_stderr=True, command='run', detach=True)
|
|
out = run_crun_command(["exec", cid, "/init", "echo", "foo"])
|
|
if "foo" not in out:
|
|
logger.info("exec test failed: expected 'foo' in output")
|
|
logger.info("container ID: %s", cid)
|
|
logger.info("actual output: %s", out)
|
|
return -1
|
|
except Exception as e:
|
|
logger.info("exec test failed with exception: %s", e)
|
|
if cid is not None:
|
|
logger.info("container ID: %s", cid)
|
|
raise
|
|
finally:
|
|
if cid is not None:
|
|
try:
|
|
run_crun_command(["delete", "-f", cid])
|
|
except Exception as cleanup_e:
|
|
logger.info("warning: failed to cleanup container %s: %s", cid, cleanup_e)
|
|
return 0
|
|
|
|
def test_uid_tty():
|
|
# we need at least two uids
|
|
if is_rootless():
|
|
return 77
|
|
|
|
if os.isatty(1) == False:
|
|
return 77
|
|
|
|
conf = base_config()
|
|
conf['process']['args'] = ['/init', 'pause']
|
|
conf['process']['terminal'] = True
|
|
add_all_namespaces(conf)
|
|
cid = None
|
|
ret = 1
|
|
last_error = None
|
|
try:
|
|
cid = "container-%s" % os.getpid()
|
|
proc = run_and_get_output(conf, hide_stderr=True, command='run', id_container=cid, use_popen=True)
|
|
for i in range(0, 500):
|
|
try:
|
|
out = run_crun_command(["exec", "-t", "--user", "1", cid, "/init", "owner", "/proc/self/fd/0"])
|
|
if "1:" in out:
|
|
ret = 0
|
|
break
|
|
except Exception as e:
|
|
last_error = e
|
|
pass
|
|
time.sleep(0.01)
|
|
if ret != 0:
|
|
logger.info("uid_tty test failed after 500 attempts")
|
|
logger.info("container ID: %s", cid)
|
|
if last_error:
|
|
logger.info("last error: %s", e)
|
|
return ret
|
|
finally:
|
|
if cid is not None:
|
|
try:
|
|
run_crun_command(["delete", "-f", cid])
|
|
except Exception as cleanup_e:
|
|
logger.info("warning: failed to cleanup container %s: %s", cid, cleanup_e)
|
|
return 0
|
|
|
|
def test_exec_root_netns_with_userns():
|
|
if is_rootless():
|
|
return 77
|
|
|
|
conf = base_config()
|
|
conf['process']['args'] = ['/init', 'pause']
|
|
add_all_namespaces(conf, netns=False)
|
|
conf['linux']['namespaces'].append({"type" : "network", "path" : "/proc/1/ns/net"})
|
|
cid = None
|
|
try:
|
|
_, cid = run_and_get_output(conf, hide_stderr=True, command='run', detach=True)
|
|
|
|
with open("/proc/net/route") as f:
|
|
payload = f.read()
|
|
host_routes = [i.split('\t')[0] for i in payload.split('\n')[1:] if i.strip()]
|
|
|
|
out = run_crun_command(["exec", cid, "/init", "cat", "/proc/net/route"])
|
|
|
|
container_routes = [i.split('\t')[0] for i in out.split('\n')[1:] if i.strip()]
|
|
|
|
if len(container_routes) != len(host_routes):
|
|
logger.info("network namespace test failed: different route count")
|
|
logger.info("host routes (%d): %s", len(host_routes), host_routes)
|
|
logger.info("container routes (%d): %s", len(container_routes), container_routes)
|
|
return -1
|
|
|
|
host_routes.sort()
|
|
container_routes.sort()
|
|
|
|
for i, (container_route, host_route) in enumerate(zip(container_routes, host_routes)):
|
|
if container_route != host_route:
|
|
logger.info("network namespace test failed: route mismatch at index %d", i)
|
|
logger.info("expected (host): %s", host_route)
|
|
logger.info("actual (container): %s", container_route)
|
|
logger.info("full host routes: %s", host_routes)
|
|
logger.info("full container routes: %s", container_routes)
|
|
return -1
|
|
except Exception as e:
|
|
logger.info("network namespace test failed with exception: %s", e)
|
|
if cid is not None:
|
|
logger.info("container ID: %s", cid)
|
|
raise
|
|
finally:
|
|
if cid is not None:
|
|
try:
|
|
run_crun_command(["delete", "-f", cid])
|
|
except Exception as cleanup_e:
|
|
logger.info("warning: failed to cleanup container %s: %s", cid, cleanup_e)
|
|
return 0
|
|
|
|
def test_exec_not_exists_helper(detach):
|
|
conf = base_config()
|
|
conf['process']['args'] = ['/init', 'pause']
|
|
add_all_namespaces(conf)
|
|
cid = None
|
|
try:
|
|
_, cid = run_and_get_output(conf, hide_stderr=True, command='run', detach=True)
|
|
try:
|
|
if detach:
|
|
out = run_crun_command(["exec", "-d", cid, "/not.here"])
|
|
else:
|
|
out = run_crun_command(["exec", cid, "/not.here"])
|
|
except Exception as e:
|
|
return 0
|
|
finally:
|
|
if cid is not None:
|
|
run_crun_command(["delete", "-f", cid])
|
|
return 1
|
|
|
|
def test_exec_not_exists():
|
|
return test_exec_not_exists_helper(False)
|
|
|
|
def test_exec_detach_not_exists():
|
|
return test_exec_not_exists_helper(True)
|
|
|
|
def test_exec_additional_gids():
|
|
if is_rootless():
|
|
return 77
|
|
conf = base_config()
|
|
conf['process']['args'] = ['/init', 'pause']
|
|
add_all_namespaces(conf)
|
|
cid = None
|
|
tempdir = tempfile.mkdtemp()
|
|
try:
|
|
_, cid = run_and_get_output(conf, hide_stderr=True, command='run', detach=True)
|
|
|
|
process_file = os.path.join(tempdir, "process.json")
|
|
with open(process_file, "w") as f:
|
|
json.dump({
|
|
"user": {
|
|
"uid": 0,
|
|
"gid": 0,
|
|
"additionalGids": [432]
|
|
},
|
|
"terminal": False,
|
|
"args": [
|
|
"/init",
|
|
"groups"
|
|
],
|
|
"env": [
|
|
"PATH=/bin",
|
|
"TERM=xterm"
|
|
],
|
|
"cwd": "/",
|
|
"noNewPrivileges": True
|
|
}, f)
|
|
out = run_crun_command(["exec", "--process", process_file, cid])
|
|
if "432" not in out:
|
|
return -1
|
|
finally:
|
|
if cid is not None:
|
|
run_crun_command(["delete", "-f", cid])
|
|
shutil.rmtree(tempdir)
|
|
return 0
|
|
|
|
def test_exec_populate_home_env_from_process_uid():
|
|
if is_rootless():
|
|
return 77
|
|
conf = base_config()
|
|
conf['process']['args'] = ['/init', 'pause']
|
|
add_all_namespaces(conf)
|
|
cid = None
|
|
tempdir = tempfile.mkdtemp()
|
|
try:
|
|
_, cid = run_and_get_output(conf, hide_stderr=True, command='run', detach=True)
|
|
|
|
process_file = os.path.join(tempdir, "process.json")
|
|
with open(process_file, "w") as f:
|
|
json.dump({
|
|
"user": {
|
|
"uid": 1000,
|
|
"gid": 1000,
|
|
"additionalGids": [1000]
|
|
},
|
|
"terminal": False,
|
|
"args": [
|
|
"/init",
|
|
"printenv",
|
|
"HOME"
|
|
],
|
|
"env": [
|
|
"PATH=/bin",
|
|
"TERM=xterm"
|
|
],
|
|
"cwd": "/",
|
|
"noNewPrivileges": True
|
|
}, f)
|
|
out = run_crun_command(["exec", "--process", process_file, cid])
|
|
if "/var/empty" not in out:
|
|
return -1
|
|
finally:
|
|
if cid is not None:
|
|
run_crun_command(["delete", "-f", cid])
|
|
shutil.rmtree(tempdir)
|
|
return 0
|
|
|
|
def test_exec_add_capability():
|
|
"""Specify an additional capability to add to the process"""
|
|
conf = base_config()
|
|
conf['process']['args'] = ['/init', 'pause']
|
|
add_all_namespaces(conf)
|
|
conf['process']['capabilities'] = {}
|
|
cid = None
|
|
cap_unknown_dict = {"CapInh":"0000000000000000", \
|
|
"CapPrm":"0000000000000000", \
|
|
"CapEff":"0000000000000000", \
|
|
"CapBnd":"0000000000000000", \
|
|
"CapAmb":"0000000000000000"}
|
|
cap_kill_dict = {"CapInh":"0000000000000000", \
|
|
"CapPrm":"0000000000000020", \
|
|
"CapEff":"0000000000000020", \
|
|
"CapBnd":"0000000000000020", \
|
|
"CapAmb":"0000000000000000"}
|
|
cap_sys_admin_dict = {"CapInh":"0000000000000000", \
|
|
"CapPrm":"0000000000200000", \
|
|
"CapEff":"0000000000200000", \
|
|
"CapBnd":"0000000000200000", \
|
|
"CapAmb":"0000000000000000"}
|
|
cap_dict = {"CAP_UNKNOWN": cap_unknown_dict, \
|
|
"CAP_KILL": cap_kill_dict, \
|
|
"CAP_SYS_ADMIN": cap_sys_admin_dict}
|
|
try:
|
|
_, cid = run_and_get_output(conf, hide_stderr=True, command='run', detach=True)
|
|
for cap, value in cap_dict.items():
|
|
out = run_crun_command(["exec", "--cap", cap, cid, "/init", "cat", "/proc/self/status"])
|
|
for i in ['bounding', 'effective', 'inheritable', 'permitted', 'ambient']:
|
|
conf['process']['capabilities'][i] = []
|
|
proc_status = parse_proc_status(out)
|
|
|
|
for i in ['CapInh', 'CapPrm', 'CapEff', 'CapBnd', 'CapAmb']:
|
|
if proc_status[i] != value[i]:
|
|
return -1
|
|
finally:
|
|
if cid is not None:
|
|
run_crun_command(["delete", "-f", cid])
|
|
return 0
|
|
|
|
def test_exec_add_env():
|
|
"""Add an environment variable"""
|
|
conf = base_config()
|
|
conf['process']['args'] = ['/init', 'pause']
|
|
add_all_namespaces(conf)
|
|
conf['process']['capabilities'] = {}
|
|
cid = None
|
|
env_args_list = []
|
|
env_dict_orig = {"HOME":"/", "PATH":"/bin"}
|
|
env_dict_new = {"HOME":"/tmp", "PATH":"/usr/bin","FOO":"BAR"}
|
|
try:
|
|
_, cid = run_and_get_output(conf, hide_stderr=True, command='run', detach=True)
|
|
# check original environment variable
|
|
for env, value in env_dict_orig.items():
|
|
out = run_crun_command(["exec", cid, "/init", "printenv", env])
|
|
if value not in out:
|
|
return -1
|
|
# check that the environment has the key/value pair we added
|
|
for env, value in env_dict_new.items():
|
|
out = run_crun_command(["exec", "--env", "%s=%s" %(env,value), \
|
|
cid, "/init", "printenv", env])
|
|
env_args_list.append("%s=%s" %(env,value))
|
|
if value not in out:
|
|
return -1
|
|
|
|
# set multiple environment variable at the same time
|
|
out = run_crun_command(["exec", "--env", env_args_list[0], \
|
|
"-e", env_args_list[1], \
|
|
"-e", env_args_list[2], \
|
|
cid, "/init", "printenv", "PATH"])
|
|
if env_dict_new["PATH"] not in out:
|
|
return -1
|
|
finally:
|
|
if cid is not None:
|
|
run_crun_command(["delete", "-f", cid])
|
|
return 0
|
|
|
|
def test_exec_set_user():
|
|
"""specify the user in the form UID[:GID]"""
|
|
if is_rootless():
|
|
return 77
|
|
|
|
conf = base_config()
|
|
add_all_namespaces(conf)
|
|
conf['process']['args'] = ['/init', 'pause']
|
|
cid = None
|
|
uid_gid_list = ["1000:1000", "0:0", "65535:65535"]
|
|
|
|
try:
|
|
_, cid = run_and_get_output(conf, hide_stderr=True, command='run', detach=True)
|
|
# check current user id
|
|
out = run_crun_command(["exec", cid, "/init", "id"])
|
|
if uid_gid_list[1] not in out:
|
|
return -1
|
|
# check that the uid and gid have the value we added
|
|
for id in uid_gid_list:
|
|
out = run_crun_command(["exec", "--user", id, cid, "/init", "id"])
|
|
if id not in out:
|
|
return -1
|
|
finally:
|
|
if cid is not None:
|
|
run_crun_command(["delete", "-f", cid])
|
|
return 0
|
|
|
|
def test_exec_no_new_privs():
|
|
"""Set the no new privileges value for the process"""
|
|
conf = base_config()
|
|
conf['process']['args'] = ['/init', 'pause']
|
|
add_all_namespaces(conf)
|
|
conf['process']['capabilities'] = {}
|
|
cid = None
|
|
try:
|
|
_, cid = run_and_get_output(conf, hide_stderr=True, command='run', detach=True)
|
|
# check original value of NoNewPrivs
|
|
out = run_crun_command(["exec", cid, "/init", "cat", "/proc/self/status"])
|
|
proc_status = parse_proc_status(out)
|
|
if proc_status["NoNewPrivs"] != "0":
|
|
return -1
|
|
out = run_crun_command(["exec", "--no-new-privs", cid, "/init", "cat", "/proc/self/status"])
|
|
# check no new privileges value of NoNewPrivs
|
|
proc_status = parse_proc_status(out)
|
|
if proc_status["NoNewPrivs"] != "1":
|
|
return -1
|
|
finally:
|
|
if cid is not None:
|
|
run_crun_command(["delete", "-f", cid])
|
|
return 0
|
|
|
|
def test_exec_write_pid_file():
|
|
"""Set the no new privileges value for the process"""
|
|
conf = base_config()
|
|
conf['process']['args'] = ['/init', 'pause']
|
|
add_all_namespaces(conf)
|
|
conf['process']['capabilities'] = {}
|
|
cid = None
|
|
tempdir = tempfile.mkdtemp()
|
|
try:
|
|
_, cid = run_and_get_output(conf, hide_stderr=True, command='run', detach=True)
|
|
pid_file = os.path.join(tempdir, cid)
|
|
out = run_crun_command(["exec", "--pid-file", pid_file, cid, "/init", "echo", "hello"])
|
|
if "hello" not in out:
|
|
return -1
|
|
if not os.path.exists(pid_file):
|
|
return -1
|
|
|
|
regu_cont = re.compile(r'\d+')
|
|
with open(pid_file, 'r') as fp:
|
|
contents = fp.read()
|
|
fp.close()
|
|
if not regu_cont.match(contents):
|
|
return -1
|
|
finally:
|
|
if cid is not None:
|
|
run_crun_command(["delete", "-f", cid])
|
|
shutil.rmtree(tempdir)
|
|
return 0
|
|
|
|
def test_exec_cpu_affinity():
|
|
if len(os.sched_getaffinity(0)) < 4:
|
|
return 77
|
|
|
|
conf = base_config()
|
|
conf['process']['args'] = ['/init', 'pause']
|
|
add_all_namespaces(conf)
|
|
cid = None
|
|
tempdir = tempfile.mkdtemp()
|
|
|
|
def cpu_mask_from_proc_status(status):
|
|
for l in status.split("\n"):
|
|
parts = l.split(":")
|
|
if parts[0] == "Cpus_allowed_list":
|
|
return parts[1].strip()
|
|
return ""
|
|
|
|
def exec_and_get_affinity_mask(cid, exec_cpu_affinity=None):
|
|
process_file = os.path.join(tempdir, "process.json")
|
|
with open(process_file, "w") as f:
|
|
process = {
|
|
"user": {
|
|
"uid": 0,
|
|
"gid": 0
|
|
},
|
|
"terminal": False,
|
|
"cwd" : "/",
|
|
"args": [
|
|
"/init",
|
|
"cat",
|
|
"/proc/self/status"
|
|
]
|
|
}
|
|
if exec_cpu_affinity is not None:
|
|
process["execCPUAffinity"] = exec_cpu_affinity
|
|
json.dump(process, f)
|
|
|
|
out = run_crun_command(["exec", "--process", process_file, cid])
|
|
return cpu_mask_from_proc_status(out)
|
|
|
|
try:
|
|
with open("/proc/self/status") as f:
|
|
current_cpu_mask = cpu_mask_from_proc_status(f.read())
|
|
_, cid = run_and_get_output(conf, hide_stderr=True, command='run', detach=True)
|
|
|
|
mask = exec_and_get_affinity_mask(cid)
|
|
if mask != current_cpu_mask:
|
|
logger.info("current cpu mask %s != %s", mask, current_cpu_mask)
|
|
return -1
|
|
|
|
mask = exec_and_get_affinity_mask(cid, {"initial" : "0-1"})
|
|
if mask != "0-1":
|
|
logger.info("cpu mask %s != 0-1", mask)
|
|
return -1
|
|
|
|
mask = exec_and_get_affinity_mask(cid, {"final" : "0-2"})
|
|
if mask != "0-2":
|
|
logger.info("cpu mask %s != 0-2", mask)
|
|
return -1
|
|
|
|
mask = exec_and_get_affinity_mask(cid, {"initial" : "1", "final" : "0-3"})
|
|
if mask != "0-3":
|
|
logger.info("cpu mask %s != 0-3", mask)
|
|
return -1
|
|
return 0
|
|
finally:
|
|
if cid is not None:
|
|
run_crun_command(["delete", "-f", cid])
|
|
shutil.rmtree(tempdir)
|
|
return 0
|
|
|
|
def test_exec_getpgrp():
|
|
conf = base_config()
|
|
add_all_namespaces(conf)
|
|
conf['process']['args'] = ['/init', 'pause']
|
|
cid = None
|
|
try:
|
|
_, cid = run_and_get_output(conf, hide_stderr=True, command='run', detach=True)
|
|
for terminal in [True, False]:
|
|
if terminal and os.isatty(1) == False:
|
|
continue
|
|
cmdline = ["exec", "-t" if terminal else None, cid, "/init", "getpgrp"]
|
|
out = run_crun_command([x for x in cmdline if x is not None])
|
|
pgrp = int(out.split("\n")[0])
|
|
if pgrp <= 0:
|
|
logger.info("invalid pgrp, got %d", pgrp)
|
|
return -1
|
|
finally:
|
|
if cid is not None:
|
|
run_crun_command(["delete", "-f", cid])
|
|
return 0
|
|
|
|
def test_exec_help():
|
|
out = run_crun_command(["exec", "--help"])
|
|
if "Usage: crun [OPTION...] exec CONTAINER cmd" not in out:
|
|
return -1
|
|
|
|
return 0
|
|
|
|
def test_exec_error_propagation():
|
|
"""Test that exec setup errors are propagated correctly without both chdir and read pipe errors"""
|
|
conf = base_config()
|
|
conf['process']['args'] = ['/init', 'pause']
|
|
add_all_namespaces(conf)
|
|
cid = None
|
|
try:
|
|
_, cid = run_and_get_output(conf, hide_stderr=True, command='run', detach=True)
|
|
try:
|
|
out = run_crun_command_raw(["exec", "--cwd", "/invalid/nonexistent/path", cid, "/init", "echo", "test"])
|
|
return -1
|
|
except subprocess.CalledProcessError as e:
|
|
error_msg = e.output.decode('utf-8', errors='ignore')
|
|
has_chdir_error = "chdir" in error_msg.lower() or "No such file or directory" in error_msg
|
|
has_read_pipe_error = "read pipe failed" in error_msg
|
|
|
|
if has_chdir_error and has_read_pipe_error:
|
|
logger.info("exec error propagation test failed: both chdir and read pipe errors detected")
|
|
logger.info("error message: %s", error_output)
|
|
return -1
|
|
|
|
if not has_chdir_error:
|
|
logger.info("exec error propagation test failed: expected chdir error but got: %s", error_msg)
|
|
return -1
|
|
|
|
return 0
|
|
finally:
|
|
if cid is not None:
|
|
try:
|
|
run_crun_command(["delete", "-f", cid])
|
|
except Exception as cleanup_e:
|
|
logger.info("warning: failed to cleanup container %s: %s", cid, cleanup_e)
|
|
return 0
|
|
|
|
def test_exec_cwd():
|
|
"""Test exec with working directory option."""
|
|
conf = base_config()
|
|
conf['process']['args'] = ['/init', 'pause']
|
|
add_all_namespaces(conf)
|
|
cid = None
|
|
try:
|
|
_, cid = run_and_get_output(conf, hide_stderr=True, command='run', detach=True)
|
|
out = run_crun_command(["exec", "--cwd", "/", cid, "/init", "cwd"])
|
|
if "/" not in out:
|
|
logger.info("test_exec_cwd: expected '/' in output, got: %s", out)
|
|
return -1
|
|
return 0
|
|
except Exception as e:
|
|
logger.info("test_exec_cwd failed: %s", e)
|
|
return -1
|
|
finally:
|
|
if cid is not None:
|
|
run_crun_command(["delete", "-f", cid])
|
|
|
|
|
|
def test_exec_process_json():
|
|
"""Test exec with process.json file."""
|
|
conf = base_config()
|
|
conf['process']['args'] = ['/init', 'pause']
|
|
add_all_namespaces(conf)
|
|
cid = None
|
|
process_file = None
|
|
try:
|
|
_, cid = run_and_get_output(conf, hide_stderr=True, command='run', detach=True)
|
|
|
|
# Create a process.json file
|
|
process_spec = {
|
|
"terminal": False,
|
|
"user": {"uid": 0, "gid": 0},
|
|
"args": ["/init", "echo", "hello"],
|
|
"env": ["PATH=/bin", "TERM=xterm"],
|
|
"cwd": "/"
|
|
}
|
|
|
|
# Use a regular temp file and close it before using
|
|
fd, process_file = tempfile.mkstemp(suffix='.json')
|
|
with os.fdopen(fd, 'w') as f:
|
|
json.dump(process_spec, f)
|
|
|
|
out = run_crun_command(["exec", "--process", process_file, cid])
|
|
if "hello" not in out:
|
|
logger.info("test_exec_process_json: expected 'hello' in output, got: %s", out)
|
|
return -1
|
|
return 0
|
|
|
|
except Exception as e:
|
|
logger.info("test_exec_process_json failed: %s", e)
|
|
return -1
|
|
finally:
|
|
if process_file and os.path.exists(process_file):
|
|
os.unlink(process_file)
|
|
if cid is not None:
|
|
run_crun_command(["delete", "-f", cid])
|
|
|
|
|
|
def test_exec_detach():
|
|
"""Test exec with detach option."""
|
|
conf = base_config()
|
|
conf['process']['args'] = ['/init', 'pause']
|
|
add_all_namespaces(conf)
|
|
cid = None
|
|
try:
|
|
_, cid = run_and_get_output(conf, hide_stderr=True, command='run', detach=True)
|
|
|
|
# Run a detached exec that writes to a file
|
|
run_crun_command(["exec", "--detach", cid, "/init", "true"])
|
|
|
|
# If we get here without error, the test passed
|
|
return 0
|
|
|
|
except Exception as e:
|
|
logger.info("test_exec_detach failed: %s", e)
|
|
return -1
|
|
finally:
|
|
if cid is not None:
|
|
run_crun_command(["delete", "-f", cid])
|
|
|
|
|
|
def test_exec_multiple():
|
|
"""Test multiple exec calls on same container."""
|
|
conf = base_config()
|
|
conf['process']['args'] = ['/init', 'pause']
|
|
add_all_namespaces(conf)
|
|
cid = None
|
|
try:
|
|
_, cid = run_and_get_output(conf, hide_stderr=True, command='run', detach=True)
|
|
|
|
# Run multiple exec commands
|
|
for i in range(3):
|
|
out = run_crun_command(["exec", cid, "/init", "echo", str(i)])
|
|
if str(i) not in out:
|
|
logger.info("test_exec_multiple: iteration %d failed, expected '%d' in output", i, i)
|
|
return -1
|
|
|
|
return 0
|
|
|
|
except Exception as e:
|
|
logger.info("test_exec_multiple failed: %s", e)
|
|
return -1
|
|
finally:
|
|
if cid is not None:
|
|
run_crun_command(["delete", "-f", cid])
|
|
|
|
|
|
def test_exec_exit_code():
|
|
"""Test that exec returns correct exit code."""
|
|
conf = base_config()
|
|
conf['process']['args'] = ['/init', 'pause']
|
|
add_all_namespaces(conf)
|
|
cid = None
|
|
try:
|
|
_, cid = run_and_get_output(conf, hide_stderr=True, command='run', detach=True)
|
|
|
|
# Run exec that exits with code 0
|
|
run_crun_command(["exec", cid, "/init", "exit", "0"])
|
|
|
|
# Run exec that exits with non-zero code
|
|
try:
|
|
run_crun_command_raw(["exec", cid, "/init", "exit", "42"])
|
|
logger.info("test_exec_exit_code: expected non-zero exit but got success")
|
|
return -1
|
|
except subprocess.CalledProcessError as e:
|
|
if e.returncode != 42:
|
|
logger.info("test_exec_exit_code: expected exit code 42, got %d", e.returncode)
|
|
return -1
|
|
|
|
return 0
|
|
|
|
except Exception as e:
|
|
logger.info("test_exec_exit_code failed: %s", e)
|
|
return -1
|
|
finally:
|
|
if cid is not None:
|
|
run_crun_command(["delete", "-f", cid])
|
|
|
|
|
|
all_tests = {
|
|
"exec" : test_exec,
|
|
"exec-not-exists" : test_exec_not_exists,
|
|
"exec-detach-not-exists" : test_exec_detach_not_exists,
|
|
"exec-detach-additional-gids" : test_exec_additional_gids,
|
|
"exec-root-netns-with-userns" : test_exec_root_netns_with_userns,
|
|
"exec-add-capability" : test_exec_add_capability,
|
|
"exec-add-environment_variable" : test_exec_add_env,
|
|
"exec-set-user-with-uid-gid" : test_exec_set_user,
|
|
"exec_add_no_new_privileges" : test_exec_no_new_privs,
|
|
"exec_write_pid_file" : test_exec_write_pid_file,
|
|
"exec_populate_home_env_from_process_uid" : test_exec_populate_home_env_from_process_uid,
|
|
"exec-test-uid-tty": test_uid_tty,
|
|
"exec-cpu-affinity": test_exec_cpu_affinity,
|
|
"exec-getpgrp": test_exec_getpgrp,
|
|
"exec-help" : test_exec_help,
|
|
"exec-error-propagation" : test_exec_error_propagation,
|
|
"exec-cwd" : test_exec_cwd,
|
|
"exec-process-json" : test_exec_process_json,
|
|
"exec-detach" : test_exec_detach,
|
|
"exec-multiple" : test_exec_multiple,
|
|
"exec-exit-code" : test_exec_exit_code,
|
|
}
|
|
|
|
if __name__ == "__main__":
|
|
tests_main(all_tests)
|