Merge pull request #1818 from dbungert/file-writer-enhancement

util: File writer enhancements
This commit is contained in:
Dan Bungert 2023-10-03 19:16:37 -06:00 committed by GitHub
commit 3a319e77d7
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 108 additions and 15 deletions

View File

@ -434,7 +434,7 @@ class SubiquityModel:
("etc/cloud/ds-identify.cfg", "policy: enabled\n", 0o644), ("etc/cloud/ds-identify.cfg", "policy: enabled\n", 0o644),
] ]
# Add cloud-init clean hooks to support golden-image creation. # Add cloud-init clean hooks to support golden-image creation.
cfg_files = ["/" + path for (path, _content, _cmode) in files] cfg_files = ["/" + path for (path, _content, _mode) in files]
cfg_files.extend(self.network.rendered_config_paths()) cfg_files.extend(self.network.rendered_config_paths())
if lsb_release()["release"] not in ("20.04", "22.04"): if lsb_release()["release"] not in ("20.04", "22.04"):
cfg_files.append("/etc/cloud/cloud-init.disabled") cfg_files.append("/etc/cloud/cloud-init.disabled")
@ -467,10 +467,10 @@ class SubiquityModel:
if self.source.current.variant == "core": if self.source.current.variant == "core":
# can probably be supported but requires changes # can probably be supported but requires changes
return return
for path, content, cmode in self._cloud_init_files(): for path, content, mode in self._cloud_init_files():
path = os.path.join(self.target, path) path = os.path.join(self.target, path)
os.makedirs(os.path.dirname(path), exist_ok=True) os.makedirs(os.path.dirname(path), exist_ok=True)
write_file(path, content, cmode=cmode) write_file(path, content, mode=mode)
def _media_info(self): def _media_info(self):
if os.path.exists("/cdrom/.disk/info"): if os.path.exists("/cdrom/.disk/info"):

View File

@ -113,7 +113,7 @@ class ShutdownController(SubiquityController):
["cp", "-aT", "/var/log/installer", target_logs] ["cp", "-aT", "/var/log/installer", target_logs]
) )
# Close the permissions from group writes on the target. # Close the permissions from group writes on the target.
set_log_perms(target_logs, isdir=True, group_write=False) set_log_perms(target_logs, group_write=False)
journal_txt = os.path.join(target_logs, "installer-journal.txt") journal_txt = os.path.join(target_logs, "installer-journal.txt")
try: try:

View File

