Merge pull request #1754 from ogayot/kvm-test-tpm

kvm-test: support emulating a TPM 2.0
This commit is contained in:
Olivier Gayot 2023-08-03 09:36:29 +02:00 committed by GitHub
commit 03e9a401b9
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
1 changed files with 102 additions and 45 deletions

View File

@ -13,13 +13,15 @@ import argparse
import contextlib import contextlib
import copy import copy
import crypt import crypt
import dataclasses
import os import os
import pathlib
import shlex import shlex
import socket import socket
import subprocess import subprocess
import sys import sys
import tempfile import tempfile
from typing import List, Tuple from typing import List, Optional, Tuple
import yaml import yaml
@ -192,6 +194,9 @@ parser.add_argument('--force-autoinstall', default=None,
parser.add_argument('--force-no-autoinstall', default=None, parser.add_argument('--force-no-autoinstall', default=None,
action='store_false', dest="autoinstall", action='store_false', dest="autoinstall",
help='do not pass autoinstall on the kernel command line') help='do not pass autoinstall on the kernel command line')
parser.add_argument('--with-tpm2', action='store_true',
help='''emulate a TPM 2.0 interface (requires swtpm
package)''')
cc_group = parser.add_mutually_exclusive_group() cc_group = parser.add_mutually_exclusive_group()
@ -419,6 +424,22 @@ def nets(ctx) -> List[str]:
return nics return nics
@dataclasses.dataclass(frozen=True)
class TPMEmulator:
socket: pathlib.Path
logfile: pathlib.Path
tpmstate: pathlib.Path
def tpm(emulator: Optional[TPMEmulator]) -> List[str]:
if emulator is None:
return []
return ['-chardev', f'socket,id=chrtpm,path={emulator.socket}',
'-tpmdev', 'emulator,id=tpm0,chardev=chrtpm',
'-device', 'tpm-tis,tpmdev=tpm0']
def bios(ctx): def bios(ctx):
ret = [] ret = []
# https://help.ubuntu.com/community/UEFI # https://help.ubuntu.com/community/UEFI
@ -431,7 +452,9 @@ def memory(ctx):
return ['-m', ctx.args.memory or ctx.default_mem] return ['-m', ctx.args.memory or ctx.default_mem]
def kvm_common(ctx): @contextlib.contextmanager
def kvm_prepare_common(ctx):
'''Spawn needed background processes and return the CLI options for QEMU'''
ret = ['kvm', '-no-reboot'] ret = ['kvm', '-no-reboot']
ret.extend(('-vga', 'virtio')) ret.extend(('-vga', 'virtio'))
ret.extend(memory(ctx)) ret.extend(memory(ctx))
@ -439,7 +462,15 @@ def kvm_common(ctx):
ret.extend(nets(ctx)) ret.extend(nets(ctx))
if ctx.args.sound: if ctx.args.sound:
ret.extend(('-device', 'AC97', '-device', 'usb-ehci')) ret.extend(('-device', 'AC97', '-device', 'usb-ehci'))
return ret
if ctx.args.with_tpm2:
tpm_emulator_context = tpm_emulator()
else:
tpm_emulator_context = contextlib.nullcontext()
with tpm_emulator_context as tpm_emulator_cm:
ret.extend(tpm(tpm_emulator_cm))
yield ret
def get_initrd(mntdir): def get_initrd(mntdir):
@ -463,53 +494,79 @@ def install(ctx):
os.mkdir(mntdir) os.mkdir(mntdir)
appends = [] appends = []
kvm = kvm_common(ctx) with kvm_prepare_common(ctx) as kvm:
if ctx.args.iso: if ctx.args.iso:
iso = ctx.args.iso iso = ctx.args.iso
elif ctx.args.base: elif ctx.args.base:
iso = ctx.baseiso iso = ctx.baseiso
else:
iso = ctx.iso
kvm.extend(('-cdrom', iso))
if ctx.args.serial:
kvm.append('-nographic')
appends.append('console=ttyS0')
if ctx.args.cloud_config is not None or ctx.args.cloud_config_default:
if ctx.args.cloud_config is not None:
ctx.cloudconfig = ctx.args.cloud_config.read()
kvm.extend(drive(create_seed(ctx.cloudconfig, tempdir), 'raw'))
if ctx.args.autoinstall is None:
# Let's inspect the yaml and check if there is an autoinstall
# section.
autoinstall = "autoinstall" in yaml.safe_load(ctx.cloudconfig)
else: else:
autoinstall = ctx.args.autoinstall iso = ctx.iso
if autoinstall: kvm.extend(('-cdrom', iso))
appends.append('autoinstall')
if ctx.args.serial:
kvm.append('-nographic')
appends.append('console=ttyS0')
if ctx.args.cloud_config is not None or ctx.args.cloud_config_default:
if ctx.args.cloud_config is not None:
ctx.cloudconfig = ctx.args.cloud_config.read()
kvm.extend(drive(create_seed(ctx.cloudconfig, tempdir), 'raw'))
if ctx.args.autoinstall is None:
# Let's inspect the yaml and check if there is an autoinstall
# section.
autoinstall = "autoinstall" in yaml.safe_load(ctx.cloudconfig)
else:
autoinstall = ctx.args.autoinstall
if autoinstall:
appends.append('autoinstall')
if ctx.args.update: if ctx.args.update:
appends.append('subiquity-channel=' + ctx.args.update) appends.append('subiquity-channel=' + ctx.args.update)
kvm.extend(drive(ctx.target)) kvm.extend(drive(ctx.target))
if not os.path.exists(ctx.target) or ctx.args.overwrite: if not os.path.exists(ctx.target) or ctx.args.overwrite:
run(f'qemu-img create -f qcow2 {ctx.target} {ctx.args.disksize}') run(f'qemu-img create -f qcow2 {ctx.target} {ctx.args.disksize}')
if len(appends) > 0: if len(appends) > 0:
with mounter(iso, mntdir): with mounter(iso, mntdir):
# if we're passing kernel args, we need to manually specify # if we're passing kernel args, we need to manually specify
# kernel / initrd # kernel / initrd
kvm.extend(('-kernel', f'{mntdir}/casper/vmlinuz')) kvm.extend(('-kernel', f'{mntdir}/casper/vmlinuz'))
kvm.extend(('-initrd', get_initrd(mntdir))) kvm.extend(('-initrd', get_initrd(mntdir)))
kvm.extend(('-append', ' '.join(appends))) kvm.extend(('-append', ' '.join(appends)))
run(kvm)
else:
run(kvm) run(kvm)
else:
run(kvm)
@contextlib.contextmanager
def tpm_emulator(directory=None):
if directory is None:
directory_context = tempfile.TemporaryDirectory()
else:
directory_context = contextlib.nullcontext(enter_result=directory)
with directory_context as tempdir:
socket = os.path.join(tempdir, 'swtpm-sock')
logfile = os.path.join(tempdir, 'log')
tpmstate = tempdir
ps = subprocess.Popen(['swtpm', 'socket',
'--tpmstate', f'dir={tpmstate}',
'--ctrl', f'type=unixio,path={socket}',
'--tpm2',
'--log', f'file={logfile},level=20'],
)
try:
yield TPMEmulator(socket=pathlib.Path(socket),
logfile=pathlib.Path(logfile),
tpmstate=pathlib.Path(tpmstate))
finally:
ps.communicate()
def boot(ctx): def boot(ctx):
@ -517,9 +574,9 @@ def boot(ctx):
if ctx.args.img: if ctx.args.img:
target = ctx.args.img target = ctx.args.img
kvm = kvm_common(ctx) with kvm_prepare_common(ctx) as kvm:
kvm.extend(drive(target)) kvm.extend(drive(target))
run(kvm) run(kvm)
def help(): def help():