# Copyright 2020 Canonical, Ltd.
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as
# published by the Free Software Foundation, either version 3 of the
# License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.

import asyncio
import inspect
import logging
import os
import signal
from typing import Callable, Optional, Union

import urwid

import yaml

from subiquitycore.async_helpers import schedule_task
from subiquitycore.core import Application
from subiquitycore.palette import (
    PALETTE_COLOR,
    PALETTE_MONO,
    )
from subiquitycore.screen import make_screen
from subiquitycore.tuicontroller import Skip
from subiquitycore.ui.utils import LoadingDialog
from subiquitycore.ui.frame import SubiquityCoreUI
from subiquitycore.utils import astart_command
from subiquitycore.view import BaseView


log = logging.getLogger('subiquitycore.tui')


def extend_dec_special_charmap():
    """Register extra entries in urwid's DEC special character map.

    The triangle, check mark, star, bullet and half-block glyphs are
    mapped to plain ASCII characters.  FULL BLOCK reuses whatever the
    map already holds for BOX DRAWINGS LIGHT VERTICAL.
    """
    charmap = urwid.escape.DEC_SPECIAL_CHARMAP
    # Looked up before the map is modified, exactly once.
    vertical_bar = charmap[ord('\N{BOX DRAWINGS LIGHT VERTICAL}')]
    replacements = {
        '\N{BLACK RIGHT-POINTING SMALL TRIANGLE}': '>',
        '\N{BLACK LEFT-POINTING SMALL TRIANGLE}': '<',
        '\N{BLACK DOWN-POINTING SMALL TRIANGLE}': 'v',
        '\N{BLACK UP-POINTING SMALL TRIANGLE}': '^',
        '\N{check mark}': '*',
        '\N{circled white star}': '*',
        '\N{bullet}': '*',
        '\N{lower half block}': '=',
        '\N{upper half block}': '=',
        '\N{FULL BLOCK}': vertical_bar,
    }
    charmap.update(
        {ord(glyph): repl for glyph, repl in replacements.items()})


# When waiting for something of unknown duration, block the UI for at
# most this long before showing an indication of progress.
MAX_BLOCK_TIME = 0.1  # seconds
# If an indication of progress is shown, show it for at least this
# long to avoid excessive flicker in the UI.
MIN_SHOW_PROGRESS_TIME = 1.0  # seconds


