diff --git a/subiquity/models/subiquity.py b/subiquity/models/subiquity.py index 6c3991d1..cbd0cc64 100644 --- a/subiquity/models/subiquity.py +++ b/subiquity/models/subiquity.py @@ -434,7 +434,7 @@ class SubiquityModel: ("etc/cloud/ds-identify.cfg", "policy: enabled\n", 0o644), ] # Add cloud-init clean hooks to support golden-image creation. - cfg_files = ["/" + path for (path, _content, _cmode) in files] + cfg_files = ["/" + path for (path, _content, _mode) in files] cfg_files.extend(self.network.rendered_config_paths()) if lsb_release()["release"] not in ("20.04", "22.04"): cfg_files.append("/etc/cloud/cloud-init.disabled") @@ -467,10 +467,10 @@ class SubiquityModel: if self.source.current.variant == "core": # can probably be supported but requires changes return - for path, content, cmode in self._cloud_init_files(): + for path, content, mode in self._cloud_init_files(): path = os.path.join(self.target, path) os.makedirs(os.path.dirname(path), exist_ok=True) - write_file(path, content, cmode=cmode) + write_file(path, content, mode=mode) def _media_info(self): if os.path.exists("/cdrom/.disk/info"): diff --git a/subiquity/server/controllers/shutdown.py b/subiquity/server/controllers/shutdown.py index 971bbe41..503d0ce3 100644 --- a/subiquity/server/controllers/shutdown.py +++ b/subiquity/server/controllers/shutdown.py @@ -113,7 +113,7 @@ class ShutdownController(SubiquityController): ["cp", "-aT", "/var/log/installer", target_logs] ) # Close the permissions from group writes on the target. - set_log_perms(target_logs, isdir=True, group_write=False) + set_log_perms(target_logs, group_write=False) journal_txt = os.path.join(target_logs, "installer-journal.txt") try: diff --git a/subiquitycore/file_util.py b/subiquitycore/file_util.py index 50d57349..3e67194c 100644 --- a/subiquitycore/file_util.py +++ b/subiquitycore/file_util.py @@ -29,7 +29,7 @@ _DEF_GROUP = "adm" log = logging.getLogger("subiquitycore.file_util") -def set_log_perms(target, *, isdir=True, group_write=False, mode=None): +def set_log_perms(target, *, group_write=False, mode=None, group=_DEF_GROUP): if os.getuid() != 0: log.warning( "set_log_perms: running as non-root - not adjusting" @@ -39,19 +39,16 @@ def set_log_perms(target, *, isdir=True, group_write=False, mode=None): return if mode is None: mode = _DEF_PERMS_FILE - if isdir: + if os.path.isdir(target): mode |= 0o110 if group_write: mode |= 0o020 os.chmod(target, mode) - os.chown(target, -1, grp.getgrnam(_DEF_GROUP).gr_gid) + os.chown(target, -1, grp.getgrnam(group).gr_gid) @contextlib.contextmanager -def open_perms(filename, *, cmode=None): - if cmode is None: - cmode = _DEF_PERMS_FILE - +def open_perms(filename, **kwargs): tf = None try: dirname = os.path.dirname(filename) @@ -59,7 +56,7 @@ def open_perms(filename, *, cmode=None): tf = tempfile.NamedTemporaryFile(dir=dirname, delete=False, mode="w") yield tf tf.close() - set_log_perms(tf.name, mode=cmode) + set_log_perms(tf.name, **kwargs) os.rename(tf.name, filename) except OSError as e: if tf is not None: diff --git a/subiquitycore/log.py b/subiquitycore/log.py index f04f2d1e..36a54f97 100644 --- a/subiquitycore/log.py +++ b/subiquitycore/log.py @@ -23,7 +23,7 @@ def setup_logger(dir, base="subiquity"): os.makedirs(dir, exist_ok=True) # Create the log directory in such a way that users in the group may # write to this directory in the installation environment. - set_log_perms(dir, isdir=True, group_write=True) + set_log_perms(dir, group_write=True) logger = logging.getLogger("") logger.setLevel(logging.DEBUG) @@ -34,7 +34,7 @@ def setup_logger(dir, base="subiquity"): nopid_file = os.path.join(dir, "{}-{}.log".format(base, level)) logfile = "{}.{}".format(nopid_file, os.getpid()) handler = logging.FileHandler(logfile) - set_log_perms(logfile, isdir=False, group_write=False) + set_log_perms(logfile, group_write=False) # os.symlink cannot replace an existing file or symlink so create # it and then rename it over. tmplink = logfile + ".link" diff --git a/subiquitycore/tests/test_file_util.py b/subiquitycore/tests/test_file_util.py index ea0b4849..2d522eef 100644 --- a/subiquitycore/tests/test_file_util.py +++ b/subiquitycore/tests/test_file_util.py @@ -13,7 +13,15 @@ # You should have received a copy of the GNU Affero General Public License # along with this program. If not, see . -from subiquitycore.file_util import copy_file_if_exists +from pathlib import Path +from unittest.mock import Mock, patch + +from subiquitycore.file_util import ( + _DEF_GROUP, + _DEF_PERMS_FILE, + copy_file_if_exists, + set_log_perms, +) from subiquitycore.tests import SubiTestCase @@ -29,3 +37,91 @@ class TestCopy(SubiTestCase): def test_copied_non_exist_src(self): copy_file_if_exists("/does/not/exist", "/ditto") + + +@patch("subiquitycore.file_util.os.getuid", new=Mock(return_value=0)) +class TestLogPerms(SubiTestCase): + def setUp(self): + chmod = patch("subiquitycore.file_util.os.chmod") + self.chmod = chmod.start() + self.addCleanup(chmod.stop) + chown = patch("subiquitycore.file_util.os.chown") + self.chown = chown.start() + self.addCleanup(chown.stop) + getgrnam = patch("subiquitycore.file_util.grp.getgrnam") + self.getgrnam = getgrnam.start() + self.addCleanup(getgrnam.stop) + self.mock_gid = 10 + self.getgrnam.return_value = Mock(gr_gid=self.mock_gid) + + def test_defaults_group(self): + target = self.tmp_dir() + set_log_perms(target) + self.getgrnam.assert_called_once_with(_DEF_GROUP) + + def test_defaults_file(self): + target = self.tmp_path("file") + Path(target).touch() + set_log_perms(target) + self.chmod.assert_called_once_with(target, _DEF_PERMS_FILE) + self.chown.assert_called_once_with(target, -1, self.mock_gid) + + def test_defaults_dir(self): + target = self.tmp_dir() + set_log_perms(target) + self.chmod.assert_called_once_with(target, _DEF_PERMS_FILE | 0o110) + self.chown.assert_called_once_with(target, -1, self.mock_gid) + + def test_group_write_file(self): + target = self.tmp_path("file") + Path(target).touch() + set_log_perms(target, group_write=True) + self.chmod.assert_called_once_with(target, _DEF_PERMS_FILE | 0o020) + self.chown.assert_called_once_with(target, -1, self.mock_gid) + + def test_group_write_dir(self): + target = self.tmp_dir() + set_log_perms(target, group_write=True) + self.chmod.assert_called_once_with(target, _DEF_PERMS_FILE | 0o130) + self.chown.assert_called_once_with(target, -1, self.mock_gid) + + def test_nogroup_write_file(self): + target = self.tmp_path("file") + Path(target).touch() + set_log_perms(target, group_write=False) + self.chmod.assert_called_once_with(target, _DEF_PERMS_FILE) + self.chown.assert_called_once_with(target, -1, self.mock_gid) + + def test_nogroup_write_dir(self): + target = self.tmp_dir() + set_log_perms(target, group_write=False) + self.chmod.assert_called_once_with(target, _DEF_PERMS_FILE | 0o110) + self.chown.assert_called_once_with(target, -1, self.mock_gid) + + def test_mode_file(self): + target = self.tmp_path("file") + Path(target).touch() + set_log_perms(target, mode=0o510) + self.chmod.assert_called_once_with(target, 0o510) + self.chown.assert_called_once_with(target, -1, self.mock_gid) + + def test_mode_dir(self): + target = self.tmp_dir() + set_log_perms(target, mode=0o510) + self.chmod.assert_called_once_with(target, 0o510) + self.chown.assert_called_once_with(target, -1, self.mock_gid) + + def test_group_file(self): + self.getgrnam.return_value = Mock(gr_gid=11) + target = self.tmp_path("file") + Path(target).touch() + set_log_perms(target, group="group1") + self.chmod.assert_called_once_with(target, _DEF_PERMS_FILE) + self.chown.assert_called_once_with(target, -1, 11) + + def test_group_dir(self): + self.getgrnam.return_value = Mock(gr_gid=11) + target = self.tmp_dir() + set_log_perms(target, group="group1") + self.chmod.assert_called_once_with(target, _DEF_PERMS_FILE | 0o110) + self.chown.assert_called_once_with(target, -1, 11)