autoinstall: fix interaction with early-commands

Early commands is currently documented as follows:
> The autoinstall config is available at /autoinstall.yaml
> (irrespective of how it was provided) and the file will be re-read
> after the early-commands have run to allow them to alter the config
> if necessary.

The previous change to let cloud take precedence over iso location broke
this functionality.  Fix that by copying to the iso location the file of
choice.
This commit is contained in:
Dan Bungert 2022-04-04 11:35:00 -06:00
parent d69731af7a
commit 008b9e2f04
2 changed files with 82 additions and 43 deletions

View File

@ -539,7 +539,7 @@ class SubiquityServer(Application):
"cloud-init status: %r, assumed disabled", "cloud-init status: %r, assumed disabled",
status_txt) status_txt)
def select_autoinstall_location(self): def select_autoinstall(self):
# precedence # precedence
# 1. data from before reload # 1. data from before reload
# 2. command line argument autoinstall # 2. command line argument autoinstall
@ -562,20 +562,23 @@ class SubiquityServer(Application):
for loc in locations: for loc in locations:
if loc is not None and os.path.exists(loc): if loc is not None and os.path.exists(loc):
return loc break
return None else:
return None
def save_autoinstall_for_reload(self): isopath = self.base_relative(iso_autoinstall_path)
target = self.base_relative(reload_autoinstall_path) self.copy_autoinstall(loc, isopath)
if self.autoinstall is None: return isopath
return
if not os.path.exists(self.autoinstall): def copy_autoinstall(self, source, target):
return if source is None or not os.path.exists(source):
if os.path.exists(target):
return return
dirname = os.path.dirname(target) dirname = os.path.dirname(target)
os.makedirs(dirname, exist_ok=True) os.makedirs(dirname, exist_ok=True)
shutil.copyfile(self.autoinstall, target) try:
shutil.copyfile(source, target)
except shutil.SameFileError:
pass
def _user_has_password(self, username): def _user_has_password(self, username):
with open('/etc/shadow') as fp: with open('/etc/shadow') as fp:
@ -640,8 +643,8 @@ class SubiquityServer(Application):
await self.start_api_server() await self.start_api_server()
self.update_state(ApplicationState.CLOUD_INIT_WAIT) self.update_state(ApplicationState.CLOUD_INIT_WAIT)
await self.wait_for_cloudinit() await self.wait_for_cloudinit()
self.autoinstall = self.select_autoinstall_location()
self.set_installer_password() self.set_installer_password()
self.autoinstall = self.select_autoinstall()
self.load_autoinstall_config(only_early=True) self.load_autoinstall_config(only_early=True)
if self.autoinstall_config and self.controllers.Early.cmds: if self.autoinstall_config and self.controllers.Early.cmds:
stamp_file = self.state_path("early-commands") stamp_file = self.state_path("early-commands")
@ -654,7 +657,9 @@ class SubiquityServer(Application):
open(stamp_file, 'w').close() open(stamp_file, 'w').close()
await asyncio.sleep(1) await asyncio.sleep(1)
self.load_autoinstall_config(only_early=False) self.load_autoinstall_config(only_early=False)
self.save_autoinstall_for_reload() self.copy_autoinstall(
self.autoinstall,
self.base_relative(reload_autoinstall_path))
if self.autoinstall_config: if self.autoinstall_config:
self.interactive = bool( self.interactive = bool(
self.autoinstall_config.get('interactive-sections')) self.autoinstall_config.get('interactive-sections'))

View File

