Merge pull request #1237 from dbungert/lp-1948823

autoinstall: allow cloud source to override iso source
This commit is contained in:
Dan Bungert 2022-03-21 12:19:14 -06:00 committed by GitHub
commit fd65eebd0a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 150 additions and 13 deletions

View File

@ -17,6 +17,7 @@ import asyncio
import logging
import os
import shlex
import shutil
import sys
import time
from typing import List, Optional
@ -83,6 +84,10 @@ from subiquitycore.snapd import (
NOPROBERARG = "NOPROBER"
iso_autoinstall_path = 'autoinstall.yaml'
reload_autoinstall_path = 'run/subiquity/reload.autoinstall.yaml'
cloud_autoinstall_path = 'run/subiquity/cloud.autoinstall.yaml'
log = logging.getLogger('subiquity.server.server')
@ -468,12 +473,11 @@ class SubiquityServer(Application):
await controller.configured()
def load_autoinstall_config(self, *, only_early):
log.debug("load_autoinstall_config only_early %s", only_early)
if not self.opts.autoinstall:
# autoinstall is None -> no autoinstall file supplied or found
# autoinstall is empty -> explicitly disabling autoinstall
log.debug('load_autoinstall_config only_early %s file %s',
only_early, self.autoinstall)
if not self.autoinstall:
return
with open(self.opts.autoinstall) as fp:
with open(self.autoinstall) as fp:
self.autoinstall_config = yaml.safe_load(fp)
if only_early:
self.controllers.Reporting.setup_autoinstall()
@ -502,6 +506,9 @@ class SubiquityServer(Application):
# It is intended that a non-root client can connect.
os.chmod(self.opts.socket, 0o666)
def base_relative(self, path):
return os.path.join(self.base_model.root, path)
async def wait_for_cloudinit(self):
if self.opts.dry_run:
self.cloud_init_ok = True
@ -523,19 +530,53 @@ class SubiquityServer(Application):
init.read_cfg()
init.fetch(existing="trust")
self.cloud = init.cloudify()
if self.opts.autoinstall is None:
autoinstall_path = '/autoinstall.yaml'
if 'autoinstall' in self.cloud.cfg:
if not os.path.exists(autoinstall_path):
cfg = self.cloud.cfg['autoinstall']
write_file(autoinstall_path, safeyaml.dumps(cfg))
if os.path.exists(autoinstall_path):
self.opts.autoinstall = autoinstall_path
if 'autoinstall' in self.cloud.cfg:
cfg = self.cloud.cfg['autoinstall']
target = self.base_relative(cloud_autoinstall_path)
write_file(target, safeyaml.dumps(cfg))
else:
log.debug(
"cloud-init status: %r, assumed disabled",
status_txt)
def select_autoinstall_location(self):
# precedence
# 1. data from before reload
# 2. command line argument autoinstall
# 3. autoinstall supplied by cloud config
# 4. autoinstall baked into the iso at /autoinstall.yaml
# if opts.autoinstall is set and empty, that means
# autoinstall has been explicitly disabled.
if self.opts.autoinstall == "":
return None
if self.opts.autoinstall is not None and not \
os.path.exists(self.opts.autoinstall):
raise Exception(
f'Autoinstall argument {self.opts.autoinstall} not found')
locations = (self.base_relative(reload_autoinstall_path),
self.opts.autoinstall,
self.base_relative(cloud_autoinstall_path),
self.base_relative(iso_autoinstall_path))
for loc in locations:
if loc is not None and os.path.exists(loc):
return loc
return None
def save_autoinstall_for_reload(self):
target = self.base_relative(reload_autoinstall_path)
if self.autoinstall is None:
return
if not os.path.exists(self.autoinstall):
return
if os.path.exists(target):
return
dirname = os.path.dirname(target)
os.makedirs(dirname, exist_ok=True)
shutil.copyfile(self.autoinstall, target)
def _user_has_password(self, username):
with open('/etc/shadow') as fp:
for line in fp:
@ -599,6 +640,7 @@ class SubiquityServer(Application):
await self.start_api_server()
self.update_state(ApplicationState.CLOUD_INIT_WAIT)
await self.wait_for_cloudinit()
self.autoinstall = self.select_autoinstall_location()
self.set_installer_password()
self.load_autoinstall_config(only_early=True)
if self.autoinstall_config and self.controllers.Early.cmds:
@ -612,6 +654,7 @@ class SubiquityServer(Application):
open(stamp_file, 'w').close()
await asyncio.sleep(1)
self.load_autoinstall_config(only_early=False)
self.save_autoinstall_for_reload()
if self.autoinstall_config:
self.interactive = bool(
self.autoinstall_config.get('interactive-sections'))

View File

@ -0,0 +1,94 @@
# Copyright 2022 Canonical, Ltd.
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as
# published by the Free Software Foundation, either version 3 of the
# License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
from unittest.mock import Mock
from subiquitycore.tests import SubiTestCase
from subiquity.server.server import (
SubiquityServer,
cloud_autoinstall_path,
iso_autoinstall_path,
reload_autoinstall_path,
)
class TestAutoinstallLoad(SubiTestCase):
def setUp(self):
self.tempdir = self.tmp_dir()
opts = Mock()
opts.dry_run = True
opts.output_base = self.tempdir
opts.machine_config = 'examples/simple.json'
opts.kernel_cmdline = ''
opts.autoinstall = None
self.server = SubiquityServer(opts, None)
self.server.base_model = Mock()
self.server.base_model.root = opts.output_base
def path(self, relative_path):
return self.tmp_path(relative_path, dir=self.tempdir)
def create(self, path):
path = self.path(path)
open(path, 'w').close()
return path
def test_autoinstall_disabled(self):
self.create(reload_autoinstall_path)
self.create(cloud_autoinstall_path)
self.create(iso_autoinstall_path)
self.server.opts.autoinstall = ""
self.assertIsNone(self.server.select_autoinstall_location())
def test_reload_wins(self):
expected = self.create(reload_autoinstall_path)
autoinstall = self.create(self.path('arg.autoinstall.yaml'))
self.server.opts.autoinstall = autoinstall
self.create(cloud_autoinstall_path)
self.create(iso_autoinstall_path)
self.assertEqual(expected, self.server.select_autoinstall_location())
def test_arg_wins(self):
expected = self.create(self.path('arg.autoinstall.yaml'))
self.server.opts.autoinstall = expected
self.create(cloud_autoinstall_path)
self.create(iso_autoinstall_path)
self.assertEqual(expected, self.server.select_autoinstall_location())
def test_cloud_wins(self):
expected = self.create(cloud_autoinstall_path)
self.create(iso_autoinstall_path)
self.assertEqual(expected, self.server.select_autoinstall_location())
def test_iso_wins(self):
expected = self.create(iso_autoinstall_path)
self.assertEqual(expected, self.server.select_autoinstall_location())
def test_nobody_wins(self):
self.assertIsNone(self.server.select_autoinstall_location())
def test_copied_to_reload(self):
self.server.autoinstall = self.tmp_path('test.yaml', dir=self.tempdir)
expected = 'stuff things'
with open(self.server.autoinstall, 'w') as fp:
fp.write(expected)
self.server.save_autoinstall_for_reload()
with open(self.path(reload_autoinstall_path), 'r') as fp:
self.assertEqual(expected, fp.read())
def test_bogus_autoinstall_argument(self):
self.server.opts.autoinstall = self.path('nonexistant.yaml')
with self.assertRaises(Exception):
self.server.select_autoinstall_location()