Merge pull request #655 from bboozzoo/bboozzoo/uc20-recovery-chooser

console_conf: implement UC20 recovery chooser
This commit is contained in:
Dimitri John Ledkov 2020-04-01 22:05:30 +01:00 committed by GitHub
commit 590b4b8f23
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
14 changed files with 1074 additions and 27 deletions

View File

@ -33,6 +33,12 @@ i18n:
dryrun: probert i18n
$(MAKE) ui-view DRYRUN="--dry-run --bootloader uefi"
dryrun-console-conf:
$(MAKE) ui-view-console-conf DRYRUN="--dry-run"
ui-view-console-conf:
$(PYTHON) -m console_conf.cmd.tui $(DRYRUN) $(MACHARGS)
ui-view:
$(PYTHON) -m subiquity $(DRYRUN) $(MACHARGS)

View File

@ -16,10 +16,11 @@
import argparse
import sys
import os
import logging
from subiquitycore.log import setup_logger
from subiquitycore import __version__ as VERSION
from console_conf.core import ConsoleConf
from console_conf.core import ConsoleConf, RecoveryChooser
class ClickAction(argparse.Action):
@ -53,6 +54,10 @@ def parse_options(argv):
parser.add_argument('--click', metavar="PAT", action=ClickAction,
help='Synthesize a click on a button matching PAT')
parser.add_argument('--answers')
parser.add_argument('--recovery-chooser-mode', action='store_true',
dest='chooser_systems',
help=('Run as a recovery chooser interacting with the '
'calling process over stdin/stdout streams'))
return parser.parse_args(argv)
@ -69,9 +74,30 @@ def main():
logger.info("Starting console-conf v{}".format(VERSION))
logger.info("Arguments passed: {}".format(sys.argv))
if opts.chooser_systems:
# when running as a chooser, the stdin/stdout streams are set up by the
# process that runs us, attempt to restore the tty in/out by looking at
# stderr
chooser_input, chooser_output = restore_std_streams_from(sys.stderr)
interface = RecoveryChooser(opts, chooser_input, chooser_output)
else:
interface = ConsoleConf(opts)
interface.run()
def restore_std_streams_from(from_file):
"""
Attempt to restore the original sys.std{in,out} streams by inspecting the
tty that stderr is hooked up to. Returns the chooser input/output streams.
"""
tty = os.ttyname(from_file.fileno())
# we have tty now
chooser_input, chooser_output = sys.stdin, sys.stdout
sys.stdin = open(tty, 'r')
sys.stdout = open(tty, 'w')
return chooser_input, chooser_output
if __name__ == '__main__':
sys.exit(main())

View File

@ -17,9 +17,17 @@
from .identity import IdentityController
from subiquitycore.controllers.network import NetworkController
from .welcome import WelcomeController
from .welcome import WelcomeController, RecoveryChooserWelcomeController
from .chooser import (
RecoveryChooserController,
RecoveryChooserConfirmController
)
__all__ = [
'IdentityController',
'NetworkController',
'WelcomeController',
'RecoveryChooserWelcomeController',
'RecoveryChooserController',
'RecoveryChooserConfirmController',
]

View File

@ -0,0 +1,67 @@
# Copyright 2020 Canonical, Ltd.
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as
# published by the Free Software Foundation, either version 3 of the
# License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import logging
from console_conf.ui.views import (
ChooserView,
ChooserCurrentSystemView,
ChooserConfirmView,
)
from subiquitycore.controller import BaseController
log = logging.getLogger("console_conf.controllers.chooser")
class RecoveryChooserBaseController(BaseController):
def __init__(self, app):
super().__init__(app)
self.model = app.base_model
def cancel(self):
# exit without taking any action
self.app.exit()
class RecoveryChooserController(RecoveryChooserBaseController):
def start_ui(self):
if self.model.current and self.model.current.actions:
# only when we have a current system and it has actions available
more = len(self.model.systems) > 1
view = ChooserCurrentSystemView(self, self.model.current,
has_more=more)
else:
view = ChooserView(self, self.model.systems)
self.ui.set_body(view)
def select(self, system, action):
self.model.select(system, action)
self.app.next_screen()
def more_options(self):
self.ui.set_body(ChooserView(self, self.model.systems))
class RecoveryChooserConfirmController(RecoveryChooserBaseController):
def start_ui(self):
view = ChooserConfirmView(self, self.model.selection)
self.ui.set_body(view)
def confirm(self):
log.warning("user action %s", self.model.selection)
# output the choice
self.app.respond(self.model.selection)
self.app.exit()