@ -13,8 +13,10 @@
# You should have received a copy of the GNU Affero General Public License # You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>. # along with this program. If not, see <http://www.gnu.org/licenses/>.
import shlex
from unittest.mock import Mock from unittest.mock import Mock
from subiquitycore.utils import run_command
from subiquitycore.tests import SubiTestCase from subiquitycore.tests import SubiTestCase
from subiquity.server.server import ( from subiquity.server.server import (
SubiquityServer, SubiquityServer,
@ -25,6 +27,10 @@ from subiquity.server.server import (
class TestAutoinstallLoad(SubiTestCase): class TestAutoinstallLoad(SubiTestCase):
def assertContents(self, path, expected):
with open(path, 'r') as fp:
self.assertEqual(expected, fp.read())
def setUp(self): def setUp(self):
self.tempdir = self.tmp_dir() self.tempdir = self.tmp_dir()
opts = Mock() opts = Mock()
@ -40,55 +46,83 @@ class TestAutoinstallLoad(SubiTestCase):
def path(self, relative_path): def path(self, relative_path):
return self.tmp_path(relative_path, dir=self.tempdir) return self.tmp_path(relative_path, dir=self.tempdir)
def create(self, path): def create(self, path, contents):
path = self.path(path) path = self.path(path)
open(path, 'w').close() with open(path, 'w') as fp:
fp.write(contents)
return path return path
def test_autoinstall_disabled(self): def test_autoinstall_disabled(self):
self.create(reload_autoinstall_path) self.create(reload_autoinstall_path, 'reload')
self.create(cloud_autoinstall_path) self.create(cloud_autoinstall_path, 'cloud')
self.create(iso_autoinstall_path) self.create(iso_autoinstall_path, 'iso')
self.server.opts.autoinstall = "" self.server.opts.autoinstall = ""
self.assertIsNone(self.server.select_autoinstall_location()) self.assertIsNone(self.server.select_autoinstall())
def test_reload_wins(self): def test_reload_wins(self):
expected = self.create(reload_autoinstall_path) self.create(reload_autoinstall_path, 'reload')
autoinstall = self.create(self.path('arg.autoinstall.yaml')) autoinstall = self.create(self.path('arg.autoinstall.yaml'), 'arg')
self.server.opts.autoinstall = autoinstall self.server.opts.autoinstall = autoinstall
self.create(cloud_autoinstall_path) self.create(cloud_autoinstall_path, 'cloud')
self.create(iso_autoinstall_path) iso = self.create(iso_autoinstall_path, 'iso')
self.assertEqual(expected, self.server.select_autoinstall_location()) self.assertEqual(iso, self.server.select_autoinstall())
self.assertContents(iso, 'reload')
def test_arg_wins(self): def test_arg_wins(self):
expected = self.create(self.path('arg.autoinstall.yaml')) arg = self.create(self.path('arg.autoinstall.yaml'), 'arg')
self.server.opts.autoinstall = expected self.server.opts.autoinstall = arg
self.create(cloud_autoinstall_path) self.create(cloud_autoinstall_path, 'cloud')
self.create(iso_autoinstall_path) iso = self.create(iso_autoinstall_path, 'iso')
self.assertEqual(expected, self.server.select_autoinstall_location()) self.assertEqual(iso, self.server.select_autoinstall())
self.assertContents(iso, 'arg')
def test_cloud_wins(self): def test_cloud_wins(self):
expected = self.create(cloud_autoinstall_path) self.create(cloud_autoinstall_path, 'cloud')
self.create(iso_autoinstall_path) iso = self.create(iso_autoinstall_path, 'iso')
self.assertEqual(expected, self.server.select_autoinstall_location()) self.assertEqual(iso, self.server.select_autoinstall())
self.assertContents(iso, 'cloud')
def test_iso_wins(self): def test_iso_wins(self):
expected = self.create(iso_autoinstall_path) iso = self.create(iso_autoinstall_path, 'iso')
self.assertEqual(expected, self.server.select_autoinstall_location()) self.assertEqual(iso, self.server.select_autoinstall())
self.assertContents(iso, 'iso')
def test_nobody_wins(self): def test_nobody_wins(self):
self.assertIsNone(self.server.select_autoinstall_location()) self.assertIsNone(self.server.select_autoinstall())
def test_copied_to_reload(self): def test_copied_to_reload(self):
self.server.autoinstall = self.tmp_path('test.yaml', dir=self.tempdir) data = 'stuff things'
expected = 'stuff things' src = self.create(self.path('test.yaml'), data)
with open(self.server.autoinstall, 'w') as fp: tgt = self.path(reload_autoinstall_path)
fp.write(expected) self.server.copy_autoinstall(src, tgt)
self.server.save_autoinstall_for_reload() self.assertContents(tgt, data)
with open(self.path(reload_autoinstall_path), 'r') as fp:
self.assertEqual(expected, fp.read())
def test_bogus_autoinstall_argument(self): def test_bogus_autoinstall_argument(self):
self.server.opts.autoinstall = self.path('nonexistant.yaml') self.server.opts.autoinstall = self.path('nonexistant.yaml')
with self.assertRaises(Exception): with self.assertRaises(Exception):
self.server.select_autoinstall_location() self.server.select_autoinstall()
def test_early_commands_changes_autoinstall(self):
self.server.controllers = Mock()
self.server.controllers.instances = []
isopath = self.create(iso_autoinstall_path, '')
cmd = f"sed -i -e '$ a stuff: things' {isopath}"
contents = f'''\
version: 1
early-commands: ["{cmd}"]
'''
self.create(cloud_autoinstall_path, contents)
self.server.autoinstall = self.server.select_autoinstall()
self.server.load_autoinstall_config(only_early=True)
before_early = {'version': 1,
'early-commands': [cmd]}
self.assertEqual(before_early, self.server.autoinstall_config)
run_command(shlex.split(cmd), check=True)
self.server.load_autoinstall_config(only_early=False)
after_early = {'version': 1,
'early-commands': [cmd],
'stuff': 'things'}
self.assertEqual(after_early, self.server.autoinstall_config)