Merge pull request #1485 from mwhudson/tpm-reuse-part

reuse an existing partition if possible in apply_system
This commit is contained in:
Dan Bungert 2022-11-17 10:00:45 -07:00 committed by GitHub
commit fa0aa32117
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 120 additions and 71 deletions

View File

@ -452,43 +452,61 @@ class FilesystemController(SubiquityController, FilesystemManipulator):
for structure in volume.structure:
if structure.role == snapdapi.Role.MBR:
continue
if ',' not in structure.type:
continue
if structure.offset is not None:
offset = structure.offset
yield (structure, offset, structure.size)
offset = offset + structure.size
def _add_structure(self, *, disk: Disk, offset: int, size: int,
is_last: bool, structure: snapdapi.VolumeStructure):
if structure.role == snapdapi.Role.SYSTEM_DATA and is_last:
size = gaps.largest_gap(disk).size
flag = ptable_uuid_to_flag_entry(structure.gpt_part_uuid())[0]
part = self.model.add_partition(
disk, offset=offset, size=size, flag=flag,
partition_name=structure.name)
def apply_system(self, disk_id):
disk = self.model._one(id=disk_id)
[volume] = self._system.volumes.values()
preserved_parts = set()
if volume.schema != disk.ptable:
self.reformat(disk)
disk.ptable = volume.schema
parts_by_offset_size = {}
else:
parts_by_offset_size = {
(part.offset, part.size): part for part in disk.partitions()
}
for _struct, offset, size in self._offsets_and_sizes_for_system():
if (offset, size) in parts_by_offset_size:
preserved_parts.add(parts_by_offset_size[(offset, size)])
for part in disk.partitions():
if part not in preserved_parts:
self.delete_partition(part)
del parts_by_offset_size[(part.offset, part.size)]
for structure, offset, size in self._offsets_and_sizes_for_system():
if (offset, size) in parts_by_offset_size:
part = parts_by_offset_size[(offset, size)]
else:
if structure.role == snapdapi.Role.SYSTEM_DATA and \
structure == volume.structure[-1]:
gap = gaps.largest_gap(disk)
size = gap.size - (offset - gap.offset)
part = self.model.add_partition(disk, offset=offset, size=size)
part.flag = ptable_uuid_to_flag_entry(structure.gpt_part_uuid())[0]
if structure.name:
part.partition_name = structure.name
if structure.filesystem:
part.wipe = 'superblock'
self.delete_filesystem(part.fs())
fs = self.model.add_filesystem(
part, structure.filesystem, label=structure.label)
if structure.role == snapdapi.Role.SYSTEM_DATA:
self.model.add_mount(fs, '/')
elif structure.role == snapdapi.Role.SYSTEM_BOOT:
self.model.add_mount(fs, '/boot')
elif flag == 'boot':
elif part.flag == 'boot':
self.model.add_mount(fs, '/boot/efi')
self._role_to_device[structure.role] = part
def apply_system(self, disk_id):
disk = self.model._one(id=disk_id)
self.reformat(disk)
[volume] = self._system.volumes.values()
for structure, offset, size in self._offsets_and_sizes_for_system():
self._add_structure(
disk=disk, offset=offset, size=size, structure=structure,
is_last=structure == volume.structure[-1])
disk._partitions.sort(key=lambda p: p.number)
def _on_volumes(self) -> Dict[str, snapdapi.OnVolume]:
@ -525,7 +543,6 @@ class FilesystemController(SubiquityController, FilesystemManipulator):
for fs in self.model._all(type='format'):
if fs.volume == part:
fs.volume = arb_device
self._role_to_device[role] = arb_device
@with_context(description="making system bootable")
async def finish_install(self, context):

View File