View File

@ -0,0 +1,189 @@
# Copyright 2020 Canonical, Ltd.
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as
# published by the Free Software Foundation, either version 3 of the
# License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import unittest
from unittest import mock
from subiquitycore.context import Context
from console_conf.controllers.chooser import (
RecoveryChooserController,
RecoveryChooserConfirmController,
)
from console_conf.models.systems import (
RecoverySystemsModel,
SelectedSystemAction,
)
class MockedApplication:
signal = loop = None
project = "mini"
autoinstall_config = {}
answers = {}
opts = None
def make_app():
app = MockedApplication()
app.ui = mock.Mock()
app.base_model = mock.Mock()
app.context = Context.new(app)
app.exit = mock.Mock()
app.respond = mock.Mock()
app.next_screen = mock.Mock()
return app
model1_non_current = {
"current": False,
"label": "1234",
"brand": {
"display-name": "brand 1",
"username": "brand-1",
"id": "brand-1-id",
"validation": "certified",
},
"model": {
"display-name": "model 1",
"brand-id": "brand-1",
"model": "model-1",
},
"actions": [
{"title": "action 1", "mode": "action1"},
{"title": "action 2", "mode": "action2"},
]
}
model2_current = {
"current": True,
"label": "other-label",
"brand": {
"display-name": "brand 2",
"username": "brand-2",
"id": "brand-2-id",
"validation": "unproven",
},
"model": {
"display-name": "model 2",
"brand-id": "brand-2",
"model": "model-2",
},
"actions": [
{"title": "action 1", "mode": "action1"},
]
}
def make_model():
return RecoverySystemsModel.from_systems([model1_non_current,
model2_current])
class TestChooserConfirmController(unittest.TestCase):
def test_abort(self):
app = make_app()
c = RecoveryChooserConfirmController(app)
c.cancel()
app.respond.assert_not_called()
app.exit.assert_called()
def test_confirm(self):
app = make_app()
c = RecoveryChooserConfirmController(app)
c.model = mock.Mock(selection='selection')
c.confirm()
app.respond.assert_called_with('selection')
app.exit.assert_called()
@mock.patch('console_conf.controllers.chooser.ChooserConfirmView')
def test_confirm_view(self, ccv):
app = make_app()
c = RecoveryChooserConfirmController(app)
c.model = make_model()
c.model.select(c.model.systems[0], c.model.systems[0].actions[0])
c.start_ui()
ccv.assert_called()
class TestChooserController(unittest.TestCase):
def test_abort(self):
app = make_app()
c = RecoveryChooserController(app)
c.cancel()
app.respond.assert_not_called()
app.exit.assert_called()
def test_select(self):
app = make_app()
c = RecoveryChooserController(app)
c.model = make_model()
c.select(c.model.systems[0], c.model.systems[0].actions[0])
exp = SelectedSystemAction(system=c.model.systems[0],
action=c.model.systems[0].actions[0])
self.assertEqual(c.model.selection, exp)
app.next_screen.assert_called()
app.respond.assert_not_called()
app.exit.assert_not_called()
@mock.patch('console_conf.controllers.chooser.ChooserCurrentSystemView')
@mock.patch('console_conf.controllers.chooser.ChooserView')
def test_current_ui_first(self, cv, ccsv):
app = make_app()
c = RecoveryChooserController(app)
c.model = make_model()
c.start_ui()
# current system view is constructed
ccsv.assert_called_with(c, c.model.current, has_more=True)
# but the all systems view is not
cv.assert_not_called()
ccsv.reset_mock()
# user selects more options and the view is replaced
c.more_options()
# we get the all systems view now
cv.assert_called_with(c, c.model.systems)
# and the current system view was not constructed
ccsv.assert_not_called()
@mock.patch('console_conf.controllers.chooser.ChooserCurrentSystemView')
@mock.patch('console_conf.controllers.chooser.ChooserView')
def test_only_one_and_current(self, cv, ccsv):
app = make_app()
c = RecoveryChooserController(app)
c.model = RecoverySystemsModel.from_systems([model2_current])
c.start_ui()
# current system view is constructed
ccsv.assert_called_with(c, c.model.current, has_more=False)
# but the all systems view is not
cv.assert_not_called()
@mock.patch('console_conf.controllers.chooser.ChooserCurrentSystemView')
@mock.patch('console_conf.controllers.chooser.ChooserView')
def test_all_systems_first_no_current(self, cv, ccsv):
app = make_app()
c = RecoveryChooserController(app)
c.model = RecoverySystemsModel.from_systems([model1_non_current])
# sanity
self.assertIsNone(c.model.current)
c.start_ui()
# we get the all systems view now
cv.assert_called()
# current system view is not constructed
ccsv.assert_not_called()

