From ddc3345eb69afee38e0b70c943b7cce86a2d0723 Mon Sep 17 00:00:00 2001 From: Dan Bungert Date: Tue, 3 Oct 2023 16:36:45 -0600 Subject: [PATCH 1/4] util: standardize on term 'mode' --- subiquity/models/subiquity.py | 6 +++--- subiquitycore/file_util.py | 8 ++++---- 2 files changed, 7 insertions(+), 7 deletions(-) diff --git a/subiquity/models/subiquity.py b/subiquity/models/subiquity.py index 6c3991d1..cbd0cc64 100644 --- a/subiquity/models/subiquity.py +++ b/subiquity/models/subiquity.py @@ -434,7 +434,7 @@ class SubiquityModel: ("etc/cloud/ds-identify.cfg", "policy: enabled\n", 0o644), ] # Add cloud-init clean hooks to support golden-image creation. - cfg_files = ["/" + path for (path, _content, _cmode) in files] + cfg_files = ["/" + path for (path, _content, _mode) in files] cfg_files.extend(self.network.rendered_config_paths()) if lsb_release()["release"] not in ("20.04", "22.04"): cfg_files.append("/etc/cloud/cloud-init.disabled") @@ -467,10 +467,10 @@ class SubiquityModel: if self.source.current.variant == "core": # can probably be supported but requires changes return - for path, content, cmode in self._cloud_init_files(): + for path, content, mode in self._cloud_init_files(): path = os.path.join(self.target, path) os.makedirs(os.path.dirname(path), exist_ok=True) - write_file(path, content, cmode=cmode) + write_file(path, content, mode=mode) def _media_info(self): if os.path.exists("/cdrom/.disk/info"): diff --git a/subiquitycore/file_util.py b/subiquitycore/file_util.py index 50d57349..0d2ac125 100644 --- a/subiquitycore/file_util.py +++ b/subiquitycore/file_util.py @@ -48,9 +48,9 @@ def set_log_perms(target, *, isdir=True, group_write=False, mode=None): @contextlib.contextmanager -def open_perms(filename, *, cmode=None): - if cmode is None: - cmode = _DEF_PERMS_FILE +def open_perms(filename, *, mode=None): + if mode is None: + mode = _DEF_PERMS_FILE tf = None try: @@ -59,7 +59,7 @@ def open_perms(filename, *, cmode=None): tf = tempfile.NamedTemporaryFile(dir=dirname, delete=False, mode="w") yield tf tf.close() - set_log_perms(tf.name, mode=cmode) + set_log_perms(tf.name, mode=mode) os.rename(tf.name, filename) except OSError as e: if tf is not None: From ddc11d8687d187a2ff7e2e006ea978bd13eff037 Mon Sep 17 00:00:00 2001 From: Dan Bungert Date: Tue, 3 Oct 2023 16:40:15 -0600 Subject: [PATCH 2/4] util: more control on file writer mode and group --- subiquitycore/file_util.py | 13 ++++++------- 1 file changed, 6 insertions(+), 7 deletions(-) diff --git a/subiquitycore/file_util.py b/subiquitycore/file_util.py index 0d2ac125..461e5396 100644 --- a/subiquitycore/file_util.py +++ b/subiquitycore/file_util.py @@ -29,7 +29,9 @@ _DEF_GROUP = "adm" log = logging.getLogger("subiquitycore.file_util") -def set_log_perms(target, *, isdir=True, group_write=False, mode=None): +def set_log_perms( + target, *, isdir=True, group_write=False, mode=None, group=_DEF_GROUP +): if os.getuid() != 0: log.warning( "set_log_perms: running as non-root - not adjusting" @@ -44,14 +46,11 @@ def set_log_perms(target, *, isdir=True, group_write=False, mode=None): if group_write: mode |= 0o020 os.chmod(target, mode) - os.chown(target, -1, grp.getgrnam(_DEF_GROUP).gr_gid) + os.chown(target, -1, grp.getgrnam(group).gr_gid) @contextlib.contextmanager -def open_perms(filename, *, mode=None): - if mode is None: - mode = _DEF_PERMS_FILE - +def open_perms(filename, **kwargs): tf = None try: dirname = os.path.dirname(filename) @@ -59,7 +58,7 @@ def open_perms(filename, *, mode=None): tf = tempfile.NamedTemporaryFile(dir=dirname, delete=False, mode="w") yield tf tf.close() - set_log_perms(tf.name, mode=mode) + set_log_perms(tf.name, **kwargs) os.rename(tf.name, filename) except OSError as e: if tf is not None: From 4a4e8ba8863401404a132784dfd66ce04822984b Mon Sep 17 00:00:00 2001 From: Dan Bungert Date: Tue, 3 Oct 2023 18:56:07 -0600 Subject: [PATCH 3/4] util: explicit isdir arg from set_log_perms target already exists, we should just inspect target and find if it is a directory or not. --- subiquity/server/controllers/shutdown.py | 2 +- subiquitycore/file_util.py | 6 ++---- subiquitycore/log.py | 4 ++-- 3 files changed, 5 insertions(+), 7 deletions(-) diff --git a/subiquity/server/controllers/shutdown.py b/subiquity/server/controllers/shutdown.py index 971bbe41..503d0ce3 100644 --- a/subiquity/server/controllers/shutdown.py +++ b/subiquity/server/controllers/shutdown.py @@ -113,7 +113,7 @@ class ShutdownController(SubiquityController): ["cp", "-aT", "/var/log/installer", target_logs] ) # Close the permissions from group writes on the target. - set_log_perms(target_logs, isdir=True, group_write=False) + set_log_perms(target_logs, group_write=False) journal_txt = os.path.join(target_logs, "installer-journal.txt") try: diff --git a/subiquitycore/file_util.py b/subiquitycore/file_util.py index 461e5396..3e67194c 100644 --- a/subiquitycore/file_util.py +++ b/subiquitycore/file_util.py @@ -29,9 +29,7 @@ _DEF_GROUP = "adm" log = logging.getLogger("subiquitycore.file_util") -def set_log_perms( - target, *, isdir=True, group_write=False, mode=None, group=_DEF_GROUP -): +def set_log_perms(target, *, group_write=False, mode=None, group=_DEF_GROUP): if os.getuid() != 0: log.warning( "set_log_perms: running as non-root - not adjusting" @@ -41,7 +39,7 @@ def set_log_perms( return if mode is None: mode = _DEF_PERMS_FILE - if isdir: + if os.path.isdir(target): mode |= 0o110 if group_write: mode |= 0o020 diff --git a/subiquitycore/log.py b/subiquitycore/log.py index f04f2d1e..36a54f97 100644 --- a/subiquitycore/log.py +++ b/subiquitycore/log.py @@ -23,7 +23,7 @@ def setup_logger(dir, base="subiquity"): os.makedirs(dir, exist_ok=True) # Create the log directory in such a way that users in the group may # write to this directory in the installation environment. - set_log_perms(dir, isdir=True, group_write=True) + set_log_perms(dir, group_write=True) logger = logging.getLogger("") logger.setLevel(logging.DEBUG) @@ -34,7 +34,7 @@ def setup_logger(dir, base="subiquity"): nopid_file = os.path.join(dir, "{}-{}.log".format(base, level)) logfile = "{}.{}".format(nopid_file, os.getpid()) handler = logging.FileHandler(logfile) - set_log_perms(logfile, isdir=False, group_write=False) + set_log_perms(logfile, group_write=False) # os.symlink cannot replace an existing file or symlink so create # it and then rename it over. tmplink = logfile + ".link" From 8ab052c2000aa4b9dd67e6c19ef584b487864f7f Mon Sep 17 00:00:00 2001 From: Dan Bungert Date: Tue, 3 Oct 2023 18:57:09 -0600 Subject: [PATCH 4/4] util: set_log_perms tests --- subiquitycore/tests/test_file_util.py | 98 ++++++++++++++++++++++++++- 1 file changed, 97 insertions(+), 1 deletion(-) diff --git a/subiquitycore/tests/test_file_util.py b/subiquitycore/tests/test_file_util.py index ea0b4849..2d522eef 100644 --- a/subiquitycore/tests/test_file_util.py +++ b/subiquitycore/tests/test_file_util.py @@ -13,7 +13,15 @@ # You should have received a copy of the GNU Affero General Public License # along with this program. If not, see . -from subiquitycore.file_util import copy_file_if_exists +from pathlib import Path +from unittest.mock import Mock, patch + +from subiquitycore.file_util import ( + _DEF_GROUP, + _DEF_PERMS_FILE, + copy_file_if_exists, + set_log_perms, +) from subiquitycore.tests import SubiTestCase @@ -29,3 +37,91 @@ class TestCopy(SubiTestCase): def test_copied_non_exist_src(self): copy_file_if_exists("/does/not/exist", "/ditto") + + +@patch("subiquitycore.file_util.os.getuid", new=Mock(return_value=0)) +class TestLogPerms(SubiTestCase): + def setUp(self): + chmod = patch("subiquitycore.file_util.os.chmod") + self.chmod = chmod.start() + self.addCleanup(chmod.stop) + chown = patch("subiquitycore.file_util.os.chown") + self.chown = chown.start() + self.addCleanup(chown.stop) + getgrnam = patch("subiquitycore.file_util.grp.getgrnam") + self.getgrnam = getgrnam.start() + self.addCleanup(getgrnam.stop) + self.mock_gid = 10 + self.getgrnam.return_value = Mock(gr_gid=self.mock_gid) + + def test_defaults_group(self): + target = self.tmp_dir() + set_log_perms(target) + self.getgrnam.assert_called_once_with(_DEF_GROUP) + + def test_defaults_file(self): + target = self.tmp_path("file") + Path(target).touch() + set_log_perms(target) + self.chmod.assert_called_once_with(target, _DEF_PERMS_FILE) + self.chown.assert_called_once_with(target, -1, self.mock_gid) + + def test_defaults_dir(self): + target = self.tmp_dir() + set_log_perms(target) + self.chmod.assert_called_once_with(target, _DEF_PERMS_FILE | 0o110) + self.chown.assert_called_once_with(target, -1, self.mock_gid) + + def test_group_write_file(self): + target = self.tmp_path("file") + Path(target).touch() + set_log_perms(target, group_write=True) + self.chmod.assert_called_once_with(target, _DEF_PERMS_FILE | 0o020) + self.chown.assert_called_once_with(target, -1, self.mock_gid) + + def test_group_write_dir(self): + target = self.tmp_dir() + set_log_perms(target, group_write=True) + self.chmod.assert_called_once_with(target, _DEF_PERMS_FILE | 0o130) + self.chown.assert_called_once_with(target, -1, self.mock_gid) + + def test_nogroup_write_file(self): + target = self.tmp_path("file") + Path(target).touch() + set_log_perms(target, group_write=False) + self.chmod.assert_called_once_with(target, _DEF_PERMS_FILE) + self.chown.assert_called_once_with(target, -1, self.mock_gid) + + def test_nogroup_write_dir(self): + target = self.tmp_dir() + set_log_perms(target, group_write=False) + self.chmod.assert_called_once_with(target, _DEF_PERMS_FILE | 0o110) + self.chown.assert_called_once_with(target, -1, self.mock_gid) + + def test_mode_file(self): + target = self.tmp_path("file") + Path(target).touch() + set_log_perms(target, mode=0o510) + self.chmod.assert_called_once_with(target, 0o510) + self.chown.assert_called_once_with(target, -1, self.mock_gid) + + def test_mode_dir(self): + target = self.tmp_dir() + set_log_perms(target, mode=0o510) + self.chmod.assert_called_once_with(target, 0o510) + self.chown.assert_called_once_with(target, -1, self.mock_gid) + + def test_group_file(self): + self.getgrnam.return_value = Mock(gr_gid=11) + target = self.tmp_path("file") + Path(target).touch() + set_log_perms(target, group="group1") + self.chmod.assert_called_once_with(target, _DEF_PERMS_FILE) + self.chown.assert_called_once_with(target, -1, 11) + + def test_group_dir(self): + self.getgrnam.return_value = Mock(gr_gid=11) + target = self.tmp_dir() + set_log_perms(target, group="group1") + self.chmod.assert_called_once_with(target, _DEF_PERMS_FILE | 0o110) + self.chown.assert_called_once_with(target, -1, 11)