move filesystem controller to new world

this cheats a bit and has an instance of FilesystemModel in both the server and the client
This commit is contained in:
Michael Hudson-Doyle 2020-10-12 13:09:38 +13:00
parent 50896cc214
commit 4adae88563
21 changed files with 649 additions and 490 deletions

View File

@ -1,6 +1,7 @@
[encoding: UTF-8]
subiquity/client/client.py
subiquity/client/controller.py
subiquity/client/controllers/filesystem.py
subiquity/client/controllers/__init__.py
subiquity/client/controllers/keyboard.py
subiquity/client/controllers/mirror.py
@ -28,20 +29,20 @@ subiquity/common/api/tests/test_client.py
subiquity/common/api/tests/test_endtoend.py
subiquity/common/api/tests/test_server.py
subiquity/common/errorreport.py
subiquity/common/filesystem.py
subiquity/common/__init__.py
subiquity/common/keyboard.py
subiquity/common/serialize.py
subiquity/common/tests/__init__.py
subiquity/common/tests/test_filesystem.py
subiquity/common/tests/test_keyboard.py
subiquity/common/types.py
subiquity/controller.py
subiquity/controllers/filesystem.py
subiquity/controllers/identity.py
subiquity/controllers/__init__.py
subiquity/controllers/reboot.py
subiquity/controllers/snaplist.py
subiquity/controllers/ssh.py
subiquity/controllers/tests/__init__.py
subiquity/controllers/tests/test_filesystem.py
subiquitycore/async_helpers.py
subiquitycore/contextlib38.py
subiquitycore/context.py
@ -118,6 +119,7 @@ subiquity/models/tests/test_subiquity.py
subiquity/server/controller.py
subiquity/server/controllers/cmdlist.py
subiquity/server/controllers/debconf.py
subiquity/server/controllers/filesystem.py
subiquity/server/controllers/__init__.py
subiquity/server/controllers/install.py
subiquity/server/controllers/keyboard.py

View File

@ -98,6 +98,7 @@ class SubiquityClient(TuiApplication):
"Proxy",
"Mirror",
"Refresh",
"Filesystem",
"Progress",
]

View File