View File

@ -13,15 +13,17 @@
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
from console_conf.ui.views import WelcomeView
from console_conf.ui.views import WelcomeView, ChooserWelcomeView
from subiquitycore.controller import BaseController
class WelcomeController(BaseController):
welcome_view = WelcomeView
def start_ui(self):
view = WelcomeView(self)
view = self.welcome_view(self)
self.ui.set_body(view)
def done(self):
@ -30,3 +32,7 @@ class WelcomeController(BaseController):
def cancel(self):
# Can't go back from here!
pass
class RecoveryChooserWelcomeController(WelcomeController):
welcome_view = ChooserWelcomeView

View File

@ -18,8 +18,9 @@ import logging
from subiquitycore.core import Application
from console_conf.models.console_conf import ConsoleConfModel
from console_conf.models.systems import RecoverySystemsModel
log = logging.getLogger('console_conf.core')
log = logging.getLogger("console_conf.core")
class ConsoleConf(Application):
@ -35,3 +36,33 @@ class ConsoleConf(Application):
"Network",
"Identity",
]
class RecoveryChooser(Application):
from console_conf.palette import COLORS, STYLES, STYLES_MONO
project = "console_conf"
controllers = [
"RecoveryChooserWelcome",
"RecoveryChooser",
"RecoveryChooserConfirm",
]
def __init__(self, opts, chooser_input, chooser_output):
"""Takes the options and raw input/output streams for communicating with the
chooser parent process.
"""
self._chooser_output = chooser_output
# make_model is used by super()'s constructor, but we need to use the
# instance data
self.make_model = lambda: RecoverySystemsModel.from_systems_stream(
chooser_input
)
super().__init__(opts)
def respond(self, choice):
"""Produce a response to the parent process"""
self.base_model.to_response_stream(choice, self._chooser_output)

View File

