From 5244890c5ab43d147781104e54dad4ead56f792d Mon Sep 17 00:00:00 2001 From: Olivier Gayot Date: Wed, 2 Aug 2023 12:46:36 +0200 Subject: [PATCH] kvm-test: support emulating a TPM 2.0 When invoking kvm-test.py, one can pass the --with-tpm2 option so that we emulate a TPM and make it available in the guest. This requires the swtpm package which is available in jammy and more recent versions of Ubuntu. Signed-off-by: Olivier Gayot --- scripts/kvm-test.py | 147 ++++++++++++++++++++++++++++++-------------- 1 file changed, 102 insertions(+), 45 deletions(-) diff --git a/scripts/kvm-test.py b/scripts/kvm-test.py index 7baa7242..9098a4d3 100755 --- a/scripts/kvm-test.py +++ b/scripts/kvm-test.py @@ -13,13 +13,15 @@ import argparse import contextlib import copy import crypt +import dataclasses import os +import pathlib import shlex import socket import subprocess import sys import tempfile -from typing import List, Tuple +from typing import List, Optional, Tuple import yaml @@ -192,6 +194,9 @@ parser.add_argument('--force-autoinstall', default=None, parser.add_argument('--force-no-autoinstall', default=None, action='store_false', dest="autoinstall", help='do not pass autoinstall on the kernel command line') +parser.add_argument('--with-tpm2', action='store_true', + help='''emulate a TPM 2.0 interface (requires swtpm + package)''') cc_group = parser.add_mutually_exclusive_group() @@ -419,6 +424,22 @@ def nets(ctx) -> List[str]: return nics +@dataclasses.dataclass(frozen=True) +class TPMEmulator: + socket: pathlib.Path + logfile: pathlib.Path + tpmstate: pathlib.Path + + +def tpm(emulator: Optional[TPMEmulator]) -> List[str]: + if emulator is None: + return [] + + return ['-chardev', f'socket,id=chrtpm,path={emulator.socket}', + '-tpmdev', 'emulator,id=tpm0,chardev=chrtpm', + '-device', 'tpm-tis,tpmdev=tpm0'] + + def bios(ctx): ret = [] # https://help.ubuntu.com/community/UEFI @@ -431,7 +452,9 @@ def memory(ctx): return ['-m', ctx.args.memory or ctx.default_mem] -def kvm_common(ctx): +@contextlib.contextmanager +def kvm_prepare_common(ctx): + '''Spawn needed background processes and return the CLI options for QEMU''' ret = ['kvm', '-no-reboot'] ret.extend(('-vga', 'virtio')) ret.extend(memory(ctx)) @@ -439,7 +462,15 @@ def kvm_common(ctx): ret.extend(nets(ctx)) if ctx.args.sound: ret.extend(('-device', 'AC97', '-device', 'usb-ehci')) - return ret + + if ctx.args.with_tpm2: + tpm_emulator_context = tpm_emulator() + else: + tpm_emulator_context = contextlib.nullcontext() + + with tpm_emulator_context as tpm_emulator_cm: + ret.extend(tpm(tpm_emulator_cm)) + yield ret def get_initrd(mntdir): @@ -463,53 +494,79 @@ def install(ctx): os.mkdir(mntdir) appends = [] - kvm = kvm_common(ctx) + with kvm_prepare_common(ctx) as kvm: - if ctx.args.iso: - iso = ctx.args.iso - elif ctx.args.base: - iso = ctx.baseiso - else: - iso = ctx.iso - - kvm.extend(('-cdrom', iso)) - - if ctx.args.serial: - kvm.append('-nographic') - appends.append('console=ttyS0') - - if ctx.args.cloud_config is not None or ctx.args.cloud_config_default: - if ctx.args.cloud_config is not None: - ctx.cloudconfig = ctx.args.cloud_config.read() - kvm.extend(drive(create_seed(ctx.cloudconfig, tempdir), 'raw')) - if ctx.args.autoinstall is None: - # Let's inspect the yaml and check if there is an autoinstall - # section. - autoinstall = "autoinstall" in yaml.safe_load(ctx.cloudconfig) + if ctx.args.iso: + iso = ctx.args.iso + elif ctx.args.base: + iso = ctx.baseiso else: - autoinstall = ctx.args.autoinstall + iso = ctx.iso - if autoinstall: - appends.append('autoinstall') + kvm.extend(('-cdrom', iso)) + + if ctx.args.serial: + kvm.append('-nographic') + appends.append('console=ttyS0') + + if ctx.args.cloud_config is not None or ctx.args.cloud_config_default: + if ctx.args.cloud_config is not None: + ctx.cloudconfig = ctx.args.cloud_config.read() + kvm.extend(drive(create_seed(ctx.cloudconfig, tempdir), 'raw')) + if ctx.args.autoinstall is None: + # Let's inspect the yaml and check if there is an autoinstall + # section. + autoinstall = "autoinstall" in yaml.safe_load(ctx.cloudconfig) + else: + autoinstall = ctx.args.autoinstall + + if autoinstall: + appends.append('autoinstall') - if ctx.args.update: - appends.append('subiquity-channel=' + ctx.args.update) + if ctx.args.update: + appends.append('subiquity-channel=' + ctx.args.update) - kvm.extend(drive(ctx.target)) - if not os.path.exists(ctx.target) or ctx.args.overwrite: - run(f'qemu-img create -f qcow2 {ctx.target} {ctx.args.disksize}') + kvm.extend(drive(ctx.target)) + if not os.path.exists(ctx.target) or ctx.args.overwrite: + run(f'qemu-img create -f qcow2 {ctx.target} {ctx.args.disksize}') - if len(appends) > 0: - with mounter(iso, mntdir): - # if we're passing kernel args, we need to manually specify - # kernel / initrd - kvm.extend(('-kernel', f'{mntdir}/casper/vmlinuz')) - kvm.extend(('-initrd', get_initrd(mntdir))) - kvm.extend(('-append', ' '.join(appends))) + if len(appends) > 0: + with mounter(iso, mntdir): + # if we're passing kernel args, we need to manually specify + # kernel / initrd + kvm.extend(('-kernel', f'{mntdir}/casper/vmlinuz')) + kvm.extend(('-initrd', get_initrd(mntdir))) + kvm.extend(('-append', ' '.join(appends))) + run(kvm) + else: run(kvm) - else: - run(kvm) + + +@contextlib.contextmanager +def tpm_emulator(directory=None): + if directory is None: + directory_context = tempfile.TemporaryDirectory() + else: + directory_context = contextlib.nullcontext(enter_result=directory) + + with directory_context as tempdir: + socket = os.path.join(tempdir, 'swtpm-sock') + logfile = os.path.join(tempdir, 'log') + tpmstate = tempdir + + ps = subprocess.Popen(['swtpm', 'socket', + '--tpmstate', f'dir={tpmstate}', + '--ctrl', f'type=unixio,path={socket}', + '--tpm2', + '--log', f'file={logfile},level=20'], + ) + try: + yield TPMEmulator(socket=pathlib.Path(socket), + logfile=pathlib.Path(logfile), + tpmstate=pathlib.Path(tpmstate)) + finally: + ps.communicate() def boot(ctx): @@ -517,9 +574,9 @@ def boot(ctx): if ctx.args.img: target = ctx.args.img - kvm = kvm_common(ctx) - kvm.extend(drive(target)) - run(kvm) + with kvm_prepare_common(ctx) as kvm: + kvm.extend(drive(target)) + run(kvm) def help():