384 lines
13 KiB
Python
384 lines
13 KiB
Python
# Copyright 2020 Canonical, Ltd.
|
|
#
|
|
# This program is free software: you can redistribute it and/or modify
|
|
# it under the terms of the GNU Affero General Public License as
|
|
# published by the Free Software Foundation, either version 3 of the
|
|
# License, or (at your option) any later version.
|
|
#
|
|
# This program is distributed in the hope that it will be useful,
|
|
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
# GNU Affero General Public License for more details.
|
|
#
|
|
# You should have received a copy of the GNU Affero General Public License
|
|
# along with this program. If not, see <http://www.gnu.org/licenses/>.
|
|
|
|
import asyncio
|
|
import inspect
|
|
import logging
|
|
import os
|
|
import signal
|
|
from typing import Callable, Optional, Union
|
|
|
|
import urwid
|
|
|
|
import yaml
|
|
|
|
from subiquitycore.async_helpers import schedule_task
|
|
from subiquitycore.core import Application
|
|
from subiquitycore.palette import (
|
|
PALETTE_COLOR,
|
|
PALETTE_MONO,
|
|
)
|
|
from subiquitycore.screen import make_screen
|
|
from subiquitycore.tuicontroller import Skip
|
|
from subiquitycore.ui.utils import LoadingDialog
|
|
from subiquitycore.ui.frame import SubiquityCoreUI
|
|
from subiquitycore.utils import astart_command
|
|
from subiquitycore.view import BaseView
|
|
|
|
log = logging.getLogger('subiquitycore.tui')
|
|
|
|
|
|
def extend_dec_special_charmap():
|
|
urwid.escape.DEC_SPECIAL_CHARMAP.update({
|
|
ord('\N{BLACK RIGHT-POINTING SMALL TRIANGLE}'): '>',
|
|
ord('\N{BLACK LEFT-POINTING SMALL TRIANGLE}'): '<',
|
|
ord('\N{BLACK DOWN-POINTING SMALL TRIANGLE}'): 'v',
|
|
ord('\N{BLACK UP-POINTING SMALL TRIANGLE}'): '^',
|
|
ord('\N{check mark}'): '+',
|
|
ord('\N{bullet}'): '*',
|
|
ord('\N{lower half block}'): '=',
|
|
ord('\N{upper half block}'): '=',
|
|
ord('\N{FULL BLOCK}'): urwid.escape.DEC_SPECIAL_CHARMAP[
|
|
ord('\N{BOX DRAWINGS LIGHT VERTICAL}')],
|
|
})
|
|
|
|
|
|
# When waiting for something of unknown duration, block the UI for at
|
|
# most this long before showing an indication of progress.
|
|
MAX_BLOCK_TIME = 0.1
|
|
# If an indication of progress is shown, show it for at least this
|
|
# long to avoid excessive flicker in the UI.
|
|
MIN_SHOW_PROGRESS_TIME = 1.0
|
|
|
|
|
|
class TuiApplication(Application):
|
|
|
|
make_ui = SubiquityCoreUI
|
|
|
|
def __init__(self, opts):
|
|
super().__init__(opts)
|
|
self.ui = self.make_ui()
|
|
|
|
self.answers = {}
|
|
if opts.answers is not None:
|
|
self.answers = yaml.safe_load(opts.answers.read())
|
|
log.debug("Loaded answers %s", self.answers)
|
|
if not opts.dry_run:
|
|
open('/run/casper-no-prompt', 'w').close()
|
|
|
|
# Set rich_mode to the opposite of what we want, so we can
|
|
# call toggle_rich to get the right things set up.
|
|
self.rich_mode = opts.run_on_serial
|
|
self.urwid_loop = None
|
|
self.cur_screen = None
|
|
self.fg_proc = None
|
|
|
|
def run_command_in_foreground(self, cmd, before_hook=None, after_hook=None,
|
|
**kw):
|
|
if self.fg_proc is not None:
|
|
raise Exception("cannot run two fg processes at once")
|
|
screen = self.urwid_loop.screen
|
|
|
|
async def _run():
|
|
self.fg_proc = proc = await astart_command(
|
|
cmd, stdin=None, stdout=None, stderr=None, **kw)
|
|
await proc.communicate()
|
|
self.fg_proc = None
|
|
# One of the main use cases for this function is to run interactive
|
|
# bash in a subshell. Interactive bash of course creates a process
|
|
# group for itself and sets it as the foreground process group for
|
|
# the controlling terminal. Usually on exit, our process group
|
|
# becomes the foreground process group again but when the subshell
|
|
# is killed for some reason it does not. This causes the tcsetattr
|
|
# that screen.start() does to either cause SIGTTOU to be sent, or
|
|
# if that is ignored (either because there is no shell around to do
|
|
# job control things or we are ignoring it) fail with EIO. So we
|
|
# force our process group back into the foreground with this
|
|
# call. That's not quite enough though because tcsetpgrp *also*
|
|
# causes SIGTTOU to be sent to a background process that calls it,
|
|
# but fortunately if we ignore that (done in start_urwid below),
|
|
# the call still succeeds.
|
|
#
|
|
# I would now like a drink.
|
|
os.tcsetpgrp(0, os.getpgrp())
|
|
screen.start()
|
|
if after_hook is not None:
|
|
after_hook()
|
|
|
|
screen.stop()
|
|
urwid.emit_signal(
|
|
screen, urwid.display_common.INPUT_DESCRIPTORS_CHANGED)
|
|
if before_hook is not None:
|
|
before_hook()
|
|
schedule_task(_run())
|
|
|
|
async def make_view_for_controller(self, new) \
|
|
-> Union[BaseView, Callable[[], BaseView]]:
|
|
new.context.enter("starting UI")
|
|
if self.opts.screens and new.name not in self.opts.screens:
|
|
raise Skip
|
|
try:
|
|
maybe_view = new.make_ui()
|
|
if inspect.iscoroutine(maybe_view):
|
|
view = await maybe_view
|
|
else:
|
|
view = maybe_view
|
|
except Skip:
|
|
new.context.exit("(skipped)")
|
|
raise
|
|
else:
|
|
self.cur_screen = new
|
|
return view
|
|
|
|
async def _wait_with_indication(self, awaitable, show, hide=None):
|
|
"""Wait for something but tell the user if it takes a while.
|
|
|
|
When waiting for something that can take an unknown length of
|
|
time, we want to tell the user if it takes more than a moment
|
|
(defined as MAX_BLOCK_TIME) but make sure that we display any
|
|
indication for long enough that the UI is not flickering
|
|
incomprehensibly (MIN_SHOW_PROGRESS_TIME).
|
|
"""
|
|
min_show_task = None
|
|
|
|
def _show():
|
|
self.ui.block_input = False
|
|
nonlocal min_show_task
|
|
min_show_task = self.aio_loop.create_task(
|
|
asyncio.sleep(MIN_SHOW_PROGRESS_TIME))
|
|
show()
|
|
|
|
self.ui.block_input = True
|
|
show_handle = self.aio_loop.call_later(MAX_BLOCK_TIME, _show)
|
|
try:
|
|
result = await awaitable
|
|
finally:
|
|
if min_show_task:
|
|
await min_show_task
|
|
if hide is not None:
|
|
hide()
|
|
else:
|
|
self.ui.block_input = False
|
|
show_handle.cancel()
|
|
|
|
return result
|
|
|
|
def show_progress(self):
|
|
raise NotImplementedError
|
|
|
|
async def wait_with_text_dialog(self, awaitable, message,
|
|
*, can_cancel=False):
|
|
ld = None
|
|
|
|
task_to_cancel = None
|
|
if can_cancel:
|
|
if not isinstance(awaitable, asyncio.Task):
|
|
orig = awaitable
|
|
|
|
async def w():
|
|
return await orig
|
|
awaitable = task_to_cancel = self.aio_loop.create_task(w())
|
|
else:
|
|
task_to_cancel = None
|
|
|
|
def show_load():
|
|
nonlocal ld
|
|
ld = LoadingDialog(
|
|
self.ui.body, self.aio_loop, message, task_to_cancel)
|
|
self.ui.body.show_overlay(ld, width=ld.width)
|
|
|
|
def hide_load():
|
|
ld.close()
|
|
|
|
return await self._wait_with_indication(
|
|
awaitable, show_load, hide_load)
|
|
|
|
async def wait_with_progress(self, awaitable):
|
|
return await self._wait_with_indication(awaitable, self.show_progress)
|
|
|
|
async def _move_screen(self, increment, coro) \
|
|
-> Optional[Union[BaseView, Callable[[], BaseView]]]:
|
|
if coro is not None:
|
|
await coro
|
|
old, self.cur_screen = self.cur_screen, None
|
|
if old is not None:
|
|
old.context.exit("completed")
|
|
old.end_ui()
|
|
cur_index = self.controllers.index
|
|
while True:
|
|
self.controllers.index += increment
|
|
if self.controllers.index < 0:
|
|
self.controllers.index = cur_index
|
|
return None
|
|
if self.controllers.index >= len(self.controllers.instances):
|
|
self.exit()
|
|
return None
|
|
new = self.controllers.cur
|
|
try:
|
|
return await self.make_view_for_controller(new)
|
|
except Skip:
|
|
log.debug("skipping screen %s", new.name)
|
|
continue
|
|
except Exception:
|
|
self.controllers.index = cur_index
|
|
raise
|
|
|
|
async def move_screen(self, increment, coro):
|
|
view_or_callable = await self.wait_with_progress(
|
|
self._move_screen(increment, coro))
|
|
if view_or_callable is not None:
|
|
if callable(view_or_callable):
|
|
view = view_or_callable()
|
|
else:
|
|
view = view_or_callable
|
|
self.ui.set_body(view)
|
|
|
|
def next_screen(self, coro=None):
|
|
self.aio_loop.create_task(self.move_screen(1, coro))
|
|
|
|
def prev_screen(self):
|
|
self.aio_loop.create_task(self.move_screen(-1, None))
|
|
|
|
def select_initial_screen(self):
|
|
self.next_screen()
|
|
|
|
def run_scripts(self, scripts):
|
|
# run_scripts runs (or rather arranges to run, it's all async)
|
|
# a series of python snippets in a helpful namespace. This is
|
|
# all in aid of being able to test some part of the UI without
|
|
# having to click the same buttons over and over again to get
|
|
# the UI to the part you are working on.
|
|
#
|
|
# In the namespace are:
|
|
# * everything from view_helpers
|
|
# * wait, delay execution of subsequent scripts for a while
|
|
# * c, a function that finds a button and clicks it. uses
|
|
# wait, above to wait for the button to appear in case it
|
|
# takes a while.
|
|
from subiquitycore.testing import view_helpers
|
|
|
|
class ScriptState:
|
|
def __init__(self):
|
|
self.ns = view_helpers.__dict__.copy()
|
|
self.waiting = False
|
|
self.wait_count = 0
|
|
self.scripts = scripts
|
|
|
|
ss = ScriptState()
|
|
|
|
def _run_script():
|
|
log.debug("running %s", ss.scripts[0])
|
|
exec(ss.scripts[0], ss.ns)
|
|
if ss.waiting:
|
|
return
|
|
ss.scripts = ss.scripts[1:]
|
|
if ss.scripts:
|
|
self.aio_loop.call_soon(_run_script)
|
|
|
|
def c(pat):
|
|
but = view_helpers.find_button_matching(self.ui, '.*' + pat + '.*')
|
|
if not but:
|
|
ss.wait_count += 1
|
|
if ss.wait_count > 10:
|
|
raise Exception("no button found matching %r after"
|
|
"waiting for 10 secs" % pat)
|
|
wait(1, func=lambda: c(pat))
|
|
return
|
|
ss.wait_count = 0
|
|
view_helpers.click(but)
|
|
|
|
def wait(delay, func=None):
|
|
ss.waiting = True
|
|
|
|
def next():
|
|
ss.waiting = False
|
|
if func is not None:
|
|
func()
|
|
if not ss.waiting:
|
|
ss.scripts = ss.scripts[1:]
|
|
if ss.scripts:
|
|
_run_script()
|
|
self.aio_loop.call_later(delay, next)
|
|
|
|
ss.ns['c'] = c
|
|
ss.ns['wait'] = wait
|
|
ss.ns['ui'] = self.ui
|
|
|
|
self.aio_loop.call_later(0.06, _run_script)
|
|
|
|
def set_rich(self, rich):
|
|
if rich == self.rich_mode:
|
|
return
|
|
self.toggle_rich()
|
|
|
|
def toggle_rich(self):
|
|
if self.rich_mode:
|
|
urwid.util.set_encoding('ascii')
|
|
new_palette = PALETTE_MONO
|
|
self.rich_mode = False
|
|
else:
|
|
urwid.util.set_encoding('utf-8')
|
|
new_palette = PALETTE_COLOR
|
|
self.rich_mode = True
|
|
urwid.CanvasCache.clear()
|
|
self.urwid_loop.screen.register_palette(new_palette)
|
|
self.urwid_loop.screen.clear()
|
|
|
|
def unhandled_input(self, key):
|
|
if self.opts.dry_run and key == 'ctrl x':
|
|
self.exit()
|
|
elif key == 'f3':
|
|
self.urwid_loop.screen.clear()
|
|
elif self.opts.run_on_serial and key in ['ctrl t', 'f4']:
|
|
self.toggle_rich()
|
|
|
|
def extra_urwid_loop_args(self):
|
|
return {}
|
|
|
|
def make_screen(self, inputf=None, outputf=None):
|
|
return make_screen(self.opts.ascii, inputf, outputf)
|
|
|
|
def start_urwid(self, input=None, output=None):
|
|
# This stops the tcsetpgrp call in run_command_in_foreground from
|
|
# suspending us. See the rant there for more details.
|
|
signal.signal(signal.SIGTTOU, signal.SIG_IGN)
|
|
screen = self.make_screen(input, output)
|
|
screen.register_palette(PALETTE_COLOR)
|
|
self.urwid_loop = urwid.MainLoop(
|
|
self.ui, screen=screen,
|
|
handle_mouse=False, pop_ups=True,
|
|
unhandled_input=self.unhandled_input,
|
|
event_loop=urwid.AsyncioEventLoop(loop=self.aio_loop),
|
|
**self.extra_urwid_loop_args()
|
|
)
|
|
extend_dec_special_charmap()
|
|
self.toggle_rich()
|
|
self.urwid_loop.start()
|
|
self.select_initial_screen()
|
|
|
|
async def start(self, start_urwid=True):
|
|
await super().start()
|
|
if start_urwid:
|
|
self.start_urwid()
|
|
|
|
def run(self):
|
|
if self.opts.scripts:
|
|
self.run_scripts(self.opts.scripts)
|
|
try:
|
|
super().run()
|
|
finally:
|
|
if self.urwid_loop is not None:
|
|
self.urwid_loop.stop()
|