@ -0,0 +1,230 @@
#!/usr/bin/env python3
# Copyright 2020 Canonical, Ltd.
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as
# published by the Free Software Foundation, either version 3 of the
# License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import logging
import json
import attr
import jsonschema
log = logging.getLogger("console_conf.models.systems")
# This json schema describes the recovery systems data. Allow additional
# properties at each level so that console-conf does not have to be exactly in
# sync with snapd.
_RECOVERY_SYSTEMS_SCHEMA = {
"$schema": "http://json-schema.org/draft-04/schema#",
"title": "systems",
"type": "object",
"additionalProperties": True,
"required": ["systems"],
"properties": {
"systems": {
"type": "array",
"items": {
"type": "object",
"required": ["label", "brand", "model"],
"properties": {
"label": {"type": "string"},
"actions": {
"type": "array",
"items": {
"type": "object",
"additionalProperties": True,
"required": ["title", "mode"],
"properties": {
"title": {"type": "string"},
"mode": {"type": "string"},
},
},
},
"brand": {
"type": "object",
"additionalProperties": True,
"required": ["id", "username", "display-name"],
"properties": {
"id": {"type": "string"},
"username": {"type": "string"},
"display-name": {"type": "string"},
"validation": {"type": "string"},
},
},
"model": {
"type": "object",
"additionalProperties": True,
"required": ["model", "brand-id", "display-name"],
"properties": {
"model": {"type": "string"},
"brand-id": {"type": "string"},
"display-name": {"type": "string"},
},
},
},
},
},
},
}
class RecoverySystemsModel:
"""Recovery chooser data"""
def __init__(self, systems_data):
self.systems = systems_data
# current selection
self._selection = None
self._current = None
# find which system is current one, but be robust if none is marked as
# such
cs = [s for s in systems_data if s.current]
if cs:
self._current = cs[0]
def select(self, system, action):
self._selection = SelectedSystemAction(system=system, action=action)
@property
def selection(self):
return self._selection
@property
def current(self):
return self._current
@staticmethod
def from_systems(recovery_systems):
systems = []
for syst in recovery_systems:
m = syst["model"]
b = syst["brand"]
model = SystemModel(
model=m["model"],
brand_id=m["brand-id"],
display_name=m["display-name"]
)
brand = Brand(
ID=b["id"],
username=b["username"],
display_name=b["display-name"],
validation=b.get("validation", "unproven"),
)
actions = []
for a in syst.get("actions", []):
actions.append(SystemAction(title=a["title"], mode=a["mode"]))
s = RecoverySystem(
current=syst.get("current", False),
label=syst["label"],
model=model,
brand=brand,
actions=actions,
)
systems.append(s)
return RecoverySystemsModel(systems)
@staticmethod
def from_systems_stream(chooser_input):
"""Deserialize recovery systems from input JSON stream."""
try:
dec = json.load(chooser_input)
jsonschema.validate(dec, _RECOVERY_SYSTEMS_SCHEMA)
systems = dec.get("systems", [])
except json.JSONDecodeError:
log.exception("cannot decode recovery systems info")
raise
except jsonschema.ValidationError:
log.exception("cannot validate recovery systems data")
raise
return RecoverySystemsModel.from_systems(systems)
@staticmethod
def to_response_stream(obj, chooser_output):
"""Serialize an object with selected action as JSON to the given output
stream.
"""
if not isinstance(obj, SelectedSystemAction):
raise TypeError("unexpected type: {}".format(type(obj)))
choice = {
"label": obj.system.label,
"action": {
"mode": obj.action.mode,
"title": obj.action.title,
},
}
return json.dump(choice, fp=chooser_output)
@attr.s
class RecoverySystem:
current = attr.ib()
label = attr.ib()
model = attr.ib()
brand = attr.ib()
actions = attr.ib()
def __eq__(self, other):
return self.current == other.current and \
self.label == other.label and \
self.model == other.model and \
self.brand == other.brand and \
self.actions == other.actions
@attr.s
class Brand:
ID = attr.ib()
username = attr.ib()
display_name = attr.ib()
validation = attr.ib()
def __eq__(self, other):
return self.ID == other.ID and \
self.username == other.username and \
self.display_name == other.display_name and \
self.validation == other.validation
@attr.s
class SystemModel:
model = attr.ib()
brand_id = attr.ib()
display_name = attr.ib()
def __eq__(self, other):
return self.model == other.model and \
self.brand_id == other.brand_id and \
self.display_name == other.display_name
@attr.s
class SystemAction:
title = attr.ib()
mode = attr.ib()
def __eq__(self, other):
return self.title == other.title and \
self.mode == other.mode
@attr.s
class SelectedSystemAction:
system = attr.ib()
action = attr.ib()
def __eq__(self, other):
return self.system == other.system and \
self.action == other.action

View File

