504 lines
15 KiB
Python
Executable File
504 lines
15 KiB
Python
Executable File
#!/usr/bin/env python3

'''kvm-test - boot a kvm with a test iso, possibly building that test iso first

kvm-test --build -q --install -o --boot
   slimy build, install, overwrite existing image if it exists,
   and boot the result after install

See kvm-test -h for options and more examples.
'''
|
|
|
|
import argparse
|
|
import contextlib
|
|
import copy
|
|
import crypt
|
|
import os
|
|
import random
|
|
import socket
|
|
import sys
|
|
import tempfile
|
|
import yaml
|
|
|
|
|
|
# Built-in default configuration, parsed as YAML by Context.load_config().
# Each release entry is a path relative to iso.basedir.  Values found in
# ~/.kvm-test.yaml, when that file exists, are merged over these defaults.
cfg = '''
iso:
    basedir: /srv/iso
    release:
        edge: jammy/subiquity-edge/jammy-live-server-subiquity-edge-amd64.iso
        canary: jammy/jammy-desktop-canary-amd64.iso
        jammy: jammy/jammy-live-server-amd64.iso
        desktop: impish/jammy-desktop-amd64.iso
        impish: impish/ubuntu-21.10-live-server-amd64.iso
        hirsute: hirsute/ubuntu-21.04-live-server-amd64.iso
        groovy: groovy/ubuntu-20.10-live-server-amd64.iso
        focal: focal/ubuntu-20.04.3-live-server-amd64.iso
        bionic: bionic/bionic-live-server-amd64.iso
    default: edge
'''

# Guest memory size; memory() passes it to kvm as the value of -m.
sys_memory = '8G'
|
|
|
|
|
|
def salted_crypt(plaintext_password):
    '''Return plaintext_password hashed by crypt.crypt() with a fixed salt.

    The salt is constant, so the same password always produces the same
    hash.'''
    # match subiquity documentation
    fixed_salt = '$6$exDY1mhS4KUYCE/2'
    hashed = crypt.crypt(plaintext_password, fixed_salt)
    return hashed
|
|
|
|
|
|
class Context:
    '''Settings for one kvm-test run, built from the command line and config.

    Attributes set by __init__:
      config      - built-in cfg merged with ~/.kvm-test.yaml, if present
      args        - the parsed argparse namespace
      release     - args.release, or the configured default release
      baseiso     - path of the unmodified iso for release; NOT set when the
                    release has no entry in the config
      curdir      - working directory at construction time
      iso         - path of the test iso under /tmp/kvm-test
      hostname    - '<release>-test'
      target      - path of the disk image under /tmp/kvm-test
      password    - salted_crypt('ubuntu')
      cloudconfig - autoinstall user-data as a string
    '''

    def __init__(self, args):
        self.config = self.load_config()
        self.args = args
        self.release = args.release
        if not self.release:
            self.release = self.config["iso"]["default"]
        iso = self.config["iso"]
        try:
            self.baseiso = os.path.join(iso["basedir"],
                                        iso["release"][self.release])
        except KeyError:
            # Deliberately tolerated: an unknown release is fine as long as
            # no base iso is needed (e.g. install with --this).  baseiso is
            # left unset, so code that does need it fails on access.
            pass
        self.curdir = os.getcwd()
        # self.iso = f'{self.curdir}/{self.release}-test.iso'
        self.iso = f'/tmp/kvm-test/{self.release}-test.iso'
        self.hostname = f'{self.release}-test'
        # self.target = f'{self.curdir}/{self.hostname}.img'
        self.target = f'/tmp/kvm-test/{self.hostname}.img'
        self.password = salted_crypt('ubuntu')
        self.cloudconfig = f'''\
#cloud-config
autoinstall:
    version: 1
    locale:
        en_US.UTF-8
    ssh:
        install-server: true
        allow-pw: true
    identity:
        hostname: {self.hostname}
        password: "{self.password}"
        username: ubuntu
'''
        # refresh-installer:
        #     update: yes
        #     channel: candidate

    def merge(self, a, b):
        '''Take a pair of dictionaries, and provide the merged result.

        A key present in both is merged recursively when both values are
        dictionaries; in every other case the value from b replaces the
        one from a.  a is deep-copied first, so neither input is modified.'''
        result = copy.deepcopy(a)

        for key in b:
            left = result.get(key)
            right = b[key]
            if isinstance(left, dict) and isinstance(right, dict):
                result[key] = self.merge(left, right)
            else:
                # Covers both a new key and a non-dictionary conflict.
                result[key] = right

        return result

    def load_config(self):
        '''Return the built-in cfg with ~/.kvm-test.yaml merged over it.'''
        result = yaml.safe_load(cfg)
        # expanduser() still resolves the home directory when $HOME is
        # not set, where os.environ["HOME"] would raise KeyError.
        homecfg = os.path.expanduser('~/.kvm-test.yaml')
        if os.path.exists(homecfg):
            with open(homecfg, 'r') as f:
                overrides = yaml.safe_load(f)
            # An empty file parses to None; there is nothing to merge then.
            if overrides:
                result = self.merge(result, overrides)

        return result
