Merge pull request #829 from mwhudson/simpler-snaplist-types

make interface between snaplist view and controller client-server friendly
This commit is contained in:
Michael Hudson-Doyle 2020-09-21 12:19:13 +12:00 committed by GitHub
commit 8f8a87ad1c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 158 additions and 117 deletions

View File

@ -17,6 +17,8 @@
# split into client and server processes. View code should only use these
# types!
import datetime
import enum
from typing import List
import attr
@ -35,3 +37,46 @@ class SSHData:
install_server: bool
allow_pw: bool
authorized_keys: List[str] = attr.Factory(list)
class SnapCheckState(enum.Enum):
FAILED = enum.auto()
LOADING = enum.auto()
DONE = enum.auto()
@attr.s(auto_attribs=True)
class ChannelSnapInfo:
channel_name: str
revision: str
confinement: str
version: str
size: int
released_at: datetime.datetime = attr.ib(
metadata={'time_fmt': '%Y-%m-%dT%H:%M:%S.%fZ'})
@attr.s(auto_attribs=True, cmp=False)
class SnapInfo:
name: str
summary: str = ''
publisher: str = ''
verified: bool = False
description: str = ''
confinement: str = ''
license: str = ''
channels: List[ChannelSnapInfo] = attr.Factory(list)
@attr.s(auto_attribs=True)
class SnapSelection:
name: str
channel: str
is_classic: bool = False
@attr.s(auto_attribs=True)
class SnapListResponse:
status: SnapCheckState
snaps: List[SnapInfo] = attr.Factory(list)
selections: List[SnapSelection] = attr.Factory(list)

View File

@ -14,6 +14,7 @@
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import logging
from typing import List
import requests.exceptions
@ -29,7 +30,11 @@ from subiquity.controller import (
SubiquityTuiController,
)
from subiquity.models.snaplist import SnapSelection
from subiquity.common.types import (
SnapListResponse,
SnapCheckState,
SnapSelection,
)
from subiquity.ui.views.snaplist import SnapListView
log = logging.getLogger('subiquity.controllers.snaplist')
@ -155,32 +160,41 @@ class SnapListController(SubiquityTuiController):
self.loader = self._make_loader()
self.loader.start()
def make_ui(self):
if self.loader.failed or not self.app.base_model.network.has_network:
async def make_ui(self):
data = await self.get_snap_list(wait=False)
if data.status == SnapCheckState.FAILED:
# If loading snaps failed or the network is disabled, skip the
# screen.
self.configured()
raise Skip()
return SnapListView(self.model, self)
return SnapListView(self, data)
def run_answers(self):
if 'snaps' in self.answers:
to_install = {}
selections = []
for snap_name, selection in self.answers['snaps'].items():
to_install[snap_name] = SnapSelection(**selection)
self.done(to_install)
selections.append(SnapSelection(name=snap_name, **selection))
self.done(selections)
def get_snap_list_task(self):
return self.loader.get_snap_list_task()
async def get_snap_list(self, *, wait: bool) -> SnapListResponse:
if self.loader.failed or not self.app.base_model.network.has_network:
return SnapListResponse(status=SnapCheckState.FAILED)
if not self.loader.snap_list_fetched and not wait:
return SnapListResponse(status=SnapCheckState.LOADING)
await self.loader.get_snap_list_task()
return SnapListResponse(
status=SnapCheckState.DONE,
snaps=self.model.get_snap_list(),
selections=self.model.selections)
def get_snap_info_task(self, snap):
return self.loader.get_snap_info_task(snap)
def done(self, snaps_to_install):
def done(self, selections: List[SnapSelection]):
log.debug(
"SnapListController.done next_screen snaps_to_install=%s",
snaps_to_install)
self.model.set_installed_list(snaps_to_install)
selections)
self.model.set_installed_list(selections)
self.configured()
self.app.next_screen()
@ -188,4 +202,4 @@ class SnapListController(SubiquityTuiController):
self.app.prev_screen()
def make_autoinstall(self):
return self.model.to_install
return self.model.selections

View File

