Merge pull request #1359 from blackboxsw/cloud-init/validate-user-data-schema

subiquitymodel: validate merged cloud-config userdata_raw before reboot
This commit is contained in:
Dan Bungert 2022-09-01 13:48:14 -06:00 committed by GitHub
commit 24ec78c670
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 211 additions and 3 deletions

View File

@ -18,5 +18,5 @@ source:
updates: security updates: security
user-data: user-data:
users: users:
- username: ubuntu - name: ubuntu
password: '$6$wdAcoXrU039hKYPd$508Qvbe7ObUnxoj15DRCkzC3qO7edjH0VV7BPNRDYK4QR8ofJaEEF2heacn0QgD.f8pO8SNp83XNdWG6tocBM1' passwd: '$6$wdAcoXrU039hKYPd$508Qvbe7ObUnxoj15DRCkzC3qO7edjH0VV7BPNRDYK4QR8ofJaEEF2heacn0QgD.f8pO8SNp83XNdWG6tocBM1'

View File

@ -23,6 +23,18 @@ from typing import Set
import uuid import uuid
import yaml import yaml
from cloudinit.config.schema import (
SchemaValidationError,
get_schema,
validate_cloudconfig_schema
)
try:
from cloudinit.config.schema import SchemaProblem
except ImportError:
def SchemaProblem(x, y): return (x, y) # TODO(drop on cloud-init 22.3 SRU)
from curtin.config import merge_config from curtin.config import merge_config
from subiquitycore.file_util import ( from subiquitycore.file_util import (
@ -274,6 +286,46 @@ class SubiquityModel:
def confirm(self): def confirm(self):
self._confirmation.set() self._confirmation.set()
def validate_cloudconfig_schema(self, data: dict, data_source: str):
"""Validate data config adheres to strict cloud-config schema
Log warnings on any deprecated cloud-config keys used.
:param data: dict of valid cloud-config
:param data_source: str to present in logs/errors describing
where this config came from: autoinstall.user-data or system info
:raise SchemaValidationError: on invalid cloud-config schema
"""
# cloud-init v. 22.3 will allow for log_deprecations=True to avoid
# raising errors on deprecated keys.
# In the meantime, iterate over schema_deprecations to log warnings.
try:
validate_cloudconfig_schema(data, schema=get_schema(), strict=True)
except SchemaValidationError as e:
if hasattr(e, "schema_deprecations"):
warnings = []
deprecations = getattr(e, "schema_deprecations")
if deprecations:
for schema_path, message in deprecations:
warnings.append(message)
if warnings:
log.warning(
"The cloud-init configuration for %s contains"
" deprecated values:\n%s", data_source, "\n".join(
warnings
)
)
if e.schema_errors:
if data_source == "autoinstall.user-data":
errors = [
SchemaProblem(f"{data_source}.{path}", message)
for (path, message) in e.schema_errors
]
else:
errors = e.schema_errors
raise SchemaValidationError(schema_errors=errors)
def _cloud_init_config(self): def _cloud_init_config(self):
config = { config = {
'growpart': { 'growpart': {
@ -311,6 +363,9 @@ class SubiquityModel:
config.setdefault("write_files", []).append( config.setdefault("write_files", []).append(
CLOUDINIT_DISABLE_AFTER_INSTALL CLOUDINIT_DISABLE_AFTER_INSTALL
) )
self.validate_cloudconfig_schema(
data=config, data_source="system install"
)
return config return config
async def target_packages(self): async def target_packages(self):

View File

@ -13,7 +13,6 @@
# You should have received a copy of the GNU Affero General Public License # You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>. # along with this program. If not, see <http://www.gnu.org/licenses/>.
import fnmatch import fnmatch
import json import json
import unittest import unittest
@ -21,6 +20,12 @@ from unittest import mock
import re import re
import yaml import yaml
from cloudinit.config.schema import SchemaValidationError
try:
from cloudinit.config.schema import SchemaProblem
except ImportError:
def SchemaProblem(x, y): return (x, y) # TODO(drop on cloud-init 22.3 SRU)
from subiquitycore.pubsub import MessageHub from subiquitycore.pubsub import MessageHub
from subiquity.common.types import IdentityData from subiquity.common.types import IdentityData
@ -236,6 +241,17 @@ class TestSubiquityModel(unittest.IsolatedAsyncioTestCase):
self.assertEqual(len(cloud_init_config['users']), 1) self.assertEqual(len(cloud_init_config['users']), 1)
self.assertEqual(cloud_init_config['users'][0]['name'], 'user2') self.assertEqual(cloud_init_config['users'][0]['name'], 'user2')
with self.subTest('Invalid user-data raises error'):
model = self.make_model()
model.userdata = {'bootcmd': "nope"}
with self.assertRaises(SchemaValidationError) as ctx:
model._cloud_init_config()
expected_error = (
"Cloud config schema errors: bootcmd: 'nope' is not of type"
" 'array'"
)
self.assertEqual(expected_error, str(ctx.exception))
@mock.patch('subiquity.models.subiquity.lsb_release') @mock.patch('subiquity.models.subiquity.lsb_release')
@mock.patch('subiquitycore.file_util.datetime.datetime') @mock.patch('subiquitycore.file_util.datetime.datetime')
def test_cloud_init_files_emits_datasource_config_and_clean_script( def test_cloud_init_files_emits_datasource_config_and_clean_script(
@ -297,3 +313,69 @@ class TestSubiquityModel(unittest.IsolatedAsyncioTestCase):
self.assertIsNotNone(expected_files[cpath].match(content)) self.assertIsNotNone(expected_files[cpath].match(content))
else: else:
self.assertEqual(expected_files[cpath], content) self.assertEqual(expected_files[cpath], content)
def test_validatecloudconfig_schema(self):
model = self.make_model()
with self.subTest('Valid cloud-config does not error'):
model.validate_cloudconfig_schema(
data={"ssh_import_id": ["chad.smith"]},
data_source="autoinstall.user-data"
)
# Create our own subclass for focal as schema_deprecations
# was not yet defined.
class SchemaDeprecation(SchemaValidationError):
schema_deprecations = ()
def __init__(self, schema_errors=(), schema_deprecations=()):
super().__init__(schema_errors)
self.schema_deprecations = schema_deprecations
problem = SchemaProblem(
"bogus",
"'bogus' is deprecated, use 'notbogus' instead"
)
with self.subTest('Deprecated cloud-config warns'):
with unittest.mock.patch(
"subiquity.models.subiquity.validate_cloudconfig_schema"
) as validate:
validate.side_effect = SchemaDeprecation(
schema_deprecations=(problem,)
)
with self.assertLogs(
"subiquity.models.subiquity", level="INFO"
) as logs:
model.validate_cloudconfig_schema(
data={"bogus": True},
data_source="autoinstall.user-data"
)
expected = (
"WARNING:subiquity.models.subiquity:The cloud-init"
" configuration for autoinstall.user-data contains deprecated"
" values:\n'bogus' is deprecated, use 'notbogus' instead"
)
self.assertEqual(logs.output, [expected])
with self.subTest('Invalid cloud-config schema errors'):
with self.assertRaises(SchemaValidationError) as ctx:
model.validate_cloudconfig_schema(
data={"bootcmd": "nope"}, data_source="system info"
)
expected_error = (
"Cloud config schema errors: bootcmd: 'nope' is not of"
" type 'array'"
)
self.assertEqual(expected_error, str(ctx.exception))
with self.subTest('Prefix autoinstall.user-data cloud-config errors'):
with self.assertRaises(SchemaValidationError) as ctx:
model.validate_cloudconfig_schema(
data={"bootcmd": "nope"},
data_source="autoinstall.user-data"
)
expected_error = (
"Cloud config schema errors:"
" autoinstall.user-data.bootcmd: 'nope' is not of"
" type 'array'"
)
self.assertEqual(expected_error, str(ctx.exception))

View File

@ -0,0 +1,63 @@
# Copyright 2022 Canonical, Ltd.
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as
# published by the Free Software Foundation, either version 3 of the
# License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import unittest
from subiquity.server.controllers.userdata import (
UserdataController,
)
from subiquitycore.tests.mocks import make_app
from cloudinit.config.schema import SchemaValidationError
try:
from cloudinit.config.schema import SchemaProblem
except ImportError:
def SchemaProblem(x, y): return (x, y) # TODO(drop on cloud-init 22.3 SRU)
class TestUserdataController(unittest.TestCase):
def setUp(self):
self.controller = UserdataController(make_app())
def test_load_autoinstall_data(self):
with self.subTest('Valid user-data resets userdata model'):
valid_schema = {"ssh_import_id": ["you"]}
self.controller.model = {"some": "old userdata"}
self.controller.load_autoinstall_data(valid_schema)
self.assertEqual(self.controller.model, valid_schema)
fake_error = SchemaValidationError(
schema_errors=(
SchemaProblem(
"ssh_import_id",
"'wrong' is not of type 'array'"
),
),
)
invalid_schema = {"ssh_import_id": "wrong"}
validate = self.controller.app.base_model.validate_cloudconfig_schema
validate.side_effect = fake_error
with self.subTest('Invalid user-data raises error'):
with self.assertRaises(SchemaValidationError) as ctx:
self.controller.load_autoinstall_data(invalid_schema)
expected_error = (
"Cloud config schema errors: ssh_import_id: 'wrong' is not of"
" type 'array'"
)
self.assertEqual(expected_error, str(ctx.exception))
validate.assert_called_with(
data=invalid_schema,
data_source="autoinstall.user-data"
)

View File

@ -13,8 +13,12 @@
# You should have received a copy of the GNU Affero General Public License # You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>. # along with this program. If not, see <http://www.gnu.org/licenses/>.
import logging
from subiquity.server.controller import NonInteractiveController from subiquity.server.controller import NonInteractiveController
log = logging.getLogger("subiquity.server.controllers.userdata")
class UserdataController(NonInteractiveController): class UserdataController(NonInteractiveController):
@ -27,6 +31,10 @@ class UserdataController(NonInteractiveController):
def load_autoinstall_data(self, data): def load_autoinstall_data(self, data):
self.model.clear() self.model.clear()
if data:
self.app.base_model.validate_cloudconfig_schema(
data=data, data_source="autoinstall.user-data",
)
self.model.update(data) self.model.update(data)
def make_autoinstall(self): def make_autoinstall(self):