asyncioify the installation

This commit is contained in:
Michael Hudson-Doyle 2019-12-12 15:19:22 +13:00
parent 9e0c0bf106
commit 3d80f46225
2 changed files with 126 additions and 292 deletions

View File

@ -13,18 +13,16 @@
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
from concurrent.futures import Future
import asyncio
import contextlib
import datetime
import logging
import os
import re
import signal
import subprocess
import sys
import platform
import tempfile
import time
import traceback
from curtin.commands.install import (
@ -32,15 +30,21 @@ from curtin.commands.install import (
INSTALL_LOG,
)
import urwid
from systemd import journal
import yaml
from subiquitycore import utils
from subiquitycore.utils import (
arun_command,
astart_command,
run_command,
)
from subiquitycore.controller import BaseController
from subiquity.async_helpers import (
run_in_thread,
schedule_task,
)
from subiquity.controllers.error import ErrorReportKind
from subiquity.ui.views.installprogress import ProgressView
@ -55,157 +59,6 @@ class InstallState:
ERROR = -1
task_counter = 0
def task(f=None, transitions=None, **kw):
"""Annotate a method as a task to be used with StateMachine.
If the method's name starts with _bg_ it is run in a background thread.
(This ability to have tasks flip between running in the foreground and
background is what makes all this interesting).
Annotated methods have various attributes:
._name -- the name of the state, which is the name of the method with
_bg_ stripped off if it was there.
._is_bg -- indicates if this method should run in a background thread.
._transitions -- transitions from this state to another, mapping
transition name to the following state. The transition
named 'success' is special -- it is what is followed
when the function returns, unless some other transition
has been followed beforehand.
._extra -- any extra keyword arguments passed to @task()
"""
if transitions is None:
transitions = {}
def annotate(f):
global task_counter
f._is_task = True
f._order = task_counter
task_counter += 1
if f.__name__.startswith("_bg_"):
f._name = f.__name__[4:]
f._is_bg = True
else:
f._name = f.__name__
f._is_bg = False
f._transitions = transitions
f._extra = kw
return f
if f is not None:
return annotate(f)
else:
return annotate
def collect_tasks(inst, filter_task=lambda f: True):
"""Collect the methods on inst annotated with @task.
Returns a list of tuples (method, transitions) where method is the
annotated method and transitions are the transitions defined while
method is running, with 'success' automatically filled in as a
transition to the next state if not otherwise defined.
"""
task_funcs = []
attrs = inst.__class__.__dict__.values()
for a in attrs:
if not hasattr(a, "_is_task"):
continue
if filter_task(a):
task_funcs.append(getattr(inst, a.__name__))
task_funcs.sort(key=lambda f: f._order)
r = []
for i, func in enumerate(task_funcs[:-1]):
transitions = func._transitions.copy()
if 'success' not in transitions:
transitions['success'] = task_funcs[i+1]._name
r.append((func, transitions))
r.append((task_funcs[-1], task_funcs[-1]._transitions.copy()))
return r
class StateMachine:
"""Run tasks as returned by collect_tasks."""
def __init__(self, controller, task_funcs):
self.controller = controller
self._tasks = {}
self._results = {}
self._transitions = {}
self._subscribers = {}
for func, transitions in task_funcs:
self._tasks[func._name] = func
self._transitions[func._name] = transitions
self.subscribe(
func._name,
lambda fut, name=func._name: self._task_complete(name, fut))
self.cur = task_funcs[0][0]._name
def subscribe(self, name, subscriber):
if name in self._results:
subscriber(self._results[name])
else:
self._subscribers.setdefault(name, set()).add(subscriber)
def _task_complete(self, name, fut):
if name != self.cur:
log.debug(
"_task_complete ignoring %s as %s != %s", fut, name, self.cur)
return
try:
fut.result()
except urwid.ExitMainLoop:
raise
except Exception:
log.debug("%s failed", name)
self.controller.curtin_error()
else:
log.debug("%s completed", name)
if 'success' in self._transitions[name]:
self.transition('success')
else:
log.debug("all tasks completed")
def run(self):
log.debug("running task %s", self.cur)
func = self._tasks[self.cur]
def end(fut):
log.debug('_end %s %s', func._name, fut)
self._results[func._name] = fut
if 'label' in func._extra:
self.controller._install_event_finish()
for subscriber in self._subscribers.get(func._name, ()):
subscriber(fut)
if 'label' in func._extra:
self.controller._install_event_start(func._extra['label'])
if func._is_bg:
self.controller.run_in_bg(func, end)
else:
fut = Future()
try:
fut.set_result(func())
except urwid.ExitMainLoop:
raise
except Exception as e:
fut.set_exception(e)
end(fut)
def transition(self, name):
"""Follow the named transition for the current state."""
new = self._transitions[self.cur][name]
log.debug("transition %s: %s -> %s", name, self.cur, new)
self.cur = new
self.run()
class TracebackExtractor:
start_marker = re.compile(r"^Traceback \(most recent call last\):")
@ -236,33 +89,37 @@ class InstallProgressController(BaseController):
def __init__(self, app):
super().__init__(app)
self.model = app.base_model
self.answers.setdefault('reboot', False)
self.progress_view = None
self.auto_reboot = False
if self.answers.get('reboot', False):
self.auto_reboot = True
self.progress_view = ProgressView(self)
self.install_state = InstallState.NOT_STARTED
self.journal_listener_handle = None
self.filesystem_event = asyncio.Event()
self.reboot_clicked = asyncio.Event()
self._postinstall_prerequisites = {
'install': False,
'ssh': False,
'identity': False,
'snap': False,
'ssh': asyncio.Event(),
'identity': asyncio.Event(),
'snap': asyncio.Event(),
}
self.uu_running = False
self.uu = None
self._event_indent = ""
self._event_syslog_identifier = 'curtin_event.%s' % (os.getpid(),)
self._log_syslog_identifier = 'curtin_log.%s' % (os.getpid(),)
self.sm = None
self.tb_extractor = TracebackExtractor()
def start(self):
self.install_task = schedule_task(self.install())
def tpath(self, *path):
return os.path.join(self.model.target, *path)
def filesystem_config_done(self):
self.curtin_start_install()
self.filesystem_event.set()
def _step_done(self, step):
self._postinstall_prerequisites[step] = True
log.debug("_step_done %s %s", step, self._postinstall_prerequisites)
if all(self._postinstall_prerequisites.values()):
self.start_postinstall_configuration()
self._postinstall_prerequisites[step].set()
def identity_config_done(self):
self._step_done('identity')
@ -289,10 +146,9 @@ class InstallProgressController(BaseController):
self.start_ui()
self.progress_view.show_error(crash_report)
def _bg_run_command_logged(self, cmd, **kwargs):
cmd = ['systemd-cat', '--level-prefix=false',
def logged_command(self, cmd):
return ['systemd-cat', '--level-prefix=false',
'--identifier=' + self._log_syslog_identifier] + cmd
return utils.run_command(cmd, **kwargs)
def _journal_event(self, event):
if event['SYSLOG_IDENTIFIER'] == self._event_syslog_identifier:
@ -342,7 +198,8 @@ class InstallProgressController(BaseController):
return
for event in reader:
callback(event)
return self.loop.watch_file(reader.fileno(), watch)
loop = asyncio.get_event_loop()
return loop.add_reader(reader.fileno(), watch)
def _write_config(self, path, config):
with open(path, 'w') as conf:
@ -381,10 +238,9 @@ class InstallProgressController(BaseController):
return curtin_cmd
def curtin_start_install(self):
log.debug('curtin_start_install')
async def curtin_install(self):
log.debug('curtin_install')
self.install_state = InstallState.RUNNING
self.progress_view = ProgressView(self)
self.journal_listener_handle = self.start_journald_listener(
[self._event_syslog_identifier, self._log_syslog_identifier],
@ -393,56 +249,73 @@ class InstallProgressController(BaseController):
curtin_cmd = self._get_curtin_command()
log.debug('curtin install cmd: {}'.format(curtin_cmd))
self.run_in_bg(
lambda: self._bg_run_command_logged(curtin_cmd),
self.curtin_install_completed)
def curtin_install_completed(self, fut):
cp = fut.result()
cp = await arun_command(
self.logged_command(curtin_cmd), check=True)
log.debug('curtin_install completed: %s', cp.returncode)
if cp.returncode != 0:
self.curtin_error()
return
self.install_state = InstallState.DONE
log.debug('After curtin install OK')
self._step_done('install')
def cancel(self):
pass
def start_postinstall_configuration(self):
has_network = self.model.network.has_network
async def install(self):
def filter_task(func):
if func._extra.get('net_only') and not has_network:
return False
if func._name == 'install_openssh' \
and not self.model.ssh.install_server:
return False
return True
@contextlib.contextmanager
def install_event(label):
self._install_event_start(label)
try:
yield
finally:
self._install_event_finish()
log.debug("starting state machine")
self.sm = StateMachine(self, collect_tasks(self, filter_task))
self.sm.run()
try:
await self.filesystem_event.wait()
@task
def _bg_drain_curtin_events(self):
await self.curtin_install()
await asyncio.wait(
{e.wait() for e in self._postinstall_prerequisites.values()})
await self.drain_curtin_events()
with install_event("final system configuration"):
with install_event("configuring cloud-init"):
await run_in_thread(self.model.configure_cloud_init)
if self.model.ssh.install_server:
with install_event("installing openssh"):
await self.install_openssh()
with install_event("restoring apt configuration"):
await self.restore_apt_config()
self.ui.set_header(_("Installation complete!"))
self.progress_view.set_status(_("Finished install!"))
self.progress_view.show_complete()
if self.model.network.has_network:
self.progress_view.update_running()
with install_event(
"downloading and installing security updates"):
await self.run_uu()
self.progress_view.update_done()
with install_event("copying logs to installed system"):
await self.copy_logs_to_target()
if not self.auto_reboot:
await self.reboot_clicked.wait()
self.reboot()
except Exception:
self.curtin_error()
raise
async def drain_curtin_events(self):
waited = 0.0
while self._event_indent and waited < 5.0:
time.sleep(0.1)
await asyncio.sleep(0.1)
waited += 0.1
log.debug("waited %s seconds for events to drain", waited)
@task
def start_final_configuration(self):
self._install_event_start("final system configuration")
@task(label="configuring cloud-init")
def _bg_configure_cloud_init(self):
self.model.configure_cloud_init()
@task(label="installing openssh")
def _bg_install_openssh(self):
async def install_openssh(self):
if self.opts.dry_run:
cmd = ["sleep", str(2/self.app.scale_factor)]
else:
@ -451,10 +324,9 @@ class InstallProgressController(BaseController):
"/target",
"--", "openssh-server",
]
self._bg_run_command_logged(cmd, check=True)
await arun_command(self.logged_command(cmd), check=True)
@task(label="restoring apt configuration")
def _bg_restore_apt_config(self):
async def restore_apt_config(self):
if self.opts.dry_run:
cmds = [["sleep", str(1/self.app.scale_factor)]]
else:
@ -469,24 +341,9 @@ class InstallProgressController(BaseController):
else:
cmds.append(["umount", self.tpath('var/lib/apt/lists')])
for cmd in cmds:
self._bg_run_command_logged(cmd, check=True)
await arun_command(self.logged_command(cmd), check=True)
@task
def postinstall_complete(self):
self._install_event_finish()
self.ui.set_header(_("Installation complete!"))
self.progress_view.set_status(_("Finished install!"))
self.progress_view.show_complete()
self.copy_logs_transition = 'wait'
@task(net_only=True)
def uu_start(self):
self.progress_view.update_running()
@task(label="downloading and installing security updates",
transitions={'reboot': 'abort_uu'},
net_only=True)
def _bg_run_uu(self):
async def run_uu(self):
target_tmp = os.path.join(self.model.target, "tmp")
os.makedirs(target_tmp, exist_ok=True)
apt_conf = tempfile.NamedTemporaryFile(
@ -495,78 +352,49 @@ class InstallProgressController(BaseController):
apt_conf.close()
env = os.environ.copy()
env["APT_CONFIG"] = apt_conf.name[len(self.model.target):]
self.uu_running = True
if self.opts.dry_run:
self.uu = utils.start_command([
"sleep", str(10/self.app.scale_factor)])
self.uu.wait()
self.uu = await astart_command(self.logged_command([
"sleep", str(5/self.app.scale_factor)]), env=env)
else:
self._bg_run_command_logged([
self.uu = await astart_command(self.logged_command([
sys.executable, "-m", "curtin", "in-target", "-t", "/target",
"--", "unattended-upgrades", "-v",
], env=env, check=True)
]), env=env)
await self.uu.communicate()
self.uu_running = False
self.uu = None
os.remove(apt_conf.name)
@task(transitions={'success': 'copy_logs_to_target'}, net_only=True)
def uu_done(self):
self.progress_view.update_done()
@task(net_only=True)
def abort_uu(self):
async def stop_uu(self):
self._install_event_finish()
@task(label="cancelling update", net_only=True)
def _bg_stop_uu(self):
self._install_event_start("cancelling update")
if self.opts.dry_run:
time.sleep(1)
await asyncio.sleep(1)
self.uu.terminate()
else:
self._bg_run_command_logged([
await arun_command(self.logged_command([
'chroot', '/target',
'/usr/share/unattended-upgrades/unattended-upgrade-shutdown',
'--stop-only',
], check=True)
], check=True))
@task(net_only=True, transitions={'success': 'copy_logs_to_target'})
def _bg_wait_for_uu(self):
r, w = os.pipe()
def callback(fut):
os.write(w, b'x')
self.sm.subscribe('run_uu', callback)
os.read(r, 1)
os.close(w)
os.close(r)
self.copy_logs_transition = 'reboot'
@task(label="copying logs to installed system",
transitions={'reboot': 'reboot'})
def _bg_copy_logs_to_target(self):
async def copy_logs_to_target(self):
if self.opts.dry_run:
if 'copy-logs-fail' in self.debug_flags:
raise PermissionError()
return
target_logs = self.tpath('var/log/installer')
utils.run_command(['cp', '-aT', '/var/log/installer', target_logs])
await arun_command(['cp', '-aT', '/var/log/installer', target_logs])
try:
with open(os.path.join(target_logs,
'installer-journal.txt'), 'w') as output:
utils.run_command(
await arun_command(
['journalctl'],
stdout=output, stderr=subprocess.STDOUT)
except Exception:
log.exception("saving journal failed")
@task(transitions={'wait': 'wait_for_click', 'reboot': 'reboot'})
def copy_logs_done(self):
self.sm.transition(self.copy_logs_transition)
@task(transitions={'reboot': 'reboot'})
def _bg_wait_for_click(self):
if not self.answers['reboot']:
signal.pause()
@task
def reboot(self):
if self.opts.dry_run:
log.debug('dry-run enabled, skipping reboot, quitting instead')
@ -575,26 +403,23 @@ class InstallProgressController(BaseController):
# TODO Possibly run this earlier, to show a warning; or
# switch to shutdown if chreipl fails
if platform.machine() == 's390x':
utils.run_command(["chreipl", "/target/boot"])
run_command(["chreipl", "/target/boot"])
# Should probably run curtin -c $CONFIG unmount -t TARGET first.
utils.run_command(["/sbin/reboot"])
run_command(["/sbin/reboot"])
async def _click_reboot(self):
if self.uu_running:
await self.stop_uu()
self.reboot_clicked.set()
def click_reboot(self):
if self.sm is None:
# If the curtin install itself crashes, the state machine
# that manages post install steps won't be running. Just
# reboot anyway.
self.reboot()
else:
self.sm.transition('reboot')
def quit(self):
if not self.opts.dry_run:
utils.disable_subiquity()
self.signal.emit_signal('quit')
schedule_task(self._click_reboot())
def start_ui(self):
if self.install_state == InstallState.RUNNING:
if self.install_state in [
InstallState.NOT_STARTED,
InstallState.RUNNING,
]:
self.progress_view.title = _("Installing system")
elif self.install_state == InstallState.DONE:
self.progress_view.title = _("Install complete!")

View File

@ -84,6 +84,15 @@ async def arun_command(cmd, *, stdout=subprocess.PIPE, stderr=subprocess.PIPE,
cmd, proc.returncode, stdout, stderr)
async def astart_command(cmd, *, stdout=subprocess.PIPE,
stderr=subprocess.PIPE, stdin=subprocess.DEVNULL,
env=None, **kw):
log.debug("astart_command called: %s", cmd)
return await asyncio.create_subprocess_exec(
*cmd, stdout=stdout, stderr=stderr,
env=_clean_env(env), **kw)
def start_command(cmd, *, stdin=subprocess.DEVNULL, stdout=subprocess.PIPE,
stderr=subprocess.PIPE, encoding='utf-8', errors='replace',
env=None, **kw):