|
|
|
|
|
|
# Command line definition.  RawDescriptionHelpFormatter keeps the line
# breaks and indentation of the description below as written.
parser = argparse.ArgumentParser(
    formatter_class=argparse.RawDescriptionHelpFormatter,
    description='''\
Test isos and images written to /tmp/kvm-test

Sample usage:
    kvm-test --build -q --install -o -a --boot
        slimy build, run install, overwrite existing image, use autoinstall,
        boot final resulting image

    kvm-test --install -bo -rfocal
        boot the focal base iso unmodified and run install manually

See 'cfg' in script for expected layout of iso files,
which can be managed with ~/.kvm-test.yaml''')
# Modifiers: these adjust how the build / install / boot steps behave.
parser.add_argument('-a', '--autoinstall', default=False,
                    action='store_true', help='use autoinstall')
parser.add_argument('-b', '--base', default=False, action='store_true',
                    help='use base iso')
parser.add_argument('-B', '--bios', action='store_true', default=False,
                    help='boot in BIOS mode')
parser.add_argument('-c', '--channel', default=False, action='store',
                    help='build iso with snap from channel')
parser.add_argument('-d', '--disksize', default='12G', action='store',
                    help='size of disk to create (12G default)')
# argparse.FileType() opens the named file while parsing; install() reads it.
parser.add_argument('-f', '--autoinstall-file', action='store',
                    type=argparse.FileType(),
                    help='load autoinstall from file')
parser.add_argument('-i', '--img', action='store', help='use this img')
parser.add_argument('-n', '--nets', action='store', default=1, type=int,
                    help='number of network interfaces')
parser.add_argument('-o', '--overwrite', default=False, action='store_true',
                    help='allow overwrite of the target image')
parser.add_argument('-q', '--quick', default=False, action='store_true',
                    help='build iso with quick-test-this-branch')
parser.add_argument('-r', '--release', action='store', help='target release')
parser.add_argument('-s', '--serial', default=False, action='store_true',
                    help='attach to serial console')
parser.add_argument('-S', '--sound', default=False, action='store_true',
                    help='enable sound')
parser.add_argument('-t', '--this', action='store',
                    help='use this iso')
parser.add_argument('-u', '--update', action='store',
                    help='subiquity-channel argument')
parser.add_argument('--save', action='store_true',
                    help='preserve built snap')
parser.add_argument('--reuse', action='store_true',
                    help='reuse previously saved snap')
# Actions: the script runs whichever of these are given, in the order
# build, install, boot, and prints the help text when none is given.
parser.add_argument('--build', default=False, action='store_true',
                    help='build test iso')
parser.add_argument('--install', default=False, action='store_true',
                    help='''install from iso - one must either build a test
                    iso, use a base iso, or reuse previous test iso''')
parser.add_argument('--boot', default=False, action='store_true',
                    help='boot test image')
|
|
|
|
|
|
def waitstatus_to_exitcode(waitstatus):
    '''Convert a wait status into an exit code.

    A process that exited normally yields its exit status, a value greater
    than or equal to 0.

    A process that was terminated by a signal yields -signum, a value less
    than 0, where signum is the number of the terminating signal.

    Any other wait status raises ValueError.'''

    # os.waitstatus_to_exitcode was added in python 3.9.  Use it when it is
    # there and fall back to the W* macros on older interpreters.
    native = getattr(os, 'waitstatus_to_exitcode', None)
    if native is not None:
        return native(waitstatus)

    if os.WIFEXITED(waitstatus):
        return os.WEXITSTATUS(waitstatus)
    if os.WIFSIGNALED(waitstatus):
        return -os.WTERMSIG(waitstatus)

    raise ValueError
|
|
|
|
|
|
class SubProcessFailure(Exception):
    '''Raised by run() when a command finishes with a non-zero exit code.'''
|
|
|
|
|
|
def run(cmds):
    '''Run every non-blank line of cmds as a separate shell command.

    Each line is stripped, echoed with a leading '+ ', and handed to
    os.system().  Execution stops at the first command whose exit code is
    not 0, and SubProcessFailure is raised for it.'''
    for raw_line in cmds.splitlines():
        cmd = raw_line.strip()
        if not cmd:
            continue
        # semi-simulate "bash -x"
        print(f'+ {cmd}')
        # FIXME subprocess check
        status = os.system(cmd)
        ec = waitstatus_to_exitcode(status)
        if ec != 0:
            raise SubProcessFailure(f'command [{cmd}] returned [{ec}]')
|
|
|
|
|
|
@contextlib.contextmanager
def delete_later(path):
    '''Context manager that yields path and removes that file on exit.

    The file is removed whether or not the body raised.  A file that does
    not exist at exit is ignored: when the body failed before creating it,
    a FileNotFoundError from the cleanup would otherwise take the place of
    the failure that actually matters.'''
    try:
        yield path
    finally:
        with contextlib.suppress(FileNotFoundError):
            os.remove(path)
|
|
|
|
|
|
def noop(path):
    '''Return a context manager that yields path and does nothing on exit.

    Drop-in counterpart of delete_later() for when the file is to be kept.'''
    return contextlib.nullcontext(path)
|
|
|
|
|
|
@contextlib.contextmanager
def mounter(src, dest):
    '''Context manager that keeps src mounted read-only on dest.

    Mounting happens on entry and unmounting on exit, both through sudo.
    The unmount runs even when the body raises.'''
    mount_cmd = f'sudo mount -r {src} {dest}'
    umount_cmd = f'sudo umount {dest}'

    run(mount_cmd)
    try:
        yield
    finally:
        run(umount_cmd)