@ -16,50 +16,14 @@
import datetime
import logging
import attr
from subiquity.common.types import (
ChannelSnapInfo,
SnapInfo,
)
log = logging.getLogger("subiquity.models.snaplist")
@attr.s(cmp=False)
class SnapInfo:
name = attr.ib()
summary = attr.ib(default='')
publisher = attr.ib(default='')
verified = attr.ib(default=False)
description = attr.ib(default='')
confinement = attr.ib(default='')
license = attr.ib(default='')
channels = attr.ib(default=attr.Factory(list))
partial = attr.ib(default=True)
def update(self, data):
self.summary = data['summary']
self.publisher = data['developer']
self.verified = data['publisher']['validation'] == "verified"
self.description = data['description']
self.confinement = data['confinement']
self.license = data['license']
self.partial = False
@attr.s(cmp=False)
class ChannelSnapInfo:
channel_name = attr.ib()
revision = attr.ib()
confinement = attr.ib()
version = attr.ib()
size = attr.ib()
released_at = attr.ib()
@attr.s(cmp=False)
class SnapSelection:
channel = attr.ib()
is_classic = attr.ib()
risks = ["stable", "candidate", "beta", "edge"]
@ -69,7 +33,8 @@ class SnapListModel:
def __init__(self):
self._snap_info = []
self._snaps_by_name = {}
self.to_install = {} # snap_name -> SnapSelection
self.selections = [] # [SnapSelection]
self.complete_snaps = set()
def _snap_for_name(self, name):
s = self._snaps_by_name.get(name)
@ -80,18 +45,27 @@ class SnapListModel:
def load_find_data(self, data):
for info in data['result']:
self._snap_for_name(info['name']).update(info)
self.update(self._snap_for_name(info['name']), info)
def add_partial_snap(self, name):
self._snaps_for_name(name)
def update(self, snap, data):
snap.summary = data['summary']
snap.publisher = data['developer']
snap.verified = data['publisher']['validation'] == "verified"
snap.description = data['description']
snap.confinement = data['confinement']
snap.license = data['license']
self.complete_snaps.add(snap)
def load_info_data(self, data):
info = data['result'][0]
snap = self._snaps_by_name.get(info['name'])
if snap is None:
return
if snap.partial:
snap.update(info)
if snap not in self.complete_snaps:
self.update_snap(snap, info)
channel_map = info['channels']
for track in info['tracks']:
for risk in risks:
@ -115,7 +89,7 @@ class SnapListModel:
def get_snap_list(self):
return self._snap_info[:]
def set_installed_list(self, to_install):
for name in to_install.keys():
self._snap_for_name(name)
self.to_install = to_install
def set_installed_list(self, selections):
for selection in selections:
self._snap_for_name(selection.name)
self.selections = selections

View File

@ -194,14 +194,13 @@ class SubiquityModel:
config['ssh_authorized_keys'] = self.ssh.authorized_keys
if self.ssh.install_server:
config['ssh_pwauth'] = self.ssh.pwauth
if self.snaplist.to_install:
if self.snaplist.selections:
cmds = []
for snap_name, selection in sorted(
self.snaplist.to_install.items()):
for selection in self.snaplist.selections:
cmd = ['snap', 'install', '--channel=' + selection.channel]
if selection.is_classic:
cmd.append('--classic')
cmd.append(snap_name)
cmd.append(selection.name)
cmds.append(' '.join(cmd))
config['snap'] = {
'commands': cmds,

View File

@ -54,8 +54,11 @@ from subiquitycore.ui.utils import (
)
from subiquitycore.view import BaseView
from subiquity.common.types import (
SnapCheckState,
SnapSelection,
)
from subiquity.models.filesystem import humanize_size
from subiquity.models.snaplist import SnapSelection
log = logging.getLogger("subiquity.views.snaplist")
@ -122,14 +125,16 @@ class SnapInfoView(WidgetWrap):
channel_rows = []
for csi in snap.channels:
latest_update = max(latest_update, csi.released_at)
selection = SnapSelection(
name=self.snap.name,
channel=csi.channel_name,
is_classic=csi.confinement == "classic")
btn = StarRadioButton(
radio_group,
csi.channel_name,
state=csi.channel_name == cur_channel,
on_state_change=self.state_change,
user_data=SnapSelection(
channel=csi.channel_name,
is_classic=csi.confinement == "classic"))
user_data=selection)
channel_rows.append(Color.menu_button(TableRow([
btn,
Text(csi.version),
@ -203,7 +208,7 @@ class SnapInfoView(WidgetWrap):
log.debug(
"selecting %s from %s", self.snap.name, selection.channel)
self.parent.snap_boxes[self.snap.name].set_state(True)
self.parent.to_install[self.snap.name] = selection
self.parent.selections_by_name[self.snap.name] = selection
def render(self, size, focus):
maxcol, maxrow = size
@ -288,22 +293,6 @@ class SnapCheckBox(CheckBox):
super().__init__(
snap.name, state=state, on_state_change=self.state_change)
def loaded(self):
if len(self.snap.channels) == 0: # or other indication of failure
ff = FetchingFailed(self, self.snap)
self.parent.show_overlay(ff, width=ff.width)
else:
cur_chan = None
if self.snap.name in self.parent.to_install:
cur_chan = self.parent.to_install[self.snap.name].channel
siv = SnapInfoView(self.parent, self.snap, cur_chan)
self.parent.show_screen(screen(
siv,
[other_btn(
label=_("Close"),
on_press=self.parent.show_main_screen)],
focus_buttons=False))
async def load_info(self):
app = self.parent.controller.app
await app.wait_with_text_dialog(
@ -311,7 +300,21 @@ class SnapCheckBox(CheckBox):
self.parent.controller.get_snap_info_task(self.snap)),
_("Fetching info for {snap}").format(snap=self.snap.name),
can_cancel=True)
self.loaded()
if len(self.snap.channels) == 0: # or other indication of failure
ff = FetchingFailed(self, self.snap)
self.parent.show_overlay(ff, width=ff.width)
else:
cur_chan = None
selection = self.parent.selections_by_name.get(self.snap.name)
if selection is not None:
cur_chan = selection.channel
siv = SnapInfoView(self.parent, self.snap, cur_chan)
self.parent.show_screen(screen(
siv,
[other_btn(
label=_("Close"),
on_press=self.parent.show_main_screen)],
focus_buttons=False))
def keypress(self, size, key):
if key.startswith("enter"):
@ -322,54 +325,55 @@ class SnapCheckBox(CheckBox):
def state_change(self, sender, new_state):
if new_state:
log.debug("selecting %s", self.snap.name)
self.parent.to_install[self.snap.name] = SnapSelection(
self.parent.selections_by_name[self.snap.name] = SnapSelection(
name=self.snap.name,
channel='stable',
is_classic=self.snap.confinement == "classic")
else:
log.debug("unselecting %s", self.snap.name)
self.parent.to_install.pop(self.snap.name, None)
self.parent.selections_by_name.pop(self.snap.name, None)
class SnapListView(BaseView):
title = _("Featured Server Snaps")
def __init__(self, model, controller):
self.model = model
def __init__(self, controller, data):
self.controller = controller
self.to_install = model.to_install.copy()
self.load()
def loaded(self):
snap_list = self.model.get_snap_list()
if len(snap_list) == 0:
self.offer_retry()
if data.status == SnapCheckState.LOADING:
self.wait_load()
else:
self.make_main_screen(snap_list)
self.show_main_screen()
self.loaded(data)
async def _wait(self, t, spinner):
spinner.stop()
await t
self.loaded()
def load(self, sender=None):
t = self.controller.get_snap_list_task()
if t.done():
self.loaded()
return
def wait_load(self):
spinner = Spinner(self.controller.app.aio_loop, style='dots')
spinner.start()
self._w = screen(
[spinner], [ok_btn(label=_("Continue"), on_press=self.done)],
excerpt=_("Loading server snaps from store, please wait..."))
schedule_task(self._wait(t, spinner))
schedule_task(self._wait_load(spinner))
async def _wait_load(self, spinner):
# If we show the loading screen at all, we want to show it for
# at least a second to avoid flickering at the user.
min_wait = self.controller.app.aio_loop.create_task(asyncio.sleep(1))
data = await self.controller.get_snap_list(wait=True)
await min_wait
spinner.stop()
if data.status == SnapCheckState.FAILED:
self.offer_retry()
else:
self.loaded(data)
def loaded(self, data):
self.make_main_screen(data)
self.show_main_screen()
def offer_retry(self):
self._w = screen(
[Text(_("Sorry, loading snaps from the store failed."))],
[
other_btn(label=_("Try again"), on_press=self.load),
other_btn(label=_("Try again"), on_press=self.wait_load),
ok_btn(label=_("Continue"), on_press=self.done),
])
@ -414,16 +418,19 @@ class SnapListView(BaseView):
log.debug("pre-seeded snaps %s", names)
return names
def make_main_screen(self, snap_list):
def make_main_screen(self, data):
self.selections_by_name = {
selection.name: selection for selection in data.selections
}
self.snap_boxes = {}
body = []
preinstalled = self.get_preinstalled_snaps()
for snap in snap_list:
for snap in data.snaps:
if snap.name in preinstalled:
log.debug("not offering preseeded snap %r", snap.name)
continue
box = self.snap_boxes[snap.name] = SnapCheckBox(
self, snap, snap.name in self.to_install)
self, snap, snap.name in self.selections_by_name)
publisher = snap.publisher
if snap.verified:
publisher = [publisher, ('verified', '\N{check mark}')]
@ -455,8 +462,10 @@ class SnapListView(BaseView):
"package, publisher and versions available."))
def done(self, sender=None):
log.debug("snaps to install %s", self.to_install)
self.controller.done(self.to_install)
log.debug("snaps to install %s", self.selections_by_name)
self.controller.done(sorted(
self.selections_by_name.values(),
key=lambda s: s.name))
def cancel(self, sender=None):
if self._w is self._main_screen: