From 008b9e2f0492de696573300d8ed859c115c7928b Mon Sep 17 00:00:00 2001 From: Dan Bungert Date: Mon, 4 Apr 2022 11:35:00 -0600 Subject: [PATCH] autoinstall: fix interaction with early-commands Early commands is currently documented as follows: > The autoinstall config is available at /autoinstall.yaml > (irrespective of how it was provided) and the file will be re-read > after the early-commands have run to allow them to alter the config > if necessary. The previous change to let cloud take precedence over iso location broke this functionality. Fix that by copying to the iso location the file of choice. --- subiquity/server/server.py | 31 +++++---- subiquity/server/tests/test_server.py | 94 ++++++++++++++++++--------- 2 files changed, 82 insertions(+), 43 deletions(-) diff --git a/subiquity/server/server.py b/subiquity/server/server.py index 0b7dc679..a4af100c 100644 --- a/subiquity/server/server.py +++ b/subiquity/server/server.py @@ -539,7 +539,7 @@ class SubiquityServer(Application): "cloud-init status: %r, assumed disabled", status_txt) - def select_autoinstall_location(self): + def select_autoinstall(self): # precedence # 1. data from before reload # 2. command line argument autoinstall @@ -562,20 +562,23 @@ class SubiquityServer(Application): for loc in locations: if loc is not None and os.path.exists(loc): - return loc - return None + break + else: + return None - def save_autoinstall_for_reload(self): - target = self.base_relative(reload_autoinstall_path) - if self.autoinstall is None: - return - if not os.path.exists(self.autoinstall): - return - if os.path.exists(target): + isopath = self.base_relative(iso_autoinstall_path) + self.copy_autoinstall(loc, isopath) + return isopath + + def copy_autoinstall(self, source, target): + if source is None or not os.path.exists(source): return dirname = os.path.dirname(target) os.makedirs(dirname, exist_ok=True) - shutil.copyfile(self.autoinstall, target) + try: + shutil.copyfile(source, target) + except shutil.SameFileError: + pass def _user_has_password(self, username): with open('/etc/shadow') as fp: @@ -640,8 +643,8 @@ class SubiquityServer(Application): await self.start_api_server() self.update_state(ApplicationState.CLOUD_INIT_WAIT) await self.wait_for_cloudinit() - self.autoinstall = self.select_autoinstall_location() self.set_installer_password() + self.autoinstall = self.select_autoinstall() self.load_autoinstall_config(only_early=True) if self.autoinstall_config and self.controllers.Early.cmds: stamp_file = self.state_path("early-commands") @@ -654,7 +657,9 @@ class SubiquityServer(Application): open(stamp_file, 'w').close() await asyncio.sleep(1) self.load_autoinstall_config(only_early=False) - self.save_autoinstall_for_reload() + self.copy_autoinstall( + self.autoinstall, + self.base_relative(reload_autoinstall_path)) if self.autoinstall_config: self.interactive = bool( self.autoinstall_config.get('interactive-sections')) diff --git a/subiquity/server/tests/test_server.py b/subiquity/server/tests/test_server.py index 033f93c7..19a1fd7d 100644 --- a/subiquity/server/tests/test_server.py +++ b/subiquity/server/tests/test_server.py @@ -13,8 +13,10 @@ # You should have received a copy of the GNU Affero General Public License # along with this program. If not, see . +import shlex from unittest.mock import Mock +from subiquitycore.utils import run_command from subiquitycore.tests import SubiTestCase from subiquity.server.server import ( SubiquityServer, @@ -25,6 +27,10 @@ from subiquity.server.server import ( class TestAutoinstallLoad(SubiTestCase): + def assertContents(self, path, expected): + with open(path, 'r') as fp: + self.assertEqual(expected, fp.read()) + def setUp(self): self.tempdir = self.tmp_dir() opts = Mock() @@ -40,55 +46,83 @@ class TestAutoinstallLoad(SubiTestCase): def path(self, relative_path): return self.tmp_path(relative_path, dir=self.tempdir) - def create(self, path): + def create(self, path, contents): path = self.path(path) - open(path, 'w').close() + with open(path, 'w') as fp: + fp.write(contents) return path def test_autoinstall_disabled(self): - self.create(reload_autoinstall_path) - self.create(cloud_autoinstall_path) - self.create(iso_autoinstall_path) + self.create(reload_autoinstall_path, 'reload') + self.create(cloud_autoinstall_path, 'cloud') + self.create(iso_autoinstall_path, 'iso') self.server.opts.autoinstall = "" - self.assertIsNone(self.server.select_autoinstall_location()) + self.assertIsNone(self.server.select_autoinstall()) def test_reload_wins(self): - expected = self.create(reload_autoinstall_path) - autoinstall = self.create(self.path('arg.autoinstall.yaml')) + self.create(reload_autoinstall_path, 'reload') + autoinstall = self.create(self.path('arg.autoinstall.yaml'), 'arg') self.server.opts.autoinstall = autoinstall - self.create(cloud_autoinstall_path) - self.create(iso_autoinstall_path) - self.assertEqual(expected, self.server.select_autoinstall_location()) + self.create(cloud_autoinstall_path, 'cloud') + iso = self.create(iso_autoinstall_path, 'iso') + self.assertEqual(iso, self.server.select_autoinstall()) + self.assertContents(iso, 'reload') def test_arg_wins(self): - expected = self.create(self.path('arg.autoinstall.yaml')) - self.server.opts.autoinstall = expected - self.create(cloud_autoinstall_path) - self.create(iso_autoinstall_path) - self.assertEqual(expected, self.server.select_autoinstall_location()) + arg = self.create(self.path('arg.autoinstall.yaml'), 'arg') + self.server.opts.autoinstall = arg + self.create(cloud_autoinstall_path, 'cloud') + iso = self.create(iso_autoinstall_path, 'iso') + self.assertEqual(iso, self.server.select_autoinstall()) + self.assertContents(iso, 'arg') def test_cloud_wins(self): - expected = self.create(cloud_autoinstall_path) - self.create(iso_autoinstall_path) - self.assertEqual(expected, self.server.select_autoinstall_location()) + self.create(cloud_autoinstall_path, 'cloud') + iso = self.create(iso_autoinstall_path, 'iso') + self.assertEqual(iso, self.server.select_autoinstall()) + self.assertContents(iso, 'cloud') def test_iso_wins(self): - expected = self.create(iso_autoinstall_path) - self.assertEqual(expected, self.server.select_autoinstall_location()) + iso = self.create(iso_autoinstall_path, 'iso') + self.assertEqual(iso, self.server.select_autoinstall()) + self.assertContents(iso, 'iso') def test_nobody_wins(self): - self.assertIsNone(self.server.select_autoinstall_location()) + self.assertIsNone(self.server.select_autoinstall()) def test_copied_to_reload(self): - self.server.autoinstall = self.tmp_path('test.yaml', dir=self.tempdir) - expected = 'stuff things' - with open(self.server.autoinstall, 'w') as fp: - fp.write(expected) - self.server.save_autoinstall_for_reload() - with open(self.path(reload_autoinstall_path), 'r') as fp: - self.assertEqual(expected, fp.read()) + data = 'stuff things' + src = self.create(self.path('test.yaml'), data) + tgt = self.path(reload_autoinstall_path) + self.server.copy_autoinstall(src, tgt) + self.assertContents(tgt, data) def test_bogus_autoinstall_argument(self): self.server.opts.autoinstall = self.path('nonexistant.yaml') with self.assertRaises(Exception): - self.server.select_autoinstall_location() + self.server.select_autoinstall() + + def test_early_commands_changes_autoinstall(self): + self.server.controllers = Mock() + self.server.controllers.instances = [] + isopath = self.create(iso_autoinstall_path, '') + + cmd = f"sed -i -e '$ a stuff: things' {isopath}" + contents = f'''\ +version: 1 +early-commands: ["{cmd}"] +''' + self.create(cloud_autoinstall_path, contents) + + self.server.autoinstall = self.server.select_autoinstall() + self.server.load_autoinstall_config(only_early=True) + before_early = {'version': 1, + 'early-commands': [cmd]} + self.assertEqual(before_early, self.server.autoinstall_config) + run_command(shlex.split(cmd), check=True) + + self.server.load_autoinstall_config(only_early=False) + after_early = {'version': 1, + 'early-commands': [cmd], + 'stuff': 'things'} + self.assertEqual(after_early, self.server.autoinstall_config)