filesystem: add API support for LUKS recovery key

Signed-off-by: Olivier Gayot <olivier.gayot@canonical.com>
This commit is contained in:
Olivier Gayot 2023-08-17 15:57:32 +02:00
parent c210b93458
commit d8ebc56b69
7 changed files with 240 additions and 8 deletions

View File

@ -139,10 +139,15 @@ class FilesystemManipulator:
def create_volgroup(self, spec):
devices = set()
key = spec.get("passphrase")
for device in spec["devices"]:
self.clear(device)
if key:
device = self.model.add_dm_crypt(device, key)
device = self.model.add_dm_crypt(
device,
key,
recovery_key=spec.get("recovery-key"),
)
devices.add(device)
return self.model.add_volgroup(name=spec["name"], devices=devices)
@ -334,7 +339,11 @@ class FilesystemManipulator:
for d in spec["devices"]:
self.clear(d)
if key:
d = self.model.add_dm_crypt(d, key)
d = self.model.add_dm_crypt(
d,
key,
recovery_key=spec.get("recovery-key"),
)
d._constructed_device = existing
devices.add(d)
existing.name = spec["name"]

View File

@ -88,7 +88,7 @@ class TestAnnotations(unittest.TestCase):
def test_vg_encrypted_annotations(self):
model, disk = make_model_and_disk()
dm_crypt = model.add_dm_crypt(disk, key="passw0rd")
dm_crypt = model.add_dm_crypt(disk, key="passw0rd", recovery_key=None)
vg = model.add_volgroup("vg-0", {dm_crypt})
self.assertEqual(annotations(vg), ["new", "encrypted"])

View File

@ -493,11 +493,40 @@ GuidedStorageTarget = Union[
]
@attr.s(auto_attribs=True)
class RecoveryKey:
# Where to store the key in the live system.
live_location: Optional[str] = None
# Where to copy the key in the target system. /target will automatically be
# prefixed.
backup_location: Optional[str] = None
@classmethod
def from_autoinstall(
cls, config: Union[bool, Dict[str, Any]]
) -> Optional["RecoveryKey"]:
if config is False:
return None
# Recovery key with default values
if config is True:
return cls()
return cls(
backup_location=config.get("backup-location"),
live_location=config.get("live-location"),
)
@attr.s(auto_attribs=True)
class GuidedChoiceV2:
target: GuidedStorageTarget
capability: GuidedCapability
# Those two fields are only used when using LVM+LUKS
password: Optional[str] = attr.ib(default=None, repr=False)
recovery_key: Optional[RecoveryKey] = None
sizing_policy: Optional[SizingPolicy] = SizingPolicy.SCALED
reset_partition: bool = False

View File