@ -14,6 +14,7 @@
# along with this program. If not, see <http://www.gnu.org/licenses/>.
from subiquitycore.tuicontroller import RepeatedController
from .filesystem import FilesystemController
from .keyboard import KeyboardController
from .mirror import MirrorController
from .network import NetworkController
@ -24,6 +25,7 @@ from .welcome import WelcomeController
from .zdev import ZdevController
__all__ = [
'FilesystemController',
'KeyboardController',
'MirrorController',
'NetworkController',

View File

@ -0,0 +1,241 @@
# Copyright 2015 Canonical, Ltd.
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as
# published by the Free Software Foundation, either version 3 of the
# License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import logging
from subiquitycore.lsb_release import lsb_release
from subiquity.client.controller import SubiquityTuiController
from subiquity.common.filesystem import FilesystemManipulator
from subiquity.common.types import ProbeStatus
from subiquity.models.filesystem import (
Bootloader,
FilesystemModel,
raidlevels_by_value,
)
from subiquity.ui.views import (
FilesystemView,
GuidedDiskSelectionView,
)
from subiquity.ui.views.filesystem.probing import (
SlowProbing,
ProbingFailed,
)
log = logging.getLogger("subiquity.client.controllers.filesystem")
BIOS_GRUB_SIZE_BYTES = 1 * 1024 * 1024 # 1MiB
PREP_GRUB_SIZE_BYTES = 8 * 1024 * 1024 # 8MiB
UEFI_GRUB_SIZE_BYTES = 512 * 1024 * 1024 # 512MiB EFI partition
class FilesystemController(SubiquityTuiController, FilesystemManipulator):
endpoint_name = 'storage'
autoinstall_key = "storage"
autoinstall_schema = {'type': 'object'} # ...
def __init__(self, app):
super().__init__(app)
self.model = None
self.answers.setdefault('guided', False)
self.answers.setdefault('guided-index', 0)
self.answers.setdefault('manual', [])
async def make_ui(self):
status = await self.endpoint.GET()
if status.status == ProbeStatus.PROBING:
self.app.aio_loop.create_task(self._wait_for_probing())
return SlowProbing(self)
else:
return await self.make_ui_real(status)
async def _wait_for_probing(self):
status = await self.endpoint.GET(wait=True)
if isinstance(self.ui.body, SlowProbing):
self.ui.set_body(await self.make_ui_real(status))
async def make_ui_real(self, status):
if status.status == ProbeStatus.FAILED:
self.app.show_error_report(status.error_report)
return ProbingFailed(self, status.error_report)
self.model = FilesystemModel(status.bootloader)
self.model.load_server_data(status)
if self.model.bootloader == Bootloader.PREP:
self.supports_resilient_boot = False
else:
release = lsb_release()['release']
self.supports_resilient_boot = release >= '20.04'
if status.error_report:
self.app.show_error_report(status.error_report)
if self.answers:
self.app.aio_loop.call_soon(self._start_answers)
return GuidedDiskSelectionView(self)
def _start_answers(self):
if self.answers['guided']:
disk = self.model.all_disks()[self.answers['guided-index']]
method = self.answers.get('guided-method')
self.ui.body.form.guided_choice.value = {
'disk': disk,
'use_lvm': method == "lvm",
}
self.ui.body.done(self.ui.body.form)
elif self.answers['manual']:
self.manual()
def run_answers(self):
# Handled above as we only want to run answers when probing
# completes.
pass
def _action_get(self, id):
dev_spec = id[0].split()
dev = None
if dev_spec[0] == "disk":
if dev_spec[1] == "index":
dev = self.model.all_disks()[int(dev_spec[2])]
elif dev_spec[1] == "serial":
dev = self.model._one(type='disk', serial=dev_spec[2])
elif dev_spec[0] == "raid":
if dev_spec[1] == "name":
for r in self.model.all_raids():
if r.name == dev_spec[2]:
dev = r
break
elif dev_spec[0] == "volgroup":
if dev_spec[1] == "name":
for r in self.model.all_volgroups():
if r.name == dev_spec[2]:
dev = r
break
if dev is None:
raise Exception("could not resolve {}".format(id))
if len(id) > 1:
part, index = id[1].split()
if part == "part":
return dev.partitions()[int(index)]
else:
return dev
raise Exception("could not resolve {}".format(id))
def _action_clean_devices_raid(self, devices):
r = {
self._action_get(d): v
for d, v in zip(devices[::2], devices[1::2])
}
for d in r:
assert d.ok_for_raid
return r
def _action_clean_devices_vg(self, devices):
r = {self._action_get(d): 'active' for d in devices}
for d in r:
assert d.ok_for_lvm_vg
return r
def _action_clean_level(self, level):
return raidlevels_by_value[level]
async def _answers_action(self, action):
from subiquitycore.ui.stretchy import StretchyOverlay
from subiquity.ui.views.filesystem.delete import ConfirmDeleteStretchy
log.debug("_answers_action %r", action)
if 'obj' in action:
obj = self._action_get(action['obj'])
action_name = action['action']
if action_name == "MAKE_BOOT":
action_name = "TOGGLE_BOOT"
meth = getattr(
self.ui.body.avail_list,
"_{}_{}".format(obj.type, action_name))
meth(obj)
yield
body = self.ui.body._w
if not isinstance(body, StretchyOverlay):
return
if isinstance(body.stretchy, ConfirmDeleteStretchy):
if action.get("submit", True):
body.stretchy.done()
else:
async for _ in self._enter_form_data(
body.stretchy.form,
action['data'],
action.get("submit", True)):
pass
elif action['action'] == 'create-raid':
self.ui.body.create_raid()
yield
body = self.ui.body._w
async for _ in self._enter_form_data(
body.stretchy.form,
action['data'],
action.get("submit", True),
clean_suffix='raid'):
pass
elif action['action'] == 'create-vg':
self.ui.body.create_vg()
yield
body = self.ui.body._w
async for _ in self._enter_form_data(
body.stretchy.form,
action['data'],
action.get("submit", True),
clean_suffix='vg'):
pass
elif action['action'] == 'done':
if not self.ui.body.done.enabled:
raise Exception("answers did not provide complete fs config")
await self.app.confirm_install()
self.finish()
else:
raise Exception("could not process action {}".format(action))
def manual(self):
self.ui.set_body(FilesystemView(self.model, self))
if self.answers['guided']:
async def t():
await self.app.confirm_install()
self.finish()
self.app.aio_loop.create_task(t())
if self.answers['manual']:
self.app.aio_loop.create_task(self._manual_answers())
async def _manual_answers(self):
await self._run_actions(self.answers['manual'])
self.answers['manual'] = []
def guided(self):
self.ui.set_body(GuidedDiskSelectionView(self))
def reset(self):
log.info("Resetting Filesystem model")
self.app.ui.block_input = True
self.app.aio_loop.create_task(self._reset())
async def _reset(self):
status = await self.endpoint.reset.POST()
self.app.ui.block_input = False
self.model.load_server_data(status)
self.ui.set_body(FilesystemView(self.model, self))
def cancel(self):
self.app.prev_screen()
def finish(self):
log.debug("FilesystemController.finish next_screen")
self.app.next_screen(self.endpoint.POST(self.model._render_actions()))

View File

@ -31,6 +31,7 @@ from subiquity.common.types import (
InstallState,
InstallStatus,
RefreshStatus,
StorageResponse,
ZdevInfo,
)
@ -160,6 +161,13 @@ class API:
class info:
def GET(dev_name: str) -> str: ...
class storage:
def GET(wait: bool = False) -> StorageResponse: ...
def POST(config: Payload[list]): ...
class reset:
def POST() -> StorageResponse: ...
class install:
class status:
def GET(cur: Optional[InstallState] = None) -> InstallStatus: ...

View File

@ -1,4 +1,4 @@
# Copyright 2015 Canonical, Ltd.
# Copyright 2020 Canonical, Ltd.
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as
@ -13,382 +13,25 @@
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import asyncio
import json
import logging
import os
import select
import pyudev
from subiquitycore.async_helpers import (
run_in_thread,
schedule_task,
SingleInstanceTask,
)
from subiquitycore.context import with_context
from subiquitycore.lsb_release import lsb_release
from subiquitycore.utils import (
run_command,
)
from subiquity.common.errorreport import ErrorReportKind
from subiquity.common.types import Bootloader
from subiquity.controller import SubiquityTuiController
from subiquity.models.filesystem import (
align_up,
DeviceAction,
dehumanize_size,
DeviceAction,
Partition,
raidlevels_by_value,
)
from subiquity.ui.views import (
FilesystemView,
GuidedDiskSelectionView,
)
from subiquity.ui.views.filesystem.probing import (
SlowProbing,
ProbingFailed,
)
log = logging.getLogger('subiquity.common.filesystem')
log = logging.getLogger("subiquitycore.controller.filesystem")
block_discover_log = logging.getLogger('block-discover')
BIOS_GRUB_SIZE_BYTES = 1 * 1024 * 1024 # 1MiB
PREP_GRUB_SIZE_BYTES = 8 * 1024 * 1024 # 8MiB
UEFI_GRUB_SIZE_BYTES = 512 * 1024 * 1024 # 512MiB EFI partition
class FilesystemController(SubiquityTuiController):
autoinstall_key = "storage"
autoinstall_schema = {'type': 'object'} # ...
model_name = "filesystem"
def __init__(self, app):
self.ai_data = {}
super().__init__(app)
self.model.target = app.base_model.target
if self.opts.dry_run and self.opts.bootloader:
name = self.opts.bootloader.upper()
self.model.bootloader = getattr(Bootloader, name)
self.answers.setdefault('guided', False)
self.answers.setdefault('guided-index', 0)
self.answers.setdefault('manual', [])
self._monitor = None
self._crash_reports = {}
self._probe_once_task = SingleInstanceTask(
self._probe_once, propagate_errors=False)
self._probe_task = SingleInstanceTask(
self._probe, propagate_errors=False)
if self.model.bootloader == Bootloader.PREP:
self.supports_resilient_boot = False
else:
release = lsb_release()['release']
self.supports_resilient_boot = release >= '20.04'
def load_autoinstall_data(self, data):
log.debug("load_autoinstall_data %s", data)
if data is None:
if not self.interactive():
data = {
'layout': {
'name': 'lvm',
},
}
else:
data = {}
log.debug("self.ai_data = %s", data)
self.ai_data = data
@with_context()
async def apply_autoinstall_config(self, context=None):
self.stop_listening_udev()
await self._start_task
await self._probe_task.wait()
self.convert_autoinstall_config(context=context)
if not self.model.is_root_mounted():
raise Exception("autoinstall config did not mount root")
if self.model.needs_bootloader_partition():
raise Exception(
"autoinstall config did not create needed bootloader "
"partition")
@with_context(name='probe_once', description='restricted={restricted}')
async def _probe_once(self, *, context, restricted):
if restricted:
probe_types = {'blockdev'}
fname = 'probe-data-restricted.json'
key = "ProbeDataRestricted"
else:
probe_types = None
fname = 'probe-data.json'
key = "ProbeData"
storage = await run_in_thread(
self.app.prober.get_storage, probe_types)
fpath = os.path.join(self.app.block_log_dir, fname)
with open(fpath, 'w') as fp:
json.dump(storage, fp, indent=4)
self.app.note_file_for_apport(key, fpath)
self.model.load_probe_data(storage)
@with_context()
async def _probe(self, *, context=None):
async with self.app.install_lock_file.shared():
self._crash_reports = {}
if isinstance(self.ui.body, ProbingFailed):
self.ui.set_body(SlowProbing(self))
schedule_task(self._wait_for_probing())
for (restricted, kind) in [
(False, ErrorReportKind.BLOCK_PROBE_FAIL),
(True, ErrorReportKind.DISK_PROBE_FAIL),
]:
try:
await self._probe_once_task.start(
context=context, restricted=restricted)
# We wait on the task directly here, not
# self._probe_once_task.wait as if _probe_once_task
# gets cancelled, we should be cancelled too.
await asyncio.wait_for(self._probe_once_task.task, 15.0)
except asyncio.CancelledError:
# asyncio.CancelledError is a subclass of Exception in
# Python 3.6 (sadface)
raise
except Exception:
block_discover_log.exception(
"block probing failed restricted=%s", restricted)
report = self.app.make_apport_report(
kind, "block probing", interrupt=False)
if report is not None:
self._crash_reports[restricted] = report
continue
break
@with_context()
def convert_autoinstall_config(self, context=None):
log.debug("self.ai_data = %s", self.ai_data)
if 'layout' in self.ai_data:
layout = self.ai_data['layout']
meth = getattr(self, "guided_" + layout['name'])
disk = self.model.disk_for_match(
self.model.all_disks(),
layout.get("match", {'size': 'largest'}))
meth(disk)
elif 'config' in self.ai_data:
self.model.apply_autoinstall_config(self.ai_data['config'])
self.model.grub = self.ai_data.get('grub', {})
self.model.swap = self.ai_data.get('swap')
def start(self):
self._start_task = schedule_task(self._start())
async def _start(self):
context = pyudev.Context()
self._monitor = pyudev.Monitor.from_netlink(context)
self._monitor.filter_by(subsystem='block')
self._monitor.enable_receiving()
self.start_listening_udev()
await self._probe_task.start()
def start_listening_udev(self):
loop = asyncio.get_event_loop()
loop.add_reader(self._monitor.fileno(), self._udev_event)
def stop_listening_udev(self):
loop = asyncio.get_event_loop()
loop.remove_reader(self._monitor.fileno())
def _udev_event(self):
cp = run_command(['udevadm', 'settle', '-t', '0'])
if cp.returncode != 0:
log.debug("waiting 0.1 to let udev event queue settle")
self.stop_listening_udev()
loop = asyncio.get_event_loop()
loop.call_later(0.1, self.start_listening_udev)
return
# Drain the udev events in the queue -- if we stopped listening to
# allow udev to settle, it's good bet there is more than one event to
# process and we don't want to kick off a full block probe for each
# one. It's a touch unfortunate that pyudev doesn't have a
# non-blocking read so we resort to select().
while select.select([self._monitor.fileno()], [], [], 0)[0]:
action, dev = self._monitor.receive_device()
log.debug("_udev_event %s %s", action, dev)
self._probe_task.start_sync()
async def _wait_for_probing(self):
await self._start_task
await self._probe_task.wait()
if isinstance(self.ui.body, SlowProbing):
self.ui.set_body(self.make_ui())
def make_ui(self):
if self._probe_task.task is None or not self._probe_task.task.done():
schedule_task(self._wait_for_probing())
return SlowProbing(self)
elif True in self._crash_reports:
pr = self._crash_reports[True]
if pr is not None:
self.app.show_error_report(pr.ref())
return ProbingFailed(self)
else:
# Once we've shown the filesystem UI, we stop listening for udev
# events as merging system changes with configuration the user has
# performed would be tricky. Possibly worth doing though! Just
# not today.
self.convert_autoinstall_config()
self.stop_listening_udev()
pr = self._crash_reports.get(False)
if pr is not None:
self.app.show_error_report(pr.ref())
if self.answers:
self.app.aio_loop.call_soon(self._start_answers)
return GuidedDiskSelectionView(self)
def _start_answers(self):
if self.answers['guided']:
disk = self.model.all_disks()[self.answers['guided-index']]
method = self.answers.get('guided-method')
self.ui.body.form.guided_choice.value = {
'disk': disk,
'use_lvm': method == "lvm",
}
self.ui.body.done(self.ui.body.form)
elif self.answers['manual']:
self.manual()
def run_answers(self):
# Handled above as we only want to run answers when probing
# completes.
pass
def _action_get(self, id):
dev_spec = id[0].split()
dev = None
if dev_spec[0] == "disk":
if dev_spec[1] == "index":
dev = self.model.all_disks()[int(dev_spec[2])]
elif dev_spec[1] == "serial":
dev = self.model._one(type='disk', serial=dev_spec[2])
elif dev_spec[0] == "raid":
if dev_spec[1] == "name":
for r in self.model.all_raids():
if r.name == dev_spec[2]:
dev = r
break
elif dev_spec[0] == "volgroup":
if dev_spec[1] == "name":
for r in self.model.all_volgroups():
if r.name == dev_spec[2]:
dev = r
break
if dev is None:
raise Exception("could not resolve {}".format(id))
if len(id) > 1:
part, index = id[1].split()
if part == "part":
return dev.partitions()[int(index)]
else:
return dev
raise Exception("could not resolve {}".format(id))
def _action_clean_devices_raid(self, devices):
r = {
self._action_get(d): v
for d, v in zip(devices[::2], devices[1::2])
}
for d in r:
assert d.ok_for_raid
return r
def _action_clean_devices_vg(self, devices):
r = {self._action_get(d): 'active' for d in devices}
for d in r:
assert d.ok_for_lvm_vg
return r
def _action_clean_level(self, level):
return raidlevels_by_value[level]
async def _answers_action(self, action):
from subiquitycore.ui.stretchy import StretchyOverlay
from subiquity.ui.views.filesystem.delete import ConfirmDeleteStretchy
log.debug("_answers_action %r", action)
if 'obj' in action:
obj = self._action_get(action['obj'])
action_name = action['action']
if action_name == "MAKE_BOOT":
action_name = "TOGGLE_BOOT"
meth = getattr(
self.ui.body.avail_list,
"_{}_{}".format(obj.type, action_name))
meth(obj)
yield
body = self.ui.body._w
if not isinstance(body, StretchyOverlay):
return
if isinstance(body.stretchy, ConfirmDeleteStretchy):
if action.get("submit", True):
body.stretchy.done()
else:
async for _ in self._enter_form_data(
body.stretchy.form,
action['data'],
action.get("submit", True)):
pass
elif action['action'] == 'create-raid':
self.ui.body.create_raid()
yield
body = self.ui.body._w
async for _ in self._enter_form_data(
body.stretchy.form,
action['data'],
action.get("submit", True),
clean_suffix='raid'):
pass
elif action['action'] == 'create-vg':
self.ui.body.create_vg()
yield
body = self.ui.body._w
async for _ in self._enter_form_data(
body.stretchy.form,
action['data'],
action.get("submit", True),
clean_suffix='vg'):
pass
elif action['action'] == 'done':
if not self.ui.body.done.enabled:
raise Exception("answers did not provide complete fs config")
self.finish(self.app.confirm_install())
else:
raise Exception("could not process action {}".format(action))
def manual(self):
self.ui.set_body(FilesystemView(self.model, self))
if self.answers['guided']:
self.finish(self.app.confirm_install())
if self.answers['manual']:
self.app.aio_loop.create_task(
self._run_actions(self.answers['manual']))
self.answers['manual'] = []
def guided(self):
self.ui.set_body(GuidedDiskSelectionView(self))
def reset(self):
log.info("Resetting Filesystem model")
self.model.reset()
self.manual()
def cancel(self):
self.app.prev_screen()
def finish(self, coro=None):
log.debug("FilesystemController.finish next_screen")
self.configured()
self.app.next_screen(coro)
class FilesystemManipulator:
def create_mount(self, fs, spec):
if spec.get('mount') is None:
@ -723,6 +366,7 @@ class FilesystemController(SubiquityTuiController):
part_size = PREP_GRUB_SIZE_BYTES
elif bootloader == Bootloader.BIOS:
part_size = BIOS_GRUB_SIZE_BYTES
log.debug("bootloader %s", bootloader)
if part_size > new_boot_disk.free_for_partitions:
largest_part = max(
new_boot_disk.partitions(), key=lambda p: p.size)
@ -781,12 +425,3 @@ class FilesystemController(SubiquityTuiController):
fstype="ext4",
mount="/",
))
def make_autoinstall(self):
rendered = self.model.render()
r = {
'config': rendered['storage']['config']
}
if 'swap' in rendered:
r['swap'] = rendered['swap']
return r

