asyncioify filesystem controller

This commit is contained in:
Michael Hudson-Doyle 2019-12-12 23:04:55 +13:00
parent 2ad6f0b2ee
commit eee1280de4
2 changed files with 82 additions and 159 deletions

View File

@ -13,18 +13,25 @@
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import enum
import asyncio
import json
import logging
import os
import select
import sys
import time
import pyudev
from subiquitycore.async_helpers import (
run_in_thread,
schedule_task,
SingleInstanceTask,
)
from subiquitycore.controller import BaseController
from subiquitycore.utils import run_command
from subiquitycore.utils import (
arun_command,
run_command,
)
from subiquity.controllers.error import ErrorReportKind
from subiquity.models.filesystem import (
@ -53,81 +60,6 @@ PREP_GRUB_SIZE_BYTES = 8 * 1024 * 1024 # 8MiB
UEFI_GRUB_SIZE_BYTES = 512 * 1024 * 1024 # 512MiB EFI partition
class ProbeState(enum.IntEnum):
NOT_STARTED = enum.auto()
PROBING = enum.auto()
FAILED = enum.auto()
DONE = enum.auto()
class Probe:
def __init__(self, controller, restricted, timeout, cb):
self.controller = controller
self.restricted = restricted
self.timeout = timeout
self.cb = cb
self.state = ProbeState.NOT_STARTED
self.result = None
if restricted:
self.kind = ErrorReportKind.DISK_PROBE_FAIL
else:
self.kind = ErrorReportKind.BLOCK_PROBE_FAIL
self.crash_report = None
def start(self):
block_discover_log.debug(
"starting probe restricted=%s", self.restricted)
self.state = ProbeState.PROBING
self.controller.run_in_bg(self._bg_probe, self._probed)
self.controller.loop.set_alarm_in(self.timeout, self._check_timeout)
def _bg_probe(self):
if self.restricted:
probe_types = {'blockdev'}
else:
probe_types = None
debug_flags = self.controller.debug_flags
if 'bpfail-full' in debug_flags and not self.restricted:
time.sleep(2)
1/0
if 'bpfail-restricted' in debug_flags and self.restricted:
time.sleep(2)
1/0
# Should consider invoking probert in a subprocess here (so we
# can kill it if it gets stuck).
return self.controller.app.prober.get_storage(probe_types=probe_types)
def _probed(self, fut):
if self.state == ProbeState.FAILED:
block_discover_log.debug(
"ignoring result %s for timed out probe", fut)
return
try:
self.result = fut.result()
except Exception:
block_discover_log.exception(
"probing failed restricted=%s", self.restricted)
self.crash_report = self.controller.app.make_apport_report(
self.kind, "block probing", interrupt=False)
self.state = ProbeState.FAILED
else:
block_discover_log.info(
"probing successful restricted=%s", self.restricted)
self.state = ProbeState.DONE
self.cb(self)
def _check_timeout(self, loop, ud):
if self.state != ProbeState.PROBING:
return
self.crash_report = self.controller.app.make_apport_report(
self.kind, "block probing timed out", interrupt=False)
block_discover_log.exception(
"probing timed out restricted=%s", self.restricted)
self.state = ProbeState.FAILED
self.cb(self)
class FilesystemController(BaseController):
def __init__(self, app):
@ -139,52 +71,91 @@ class FilesystemController(BaseController):
self.answers.setdefault('guided', False)
self.answers.setdefault('guided-index', 0)
self.answers.setdefault('manual', [])
self._cur_probe = None
self._monitor = None
self._udev_listen_handle = None
self._probes = {}
self._crash_reports = {}
self._probe_once_task = SingleInstanceTask(self._probe_once)
self._probe_task = SingleInstanceTask(self._probe)
async def _probe_once(self, restricted):
if restricted:
probe_types = {'blockdev'}
fname = 'probe-data-restricted.json'
key = "ProbeDataRestricted"
else:
probe_types = None
fname = 'probe-data.json'
key = "ProbeData"
debug_flags = self.debug_flags
if 'bpfail-full' in debug_flags and not restricted:
await asyncio.sleep(2)
1/0
if 'bpfail-restricted' in debug_flags and restricted:
await asyncio.sleep(2)
1/0
storage = await run_in_thread(
self.app.prober.get_storage, probe_types)
block_discover_log.info(
"probing successful restricted=%s", restricted)
fpath = os.path.join(self.app.block_log_dir, fname)
with open(fpath, 'w') as fp:
json.dump(storage, fp, indent=4)
self.app.note_file_for_apport(key, fpath)
self.model.load_probe_data(storage)
async def _probe(self):
self._crash_reports = {}
for (restricted, kind) in [
(False, ErrorReportKind.BLOCK_PROBE_FAIL),
(True, ErrorReportKind.DISK_PROBE_FAIL),
]:
try:
await self._probe_once_task.start(restricted)
await asyncio.wait_for(self._probe_once_task.task, 5.0)
except Exception:
block_discover_log.exception(
"block probing failed restricted=%s", restricted)
self._crash_reports[restricted] = self.app.make_apport_report(
kind, "block probing", interrupt=False)
continue
break
if self.showing:
self.start_ui()
def start(self):
self._start_probe(restricted=False, start=False)
self._start_task = schedule_task(self._start())
async def _start(self):
target = self.app.base_model.target
if os.path.exists(target):
self.run_in_bg(self._bg_unmount, self._unmounted)
else:
self._cur_probe.start()
def _bg_unmount(self):
cmd = [
sys.executable, '-m', 'curtin', 'unmount',
'-t', self.app.base_model.target,
]
if self.opts.dry_run:
cmd = ['sleep', 0.2]
run_command(cmd)
def _unmounted(self, fut):
cmd = [
sys.executable, '-m', 'curtin', 'unmount',
'-t', self.app.base_model.target,
]
if self.opts.dry_run:
cmd = ['sleep', "0.2"]
await arun_command(cmd)
context = pyudev.Context()
self._monitor = pyudev.Monitor.from_netlink(context)
self._monitor.filter_by(subsystem='block')
self._monitor.enable_receiving()
self.start_listening_udev()
self._cur_probe.start()
await self._probe_task.start()
def start_listening_udev(self):
self._udev_listen_handle = self.loop.watch_file(
self._monitor.fileno(), self._udev_event)
loop = asyncio.get_event_loop()
loop.add_reader(self._monitor.fileno(), self._udev_event)
def stop_listening_udev(self):
if self._udev_listen_handle is not None:
self.loop.remove_watch_file(self._udev_listen_handle)
self._udev_listen_handle = None
loop = asyncio.get_event_loop()
loop.remove_reader(self._monitor.fileno())
def _udev_event(self):
cp = run_command(['udevadm', 'settle', '-t', '0'])
if cp.returncode != 0:
log.debug("waiting 0.1 to let udev event queue settle")
self.stop_listening_udev()
self.loop.set_alarm_in(
0.1, lambda loop, ud: self.start_listening_udev())
loop = asyncio.get_event_loop()
loop.call_later(0.1, self.start_listening_udev)
return
# Drain the udev events in the queue -- if we stopped listening to
# allow udev to settle, it's good bet there is more than one event to
@ -194,59 +165,12 @@ class FilesystemController(BaseController):
while select.select([self._monitor.fileno()], [], [], 0)[0]:
action, dev = self._monitor.receive_device()
log.debug("_udev_event %s %s", action, dev)
self._start_probe(restricted=False)
def _start_probe(self, *, restricted, start=True):
p = Probe(self, restricted, 5.0, self._probe_done)
self._cur_probe = self._probes[restricted] = p
if start:
p.start()
def _probe_done(self, probe):
if probe is not self._cur_probe:
block_discover_log.debug(
"ignoring result %s for superseded probe", probe.result)
return
if probe.state == ProbeState.FAILED:
if not probe.restricted:
self._start_probe(restricted=True)
else:
if self.showing:
self.start_ui()
return
if probe.restricted:
fname = 'probe-data-restricted.json'
key = "ProbeDataRestricted"
else:
fname = 'probe-data.json'
key = "ProbeData"
fpath = os.path.join(self.app.block_log_dir, fname)
with open(fpath, 'w') as fp:
json.dump(probe.result, fp, indent=4)
self.app.note_file_for_apport(key, fpath)
try:
self.model.load_probe_data(probe.result)
except Exception:
block_discover_log.exception(
"load_probe_data failed restricted=%s", probe.restricted)
probe.crash_report = self.app.make_apport_report(
probe.kind, "loading probe data", interrupt=False)
if not probe.restricted:
self._start_probe(restricted=True)
else:
# OK, this is a hack
self._cur_probe.state = ProbeState.FAILED
if self.showing:
self.start_ui()
else:
# Should do something here if probing found no devices.
if self.showing:
self.start_ui()
self._probe_task.start_sync()
def start_ui(self):
if self._cur_probe.state == ProbeState.PROBING:
if not self._probe_task.task.done():
self.ui.set_body(SlowProbing(self))
elif self._cur_probe.state == ProbeState.FAILED:
elif True in self._crash_reports:
self.ui.set_body(ProbingFailed(self))
self.ui.body.show_error()
else:
@ -256,10 +180,9 @@ class FilesystemController(BaseController):
# not today.
self.stop_listening_udev()
self.ui.set_body(GuidedFilesystemView(self))
if self._cur_probe.restricted:
pr = self._probes[False].crash_report
if pr is not None:
self.app.show_error_report(pr)
pr = self._crash_reports.get(False)
if pr is not None:
self.app.show_error_report(pr)
if self.answers['guided']:
self.guided(self.answers.get('guided-method', 'direct'))
elif self.answers['manual']:

View File

@ -82,4 +82,4 @@ class ProbingFailed(BaseView):
def show_error(self, sender=None):
self.controller.app.show_error_report(
self.controller._cur_probe.crash_report)
self.controller._crash_reports[False])