|
|
|
|
|
|
def build(ctx):
    '''Build the test iso ctx.iso from ctx.baseiso.

    The recipe depends on the name of the current working directory:
    directories starting with 'subiquity' and the directory
    'ubuntu-desktop-installer' are supported.  Any existing ctx.iso is
    removed first.  The intermediate snap file is deleted afterwards unless
    --save was given.

    Raises Exception when the current directory is not a known project,
    and SubProcessFailure (from run) when a build command fails.'''
    run('sudo -v')
    run(f'rm -f {ctx.iso}')
    project = os.path.basename(os.getcwd())

    snap_manager = noop if ctx.args.save else delete_later
    # FIXME generalize to
    # PYTHONPATH=$LIVEFS_EDITOR python3 -m livefs_edit $OLD_ISO $NEW_ISO
    #   --inject-snap $SUBIQUITY_SNAP_PATH
    if project.startswith('subiquity'):
        if ctx.args.quick:
            # The snap path is not used here; the manager only controls
            # whether subiquity_test.snap is removed afterwards.
            with snap_manager('subiquity_test.snap'):
                run(f'sudo ./scripts/quick-test-this-branch.sh {ctx.baseiso} \
                    {ctx.iso}')
        elif ctx.args.channel:
            run(f'sudo PYTHONPATH=$LIVEFS_EDITOR python3 -m livefs_edit \
                {ctx.baseiso} {ctx.iso} \
                --add-snap-from-store subiquity {ctx.args.channel}')
        else:
            with snap_manager('subiquity_test.snap') as snap:
                # --reuse skips the snapcraft build and injects the snap
                # left behind by an earlier --save run.
                if not ctx.args.reuse:
                    run(f'''
                        snapcraft clean --use-lxd
                        snapcraft snap --use-lxd --output {snap}
                        ''')
                run(f'''
                    test -f {snap}
                    sudo PYTHONPATH=$LIVEFS_EDITOR python3 -m livefs_edit \
                        {ctx.baseiso} {ctx.iso} \
                        --add-snap-from-store core20 stable \
                        --inject-snap {snap}
                    ''')
                # sudo ./scripts/inject-subiquity-snap.sh {ctx.baseiso} \
                #     {snap} {ctx.iso}
    elif project == 'ubuntu-desktop-installer':
        with snap_manager('udi_test.snap') as snap:
            run(f'''
                snapcraft clean --use-lxd
                snapcraft snap --use-lxd --output {snap}
                test -f {snap}
                sudo ./scripts/inject-snap {ctx.baseiso} \
                    {ctx.iso} {snap}
                ''')
    else:
        raise Exception(f'do not know how to build {project}')

    # Fail here if the recipe did not actually produce the iso.
    run(f'test -f {ctx.iso}')
|
|
|
|
|
|
def write(dest, data):
    '''Create or truncate the text file dest and store data in it.'''
    with open(dest, 'w') as out:
        out.write(data)
|
|
|
|
|
|
def touch(dest):
    '''Leave an empty file at dest.

    The file is opened for writing and closed again straight away, so a
    missing file is created and an existing one is truncated.'''
    with open(dest, 'w') as _unused:
        pass
|
|
|
|
|
|
def create_seed(cloudconfig, tempdir):
    '''Build a seed image in tempdir from cloudconfig and return its path.

    cloudconfig is written to user-data, an empty meta-data is created
    beside it, and cloud-localds combines the two into seed.iso.'''
    user_data = f'{tempdir}/user-data'
    meta_data = f'{tempdir}/meta-data'
    seed = f'{tempdir}/seed.iso'

    write(user_data, cloudconfig)
    touch(meta_data)
    run(f'cloud-localds {seed} {user_data} {meta_data}')
    return seed
|
|
|
|
|
|
def drive(path, format='qcow2'):
    '''Return the '-drive ...' argument that attaches path as a virtio disk.

    format is the image format of path.  The cache mode is always
    writethrough.'''
    # A serial= option used to be considered here, but serial doesn't work,
    # so none is ever emitted.
    options = (
        f'file={path}',
        f'format={format}',
        'cache=writethrough',
        'if=virtio',
    )
    return '-drive ' + ','.join(options)
|
|
|
|
|
|
class PortFinder:
    '''Hands out local TCP ports that nothing appears to be listening on.

    Ports are probed in increasing order from 2222 up to, but excluding,
    8000.  A port is handed out when a TCP connection attempt to it on
    localhost fails.'''

    def __init__(self):
        # One generator per instance, so successive get() calls continue
        # the scan instead of restarting it.
        self.finder = self.port_generator()

    def port_generator(self):
        '''Yield, in increasing order, each port whose connect attempt fails.'''
        for candidate in range(2222, 8000):
            with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as probe:
                outcome = probe.connect_ex(('localhost', candidate))
                if outcome != 0:
                    yield candidate

    def get(self):
        '''Return the next port from the scan.'''
        return next(self.finder)
