Merge pull request #817 from mwhudson/tuiapplication

remove all UI code from "core" application object, add tuiapplication
This commit is contained in:
Michael Hudson-Doyle 2020-09-10 11:31:30 +12:00 committed by GitHub
commit 01e9c04ab5
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
29 changed files with 486 additions and 516 deletions

View File

@ -20,12 +20,12 @@ from console_conf.ui.views import (
ChooserConfirmView,
)
from subiquitycore.controller import BaseController
from subiquitycore.tuicontroller import TuiController
log = logging.getLogger("console_conf.controllers.chooser")
class RecoveryChooserBaseController(BaseController):
class RecoveryChooserBaseController(TuiController):
def __init__(self, app):
super().__init__(app)

View File

@ -20,9 +20,9 @@ import pwd
import shlex
import sys
from subiquitycore.controller import BaseController
from subiquitycore.ssh import host_key_info, get_ips_standalone
from subiquitycore.snapd import SnapdConnection
from subiquitycore.tuicontroller import TuiController
from subiquitycore.utils import disable_console_conf, run_command
from console_conf.ui.views import IdentityView, LoginView
@ -141,7 +141,7 @@ def write_login_details_standalone():
return 0
class IdentityController(BaseController):
class IdentityController(TuiController):
def __init__(self, app):
super().__init__(app)

View File

@ -15,10 +15,10 @@
from console_conf.ui.views import WelcomeView, ChooserWelcomeView
from subiquitycore.controller import BaseController
from subiquitycore.tuicontroller import TuiController
class WelcomeController(BaseController):
class WelcomeController(TuiController):
welcome_view = WelcomeView

View File

