Merge pull request #867 from mwhudson/disk-matching-fixes

check udev data directly when processing match: specs
This commit is contained in:
Michael Hudson-Doyle 2021-01-06 13:56:05 +13:00 committed by GitHub
commit b13dabba85
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 60 additions and 23 deletions

View File

@ -1386,17 +1386,23 @@ class FilesystemModel(object):
def _make_matchers(self, match):
matchers = []
def _udev_val(disk, key):
return self._probe_data['blockdev'].get(disk.path, {}).get(key, '')
def match_serial(disk):
if disk.serial is not None:
return fnmatch.fnmatchcase(disk.serial, match['serial'])
return fnmatch.fnmatchcase(
_udev_val(disk, "ID_SERIAL"), match['serial'])
def match_model(disk):
if disk.model is not None:
return fnmatch.fnmatchcase(disk.model, match['model'])
return fnmatch.fnmatchcase(
_udev_val(disk, "ID_MODEL"), match['model'])
def match_vendor(disk):
return fnmatch.fnmatchcase(
_udev_val(disk, "ID_VENDOR"), match['vendor'])
def match_path(disk):
if disk.path is not None:
return fnmatch.fnmatchcase(disk.path, match['path'])
return fnmatch.fnmatchcase(disk.path, match['path'])
def match_ssd(disk):
is_ssd = disk.info_for_display()['rotational'] == 'false'
@ -1406,6 +1412,8 @@ class FilesystemModel(object):
matchers.append(match_serial)
if 'model' in match:
matchers.append(match_model)
if 'vendor' in match:
matchers.append(match_vendor)
if 'path' in match:
matchers.append(match_path)
if 'ssd' in match:

View File

@ -145,7 +145,7 @@ def make_disk(fs_model, **kw):
if 'serial' not in kw:
kw['serial'] = 'serial%s' % len(fs_model._actions)
if 'path' not in kw:
kw['path'] = '/dev/thing'
kw['path'] = '/dev/thing%s' % len(fs_model._actions)
if 'ptable' not in kw:
kw['ptable'] = 'gpt'
size = kw.pop('size', 100*(2**30))
@ -851,16 +851,25 @@ class TestFilesystemModel(unittest.TestCase):
self.assertTrue(dos_esp.is_esp)
def fake_up_blockdata_disk(disk, **kw):
model = disk._m
if model._probe_data is None:
model._probe_data = {}
blockdev = model._probe_data.setdefault('blockdev', {})
d = blockdev[disk.path] = {
'DEVTYPE': 'disk',
'ID_SERIAL': disk.serial,
'ID_MODEL': disk.model,
'attrs': {
'size': disk.size,
},
}
d.update(kw)
def fake_up_blockdata(model):
bd = {}
for disk in model.all_disks():
bd[disk.path] = {
'DEVTYPE': 'disk',
'attrs': {
'size': disk.size,
},
}
model._probe_data = {'blockdev': bd}
fake_up_blockdata_disk(disk)
class TestAutoInstallConfig(unittest.TestCase):
@ -939,9 +948,10 @@ class TestAutoInstallConfig(unittest.TestCase):
def test_path_glob(self):
model = make_model()
make_disk(model, serial='aaaa', path='/dev/aaa')
make_disk(model, serial='bbbb', path='/dev/bbb')
fake_up_blockdata(model)
d1 = make_disk(model, serial='aaaa', path='/dev/aaa')
d2 = make_disk(model, serial='bbbb', path='/dev/bbb')
fake_up_blockdata_disk(d1)
fake_up_blockdata_disk(d2)
model.apply_autoinstall_config([
{
'type': 'disk',
@ -952,13 +962,14 @@ class TestAutoInstallConfig(unittest.TestCase):
},
])
new_disk = model._one(type="disk", id="disk0")
self.assertEqual(new_disk.serial, "aaaa")
self.assertEqual(new_disk.serial, d1.serial)
def test_model_glob(self):
model = make_model()
make_disk(model, serial='aaaa', model='aaa')
make_disk(model, serial='bbbb', model='bbb')
fake_up_blockdata(model)
d1 = make_disk(model, serial='aaaa')
d2 = make_disk(model, serial='bbbb')
fake_up_blockdata_disk(d1, ID_MODEL='aaa')
fake_up_blockdata_disk(d2, ID_MODEL='bbb')
model.apply_autoinstall_config([
{
'type': 'disk',
@ -969,7 +980,25 @@ class TestAutoInstallConfig(unittest.TestCase):
},
])
new_disk = model._one(type="disk", id="disk0")
self.assertEqual(new_disk.serial, "aaaa")
self.assertEqual(new_disk.serial, d1.serial)
def test_vendor_glob(self):
model = make_model()
d1 = make_disk(model, serial='aaaa')
d2 = make_disk(model, serial='bbbb')
fake_up_blockdata_disk(d1, ID_VENDOR='aaa')
fake_up_blockdata_disk(d2, ID_VENDOR='bbb')
model.apply_autoinstall_config([
{
'type': 'disk',
'id': 'disk0',
'match': {
'vendor': 'a*',
},
},
])
new_disk = model._one(type="disk", id="disk0")
self.assertEqual(new_disk.serial, d1.serial)
def test_no_matching_disk(self):
model = make_model()