Merge pull request #1309 from dbungert/gap-utils

gap: several utilities
This commit is contained in:
Dan Bungert 2022-06-13 13:58:49 -06:00 committed by GitHub
commit a44939e402
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 80 additions and 9 deletions

View File

@ -28,7 +28,8 @@ from subiquity.models.filesystem import (
)
@attr.s(auto_attribs=True)
# should also set on_setattr=None with attrs 20.1.0
@attr.s(auto_attribs=True, frozen=True)
class Gap:
device: object
offset: int
@ -41,6 +42,24 @@ class Gap:
def id(self):
return 'gap-' + self.device.id
def split(self, size):
"""returns a tuple of two new gaps, split from the current gap based on
the supplied size. If size is equal to the gap size, the second gap is
None. The original gap is unmodified."""
if size > self.size:
raise Exception('requested size larger than gap')
if size == self.size:
return (self, None)
first_gap = Gap(device=self.device,
offset=self.offset,
size=size,
in_extended=self.in_extended)
rest_gap = Gap(device=self.device,
offset=self.offset + size,
size=self.size - size,
in_extended=self.in_extended)
return (first_gap, rest_gap)
@functools.singledispatch
def parts_and_gaps(device):
@ -186,3 +205,11 @@ def movable_trailing_partitions_and_gap_size(partition):
else:
return (trailing_partitions, 0)
return (trailing_partitions, 0)
def at_offset(device, offset):
for pg in parts_and_gaps(device):
if isinstance(pg, Gap):
if pg.offset == offset:
return pg
return None

View File

@ -22,6 +22,7 @@ from subiquity.models.filesystem import (
MiB,
)
from subiquity.models.tests.test_filesystem import (
make_disk,
make_model_and_disk,
make_partition,
)
@ -31,26 +32,67 @@ from subiquity.common.filesystem import gaps
class TestGaps(unittest.TestCase):
def test_basic(self):
model, disk1 = make_model_and_disk()
[gap] = gaps.parts_and_gaps(make_disk())
self.assertTrue(isinstance(gap, gaps.Gap))
self.assertEqual(MiB, gap.offset)
pg = gaps.parts_and_gaps(disk1)
self.assertEqual(1, len(pg))
self.assertTrue(isinstance(pg[0], gaps.Gap))
self.assertEqual(MiB, pg[0].offset)
class TestSplitGap(unittest.TestCase):
def test_equal(self):
[gap] = gaps.parts_and_gaps(make_disk())
actual = gap.split(gap.size)
self.assertEqual((gap, None), actual)
def test_too_big(self):
[gap] = gaps.parts_and_gaps(make_disk())
with self.assertRaises(Exception):
gap.split(gap.size + MiB)
def test_split(self):
[gap] = gaps.parts_and_gaps(make_disk(size=100 << 30))
size = 10 << 30
new_gaps = gap.split(size)
self.assertEqual(2, len(new_gaps))
self.assertEqual(size, new_gaps[0].size)
self.assertEqual(gap.size - size, new_gaps[1].size)
self.assertEqual(gap.offset, new_gaps[0].offset)
self.assertEqual(gap.offset + size, new_gaps[1].offset)
class TestAtOffset(unittest.TestCase):
def test_zero(self):
self.assertIsNone(gaps.at_offset(make_disk(), 0))
def test_match(self):
[gap] = gaps.parts_and_gaps(make_disk())
self.assertEqual(gap, gaps.at_offset(gap.device, gap.offset))
def test_not_match(self):
[gap] = gaps.parts_and_gaps(make_disk())
self.assertIsNone(gaps.at_offset(gap.device, gap.offset + 1))
def test_two_gaps(self):
m, d = make_model_and_disk(size=100 << 20)
m.storage_version = 2
make_partition(m, d, offset=0, size=20 << 20)
make_partition(m, d, offset=40 << 20, size=20 << 20)
[_, g1, _, g2] = gaps.parts_and_gaps(d)
self.assertEqual(g1, gaps.at_offset(d, 20 << 20))
self.assertEqual(g2, gaps.at_offset(d, 60 << 20))
class TestDiskGaps(unittest.TestCase):
def test_no_partition_gpt(self):
size = 1 << 30
m, d = make_model_and_disk(size=size, ptable='gpt')
d = make_disk(size=size, ptable='gpt')
self.assertEqual(
gaps.find_disk_gaps_v2(d),
[gaps.Gap(d, MiB, size - 2*MiB, False)])
def test_no_partition_dos(self):
size = 1 << 30
m, d = make_model_and_disk(size=size, ptable='dos')
d = make_disk(size=size, ptable='dos')
self.assertEqual(
gaps.find_disk_gaps_v2(d),
[gaps.Gap(d, MiB, size - MiB, False)])

View File

@ -143,7 +143,9 @@ def make_model(bootloader=None):
return model
def make_disk(fs_model, **kw):
def make_disk(fs_model=None, **kw):
if fs_model is None:
fs_model = make_model()
if 'serial' not in kw:
kw['serial'] = 'serial%s' % len(fs_model._actions)
if 'path' not in kw: