Merge pull request #1354 from dbungert/ptable-msdos-more-betterification

filesystem: logical part numbering and related
This commit is contained in:
Dan Bungert 2022-07-27 09:05:19 -06:00 committed by GitHub
commit 6b13c84e4c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
9 changed files with 322 additions and 80 deletions

View File

@ -18,6 +18,7 @@ import functools
from gettext import pgettext from gettext import pgettext
from subiquity.common.filesystem import boot, gaps, labels from subiquity.common.filesystem import boot, gaps, labels
from subiquity.common.types import GapUsable
from subiquity.models.filesystem import ( from subiquity.models.filesystem import (
Bootloader, Bootloader,
Disk, Disk,
@ -215,7 +216,11 @@ _can_partition = make_checker(DeviceAction.PARTITION)
@_can_partition.register(gaps.Gap) @_can_partition.register(gaps.Gap)
def _can_partition_gap(gap): def _can_partition_gap(gap):
return True if gap.usable == GapUsable.YES:
return True
if gap.usable == GapUsable.TOO_MANY_PRIMARY_PARTS:
return _("Primary partition limit reached")
return _("Unusable space")
_can_format = make_checker(DeviceAction.FORMAT) _can_format = make_checker(DeviceAction.FORMAT)

View File

@ -17,6 +17,7 @@ import functools
import attr import attr
from subiquity.common.types import GapUsable
from subiquity.models.filesystem import ( from subiquity.models.filesystem import (
align_up, align_up,
align_down, align_down,
@ -35,6 +36,7 @@ class Gap:
offset: int offset: int
size: int size: int
in_extended: bool = False in_extended: bool = False
usable: str = GapUsable.YES
type: str = 'gap' type: str = 'gap'
@ -42,6 +44,10 @@ class Gap:
def id(self): def id(self):
return 'gap-' + self.device.id return 'gap-' + self.device.id
@property
def is_usable(self):
return self.usable == GapUsable.YES
def split(self, size): def split(self, size):
"""returns a tuple of two new gaps, split from the current gap based on """returns a tuple of two new gaps, split from the current gap based on
the supplied size. If size is equal to the gap size, the second gap is the supplied size. If size is equal to the gap size, the second gap is
@ -53,11 +59,13 @@ class Gap:
first_gap = Gap(device=self.device, first_gap = Gap(device=self.device,
offset=self.offset, offset=self.offset,
size=size, size=size,
in_extended=self.in_extended) in_extended=self.in_extended,
usable=self.usable)
rest_gap = Gap(device=self.device, rest_gap = Gap(device=self.device,
offset=self.offset + size, offset=self.offset + size,
size=self.size - size, size=self.size - size,
in_extended=self.in_extended) in_extended=self.in_extended,
usable=self.usable)
return (first_gap, rest_gap) return (first_gap, rest_gap)
@ -66,19 +74,24 @@ def parts_and_gaps(device):
raise NotImplementedError(device) raise NotImplementedError(device)
def remaining_primary_partitions(device, info):
primaries = [p for p in device.partitions() if not p.is_logical]
return info.primary_part_limit - len(primaries)
def find_disk_gaps_v1(device): def find_disk_gaps_v1(device):
r = [] r = []
used = 0 used = 0
ad = device.alignment_data() info = device.alignment_data()
used += ad.min_start_offset used += info.min_start_offset
for p in device._partitions: for p in device._partitions:
used = align_up(used + p.size, 1 << 20) used = align_up(used + p.size, 1 << 20)
r.append(p) r.append(p)
if device._has_preexisting_partition(): if device._has_preexisting_partition():
return r return r
if device.ptable == 'vtoc' and len(device._partitions) >= 3: if remaining_primary_partitions(device, info) < 1:
return r return r
end = align_down(device.size - ad.min_end_offset, 1 << 20) end = align_down(device.size - info.min_end_offset, 1 << 20)
if end - used >= (1 << 20): if end - used >= (1 << 20):
r.append(Gap(device, used, end - used)) r.append(Gap(device, used, end - used))
return r return r
@ -102,13 +115,22 @@ def find_disk_gaps_v2(device, info=None):
return v - v % info.part_align return v - v % info.part_align
def maybe_add_gap(start, end, in_extended): def maybe_add_gap(start, end, in_extended):
if in_extended or primary_parts_remaining > 0:
usable = GapUsable.YES
else:
usable = GapUsable.TOO_MANY_PRIMARY_PARTS
if end - start >= info.min_gap_size: if end - start >= info.min_gap_size:
result.append(Gap(device, start, end - start, in_extended)) result.append(Gap(device=device,
offset=start,
size=end - start,
in_extended=in_extended,
usable=usable))
prev_end = info.min_start_offset prev_end = info.min_start_offset
parts = sorted(device._partitions, key=lambda p: p.offset) parts = device.partitions_by_offset()
extended_end = None extended_end = None
primary_parts_remaining = remaining_primary_partitions(device, info)
for part in parts + [None]: for part in parts + [None]:
if part is None: if part is None:

View File

@ -146,6 +146,12 @@ def _desc_lv(lv):
return _("LVM logical volume") return _("LVM logical volume")
@desc.register(gaps.Gap)
def _desc_gap(gap):
# This is only used in text "cannot add partition {desc}"... bit hackish.
return _("to gap")
@functools.singledispatch @functools.singledispatch
def label(device, *, short=False): def label(device, *, short=False):
"""A label that identifies `device` """A label that identifies `device`
@ -326,4 +332,4 @@ def _for_client_partition(partition, *, min_size=0):
@for_client.register(gaps.Gap) @for_client.register(gaps.Gap)
def _for_client_gap(gap, *, min_size=0): def _for_client_gap(gap, *, min_size=0):
return types.Gap(offset=gap.offset, size=gap.size) return types.Gap(offset=gap.offset, size=gap.size, usable=gap.usable)

View File

@ -29,6 +29,7 @@ from subiquity.models.tests.test_filesystem import (
) )
from subiquity.common.filesystem import gaps from subiquity.common.filesystem import gaps
from subiquity.common.types import GapUsable
class TestGaps(unittest.TestCase): class TestGaps(unittest.TestCase):
@ -270,7 +271,7 @@ class TestDiskGaps(unittest.TestCase):
info = PartitionAlignmentData( info = PartitionAlignmentData(
part_align=5, min_gap_size=1, min_start_offset=0, min_end_offset=0, part_align=5, min_gap_size=1, min_start_offset=0, min_end_offset=0,
ebr_space=2, primary_part_limit=10) ebr_space=2, primary_part_limit=10)
m, d = make_model_and_disk(size=100) m, d = make_model_and_disk(size=100, ptable='dos')
p1 = make_partition(m, d, offset=0, size=50, flag='extended') p1 = make_partition(m, d, offset=0, size=50, flag='extended')
p2 = make_partition(m, d, offset=5, size=45, flag='logical') p2 = make_partition(m, d, offset=5, size=45, flag='logical')
self.assertEqual( self.assertEqual(
@ -293,6 +294,41 @@ class TestDiskGaps(unittest.TestCase):
gaps.Gap(d, 50, 50, False), gaps.Gap(d, 50, 50, False),
]) ])
def test_unusable_gap_primaries(self):
info = PartitionAlignmentData(
part_align=10, min_gap_size=1, min_start_offset=0,
min_end_offset=0, primary_part_limit=1)
m, d = make_model_and_disk(size=100)
p = make_partition(m, d, offset=0, size=90)
g = gaps.Gap(d, offset=90, size=10,
usable=GapUsable.TOO_MANY_PRIMARY_PARTS)
self.assertEqual(
gaps.find_disk_gaps_v2(d, info),
[p, g])
def test_usable_gap_primaries(self):
info = PartitionAlignmentData(
part_align=10, min_gap_size=1, min_start_offset=0,
min_end_offset=0, primary_part_limit=2)
m, d = make_model_and_disk(size=100)
p = make_partition(m, d, offset=0, size=90)
g = gaps.Gap(d, offset=90, size=10, usable=GapUsable.YES)
self.assertEqual(
gaps.find_disk_gaps_v2(d, info),
[p, g])
def test_usable_gap_extended(self):
info = PartitionAlignmentData(
part_align=10, min_gap_size=1, min_start_offset=0,
min_end_offset=0, primary_part_limit=1)
m, d = make_model_and_disk(size=100)
p = make_partition(m, d, offset=0, size=100, flag='extended')
g = gaps.Gap(d, offset=0, size=100,
in_extended=True, usable=GapUsable.YES)
self.assertEqual(
gaps.find_disk_gaps_v2(d, info),
[p, g])
class TestMovableTrailingPartitionsAndGapSize(unittest.TestCase): class TestMovableTrailingPartitionsAndGapSize(unittest.TestCase):
@ -364,7 +400,7 @@ class TestMovableTrailingPartitionsAndGapSize(unittest.TestCase):
# 0----10---20---30---40---50---60---70---80---90---100 # 0----10---20---30---40---50---60---70---80---90---100
# #####[ p1 (extended) ] ##### # #####[ p1 (extended) ] #####
# ######[ p5 (logical) ] ##### # ######[ p5 (logical) ] #####
m, d = make_model_and_disk(size=100) m, d = make_model_and_disk(size=100, ptable='dos')
make_partition(m, d, offset=10, size=40, flag='extended') make_partition(m, d, offset=10, size=40, flag='extended')
p5 = make_partition(m, d, offset=12, size=38, flag='logical') p5 = make_partition(m, d, offset=12, size=38, flag='logical')
self.assertEqual( self.assertEqual(
@ -378,7 +414,7 @@ class TestMovableTrailingPartitionsAndGapSize(unittest.TestCase):
# 0----10---20---30---40---50---60---70---80---90---100 # 0----10---20---30---40---50---60---70---80---90---100
# #####[ p1 (extended) ][ p2 ]##### # #####[ p1 (extended) ][ p2 ]#####
# ######[ p5 (logical) ] ##### # ######[ p5 (logical) ] #####
m, d = make_model_and_disk(size=100) m, d = make_model_and_disk(size=100, ptable='dos')
make_partition(m, d, offset=10, size=40, flag='extended') make_partition(m, d, offset=10, size=40, flag='extended')
make_partition(m, d, offset=50, size=40) make_partition(m, d, offset=50, size=40)
p5 = make_partition(m, d, offset=12, size=38, flag='logical') p5 = make_partition(m, d, offset=12, size=38, flag='logical')
@ -393,7 +429,7 @@ class TestMovableTrailingPartitionsAndGapSize(unittest.TestCase):
# 0----10---20---30---40---50---60---70---80---90---100 # 0----10---20---30---40---50---60---70---80---90---100
# #####[ p1 (extended) ] ##### # #####[ p1 (extended) ] #####
# ######[ p5 (logical)] ##### # ######[ p5 (logical)] #####
m, d = make_model_and_disk(size=100) m, d = make_model_and_disk(size=100, ptable='dos')
make_partition(m, d, offset=10, size=40, flag='extended') make_partition(m, d, offset=10, size=40, flag='extended')
p5 = make_partition(m, d, offset=12, size=30, flag='logical') p5 = make_partition(m, d, offset=12, size=30, flag='logical')
self.assertEqual( self.assertEqual(
@ -407,7 +443,7 @@ class TestMovableTrailingPartitionsAndGapSize(unittest.TestCase):
# 0----10---20---30---40---50---60---70---80---90---100 # 0----10---20---30---40---50---60---70---80---90---100
# #####[ p1 (extended) ]##### # #####[ p1 (extended) ]#####
# ######[ p5 (logical)] [ p6 (logical)] ##### # ######[ p5 (logical)] [ p6 (logical)] #####
m, d = make_model_and_disk(size=100) m, d = make_model_and_disk(size=100, ptable='dos')
make_partition(m, d, offset=10, size=80, flag='extended') make_partition(m, d, offset=10, size=80, flag='extended')
p5 = make_partition(m, d, offset=12, size=30, flag='logical') p5 = make_partition(m, d, offset=12, size=30, flag='logical')
p6 = make_partition(m, d, offset=44, size=30, flag='logical') p6 = make_partition(m, d, offset=44, size=30, flag='logical')
@ -422,7 +458,7 @@ class TestMovableTrailingPartitionsAndGapSize(unittest.TestCase):
# 0----10---20---30---40---50---60---70---80---90---100 # 0----10---20---30---40---50---60---70---80---90---100
# #####[ p1 (extended) ]##### # #####[ p1 (extended) ]#####
# ######[ p5 (logical)] [ p6 (logical) ] ##### # ######[ p5 (logical)] [ p6 (logical) ] #####
m, d = make_model_and_disk(size=100) m, d = make_model_and_disk(size=100, ptable='dos')
make_partition(m, d, offset=10, size=80, flag='extended') make_partition(m, d, offset=10, size=80, flag='extended')
p5 = make_partition(m, d, offset=12, size=30, flag='logical') p5 = make_partition(m, d, offset=12, size=30, flag='logical')
p6 = make_partition(m, d, offset=44, size=44, flag='logical') p6 = make_partition(m, d, offset=44, size=44, flag='logical')
@ -449,6 +485,7 @@ class TestLargestGaps(unittest.TestCase):
d = make_disk() d = make_disk()
[gap] = gaps.parts_and_gaps(d) [gap] = gaps.parts_and_gaps(d)
self.assertEqual(gap, gaps.largest_gap(d)) self.assertEqual(gap, gaps.largest_gap(d))
self.assertTrue(gap.is_usable)
def test_two_gaps(self): def test_two_gaps(self):
m, d = make_model_and_disk(size=100 << 20) m, d = make_model_and_disk(size=100 << 20)
@ -457,6 +494,8 @@ class TestLargestGaps(unittest.TestCase):
make_partition(m, d, offset=40 << 20, size=20 << 20) make_partition(m, d, offset=40 << 20, size=20 << 20)
[_, g1, _, g2] = gaps.parts_and_gaps(d) [_, g1, _, g2] = gaps.parts_and_gaps(d)
self.assertEqual(g2, gaps.largest_gap(d)) self.assertEqual(g2, gaps.largest_gap(d))
self.assertTrue(g1.is_usable)
self.assertTrue(g2.is_usable)
def test_two_disks(self): def test_two_disks(self):
m = make_model() m = make_model()
@ -467,6 +506,8 @@ class TestLargestGaps(unittest.TestCase):
[d2g1] = gaps.parts_and_gaps(d2) [d2g1] = gaps.parts_and_gaps(d2)
self.assertEqual(d1g1, gaps.largest_gap(d1)) self.assertEqual(d1g1, gaps.largest_gap(d1))
self.assertEqual(d2g1, gaps.largest_gap(d2)) self.assertEqual(d2g1, gaps.largest_gap(d2))
self.assertTrue(d1g1.is_usable)
self.assertTrue(d2g1.is_usable)
def test_across_two_disks(self): def test_across_two_disks(self):
m = make_model() m = make_model()
@ -493,3 +534,10 @@ class TestLargestGaps(unittest.TestCase):
make_partition(m, d1, offset=0, size=100 << 20) make_partition(m, d1, offset=0, size=100 << 20)
make_partition(m, d2, offset=0, size=200 << 20) make_partition(m, d2, offset=0, size=200 << 20)
self.assertIsNone(gaps.largest_gap([d1, d2])) self.assertIsNone(gaps.largest_gap([d1, d2]))
class TestUsable(unittest.TestCase):
def test_strings(self):
self.assertEqual('YES', GapUsable.YES.name)
self.assertEqual('TOO_MANY_PRIMARY_PARTS',
GapUsable.TOO_MANY_PRIMARY_PARTS.name)

View File

@ -284,10 +284,16 @@ class Partition:
path: Optional[str] = None path: Optional[str] = None
class GapUsable(enum.Enum):
YES = enum.auto()
TOO_MANY_PRIMARY_PARTS = enum.auto()
@attr.s(auto_attribs=True) @attr.s(auto_attribs=True)
class Gap: class Gap:
offset: int offset: int
size: int size: int
usable: GapUsable
@attr.s(auto_attribs=True) @attr.s(auto_attribs=True)

View File

@ -516,6 +516,12 @@ class _Device(_Formattable, ABC):
def partitions(self): def partitions(self):
return self._partitions return self._partitions
def partitions_by_offset(self):
return sorted(self._partitions, key=lambda p: p.offset)
def partitions_by_number(self):
return sorted(self._partitions, key=lambda p: p.number)
@property @property
def used(self): def used(self):
if self._is_entirely_used(): if self._is_entirely_used():
@ -668,6 +674,14 @@ class Disk(_Device):
return None return None
return id.encode('utf-8').decode('unicode_escape').strip() return id.encode('utf-8').decode('unicode_escape').strip()
def renumber_logical_partitions(self, removed_partition):
parts = [p for p in self.partitions_by_number()
if p.is_logical and p.number > removed_partition.number]
next_num = removed_partition.number
for part in parts:
part.number = next_num
next_num += 1
@fsobj("partition") @fsobj("partition")
class Partition(_Formattable): class Partition(_Formattable):
@ -688,11 +702,20 @@ class Partition(_Formattable):
def __post_init__(self): def __post_init__(self):
if self.number is not None: if self.number is not None:
return return
used_nums = {part.number for part in self.device._partitions
if part.number is not None} used_nums = {p.number for p in self.device._partitions
possible_nums = {i for i in range(1, len(self.device._partitions) + 1)} if p.number is not None
unused_nums = sorted(list(possible_nums - used_nums)) if p.is_logical == self.is_logical}
self.number = unused_nums.pop(0) primary_limit = self.device.alignment_data().primary_part_limit
if self.is_logical:
possible_nums = range(primary_limit + 1, 129)
else:
possible_nums = range(1, primary_limit + 1)
for num in possible_nums:
if num not in used_nums:
self.number = num
return
raise Exception('Exceeded number of available partitions')
def available(self): def available(self):
if self.flag in ['bios_grub', 'prep'] or self.grub_device: if self.flag in ['bios_grub', 'prep'] or self.grub_device:
@ -746,6 +769,10 @@ class Partition(_Formattable):
return None return None
return OsProber(**os_data) return OsProber(**os_data)
@property
def is_logical(self):
return self.flag == 'logical'
ok_for_lvm_vg = ok_for_raid ok_for_lvm_vg = ok_for_raid
@ -1444,6 +1471,7 @@ class FilesystemModel(object):
for p2 in movable_trailing_partitions_and_gap_size(part)[0]: for p2 in movable_trailing_partitions_and_gap_size(part)[0]:
p2.offset -= part.size p2.offset -= part.size
self._remove(part) self._remove(part)
part.device.renumber_logical_partitions(part)
if len(part.device._partitions) == 0: if len(part.device._partitions) == 0:
part.device.ptable = None part.device.ptable = None

View File

@ -16,6 +16,7 @@
import unittest import unittest
import attr import attr
from parameterized import parameterized
from subiquity.models.filesystem import ( from subiquity.models.filesystem import (
Bootloader, Bootloader,
@ -136,10 +137,12 @@ class FakeStorageInfo:
raw = attr.ib(default=attr.Factory(dict)) raw = attr.ib(default=attr.Factory(dict))
def make_model(bootloader=None): def make_model(bootloader=None, storage_version=None):
model = FilesystemModel() model = FilesystemModel()
if bootloader is not None: if bootloader is not None:
model.bootloader = bootloader model.bootloader = bootloader
if storage_version is not None:
model.storage_version = storage_version
model._probe_data = {} model._probe_data = {}
return model return model
@ -167,16 +170,20 @@ def make_model_and_disk(bootloader=None, **kw):
def make_partition(model, device=None, *, preserve=False, size=None, def make_partition(model, device=None, *, preserve=False, size=None,
offset=None, **kw): offset=None, **kw):
flag = kw.pop('flag', None)
if device is None: if device is None:
device = make_disk(model) device = make_disk(model)
if size is None or offset is None: if size is None or offset is None:
gap = gaps.largest_gap(device) gap = gaps.largest_gap(device)
if size is None: if size is None:
size = gap.size//2 if flag == 'extended':
size = gap.size
else:
size = gap.size//2
if offset is None: if offset is None:
offset = gap.offset offset = gap.offset
partition = Partition(m=model, device=device, size=size, offset=offset, partition = Partition(m=model, device=device, size=size, offset=offset,
preserve=preserve, **kw) preserve=preserve, flag=flag, **kw)
model._actions.append(partition) model._actions.append(partition)
return partition return partition
@ -676,37 +683,106 @@ class TestAutoInstallConfig(unittest.TestCase):
self.assertTrue(disk2p1.id in rendered_ids) self.assertTrue(disk2p1.id in rendered_ids)
class TestAlignmentData(unittest.TestCase):
def test_alignment_gaps_coherence(self):
for ptable in 'gpt', 'msdos', 'vtoc':
model = make_model(Bootloader.NONE)
disk1 = make_disk(model, ptable=ptable)
gaps_max = gaps.largest_gap_size(disk1)
align_data = disk1.alignment_data()
align_max = (disk1.size - align_data.min_start_offset
- align_data.min_end_offset)
# The alignment data currently has a better notion of end
# information, so gaps produces numbers that are too small by 1MiB
# for ptable != 'gpt'
self.assertTrue(gaps_max <= align_max, f'ptable={ptable}')
class TestPartitionNumbering(unittest.TestCase): class TestPartitionNumbering(unittest.TestCase):
def test_basic(self): def setUp(self):
m, d1 = make_model_and_disk(ptable='gpt') self.cur_idx = 1
p1 = make_partition(m, d1)
p2 = make_partition(m, d1)
p3 = make_partition(m, d1)
self.assertEqual(1, p1.number)
self.assertEqual(2, p2.number)
self.assertEqual(3, p3.number)
def test_p1_preserved(self): def assert_next(self, part):
m = make_model() self.assertEqual(self.cur_idx, part.number)
m.storage_version = 2 self.cur_idx += 1
d1 = make_disk(m, ptable='gpt')
def test_gpt(self):
m, d1 = make_model_and_disk(ptable='gpt')
for _ in range(8):
self.assert_next(make_partition(m, d1))
@parameterized.expand([
['msdos', 4],
['vtoc', 3],
])
def test_all_primary(self, ptable, primaries):
m = make_model(storage_version=2)
d1 = make_disk(m, ptable=ptable)
for _ in range(primaries):
self.assert_next(make_partition(m, d1))
@parameterized.expand([
['msdos', 4],
['vtoc', 3],
])
def test_primary_and_extended(self, ptable, primaries):
m = make_model(storage_version=2)
d1 = make_disk(m, ptable=ptable)
for _ in range(primaries - 1):
self.assert_next(make_partition(m, d1))
self.assert_next(make_partition(m, d1, flag='extended'))
for _ in range(3):
self.assert_next(make_partition(m, d1, flag='logical'))
@parameterized.expand(
[[pt, primaries, i]
for pt, primaries in (('msdos', 4), ('vtoc', 3))
for i in range(3)]
)
def test_delete_logical(self, ptable, primaries, idx_to_remove):
m = make_model(storage_version=2)
d1 = make_disk(m, ptable=ptable)
self.assert_next(make_partition(m, d1))
self.assert_next(make_partition(m, d1, flag='extended'))
self.cur_idx = primaries + 1
parts = [make_partition(m, d1, flag='logical') for _ in range(3)]
for p in parts:
self.assert_next(p)
to_remove = parts.pop(idx_to_remove)
m.remove_partition(to_remove)
self.cur_idx = primaries + 1
for p in parts:
self.assert_next(p)
@parameterized.expand(
[[pt, primaries, i]
for pt, primaries in (('msdos', 4), ('vtoc', 3))
for i in range(3)]
)
def test_out_of_offset_order(self, ptable, primaries, idx_to_remove):
m = make_model(storage_version=2)
d1 = make_disk(m, ptable=ptable, size=100 << 20)
self.assert_next(make_partition(m, d1, size=10 << 20))
self.assert_next(make_partition(m, d1, flag='extended'))
self.cur_idx = primaries + 1
parts = []
parts.append(make_partition(
m, d1, flag='logical', size=9 << 20, offset=30 << 20))
parts.append(make_partition(
m, d1, flag='logical', size=9 << 20, offset=20 << 20))
parts.append(make_partition(
m, d1, flag='logical', size=9 << 20, offset=40 << 20))
for p in parts:
self.assert_next(p)
to_remove = parts.pop(idx_to_remove)
m.remove_partition(to_remove)
self.cur_idx = primaries + 1
for p in parts:
self.assert_next(p)
@parameterized.expand([
[1, 'msdos', 4],
[1, 'vtoc', 3],
[2, 'msdos', 4],
[2, 'vtoc', 3],
])
def test_no_extra_primary(self, sv, ptable, primaries):
m = make_model(storage_version=sv)
d1 = make_disk(m, ptable=ptable, size=100 << 30)
for _ in range(primaries):
self.assert_next(make_partition(m, d1, size=1 << 30))
with self.assertRaises(Exception):
make_partition(m, d1)
@parameterized.expand([['gpt'], ['msdos'], ['vtoc']])
def test_p1_preserved(self, ptable):
m = make_model(storage_version=2)
d1 = make_disk(m, ptable=ptable)
p1 = make_partition(m, d1, preserve=True, number=1) p1 = make_partition(m, d1, preserve=True, number=1)
p2 = make_partition(m, d1) p2 = make_partition(m, d1)
p3 = make_partition(m, d1) p3 = make_partition(m, d1)
@ -717,10 +793,10 @@ class TestPartitionNumbering(unittest.TestCase):
self.assertEqual(3, p3.number) self.assertEqual(3, p3.number)
self.assertEqual(False, p3.preserve) self.assertEqual(False, p3.preserve)
def test_p2_preserved(self): @parameterized.expand([['gpt'], ['msdos'], ['vtoc']])
m = make_model() def test_p2_preserved(self, ptable):
m.storage_version = 2 m = make_model(storage_version=2)
d1 = make_disk(m, ptable='gpt') d1 = make_disk(m, ptable=ptable)
p2 = make_partition(m, d1, preserve=True, number=2) p2 = make_partition(m, d1, preserve=True, number=2)
p1 = make_partition(m, d1) p1 = make_partition(m, d1)
p3 = make_partition(m, d1) p3 = make_partition(m, d1)
@ -730,3 +806,12 @@ class TestPartitionNumbering(unittest.TestCase):
self.assertEqual(True, p2.preserve) self.assertEqual(True, p2.preserve)
self.assertEqual(3, p3.number) self.assertEqual(3, p3.number)
self.assertEqual(False, p3.preserve) self.assertEqual(False, p3.preserve)
class TestAlignmentData(unittest.TestCase):
@parameterized.expand([['gpt'], ['msdos'], ['vtoc']])
def test_alignment_gaps_coherence(self, ptable):
d1 = make_disk(ptable=ptable)
ad = d1.alignment_data()
align_max = d1.size - ad.min_start_offset - ad.min_end_offset
self.assertEqual(gaps.largest_gap_size(d1), align_max)

View File

@ -65,44 +65,40 @@ class TestSubiquityControllerFilesystem(IsolatedAsyncioTestCase):
class TestGuided(TestCase): class TestGuided(TestCase):
boot_expectations = [
(Bootloader.UEFI, 'gpt', '/boot/efi'),
(Bootloader.UEFI, 'msdos', '/boot/efi'),
(Bootloader.BIOS, 'gpt', None),
# BIOS + msdos is different
(Bootloader.PREP, 'gpt', None),
(Bootloader.PREP, 'msdos', None),
]
def _guided_setup(self, bootloader, ptable): def _guided_setup(self, bootloader, ptable):
self.app = make_app() self.app = make_app()
self.app.opts.bootloader = bootloader.value self.app.opts.bootloader = bootloader.value
self.controller = FilesystemController(self.app) self.controller = FilesystemController(self.app)
self.controller.model = make_model(bootloader) self.controller.model = self.model = make_model(bootloader)
self.controller.model._probe_data = {'blockdev': {}} self.model._probe_data = {'blockdev': {}}
self.d1 = make_disk(self.controller.model, ptable=ptable) self.d1 = make_disk(self.model, ptable=ptable)
@parameterized.expand([ @parameterized.expand(boot_expectations)
(Bootloader.UEFI, 'gpt', '/boot/efi'),
(Bootloader.UEFI, 'msdos', '/boot/efi'),
(Bootloader.BIOS, 'gpt', None),
# BIOS + msdos is different
(Bootloader.PREP, 'gpt', None),
(Bootloader.PREP, 'msdos', None),
])
def test_guided_direct(self, bootloader, ptable, p1mnt): def test_guided_direct(self, bootloader, ptable, p1mnt):
self._guided_setup(bootloader, ptable) self._guided_setup(bootloader, ptable)
self.controller.guided_direct(self.d1) self.controller.guided_direct(self.d1)
[d1p1, d1p2] = self.d1.partitions() [d1p1, d1p2] = self.d1.partitions()
self.assertEqual(p1mnt, d1p1.mount) self.assertEqual(p1mnt, d1p1.mount)
self.assertEqual('/', d1p2.mount) self.assertEqual('/', d1p2.mount)
self.assertEqual(d1p1.size + d1p1.offset, d1p2.offset) self.assertIsNone(gaps.largest_gap(self.d1))
def test_guided_direct_BIOS_MSDOS(self): def test_guided_direct_BIOS_MSDOS(self):
self._guided_setup(Bootloader.BIOS, 'msdos') self._guided_setup(Bootloader.BIOS, 'msdos')
self.controller.guided_direct(self.d1) self.controller.guided_direct(self.d1)
[d1p1] = self.d1.partitions() [d1p1] = self.d1.partitions()
self.assertEqual('/', d1p1.mount) self.assertEqual('/', d1p1.mount)
self.assertIsNone(gaps.largest_gap(self.d1))
@parameterized.expand([ @parameterized.expand(boot_expectations)
(Bootloader.UEFI, 'gpt', '/boot/efi'),
(Bootloader.UEFI, 'msdos', '/boot/efi'),
(Bootloader.BIOS, 'gpt', None),
# BIOS + msdos is different
(Bootloader.PREP, 'gpt', None),
(Bootloader.PREP, 'msdos', None),
])
def test_guided_lvm(self, bootloader, ptable, p1mnt): def test_guided_lvm(self, bootloader, ptable, p1mnt):
self._guided_setup(bootloader, ptable) self._guided_setup(bootloader, ptable)
self.controller.guided_lvm(self.d1) self.controller.guided_lvm(self.d1)
@ -110,16 +106,21 @@ class TestGuided(TestCase):
self.assertEqual(p1mnt, d1p1.mount) self.assertEqual(p1mnt, d1p1.mount)
self.assertEqual('/boot', d1p2.mount) self.assertEqual('/boot', d1p2.mount)
self.assertEqual(None, d1p3.mount) self.assertEqual(None, d1p3.mount)
self.assertEqual(d1p1.size + d1p1.offset, d1p2.offset) [vg] = self.model._all(type='lvm_volgroup')
self.assertEqual(d1p2.size + d1p2.offset, d1p3.offset) [part] = list(vg.devices)
self.assertEqual(d1p3, part)
self.assertIsNone(gaps.largest_gap(self.d1))
def test_guided_lvm_BIOS_MSDOS(self): def test_guided_lvm_BIOS_MSDOS(self):
self._guided_setup(Bootloader.BIOS, 'msdos') self._guided_setup(Bootloader.BIOS, 'msdos')
self.controller.guided_lvm(self.d1) self.controller.guided_lvm(self.d1)
[d1p1, d1p2] = self.d1.partitions() [d1p1, d1p2] = self.d1.partitions()
self.assertEqual('/boot', d1p1.mount) self.assertEqual('/boot', d1p1.mount)
[vg] = self.model._all(type='lvm_volgroup')
[part] = list(vg.devices)
self.assertEqual(d1p2, part)
self.assertEqual(None, d1p2.mount) self.assertEqual(None, d1p2.mount)
self.assertEqual(d1p1.size + d1p1.offset, d1p2.offset) self.assertIsNone(gaps.largest_gap(self.d1))
class TestLayout(TestCase): class TestLayout(TestCase):

View File

@ -1126,6 +1126,7 @@ class TestGap(TestAPI):
gap = sda['partitions'][0] gap = sda['partitions'][0]
expected = (100 << 30) - (2 << 20) expected = (100 << 30) - (2 << 20)
self.assertEqual(expected, gap['size']) self.assertEqual(expected, gap['size'])
self.assertEqual('YES', gap['usable'])
async def test_gap_at_end(self): async def test_gap_at_end(self):
async with start_server('examples/simple.json') as inst: async with start_server('examples/simple.json') as inst:
@ -1279,3 +1280,43 @@ class TestIdentityValidation(TestAPI):
resp = await inst.get('/identity/validate_username', resp = await inst.get('/identity/validate_username',
username='o#$%^&') username='o#$%^&')
self.assertEqual(resp, 'INVALID_CHARS') self.assertEqual(resp, 'INVALID_CHARS')
class TestManyPrimaries(TestAPI):
@timeout()
async def test_create_primaries(self):
cfg = 'examples/simple.json'
extra = ['--storage-version', '2']
async with start_server(cfg, extra_args=extra) as inst:
resp = await inst.get('/storage/v2')
d1 = resp['disks'][0]
data = {'disk_id': d1['id'], 'ptable': 'msdos'}
resp = await inst.post('/storage/v2/reformat_disk', data)
[gap] = match(resp['disks'][0]['partitions'], _type='Gap')
for _ in range(4):
self.assertEqual('YES', gap['usable'])
data = {
'disk_id': d1['id'],
'gap': gap,
'partition': {
'size': 1 << 30,
'format': 'ext4',
}
}
resp = await inst.post('/storage/v2/add_partition', data)
[gap] = match(resp['disks'][0]['partitions'], _type='Gap')
self.assertEqual('TOO_MANY_PRIMARY_PARTS', gap['usable'])
data = {
'disk_id': d1['id'],
'gap': gap,
'partition': {
'size': 1 << 30,
'format': 'ext4',
}
}
with self.assertRaises(ClientResponseError):
await inst.post('/storage/v2/add_partition', data)