@ -0,0 +1,257 @@
#!/usr/bin/env python3
# Copyright 2020 Canonical, Ltd.
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as
# published by the Free Software Foundation, either version 3 of the
# License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import unittest
import json
import jsonschema
from io import StringIO
from console_conf.models.systems import (
RecoverySystemsModel,
RecoverySystem,
Brand,
SystemModel,
SystemAction,
SelectedSystemAction,
)
class RecoverySystemsModelTests(unittest.TestCase):
reference = {
"systems": [
{
"current": True,
"label": "1234",
"brand": {
"id": "brand-id",
"username": "brand-username",
"display-name": "this is my brand",
"validation": "verified",
},
"model": {
"model": "core20-amd64",
"brand-id": "brand-id",
"display-name": "Core 20 AMD64 system",
},
"actions": [
{"title": "reinstall", "mode": "install"},
{"title": "recover", "mode": "recover"},
]
},
{
"label": "other",
"brand": {
"id": "other-brand-id",
"username": "other-brand",
"display-name": "my brand",
"validation": "unproven",
},
"model": {
"model": "my-brand-box",
"brand-id": "other-brand-id",
"display-name": "Funky box",
},
"actions": [
{"title": "reinstall", "mode": "install"},
]
}
]
}
def test_from_systems_stream_happy(self):
raw = json.dumps(self.reference)
systems = RecoverySystemsModel.from_systems_stream(StringIO(raw))
exp = RecoverySystemsModel([
RecoverySystem(
current=True,
label="1234",
model=SystemModel(
model="core20-amd64",
brand_id="brand-id",
display_name="Core 20 AMD64 system"),
brand=Brand(
ID="brand-id",
username="brand-username",
display_name="this is my brand",
validation="verified"),
actions=[SystemAction(title="reinstall", mode="install"),
SystemAction(title="recover", mode="recover")]
),
RecoverySystem(
current=False,
label="other",
model=SystemModel(
model="my-brand-box",
brand_id="other-brand-id",
display_name="Funky box"),
brand=Brand(
ID="other-brand-id",
username="other-brand",
display_name="my brand",
validation="unproven"),
actions=[SystemAction(title="reinstall", mode="install")]
),
])
self.assertEqual(systems.systems, exp.systems)
self.assertEqual(systems.current, exp.systems[0])
def test_from_systems_stream_invalid_empty(self):
with self.assertRaises(jsonschema.ValidationError):
RecoverySystemsModel.from_systems_stream(StringIO("{}"))
def test_from_systems_stream_invalid_missing_system_label(self):
raw = json.dumps({
"systems": [
{
"brand": {
"id": "brand-id",
"username": "brand-username",
"display-name": "this is my brand",
"validation": "verified",
},
"model": {
"model": "core20-amd64",
"brand-id": "brand-id",
"display-name": "Core 20 AMD64 system",
},
"actions": [
{"title": "reinstall", "mode": "install"},
{"title": "recover", "mode": "recover"},
]
},
]
})
with self.assertRaises(jsonschema.ValidationError):
RecoverySystemsModel.from_systems_stream(StringIO(raw))
def test_from_systems_stream_invalid_missing_brand(self):
raw = json.dumps({
"systems": [
{
"label": "1234",
"model": {
"model": "core20-amd64",
"brand-id": "brand-id",
"display-name": "Core 20 AMD64 system",
},
"actions": [
{"title": "reinstall", "mode": "install"},
{"title": "recover", "mode": "recover"},
]
},
]
})
with self.assertRaises(jsonschema.ValidationError):
RecoverySystemsModel.from_systems_stream(StringIO(raw))
def test_from_systems_stream_invalid_missing_model(self):
raw = json.dumps({
"systems": [
{
"label": "1234",
"brand": {
"id": "brand-id",
"username": "brand-username",
"display-name": "this is my brand",
"validation": "verified",
},
"actions": [
{"title": "reinstall", "mode": "install"},
{"title": "recover", "mode": "recover"},
]
},
]
})
with self.assertRaises(jsonschema.ValidationError):
RecoverySystemsModel.from_systems_stream(StringIO(raw))
def test_from_systems_stream_valid_no_actions(self):
raw = json.dumps({
"systems": [
{
"label": "1234",
"model": {
"model": "core20-amd64",
"brand-id": "brand-id",
"display-name": "Core 20 AMD64 system",
},
"brand": {
"id": "brand-id",
"username": "brand-username",
"display-name": "this is my brand",
"validation": "verified",
},
"actions": [],
},
]
})
RecoverySystemsModel.from_systems_stream(StringIO(raw))
def test_selection(self):
raw = json.dumps(self.reference)
model = RecoverySystemsModel.from_systems_stream(StringIO(raw))
model.select(model.systems[1], model.systems[1].actions[0])
self.assertEqual(model.selection,
SelectedSystemAction(
system=model.systems[1],
action=model.systems[1].actions[0]))
def test_to_response_stream(self):
raw = json.dumps(self.reference)
model = RecoverySystemsModel.from_systems_stream(StringIO(raw))
model.select(model.systems[1], model.systems[1].actions[0])
stream = StringIO()
RecoverySystemsModel.to_response_stream(model.selection, stream)
fromjson = json.loads(stream.getvalue())
self.assertEqual(fromjson, {
"label": "other",
"action": {
"mode": "install",
"title": "reinstall",
},
})
def test_no_current(self):
reference = {
"systems": [
{
"current": False,
"label": "1234",
"brand": {
"id": "brand-id",
"username": "brand-username",
"display-name": "this is my brand",
"validation": "verified",
},
"model": {
"model": "core20-amd64",
"brand-id": "brand-id",
"display-name": "Core 20 AMD64 system",
},
"actions": [
{"title": "reinstall", "mode": "install"},
{"title": "recover", "mode": "recover"},
]
},
],
}
systems = RecoverySystemsModel.from_systems(reference["systems"])
self.assertEqual(len(systems.systems), 1)
self.assertEqual(systems.systems[0].label, "1234")
self.assertIsNone(systems.current)

