Merge pull request #1255 from dbungert/autoinstall-vs-early-cmds

autoinstall: fix interaction with early-commands
This commit is contained in:
Dan Bungert 2022-04-11 12:57:23 -06:00 committed by GitHub
commit dac14e9724
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 120 additions and 50 deletions

View File

@ -17,7 +17,6 @@ import asyncio
import logging
import os
import shlex
import shutil
import sys
import time
from typing import List, Optional
@ -37,7 +36,10 @@ import yaml
from subiquitycore.async_helpers import run_in_thread
from subiquitycore.context import with_context
from subiquitycore.core import Application
from subiquitycore.file_util import write_file
from subiquitycore.file_util import (
copy_file_if_exists,
write_file,
)
from subiquitycore.prober import Prober
from subiquitycore.ssh import (
host_key_fingerprints,
@ -539,7 +541,7 @@ class SubiquityServer(Application):
"cloud-init status: %r, assumed disabled",
status_txt)
def select_autoinstall_location(self):
def select_autoinstall(self):
# precedence
# 1. data from before reload
# 2. command line argument autoinstall
@ -562,20 +564,13 @@ class SubiquityServer(Application):
for loc in locations:
if loc is not None and os.path.exists(loc):
return loc
return None
break
else:
return None
def save_autoinstall_for_reload(self):
target = self.base_relative(reload_autoinstall_path)
if self.autoinstall is None:
return
if not os.path.exists(self.autoinstall):
return
if os.path.exists(target):
return
dirname = os.path.dirname(target)
os.makedirs(dirname, exist_ok=True)
shutil.copyfile(self.autoinstall, target)
isopath = self.base_relative(iso_autoinstall_path)
copy_file_if_exists(loc, isopath)
return isopath
def _user_has_password(self, username):
with open('/etc/shadow') as fp:
@ -640,8 +635,8 @@ class SubiquityServer(Application):
await self.start_api_server()
self.update_state(ApplicationState.CLOUD_INIT_WAIT)
await self.wait_for_cloudinit()
self.autoinstall = self.select_autoinstall_location()
self.set_installer_password()
self.autoinstall = self.select_autoinstall()
self.load_autoinstall_config(only_early=True)
if self.autoinstall_config and self.controllers.Early.cmds:
stamp_file = self.state_path("early-commands")
@ -654,7 +649,10 @@ class SubiquityServer(Application):
open(stamp_file, 'w').close()
await asyncio.sleep(1)
self.load_autoinstall_config(only_early=False)
self.save_autoinstall_for_reload()
if self.autoinstall is not None:
copy_file_if_exists(
self.autoinstall,
self.base_relative(reload_autoinstall_path))
if self.autoinstall_config:
self.interactive = bool(
self.autoinstall_config.get('interactive-sections'))

View File