class TuiApplication(Application):
    """Application that presents its controllers through a urwid text UI.

    The urwid main loop is run on top of the application's asyncio
    loop (``self.aio_loop``).  Subclasses are expected to override
    `show_progress` and may override `make_ui`, `make_screen` and
    `extra_urwid_loop_args`.
    """

    # Factory for the top-level UI widget; called once from __init__.
    make_ui = SubiquityCoreUI

    def __init__(self, opts):
        """Set up the UI object and load canned answers, if any.

        :param opts: parsed options.  This method reads ``opts.answers``
            (an object with ``read()`` yielding YAML, or None),
            ``opts.dry_run`` and ``opts.run_on_serial``.
        """
        super().__init__(opts)
        self.ui = self.make_ui()

        self.answers = {}
        if opts.answers is not None:
            self.answers = yaml.safe_load(opts.answers.read())
            log.debug("Loaded answers %s", self.answers)
            if not opts.dry_run:
                # Create (or truncate) the marker file.
                open('/run/casper-no-prompt', 'w').close()

        # Set rich_mode to the opposite of what we want, so we can
        # call toggle_rich to get the right things set up.
        self.rich_mode = opts.run_on_serial
        self.urwid_loop = None
        self.cur_screen = None
        self.fg_proc = None

    def run_command_in_foreground(self, cmd, before_hook=None,
                                  after_hook=None, **kw):
        """Suspend the urwid screen and run `cmd` attached to the terminal.

        The screen is stopped synchronously, `before_hook` (if given) is
        called, and then a task is scheduled that starts the command,
        waits for it, restarts the screen and calls `after_hook` (if
        given).

        :param cmd: command passed through to astart_command.
        :param before_hook: optional callable run after the screen has
            been stopped and before the command is started.
        :param after_hook: optional callable run after the screen has
            been restarted.
        :param kw: extra keyword arguments for astart_command.
        :raises Exception: if a foreground process is already running.
        """
        if self.fg_proc is not None:
            raise Exception("cannot run two fg processes at once")
        screen = self.urwid_loop.screen

        async def _run():
            self.fg_proc = proc = await astart_command(
                cmd, stdin=None, stdout=None, stderr=None, **kw)
            await proc.communicate()
            self.fg_proc = None
            # One of the main use cases for this function is to run interactive
            # bash in a subshell. Interactive bash of course creates a process
            # group for itself and sets it as the foreground process group for
            # the controlling terminal. Usually on exit, our process group
            # becomes the foreground process group again but when the subshell
            # is killed for some reason it does not. This causes the tcsetattr
            # that screen.start() does to either cause SIGTTOU to be sent, or
            # if that is ignored (either because there is no shell around to do
            # job control things or we are ignoring it) fail with EIO. So we
            # force our process group back into the foreground with this
            # call. That's not quite enough though because tcsetpgrp *also*
            # causes SIGTTOU to be sent to a background process that calls it,
            # but fortunately if we ignore that (done in start_urwid below),
            # the call still succeeds.
            #
            # I would now like a drink.
            os.tcsetpgrp(0, os.getpgrp())
            screen.start()
            if after_hook is not None:
                after_hook()

        screen.stop()
        urwid.emit_signal(
            screen, urwid.display_common.INPUT_DESCRIPTORS_CHANGED)
        if before_hook is not None:
            before_hook()
        schedule_task(_run())

    async def make_view_for_controller(self, new) \
            -> Union[BaseView, Callable[[], BaseView]]:
        """Ask controller `new` for its view and make it the current screen.

        ``new.make_ui()`` may return the view directly or a coroutine
        that produces it; a coroutine is awaited.

        :param new: the controller to show.
        :returns: whatever ``new.make_ui()`` produced.
        :raises Skip: if ``opts.screens`` is non-empty and does not list
            ``new.name``, or if ``new.make_ui()`` raises Skip.
        """
        new.context.enter("starting UI")
        # NOTE(review): a Skip raised by this filter leaves the context
        # entered above without a matching exit -- confirm that is intended.
        if self.opts.screens and new.name not in self.opts.screens:
            raise Skip
        try:
            maybe_view = new.make_ui()
            if inspect.iscoroutine(maybe_view):
                view = await maybe_view
            else:
                view = maybe_view
        except Skip:
            new.context.exit("(skipped)")
            raise
        else:
            self.cur_screen = new
            return view

    async def _wait_with_indication(self, awaitable, show, hide=None):
        """Wait for something but tell the user if it takes a while.

        When waiting for something that can take an unknown length of
        time, we want to tell the user if it takes more than a moment
        (defined as MAX_BLOCK_TIME) but make sure that we display any
        indication for long enough that the UI is not flickering
        incomprehensibly (MIN_SHOW_PROGRESS_TIME).

        :param awaitable: the thing to wait for.
        :param show: callable invoked (with no arguments) once
            MAX_BLOCK_TIME has passed without `awaitable` finishing.
        :param hide: optional callable invoked after the wait, but only
            if `show` was invoked.
        :returns: the result of `awaitable`.
        """
        min_show_task = None

        def _show():
            nonlocal min_show_task
            self.ui.block_input = False
            min_show_task = self.aio_loop.create_task(
                asyncio.sleep(MIN_SHOW_PROGRESS_TIME))
            show()

        self.ui.block_input = True
        show_handle = self.aio_loop.call_later(MAX_BLOCK_TIME, _show)
        try:
            result = await awaitable
        finally:
            if min_show_task:
                # The indication was shown: keep it up for the minimum
                # time before taking it down again.
                await min_show_task
                if hide is not None:
                    hide()
            else:
                # Finished quickly: unblock input and make sure the
                # indication never appears.
                self.ui.block_input = False
                show_handle.cancel()

        return result

    def show_progress(self):
        """Show a progress indication; must be provided by subclasses."""
        raise NotImplementedError

    async def wait_with_text_dialog(self, awaitable, message, *,
                                    can_cancel=False):
        """Wait for `awaitable`, showing a LoadingDialog if it is slow.

        :param awaitable: the thing to wait for.
        :param message: text passed to the LoadingDialog.
        :param can_cancel: if true and `awaitable` is not already an
            asyncio.Task, it is wrapped in a task that is handed to the
            dialog.
        :returns: the result of `awaitable`.
        """
        ld = None

        task_to_cancel = None
        # NOTE(review): when `awaitable` is already a Task the dialog is
        # given no task (None) even if can_cancel is true -- confirm
        # this is deliberate.
        if can_cancel and not isinstance(awaitable, asyncio.Task):
            orig = awaitable

            async def w():
                return await orig

            awaitable = task_to_cancel = self.aio_loop.create_task(w())

        def show_load():
            nonlocal ld
            ld = LoadingDialog(
                self.ui.body, self.aio_loop, message, task_to_cancel)
            self.ui.body.show_overlay(ld, width=ld.width)

        def hide_load():
            ld.close()

        return await self._wait_with_indication(
            awaitable, show_load, hide_load)

    async def wait_with_progress(self, awaitable):
        """Wait for `awaitable`, calling show_progress() if it is slow."""
        return await self._wait_with_indication(awaitable, self.show_progress)

    async def _move_screen(self, increment, coro) \
            -> Optional[Union[BaseView, Callable[[], BaseView]]]:
        """Step through the controllers until one yields a view.

        :param increment: amount added to ``self.controllers.index`` at
            each step (callers in this class pass 1 or -1).
        :param coro: optional awaitable to finish before moving.
        :returns: the view (or view factory) of the first controller
            that does not raise Skip; None if the index would go below
            zero (the index is restored) or runs past the last
            controller (``self.exit()`` is called).
        """
        if coro is not None:
            await coro
        old, self.cur_screen = self.cur_screen, None
        if old is not None:
            old.context.exit("completed")
            old.end_ui()
        cur_index = self.controllers.index
        while True:
            self.controllers.index += increment
            if self.controllers.index < 0:
                self.controllers.index = cur_index
                return None
            if self.controllers.index >= len(self.controllers.instances):
                self.exit()
                return None
            new = self.controllers.cur
            try:
                return await self.make_view_for_controller(new)
            except Skip:
                log.debug("skipping screen %s", new.name)
                continue
            except Exception:
                # Put the index back where it was before propagating.
                self.controllers.index = cur_index
                raise

    async def move_screen(self, increment, coro):
        """Move by `increment` screens and display the resulting view.

        A callable result from _move_screen is called to obtain the
        view; nothing is displayed if the result is None.
        """
        view_or_callable = await self.wait_with_progress(
            self._move_screen(increment, coro))
        if view_or_callable is not None:
            if callable(view_or_callable):
                view = view_or_callable()
            else:
                view = view_or_callable
            self.ui.set_body(view)

    def next_screen(self, coro=None):
        """Schedule a move to the next screen, after `coro` if given."""
        self.aio_loop.create_task(self.move_screen(1, coro))

    def prev_screen(self):
        """Schedule a move to the previous screen."""
        self.aio_loop.create_task(self.move_screen(-1, None))

    def select_initial_screen(self):
        """Show the first screen by advancing from the initial index."""
        self.next_screen()

    def run_scripts(self, scripts):
        """Arrange for the given Python snippets to be run in order.

        :param scripts: sequence of source strings.  Each is run with
            exec(), so they must come from a trusted source.
        """
        # run_scripts runs (or rather arranges to run, it's all async)
        # a series of python snippets in a helpful namespace. This is
        # all in aid of being able to test some part of the UI without
        # having to click the same buttons over and over again to get
        # the UI to the part you are working on.
        #
        # In the namespace are:
        #  * everything from view_helpers
        #  * wait, delay execution of subsequent scripts for a while
        #  * c, a function that finds a button and clicks it. uses
        #    wait, above to wait for the button to appear in case it
        #    takes a while.
        from subiquitycore.testing import view_helpers

        class ScriptState:
            def __init__(self):
                self.ns = view_helpers.__dict__.copy()
                self.waiting = False
                self.wait_count = 0
                self.scripts = scripts

        ss = ScriptState()

        def _run_script():
            log.debug("running %s", ss.scripts[0])
            exec(ss.scripts[0], ss.ns)
            if ss.waiting:
                # The script called wait(); the callback it scheduled
                # takes care of advancing to the next script.
                return
            ss.scripts = ss.scripts[1:]
            if ss.scripts:
                self.aio_loop.call_soon(_run_script)

        def c(pat):
            but = view_helpers.find_button_matching(self.ui, '.*' + pat + '.*')
            if not but:
                ss.wait_count += 1
                if ss.wait_count > 10:
                    raise Exception("no button found matching %r after "
                                    "waiting for 10 secs" % pat)
                # Try again in a second.
                wait(1, func=lambda: c(pat))
                return
            ss.wait_count = 0
            view_helpers.click(but)

        def wait(delay, func=None):
            ss.waiting = True

            def _resume():
                ss.waiting = False
                if func is not None:
                    func()
                # func may itself have called wait() again.
                if not ss.waiting:
                    ss.scripts = ss.scripts[1:]
                    if ss.scripts:
                        _run_script()

            self.aio_loop.call_later(delay, _resume)

        ss.ns['c'] = c
        ss.ns['wait'] = wait
        ss.ns['ui'] = self.ui

        self.aio_loop.call_later(0.06, _run_script)

    def set_rich(self, rich):
        """Switch to rich mode if `rich` is true, plain mode otherwise."""
        if rich == self.rich_mode:
            return
        self.toggle_rich()

    def toggle_rich(self):
        """Flip between rich and plain rendering.

        Rich mode uses UTF-8 and PALETTE_COLOR; plain mode uses ASCII
        and PALETTE_MONO.  The canvas cache and the screen are cleared
        so the change takes effect.
        """
        if self.rich_mode:
            urwid.util.set_encoding('ascii')
            new_palette = PALETTE_MONO
            self.rich_mode = False
        else:
            urwid.util.set_encoding('utf-8')
            new_palette = PALETTE_COLOR
            self.rich_mode = True
        urwid.CanvasCache.clear()
        self.urwid_loop.screen.register_palette(new_palette)
        self.urwid_loop.screen.clear()

    def unhandled_input(self, key):
        """Handle keys that no widget consumed.

        * ``ctrl x`` exits, in dry-run mode only.
        * ``f3`` clears the screen.
        * ``ctrl t`` / ``f4`` toggle rich mode when running on serial.
        """
        if self.opts.dry_run and key == 'ctrl x':
            self.exit()
        elif key == 'f3':
            self.urwid_loop.screen.clear()
        elif self.opts.run_on_serial and key in ('ctrl t', 'f4'):
            self.toggle_rich()

    def extra_urwid_loop_args(self):
        """Return extra keyword arguments for urwid.MainLoop (none here)."""
        return {}

    def make_screen(self, inputf=None, outputf=None):
        """Create the urwid screen via subiquitycore.screen.make_screen."""
        return make_screen(self.opts.ascii, inputf, outputf)

    def start_urwid(self, input=None, output=None):
        """Create and start the urwid main loop, then show the first screen.

        :param input: passed to make_screen as its input file argument.
        :param output: passed to make_screen as its output file argument.
        """
        # This stops the tcsetpgrp call in run_command_in_foreground from
        # suspending us. See the rant there for more details.
        signal.signal(signal.SIGTTOU, signal.SIG_IGN)
        screen = self.make_screen(input, output)
        screen.register_palette(PALETTE_COLOR)
        self.urwid_loop = urwid.MainLoop(
            self.ui, screen=screen,
            handle_mouse=False, pop_ups=True,
            unhandled_input=self.unhandled_input,
            event_loop=urwid.AsyncioEventLoop(loop=self.aio_loop),
            **self.extra_urwid_loop_args()
            )
        extend_dec_special_charmap()
        # rich_mode was initialised to the opposite of the wanted value.
        self.toggle_rich()
        self.urwid_loop.start()
        self.select_initial_screen()

    async def start(self, start_urwid=True):
        """Start the base application and, unless disabled, urwid."""
        await super().start()
        if start_urwid:
            self.start_urwid()

    def run(self):
        """Run the application, stopping the urwid loop on the way out."""
        if self.opts.scripts:
            self.run_scripts(self.opts.scripts)
        try:
            super().run()
        finally:
            if self.urwid_loop is not None:
                self.urwid_loop.stop()