View File

@ -17,9 +17,19 @@
from .identity import IdentityView
from .login import LoginView
from .welcome import WelcomeView
from .welcome import WelcomeView, ChooserWelcomeView
from .chooser import (
ChooserView,
ChooserCurrentSystemView,
ChooserConfirmView
)
__all__ = [
'IdentityView',
'LoginView',
'WelcomeView',
"IdentityView",
"LoginView",
"WelcomeView",
"ChooserWelcomeView",
"ChooserView",
"ChooserCurrentSystemView",
"ChooserConfirmView",
]

View File

@ -0,0 +1,199 @@
# Copyright 2015 Canonical, Ltd.
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as
# published by the Free Software Foundation, either version 3 of the
# License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
""" Chooser
Chooser provides a view with recovery chooser actions.
"""
import logging
from urwid import (
connect_signal,
Text,
)
from subiquitycore.ui.buttons import (
danger_btn,
reset_btn,
forward_btn,
)
from subiquitycore.ui.actionmenu import (
Action,
ActionMenu,
)
from subiquitycore.ui.container import Pile, ListBox
from subiquitycore.ui.utils import (
button_pile,
screen,
make_action_menu_row,
Color,
)
from subiquitycore.ui.table import TableRow, TablePile
from subiquitycore.view import BaseView
log = logging.getLogger("console_conf.views.chooser")
class ChooserCurrentSystemView(BaseView):
title = "Ubuntu Core"
def __init__(self, controller, current, has_more=False):
fmt = "Select one of available actions for \"{}\" by \"{}\"{}."
maybe_more = " or view all available systems" if has_more else ""
excerpt = fmt.format(current.model.display_name,
current.brand.display_name,
maybe_more)
self.controller = controller
log.debug('current system: %s', current)
log.debug('more systems available: %s', has_more)
actions = []
for action in current.actions:
actions.append(forward_btn(label=action.title,
on_press=self._current_system_action,
user_arg=(current, action)))
if has_more:
# add a button to show the other systems
actions.append(Text(""))
actions.append(forward_btn(label="Show all available systems",
on_press=self._more_options))
lb = ListBox(actions)
buttons = [
reset_btn("ABORT", on_press=self.abort),
]
super().__init__(screen(
lb,
buttons=button_pile(buttons),
focus_buttons=False,
narrow_rows=True,
excerpt=excerpt))
def _current_system_action(self, sender, arg):
current, action = arg
self.controller.select(current, action)
def _more_options(self, sender):
self.controller.more_options()
def abort(self, result):
self.controller.cancel()
class ChooserView(BaseView):
title = "Ubuntu Core"
excerpt = ("Select one of available recovery systems and a desired "
"action to execute.")
def __init__(self, controller, systems):
self.controller = controller
heading_table = TablePile([
TableRow([
Color.info_minor(Text(header)) for header in [
"LABEL", "MODEL", "PUBLISHER", ""
]
])
],
spacing=2)
trows = []
systems = sorted(systems,
key=lambda s: (s.brand.display_name,
s.model.display_name,
s.current,
s.label))
for s in systems:
actions = []
log.debug('actions: %s', s.actions)
for act in s.actions:
actions.append(Action(label=act.title,
value=act,
enabled=True))
menu = ActionMenu(actions)
connect_signal(menu, 'action', self._system_action, s)
srow = make_action_menu_row([
Text(s.label),
Text(s.model.display_name),
Text(s.brand.display_name),
Text("(current)" if s.current else ""),
menu,
], menu)
trows.append(srow)
systems_table = TablePile(trows, spacing=2)
systems_table.bind(heading_table)
rows = [
Pile([heading_table, systems_table]),
]
buttons = [
reset_btn("ABORT", on_press=self.abort),
]
super().__init__(screen(
rows=rows,
buttons=button_pile(buttons),
focus_buttons=False,
excerpt=self.excerpt))
def _system_action(self, sender, action, system):
self.controller.select(system, action)
def abort(self, result):
self.controller.cancel()
class ChooserConfirmView(BaseView):
title = "Ubuntu Core"
excerpt = ("Summary of the selected action.")
def __init__(self, controller, selection):
self.controller = controller
buttons = [
danger_btn("CONFIRM", on_press=self.confirm),
reset_btn("ABORT", on_press=self.abort),
]
using_summary = "System seed of device {} version {} from {}".format(
selection.system.model.display_name,
selection.system.label,
selection.system.brand.display_name
)
summary = [
TableRow([Text("Action:"), Color.info_error(Text(
selection.action.title))]),
TableRow([Text("Using:"), Text(using_summary)]),
]
rows = [
Pile([Text("")]),
Pile([TablePile(summary)])
]
super().__init__(screen(
rows=rows,
buttons=button_pile(buttons),
focus_buttons=False,
excerpt=self.excerpt))
def abort(self, result):
self.controller.cancel()
def confirm(self, result):
self.controller.confirm()