|
|
|
|
|
|
def nets(ctx):
    '''Return the kvm arguments for ctx.args.nets network interfaces.

    Every interface is a user-mode virtio NIC with its own host port
    forwarded to guest port 22.  A count of zero or less disables
    networking with '-nic none'.'''
    count = ctx.args.nets
    if count <= 0:
        return ['-nic', 'none']

    ports = PortFinder()
    ret = []
    for _ in range(count):
        port = ports.get()
        nic = f'user,model=virtio-net-pci,hostfwd=tcp::{port}-:22'
        ret.extend(('-nic', nic))  # ,restrict=on
    return ret
|
|
|
|
|
|
def bios(ctx):
    '''Return the kvm firmware arguments.

    With --bios nothing is added.  Without it the OVMF image is passed via
    -bios; see https://help.ubuntu.com/community/UEFI'''
    if ctx.args.bios:
        return []
    return ['-bios', '/usr/share/qemu/OVMF.fd']
|
|
|
|
|
|
def memory(ctx):
    '''Return the kvm memory arguments; the size is sys_memory, ctx is unused.'''
    size = str(sys_memory)
    return ['-m', size]
|
|
|
|
|
|
def kvm_common(ctx):
    '''Return the start of the kvm command line shared by install and boot.

    The list holds the kvm executable, -no-reboot, virtio graphics, then
    the memory, firmware and network arguments, and the sound devices when
    --sound was given.'''
    ret = ['kvm', '-no-reboot', '-vga', 'virtio']
    ret += memory(ctx)
    ret += bios(ctx)
    ret += nets(ctx)
    if ctx.args.sound:
        ret += ['-device', 'AC97', '-device', 'usb-ehci']
    return ret
|
|
|
|
|
|
def grub_get_extra_args(mntdir):
    '''Return the kernel arguments of the first casper entry in grub.cfg.

    mntdir is the directory the iso is mounted on.  The first line of
    boot/grub/grub.cfg of the form 'linux /casper/vmlinuz ARGS...' is
    used.  ARGS is returned as a list with the '---' separator dropped.
    An empty list is returned when the file is missing or has no such line.
    '''
    # The inject-snap process of livefs-edit adds a new layer squash
    # that must be in place, or the injected snap isn't there.
    # We don't want to hardcode the current value, because that layer
    # isn't there if a base iso is being used.
    # Parse the args out of grub.cfg and include them with ours.
    cfgpath = f'{mntdir}/boot/grub/grub.cfg'
    ret = []
    try:
        with open(cfgpath, 'r') as fp:
            for line in fp:
                # e.g. ['linux', '/casper/vmlinuz',
                #       'layerfs-path=a.b.c.d.e.squashfs', '---']
                # Splitting on any whitespace accepts tabs and spaces
                # alike, never yields empty arguments, and leaves a bare
                # 'linux' line too short to match instead of crashing.
                tokens = line.split()
                if tokens[:2] != ['linux', '/casper/vmlinuz']:
                    continue
                ret.extend(arg for arg in tokens[2:] if arg != '---')
                # Only the first matching entry is of interest.
                break
    except FileNotFoundError:
        pass
    return ret
|
|
|
|
|
|
def get_initrd(mntdir):
    '''Return the path of the initrd under mntdir/casper.

    The names initrd, initrd.lz and initrd.lz4 are tried in that order and
    the first one that exists is returned.  Raises Exception when none of
    them exists.'''
    names = ('initrd', 'initrd.lz', 'initrd.lz4')
    candidates = (f'{mntdir}/casper/{name}' for name in names)
    found = next(filter(os.path.exists, candidates), None)
    if found is None:
        raise Exception('initrd not found')
    return found