@ -476,45 +476,87 @@ class TestCoreBootInstallMethods(IsolatedAsyncioTestCase):
self.fsc._configured = True
self.fsc.model = make_model(Bootloader.UEFI)
def test_add_structure(self):
def _details_for_structures(self, structures):
return snapdapi.SystemDetails(
volumes={'pc': snapdapi.Volume(schema='gpt', structure=structures)}
)
def test_apply_system(self):
disk = make_disk(self.fsc.model)
self.fsc._add_structure(
disk=disk,
offset=100,
size=2 << 20,
is_last=False,
structure=snapdapi.VolumeStructure(
name='ptname',
self.fsc._system = self._details_for_structures([
snapdapi.VolumeStructure(
type="83,0FC63DAF-8483-4772-8E79-3D69D8477DE4",
label='label',
size=1 << 20,
role=snapdapi.Role.SYSTEM_DATA,
filesystem='ext4'))
offset=1 << 20,
size=1 << 30,
filesystem='ext4'),
])
self.fsc.apply_system(disk.id)
[part] = disk.partitions()
self.assertEqual(part.offset, 100)
self.assertEqual(part.offset, 1 << 20)
self.assertEqual(part.size, 1 << 30)
self.assertEqual(part.fs().fstype, 'ext4')
def test_apply_system_reuse(self):
disk = make_disk(self.fsc.model)
# Add a partition that matches one in the volume structure
reused_part = make_partition(
self.fsc.model, disk, offset=1 << 20, size=1 << 30, preserve=True)
self.fsc.model.add_filesystem(reused_part, 'ext4')
# And one that does not.
make_partition(
self.fsc.model, disk, offset=2 << 30, size=1 << 30, preserve=True)
self.fsc._system = self._details_for_structures([
snapdapi.VolumeStructure(
type="0FC63DAF-8483-4772-8E79-3D69D8477DE4",
offset=1 << 20,
size=1 << 30,
filesystem='ext4'),
])
self.fsc.apply_system(disk.id)
[part] = disk.partitions()
self.assertEqual(reused_part, part)
self.assertEqual(reused_part.wipe, 'superblock')
self.assertEqual(part.fs().fstype, 'ext4')
def test_apply_system_reuse_no_format(self):
disk = make_disk(self.fsc.model)
existing_part = make_partition(
self.fsc.model, disk, offset=1 << 20, size=1 << 30, preserve=True)
self.fsc._system = self._details_for_structures([
snapdapi.VolumeStructure(
type="0FC63DAF-8483-4772-8E79-3D69D8477DE4",
offset=1 << 20,
size=1 << 30,
filesystem=None),
])
self.fsc.apply_system(disk.id)
[part] = disk.partitions()
self.assertEqual(existing_part, part)
self.assertEqual(existing_part.wipe, None)
def test_apply_system_system_data(self):
disk = make_disk(self.fsc.model)
self.fsc._system = self._details_for_structures([
snapdapi.VolumeStructure(
type="0FC63DAF-8483-4772-8E79-3D69D8477DE4",
offset=2 << 20,
name='ptname',
size=2 << 30,
role=snapdapi.Role.SYSTEM_DATA,
filesystem='ext4'),
])
self.fsc.apply_system(disk.id)
[part] = disk.partitions()
self.assertEqual(part.offset, 2 << 20)
self.assertEqual(part.partition_name, 'ptname')
self.assertEqual(part.flag, 'linux')
self.assertEqual(part.size, 2 << 20)
self.assertEqual(
part.size,
disk.size - (2 << 20) - disk.alignment_data().min_end_offset)
self.assertEqual(part.fs().fstype, 'ext4')
self.assertEqual(part.fs().mount().path, '/')
self.assertEqual(part.wipe, 'superblock')
def test_add_structure_no_fs(self):
disk = make_disk(self.fsc.model)
self.fsc._add_structure(
disk=disk,
offset=100,
size=2 << 20,
is_last=False,
structure=snapdapi.VolumeStructure(
type="83,0FC63DAF-8483-4772-8E79-3D69D8477DE4",
size=1 << 20,
filesystem=None))
[part] = disk.partitions()
self.assertEqual(part.size, 2 << 20)
self.assertEqual(part.fs(), None)
self.assertEqual(part.wipe, None)
async def test_from_sample_data(self):
# calling this a unit test is definitely questionable. but it
# runs much more quickly than the integration test!
@ -565,16 +607,3 @@ class TestCoreBootInstallMethods(IsolatedAsyncioTestCase):
request = call.args[2]
self.assertEqual(request.action, snapdapi.SystemAction.INSTALL)
self.assertEqual(request.step, snapdapi.SystemActionStep.FINISH)
[on_volume] = request.on_volumes.values()
role_to_path = {}
for s in on_volume.structure:
if s.role in [snapdapi.Role.SYSTEM_DATA,
snapdapi.Role.SYSTEM_BOOT,
snapdapi.Role.SYSTEM_SEED]:
role_to_path[s.role] = s.device
expected_role_to_path = {
snapdapi.Role.SYSTEM_DATA: mounts['/'].path,
snapdapi.Role.SYSTEM_BOOT: mounts['/boot'].path,
snapdapi.Role.SYSTEM_SEED: mounts['/boot/efi'].path,
}
self.assertEqual(expected_role_to_path, role_to_path)

View File

@ -142,7 +142,10 @@ class VolumeStructure:
update: VolumeUpdate = attr.Factory(VolumeUpdate)
def gpt_part_uuid(self):
if ',' in self.type:
return self.type.split(',', 1)[1].upper()
else:
return self.type
@attr.s(auto_attribs=True)