View File

@ -0,0 +1,14 @@
# Copyright 2020 Canonical, Ltd.
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as
# published by the Free Software Foundation, either version 3 of the
# License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.

View File

@ -15,9 +15,8 @@
import unittest
from subiquitycore.context import Context
from subiquity.controllers.filesystem import (
FilesystemController,
from subiquity.common.filesystem import (
FilesystemManipulator,
)
from subiquity.models.tests.test_filesystem import (
make_disk,
@ -29,61 +28,41 @@ from subiquity.models.filesystem import (
)
class Thing:
# Just something to hang attributes off
pass
def make_manipulator(bootloader=None):
manipulator = FilesystemManipulator()
manipulator.model = make_model(bootloader)
manipulator.supports_resilient_boot = True
return manipulator
class MiniApplication:
ui = signal = loop = None
project = "mini"
autoinstall_config = {}
answers = {}
opts = Thing()
opts.dry_run = True
opts.bootloader = None
def report_start_event(*args): pass
def report_finish_event(*args): pass
def make_manipulator_and_disk(bootloader=None):
manipulator = make_manipulator(bootloader)
return manipulator, make_disk(manipulator.model)
def make_controller(bootloader=None):
app = MiniApplication()
app.base_model = bm = Thing()
app.context = Context.new(app)
bm.target = '/target'
bm.filesystem = make_model(bootloader)
controller = FilesystemController(app)
return controller
def make_controller_and_disk(bootloader=None):
controller = make_controller(bootloader)
return controller, make_disk(controller.model)
class TestFilesystemController(unittest.TestCase):
class TestFilesystemManipulator(unittest.TestCase):
def test_delete_encrypted_vg(self):
controller, disk = make_controller_and_disk()
manipulator, disk = make_manipulator_and_disk()
spec = {
'password': 'passw0rd',
'devices': {disk},
'name': 'vg0',
}
vg = controller.create_volgroup(spec)
controller.delete_volgroup(vg)
vg = manipulator.create_volgroup(spec)
manipulator.delete_volgroup(vg)
dm_crypts = [
a for a in controller.model._actions if a.type == 'dm_crypt']
a for a in manipulator.model._actions if a.type == 'dm_crypt']
self.assertEqual(dm_crypts, [])
def test_can_only_add_boot_once(self):
# This is really testing model code but it's much easier to test with a
# controller around.
# manipulator around.
for bl in Bootloader:
controller, disk = make_controller_and_disk(bl)
manipulator, disk = make_manipulator_and_disk(bl)
if DeviceAction.TOGGLE_BOOT not in disk.supported_actions:
continue
controller.add_boot_disk(disk)
manipulator.add_boot_disk(disk)
self.assertFalse(
disk._can_TOGGLE_BOOT,
"add_boot_disk(disk) did not make _can_TOGGLE_BOOT false "
@ -98,43 +77,43 @@ class TestFilesystemController(unittest.TestCase):
if device.fs():
self.assertIs(device.fs().mount(), None)
def add_existing_boot_partition(self, controller, disk):
if controller.model.bootloader == Bootloader.BIOS:
part = controller.model.add_partition(
def add_existing_boot_partition(self, manipulator, disk):
if manipulator.model.bootloader == Bootloader.BIOS:
part = manipulator.model.add_partition(
disk, size=1 << 20, flag="bios_grub")
elif controller.model.bootloader == Bootloader.UEFI:
part = controller.model.add_partition(
elif manipulator.model.bootloader == Bootloader.UEFI:
part = manipulator.model.add_partition(
disk, size=512 << 20, flag="boot")
elif controller.model.bootloader == Bootloader.PREP:
part = controller.model.add_partition(
elif manipulator.model.bootloader == Bootloader.PREP:
part = manipulator.model.add_partition(
disk, size=8 << 20, flag="prep")
part.preserve = True
return part
def assertIsBootDisk(self, controller, disk):
if controller.model.bootloader == Bootloader.BIOS:
def assertIsBootDisk(self, manipulator, disk):
if manipulator.model.bootloader == Bootloader.BIOS:
self.assertTrue(disk.grub_device)
self.assertEqual(disk.partitions()[0].flag, "bios_grub")
elif controller.model.bootloader == Bootloader.UEFI:
elif manipulator.model.bootloader == Bootloader.UEFI:
for part in disk.partitions():
if part.flag == "boot" and part.grub_device:
return
self.fail("{} is not a boot disk".format(disk))
elif controller.model.bootloader == Bootloader.PREP:
elif manipulator.model.bootloader == Bootloader.PREP:
for part in disk.partitions():
if part.flag == "prep" and part.grub_device:
self.assertEqual(part.wipe, 'zero')
return
self.fail("{} is not a boot disk".format(disk))
def assertIsNotBootDisk(self, controller, disk):
if controller.model.bootloader == Bootloader.BIOS:
def assertIsNotBootDisk(self, manipulator, disk):
if manipulator.model.bootloader == Bootloader.BIOS:
self.assertFalse(disk.grub_device)
elif controller.model.bootloader == Bootloader.UEFI:
elif manipulator.model.bootloader == Bootloader.UEFI:
for part in disk.partitions():
if part.flag == "boot" and part.grub_device:
self.fail("{} is a boot disk".format(disk))
elif controller.model.bootloader == Bootloader.PREP:
elif manipulator.model.bootloader == Bootloader.PREP:
for part in disk.partitions():
if part.flag == "prep" and part.grub_device:
self.fail("{} is a boot disk".format(disk))
@ -143,23 +122,23 @@ class TestFilesystemController(unittest.TestCase):
for bl in Bootloader:
if bl == Bootloader.NONE:
continue
controller = make_controller(bl)
controller.supports_resilient_boot = True
manipulator = make_manipulator(bl)
manipulator.supports_resilient_boot = True
disk1 = make_disk(controller.model, preserve=False)
disk2 = make_disk(controller.model, preserve=False)
disk2p1 = controller.model.add_partition(
disk1 = make_disk(manipulator.model, preserve=False)
disk2 = make_disk(manipulator.model, preserve=False)
disk2p1 = manipulator.model.add_partition(
disk2, size=disk2.free_for_partitions)
controller.add_boot_disk(disk1)
self.assertIsBootDisk(controller, disk1)
manipulator.add_boot_disk(disk1)
self.assertIsBootDisk(manipulator, disk1)
if bl == Bootloader.UEFI:
self.assertIsMountedAtBootEFI(disk1.partitions()[0])
size_before = disk2p1.size
controller.add_boot_disk(disk2)
self.assertIsBootDisk(controller, disk1)
self.assertIsBootDisk(controller, disk2)
manipulator.add_boot_disk(disk2)
self.assertIsBootDisk(manipulator, disk1)
self.assertIsBootDisk(manipulator, disk2)
if bl == Bootloader.UEFI:
self.assertIsMountedAtBootEFI(disk1.partitions()[0])
self.assertNotMounted(disk2.partitions()[0])
@ -168,15 +147,15 @@ class TestFilesystemController(unittest.TestCase):
self.assertEqual(
disk2.partitions()[0].size + disk2p1.size, size_before)
controller.remove_boot_disk(disk1)
self.assertIsNotBootDisk(controller, disk1)
self.assertIsBootDisk(controller, disk2)
manipulator.remove_boot_disk(disk1)
self.assertIsNotBootDisk(manipulator, disk1)
self.assertIsBootDisk(manipulator, disk2)
if bl == Bootloader.UEFI:
self.assertIsMountedAtBootEFI(disk2.partitions()[0])
self.assertEqual(len(disk1.partitions()), 0)
controller.remove_boot_disk(disk2)
self.assertIsNotBootDisk(controller, disk2)
manipulator.remove_boot_disk(disk2)
self.assertIsNotBootDisk(manipulator, disk2)
self.assertEqual(len(disk2.partitions()), 1)
self.assertEqual(disk2p1.size, size_before)
@ -184,23 +163,23 @@ class TestFilesystemController(unittest.TestCase):
for bl in Bootloader:
if bl == Bootloader.NONE:
continue
controller = make_controller(bl)
controller.supports_resilient_boot = False
manipulator = make_manipulator(bl)
manipulator.supports_resilient_boot = False
disk1 = make_disk(controller.model, preserve=False)
disk2 = make_disk(controller.model, preserve=False)
disk2p1 = controller.model.add_partition(
disk1 = make_disk(manipulator.model, preserve=False)
disk2 = make_disk(manipulator.model, preserve=False)
disk2p1 = manipulator.model.add_partition(
disk2, size=disk2.free_for_partitions)
controller.add_boot_disk(disk1)
self.assertIsBootDisk(controller, disk1)
manipulator.add_boot_disk(disk1)
self.assertIsBootDisk(manipulator, disk1)
if bl == Bootloader.UEFI:
self.assertIsMountedAtBootEFI(disk1.partitions()[0])
size_before = disk2p1.size
controller.add_boot_disk(disk2)
self.assertIsNotBootDisk(controller, disk1)
self.assertIsBootDisk(controller, disk2)
manipulator.add_boot_disk(disk2)
self.assertIsNotBootDisk(manipulator, disk1)
self.assertIsBootDisk(manipulator, disk2)
if bl == Bootloader.UEFI:
self.assertIsMountedAtBootEFI(disk2.partitions()[0])
self.assertEqual(len(disk2.partitions()), 2)
@ -212,34 +191,34 @@ class TestFilesystemController(unittest.TestCase):
for bl in Bootloader:
if bl == Bootloader.NONE:
continue
controller = make_controller(bl)
manipulator = make_manipulator(bl)
disk1 = make_disk(controller.model, preserve=True)
part = self.add_existing_boot_partition(controller, disk1)
disk1 = make_disk(manipulator.model, preserve=True)
part = self.add_existing_boot_partition(manipulator, disk1)
wipe_before = part.wipe
controller.add_boot_disk(disk1)
self.assertIsBootDisk(controller, disk1)
manipulator.add_boot_disk(disk1)
self.assertIsBootDisk(manipulator, disk1)
if bl == Bootloader.UEFI:
self.assertIsMountedAtBootEFI(part)
controller.remove_boot_disk(disk1)
self.assertIsNotBootDisk(controller, disk1)
manipulator.remove_boot_disk(disk1)
self.assertIsNotBootDisk(manipulator, disk1)
self.assertEqual(len(disk1.partitions()), 1)
self.assertEqual(part.wipe, wipe_before)
if bl == Bootloader.UEFI:
self.assertNotMounted(part)
def test_mounting_partition_makes_boot_disk(self):
controller = make_controller(Bootloader.UEFI)
disk1 = make_disk(controller.model, preserve=True)
disk1p1 = controller.model.add_partition(
manipulator = make_manipulator(Bootloader.UEFI)
disk1 = make_disk(manipulator.model, preserve=True)
disk1p1 = manipulator.model.add_partition(
disk1, size=512 << 20, flag="boot")
disk1p1.preserve = True
disk1p2 = controller.model.add_partition(
disk1p2 = manipulator.model.add_partition(
disk1, size=disk1.free_for_partitions)
disk1p2.preserve = True
controller.partition_disk_handler(
manipulator.partition_disk_handler(
disk1, disk1p2, {'fstype': 'ext4', 'mount': '/'})
efi_mnt = controller.model._mount_for_path("/boot/efi")
efi_mnt = manipulator.model._mount_for_path("/boot/efi")
self.assertEqual(efi_mnt.device.volume, disk1p1)

View File

@ -115,6 +115,12 @@ class ZdevInfo:
return self.type
class ProbeStatus(enum.Enum):
PROBING = enum.auto()
FAILED = enum.auto()
DONE = enum.auto()
class Bootloader(enum.Enum):
NONE = "NONE" # a system where the bootloader is external, e.g. s390x
BIOS = "BIOS" # BIOS, where the bootloader dd-ed to the start of a device
@ -122,6 +128,16 @@ class Bootloader(enum.Enum):
PREP = "PREP" # ppc64el, which puts grub on a PReP partition
@attr.s(auto_attribs=True)
class StorageResponse:
status: ProbeStatus
bootloader: Optional[Bootloader] = None
error_report: Optional[ErrorReportRef] = None
orig_config: Optional[list] = None
config: Optional[list] = None
blockdev: Optional[dict] = None
@attr.s(auto_attribs=True)
class IdentityData:
realname: str = ''

View File

@ -13,14 +13,12 @@
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
from .filesystem import FilesystemController
from .identity import IdentityController
from .reboot import RebootController
from .snaplist import SnapListController
from .ssh import SSHController
__all__ = [
'FilesystemController',
'IdentityController',
'RebootController',
'SnapListController',

View File

@ -1 +0,0 @@
#

View File

@ -1233,10 +1233,11 @@ LUKS_OVERHEAD = 16*(2**20)
@fsobj("dm_crypt")
class DM_Crypt:
volume = attributes.ref(backlink="_constructed_device") # _Formattable
key = attr.ib(metadata={'redact': True})
key = attr.ib(metadata={'redact': True}, default=None)
keyfile = attr.ib(default=None)
def serialize_key(self):
if self.key:
if self.key and not self.keyfile:
f = tempfile.NamedTemporaryFile(
prefix='luks-key-', mode='w', delete=False)
f.write(self.key)
@ -1335,8 +1336,10 @@ class FilesystemModel(object):
else:
return Bootloader.BIOS
def __init__(self):
self.bootloader = self._probe_bootloader()
def __init__(self, bootloader=None):
if bootloader is None:
bootloader = self._probe_bootloader()
self.bootloader = bootloader
self._probe_data = None
self.reset()
@ -1345,13 +1348,25 @@ class FilesystemModel(object):
self._orig_config = storage_config.extract_storage_config(
self._probe_data)["storage"]["config"]
self._actions = self._actions_from_config(
self._orig_config, self._probe_data['blockdev'])
self._orig_config,
self._probe_data['blockdev'],
is_probe_data=True)
else:
self._orig_config = []
self._actions = []
self.swap = None
self.grub = None
def load_server_data(self, status):
log.debug('load_server_data %s', status)
self._orig_config = status.orig_config
self._probe_data = {'blockdev': status.blockdev}
self._actions = self._actions_from_config(
status.config,
status.blockdev)
self.swap = None
self.grub = None
def _make_matchers(self, match):
matchers = []
@ -1423,7 +1438,7 @@ class FilesystemModel(object):
action['path'] = disk.path
action['serial'] = disk.serial
self._actions = self._actions_from_config(
ai_config, self._probe_data['blockdev'], is_autoinstall=True)
ai_config, self._probe_data['blockdev'], is_probe_data=False)
for p in self._all(type="partition") + self._all(type="lvm_partition"):
[parent] = list(dependencies(p))
if isinstance(p.size, int):
@ -1442,7 +1457,7 @@ class FilesystemModel(object):
else:
p.size = dehumanize_size(p.size)
def _actions_from_config(self, config, blockdevs, is_autoinstall=False):
def _actions_from_config(self, config, blockdevs, is_probe_data=False):
"""Convert curtin storage config into action instances.
curtin represents storage "actions" as defined in
@ -1469,7 +1484,7 @@ class FilesystemModel(object):
exclusions = set()
seen_multipaths = set()
for action in config:
if not is_autoinstall and action['type'] == 'mount':
if is_probe_data and action['type'] == 'mount':
if not action['path'].startswith(self.target):
# Completely ignore mounts under /target, they are
# probably leftovers from a previous install
@ -1502,7 +1517,7 @@ class FilesystemModel(object):
if kw['type'] == 'disk':
path = kw['path']
kw['info'] = StorageInfo({path: blockdevs[path]})
if not is_autoinstall:
if is_probe_data:
kw['preserve'] = True
obj = byid[action['id']] = c(m=self, **kw)
multipath = kw.get('multipath')
@ -1526,7 +1541,7 @@ class FilesystemModel(object):
objs = [o for o in objs if o not in exclusions]
if not is_autoinstall:
if is_probe_data:
for o in objs:
if o.type == "partition" and o.flag == "swap":
if o._fs is None:
@ -1535,7 +1550,7 @@ class FilesystemModel(object):
return objs
def _render_actions(self):
def _render_actions(self, include_all=False):
# The curtin storage config has the constraint that an action must be
# preceded by all the things that it depends on. We handle this by
# repeatedly iterating over all actions and checking if we can emit
@ -1591,7 +1606,7 @@ class FilesystemModel(object):
work = [
a for a in self._actions
if not getattr(a, 'preserve', False)
if not getattr(a, 'preserve', False) or include_all
]
while work:

View File

@ -15,6 +15,7 @@
from .cmdlist import EarlyController, LateController, ErrorController
from .debconf import DebconfController
from .filesystem import FilesystemController
from .install import InstallController
from .keyboard import KeyboardController
from .locale import LocaleController
@ -31,6 +32,7 @@ __all__ = [
'DebconfController',
'EarlyController',
'ErrorController',
'FilesystemController',
'InstallController',
'KeyboardController',
'LateController',

View File

@ -0,0 +1,247 @@
# Copyright 2015 Canonical, Ltd.
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as
# published by the Free Software Foundation, either version 3 of the
# License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import asyncio
import json
import logging
import os
import select
import pyudev
from subiquitycore.async_helpers import (
run_in_thread,
schedule_task,
SingleInstanceTask,
)
from subiquitycore.context import with_context
from subiquitycore.utils import (
run_command,
)
from subiquitycore.lsb_release import lsb_release
from subiquity.common.apidef import API
from subiquity.common.errorreport import ErrorReportKind
from subiquity.common.filesystem import FilesystemManipulator
from subiquity.common.types import (
ProbeStatus,
StorageResponse,
)
from subiquity.models.filesystem import (
Bootloader,
)
from subiquity.server.controller import (
SubiquityController,
)
log = logging.getLogger("subiquity.server.controller.filesystem")
block_discover_log = logging.getLogger('block-discover')
class FilesystemController(SubiquityController, FilesystemManipulator):
endpoint = API.storage
autoinstall_key = "storage"
autoinstall_schema = {'type': 'object'} # ...
model_name = "filesystem"
def __init__(self, app):
self.ai_data = {}
super().__init__(app)
self.model.target = app.base_model.target
if self.opts.dry_run and self.opts.bootloader:
name = self.opts.bootloader.upper()
self.model.bootloader = getattr(Bootloader, name)
self._monitor = None
self._crash_reports = {}
self._probe_once_task = SingleInstanceTask(
self._probe_once, propagate_errors=False)
self._probe_task = SingleInstanceTask(
self._probe, propagate_errors=False)
def load_autoinstall_data(self, data):
log.debug("load_autoinstall_data %s", data)
if data is None:
if not self.interactive():
data = {
'layout': {
'name': 'lvm',
},
}
else:
data = {}
log.debug("self.ai_data = %s", data)
self.ai_data = data
@with_context()
async def apply_autoinstall_config(self, context=None):
await self._start_task
self.stop_listening_udev()
await self._probe_task.wait()
self.convert_autoinstall_config(context=context)
if not self.model.is_root_mounted():
raise Exception("autoinstall config did not mount root")
if self.model.needs_bootloader_partition():
raise Exception(
"autoinstall config did not create needed bootloader "
"partition")
async def GET(self, wait: bool = False) -> StorageResponse:
if self._probe_task.task is None or not self._probe_task.task.done():
if wait:
await self._start_task
await self._probe_task.wait()
else:
return StorageResponse(status=ProbeStatus.PROBING)
if True in self._crash_reports:
return StorageResponse(
status=ProbeStatus.FAILED,
error_report=self._crash_reports[True].ref())
else:
if False in self._crash_reports:
err_ref = self._crash_reports[False].ref()
else:
err_ref = None
return StorageResponse(
status=ProbeStatus.DONE,
bootloader=self.model.bootloader,
error_report=err_ref,
orig_config=self.model._orig_config,
config=self.model._render_actions(include_all=True),
blockdev=self.model._probe_data['blockdev'])
async def POST(self, config: list):
self.model._actions = self.model._actions_from_config(
config, self.model._probe_data['blockdev'], is_probe_data=False)
self.configured()
async def reset_POST(self, context, request) -> StorageResponse:
log.info("Resetting Filesystem model")
self.model.reset()
return await self.GET(context)
@with_context(name='probe_once', description='restricted={restricted}')
async def _probe_once(self, *, context, restricted):
if restricted:
probe_types = {'blockdev'}
fname = 'probe-data-restricted.json'
key = "ProbeDataRestricted"
else:
probe_types = None
fname = 'probe-data.json'
key = "ProbeData"
storage = await run_in_thread(
self.app.prober.get_storage, probe_types)
fpath = os.path.join(self.app.block_log_dir, fname)
with open(fpath, 'w') as fp:
json.dump(storage, fp, indent=4)
self.app.note_file_for_apport(key, fpath)
self.model.load_probe_data(storage)
@with_context()
async def _probe(self, *, context=None):
self._crash_reports = {}
for (restricted, kind) in [
(False, ErrorReportKind.BLOCK_PROBE_FAIL),
(True, ErrorReportKind.DISK_PROBE_FAIL),
]:
try:
await self._probe_once_task.start(
context=context, restricted=restricted)
# We wait on the task directly here, not
# self._probe_once_task.wait as if _probe_once_task
# gets cancelled, we should be cancelled too.
await asyncio.wait_for(self._probe_once_task.task, 15.0)
except asyncio.CancelledError:
# asyncio.CancelledError is a subclass of Exception in
# Python 3.6 (sadface)
raise
except Exception:
block_discover_log.exception(
"block probing failed restricted=%s", restricted)
report = self.app.make_apport_report(kind, "block probing")
if report is not None:
self._crash_reports[restricted] = report
continue
break
@with_context()
def convert_autoinstall_config(self, context=None):
log.debug("self.ai_data = %s", self.ai_data)
if 'layout' in self.ai_data:
layout = self.ai_data['layout']
meth = getattr(self, "guided_" + layout['name'])
disk = self.model.disk_for_match(
self.model.all_disks(),
layout.get("match", {'size': 'largest'}))
meth(disk)
elif 'config' in self.ai_data:
self.model.apply_autoinstall_config(self.ai_data['config'])
self.model.grub = self.ai_data.get('grub', {})
self.model.swap = self.ai_data.get('swap')
def start(self):
if self.model.bootloader == Bootloader.PREP:
self.supports_resilient_boot = False
else:
release = lsb_release()['release']
self.supports_resilient_boot = release >= '20.04'
self._start_task = schedule_task(self._start())
async def _start(self):
context = pyudev.Context()
self._monitor = pyudev.Monitor.from_netlink(context)
self._monitor.filter_by(subsystem='block')
self._monitor.enable_receiving()
self.start_listening_udev()
await self._probe_task.start()
def start_listening_udev(self):
loop = asyncio.get_event_loop()
loop.add_reader(self._monitor.fileno(), self._udev_event)
def stop_listening_udev(self):
loop = asyncio.get_event_loop()
loop.remove_reader(self._monitor.fileno())
def _udev_event(self):
cp = run_command(['udevadm', 'settle', '-t', '0'])
if cp.returncode != 0:
log.debug("waiting 0.1 to let udev event queue settle")
self.stop_listening_udev()
loop = asyncio.get_event_loop()
loop.call_later(0.1, self.start_listening_udev)
return
# Drain the udev events in the queue -- if we stopped listening to
# allow udev to settle, it's good bet there is more than one event to
# process and we don't want to kick off a full block probe for each
# one. It's a touch unfortunate that pyudev doesn't have a
# non-blocking read so we resort to select().
while select.select([self._monitor.fileno()], [], [], 0)[0]:
action, dev = self._monitor.receive_device()
log.debug("_udev_event %s %s", action, dev)
self._probe_task.start_sync()
def make_autoinstall(self):
rendered = self.model.render()
r = {
'config': rendered['storage']['config']
}
if 'swap' in rendered:
r['swap'] = rendered['swap']
return r

View File

@ -125,6 +125,7 @@ class SubiquityServer(Application):
"Network",
"Proxy",
"Mirror",
"Filesystem",
"Install",
"Late",
]

View File

@ -67,8 +67,9 @@ class ProbingFailed(BaseView):
title = _("Probing for devices to install to failed")
def __init__(self, controller):
def __init__(self, controller, error_ref):
self.controller = controller
self.error_ref = error_ref
super().__init__(screen([
Text(_(fail_text)),
Text(""),
@ -81,5 +82,4 @@ class ProbingFailed(BaseView):
self.controller.cancel()
def show_error(self, sender=None):
self.controller.app.show_error_report(
self.controller._crash_reports[False])
self.controller.app.show_error_report(self.error_ref)

View File

@ -5,7 +5,7 @@ import urwid
from subiquitycore.testing import view_helpers
from subiquity.controllers.filesystem import FilesystemController
from subiquity.client.controllers.filesystem import FilesystemController
from subiquity.models.filesystem import (
Bootloader,
Disk,

View File

@ -21,7 +21,7 @@ import urwid
from subiquitycore.testing import view_helpers
from subiquitycore.view import BaseView
from subiquity.controllers.filesystem import FilesystemController
from subiquity.client.controllers.filesystem import FilesystemController
from subiquity.ui.views.filesystem.lvm import VolGroupStretchy
from subiquity.ui.views.filesystem.tests.test_partition import (
make_model_and_disk,

View File

@ -6,7 +6,7 @@ import urwid
from subiquitycore.testing import view_helpers
from subiquitycore.view import BaseView
from subiquity.controllers.filesystem import FilesystemController
from subiquity.client.controllers.filesystem import FilesystemController
from subiquity.models.filesystem import (
dehumanize_size,
)

View File

@ -6,7 +6,7 @@ import urwid
from subiquitycore.testing import view_helpers
from subiquitycore.view import BaseView
from subiquity.controllers.filesystem import FilesystemController
from subiquity.client.controllers.filesystem import FilesystemController
from subiquity.models.filesystem import (
raidlevels_by_value,
)

View File

@ -256,7 +256,7 @@ class InstallConfirmation(Stretchy):
def ok(self, sender):
if isinstance(self.app.ui.body, ProgressView):
self.app.ui.body.hide_continue()
if self.app.controllers.InstallProgress.showing:
if self.app.controllers.Progress.showing:
self.app.remove_global_overlay(self)
self.app.aio_loop.create_task(self.app.confirm_install())
else:
@ -277,8 +277,7 @@ You can wait for this to complete or switch to a shell.
class InstallRunning(Stretchy):
def __init__(self, parent, app, tty):
self.parent = parent
def __init__(self, app, tty):
self.app = app
self.btn = Toggleable(other_btn(
_("Switch to a shell"), on_press=self._debug_shell))