|
|
|
|
|
|
def install(ctx):
    '''Boot an installer iso under kvm with ctx.target attached as the disk.

    The iso is, in order of preference, the one named by --this, the base
    iso (--base), or the test iso ctx.iso.  The target image is created
    when it does not exist, and recreated when --overwrite was given.
    With --autoinstall or --autoinstall-file a seed drive carrying the
    cloud-config is attached and 'autoinstall' is put on the kernel
    command line.

    Raises SubProcessFailure (from run) when a command fails.'''
    if os.path.exists(ctx.target) and ctx.args.overwrite:
        os.remove(ctx.target)

    run('sudo -v')

    with tempfile.TemporaryDirectory() as tempdir:
        mntdir = f'{tempdir}/mnt'
        os.mkdir(mntdir)
        appends = []

        kvm = kvm_common(ctx)

        if ctx.args.this:
            iso = ctx.args.this
        elif ctx.args.base:
            iso = ctx.baseiso
        else:
            iso = ctx.iso

        kvm += ['-cdrom', iso]

        if ctx.args.serial:
            kvm += ['-nographic']
            appends += ['console=ttyS0']

        if ctx.args.autoinstall or ctx.args.autoinstall_file:
            if ctx.args.autoinstall_file:
                # A user supplied file replaces the generated cloud-config.
                ctx.cloudconfig = ctx.args.autoinstall_file.read()
            kvm += [drive(create_seed(ctx.cloudconfig, tempdir), 'raw')]
            appends += ['autoinstall']

        if ctx.args.update:
            # Pass on the channel the user asked for rather than a fixed
            # one; --update takes the channel name as its value.
            appends += [f'subiquity-channel={ctx.args.update}']

        kvm += [drive(ctx.target)]
        if not os.path.exists(ctx.target) or ctx.args.overwrite:
            run(f'qemu-img create -f qcow2 {ctx.target} {ctx.args.disksize}')

        # drive2 = f'{ctx.curdir}/drive2.img'

        # appends += ['subiquity-channel=edge']

        # kvm has to run while the iso is mounted: with kernel arguments
        # the kernel and initrd are loaded from the mount point.
        with mounter(iso, mntdir):
            appends += grub_get_extra_args(mntdir)

            if len(appends) > 0:
                # if we're passing kernel args, we need to manually specify
                # kernel / initrd
                kvm += ['-kernel', f'{mntdir}/casper/vmlinuz']
                kvm += ['-initrd', get_initrd(mntdir)]
                toappend = ' '.join(appends)
                # Quoted because run() hands the joined line to a shell.
                kvm += ['-append', f'"{toappend}"']

            run(' '.join(kvm))
|
|
|
|
|
|
def boot(ctx):
    '''Boot a disk image under kvm.

    The image is the one named by --img when given, otherwise ctx.target.'''
    target = ctx.args.img if ctx.args.img else ctx.target
    command = kvm_common(ctx) + [drive(target)]
    run(' '.join(command))
|
|
|
|
|
|
def help(ctx=None):
    '''Print the usage summary and exit with status 1.

    ctx is not used.  It is optional because the script calls this when
    building the Context failed, at which point there is no ctx to pass.'''
    parser.print_usage()
    sys.exit(1)
|
|
|
|
|
|
def cloud(ctx):
    '''Print the cloud-config held by ctx to standard output.'''
    config_text = ctx.cloudconfig
    print(config_text)
|
|
|
|
|
|
# Script body: parse the command line, then run the requested actions in
# the fixed order build, install, boot.
try:
    ctx = Context(parser.parse_args())
except TypeError:
    # No Context exists at this point.  help() makes no use of its
    # argument; it prints the usage summary and exits with status 1.
    help(None)

if ctx.args.base and ctx.args.build:
    raise Exception('cannot use base iso and build')

# Test isos and disk images are kept here.
os.makedirs('/tmp/kvm-test', exist_ok=True)

if ctx.args.build:
    build(ctx)
if ctx.args.install:
    install(ctx)
if ctx.args.boot:
    boot(ctx)
if not (ctx.args.build or ctx.args.install or ctx.args.boot):
    # Nothing was asked for: show the full help text.
    parser.print_help()
|