@ -15,7 +15,7 @@
import logging
from subiquitycore.core import Application
from subiquitycore.tui import TuiApplication
from console_conf.models.console_conf import ConsoleConfModel
from console_conf.models.systems import RecoverySystemsModel
@ -23,8 +23,9 @@ from console_conf.models.systems import RecoverySystemsModel
log = logging.getLogger("console_conf.core")
class ConsoleConf(Application):
class ConsoleConf(TuiApplication):
from console_conf import controllers as controllers_mod
project = "console_conf"
make_model = ConsoleConfModel
@ -36,8 +37,9 @@ class ConsoleConf(Application):
]
class RecoveryChooser(Application):
class RecoveryChooser(TuiApplication):
from console_conf import controllers as controllers_mod
project = "console_conf"
controllers = [

View File

@ -20,8 +20,10 @@ import jsonschema
from subiquitycore.context import with_context
from subiquitycore.controller import (
BaseController,
)
from subiquitycore.tuicontroller import (
RepeatedController,
Skip,
TuiController,
)
log = logging.getLogger("subiquity.controller")
@ -39,6 +41,9 @@ class SubiquityController(BaseController):
self.context.set('controller', self)
self.setup_autoinstall()
def interactive(self):
return False
def setup_autoinstall(self):
if self.app.autoinstall_config:
with self.context.child("load_autoinstall_data"):
@ -68,15 +73,6 @@ class SubiquityController(BaseController):
"""
pass
def interactive(self):
if not self.app.autoinstall_config:
return True
i_sections = self.app.autoinstall_config.get(
'interactive-sections', [])
if '*' in i_sections or self.autoinstall_key in i_sections:
return True
return False
def configured(self):
"""Let the world know that this controller's model is now configured.
"""
@ -90,16 +86,14 @@ class SubiquityController(BaseController):
return {}
class NoUIController(SubiquityController):
def start_ui(self):
raise Skip
def cancel(self):
pass
class SubiquityTuiController(SubiquityController, TuiController):
def interactive(self):
return False
if not self.app.autoinstall_config:
return True
i_sections = self.app.autoinstall_config.get(
'interactive-sections', [])
return '*' in i_sections or self.autoinstall_key in i_sections
class RepeatedController(RepeatedController):

View File

@ -18,10 +18,10 @@ import os
from subiquitycore.context import with_context
from subiquitycore.utils import arun_command
from subiquity.controller import NoUIController
from subiquity.controller import SubiquityController
class CmdListController(NoUIController):
class CmdListController(SubiquityController):
autoinstall_default = []
autoinstall_schema = {

View File

@ -13,10 +13,10 @@
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
from subiquity.controller import NoUIController
from subiquity.controller import SubiquityController
class DebconfController(NoUIController):
class DebconfController(SubiquityController):
model_name = "debconf_selections"
autoinstall_key = "debconf-selections"

View File

@ -34,7 +34,7 @@ from subiquitycore.utils import (
from subiquity.common.errorreport import ErrorReportKind
from subiquity.controller import SubiquityController
from subiquity.controller import SubiquityTuiController
from subiquity.models.filesystem import (
align_up,
Bootloader,
@ -61,7 +61,7 @@ PREP_GRUB_SIZE_BYTES = 8 * 1024 * 1024 # 8MiB
UEFI_GRUB_SIZE_BYTES = 512 * 1024 * 1024 # 512MiB EFI partition
class FilesystemController(SubiquityController):
class FilesystemController(SubiquityTuiController):
autoinstall_key = "storage"
autoinstall_schema = {'type': 'object'} # ...

View File

@ -19,13 +19,13 @@ import attr
from subiquitycore.context import with_context
from subiquity.controller import SubiquityController
from subiquity.controller import SubiquityTuiController
from subiquity.ui.views import IdentityView
log = logging.getLogger('subiquity.controllers.identity')
class IdentityController(SubiquityController):
class IdentityController(SubiquityTuiController):
autoinstall_key = model_name = "identity"
autoinstall_schema = {

View File

@ -44,7 +44,7 @@ from subiquitycore.utils import (
)
from subiquity.common.errorreport import ErrorReportKind
from subiquity.controller import SubiquityController
from subiquity.controller import SubiquityTuiController
from subiquity.journald import journald_listener
from subiquity.ui.views.installprogress import ProgressView
@ -78,7 +78,7 @@ class TracebackExtractor:
self.traceback.append(line)
class InstallProgressController(SubiquityController):
class InstallProgressController(SubiquityTuiController):
def __init__(self, app):
super().__init__(app)

View File

@ -20,14 +20,14 @@ import attr
from subiquitycore.async_helpers import schedule_task
from subiquitycore.context import with_context
from subiquity.controller import SubiquityController
from subiquity.controller import SubiquityTuiController
from subiquity.models.keyboard import KeyboardSetting
from subiquity.ui.views import KeyboardView
log = logging.getLogger('subiquity.controllers.keyboard')
class KeyboardController(SubiquityController):
class KeyboardController(SubiquityTuiController):
autoinstall_key = model_name = "keyboard"
autoinstall_schema = {

View File

@ -27,7 +27,7 @@ from subiquitycore.async_helpers import (
)
from subiquitycore.context import with_context
from subiquity.controller import SubiquityController
from subiquity.controller import SubiquityTuiController
from subiquity.ui.views.mirror import MirrorView
log = logging.getLogger('subiquity.controllers.mirror')
@ -40,7 +40,7 @@ class CheckState(enum.IntEnum):
DONE = enum.auto()
class MirrorController(SubiquityController):
class MirrorController(SubiquityTuiController):
autoinstall_key = "apt"
autoinstall_schema = { # This is obviously incomplete.

View File

@ -21,7 +21,7 @@ from subiquitycore.context import with_context
from subiquitycore.controllers.network import NetworkController
from subiquity.common.errorreport import ErrorReportKind
from subiquity.controller import SubiquityController
from subiquity.controller import SubiquityTuiController
log = logging.getLogger("subiquity.controllers.network")
@ -65,7 +65,7 @@ NETPLAN_SCHEMA = {
}
class NetworkController(NetworkController, SubiquityController):
class NetworkController(NetworkController, SubiquityTuiController):
ai_data = None
autoinstall_key = "network"

View File

@ -13,10 +13,10 @@
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
from subiquity.controller import NoUIController
from subiquity.controller import SubiquityController
class PackageController(NoUIController):
class PackageController(SubiquityController):
model_name = autoinstall_key = "packages"
autoinstall_default = []

View File

@ -18,13 +18,13 @@ import os
from subiquitycore.context import with_context
from subiquity.controller import SubiquityController
from subiquity.controller import SubiquityTuiController
from subiquity.ui.views.proxy import ProxyView
log = logging.getLogger('subiquity.controllers.proxy')
class ProxyController(SubiquityController):
class ProxyController(SubiquityTuiController):
autoinstall_key = model_name = "proxy"
autoinstall_schema = {

View File

@ -22,12 +22,12 @@ from subiquitycore.async_helpers import schedule_task
from subiquitycore.context import with_context
from subiquitycore.utils import arun_command, run_command
from subiquity.controller import SubiquityController
from subiquity.controller import SubiquityTuiController
log = logging.getLogger("subiquity.controllers.restart")
class RebootController(SubiquityController):
class RebootController(SubiquityTuiController):
def __init__(self, app):
super().__init__(app)

View File

@ -25,12 +25,12 @@ from subiquitycore.async_helpers import (
SingleInstanceTask,
)
from subiquitycore.context import with_context
from subiquitycore.controller import (
from subiquitycore.tuicontroller import (
Skip,
)
from subiquity.controller import (
SubiquityController,
SubiquityTuiController,
)
@ -43,7 +43,7 @@ class CheckState(enum.IntEnum):
UNAVAILABLE = enum.auto()
class RefreshController(SubiquityController):
class RefreshController(SubiquityTuiController):
autoinstall_key = "refresh-installer"
autoinstall_schema = {

View File

@ -29,7 +29,7 @@ from curtin.reporter.handlers import (
LogHandler,
)
from subiquity.controller import NoUIController
from subiquity.controller import SubiquityController
class LogHandler(LogHandler):
@ -46,7 +46,7 @@ INITIAL_CONFIG = {'logging': {'type': 'log'}}
NON_INTERACTIVE_CONFIG = {'builtin': {'type': 'print'}}
class ReportingController(NoUIController):
class ReportingController(SubiquityController):
autoinstall_key = "reporting"
autoinstall_schema = {

View File

@ -21,12 +21,12 @@ from subiquitycore.async_helpers import (
schedule_task,
)
from subiquitycore.context import with_context
from subiquitycore.controller import (
from subiquitycore.tuicontroller import (
Skip,
)
from subiquity.controller import (
SubiquityController,
SubiquityTuiController,
)
from subiquity.models.snaplist import SnapSelection
@ -104,7 +104,7 @@ class SnapdSnapInfoLoader:
return self.tasks[snap]
class SnapListController(SubiquityController):
class SnapListController(SubiquityTuiController):
autoinstall_key = "snaps"
autoinstall_default = []

View File

@ -20,7 +20,7 @@ from subiquitycore.async_helpers import schedule_task
from subiquitycore.context import with_context
from subiquitycore import utils
from subiquity.controller import SubiquityController
from subiquity.controller import SubiquityTuiController
from subiquity.ui.views.ssh import SSHView
log = logging.getLogger('subiquity.controllers.ssh')
@ -32,7 +32,7 @@ class FetchSSHKeysFailure(Exception):
self.output = output
class SSHController(SubiquityController):
class SSHController(SubiquityTuiController):
autoinstall_key = model_name = "ssh"
autoinstall_schema = {

View File

@ -13,10 +13,10 @@
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
from subiquity.controller import NoUIController
from subiquity.controller import SubiquityController
class UserdataController(NoUIController):
class UserdataController(SubiquityController):
model_name = 'userdata'
autoinstall_key = "user-data"

View File

@ -18,14 +18,14 @@ import os
from subiquitycore.screen import is_linux_tty
from subiquity.controller import SubiquityController
from subiquity.controller import SubiquityTuiController
from subiquity.ui.views import WelcomeView
log = logging.getLogger('subiquity.controllers.welcome')
class WelcomeController(SubiquityController):
class WelcomeController(SubiquityTuiController):
autoinstall_key = model_name = "locale"
autoinstall_schema = {'type': 'string'}

View File

@ -24,7 +24,7 @@ from urwid import Text
from subiquitycore.ui.utils import Color
from subiquitycore.utils import run_command
from subiquity.controller import SubiquityController
from subiquity.controller import SubiquityTuiController
from subiquity.ui.views import ZdevView
@ -631,7 +631,7 @@ class ZdevInfo:
return self.type
class ZdevController(SubiquityController):
class ZdevController(SubiquityTuiController):
def __init__(self, app):
super().__init__(app)

View File

@ -31,8 +31,8 @@ from subiquitycore.async_helpers import (
run_in_thread,
schedule_task,
)
from subiquitycore.controller import Skip
from subiquitycore.core import Application
from subiquitycore.tuicontroller import Skip
from subiquitycore.tui import TuiApplication
from subiquitycore.snapd import (
AsyncSnapd,
FakeSnapdConnection,
@ -67,7 +67,7 @@ environment will not survive a reboot. If the install has started, the
installed system will be mounted at /target.""")
class Subiquity(Application):
class Subiquity(TuiApplication):
snapd_socket_path = '/run/snapd.socket'
@ -84,6 +84,7 @@ class Subiquity(Application):
'additionalProperties': True,
}
from subiquity import controllers as controllers_mod
project = "subiquity"
def make_model(self):

