subiquity/subiquitycore/tests/test_models_filesystems.py

462 lines
17 KiB
Python
Raw Normal View History

import argparse
import logging
import random
import testtools
import yaml
from mock import patch
from subiquitycore.models.blockdev import (Blockdev,
blockdev_align_up,
FIRST_PARTITION_OFFSET,
GPT_END_RESERVE,
sort_actions)
from subiquitycore.models.filesystem import FilesystemModel
from subiquitycore.prober import Prober
from subiquitycore.tests import fakes
GB = 1 << 40
class TestFilesystemModel(testtools.TestCase):
def setUp(self):
super(TestFilesystemModel, self).setUp()
# don't show logging messages while testing
logging.disable(logging.CRITICAL)
self.make_fsm()
# mocking the reading of the fake data saves on IO
@patch.object(Prober, '_load_machine_config')
@patch.object(Prober, 'get_storage')
def make_fsm(self, _get_storage, _load_machine_config):
_get_storage.return_value = fakes.FAKE_MACHINE_STORAGE_DATA
_load_machine_config.return_value = fakes.FAKE_MACHINE_JSON_DATA
self.opts = argparse.Namespace()
self.opts.machine_config = fakes.FAKE_MACHINE_JSON
self.opts.dry_run = True
self.prober = Prober(self.opts)
self.storage = fakes.FAKE_MACHINE_STORAGE_DATA
self.fsm = FilesystemModel(self.prober, self.opts)
def test_init(self):
self.assertNotEqual(self.fsm, None)
self.assertEqual(self.fsm.info, {})
self.assertEqual(self.fsm.devices, {})
self.assertEqual(self.fsm.raid_devices, {})
self.assertEqual(self.fsm.storage, {})
def test_get_signals(self):
self.assertEqual(sorted(self.fsm.get_signals()),
sorted(self.fsm.signals + self.fsm.fs_menu))
def test_get_signal_by_name(self):
for (name, signal, method) in self.fsm.get_signals():
self.assertEqual(self.fsm.get_signal_by_name(name), signal)
def test_get_menu(self):
self.assertEqual(sorted(self.fsm.get_menu()),
sorted(self.fsm.fs_menu))
def test_probe_storage(self):
'''sd[b..i]'''
disks = [d for d in self.storage.keys()
if self.storage[d]['DEVTYPE'] == 'disk' and
self.storage[d]['MAJOR'] in ['8', '253']]
self.fsm.probe_storage()
self.assertNotEqual(self.fsm.storage, {})
self.assertEqual(sorted(self.fsm.info.keys()),
sorted(disks))
def test_get_disk(self):
self.fsm.probe_storage()
diskname = random.choice(list(self.fsm.info.keys()))
disk = Blockdev(diskname,
self.fsm.info[diskname].serial,
self.fsm.info[diskname].model,
size=self.fsm.info[diskname].size)
test_disk = self.fsm.get_disk(diskname)
print(disk)
print(test_disk)
self.assertEqual(test_disk, disk)
def test_get_disk_from_partition(self):
self.fsm.probe_storage()
diskname = random.choice(list(self.fsm.info.keys()))
disk = self.fsm.get_disk(diskname)
disk.add_partition(1, disk.freespace, None, None, flag='raid')
partpath = '{}{}'.format(disk.path, 1)
print(partpath)
self.assertTrue(partpath[-1], 1)
test_disk = self.fsm.get_disk(partpath)
print(disk)
print(test_disk)
self.assertEqual(test_disk, disk)
def test_get_all_disks(self):
self.fsm.probe_storage()
all_disks = self.fsm.get_all_disks()
for disk in all_disks:
self.assertTrue(disk in self.fsm.devices.values())
def test_get_available_disks(self):
''' occupy one of the probed disks and ensure
that it's not included in the available disks
result since it's not actually avaialable
'''
self.fsm.probe_storage()
diskname = random.choice(list(self.fsm.info.keys()))
disk = self.fsm.get_disk(diskname)
disk.add_partition(1, disk.freespace, None, None, flag='raid')
avail_disks = self.fsm.get_available_disks()
self.assertLess(len(avail_disks), len(self.fsm.devices.values()))
self.assertTrue(disk not in avail_disks)
def test_add_device(self):
self.fsm.probe_storage()
diskname = random.choice(list(self.fsm.info.keys()))
disk = Blockdev(diskname,
self.fsm.info[diskname].serial,
self.fsm.info[diskname].model,
size=self.fsm.info[diskname].size)
devname = '/dev/md0'
self.fsm.add_device(devname, disk)
self.assertTrue(devname in self.fsm.devices)
def test_get_partitions(self):
self.fsm.probe_storage()
# no partitions
partitions = self.fsm.get_partitions()
self.assertEqual(len(partitions), 0)
# add one to a random disk
diskname = random.choice(list(self.fsm.info.keys()))
disk = self.fsm.get_disk(diskname)
disk.add_partition(1, disk.freespace, None, None, flag='raid')
# we added one, we should get one
partitions = self.fsm.get_partitions()
self.assertEqual(len(partitions), 1)
# it should have the same base device name
print(partitions, diskname)
self.assertTrue(partitions[0].startswith(diskname))
def test_installable(self):
self.fsm.probe_storage()
self.assertEqual(self.fsm.installable(), False)
# create a partition that installs to root(/)
diskname = random.choice(list(self.fsm.info.keys()))
disk = self.fsm.get_disk(diskname)
disk.add_partition(1, disk.freespace, 'ext4', '/', flag='bios_grub')
# now we should be installable
self.assertEqual(self.fsm.installable(), True)
def test_not_installable(self):
self.fsm.probe_storage()
# create a partition that installs to not root(/)
diskname = random.choice(list(self.fsm.info.keys()))
disk = self.fsm.get_disk(diskname)
disk.add_partition(1, disk.freespace, 'ext4', '/opt', flag='bios_grub')
# we should not be installable
self.assertEqual(self.fsm.installable(), False)
def test_bootable(self):
self.fsm.probe_storage()
self.assertEqual(self.fsm.bootable(), False)
# create a partition that installs to root(/)
diskname = random.choice(list(self.fsm.info.keys()))
disk = self.fsm.get_disk(diskname)
disk.add_partition(1, disk.freespace, 'ext4', '/', flag='bios_grub')
# now we should be installable
self.assertEqual(self.fsm.bootable(), True)
def test_get_empty_disks(self):
self.fsm.probe_storage()
empty = self.fsm.get_empty_disks()
avail_disks = self.fsm.get_available_disks()
self.assertEqual(len(empty), len(avail_disks))
# create a partition but not FS or Mount
diskname = random.choice(self.fsm.get_available_disk_names())
disk = self.fsm.get_disk(diskname)
disk.add_partition(1, int(disk.freespace / 2), None, None, flag='raid')
self.assertEqual(len(disk.partitions), 1)
print('disk: {}'.format(disk))
print('disk avail: {} is_mounted={} percent_free={}'.format(
disk.devpath, disk.is_mounted(), disk.percent_free,
len(disk.partitions)))
# we should have one less empty disk than available
empty = self.fsm.get_empty_disks()
avail_disks = self.fsm.get_available_disks()
print('empty')
for d in empty:
print('empty: {} is_mounted={} percent_free={}'.format(
d.devpath, d.is_mounted(), d.percent_free,
len(d.partitions)))
print('avail')
for d in avail_disks:
print('avail: {} is_mounted={} percent_free={}'.format(
d.devpath, d.is_mounted(), d.percent_free,
len(d.partitions)))
self.assertLess(len(empty), len(avail_disks))
def test_get_empty_disks_names(self):
self.fsm.probe_storage()
empty_names = self.fsm.get_empty_disk_names()
for name in empty_names:
print(name)
self.assertTrue(name in self.fsm.devices)
def test_get_empty_partition_names(self):
self.fsm.probe_storage()
empty = self.fsm.get_empty_partition_names()
self.assertEqual(empty, [])
# create a partition (not full sized) but not FS or Mount
diskname = random.choice(self.fsm.get_available_disk_names())
disk = self.fsm.get_disk(diskname)
disk.add_partition(1, int(disk.freespace / 2), None, None, flag=None)
# one empty partition
[empty] = self.fsm.get_empty_partition_names()
print('empty={}'.format(empty))
print('diskane={}'.format(diskname))
self.assertTrue(diskname in empty)
def test_get_empty_partition_names(self):
self.fsm.probe_storage()
diskname = random.choice(self.fsm.get_available_disk_names())
disk = self.fsm.get_disk(diskname)
# create a partition (not full sized) but not FS or Mount
disk.add_partition(1, int(disk.freespace / 2), None, None, flag='raid')
avail_disk_names = self.fsm.get_available_disk_names()
print(disk.devpath)
print(avail_disk_names)
self.assertTrue(disk.devpath in avail_disk_names)
class TestBlockdev(testtools.TestCase):
def setUp(self):
super(TestBlockdev, self).setUp()
self.devpath = '/dev/foobar'
self.serial = 'serial'
self.model = 'model'
self.parttype = 'gpt'
self.size = 128 * GB
self.bd = Blockdev(self.devpath, self.serial, self.model,
self.parttype, self.size)
def test_blockdev_init(self):
# verify
self.assertNotEqual(self.bd, None)
self.assertEqual(self.bd.available, True)
self.assertEqual(self.bd.blocktype, 'disk')
self.assertEqual(self.bd.devpath, self.devpath)
self.assertEqual(self.bd.freespace, self.size)
self.assertEqual(self.bd.model, self.model)
self.assertEqual(self.bd.path, self.devpath)
self.assertEqual(self.bd.percent_free, 100)
self.assertEqual(self.bd.size, self.size)
self.assertEqual(self.bd.usedspace, 0)
self.assertEqual(list(self.bd.available_partitions), [])
self.assertEqual(list(self.bd.filesystems), [])
self.assertEqual(list(self.bd.mounts), [])
self.assertEqual(list(self.bd.partitions), [])
self.assertEqual(list(self.bd.partnames), [])
# requires mock
#self.assertEqual(self.bd.mounted, [])
def add_partition(self, partnum=1, partsize=1 * GB, fstype='ext4',
mountpoint='/', flag='bios_grub'):
return self.bd.add_partition(partnum, partsize, fstype,
mountpoint, flag)
def test_blockdev_add_first_partition(self):
# add a default partition
partnum=1
partsize=1*GB
partpath='{}{}'.format(self.devpath, 1)
partsize_aligned = self.add_partition()
# verify
self.assertEqual(len(list(self.bd.partitions)), 1)
new_part = self.bd.partitions[1]
# first partition has an offset and alignment (1M)
size_plus_offset_aligned = blockdev_align_up(partsize + FIRST_PARTITION_OFFSET)
self.assertEqual(new_part.size, size_plus_offset_aligned -
FIRST_PARTITION_OFFSET)
# partition check
partpath = "{}{}".format(self.devpath, '1')
self.assertTrue(partpath in self.bd.partnames)
# format check
self.assertTrue(partpath in self.bd.filesystems)
# mount check
self.assertTrue(partpath in self.bd._mounts)
def test_blockdev_add_additional_partition(self):
self.add_partition()
partsize = 2 * GB
new_size = self.add_partition(partnum=2, partsize=partsize, fstype='ext4',
mountpoint='/foo', flag='boot')
self.assertEqual(len(list(self.bd.partitions)), 2)
print([action.get() for (num, action) in self.bd.partitions.items()])
# additional partitions don't have an offset, just alignment
new_part = self.bd.partitions[2]
offset_aligned = blockdev_align_up(partsize)
self.assertEqual(offset_aligned, new_part.size)
self.assertEqual(new_size, new_part.size)
self.assertEqual(offset_aligned, new_size)
def test_blockdev_add_partition_no_format_no_mount(self):
self.add_partition()
partnum=2
new_size = self.add_partition(partnum=partnum, partsize=1 * GB, fstype=None,
mountpoint=None, flag='raid')
partpath='{}{}'.format(self.devpath, partnum)
self.assertEqual(len(list(self.bd.partitions)), 2)
print([action.get() for (num, action) in self.bd.partitions.items()])
# format check
self.assertTrue(partpath not in self.bd.filesystems)
# mount check
self.assertTrue(partpath not in self.bd._mounts)
def test_blockdev_lastpartnumber(self):
self.add_partition()
self.assertEqual(self.bd.lastpartnumber, 1)
def test_blockdev_get_partition(self):
partpath='{}{}'.format(self.devpath, '1')
self.add_partition()
new_part = self.bd.partitions[1]
part2 = self.bd.get_partition(partpath)
self.assertEqual(new_part, part2)
def test_blockdev_get_partition_with_string(self):
''' attempt to add a partition with number as a string type '''
partnum = '1'
self.add_partition(partnum=partnum)
# format the partpath with devpath and partnum
partpath='{}{}'.format(self.devpath, partnum)
# we shouldn't be able to get it via a string index
self.assertRaises(KeyError, lambda x: self.bd.partitions[x], partnum)
# check that we did create the partition and store it
# with an integer as the key in the partitions dictionary
new_part = self.bd.partitions[int(partnum)]
part2 = self.bd.get_partition(partpath)
self.assertEqual(new_part, part2)
def test_blockdev_get_actions(self):
self.add_partition()
actions = self.bd.get_actions()
# actions: disk, partition, format, mount
self.assertEqual(len(actions), 4)
action_types = [a.get('type') for a in actions]
for a in ['disk', 'partition', 'format', 'mount']:
self.assertTrue(a in action_types)
def test_blockdev_sort_actions(self):
self.add_partition()
actions = sort_actions(self.bd.get_actions())
# self.bd has a partition, add_partition method adds a
# disk action, partition action, a format, and a mount point action.
# We should have a sorted order of actions which define disk,
# partition it, format and then mount confirm this by walking up
# the order and comparing action type
for (idx, a) in enumerate(actions):
print(idx, a)
order = ['disk', 'partition', 'format', 'mount']
for (idx, type) in enumerate(order):
print(idx, type)
self.assertEqual(order[idx], actions[idx].get('type'))
def test_blockdev_get_fs_table(self):
self.add_partition()
partnum = 1
partsize = self.bd.partitions[partnum].size
partpath = '{}{}'.format(self.devpath, partnum)
mount = self.bd._mounts[partpath]
fstype = self.bd.filesystems[partpath].fstype
# test
fs_table = self.bd.get_fs_table()
# verify
self.assertEqual(len(fs_table), len(self.bd.partitions))
self.assertEqual(mount, fs_table[0][0])
self.assertEqual(partsize, fs_table[0][1])
self.assertEqual(fstype, fs_table[0][2])
self.assertEqual(partpath, fs_table[0][3])
def test_blockdev_get_fs_table_swap(self):
self.add_partition()
partnum=2
self.add_partition(partnum=partnum, partsize=1 * GB, fstype='swap',
mountpoint=None, flag=None)
partsize = self.bd.partitions[partnum].size
partpath = '{}{}'.format(self.devpath, partnum)
fstype = 'swap'
mount = fstype
# test
fs_table = self.bd.get_fs_table()
# verify
self.assertEqual(len(fs_table), len(self.bd.partitions))
self.assertEqual(mount, fs_table[1][0])
self.assertEqual(partsize, fs_table[1][1])
self.assertEqual(fstype, fs_table[1][2])
self.assertEqual(partpath, fs_table[1][3])
def test_blockdev_available_partitions(self):
# add a non-empty partition
self.add_partition()
# we shouldn't have any empty partitions
empty = self.bd.available_partitions
self.assertEqual(empty, [])
partnum=2
self.add_partition(partnum=partnum, partsize=1 * GB,
fstype='leave unformatted',
mountpoint=None, flag=None)
# we should have one empty partition
empty = self.bd.available_partitions
print(empty)
self.assertEqual(len(empty), 1)