kvm-test: support emulating a TPM 2.0
When invoking kvm-test.py, one can pass the --with-tpm2 option so that we emulate a TPM and make it available in the guest. This requires the swtpm package which is available in jammy and more recent versions of Ubuntu. Signed-off-by: Olivier Gayot <olivier.gayot@canonical.com>
This commit is contained in:
parent
f088fa26f1
commit
5244890c5a
|
@ -13,13 +13,15 @@ import argparse
|
|||
import contextlib
|
||||
import copy
|
||||
import crypt
|
||||
import dataclasses
|
||||
import os
|
||||
import pathlib
|
||||
import shlex
|
||||
import socket
|
||||
import subprocess
|
||||
import sys
|
||||
import tempfile
|
||||
from typing import List, Tuple
|
||||
from typing import List, Optional, Tuple
|
||||
import yaml
|
||||
|
||||
|
||||
|
@ -192,6 +194,9 @@ parser.add_argument('--force-autoinstall', default=None,
|
|||
parser.add_argument('--force-no-autoinstall', default=None,
|
||||
action='store_false', dest="autoinstall",
|
||||
help='do not pass autoinstall on the kernel command line')
|
||||
parser.add_argument('--with-tpm2', action='store_true',
|
||||
help='''emulate a TPM 2.0 interface (requires swtpm
|
||||
package)''')
|
||||
|
||||
|
||||
cc_group = parser.add_mutually_exclusive_group()
|
||||
|
@ -419,6 +424,22 @@ def nets(ctx) -> List[str]:
|
|||
return nics
|
||||
|
||||
|
||||
@dataclasses.dataclass(frozen=True)
|
||||
class TPMEmulator:
|
||||
socket: pathlib.Path
|
||||
logfile: pathlib.Path
|
||||
tpmstate: pathlib.Path
|
||||
|
||||
|
||||
def tpm(emulator: Optional[TPMEmulator]) -> List[str]:
|
||||
if emulator is None:
|
||||
return []
|
||||
|
||||
return ['-chardev', f'socket,id=chrtpm,path={emulator.socket}',
|
||||
'-tpmdev', 'emulator,id=tpm0,chardev=chrtpm',
|
||||
'-device', 'tpm-tis,tpmdev=tpm0']
|
||||
|
||||
|
||||
def bios(ctx):
|
||||
ret = []
|
||||
# https://help.ubuntu.com/community/UEFI
|
||||
|
@ -431,7 +452,9 @@ def memory(ctx):
|
|||
return ['-m', ctx.args.memory or ctx.default_mem]
|
||||
|
||||
|
||||
def kvm_common(ctx):
|
||||
@contextlib.contextmanager
|
||||
def kvm_prepare_common(ctx):
|
||||
'''Spawn needed background processes and return the CLI options for QEMU'''
|
||||
ret = ['kvm', '-no-reboot']
|
||||
ret.extend(('-vga', 'virtio'))
|
||||
ret.extend(memory(ctx))
|
||||
|
@ -439,7 +462,15 @@ def kvm_common(ctx):
|
|||
ret.extend(nets(ctx))
|
||||
if ctx.args.sound:
|
||||
ret.extend(('-device', 'AC97', '-device', 'usb-ehci'))
|
||||
return ret
|
||||
|
||||
if ctx.args.with_tpm2:
|
||||
tpm_emulator_context = tpm_emulator()
|
||||
else:
|
||||
tpm_emulator_context = contextlib.nullcontext()
|
||||
|
||||
with tpm_emulator_context as tpm_emulator_cm:
|
||||
ret.extend(tpm(tpm_emulator_cm))
|
||||
yield ret
|
||||
|
||||
|
||||
def get_initrd(mntdir):
|
||||
|
@ -463,53 +494,79 @@ def install(ctx):
|
|||
os.mkdir(mntdir)
|
||||
appends = []
|
||||
|
||||
kvm = kvm_common(ctx)
|
||||
with kvm_prepare_common(ctx) as kvm:
|
||||
|
||||
if ctx.args.iso:
|
||||
iso = ctx.args.iso
|
||||
elif ctx.args.base:
|
||||
iso = ctx.baseiso
|
||||
else:
|
||||
iso = ctx.iso
|
||||
|
||||
kvm.extend(('-cdrom', iso))
|
||||
|
||||
if ctx.args.serial:
|
||||
kvm.append('-nographic')
|
||||
appends.append('console=ttyS0')
|
||||
|
||||
if ctx.args.cloud_config is not None or ctx.args.cloud_config_default:
|
||||
if ctx.args.cloud_config is not None:
|
||||
ctx.cloudconfig = ctx.args.cloud_config.read()
|
||||
kvm.extend(drive(create_seed(ctx.cloudconfig, tempdir), 'raw'))
|
||||
if ctx.args.autoinstall is None:
|
||||
# Let's inspect the yaml and check if there is an autoinstall
|
||||
# section.
|
||||
autoinstall = "autoinstall" in yaml.safe_load(ctx.cloudconfig)
|
||||
if ctx.args.iso:
|
||||
iso = ctx.args.iso
|
||||
elif ctx.args.base:
|
||||
iso = ctx.baseiso
|
||||
else:
|
||||
autoinstall = ctx.args.autoinstall
|
||||
iso = ctx.iso
|
||||
|
||||
if autoinstall:
|
||||
appends.append('autoinstall')
|
||||
kvm.extend(('-cdrom', iso))
|
||||
|
||||
if ctx.args.serial:
|
||||
kvm.append('-nographic')
|
||||
appends.append('console=ttyS0')
|
||||
|
||||
if ctx.args.cloud_config is not None or ctx.args.cloud_config_default:
|
||||
if ctx.args.cloud_config is not None:
|
||||
ctx.cloudconfig = ctx.args.cloud_config.read()
|
||||
kvm.extend(drive(create_seed(ctx.cloudconfig, tempdir), 'raw'))
|
||||
if ctx.args.autoinstall is None:
|
||||
# Let's inspect the yaml and check if there is an autoinstall
|
||||
# section.
|
||||
autoinstall = "autoinstall" in yaml.safe_load(ctx.cloudconfig)
|
||||
else:
|
||||
autoinstall = ctx.args.autoinstall
|
||||
|
||||
if autoinstall:
|
||||
appends.append('autoinstall')
|
||||
|
||||
|
||||
if ctx.args.update:
|
||||
appends.append('subiquity-channel=' + ctx.args.update)
|
||||
if ctx.args.update:
|
||||
appends.append('subiquity-channel=' + ctx.args.update)
|
||||
|
||||
kvm.extend(drive(ctx.target))
|
||||
if not os.path.exists(ctx.target) or ctx.args.overwrite:
|
||||
run(f'qemu-img create -f qcow2 {ctx.target} {ctx.args.disksize}')
|
||||
kvm.extend(drive(ctx.target))
|
||||
if not os.path.exists(ctx.target) or ctx.args.overwrite:
|
||||
run(f'qemu-img create -f qcow2 {ctx.target} {ctx.args.disksize}')
|
||||
|
||||
if len(appends) > 0:
|
||||
with mounter(iso, mntdir):
|
||||
# if we're passing kernel args, we need to manually specify
|
||||
# kernel / initrd
|
||||
kvm.extend(('-kernel', f'{mntdir}/casper/vmlinuz'))
|
||||
kvm.extend(('-initrd', get_initrd(mntdir)))
|
||||
kvm.extend(('-append', ' '.join(appends)))
|
||||
if len(appends) > 0:
|
||||
with mounter(iso, mntdir):
|
||||
# if we're passing kernel args, we need to manually specify
|
||||
# kernel / initrd
|
||||
kvm.extend(('-kernel', f'{mntdir}/casper/vmlinuz'))
|
||||
kvm.extend(('-initrd', get_initrd(mntdir)))
|
||||
kvm.extend(('-append', ' '.join(appends)))
|
||||
run(kvm)
|
||||
else:
|
||||
run(kvm)
|
||||
else:
|
||||
run(kvm)
|
||||
|
||||
|
||||
@contextlib.contextmanager
|
||||
def tpm_emulator(directory=None):
|
||||
if directory is None:
|
||||
directory_context = tempfile.TemporaryDirectory()
|
||||
else:
|
||||
directory_context = contextlib.nullcontext(enter_result=directory)
|
||||
|
||||
with directory_context as tempdir:
|
||||
socket = os.path.join(tempdir, 'swtpm-sock')
|
||||
logfile = os.path.join(tempdir, 'log')
|
||||
tpmstate = tempdir
|
||||
|
||||
ps = subprocess.Popen(['swtpm', 'socket',
|
||||
'--tpmstate', f'dir={tpmstate}',
|
||||
'--ctrl', f'type=unixio,path={socket}',
|
||||
'--tpm2',
|
||||
'--log', f'file={logfile},level=20'],
|
||||
)
|
||||
try:
|
||||
yield TPMEmulator(socket=pathlib.Path(socket),
|
||||
logfile=pathlib.Path(logfile),
|
||||
tpmstate=pathlib.Path(tpmstate))
|
||||
finally:
|
||||
ps.communicate()
|
||||
|
||||
|
||||
def boot(ctx):
|
||||
|
@ -517,9 +574,9 @@ def boot(ctx):
|
|||
if ctx.args.img:
|
||||
target = ctx.args.img
|
||||
|
||||
kvm = kvm_common(ctx)
|
||||
kvm.extend(drive(target))
|
||||
run(kvm)
|
||||
with kvm_prepare_common(ctx) as kvm:
|
||||
kvm.extend(drive(target))
|
||||
run(kvm)
|
||||
|
||||
|
||||
def help():
|
||||
|
|
Loading…
Reference in New Issue