Merge pull request #1766 from ogayot/recovery-key

Add API and TUI support for LUKS recovery key
This commit is contained in:
Dan Bungert 2023-08-30 09:25:50 -06:00 committed by GitHub
commit b880843a48
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
14 changed files with 407 additions and 17 deletions

View File

@ -70,7 +70,7 @@ parts:
source: https://git.launchpad.net/curtin source: https://git.launchpad.net/curtin
source-type: git source-type: git
source-commit: 7ed8b10e88d1a8a879c6805b487a29477afdafd5 source-commit: 5128d5d8a82b8e47269601570198f1ef9a79a4b1
override-pull: | override-pull: |
craftctl default craftctl default

View File

@ -253,7 +253,7 @@ class FilesystemController(SubiquityTuiController, FilesystemManipulator):
self.app.next_screen(coro) self.app.next_screen(coro)
return return
status = await self.app.wait_with_progress(coro) status = await self.app.wait_with_progress(coro)
self.model = FilesystemModel(status.bootloader) self.model = FilesystemModel(status.bootloader, root="/")
self.model.load_server_data(status) self.model.load_server_data(status)
if self.model.bootloader == Bootloader.PREP: if self.model.bootloader == Bootloader.PREP:
self.supports_resilient_boot = False self.supports_resilient_boot = False

View File

