Merge pull request #1947 from Chris-Peterson444/strict-top-level-keys

Enforce top-level keys
This commit is contained in:
Chris Peterson 2024-03-25 19:59:13 -07:00 committed by GitHub
commit ce0683adec
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
3 changed files with 235 additions and 4 deletions

View File

@ -3,7 +3,11 @@
Autoinstall configuration reference manual
==========================================
The autoinstall file uses the YAML format. At the top level, it must be a mapping containing the keys described in this document. Unrecognised keys are ignored.
The autoinstall file uses the YAML format. At the top level, it must be a
mapping containing the keys described in this document. Unrecognised keys
are ignored in version 1, but will cause a fatal validation error in future
versions.
.. _ai-schema:
@ -25,6 +29,12 @@ Several configuration keys are lists of commands to be executed. Each command ca
Top-level keys
--------------
.. warning::
In version 1, Subiquity will emit warnings when encountering unrecognised
keys. In later versions, a fatal validation error is thrown and the
installation will halt.
.. _ai-version:
version

View File

@ -14,11 +14,12 @@
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import asyncio
import copy
import logging
import os
import sys
import time
from typing import List, Optional
from typing import Any, List, Optional
import jsonschema
import yaml
@ -553,8 +554,97 @@ class SubiquityServer(Application):
await controller.apply_autoinstall_config()
await controller.configured()
def filter_autoinstall(
self,
autoinstall_config: dict[str, Any],
) -> tuple[dict[str, Any], dict[str, Any]]:
"""Separates autoinstall config by known and unknown keys"""
invalid_config: dict[str, Any] = copy.deepcopy(autoinstall_config)
# Pop keys of all loaded controllers, if they exists
for controller in self.controllers.instances:
invalid_config.pop(controller.autoinstall_key, None)
# Pop server keys, if they exist
for key in self.base_schema["properties"]:
invalid_config.pop(key, None)
valid_config: dict[str, Any] = copy.deepcopy(autoinstall_config)
# Remove the keys we isolated
for key in invalid_config:
valid_config.pop(key)
return (valid_config, invalid_config)
@with_context(name="top_level_keys")
def _enforce_top_level_keys(
self,
*,
autoinstall_config: dict[str, Any],
context: Context,
) -> dict[str, Any]:
"""Enforces usage of known top-level keys.
In Autoinstall v1, unknown top-level keys are removed from
the config and a cleaned config is returned.
In Autoinstall v2, unknown top-level keys result in a fatal
AutoinstallValidationError
Only checks for unrecognized keys, doesn't validate them.
Requires a version number, so this should be called after
validating against the schema.
"""
valid_config, invalid_config = self.filter_autoinstall(autoinstall_config)
# If no bad keys, return early
if len(invalid_config.keys()) == 0:
return autoinstall_config
# If the version is early enough, only warn
version: int = autoinstall_config["version"]
ctx = context
if version == 1:
# Warn then clean out bad keys and return
for key in invalid_config:
warning = f"Unrecognized top-level key {key!r}"
log.warning(warning)
ctx.warning(warning)
warning = (
"Unrecognized top-level keys will cause Autoinstall to "
"throw an error in future versions."
)
log.warning(warning)
ctx.warning(warning)
return valid_config
else:
for key in invalid_config:
error = f"Unrecognized top-level key {key!r}"
log.error(error)
ctx.error(error)
error = "Unrecognized top-level keys are unsupported"
log.error(error)
ctx.error(error)
raw_keys = (f"{key!r}" for key in invalid_config)
details: str = f"Unrecognized top-level key(s): {', '.join(raw_keys)}"
raise AutoinstallValidationError(
owner="top-level keys",
details=details,
)
def validate_autoinstall(self):
with self.context.child("core_validation", level="INFO"):
with self.context.child("core_validation", level="INFO") as ctx:
try:
jsonschema.validate(self.autoinstall_config, self.base_schema)
except ValidationError as original_exception:
@ -567,6 +657,12 @@ class SubiquityServer(Application):
raise new_exception from original_exception
# Enforce top level keys after ensuring we have a version number
self.autoinstall_config = self._enforce_top_level_keys(
autoinstall_config=self.autoinstall_config,
context=ctx,
)
def load_autoinstall_config(self, *, only_early):
log.debug(
"load_autoinstall_config only_early %s file %s",

View File

@ -15,6 +15,7 @@
import os
import shlex
from typing import Any
from unittest.mock import AsyncMock, Mock, patch
import jsonschema
@ -125,7 +126,9 @@ class TestAutoinstallLoad(SubiTestCase):
with self.assertRaises(Exception):
self.server.select_autoinstall()
def test_early_commands_changes_autoinstall(self):
# Only care about changes to autoinstall, not validity
@patch("subiquity.server.server.SubiquityServer.validate_autoinstall")
def test_early_commands_changes_autoinstall(self, validate_mock):
self.server.controllers = Mock()
self.server.controllers.instances = []
rootpath = self.path(root_autoinstall_path)
@ -166,6 +169,33 @@ class TestAutoinstallValidation(SubiTestCase):
}
self.server.make_apport_report = Mock()
# Pseudo Load Controllers to avoid patching the loading logic for each
# controller when we still want access to class attributes
def pseudo_load_controllers(self):
controller_classes = []
for prefix in self.server.controllers.controller_names:
controller_classes.append(
self.server.controllers._get_controller_class(prefix)
)
self.server.controllers.instances = controller_classes
def load_config_and_controllers(
self, config: dict[str, Any]
) -> tuple[dict[str, Any], dict[str, Any]]:
"""Loads an autoinstall config and controllers.
Loads the provided autoinstall config and the controllers.
Returns the valid and invalid portions of the config.
"""
# Reset base schema
self.server.base_schema = SubiquityServer.base_schema
self.server.autoinstall_config = config
self.pseudo_load_controllers()
return self.server.filter_autoinstall(config)
def test_valid_schema(self):
"""Test that the expected autoinstall JSON schema is valid"""
@ -232,6 +262,101 @@ class TestAutoinstallValidation(SubiTestCase):
error = NonReportableError.from_exception(exception)
self.assertEqual(error, self.server.nonreportable_error)
@patch("subiquity.server.server.log")
async def test_autoinstall_validation__strict_top_level_keys_warn(self, log_mock):
"""Test strict top-level key enforcement warnings in v1"""
bad_ai_data = {
"version": 1,
"interactive-sections": ["identity"],
"apt": "Invalid but deferred",
"literally-anything": "lmao",
}
good, bad = self.load_config_and_controllers(bad_ai_data)
# OK in Version 1 but ensure warnings and stripped config
self.server.validate_autoinstall()
log_mock.warning.assert_called()
log_mock.error.assert_not_called()
self.assertEqual(self.server.autoinstall_config, good)
self.assertEqual(bad, {"literally-anything": "lmao"})
@patch("subiquity.server.server.log")
async def test_autoinstall_validation__strict_top_level_keys_error(self, log_mock):
"""Test strict top-level key enforcement errors in v2 or greater"""
bad_ai_data = {
"version": 2,
"interactive-sections": ["identity"],
"apt": "Invalid but deferred",
"literally-anything": "lmao",
}
self.load_config_and_controllers(bad_ai_data)
# TODO: remove once V2 is enabled
self.server.base_schema["properties"]["version"]["maximum"] = 2
# Not OK in Version >= 2
with self.assertRaises(AutoinstallValidationError) as ctx:
self.server.validate_autoinstall()
self.assertIn("top-level keys", str(ctx.exception))
log_mock.error.assert_called()
log_mock.warning.assert_not_called()
@parameterized.expand(
(
(
# Case 1: extra key "some-bad-key"
{
"version": 1,
"interactive-sections": ["identity"],
"apt": "...",
"some-bad-key": "...",
},
{
"version": 1,
"interactive-sections": ["identity"],
"apt": "...",
},
{"some-bad-key": "..."},
),
(
# Case 2: no bad keys
{
"version": 1,
"interactive-sections": ["identity"],
"apt": "...",
},
{
"version": 1,
"interactive-sections": ["identity"],
"apt": "...",
},
{},
),
(
# Case 3: no good keys
{"some-bad-key": "..."},
{},
{"some-bad-key": "..."},
),
)
)
async def test_autoinstall_validation__filter_autoinstall(self, config, good, bad):
"""Test autoinstall config filtering"""
self.server.base_schema = SubiquityServer.base_schema
self.pseudo_load_controllers()
valid, invalid = self.server.filter_autoinstall(config)
self.assertEqual(valid, good)
self.assertEqual(invalid, bad)
class TestMetaController(SubiTestCase):
async def test_interactive_sections_not_present(self):