View File

@ -29,16 +29,28 @@ log = logging.getLogger("console_conf.views.welcome")
class WelcomeView(BaseView):
title = "Ubuntu Core"
excerpt = ("Configure the network and setup an administrator "
"account on this all-snap Ubuntu Core system.")
excerpt = (
"Configure the network and setup an administrator "
"account on this all-snap Ubuntu Core system."
)
def __init__(self, controller):
self.controller = controller
super().__init__(screen(
super().__init__(
screen(
rows=[],
buttons=button_pile([done_btn("OK", on_press=self.confirm)]),
focus_buttons=True,
excerpt=self.excerpt))
excerpt=self.excerpt,
)
)
def confirm(self, result):
self.controller.done()
class ChooserWelcomeView(WelcomeView):
excerpt = (
"System recovery triggered. Proceed to select one of available "
"systems and execute a recovery action."
)

View File

@ -156,9 +156,9 @@ class Subiquity(Application):
cmdline = [sys.executable] + sys.argv
os.execvp(cmdline[0], cmdline)
def make_screen(self):
def make_screen(self, input=None, output=None):
if self.interactive():
return super().make_screen()
return super().make_screen(input, output)
else:
r, w = os.pipe()
s = urwid.raw_display.Screen(

View File

@ -60,9 +60,9 @@ KDSKBMODE = 0x4B45 # sets current keyboard mode
class TwentyFourBitScreen(urwid.raw_display.Screen):
def __init__(self, _urwid_name_to_rgb):
def __init__(self, _urwid_name_to_rgb, **kwargs):
self._urwid_name_to_rgb = _urwid_name_to_rgb
super().__init__()
super().__init__(**kwargs)
def _cc(self, color):
"""Return the "SGR" parameter for selecting color.
@ -610,7 +610,7 @@ class Application:
if os.isatty(fd):
tty.setraw(fd)
def make_screen(self):
def make_screen(self, inputf=None, outputf=None):
"""Return a screen to be passed to MainLoop.
colors is a list of exactly 8 tuples (name, (r, g, b)), the same as
@ -619,6 +619,11 @@ class Application:
# On the linux console, we overwrite the first 8 colors to be those
# defined by colors. Otherwise, we return a screen that uses ISO
# 8613-3ish codes to display the colors.
if inputf is None:
inputf = sys.stdin
if outputf is None:
outputf = sys.stdout
if len(self.COLORS) != 8:
raise Exception(
"make_screen must be passed a list of exactly 8 colors")
@ -631,18 +636,19 @@ class Application:
for j in range(3):
curpal[i*3+j] = self.COLORS[i][1][j]
fcntl.ioctl(sys.stdout.fileno(), PIO_CMAP, curpal)
return urwid.raw_display.Screen()
return urwid.raw_display.Screen(input=inputf, output=outputf)
elif self.opts.ascii:
return urwid.raw_display.Screen()
return urwid.raw_display.Screen(input=inputf, output=outputf)
else:
_urwid_name_to_rgb = {}
for i, n in enumerate(urwid_8_names):
_urwid_name_to_rgb[n] = self.COLORS[i][1]
return TwentyFourBitScreen(_urwid_name_to_rgb)
return TwentyFourBitScreen(_urwid_name_to_rgb,
input=inputf, output=outputf)
def run(self):
def run(self, input=None, output=None):
log.debug("Application.run")
screen = self.make_screen()
screen = self.make_screen(input, output)
self.urwid_loop = urwid.MainLoop(
self.ui, palette=self.color_palette, screen=screen,