View File

@ -13,16 +13,12 @@
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
from abc import ABC, abstractmethod
from abc import ABC
import logging
log = logging.getLogger("subiquitycore.controller")
class Skip(Exception):
"""Raise this from a controller's start_ui method to skip a screen."""
class BaseController(ABC):
"""Base class for controllers."""
@ -31,12 +27,10 @@ class BaseController(ABC):
def __init__(self, app):
self.name = type(self).__name__[:-len("Controller")]
self.ui = app.ui
self.signal = app.signal
self.opts = app.opts
self.app = app
self.context = self.app.context.child(self.name, childlevel="DEBUG")
self.answers = app.answers.get(self.name, {})
if self.model_name is not None:
self.model = getattr(self.app.base_model, self.model_name)
@ -57,92 +51,9 @@ class BaseController(ABC):
"""
pass
@abstractmethod
def cancel(self):
pass
@property
def showing(self):
inst = self.app.controllers.cur
while isinstance(inst, RepeatedController):
inst = inst.orig
return inst is self
@abstractmethod
def start_ui(self):
"""Start running this controller's UI.
This method should call self.ui.set_body.
"""
def end_ui(self):
"""Stop running this controller's UI.
This method doesn't actually need to remove this controller's UI
as the next one is about to replace it, it's more of a hook to
stop any background tasks that can be stopped when the UI is not
running.
"""
def serialize(self):
return None
def deserialize(self, data):
if data is not None:
raise Exception("missing deserialize method on {}".format(self))
# Stuff for fine grained actions, used by filesystem and network
# controller at time of writing this comment.
def _enter_form_data(self, form, data, submit, clean_suffix=''):
for k, v in data.items():
c = getattr(
self, '_action_clean_{}_{}'.format(k, clean_suffix), None)
if c is None:
c = getattr(self, '_action_clean_{}'.format(k), lambda x: x)
field = getattr(form, k)
from subiquitycore.ui.selector import Selector
v = c(v)
if isinstance(field.widget, Selector):
field.widget._emit('select', v)
field.value = v
yield
yield
for bf in form._fields:
bf.validate()
form.validated()
if submit:
if not form.done_btn.enabled:
raise Exception("answers left form invalid!")
form._click_done(None)
def _run_actions(self, actions):
for action in actions:
yield from self._answers_action(action)
def _run_iterator(self, it, delay=None):
if delay is None:
delay = 0.2/self.app.scale_factor
try:
next(it)
except StopIteration:
return
self.app.aio_loop.call_later(delay, self._run_iterator, it, delay/1.1)
class RepeatedController(BaseController):
def __init__(self, orig, index):
self.name = "{}-{}".format(orig.name, index)
self.orig = orig
self.index = index
self.context = orig.context
def register_signals(self):
pass
def start_ui(self):
self.orig.start_ui(self.index)
def cancel(self):
self.orig.cancel()

View File

@ -24,13 +24,13 @@ from probert.network import IFF_UP, NetworkEventReceiver
from subiquitycore.async_helpers import SingleInstanceTask
from subiquitycore.context import with_context
from subiquitycore.controller import BaseController
from subiquitycore.file_util import write_file
from subiquitycore.models.network import (
BondParameters,
NetDevAction,
)
from subiquitycore import netplan
from subiquitycore.tuicontroller import TuiController
from subiquitycore.ui.stretchy import StretchyOverlay
from subiquitycore.ui.views.network import (
NetworkView,
@ -134,7 +134,7 @@ network:
'''
class NetworkController(BaseController):
class NetworkController(TuiController):
model_name = "network"
root = "/"

View File

@ -14,151 +14,20 @@
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import asyncio
import fcntl
import json
import logging
import os
import struct
import sys
import urwid
import yaml
from subiquitycore.async_helpers import schedule_task
from subiquitycore.context import (
Context,
)
from subiquitycore.controller import (
Skip,
)
from subiquitycore.palette import PALETTE_COLOR, PALETTE_MONO
from subiquitycore.controllerset import ControllerSet
from subiquitycore.prober import Prober
from subiquitycore.screen import is_linux_tty, make_screen
from subiquitycore.signals import Signal
from subiquitycore.ui.frame import SubiquityCoreUI
from subiquitycore.utils import arun_command
log = logging.getLogger('subiquitycore.core')
# /usr/include/linux/kd.h
K_RAW = 0x00
K_XLATE = 0x01
K_MEDIUMRAW = 0x02
K_UNICODE = 0x03
K_OFF = 0x04
KDGKBMODE = 0x4B44 # gets current keyboard mode
KDSKBMODE = 0x4B45 # sets current keyboard mode
def extend_dec_special_charmap():
urwid.escape.DEC_SPECIAL_CHARMAP.update({
ord('\N{BLACK RIGHT-POINTING SMALL TRIANGLE}'): '>',
ord('\N{BLACK LEFT-POINTING SMALL TRIANGLE}'): '<',
ord('\N{BLACK DOWN-POINTING SMALL TRIANGLE}'): 'v',
ord('\N{BLACK UP-POINTING SMALL TRIANGLE}'): '^',
ord('\N{check mark}'): '+',
ord('\N{bullet}'): '*',
ord('\N{lower half block}'): '=',
ord('\N{upper half block}'): '=',
ord('\N{FULL BLOCK}'): urwid.escape.DEC_SPECIAL_CHARMAP[
ord('\N{BOX DRAWINGS LIGHT VERTICAL}')],
})
class KeyCodesFilter:
"""input_filter that can pass (medium) raw keycodes to the application
See http://lct.sourceforge.net/lct/x60.html for terminology.
Call enter_keycodes_mode()/exit_keycodes_mode() to switch into and
out of keycodes mode. In keycodes mode, the only events passed to
the application are "press $N" / "release $N" where $N is the
keycode the user pressed or released.
Much of this is cribbed from the source of the "showkeys" utility.
"""
def __init__(self):
self._fd = os.open("/proc/self/fd/"+str(sys.stdin.fileno()), os.O_RDWR)
self.filtering = False
def enter_keycodes_mode(self):
log.debug("enter_keycodes_mode")
self.filtering = True
# Read the old keyboard mode (it will proably always be K_UNICODE but
# well).
o = bytearray(4)
fcntl.ioctl(self._fd, KDGKBMODE, o)
self._old_mode = struct.unpack('i', o)[0]
# Set the keyboard mode to K_MEDIUMRAW, which causes the keyboard
# driver in the kernel to pass us keycodes.
fcntl.ioctl(self._fd, KDSKBMODE, K_MEDIUMRAW)
def exit_keycodes_mode(self):
log.debug("exit_keycodes_mode")
self.filtering = False
fcntl.ioctl(self._fd, KDSKBMODE, self._old_mode)
def filter(self, keys, codes):
# Luckily urwid passes us the raw results from read() we can
# turn into keycodes.
if self.filtering:
i = 0
r = []
n = len(codes)
while i < len(codes):
# This is straight from showkeys.c.
if codes[i] & 0x80:
p = 'release '
else:
p = 'press '
if i + 2 < n and (codes[i] & 0x7f) == 0:
if (codes[i + 1] & 0x80) != 0:
if (codes[i + 2] & 0x80) != 0:
kc = (((codes[i + 1] & 0x7f) << 7) |
(codes[i + 2] & 0x7f))
i += 3
else:
kc = codes[i] & 0x7f
i += 1
r.append(p + str(kc))
return r
else:
return keys
class DummyKeycodesFilter:
# A dummy implementation of the same interface as KeyCodesFilter
# we can use when not running in a linux tty.
def enter_keycodes_mode(self):
pass
def exit_keycodes_mode(self):
pass
def filter(self, keys, codes):
return keys
class AsyncioEventLoop(urwid.AsyncioEventLoop):
# This is fixed in the latest urwid.
def _exception_handler(self, loop, context):
exc = context.get('exception')
if exc:
log.debug("_exception_handler %r", exc)
loop.stop()
if not isinstance(exc, urwid.ExitMainLoop):
# Store the exc_info so we can re-raise after the loop stops
self._exc_info = (type(exc), exc, exc.__traceback__)
else:
loop.default_exception_handler(context)
class Application:
# A concrete subclass must set project and controllers attributes, e.g.:
@ -175,9 +44,8 @@ class Application:
# controllers in order, calling the start_ui method on the controller
# instance.
make_ui = SubiquityCoreUI
def __init__(self, opts):
self._exc = None
self.debug_flags = ()
if opts.dry_run:
# Recognized flags are:
@ -192,7 +60,6 @@ class Application:
prober = Prober(opts.machine_config, self.debug_flags)
self.ui = self.make_ui()
self.opts = opts
opts.project = self.project
@ -202,58 +69,30 @@ class Application:
self.state_dir = os.path.join(self.root, 'run', self.project)
os.makedirs(self.state_path('states'), exist_ok=True)
self.answers = {}
if opts.answers is not None:
self.answers = yaml.safe_load(opts.answers.read())
log.debug("Loaded answers %s", self.answers)
if not opts.dry_run:
open('/run/casper-no-prompt', 'w').close()
# Set rich_mode to the opposite of what we want, so we can
# call toggle_rich to get the right things set up.
self.rich_mode = opts.run_on_serial
if is_linux_tty():
self.input_filter = KeyCodesFilter()
else:
self.input_filter = DummyKeycodesFilter()
self.scale_factor = float(
os.environ.get('SUBIQUITY_REPLAY_TIMESCALE', "1"))
self.updated = os.path.exists(self.state_path('updating'))
self.signal = Signal()
self.prober = prober
self.new_event_loop()
self.urwid_loop = None
controllers_mod = __import__(
'{}.controllers'.format(self.project), None, None, [''])
self.controllers = ControllerSet(
controllers_mod, self.controllers, init_args=(self,))
self.controllers_mod, self.controllers, init_args=(self,))
self.context = Context.new(self)
def _exception_handler(self, loop, context):
exc = context.get('exception')
if exc:
loop.stop()
self._exc = exc
else:
loop.default_exception_handler(context)
def new_event_loop(self):
new_loop = asyncio.new_event_loop()
new_loop.set_exception_handler(self._exception_handler)
asyncio.set_event_loop(new_loop)
self.aio_loop = new_loop
def run_command_in_foreground(self, cmd, before_hook=None, after_hook=None,
**kw):
screen = self.urwid_loop.screen
async def _run():
await arun_command(
cmd, stdin=None, stdout=None, stderr=None, **kw)
screen.start()
if after_hook is not None:
after_hook()
screen.stop()
urwid.emit_signal(
screen, urwid.display_common.INPUT_DESCRIPTORS_CHANGED)
if before_hook is not None:
before_hook()
schedule_task(_run())
def _connect_base_signals(self):
"""Connect signals used in the core controller."""
# Registers signals from each controller
@ -271,51 +110,6 @@ class Application:
with open(self.state_path('states', cur.name), 'w') as fp:
json.dump(cur.serialize(), fp)
def select_screen(self, new):
new.context.enter("starting UI")
if self.opts.screens and new.name not in self.opts.screens:
raise Skip
try:
new.start_ui()
except Skip:
new.context.exit("(skipped)")
raise
with open(self.state_path('last-screen'), 'w') as fp:
fp.write(new.name)
def _move_screen(self, increment):
self.save_state()
old = self.controllers.cur
if old is not None:
old.context.exit("completed")
old.end_ui()
cur_index = self.controllers.index
while True:
self.controllers.index += increment
if self.controllers.index < 0:
self.controllers.index = cur_index
return
if self.controllers.index >= len(self.controllers.instances):
self.exit()
new = self.controllers.cur
try:
self.select_screen(new)
except Skip:
log.debug("skipping screen %s", new.name)
continue
else:
return
def next_screen(self, *args):
self._move_screen(1)
def prev_screen(self, *args):
self._move_screen(-1)
def select_initial_screen(self, controller_index):
self.controllers.index = controller_index - 1
self.next_screen()
def report_start_event(self, context, description):
log = logging.getLogger(context.full_name())
level = getattr(logging, context.level)
@ -328,100 +122,9 @@ class Application:
# EventLoop -------------------------------------------------------------------
def _remove_last_screen(self):
last_screen = self.state_path('last-screen')
if os.path.exists(last_screen):
os.unlink(last_screen)
def exit(self):
self._remove_last_screen()
self.aio_loop.stop()
def run_scripts(self, scripts):
# run_scripts runs (or rather arranges to run, it's all async)
# a series of python snippets in a helpful namespace. This is
# all in aid of being able to test some part of the UI without
# having to click the same buttons over and over again to get
# the UI to the part you are working on.
#
# In the namespace are:
# * everything from view_helpers
# * wait, delay execution of subsequent scripts for a while
# * c, a function that finds a button and clicks it. uses
# wait, above to wait for the button to appear in case it
# takes a while.
from subiquitycore.testing import view_helpers
class ScriptState:
def __init__(self):
self.ns = view_helpers.__dict__.copy()
self.waiting = False
self.wait_count = 0
self.scripts = scripts
ss = ScriptState()
def _run_script():
log.debug("running %s", ss.scripts[0])
exec(ss.scripts[0], ss.ns)
if ss.waiting:
return
ss.scripts = ss.scripts[1:]
if ss.scripts:
self.aio_loop.call_soon(_run_script)
def c(pat):
but = view_helpers.find_button_matching(self.ui, '.*' + pat + '.*')
if not but:
ss.wait_count += 1
if ss.wait_count > 10:
raise Exception("no button found matching %r after"
"waiting for 10 secs" % pat)
wait(1, func=lambda: c(pat))
return
ss.wait_count = 0
view_helpers.click(but)
def wait(delay, func=None):
ss.waiting = True
def next():
ss.waiting = False
if func is not None:
func()
if not ss.waiting:
ss.scripts = ss.scripts[1:]
if ss.scripts:
_run_script()
self.aio_loop.call_later(delay, next)
ss.ns['c'] = c
ss.ns['wait'] = wait
ss.ns['ui'] = self.ui
self.aio_loop.call_later(0.06, _run_script)
def toggle_rich(self):
if self.rich_mode:
urwid.util.set_encoding('ascii')
new_palette = PALETTE_MONO
self.rich_mode = False
else:
urwid.util.set_encoding('utf-8')
new_palette = PALETTE_COLOR
self.rich_mode = True
urwid.CanvasCache.clear()
self.urwid_loop.screen.register_palette(new_palette)
self.urwid_loop.screen.clear()
def unhandled_input(self, key):
if self.opts.dry_run and key == 'ctrl x':
self.exit()
elif key == 'f3':
self.urwid_loop.screen.clear()
elif self.opts.run_on_serial and key in ['ctrl t', 'f4']:
self.toggle_rich()
def start_controllers(self):
log.debug("starting controllers")
for controller in self.controllers.instances:
@ -436,56 +139,17 @@ class Application:
with open(state_path) as fp:
controller.deserialize(json.load(fp))
last_screen = None
state_path = self.state_path('last-screen')
if os.path.exists(state_path):
with open(state_path) as fp:
last_screen = fp.read().strip()
controller_index = 0
for i, controller in enumerate(self.controllers.instances):
if controller.name == last_screen:
controller_index = i
# Screens that have already been seen should be marked as configured.
for controller in self.controllers.instances[:controller_index]:
controller.configured()
return controller_index
def make_screen(self, inputf=None, outputf=None):
return make_screen(self.opts.ascii, inputf, outputf)
def run(self, input=None, output=None):
log.debug("Application.run")
self.urwid_loop = urwid.MainLoop(
self.ui, screen=self.make_screen(input, output),
handle_mouse=False, pop_ups=True,
input_filter=self.input_filter.filter,
unhandled_input=self.unhandled_input,
event_loop=AsyncioEventLoop(loop=self.aio_loop))
extend_dec_special_charmap()
self.toggle_rich()
def run(self):
self.base_model = self.make_model()
try:
if self.opts.scripts:
self.run_scripts(self.opts.scripts)
self.controllers.load_all()
initial_controller_index = 0
if self.updated:
initial_controller_index = self.load_serialized_state()
self.aio_loop.call_soon(
self.select_initial_screen, initial_controller_index)
self.load_serialized_state()
self._connect_base_signals()
self.start_controllers()
self.urwid_loop.run()
except Exception:
log.exception("Exception in controller.run():")
raise
self.aio_loop.run_forever()
finally:
self.aio_loop.run_until_complete(
self.aio_loop.shutdown_asyncgens())
if self._exc:
exc, self._exc = self._exc, None
raise exc

279
subiquitycore/tui.py Normal file
View File

@ -0,0 +1,279 @@
# Copyright 2020 Canonical, Ltd.
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as
# published by the Free Software Foundation, either version 3 of the
# License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import logging
import os
import yaml
import urwid
from subiquitycore.async_helpers import schedule_task
from subiquitycore.core import Application
from subiquitycore.palette import (
PALETTE_COLOR,
PALETTE_MONO,
)
from subiquitycore.screen import make_screen
from subiquitycore.tuicontroller import Skip
from subiquitycore.ui.frame import SubiquityCoreUI
from subiquitycore.utils import arun_command
log = logging.getLogger('subiquitycore.tui')
def extend_dec_special_charmap():
urwid.escape.DEC_SPECIAL_CHARMAP.update({
ord('\N{BLACK RIGHT-POINTING SMALL TRIANGLE}'): '>',
ord('\N{BLACK LEFT-POINTING SMALL TRIANGLE}'): '<',
ord('\N{BLACK DOWN-POINTING SMALL TRIANGLE}'): 'v',
ord('\N{BLACK UP-POINTING SMALL TRIANGLE}'): '^',
ord('\N{check mark}'): '+',
ord('\N{bullet}'): '*',
ord('\N{lower half block}'): '=',
ord('\N{upper half block}'): '=',
ord('\N{FULL BLOCK}'): urwid.escape.DEC_SPECIAL_CHARMAP[
ord('\N{BOX DRAWINGS LIGHT VERTICAL}')],
})
class TuiApplication(Application):
make_ui = SubiquityCoreUI
def __init__(self, opts):
super().__init__(opts)
self.ui = self.make_ui()
self.answers = {}
if opts.answers is not None:
self.answers = yaml.safe_load(opts.answers.read())
log.debug("Loaded answers %s", self.answers)
if not opts.dry_run:
open('/run/casper-no-prompt', 'w').close()
# Set rich_mode to the opposite of what we want, so we can
# call toggle_rich to get the right things set up.
self.rich_mode = opts.run_on_serial
self.urwid_loop = None
self.cur_screen = None
def _remove_last_screen(self):
last_screen = self.state_path('last-screen')
if os.path.exists(last_screen):
os.unlink(last_screen)
def exit(self):
self._remove_last_screen()
super().exit()
def run_command_in_foreground(self, cmd, before_hook=None, after_hook=None,
**kw):
screen = self.urwid_loop.screen
async def _run():
await arun_command(
cmd, stdin=None, stdout=None, stderr=None, **kw)
screen.start()
if after_hook is not None:
after_hook()
screen.stop()
urwid.emit_signal(
screen, urwid.display_common.INPUT_DESCRIPTORS_CHANGED)
if before_hook is not None:
before_hook()
schedule_task(_run())
def select_screen(self, new):
new.context.enter("starting UI")
if self.opts.screens and new.name not in self.opts.screens:
raise Skip
try:
new.start_ui()
self.cur_screen = new
except Skip:
new.context.exit("(skipped)")
raise
with open(self.state_path('last-screen'), 'w') as fp:
fp.write(new.name)
def _move_screen(self, increment):
self.save_state()
old, self.cur_screen = self.cur_screen, None
if old is not None:
old.context.exit("completed")
old.end_ui()
cur_index = self.controllers.index
while True:
self.controllers.index += increment
if self.controllers.index < 0:
self.controllers.index = cur_index
return
if self.controllers.index >= len(self.controllers.instances):
self.exit()
return
new = self.controllers.cur
try:
self.select_screen(new)
except Skip:
log.debug("skipping screen %s", new.name)
continue
else:
return
def next_screen(self, *args):
self._move_screen(1)
def prev_screen(self, *args):
self._move_screen(-1)
def select_initial_screen(self, controller_index):
for controller in self.controllers.instances[:controller_index]:
controller.configured()
self.controllers.index = controller_index - 1
for controller in self.controllers.instances[:controller_index]:
controller.configured()
self.next_screen()
def run_scripts(self, scripts):
# run_scripts runs (or rather arranges to run, it's all async)
# a series of python snippets in a helpful namespace. This is
# all in aid of being able to test some part of the UI without
# having to click the same buttons over and over again to get
# the UI to the part you are working on.
#
# In the namespace are:
# * everything from view_helpers
# * wait, delay execution of subsequent scripts for a while
# * c, a function that finds a button and clicks it. uses
# wait, above to wait for the button to appear in case it
# takes a while.
from subiquitycore.testing import view_helpers
class ScriptState:
def __init__(self):
self.ns = view_helpers.__dict__.copy()
self.waiting = False
self.wait_count = 0
self.scripts = scripts
ss = ScriptState()
def _run_script():
log.debug("running %s", ss.scripts[0])
exec(ss.scripts[0], ss.ns)
if ss.waiting:
return
ss.scripts = ss.scripts[1:]
if ss.scripts:
self.aio_loop.call_soon(_run_script)
def c(pat):
but = view_helpers.find_button_matching(self.ui, '.*' + pat + '.*')
if not but:
ss.wait_count += 1
if ss.wait_count > 10:
raise Exception("no button found matching %r after"
"waiting for 10 secs" % pat)
wait(1, func=lambda: c(pat))
return
ss.wait_count = 0
view_helpers.click(but)
def wait(delay, func=None):
ss.waiting = True
def next():
ss.waiting = False
if func is not None:
func()
if not ss.waiting:
ss.scripts = ss.scripts[1:]
if ss.scripts:
_run_script()
self.aio_loop.call_later(delay, next)
ss.ns['c'] = c
ss.ns['wait'] = wait
ss.ns['ui'] = self.ui
self.aio_loop.call_later(0.06, _run_script)
def toggle_rich(self):
if self.rich_mode:
urwid.util.set_encoding('ascii')
new_palette = PALETTE_MONO
self.rich_mode = False
else:
urwid.util.set_encoding('utf-8')
new_palette = PALETTE_COLOR
self.rich_mode = True
urwid.CanvasCache.clear()
self.urwid_loop.screen.register_palette(new_palette)
self.urwid_loop.screen.clear()
def unhandled_input(self, key):
if self.opts.dry_run and key == 'ctrl x':
self.exit()
elif key == 'f3':
self.urwid_loop.screen.clear()
elif self.opts.run_on_serial and key in ['ctrl t', 'f4']:
self.toggle_rich()
def extra_urwid_loop_args(self):
return {}
def make_screen(self, inputf=None, outputf=None):
return make_screen(self.opts.ascii, inputf, outputf)
def start_urwid(self, input=None, output=None):
screen = self.make_screen(input, output)
screen.register_palette(PALETTE_COLOR)
self.urwid_loop = urwid.MainLoop(
self.ui, screen=screen,
handle_mouse=False, pop_ups=True,
unhandled_input=self.unhandled_input,
event_loop=urwid.AsyncioEventLoop(loop=self.aio_loop),
**self.extra_urwid_loop_args()
)
extend_dec_special_charmap()
self.toggle_rich()
self.urwid_loop.start()
def initial_controller_index(self):
if not self.updated:
return 0
state_path = self.state_path('last-screen')
if not os.path.exists(state_path):
return 0
with open(state_path) as fp:
last_screen = fp.read().strip()
controller_index = 0
for i, controller in enumerate(self.controllers.instances):
if controller.name == last_screen:
controller_index = i
return controller_index
def run(self):
if self.opts.scripts:
self.run_scripts(self.opts.scripts)
self.aio_loop.call_soon(self.start_urwid)
self.aio_loop.call_soon(
lambda: self.select_initial_screen(
self.initial_controller_index()))
try:
super().run()
finally:
self.urwid_loop.stop()

View File

@ -0,0 +1,119 @@
# Copyright 2020 Canonical, Ltd.
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as
# published by the Free Software Foundation, either version 3 of the
# License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
from abc import abstractmethod
import logging
from subiquitycore.controller import BaseController
log = logging.getLogger("subiquitycore.tuicontroller")
class Skip(Exception):
"""Raise this from a controller's start_ui method to skip a screen."""
class TuiController(BaseController):
"""Base class for controllers."""
def __init__(self, app):
super().__init__(app)
self.ui = app.ui
self.answers = app.answers.get(self.name, {})
@abstractmethod
def cancel(self):
pass
@property
def showing(self):
inst = self.app.controllers.cur
while isinstance(inst, RepeatedController):
inst = inst.orig
return inst is self
@abstractmethod
def start_ui(self):
"""Start running this controller's UI.
This method should call self.ui.set_body.
"""
def end_ui(self):
"""Stop running this controller's UI.
This method doesn't actually need to remove this controller's UI
as the next one is about to replace it, it's more of a hook to
stop any background tasks that can be stopped when the UI is not
running.
"""
# Stuff for fine grained actions, used by filesystem and network
# controller at time of writing this comment.
def _enter_form_data(self, form, data, submit, clean_suffix=''):
for k, v in data.items():
c = getattr(
self, '_action_clean_{}_{}'.format(k, clean_suffix), None)
if c is None:
c = getattr(self, '_action_clean_{}'.format(k), lambda x: x)
field = getattr(form, k)
from subiquitycore.ui.selector import Selector
v = c(v)
if isinstance(field.widget, Selector):
field.widget._emit('select', v)
field.value = v
yield
for bf in form._fields:
bf.validate()
form.validated()
if submit:
if not form.done_btn.enabled:
raise Exception("answers left form invalid!")
form._click_done(None)
def _run_actions(self, actions):
for action in actions:
yield from self._answers_action(action)
def _run_iterator(self, it, delay=None):
if delay is None:
delay = 0.2/self.app.scale_factor
try:
next(it)
except StopIteration:
return
self.app.aio_loop.call_later(delay, self._run_iterator, it, delay/1.1)
class RepeatedController(BaseController):
def __init__(self, orig, index):
self.name = "{}-{}".format(orig.name, index)
self.orig = orig
self.index = index
self.context = orig.context
def register_signals(self):
pass
def start_ui(self):
self.orig.start_ui(self.index)
def end_ui(self):
self.orig.end_ui()
def cancel(self):
self.orig.cancel()