diff --git a/subiquity/controllers/filesystem.py b/subiquity/controllers/filesystem.py index c0ada6f7..81702863 100644 --- a/subiquity/controllers/filesystem.py +++ b/subiquity/controllers/filesystem.py @@ -13,18 +13,25 @@ # You should have received a copy of the GNU Affero General Public License # along with this program. If not, see . -import enum +import asyncio import json import logging import os import select import sys -import time import pyudev +from subiquitycore.async_helpers import ( + run_in_thread, + schedule_task, + SingleInstanceTask, + ) from subiquitycore.controller import BaseController -from subiquitycore.utils import run_command +from subiquitycore.utils import ( + arun_command, + run_command, + ) from subiquity.controllers.error import ErrorReportKind from subiquity.models.filesystem import ( @@ -53,81 +60,6 @@ PREP_GRUB_SIZE_BYTES = 8 * 1024 * 1024 # 8MiB UEFI_GRUB_SIZE_BYTES = 512 * 1024 * 1024 # 512MiB EFI partition -class ProbeState(enum.IntEnum): - NOT_STARTED = enum.auto() - PROBING = enum.auto() - FAILED = enum.auto() - DONE = enum.auto() - - -class Probe: - - def __init__(self, controller, restricted, timeout, cb): - self.controller = controller - self.restricted = restricted - self.timeout = timeout - self.cb = cb - self.state = ProbeState.NOT_STARTED - self.result = None - if restricted: - self.kind = ErrorReportKind.DISK_PROBE_FAIL - else: - self.kind = ErrorReportKind.BLOCK_PROBE_FAIL - self.crash_report = None - - def start(self): - block_discover_log.debug( - "starting probe restricted=%s", self.restricted) - self.state = ProbeState.PROBING - self.controller.run_in_bg(self._bg_probe, self._probed) - self.controller.loop.set_alarm_in(self.timeout, self._check_timeout) - - def _bg_probe(self): - if self.restricted: - probe_types = {'blockdev'} - else: - probe_types = None - debug_flags = self.controller.debug_flags - if 'bpfail-full' in debug_flags and not self.restricted: - time.sleep(2) - 1/0 - if 'bpfail-restricted' in debug_flags and self.restricted: - time.sleep(2) - 1/0 - # Should consider invoking probert in a subprocess here (so we - # can kill it if it gets stuck). - return self.controller.app.prober.get_storage(probe_types=probe_types) - - def _probed(self, fut): - if self.state == ProbeState.FAILED: - block_discover_log.debug( - "ignoring result %s for timed out probe", fut) - return - try: - self.result = fut.result() - except Exception: - block_discover_log.exception( - "probing failed restricted=%s", self.restricted) - self.crash_report = self.controller.app.make_apport_report( - self.kind, "block probing", interrupt=False) - self.state = ProbeState.FAILED - else: - block_discover_log.info( - "probing successful restricted=%s", self.restricted) - self.state = ProbeState.DONE - self.cb(self) - - def _check_timeout(self, loop, ud): - if self.state != ProbeState.PROBING: - return - self.crash_report = self.controller.app.make_apport_report( - self.kind, "block probing timed out", interrupt=False) - block_discover_log.exception( - "probing timed out restricted=%s", self.restricted) - self.state = ProbeState.FAILED - self.cb(self) - - class FilesystemController(BaseController): def __init__(self, app): @@ -139,52 +71,91 @@ class FilesystemController(BaseController): self.answers.setdefault('guided', False) self.answers.setdefault('guided-index', 0) self.answers.setdefault('manual', []) - self._cur_probe = None self._monitor = None - self._udev_listen_handle = None - self._probes = {} + self._crash_reports = {} + self._probe_once_task = SingleInstanceTask(self._probe_once) + self._probe_task = SingleInstanceTask(self._probe) + + async def _probe_once(self, restricted): + if restricted: + probe_types = {'blockdev'} + fname = 'probe-data-restricted.json' + key = "ProbeDataRestricted" + else: + probe_types = None + fname = 'probe-data.json' + key = "ProbeData" + debug_flags = self.debug_flags + if 'bpfail-full' in debug_flags and not restricted: + await asyncio.sleep(2) + 1/0 + if 'bpfail-restricted' in debug_flags and restricted: + await asyncio.sleep(2) + 1/0 + storage = await run_in_thread( + self.app.prober.get_storage, probe_types) + block_discover_log.info( + "probing successful restricted=%s", restricted) + fpath = os.path.join(self.app.block_log_dir, fname) + with open(fpath, 'w') as fp: + json.dump(storage, fp, indent=4) + self.app.note_file_for_apport(key, fpath) + self.model.load_probe_data(storage) + + async def _probe(self): + self._crash_reports = {} + for (restricted, kind) in [ + (False, ErrorReportKind.BLOCK_PROBE_FAIL), + (True, ErrorReportKind.DISK_PROBE_FAIL), + ]: + try: + await self._probe_once_task.start(restricted) + await asyncio.wait_for(self._probe_once_task.task, 5.0) + except Exception: + block_discover_log.exception( + "block probing failed restricted=%s", restricted) + self._crash_reports[restricted] = self.app.make_apport_report( + kind, "block probing", interrupt=False) + continue + break + if self.showing: + self.start_ui() def start(self): - self._start_probe(restricted=False, start=False) + self._start_task = schedule_task(self._start()) + + async def _start(self): target = self.app.base_model.target if os.path.exists(target): - self.run_in_bg(self._bg_unmount, self._unmounted) - else: - self._cur_probe.start() - - def _bg_unmount(self): - cmd = [ - sys.executable, '-m', 'curtin', 'unmount', - '-t', self.app.base_model.target, - ] - if self.opts.dry_run: - cmd = ['sleep', 0.2] - run_command(cmd) - - def _unmounted(self, fut): + cmd = [ + sys.executable, '-m', 'curtin', 'unmount', + '-t', self.app.base_model.target, + ] + if self.opts.dry_run: + cmd = ['sleep', "0.2"] + await arun_command(cmd) context = pyudev.Context() self._monitor = pyudev.Monitor.from_netlink(context) self._monitor.filter_by(subsystem='block') self._monitor.enable_receiving() self.start_listening_udev() - self._cur_probe.start() + await self._probe_task.start() def start_listening_udev(self): - self._udev_listen_handle = self.loop.watch_file( - self._monitor.fileno(), self._udev_event) + loop = asyncio.get_event_loop() + loop.add_reader(self._monitor.fileno(), self._udev_event) def stop_listening_udev(self): - if self._udev_listen_handle is not None: - self.loop.remove_watch_file(self._udev_listen_handle) - self._udev_listen_handle = None + loop = asyncio.get_event_loop() + loop.remove_reader(self._monitor.fileno()) def _udev_event(self): cp = run_command(['udevadm', 'settle', '-t', '0']) if cp.returncode != 0: log.debug("waiting 0.1 to let udev event queue settle") self.stop_listening_udev() - self.loop.set_alarm_in( - 0.1, lambda loop, ud: self.start_listening_udev()) + loop = asyncio.get_event_loop() + loop.call_later(0.1, self.start_listening_udev) return # Drain the udev events in the queue -- if we stopped listening to # allow udev to settle, it's good bet there is more than one event to @@ -194,59 +165,12 @@ class FilesystemController(BaseController): while select.select([self._monitor.fileno()], [], [], 0)[0]: action, dev = self._monitor.receive_device() log.debug("_udev_event %s %s", action, dev) - self._start_probe(restricted=False) - - def _start_probe(self, *, restricted, start=True): - p = Probe(self, restricted, 5.0, self._probe_done) - self._cur_probe = self._probes[restricted] = p - if start: - p.start() - - def _probe_done(self, probe): - if probe is not self._cur_probe: - block_discover_log.debug( - "ignoring result %s for superseded probe", probe.result) - return - if probe.state == ProbeState.FAILED: - if not probe.restricted: - self._start_probe(restricted=True) - else: - if self.showing: - self.start_ui() - return - if probe.restricted: - fname = 'probe-data-restricted.json' - key = "ProbeDataRestricted" - else: - fname = 'probe-data.json' - key = "ProbeData" - fpath = os.path.join(self.app.block_log_dir, fname) - with open(fpath, 'w') as fp: - json.dump(probe.result, fp, indent=4) - self.app.note_file_for_apport(key, fpath) - try: - self.model.load_probe_data(probe.result) - except Exception: - block_discover_log.exception( - "load_probe_data failed restricted=%s", probe.restricted) - probe.crash_report = self.app.make_apport_report( - probe.kind, "loading probe data", interrupt=False) - if not probe.restricted: - self._start_probe(restricted=True) - else: - # OK, this is a hack - self._cur_probe.state = ProbeState.FAILED - if self.showing: - self.start_ui() - else: - # Should do something here if probing found no devices. - if self.showing: - self.start_ui() + self._probe_task.start_sync() def start_ui(self): - if self._cur_probe.state == ProbeState.PROBING: + if not self._probe_task.task.done(): self.ui.set_body(SlowProbing(self)) - elif self._cur_probe.state == ProbeState.FAILED: + elif True in self._crash_reports: self.ui.set_body(ProbingFailed(self)) self.ui.body.show_error() else: @@ -256,10 +180,9 @@ class FilesystemController(BaseController): # not today. self.stop_listening_udev() self.ui.set_body(GuidedFilesystemView(self)) - if self._cur_probe.restricted: - pr = self._probes[False].crash_report - if pr is not None: - self.app.show_error_report(pr) + pr = self._crash_reports.get(False) + if pr is not None: + self.app.show_error_report(pr) if self.answers['guided']: self.guided(self.answers.get('guided-method', 'direct')) elif self.answers['manual']: diff --git a/subiquity/ui/views/filesystem/probing.py b/subiquity/ui/views/filesystem/probing.py index 784ad791..22155aff 100644 --- a/subiquity/ui/views/filesystem/probing.py +++ b/subiquity/ui/views/filesystem/probing.py @@ -82,4 +82,4 @@ class ProbingFailed(BaseView): def show_error(self, sender=None): self.controller.app.show_error_report( - self.controller._cur_probe.crash_report) + self.controller._crash_reports[False])