refactor to avoid checking the labels in the live layer twice

This commit is contained in:
Michael Hudson-Doyle 2024-05-07 09:01:21 +02:00
parent c7fd905c6b
commit 86282f5721
1 changed files with 40 additions and 49 deletions

View File

@ -22,7 +22,7 @@ import os
import pathlib import pathlib
import subprocess import subprocess
import time import time
from typing import Any, Callable, Dict, List, Optional, Union from typing import Any, Callable, Dict, List, Optional, Tuple, Union
import attr import attr
import pyudev import pyudev
@ -184,6 +184,7 @@ class VariationInfo:
capability_info: CapabilityInfo = attr.Factory(CapabilityInfo) capability_info: CapabilityInfo = attr.Factory(CapabilityInfo)
min_size: Optional[int] = None min_size: Optional[int] = None
system: Optional[SystemDetails] = None system: Optional[SystemDetails] = None
needs_systems_mount: bool = False
def is_core_boot_classic(self) -> bool: def is_core_boot_classic(self) -> bool:
return self.label is not None return self.label is not None
@ -336,12 +337,14 @@ class FilesystemController(SubiquityController, FilesystemManipulator):
self._source_handler.cleanup() self._source_handler.cleanup()
self._source_handler = None self._source_handler = None
async def _get_system(self, variation_name, label): async def _get_system(
self, variation_name, label
) -> Tuple[Optional[SystemDetails], bool]:
systems = await self.app.snapdapi.v2.systems.GET() systems = await self.app.snapdapi.v2.systems.GET()
labels = {system.label for system in systems.systems} labels = {system.label for system in systems.systems}
if label in labels: if label in labels:
try: try:
system = await self.app.snapdapi.v2.systems[label].GET() return await self.app.snapdapi.v2.systems[label].GET(), True
except requests.exceptions.HTTPError as http_err: except requests.exceptions.HTTPError as http_err:
log.warning("v2/systems/%s returned %s", label, http_err.response.text) log.warning("v2/systems/%s returned %s", label, http_err.response.text)
raise raise
@ -349,16 +352,14 @@ class FilesystemController(SubiquityController, FilesystemManipulator):
try: try:
await self._mount_systems_dir(variation_name) await self._mount_systems_dir(variation_name)
except NoSnapdSystemsOnSource: except NoSnapdSystemsOnSource:
return None return None, False
try: try:
system = await self.app.snapdapi.v2.systems[label].GET() return await self.app.snapdapi.v2.systems[label].GET(), False
except requests.exceptions.HTTPError as http_err: except requests.exceptions.HTTPError as http_err:
log.warning("v2/systems/%s returned %s", label, http_err.response.text) log.warning("v2/systems/%s returned %s", label, http_err.response.text)
raise raise
finally: finally:
await self._unmount_systems_dir() await self._unmount_systems_dir()
log.debug("got system %s", system)
return system
def info_for_system(self, name: str, label: str, system: SystemDetails): def info_for_system(self, name: str, label: str, system: SystemDetails):
if len(system.volumes) > 1: if len(system.volumes) > 1:
@ -419,6 +420,29 @@ class FilesystemController(SubiquityController, FilesystemManipulator):
return info return info
def _maybe_disable_encryption(self, info: VariationInfo) -> None:
if self.model.bootloader != Bootloader.UEFI:
log.debug("Disabling core boot based install options on non-UEFI " "system")
info.capability_info.disallow_if(
lambda cap: cap.is_core_boot(),
GuidedDisallowedCapabilityReason.NOT_UEFI,
_("Enhanced secure boot options only available on UEFI " "systems."),
)
search_drivers = self.app.base_model.source.search_drivers
if search_drivers is not SEARCH_DRIVERS_AUTOINSTALL_DEFAULT and search_drivers:
log.debug(
"Disabling core boot based install options as third-party "
"drivers selected"
)
info.capability_info.disallow_if(
lambda cap: cap.is_core_boot(),
GuidedDisallowedCapabilityReason.THIRD_PARTY_DRIVERS,
_(
"Enhanced secure boot options cannot currently install "
"third party drivers."
),
)
async def _examine_systems(self): async def _examine_systems(self):
self._variation_info.clear() self._variation_info.clear()
catalog_entry = self.app.base_model.source.current catalog_entry = self.app.base_model.source.current
@ -426,7 +450,7 @@ class FilesystemController(SubiquityController, FilesystemManipulator):
system = None system = None
label = variation.snapd_system_label label = variation.snapd_system_label
if label is not None: if label is not None:
system = await self._get_system(name, label) system, in_live_layer = await self._get_system(name, label)
log.debug("got system %s for variation %s", system, name) log.debug("got system %s for variation %s", system, name)
if system is not None and len(system.volumes) > 0: if system is not None and len(system.volumes) > 0:
if not self.app.opts.enhanced_secureboot: if not self.app.opts.enhanced_secureboot:
@ -435,46 +459,15 @@ class FilesystemController(SubiquityController, FilesystemManipulator):
info = self.info_for_system(name, label, system) info = self.info_for_system(name, label, system)
if info is None: if info is None:
continue continue
if self.model.bootloader != Bootloader.UEFI: if not in_live_layer:
log.debug( info.needs_systems_mount = True
"Disabling core boot based install options on non-UEFI " self._maybe_disable_encryption(info)
"system"
)
info.capability_info.disallow_if(
lambda cap: cap.is_core_boot(),
GuidedDisallowedCapabilityReason.NOT_UEFI,
_(
"Enhanced secure boot options only available on UEFI "
"systems."
),
)
search_drivers = self.app.base_model.source.search_drivers
if (
search_drivers is not SEARCH_DRIVERS_AUTOINSTALL_DEFAULT
and search_drivers
):
log.debug(
"Disabling core boot based install options as third-party "
"drivers selected"
)
info.capability_info.disallow_if(
lambda cap: cap.is_core_boot(),
GuidedDisallowedCapabilityReason.THIRD_PARTY_DRIVERS,
_(
"Enhanced secure boot options cannot currently install "
"third party drivers."
),
)
self._variation_info[name] = info
elif catalog_entry.type.startswith("dd-"): elif catalog_entry.type.startswith("dd-"):
min_size = variation.size min_size = variation.size
self._variation_info[name] = VariationInfo.dd( info = VariationInfo.dd(name=name, min_size=min_size)
name=name, min_size=min_size
)
else: else:
self._variation_info[name] = VariationInfo.classic( info = VariationInfo.classic(name=name, min_size=variation.size)
name=name, min_size=variation.size self._variation_info[name] = info
)
@with_context() @with_context()
async def apply_autoinstall_config(self, context=None): async def apply_autoinstall_config(self, context=None):
@ -865,12 +858,10 @@ class FilesystemController(SubiquityController, FilesystemManipulator):
offset = offset + structure.size offset = offset + structure.size
async def guided_core_boot(self, disk: Disk): async def guided_core_boot(self, disk: Disk):
if self._info.needs_systems_mount:
await self._mount_systems_dir(self._info.name)
# Formatting for a core boot classic system relies on some curtin # Formatting for a core boot classic system relies on some curtin
# features that are only available with v2 partitioning. # features that are only available with v2 partitioning.
systems = await self.app.snapdapi.v2.systems.GET()
labels = {system.label for system in systems.systems}
if self._info.label not in labels:
await self._mount_systems_dir(self._info.name)
self.model.storage_version = 2 self.model.storage_version = 2
[volume] = self._info.system.volumes.values() [volume] = self._info.system.volumes.values()
self._on_volume = snapdapi.OnVolume.from_volume(volume) self._on_volume = snapdapi.OnVolume.from_volume(volume)