@ -29,7 +29,7 @@ _DEF_GROUP = "adm"
log = logging.getLogger("subiquitycore.file_util") log = logging.getLogger("subiquitycore.file_util")
def set_log_perms(target, *, isdir=True, group_write=False, mode=None): def set_log_perms(target, *, group_write=False, mode=None, group=_DEF_GROUP):
if os.getuid() != 0: if os.getuid() != 0:
log.warning( log.warning(
"set_log_perms: running as non-root - not adjusting" "set_log_perms: running as non-root - not adjusting"
@ -39,19 +39,16 @@ def set_log_perms(target, *, isdir=True, group_write=False, mode=None):
return return
if mode is None: if mode is None:
mode = _DEF_PERMS_FILE mode = _DEF_PERMS_FILE
if isdir: if os.path.isdir(target):
mode |= 0o110 mode |= 0o110
if group_write: if group_write:
mode |= 0o020 mode |= 0o020
os.chmod(target, mode) os.chmod(target, mode)
os.chown(target, -1, grp.getgrnam(_DEF_GROUP).gr_gid) os.chown(target, -1, grp.getgrnam(group).gr_gid)
@contextlib.contextmanager @contextlib.contextmanager
def open_perms(filename, *, cmode=None): def open_perms(filename, **kwargs):
if cmode is None:
cmode = _DEF_PERMS_FILE
tf = None tf = None
try: try:
dirname = os.path.dirname(filename) dirname = os.path.dirname(filename)
@ -59,7 +56,7 @@ def open_perms(filename, *, cmode=None):
tf = tempfile.NamedTemporaryFile(dir=dirname, delete=False, mode="w") tf = tempfile.NamedTemporaryFile(dir=dirname, delete=False, mode="w")
yield tf yield tf
tf.close() tf.close()
set_log_perms(tf.name, mode=cmode) set_log_perms(tf.name, **kwargs)
os.rename(tf.name, filename) os.rename(tf.name, filename)
except OSError as e: except OSError as e:
if tf is not None: if tf is not None:

View File

@ -23,7 +23,7 @@ def setup_logger(dir, base="subiquity"):
os.makedirs(dir, exist_ok=True) os.makedirs(dir, exist_ok=True)
# Create the log directory in such a way that users in the group may # Create the log directory in such a way that users in the group may
# write to this directory in the installation environment. # write to this directory in the installation environment.
set_log_perms(dir, isdir=True, group_write=True) set_log_perms(dir, group_write=True)
logger = logging.getLogger("") logger = logging.getLogger("")
logger.setLevel(logging.DEBUG) logger.setLevel(logging.DEBUG)
@ -34,7 +34,7 @@ def setup_logger(dir, base="subiquity"):
nopid_file = os.path.join(dir, "{}-{}.log".format(base, level)) nopid_file = os.path.join(dir, "{}-{}.log".format(base, level))
logfile = "{}.{}".format(nopid_file, os.getpid()) logfile = "{}.{}".format(nopid_file, os.getpid())
handler = logging.FileHandler(logfile) handler = logging.FileHandler(logfile)
set_log_perms(logfile, isdir=False, group_write=False) set_log_perms(logfile, group_write=False)
# os.symlink cannot replace an existing file or symlink so create # os.symlink cannot replace an existing file or symlink so create
# it and then rename it over. # it and then rename it over.
tmplink = logfile + ".link" tmplink = logfile + ".link"

View File

@ -13,7 +13,15 @@
# You should have received a copy of the GNU Affero General Public License # You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>. # along with this program. If not, see <http://www.gnu.org/licenses/>.
from subiquitycore.file_util import copy_file_if_exists from pathlib import Path
from unittest.mock import Mock, patch
from subiquitycore.file_util import (
_DEF_GROUP,
_DEF_PERMS_FILE,
copy_file_if_exists,
set_log_perms,
)
from subiquitycore.tests import SubiTestCase from subiquitycore.tests import SubiTestCase
@ -29,3 +37,91 @@ class TestCopy(SubiTestCase):
def test_copied_non_exist_src(self): def test_copied_non_exist_src(self):
copy_file_if_exists("/does/not/exist", "/ditto") copy_file_if_exists("/does/not/exist", "/ditto")
@patch("subiquitycore.file_util.os.getuid", new=Mock(return_value=0))
class TestLogPerms(SubiTestCase):
def setUp(self):
chmod = patch("subiquitycore.file_util.os.chmod")
self.chmod = chmod.start()
self.addCleanup(chmod.stop)
chown = patch("subiquitycore.file_util.os.chown")
self.chown = chown.start()
self.addCleanup(chown.stop)
getgrnam = patch("subiquitycore.file_util.grp.getgrnam")
self.getgrnam = getgrnam.start()
self.addCleanup(getgrnam.stop)
self.mock_gid = 10
self.getgrnam.return_value = Mock(gr_gid=self.mock_gid)
def test_defaults_group(self):
target = self.tmp_dir()
set_log_perms(target)
self.getgrnam.assert_called_once_with(_DEF_GROUP)
def test_defaults_file(self):
target = self.tmp_path("file")
Path(target).touch()
set_log_perms(target)
self.chmod.assert_called_once_with(target, _DEF_PERMS_FILE)
self.chown.assert_called_once_with(target, -1, self.mock_gid)
def test_defaults_dir(self):
target = self.tmp_dir()
set_log_perms(target)
self.chmod.assert_called_once_with(target, _DEF_PERMS_FILE | 0o110)
self.chown.assert_called_once_with(target, -1, self.mock_gid)
def test_group_write_file(self):
target = self.tmp_path("file")
Path(target).touch()
set_log_perms(target, group_write=True)
self.chmod.assert_called_once_with(target, _DEF_PERMS_FILE | 0o020)
self.chown.assert_called_once_with(target, -1, self.mock_gid)
def test_group_write_dir(self):
target = self.tmp_dir()
set_log_perms(target, group_write=True)
self.chmod.assert_called_once_with(target, _DEF_PERMS_FILE | 0o130)
self.chown.assert_called_once_with(target, -1, self.mock_gid)
def test_nogroup_write_file(self):
target = self.tmp_path("file")
Path(target).touch()
set_log_perms(target, group_write=False)
self.chmod.assert_called_once_with(target, _DEF_PERMS_FILE)
self.chown.assert_called_once_with(target, -1, self.mock_gid)
def test_nogroup_write_dir(self):
target = self.tmp_dir()
set_log_perms(target, group_write=False)
self.chmod.assert_called_once_with(target, _DEF_PERMS_FILE | 0o110)
self.chown.assert_called_once_with(target, -1, self.mock_gid)
def test_mode_file(self):
target = self.tmp_path("file")
Path(target).touch()
set_log_perms(target, mode=0o510)
self.chmod.assert_called_once_with(target, 0o510)
self.chown.assert_called_once_with(target, -1, self.mock_gid)
def test_mode_dir(self):
target = self.tmp_dir()
set_log_perms(target, mode=0o510)
self.chmod.assert_called_once_with(target, 0o510)
self.chown.assert_called_once_with(target, -1, self.mock_gid)
def test_group_file(self):
self.getgrnam.return_value = Mock(gr_gid=11)
target = self.tmp_path("file")
Path(target).touch()
set_log_perms(target, group="group1")
self.chmod.assert_called_once_with(target, _DEF_PERMS_FILE)
self.chown.assert_called_once_with(target, -1, 11)
def test_group_dir(self):
self.getgrnam.return_value = Mock(gr_gid=11)
target = self.tmp_dir()
set_log_perms(target, group="group1")
self.chmod.assert_called_once_with(target, _DEF_PERMS_FILE | 0o110)
self.chown.assert_called_once_with(target, -1, 11)