diff --git a/subiquity/server/server.py b/subiquity/server/server.py
index 0b7dc679..fb60cfe0 100644
--- a/subiquity/server/server.py
+++ b/subiquity/server/server.py
@@ -17,7 +17,6 @@ import asyncio
import logging
import os
import shlex
-import shutil
import sys
import time
from typing import List, Optional
@@ -37,7 +36,10 @@ import yaml
from subiquitycore.async_helpers import run_in_thread
from subiquitycore.context import with_context
from subiquitycore.core import Application
-from subiquitycore.file_util import write_file
+from subiquitycore.file_util import (
+ copy_file_if_exists,
+ write_file,
+ )
from subiquitycore.prober import Prober
from subiquitycore.ssh import (
host_key_fingerprints,
@@ -539,7 +541,7 @@ class SubiquityServer(Application):
"cloud-init status: %r, assumed disabled",
status_txt)
- def select_autoinstall_location(self):
+ def select_autoinstall(self):
# precedence
# 1. data from before reload
# 2. command line argument autoinstall
@@ -562,20 +564,13 @@ class SubiquityServer(Application):
for loc in locations:
if loc is not None and os.path.exists(loc):
- return loc
- return None
+ break
+ else:
+ return None
- def save_autoinstall_for_reload(self):
- target = self.base_relative(reload_autoinstall_path)
- if self.autoinstall is None:
- return
- if not os.path.exists(self.autoinstall):
- return
- if os.path.exists(target):
- return
- dirname = os.path.dirname(target)
- os.makedirs(dirname, exist_ok=True)
- shutil.copyfile(self.autoinstall, target)
+ isopath = self.base_relative(iso_autoinstall_path)
+ copy_file_if_exists(loc, isopath)
+ return isopath
def _user_has_password(self, username):
with open('/etc/shadow') as fp:
@@ -640,8 +635,8 @@ class SubiquityServer(Application):
await self.start_api_server()
self.update_state(ApplicationState.CLOUD_INIT_WAIT)
await self.wait_for_cloudinit()
- self.autoinstall = self.select_autoinstall_location()
self.set_installer_password()
+ self.autoinstall = self.select_autoinstall()
self.load_autoinstall_config(only_early=True)
if self.autoinstall_config and self.controllers.Early.cmds:
stamp_file = self.state_path("early-commands")
@@ -654,7 +649,10 @@ class SubiquityServer(Application):
open(stamp_file, 'w').close()
await asyncio.sleep(1)
self.load_autoinstall_config(only_early=False)
- self.save_autoinstall_for_reload()
+ if self.autoinstall is not None:
+ copy_file_if_exists(
+ self.autoinstall,
+ self.base_relative(reload_autoinstall_path))
if self.autoinstall_config:
self.interactive = bool(
self.autoinstall_config.get('interactive-sections'))
diff --git a/subiquity/server/tests/test_server.py b/subiquity/server/tests/test_server.py
index 033f93c7..e4f47180 100644
--- a/subiquity/server/tests/test_server.py
+++ b/subiquity/server/tests/test_server.py
@@ -13,8 +13,10 @@
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see .
+import shlex
from unittest.mock import Mock
+from subiquitycore.utils import run_command
from subiquitycore.tests import SubiTestCase
from subiquity.server.server import (
SubiquityServer,
@@ -40,55 +42,76 @@ class TestAutoinstallLoad(SubiTestCase):
def path(self, relative_path):
return self.tmp_path(relative_path, dir=self.tempdir)
- def create(self, path):
+ def create(self, path, contents):
path = self.path(path)
- open(path, 'w').close()
+ with open(path, 'w') as fp:
+ fp.write(contents)
return path
def test_autoinstall_disabled(self):
- self.create(reload_autoinstall_path)
- self.create(cloud_autoinstall_path)
- self.create(iso_autoinstall_path)
+ self.create(reload_autoinstall_path, 'reload')
+ self.create(cloud_autoinstall_path, 'cloud')
+ self.create(iso_autoinstall_path, 'iso')
self.server.opts.autoinstall = ""
- self.assertIsNone(self.server.select_autoinstall_location())
+ self.assertIsNone(self.server.select_autoinstall())
def test_reload_wins(self):
- expected = self.create(reload_autoinstall_path)
- autoinstall = self.create(self.path('arg.autoinstall.yaml'))
+ self.create(reload_autoinstall_path, 'reload')
+ autoinstall = self.create(self.path('arg.autoinstall.yaml'), 'arg')
self.server.opts.autoinstall = autoinstall
- self.create(cloud_autoinstall_path)
- self.create(iso_autoinstall_path)
- self.assertEqual(expected, self.server.select_autoinstall_location())
+ self.create(cloud_autoinstall_path, 'cloud')
+ iso = self.create(iso_autoinstall_path, 'iso')
+ self.assertEqual(iso, self.server.select_autoinstall())
+ self.assert_contents(iso, 'reload')
def test_arg_wins(self):
- expected = self.create(self.path('arg.autoinstall.yaml'))
- self.server.opts.autoinstall = expected
- self.create(cloud_autoinstall_path)
- self.create(iso_autoinstall_path)
- self.assertEqual(expected, self.server.select_autoinstall_location())
+ arg = self.create(self.path('arg.autoinstall.yaml'), 'arg')
+ self.server.opts.autoinstall = arg
+ self.create(cloud_autoinstall_path, 'cloud')
+ iso = self.create(iso_autoinstall_path, 'iso')
+ self.assertEqual(iso, self.server.select_autoinstall())
+ self.assert_contents(iso, 'arg')
def test_cloud_wins(self):
- expected = self.create(cloud_autoinstall_path)
- self.create(iso_autoinstall_path)
- self.assertEqual(expected, self.server.select_autoinstall_location())
+ self.create(cloud_autoinstall_path, 'cloud')
+ iso = self.create(iso_autoinstall_path, 'iso')
+ self.assertEqual(iso, self.server.select_autoinstall())
+ self.assert_contents(iso, 'cloud')
def test_iso_wins(self):
- expected = self.create(iso_autoinstall_path)
- self.assertEqual(expected, self.server.select_autoinstall_location())
+ iso = self.create(iso_autoinstall_path, 'iso')
+ self.assertEqual(iso, self.server.select_autoinstall())
+ self.assert_contents(iso, 'iso')
def test_nobody_wins(self):
- self.assertIsNone(self.server.select_autoinstall_location())
-
- def test_copied_to_reload(self):
- self.server.autoinstall = self.tmp_path('test.yaml', dir=self.tempdir)
- expected = 'stuff things'
- with open(self.server.autoinstall, 'w') as fp:
- fp.write(expected)
- self.server.save_autoinstall_for_reload()
- with open(self.path(reload_autoinstall_path), 'r') as fp:
- self.assertEqual(expected, fp.read())
+ self.assertIsNone(self.server.select_autoinstall())
def test_bogus_autoinstall_argument(self):
self.server.opts.autoinstall = self.path('nonexistant.yaml')
with self.assertRaises(Exception):
- self.server.select_autoinstall_location()
+ self.server.select_autoinstall()
+
+ def test_early_commands_changes_autoinstall(self):
+ self.server.controllers = Mock()
+ self.server.controllers.instances = []
+ isopath = self.create(iso_autoinstall_path, '')
+
+ cmd = f"sed -i -e '$ a stuff: things' {isopath}"
+ contents = f'''\
+version: 1
+early-commands: ["{cmd}"]
+'''
+ self.create(cloud_autoinstall_path, contents)
+
+ self.server.autoinstall = self.server.select_autoinstall()
+ self.server.load_autoinstall_config(only_early=True)
+ before_early = {'version': 1,
+ 'early-commands': [cmd]}
+ self.assertEqual(before_early, self.server.autoinstall_config)
+ run_command(shlex.split(cmd), check=True)
+
+ self.server.load_autoinstall_config(only_early=False)
+ after_early = {'version': 1,
+ 'early-commands': [cmd],
+ 'stuff': 'things'}
+ self.assertEqual(after_early, self.server.autoinstall_config)
diff --git a/subiquitycore/file_util.py b/subiquitycore/file_util.py
index 0af78d2b..0c5fe9e4 100644
--- a/subiquitycore/file_util.py
+++ b/subiquitycore/file_util.py
@@ -18,6 +18,7 @@ import datetime
import grp
import logging
import os
+import shutil
import tempfile
import yaml
@@ -73,3 +74,16 @@ def generate_config_yaml(filename, content, **kwargs):
now = datetime.datetime.utcnow()
tf.write(f'# Autogenerated by Subiquity: {now} UTC\n')
tf.write(yaml.dump(content))
+
+
+def copy_file_if_exists(source: str, target: str):
+ """If source exists, copy to destination. Ignore error that dest may be a
+ duplicate. Create destination parent dirs as needed."""
+ if not os.path.exists(source):
+ return
+ dirname = os.path.dirname(target)
+ os.makedirs(dirname, exist_ok=True)
+ try:
+ shutil.copyfile(source, target)
+ except shutil.SameFileError:
+ pass
diff --git a/subiquitycore/tests/__init__.py b/subiquitycore/tests/__init__.py
index da500c3f..0d117de5 100644
--- a/subiquitycore/tests/__init__.py
+++ b/subiquitycore/tests/__init__.py
@@ -25,6 +25,10 @@ class SubiTestCase(TestCase):
dir = self.tmp_dir()
return os.path.normpath(os.path.abspath(os.path.join(dir, path)))
+ def assert_contents(self, path, expected_contents):
+ with open(path, 'r') as fp:
+ self.assertEqual(expected_contents, fp.read())
+
def populate_dir(path, files):
if not os.path.exists(path):
diff --git a/subiquitycore/tests/test_file_util.py b/subiquitycore/tests/test_file_util.py
new file mode 100644
index 00000000..ae0a5b43
--- /dev/null
+++ b/subiquitycore/tests/test_file_util.py
@@ -0,0 +1,31 @@
+# Copyright 2022 Canonical, Ltd.
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU Affero General Public License as
+# published by the Free Software Foundation, either version 3 of the
+# License, or (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU Affero General Public License for more details.
+#
+# You should have received a copy of the GNU Affero General Public License
+# along with this program. If not, see .
+
+from subiquitycore.file_util import copy_file_if_exists
+from subiquitycore.tests import SubiTestCase
+
+
+class TestCopy(SubiTestCase):
+ def test_copied_to_non_exist_dir(self):
+ data = 'stuff things'
+ src = self.tmp_path('src')
+ tgt = self.tmp_path('create-me/target')
+ with open(src, 'w') as fp:
+ fp.write(data)
+ copy_file_if_exists(src, tgt)
+ self.assert_contents(tgt, data)
+
+ def test_copied_non_exist_src(self):
+ copy_file_if_exists('/does/not/exist', '/ditto')