@ -35,7 +35,7 @@ from curtin.swap import can_use_swapfile
from curtin.util import human2bytes
from probert.storage import StorageInfo
from subiquity.common.types import Bootloader, OsProber
from subiquity.common.types import Bootloader, OsProber, RecoveryKey
log = logging.getLogger("subiquity.models.filesystem")
@ -58,6 +58,25 @@ class RecoveryKeyHandler:
_key: Optional[str] = attr.ib(repr=False, default=None)
@classmethod
def from_post_data(
cls, data: Optional[RecoveryKey], default_suffix="recovery-key.txt"
) -> Optional["RecoveryKeyHandler"]:
"""Create RecoveryKeyHandler instance from POST-ed RecoveryKey data."""
if data is None:
return None
# Set default values for unspecified settings.
live_location = pathlib.Path("~").expanduser() / default_suffix
backup_location = pathlib.Path("/var/log/installer") / default_suffix
if data.live_location is not None:
live_location = pathlib.Path(data.live_location)
if data.backup_location is not None:
backup_location = pathlib.Path(data.backup_location)
return cls(live_location=live_location, backup_location=backup_location)
def load_key_from_file(self, location: pathlib.Path) -> None:
"""Load the key from the file specified"""
with location.open(mode="r", encoding="utf-8") as fh:
@ -1053,8 +1072,25 @@ class DM_Crypt:
volume: _Formattable = attributes.ref(backlink="_constructed_device")
key: Optional[str] = attr.ib(metadata={"redact": True}, default=None)
keyfile: Optional[str] = None
recovery_key: Optional[RecoveryKeyHandler] = None
_recovery_keyfile: Optional[str] = None
_recovery_live_location: Optional[str] = None
_recovery_backup_location: Optional[str] = None
path: Optional[str] = None
def __post_init__(self) -> None:
# When the object is created using _actions_from_config, we should
# build the recovery_key object.
if self._recovery_keyfile is None or self.recovery_key is not None:
return
props: Dict[str, pathlib.Path] = {}
if self._recovery_live_location:
props["live_location"] = pathlib.Path(self._recovery_live_location)
if self._recovery_backup_location:
props["backup_location"] = pathlib.Path(self._recovery_backup_location)
self.recovery_key = RecoveryKeyHandler(**props)
def serialize_key(self):
if self.key and not self.keyfile:
f = tempfile.NamedTemporaryFile(prefix="luks-key-", mode="w", delete=False)
@ -1064,6 +1100,36 @@ class DM_Crypt:
else:
return {}
def serialize_recovery_key(self) -> str:
if self.recovery_key is None:
return {"recovery_keyfile": None}
# A bit of a hack to make sure the recovery key gets created when
# converting the DM_Crypt object to a dict.
if self._recovery_keyfile is None:
self.assign_recovery_key()
props = {"recovery_keyfile": self._recovery_keyfile}
if self.recovery_key.live_location is not None:
props["recovery_live_location"] = str(self.recovery_key.live_location)
if self.recovery_key.backup_location is not None:
props["recovery_backup_location"] = str(self.recovery_key.backup_location)
return props
def assign_recovery_key(self):
"""Create the recovery key and temporary store it in a keyfile."""
f = tempfile.NamedTemporaryFile(
prefix="luks-recovery-key-", mode="w", delete=False
)
if self.recovery_key._key is None:
self.recovery_key.generate()
f.write(self.recovery_key._key)
f.close()
self._recovery_keyfile = f.name
dm_name: Optional[str] = None
preserve: bool = False
@ -1898,6 +1964,9 @@ class FilesystemModel:
def all_volgroups(self):
return self._all(type="lvm_volgroup")
def all_dm_crypts(self):
return self._all(type="dm_crypt")
def partition_by_partuuid(self, partuuid: str) -> Optional[Partition]:
return self._one(type="partition", uuid=partuuid)
@ -1998,10 +2067,23 @@ class FilesystemModel:
raise Exception("can only remove empty LV")
self._remove(lv)
def add_dm_crypt(self, volume, key):
def add_dm_crypt(
self,
volume,
key,
*,
recovery_key: Optional[RecoveryKeyHandler],
root: Optional[pathlib.Path] = None,
):
if not volume.available:
raise Exception("{} is not available".format(volume))
dm_crypt = DM_Crypt(m=self, volume=volume, key=key)
dm_crypt = DM_Crypt(
m=self,
volume=volume,
key=key,
recovery_key=recovery_key,
)
self._actions.append(dm_crypt)
return dm_crypt
@ -2117,3 +2199,37 @@ class FilesystemModel:
consist of 48 decimal digits."""
digits = 48
return str(secrets.randbelow(10**digits)).zfill(digits)
def load_or_generate_recovery_keys(self) -> None:
for dm_crypt in self.all_dm_crypts():
if dm_crypt.recovery_key is None:
continue
if dm_crypt.recovery_key._key is not None:
continue
if dm_crypt._recovery_keyfile is not None:
dm_crypt.recovery_key.load_key_from_file(
pathlib.Path(dm_crypt._recovery_keyfile)
)
else:
dm_crypt.recovery_key.generate()
def expose_recovery_keys(self) -> None:
for dm_crypt in self.all_dm_crypts():
if dm_crypt.recovery_key is None:
continue
handler = dm_crypt.recovery_key
if handler.live_location is None:
continue
handler.expose_key_to_live_system(root=self.root)
def copy_artifacts_to_target(self) -> None:
for dm_crypt in self.all_dm_crypts():
if dm_crypt.recovery_key is None:
continue
log.debug(
"Copying recovery key for %s to target: %s", dm_crypt, self.target
)
dm_crypt.recovery_key.copy_key_to_target_system(target=self.target)

View File

@ -13,6 +13,7 @@
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import pathlib
import unittest
from unittest import mock
@ -20,6 +21,7 @@ import attr
import yaml
from subiquity.common.filesystem import gaps
from subiquity.common.types import RecoveryKey
from subiquity.models.filesystem import (
LVM_CHUNK_SIZE,
ZFS,
@ -30,6 +32,7 @@ from subiquity.models.filesystem import (
FilesystemModel,
NotFinalPartitionError,
Partition,
RecoveryKeyHandler,
ZPool,
align_down,
dehumanize_size,
@ -1558,3 +1561,50 @@ class TestLivePackages(SubiTestCase):
(before, during) = await m.live_packages()
self.assertEqual(set(["zfsutils-linux"]), before)
self.assertEqual(set(["efibootmgr"]), during)
class TestRecoveryKeyHandler(SubiTestCase):
def test_from_post_data_none(self):
self.assertIsNone(RecoveryKeyHandler.from_post_data(None))
def test_form_post_data_all_set(self):
live_location = "/home/ubuntu/recovery-ubuntu-vg.txt"
backup_location = "/var/log/installer/recovery-ubuntu-vg.txt"
data = RecoveryKey(live_location=live_location, backup_location=backup_location)
expected = RecoveryKeyHandler(
live_location=pathlib.Path(live_location),
backup_location=pathlib.Path(backup_location),
)
self.assertEqual(RecoveryKeyHandler.from_post_data(data), expected)
def test_form_post_data_backup_use_default_suffix(self):
live_location = "/home/ubuntu/recovery-ubuntu-vg.txt"
data = RecoveryKey(live_location=live_location)
expected = RecoveryKeyHandler(
live_location=pathlib.Path(live_location),
backup_location=pathlib.Path("/var/log/installer/recovery-key.txt"),
)
self.assertEqual(RecoveryKeyHandler.from_post_data(data), expected)
def test_form_post_data_backup_override_default_suffix(self):
live_location = "/home/ubuntu/recovery-ubuntu-vg.txt"
data = RecoveryKey(live_location=live_location)
expected = RecoveryKeyHandler(
live_location=pathlib.Path(live_location),
backup_location=pathlib.Path("/var/log/installer/mykey-ubuntu-vg.txt"),
)
self.assertEqual(
RecoveryKeyHandler.from_post_data(
data, default_suffix="mykey-ubuntu-vg.txt"
),
expected,
)

View File

@ -22,7 +22,7 @@ import os
import pathlib
import select
import time
from typing import Any, Dict, List, Optional
from typing import Any, Dict, List, Optional, Union
import attr
import pyudev
@ -51,6 +51,7 @@ from subiquity.common.types import (
GuidedStorageTargetUseGap,
ModifyPartitionV2,
ProbeStatus,
RecoveryKey,
ReformatDisk,
SizingPolicy,
StorageResponse,
@ -62,7 +63,14 @@ from subiquity.models.filesystem import (
ArbitraryDevice,
)
from subiquity.models.filesystem import Disk as ModelDisk
from subiquity.models.filesystem import MiB, Raid, _Device, align_down, align_up
from subiquity.models.filesystem import (
MiB,
Raid,
RecoveryKeyHandler,
_Device,
align_down,
align_up,
)
from subiquity.server import snapdapi
from subiquity.server.controller import SubiquityController
from subiquity.server.mounter import Mounter
@ -461,6 +469,12 @@ class FilesystemController(SubiquityController, FilesystemManipulator):
spec = dict(name=vg_name, devices=set([part]))
if choice.password is not None:
spec["passphrase"] = choice.password
if choice.recovery_key and not choice.password:
raise Exception("Cannot have a recovery key without encryption")
spec["recovery-key"] = RecoveryKeyHandler.from_post_data(
choice.recovery_key, default_suffix=f"recovery-key-{vg_name}.txt"
)
vg = self.create_volgroup(spec)
if choice.sizing_policy == SizingPolicy.SCALED:
lv_size = sizes.scaled_rootfs_size(vg.size)
@ -479,6 +493,8 @@ class FilesystemController(SubiquityController, FilesystemManipulator):
mount="/",
),
)
self.model.load_or_generate_recovery_keys()
self.model.expose_recovery_keys()
def guided_zfs(self, gap, choice: GuidedChoiceV2):
device = gap.device
@ -710,6 +726,8 @@ class FilesystemController(SubiquityController, FilesystemManipulator):
self.model._actions = self.model._actions_from_config(
config, blockdevs=self.model._probe_data["blockdev"], is_probe_data=False
)
self.model.load_or_generate_recovery_keys()
self.model.expose_recovery_keys()
await self.configured()
def potential_boot_disks(self, check_boot=True, with_reformatting=False):
@ -1216,6 +1234,7 @@ class FilesystemController(SubiquityController, FilesystemManipulator):
name = layout["name"]
password = None
sizing_policy = None
guided_recovery_key: Union[bool, RecoveryKey] = False
if name == "hybrid":
# this check is conceptually unnecessary but results in a
@ -1269,6 +1288,7 @@ class FilesystemController(SubiquityController, FilesystemManipulator):
mode = layout.get("mode", "reformat_disk")
self.validate_layout_mode(mode)
password = layout.get("password", None)
recovery_key = layout.get("recovery-key", False)
if name == "lvm":
sizing_policy = SizingPolicy.from_string(
layout.get("sizing-policy", None)
@ -1277,6 +1297,12 @@ class FilesystemController(SubiquityController, FilesystemManipulator):
capability = GuidedCapability.LVM_LUKS
else:
capability = GuidedCapability.LVM
if recovery_key and password is None:
raise Exception(
"recovery_key can only be used if password is specified"
)
guided_recovery_key = RecoveryKey.from_autoinstall(recovery_key)
elif name == "dd":
capability = GuidedCapability.DD
assert mode == "reformat_disk"
@ -1314,6 +1340,7 @@ class FilesystemController(SubiquityController, FilesystemManipulator):
target=target,
capability=capability,
password=password,
recovery_key=guided_recovery_key,
sizing_policy=sizing_policy,
reset_partition=layout.get("reset-partition", False),
),

View File

@ -700,6 +700,7 @@ class InstallController(SubiquityController):
hostname = f.read().strip()
await self.app.controllers.Ad.join_domain(hostname, context)
self.model.filesystem.copy_artifacts_to_target()
@with_context(description="configuring cloud-init")
async def configure_cloud_init(self, context):