@ -331,6 +331,10 @@ class API:
def GET() -> List[Disk]: def GET() -> List[Disk]:
... ...
class generate_recovery_key:
def GET() -> str:
...
class v2: class v2:
def GET( def GET(
wait: bool = False, wait: bool = False,

View File

@ -139,10 +139,15 @@ class FilesystemManipulator:
def create_volgroup(self, spec): def create_volgroup(self, spec):
devices = set() devices = set()
key = spec.get("passphrase") key = spec.get("passphrase")
for device in spec["devices"]: for device in spec["devices"]:
self.clear(device) self.clear(device)
if key: if key:
device = self.model.add_dm_crypt(device, key) device = self.model.add_dm_crypt(
device,
key,
recovery_key=spec.get("recovery-key"),
)
devices.add(device) devices.add(device)
return self.model.add_volgroup(name=spec["name"], devices=devices) return self.model.add_volgroup(name=spec["name"], devices=devices)
@ -334,7 +339,11 @@ class FilesystemManipulator:
for d in spec["devices"]: for d in spec["devices"]:
self.clear(d) self.clear(d)
if key: if key:
d = self.model.add_dm_crypt(d, key) d = self.model.add_dm_crypt(
d,
key,
recovery_key=spec.get("recovery-key"),
)
d._constructed_device = existing d._constructed_device = existing
devices.add(d) devices.add(d)
existing.name = spec["name"] existing.name = spec["name"]

View File

@ -88,7 +88,7 @@ class TestAnnotations(unittest.TestCase):
def test_vg_encrypted_annotations(self): def test_vg_encrypted_annotations(self):
model, disk = make_model_and_disk() model, disk = make_model_and_disk()
dm_crypt = model.add_dm_crypt(disk, key="passw0rd") dm_crypt = model.add_dm_crypt(disk, key="passw0rd", recovery_key=None)
vg = model.add_volgroup("vg-0", {dm_crypt}) vg = model.add_volgroup("vg-0", {dm_crypt})
self.assertEqual(annotations(vg), ["new", "encrypted"]) self.assertEqual(annotations(vg), ["new", "encrypted"])

View File

@ -493,11 +493,40 @@ GuidedStorageTarget = Union[
] ]
@attr.s(auto_attribs=True)
class RecoveryKey:
# Where to store the key in the live system.
live_location: Optional[str] = None
# Where to copy the key in the target system. /target will automatically be
# prefixed.
backup_location: Optional[str] = None
@classmethod
def from_autoinstall(
cls, config: Union[bool, Dict[str, Any]]
) -> Optional["RecoveryKey"]:
if config is False:
return None
# Recovery key with default values
if config is True:
return cls()
return cls(
backup_location=config.get("backup-location"),
live_location=config.get("live-location"),
)
@attr.s(auto_attribs=True) @attr.s(auto_attribs=True)
class GuidedChoiceV2: class GuidedChoiceV2:
target: GuidedStorageTarget target: GuidedStorageTarget
capability: GuidedCapability capability: GuidedCapability
# Those two fields are only used when using LVM+LUKS
password: Optional[str] = attr.ib(default=None, repr=False) password: Optional[str] = attr.ib(default=None, repr=False)
recovery_key: Optional[RecoveryKey] = None
sizing_policy: Optional[SizingPolicy] = SizingPolicy.SCALED sizing_policy: Optional[SizingPolicy] = SizingPolicy.SCALED
reset_partition: bool = False reset_partition: bool = False

View File

@ -22,9 +22,10 @@ import math
import os import os
import pathlib import pathlib
import platform import platform
import secrets
import tempfile import tempfile
from abc import ABC, abstractmethod from abc import ABC, abstractmethod
from typing import List, Optional, Set, Tuple, Union from typing import Dict, List, Optional, Set, Tuple, Union
import attr import attr
import more_itertools import more_itertools
@ -34,7 +35,7 @@ from curtin.swap import can_use_swapfile
from curtin.util import human2bytes from curtin.util import human2bytes
from probert.storage import StorageInfo from probert.storage import StorageInfo
from subiquity.common.types import Bootloader, OsProber from subiquity.common.types import Bootloader, OsProber, RecoveryKey
log = logging.getLogger("subiquity.models.filesystem") log = logging.getLogger("subiquity.models.filesystem")
@ -47,6 +48,86 @@ class NotFinalPartitionError(Exception):
the last one.""" the last one."""
@attr.s(auto_attribs=True)
class RecoveryKeyHandler:
# Where to store the key on the live system
live_location: Optional[pathlib.Path]
# Where to store the key in the target system. /target will automatically
# be prefixed.
backup_location: pathlib.Path
_key: Optional[str] = attr.ib(repr=False, default=None)
@classmethod
def from_post_data(
cls, data: Optional[RecoveryKey], default_suffix="recovery-key.txt"
) -> Optional["RecoveryKeyHandler"]:
"""Create RecoveryKeyHandler instance from POST-ed RecoveryKey data."""
if data is None:
return None
# Set default values for unspecified settings.
live_location = pathlib.Path("~").expanduser() / default_suffix
backup_location = pathlib.Path("/var/log/installer") / default_suffix
if data.live_location is not None:
live_location = pathlib.Path(data.live_location)
if data.backup_location is not None:
backup_location = pathlib.Path(data.backup_location)
return cls(live_location=live_location, backup_location=backup_location)
def load_key_from_file(self, location: pathlib.Path) -> None:
"""Load the key from the file specified"""
with location.open(mode="r", encoding="utf-8") as fh:
self._key = fh.read().strip()
def generate(self):
"""Generate a key and store internally"""
self._key = FilesystemModel.generate_recovery_key()
def _expose_key(
self,
location: pathlib.Path,
root: pathlib.Path,
parents_perm: int,
key_perm: int,
) -> None:
full_location = root / location.relative_to(location.root)
if not full_location.resolve().is_relative_to(root):
raise RuntimeError(
"Trying to copy recovery key outside of" " designated root directory"
)
full_location.parent.mkdir(mode=parents_perm, parents=True, exist_ok=True)
with full_location.open(mode="w", encoding="utf-8") as fh:
fh.write(self._key)
full_location.chmod(key_perm)
def expose_key_to_live_system(self, root: Optional[pathlib.Path] = None) -> None:
"""Write the key to the live system - so it can be retrieved by the
user of the installer."""
if root is None:
root = pathlib.Path("/")
self._expose_key(
location=self.live_location, root=root, parents_perm=0o755, key_perm=0o644
)
def copy_key_to_target_system(self, target: pathlib.Path) -> None:
"""Write the key to the target system - so it can be retrieved after
the install by an admin."""
self._expose_key(
location=self.backup_location,
root=target,
parents_perm=0o700,
key_perm=0o600,
)
def _set_backlinks(obj): def _set_backlinks(obj):
if obj.id is None: if obj.id is None:
base = obj.type base = obj.type
@ -991,8 +1072,25 @@ class DM_Crypt:
volume: _Formattable = attributes.ref(backlink="_constructed_device") volume: _Formattable = attributes.ref(backlink="_constructed_device")
key: Optional[str] = attr.ib(metadata={"redact": True}, default=None) key: Optional[str] = attr.ib(metadata={"redact": True}, default=None)
keyfile: Optional[str] = None keyfile: Optional[str] = None
recovery_key: Optional[RecoveryKeyHandler] = None
_recovery_keyfile: Optional[str] = None
_recovery_live_location: Optional[str] = None
_recovery_backup_location: Optional[str] = None
path: Optional[str] = None path: Optional[str] = None
def __post_init__(self) -> None:
# When the object is created using _actions_from_config, we should
# build the recovery_key object.
if self._recovery_keyfile is None or self.recovery_key is not None:
return
props: Dict[str, pathlib.Path] = {}
if self._recovery_live_location:
props["live_location"] = pathlib.Path(self._recovery_live_location)
if self._recovery_backup_location:
props["backup_location"] = pathlib.Path(self._recovery_backup_location)
self.recovery_key = RecoveryKeyHandler(**props)
def serialize_key(self): def serialize_key(self):
if self.key and not self.keyfile: if self.key and not self.keyfile:
f = tempfile.NamedTemporaryFile(prefix="luks-key-", mode="w", delete=False) f = tempfile.NamedTemporaryFile(prefix="luks-key-", mode="w", delete=False)
@ -1002,6 +1100,36 @@ class DM_Crypt:
else: else:
return {} return {}
def serialize_recovery_key(self) -> str:
if self.recovery_key is None:
return {"recovery_keyfile": None}
# A bit of a hack to make sure the recovery key gets created when
# converting the DM_Crypt object to a dict.
if self._recovery_keyfile is None:
self.assign_recovery_key()
props = {"recovery_keyfile": self._recovery_keyfile}
if self.recovery_key.live_location is not None:
props["recovery_live_location"] = str(self.recovery_key.live_location)
if self.recovery_key.backup_location is not None:
props["recovery_backup_location"] = str(self.recovery_key.backup_location)
return props
def assign_recovery_key(self):
"""Create the recovery key and temporary store it in a keyfile."""
f = tempfile.NamedTemporaryFile(
prefix="luks-recovery-key-", mode="w", delete=False
)
if self.recovery_key._key is None:
self.recovery_key.generate()
f.write(self.recovery_key._key)
f.close()
self._recovery_keyfile = f.name
dm_name: Optional[str] = None dm_name: Optional[str] = None
preserve: bool = False preserve: bool = False
@ -1219,7 +1347,7 @@ class ActionRenderMode(enum.Enum):
FORMAT_MOUNT = enum.auto() FORMAT_MOUNT = enum.auto()
class FilesystemModel(object): class FilesystemModel:
target = None target = None
_partition_alignment_data = { _partition_alignment_data = {
@ -1268,10 +1396,11 @@ class FilesystemModel(object):
else: else:
return Bootloader.BIOS return Bootloader.BIOS
def __init__(self, bootloader=None): def __init__(self, bootloader=None, *, root: str):
if bootloader is None: if bootloader is None:
bootloader = self._probe_bootloader() bootloader = self._probe_bootloader()
self.bootloader = bootloader self.bootloader = bootloader
self.root = root
self.storage_version = 1 self.storage_version = 1
self._probe_data = None self._probe_data = None
self.dd_target: Optional[Disk] = None self.dd_target: Optional[Disk] = None
@ -1294,7 +1423,7 @@ class FilesystemModel(object):
# the original state. _orig_config plays a similar role, but is # the original state. _orig_config plays a similar role, but is
# expressed in terms of curtin actions, which are not what we want to # expressed in terms of curtin actions, which are not what we want to
# use on the V2 storage API. # use on the V2 storage API.
orig_model = FilesystemModel(self.bootloader) orig_model = FilesystemModel(self.bootloader, root=self.root)
orig_model.target = self.target orig_model.target = self.target
orig_model.load_probe_data(self._probe_data) orig_model.load_probe_data(self._probe_data)
return orig_model return orig_model
@ -1835,6 +1964,9 @@ class FilesystemModel(object):
def all_volgroups(self): def all_volgroups(self):
return self._all(type="lvm_volgroup") return self._all(type="lvm_volgroup")
def all_dm_crypts(self):
return self._all(type="dm_crypt")
def partition_by_partuuid(self, partuuid: str) -> Optional[Partition]: def partition_by_partuuid(self, partuuid: str) -> Optional[Partition]:
return self._one(type="partition", uuid=partuuid) return self._one(type="partition", uuid=partuuid)
@ -1935,10 +2067,23 @@ class FilesystemModel(object):
raise Exception("can only remove empty LV") raise Exception("can only remove empty LV")
self._remove(lv) self._remove(lv)
def add_dm_crypt(self, volume, key): def add_dm_crypt(
self,
volume,
key,
*,
recovery_key: Optional[RecoveryKeyHandler],
root: Optional[pathlib.Path] = None,
):
if not volume.available: if not volume.available:
raise Exception("{} is not available".format(volume)) raise Exception("{} is not available".format(volume))
dm_crypt = DM_Crypt(m=self, volume=volume, key=key)
dm_crypt = DM_Crypt(
m=self,
volume=volume,
key=key,
recovery_key=recovery_key,
)
self._actions.append(dm_crypt) self._actions.append(dm_crypt)
return dm_crypt return dm_crypt
@ -2047,3 +2192,44 @@ class FilesystemModel(object):
if self.reset_partition is not None: if self.reset_partition is not None:
during.add("efibootmgr") during.add("efibootmgr")
return (before, during) return (before, during)
@staticmethod
def generate_recovery_key() -> str:
"""Return a new recovery key suitable for LUKS encryption. The key will
consist of 48 decimal digits."""
digits = 48
return str(secrets.randbelow(10**digits)).zfill(digits)
def load_or_generate_recovery_keys(self) -> None:
for dm_crypt in self.all_dm_crypts():
if dm_crypt.recovery_key is None:
continue
if dm_crypt.recovery_key._key is not None:
continue
if dm_crypt._recovery_keyfile is not None:
dm_crypt.recovery_key.load_key_from_file(
pathlib.Path(dm_crypt._recovery_keyfile)
)
else:
dm_crypt.recovery_key.generate()
def expose_recovery_keys(self) -> None:
for dm_crypt in self.all_dm_crypts():
if dm_crypt.recovery_key is None:
continue
handler = dm_crypt.recovery_key
if handler.live_location is None:
continue
handler.expose_key_to_live_system(root=self.root)
def copy_artifacts_to_target(self) -> None:
for dm_crypt in self.all_dm_crypts():
if dm_crypt.recovery_key is None:
continue
log.debug(
"Copying recovery key for %s to target: %s", dm_crypt, self.target
)
dm_crypt.recovery_key.copy_key_to_target_system(target=self.target)

View File

@ -180,7 +180,7 @@ class SubiquityModel:
self.codecs = CodecsModel() self.codecs = CodecsModel()
self.debconf_selections = DebconfSelectionsModel() self.debconf_selections = DebconfSelectionsModel()
self.drivers = DriversModel() self.drivers = DriversModel()
self.filesystem = FilesystemModel() self.filesystem = FilesystemModel(root=root)
self.identity = IdentityModel() self.identity = IdentityModel()
self.integrity = IntegrityModel() self.integrity = IntegrityModel()
self.kernel = KernelModel() self.kernel = KernelModel()

View File

@ -13,6 +13,7 @@
# You should have received a copy of the GNU Affero General Public License # You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>. # along with this program. If not, see <http://www.gnu.org/licenses/>.
import pathlib
import unittest import unittest
from unittest import mock from unittest import mock
@ -20,6 +21,7 @@ import attr
import yaml import yaml
from subiquity.common.filesystem import gaps from subiquity.common.filesystem import gaps
from subiquity.common.types import RecoveryKey
from subiquity.models.filesystem import ( from subiquity.models.filesystem import (
LVM_CHUNK_SIZE, LVM_CHUNK_SIZE,
ZFS, ZFS,
@ -30,6 +32,7 @@ from subiquity.models.filesystem import (
FilesystemModel, FilesystemModel,
NotFinalPartitionError, NotFinalPartitionError,
Partition, Partition,
RecoveryKeyHandler,
ZPool, ZPool,
align_down, align_down,
dehumanize_size, dehumanize_size,
@ -135,7 +138,7 @@ class FakeStorageInfo:
def make_model(bootloader=None, storage_version=None): def make_model(bootloader=None, storage_version=None):
model = FilesystemModel() model = FilesystemModel(root="/tmp")
if bootloader is not None: if bootloader is not None:
model.bootloader = bootloader model.bootloader = bootloader
if storage_version is not None: if storage_version is not None:
@ -1558,3 +1561,50 @@ class TestLivePackages(SubiTestCase):
(before, during) = await m.live_packages() (before, during) = await m.live_packages()
self.assertEqual(set(["zfsutils-linux"]), before) self.assertEqual(set(["zfsutils-linux"]), before)
self.assertEqual(set(["efibootmgr"]), during) self.assertEqual(set(["efibootmgr"]), during)
class TestRecoveryKeyHandler(SubiTestCase):
def test_from_post_data_none(self):
self.assertIsNone(RecoveryKeyHandler.from_post_data(None))
def test_form_post_data_all_set(self):
live_location = "/home/ubuntu/recovery-ubuntu-vg.txt"
backup_location = "/var/log/installer/recovery-ubuntu-vg.txt"
data = RecoveryKey(live_location=live_location, backup_location=backup_location)
expected = RecoveryKeyHandler(
live_location=pathlib.Path(live_location),
backup_location=pathlib.Path(backup_location),
)
self.assertEqual(RecoveryKeyHandler.from_post_data(data), expected)
def test_form_post_data_backup_use_default_suffix(self):
live_location = "/home/ubuntu/recovery-ubuntu-vg.txt"
data = RecoveryKey(live_location=live_location)
expected = RecoveryKeyHandler(
live_location=pathlib.Path(live_location),
backup_location=pathlib.Path("/var/log/installer/recovery-key.txt"),
)
self.assertEqual(RecoveryKeyHandler.from_post_data(data), expected)
def test_form_post_data_backup_override_default_suffix(self):
live_location = "/home/ubuntu/recovery-ubuntu-vg.txt"
data = RecoveryKey(live_location=live_location)
expected = RecoveryKeyHandler(
live_location=pathlib.Path(live_location),
backup_location=pathlib.Path("/var/log/installer/mykey-ubuntu-vg.txt"),
)
self.assertEqual(
RecoveryKeyHandler.from_post_data(
data, default_suffix="mykey-ubuntu-vg.txt"
),
expected,
)

View File

@ -22,7 +22,7 @@ import os
import pathlib import pathlib
import select import select
import time import time
from typing import Any, Dict, List, Optional from typing import Any, Dict, List, Optional, Union
import attr import attr
import pyudev import pyudev
@ -51,6 +51,7 @@ from subiquity.common.types import (
GuidedStorageTargetUseGap, GuidedStorageTargetUseGap,
ModifyPartitionV2, ModifyPartitionV2,
ProbeStatus, ProbeStatus,
RecoveryKey,
ReformatDisk, ReformatDisk,
SizingPolicy, SizingPolicy,
StorageResponse, StorageResponse,
@ -62,7 +63,14 @@ from subiquity.models.filesystem import (
ArbitraryDevice, ArbitraryDevice,
) )
from subiquity.models.filesystem import Disk as ModelDisk from subiquity.models.filesystem import Disk as ModelDisk
from subiquity.models.filesystem import MiB, Raid, _Device, align_down, align_up from subiquity.models.filesystem import (
MiB,
Raid,
RecoveryKeyHandler,
_Device,
align_down,
align_up,
)
from subiquity.server import snapdapi from subiquity.server import snapdapi
from subiquity.server.controller import SubiquityController from subiquity.server.controller import SubiquityController
from subiquity.server.mounter import Mounter from subiquity.server.mounter import Mounter
@ -461,6 +469,12 @@ class FilesystemController(SubiquityController, FilesystemManipulator):
spec = dict(name=vg_name, devices=set([part])) spec = dict(name=vg_name, devices=set([part]))
if choice.password is not None: if choice.password is not None:
spec["passphrase"] = choice.password spec["passphrase"] = choice.password
if choice.recovery_key and not choice.password:
raise Exception("Cannot have a recovery key without encryption")
spec["recovery-key"] = RecoveryKeyHandler.from_post_data(
choice.recovery_key, default_suffix=f"recovery-key-{vg_name}.txt"
)
vg = self.create_volgroup(spec) vg = self.create_volgroup(spec)
if choice.sizing_policy == SizingPolicy.SCALED: if choice.sizing_policy == SizingPolicy.SCALED:
lv_size = sizes.scaled_rootfs_size(vg.size) lv_size = sizes.scaled_rootfs_size(vg.size)
@ -479,6 +493,8 @@ class FilesystemController(SubiquityController, FilesystemManipulator):
mount="/", mount="/",
), ),
) )
self.model.load_or_generate_recovery_keys()
self.model.expose_recovery_keys()
def guided_zfs(self, gap, choice: GuidedChoiceV2): def guided_zfs(self, gap, choice: GuidedChoiceV2):
device = gap.device device = gap.device
@ -710,6 +726,8 @@ class FilesystemController(SubiquityController, FilesystemManipulator):
self.model._actions = self.model._actions_from_config( self.model._actions = self.model._actions_from_config(
config, blockdevs=self.model._probe_data["blockdev"], is_probe_data=False config, blockdevs=self.model._probe_data["blockdev"], is_probe_data=False
) )
self.model.load_or_generate_recovery_keys()
self.model.expose_recovery_keys()
await self.configured() await self.configured()
def potential_boot_disks(self, check_boot=True, with_reformatting=False): def potential_boot_disks(self, check_boot=True, with_reformatting=False):
@ -932,6 +950,9 @@ class FilesystemController(SubiquityController, FilesystemManipulator):
install_minimum_size=minsize, install_minimum_size=minsize,
) )
async def generate_recovery_key_GET(self) -> str:
return self.model.generate_recovery_key()
async def v2_GET( async def v2_GET(
self, self,
wait: bool = False, wait: bool = False,
@ -1213,6 +1234,7 @@ class FilesystemController(SubiquityController, FilesystemManipulator):
name = layout["name"] name = layout["name"]
password = None password = None
sizing_policy = None sizing_policy = None
guided_recovery_key: Union[bool, RecoveryKey] = False
if name == "hybrid": if name == "hybrid":
# this check is conceptually unnecessary but results in a # this check is conceptually unnecessary but results in a
@ -1266,6 +1288,7 @@ class FilesystemController(SubiquityController, FilesystemManipulator):
mode = layout.get("mode", "reformat_disk") mode = layout.get("mode", "reformat_disk")
self.validate_layout_mode(mode) self.validate_layout_mode(mode)
password = layout.get("password", None) password = layout.get("password", None)
recovery_key = layout.get("recovery-key", False)
if name == "lvm": if name == "lvm":
sizing_policy = SizingPolicy.from_string( sizing_policy = SizingPolicy.from_string(
layout.get("sizing-policy", None) layout.get("sizing-policy", None)
@ -1274,6 +1297,12 @@ class FilesystemController(SubiquityController, FilesystemManipulator):
capability = GuidedCapability.LVM_LUKS capability = GuidedCapability.LVM_LUKS
else: else:
capability = GuidedCapability.LVM capability = GuidedCapability.LVM
if recovery_key and password is None:
raise Exception(
"recovery_key can only be used if password is specified"
)
guided_recovery_key = RecoveryKey.from_autoinstall(recovery_key)
elif name == "dd": elif name == "dd":
capability = GuidedCapability.DD capability = GuidedCapability.DD
assert mode == "reformat_disk" assert mode == "reformat_disk"
@ -1311,6 +1340,7 @@ class FilesystemController(SubiquityController, FilesystemManipulator):
target=target, target=target,
capability=capability, capability=capability,
password=password, password=password,
recovery_key=guided_recovery_key,
sizing_policy=sizing_policy, sizing_policy=sizing_policy,
reset_partition=layout.get("reset-partition", False), reset_partition=layout.get("reset-partition", False),
), ),

View File

@ -700,6 +700,7 @@ class InstallController(SubiquityController):
hostname = f.read().strip() hostname = f.read().strip()
await self.app.controllers.Ad.join_domain(hostname, context) await self.app.controllers.Ad.join_domain(hostname, context)
self.model.filesystem.copy_artifacts_to_target()
@with_context(description="configuring cloud-init") @with_context(description="configuring cloud-init")
async def configure_cloud_init(self, context): async def configure_cloud_init(self, context):

View File

@ -14,6 +14,8 @@
# along with this program. If not, see <http://www.gnu.org/licenses/>. # along with this program. If not, see <http://www.gnu.org/licenses/>.
import logging import logging
import pathlib
from typing import Optional
import attr import attr
from urwid import Text, connect_signal from urwid import Text, connect_signal
@ -26,6 +28,7 @@ from subiquity.common.types import (
GuidedStorageTargetManual, GuidedStorageTargetManual,
GuidedStorageTargetReformat, GuidedStorageTargetReformat,
Partition, Partition,
RecoveryKey,
) )
from subiquity.models.filesystem import humanize_size from subiquity.models.filesystem import humanize_size
from subiquitycore.ui.buttons import other_btn from subiquitycore.ui.buttons import other_btn
@ -53,6 +56,15 @@ subtitle = _("Configure a guided storage layout, or create a custom one:")
class LUKSOptionsForm(SubForm): class LUKSOptionsForm(SubForm):
passphrase = PasswordField(_("Passphrase:")) passphrase = PasswordField(_("Passphrase:"))
confirm_passphrase = PasswordField(_("Confirm passphrase:")) confirm_passphrase = PasswordField(_("Confirm passphrase:"))
recovery_key = BooleanField(
("Also create a recovery key"),
help=_(
"The key will be stored as"
" ~/recovery-key.txt in the live system and will"
" be copied to /var/log/installer/ in the target"
" system."
),
)
def validate_passphrase(self): def validate_passphrase(self):
if len(self.passphrase.value) < 1: if len(self.passphrase.value) < 1:
@ -368,6 +380,7 @@ class GuidedDiskSelectionView(BaseView):
target = guided_choice["disk"] target = guided_choice["disk"]
tpm_choice = self.form.guided_choice.widget.form.tpm_choice tpm_choice = self.form.guided_choice.widget.form.tpm_choice
password = None password = None
recovery_key: Optional[RecoveryKey] = None
if tpm_choice is not None: if tpm_choice is not None:
if guided_choice.get("use_tpm", tpm_choice.default): if guided_choice.get("use_tpm", tpm_choice.default):
capability = GuidedCapability.CORE_BOOT_ENCRYPTED capability = GuidedCapability.CORE_BOOT_ENCRYPTED
@ -378,6 +391,16 @@ class GuidedDiskSelectionView(BaseView):
if opts.get("encrypt", False): if opts.get("encrypt", False):
capability = GuidedCapability.LVM_LUKS capability = GuidedCapability.LVM_LUKS
password = opts["luks_options"]["passphrase"] password = opts["luks_options"]["passphrase"]
if opts["luks_options"]["recovery_key"]:
# There is only one encrypted LUKS (at max) in guided
# so no need to prefix the locations with the name of
# the VG.
recovery_key = RecoveryKey(
live_location=str(
pathlib.Path("~/recovery-key.txt").expanduser()
),
backup_location="var/log/installer/recovery-key.txt",
)
else: else:
capability = GuidedCapability.LVM capability = GuidedCapability.LVM
else: else:
@ -389,6 +412,7 @@ class GuidedDiskSelectionView(BaseView):
target=target, target=target,
capability=capability, capability=capability,
password=password, password=password,
recovery_key=recovery_key,
) )
else: else:
choice = GuidedChoiceV2( choice = GuidedChoiceV2(

View File

@ -15,11 +15,12 @@
import logging import logging
import os import os
import pathlib
import re import re
from urwid import Text, connect_signal from urwid import Text, connect_signal
from subiquity.models.filesystem import get_lvm_size, humanize_size from subiquity.models.filesystem import RecoveryKeyHandler, get_lvm_size, humanize_size
from subiquity.ui.views.filesystem.compound import ( from subiquity.ui.views.filesystem.compound import (
CompoundDiskForm, CompoundDiskForm,
MultiDeviceField, MultiDeviceField,
@ -78,10 +79,22 @@ class VolGroupForm(CompoundDiskForm):
encrypt = BooleanField(_("Create encrypted volume")) encrypt = BooleanField(_("Create encrypted volume"))
passphrase = PasswordField(_("Passphrase:")) passphrase = PasswordField(_("Passphrase:"))
confirm_passphrase = PasswordField(_("Confirm passphrase:")) confirm_passphrase = PasswordField(_("Confirm passphrase:"))
# TODO replace the placeholders in the help - also potentially replacing
# "~" with the actual home directory.
create_recovery_key = BooleanField(
_("Also create a recovery key:"),
help=_(
"The key will be stored as"
" ~/recovery-key-{name}.txt in the live system and will"
" be copied to /var/log/installer/ in the target"
" system."
),
)
def _change_encrypt(self, sender, new_value): def _change_encrypt(self, sender, new_value):
self.passphrase.enabled = new_value self.passphrase.enabled = new_value
self.confirm_passphrase.enabled = new_value self.confirm_passphrase.enabled = new_value
self.create_recovery_key.enabled = new_value
if not new_value: if not new_value:
self.passphrase.validate() self.passphrase.validate()
self.confirm_passphrase.validate() self.confirm_passphrase.validate()
@ -147,6 +160,7 @@ class VolGroupStretchy(Stretchy):
devices = {} devices = {}
key = "" key = ""
encrypt = False encrypt = False
create_recovery_key = False
for d in existing.devices: for d in existing.devices:
if d.type == "dm_crypt": if d.type == "dm_crypt":
encrypt = True encrypt = True
@ -161,6 +175,7 @@ class VolGroupStretchy(Stretchy):
# TODO make this more user friendly. # TODO make this more user friendly.
if d.key is not None: if d.key is not None:
key = d.key key = d.key
create_recovery_key = d.recovery_key is not None
d = d.volume d = d.volume
devices[d] = "active" devices[d] = "active"
initial = { initial = {
@ -169,6 +184,7 @@ class VolGroupStretchy(Stretchy):
"encrypt": encrypt, "encrypt": encrypt,
"passphrase": key, "passphrase": key,
"confirm_passphrase": key, "confirm_passphrase": key,
"create_recovery_key": create_recovery_key,
} }
possible_components = get_possible_components( possible_components = get_possible_components(
@ -205,6 +221,15 @@ class VolGroupStretchy(Stretchy):
del result["size"] del result["size"]
mdc = self.form.devices.widget mdc = self.form.devices.widget
result["devices"] = mdc.active_devices result["devices"] = mdc.active_devices
if "create_recovery_key" in result:
if result["create_recovery_key"]:
backup_prefix = pathlib.Path("/var/log/installer")
filename = pathlib.Path(f"recovery-key-{result['name']}.txt")
result["recovery-key"] = RecoveryKeyHandler(
live_location=pathlib.Path("~").expanduser() / filename,
backup_location=backup_prefix / filename,
)
del result["create_recovery_key"]
if "confirm_passphrase" in result: if "confirm_passphrase" in result:
del result["confirm_passphrase"] del result["confirm_passphrase"]
safe_result = result.copy() safe_result = result.copy()

View File

@ -13,12 +13,15 @@
# You should have received a copy of the GNU Affero General Public License # You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>. # along with this program. If not, see <http://www.gnu.org/licenses/>.
import os
import pathlib
import unittest import unittest
from unittest import mock from unittest import mock
import urwid import urwid
from subiquity.client.controllers.filesystem import FilesystemController from subiquity.client.controllers.filesystem import FilesystemController
from subiquity.models.filesystem import RecoveryKeyHandler
from subiquity.ui.views.filesystem.lvm import VolGroupStretchy from subiquity.ui.views.filesystem.lvm import VolGroupStretchy
from subiquity.ui.views.filesystem.tests.test_partition import make_model_and_disk from subiquity.ui.views.filesystem.tests.test_partition import make_model_and_disk
from subiquitycore.testing import view_helpers from subiquitycore.testing import view_helpers
@ -66,6 +69,7 @@ class LVMViewTests(unittest.TestCase):
"encrypt": True, "encrypt": True,
"passphrase": "passw0rd", "passphrase": "passw0rd",
"confirm_passphrase": "passw0rd", "confirm_passphrase": "passw0rd",
"create_recovery_key": False,
} }
expected_data = { expected_data = {
"name": "vg1", "name": "vg1",
@ -76,3 +80,31 @@ class LVMViewTests(unittest.TestCase):
view_helpers.enter_data(stretchy.form, form_data) view_helpers.enter_data(stretchy.form, form_data)
view_helpers.click(stretchy.form.done_btn.base_widget) view_helpers.click(stretchy.form.done_btn.base_widget)
view.controller.volgroup_handler.assert_called_once_with(None, expected_data) view.controller.volgroup_handler.assert_called_once_with(None, expected_data)
def test_create_vg_encrypted_with_recovery(self):
model, disk = make_model_and_disk()
part1 = model.add_partition(disk, size=10 * (2**30), offset=0)
part2 = model.add_partition(disk, size=10 * (2**30), offset=10 * (2**30))
view, stretchy = make_view(model)
form_data = {
"name": "vg1",
"devices": {part1: "active", part2: "active"},
"encrypt": True,
"passphrase": "passw0rd",
"confirm_passphrase": "passw0rd",
"create_recovery_key": True,
}
expected_data = {
"name": "vg1",
"devices": {part1, part2},
"encrypt": True,
"passphrase": "passw0rd",
"recovery-key": RecoveryKeyHandler(
live_location=pathlib.Path("/home/ubuntu/recovery-key-vg1.txt"),
backup_location=pathlib.Path("/var/log/installer/recovery-key-vg1.txt"),
),
}
view_helpers.enter_data(stretchy.form, form_data)
with mock.patch.dict(os.environ, {"HOME": "/home/ubuntu"}):
view_helpers.click(stretchy.form.done_btn.base_widget)
view.controller.volgroup_handler.assert_called_once_with(None, expected_data)