@ -13,8 +13,10 @@
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import shlex
from unittest.mock import Mock
from subiquitycore.utils import run_command
from subiquitycore.tests import SubiTestCase
from subiquity.server.server import (
SubiquityServer,
@ -40,55 +42,76 @@ class TestAutoinstallLoad(SubiTestCase):
def path(self, relative_path):
return self.tmp_path(relative_path, dir=self.tempdir)
def create(self, path):
def create(self, path, contents):
path = self.path(path)
open(path, 'w').close()
with open(path, 'w') as fp:
fp.write(contents)
return path
def test_autoinstall_disabled(self):
self.create(reload_autoinstall_path)
self.create(cloud_autoinstall_path)
self.create(iso_autoinstall_path)
self.create(reload_autoinstall_path, 'reload')
self.create(cloud_autoinstall_path, 'cloud')
self.create(iso_autoinstall_path, 'iso')
self.server.opts.autoinstall = ""
self.assertIsNone(self.server.select_autoinstall_location())
self.assertIsNone(self.server.select_autoinstall())
def test_reload_wins(self):
expected = self.create(reload_autoinstall_path)
autoinstall = self.create(self.path('arg.autoinstall.yaml'))
self.create(reload_autoinstall_path, 'reload')
autoinstall = self.create(self.path('arg.autoinstall.yaml'), 'arg')
self.server.opts.autoinstall = autoinstall
self.create(cloud_autoinstall_path)
self.create(iso_autoinstall_path)
self.assertEqual(expected, self.server.select_autoinstall_location())
self.create(cloud_autoinstall_path, 'cloud')
iso = self.create(iso_autoinstall_path, 'iso')
self.assertEqual(iso, self.server.select_autoinstall())
self.assert_contents(iso, 'reload')
def test_arg_wins(self):
expected = self.create(self.path('arg.autoinstall.yaml'))
self.server.opts.autoinstall = expected
self.create(cloud_autoinstall_path)
self.create(iso_autoinstall_path)
self.assertEqual(expected, self.server.select_autoinstall_location())
arg = self.create(self.path('arg.autoinstall.yaml'), 'arg')
self.server.opts.autoinstall = arg
self.create(cloud_autoinstall_path, 'cloud')
iso = self.create(iso_autoinstall_path, 'iso')
self.assertEqual(iso, self.server.select_autoinstall())
self.assert_contents(iso, 'arg')
def test_cloud_wins(self):
expected = self.create(cloud_autoinstall_path)
self.create(iso_autoinstall_path)
self.assertEqual(expected, self.server.select_autoinstall_location())
self.create(cloud_autoinstall_path, 'cloud')
iso = self.create(iso_autoinstall_path, 'iso')
self.assertEqual(iso, self.server.select_autoinstall())
self.assert_contents(iso, 'cloud')
def test_iso_wins(self):
expected = self.create(iso_autoinstall_path)
self.assertEqual(expected, self.server.select_autoinstall_location())
iso = self.create(iso_autoinstall_path, 'iso')
self.assertEqual(iso, self.server.select_autoinstall())
self.assert_contents(iso, 'iso')
def test_nobody_wins(self):
self.assertIsNone(self.server.select_autoinstall_location())
def test_copied_to_reload(self):
self.server.autoinstall = self.tmp_path('test.yaml', dir=self.tempdir)
expected = 'stuff things'
with open(self.server.autoinstall, 'w') as fp:
fp.write(expected)
self.server.save_autoinstall_for_reload()
with open(self.path(reload_autoinstall_path), 'r') as fp:
self.assertEqual(expected, fp.read())
self.assertIsNone(self.server.select_autoinstall())
def test_bogus_autoinstall_argument(self):
self.server.opts.autoinstall = self.path('nonexistant.yaml')
with self.assertRaises(Exception):
self.server.select_autoinstall_location()
self.server.select_autoinstall()
def test_early_commands_changes_autoinstall(self):
self.server.controllers = Mock()
self.server.controllers.instances = []
isopath = self.create(iso_autoinstall_path, '')
cmd = f"sed -i -e '$ a stuff: things' {isopath}"
contents = f'''\
version: 1
early-commands: ["{cmd}"]
'''
self.create(cloud_autoinstall_path, contents)
self.server.autoinstall = self.server.select_autoinstall()
self.server.load_autoinstall_config(only_early=True)
before_early = {'version': 1,
'early-commands': [cmd]}
self.assertEqual(before_early, self.server.autoinstall_config)
run_command(shlex.split(cmd), check=True)
self.server.load_autoinstall_config(only_early=False)
after_early = {'version': 1,
'early-commands': [cmd],
'stuff': 'things'}
self.assertEqual(after_early, self.server.autoinstall_config)

View File

@ -18,6 +18,7 @@ import datetime
import grp
import logging
import os
import shutil
import tempfile
import yaml
@ -73,3 +74,16 @@ def generate_config_yaml(filename, content, **kwargs):
now = datetime.datetime.utcnow()
tf.write(f'# Autogenerated by Subiquity: {now} UTC\n')
tf.write(yaml.dump(content))
def copy_file_if_exists(source: str, target: str):
"""If source exists, copy to destination. Ignore error that dest may be a
duplicate. Create destination parent dirs as needed."""
if not os.path.exists(source):
return
dirname = os.path.dirname(target)
os.makedirs(dirname, exist_ok=True)
try:
shutil.copyfile(source, target)
except shutil.SameFileError:
pass

View File

@ -25,6 +25,10 @@ class SubiTestCase(TestCase):
dir = self.tmp_dir()
return os.path.normpath(os.path.abspath(os.path.join(dir, path)))
def assert_contents(self, path, expected_contents):
with open(path, 'r') as fp:
self.assertEqual(expected_contents, fp.read())
def populate_dir(path, files):
if not os.path.exists(path):

View File

@ -0,0 +1,31 @@
# Copyright 2022 Canonical, Ltd.
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as
# published by the Free Software Foundation, either version 3 of the
# License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
from subiquitycore.file_util import copy_file_if_exists
from subiquitycore.tests import SubiTestCase
class TestCopy(SubiTestCase):
def test_copied_to_non_exist_dir(self):
data = 'stuff things'
src = self.tmp_path('src')
tgt = self.tmp_path('create-me/target')
with open(src, 'w') as fp:
fp.write(data)
copy_file_if_exists(src, tgt)
self.assert_contents(tgt, data)
def test_copied_non_exist_src(self):
copy_file_if_exists('/does/not/exist', '/ditto')