Merge pull request #516 from mwhudson/core-cleanups

clean up how data is passed around at the centre of subiquity
This commit is contained in:
Michael Hudson-Doyle 2019-08-16 14:51:22 +12:00 committed by GitHub
commit 384f228b99
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
21 changed files with 167 additions and 192 deletions

View File

@ -194,9 +194,9 @@ def write_login_details_standalone():
class IdentityController(BaseController):
def __init__(self, common):
super().__init__(common)
self.model = self.base_model.identity
def __init__(self, app):
super().__init__(app)
self.model = app.base_model.identity
def default(self):
footer = ""
@ -240,7 +240,7 @@ class IdentityController(BaseController):
login_details_path = '/run/console-conf/login-details.txt'
self.model.add_user(result)
ips = []
net_model = self.controllers['Network'].model
net_model = self.app.controller_instances['Network'].model
for dev in net_model.get_all_netdevs():
ips.extend(dev.actual_global_ip_addresses)
with open(login_details_path, 'w') as fp:
@ -258,7 +258,7 @@ class IdentityController(BaseController):
self.ui.set_header(title)
self.ui.set_footer(footer)
net_model = self.controllers['Network'].model
net_model = self.app.controller_instances['Network'].model
ifaces = net_model.get_all_netdevs()
login_view = LoginView(self.opts, self.model, self, ifaces)

View File

@ -6,6 +6,6 @@ from .identity import IdentityModel
class ConsoleConfModel:
"""The overall model for console-conf."""
def __init__(self, common):
def __init__(self):
self.network = NetworkModel(support_wlan=True)
self.identity = IdentityModel()

View File

@ -56,14 +56,12 @@ class ProbeState(enum.IntEnum):
class FilesystemController(BaseController):
def __init__(self, common):
super().__init__(common)
self.block_log_dir = common.get('block_log_dir')
self.model = self.base_model.filesystem
def __init__(self, app):
super().__init__(app)
self.model = app.base_model.filesystem
if self.opts.dry_run and self.opts.bootloader:
name = self.opts.bootloader.upper()
self.model.bootloader = getattr(Bootloader, name)
self.answers = self.all_answers.get("Filesystem", {})
self.answers.setdefault('guided', False)
self.answers.setdefault('guided-index', 0)
self.answers.setdefault('manual', [])
@ -78,7 +76,7 @@ class FilesystemController(BaseController):
5.0, lambda loop, ud: self._check_probe_timeout())
def _bg_probe(self, probe_types=None):
return self.prober.get_storage(probe_types=probe_types)
return self.app.prober.get_storage(probe_types=probe_types)
def _probed(self, fut, restricted=False):
if not restricted and self._probe_state != ProbeState.PROBING:
@ -91,7 +89,7 @@ class FilesystemController(BaseController):
fname = 'probe-data-restricted.json'
else:
fname = 'probe-data.json'
with open(os.path.join(self.block_log_dir, fname), 'w') as fp:
with open(os.path.join(self.app.block_log_dir, fname), 'w') as fp:
json.dump(storage, fp, indent=4)
self.model.load_probe_data(storage)
except Exception:

View File

@ -24,10 +24,9 @@ log = logging.getLogger('subiquity.controllers.identity')
class IdentityController(BaseController):
def __init__(self, common):
super().__init__(common)
self.model = self.base_model.identity
self.answers = self.all_answers.get('Identity', {})
def __init__(self, app):
super().__init__(app)
self.model = app.base_model.identity
def default(self):
self.ui.set_body(IdentityView(self.model, self))

View File

@ -205,9 +205,9 @@ class InstallProgressController(BaseController):
('installprogress:snap-config-done', 'snap_config_done'),
]
def __init__(self, common):
super().__init__(common)
self.answers = self.all_answers.get('InstallProgress', {})
def __init__(self, app):
super().__init__(app)
self.model = app.base_model
self.answers.setdefault('reboot', False)
self.progress_view = None
self.progress_view_showing = False
@ -225,7 +225,7 @@ class InstallProgressController(BaseController):
self.sm = None
def tpath(self, *path):
return os.path.join(self.base_model.target, *path)
return os.path.join(self.model.target, *path)
def filesystem_config_done(self):
self.curtin_start_install()
@ -335,7 +335,7 @@ class InstallProgressController(BaseController):
ident = self._event_syslog_identifier
self._write_config(config_location,
self.base_model.render(syslog_identifier=ident))
self.model.render(syslog_identifier=ident))
return curtin_cmd
@ -383,13 +383,13 @@ class InstallProgressController(BaseController):
pass
def start_postinstall_configuration(self):
has_network = self.base_model.network.has_network
has_network = self.model.network.has_network
def filter_task(func):
if func._extra.get('net_only') and not has_network:
return False
if func._name == 'install_openssh' \
and not self.base_model.ssh.install_server:
and not self.model.ssh.install_server:
return False
return True
@ -411,12 +411,12 @@ class InstallProgressController(BaseController):
@task(label="configuring cloud-init")
def _bg_configure_cloud_init(self):
self.base_model.configure_cloud_init()
self.model.configure_cloud_init()
@task(label="installing openssh")
def _bg_install_openssh(self):
if self.opts.dry_run:
cmd = ["sleep", str(2/self.scale_factor)]
cmd = ["sleep", str(2/self.app.scale_factor)]
else:
cmd = [
sys.executable, "-m", "curtin", "system-install", "-t",
@ -428,12 +428,12 @@ class InstallProgressController(BaseController):
@task(label="restoring apt configuration")
def _bg_restore_apt_config(self):
if self.opts.dry_run:
cmds = [["sleep", str(1/self.scale_factor)]]
cmds = [["sleep", str(1/self.app.scale_factor)]]
else:
cmds = [
["umount", self.tpath('etc/apt')],
]
if self.base_model.network.has_network:
if self.model.network.has_network:
cmds.append([
sys.executable, "-m", "curtin", "in-target", "-t",
"/target", "--", "apt-get", "update",
@ -460,7 +460,8 @@ class InstallProgressController(BaseController):
net_only=True)
def _bg_run_uu(self):
if self.opts.dry_run:
self.uu = utils.start_command(["sleep", str(10/self.scale_factor)])
self.uu = utils.start_command([
"sleep", str(10/self.app.scale_factor)])
self.uu.wait()
else:
self._bg_run_command_logged([

View File

@ -29,10 +29,9 @@ class KeyboardController(BaseController):
('l10n:language-selected', 'language_selected'),
]
def __init__(self, common):
super().__init__(common)
self.model = self.base_model.keyboard
self.answers = self.all_answers.get("Keyboard", {})
def __init__(self, app):
super().__init__(app)
self.model = app.base_model.keyboard
def language_selected(self, code):
log.debug("language_selected %s", code)

View File

@ -37,11 +37,10 @@ class MirrorController(BaseController):
('snapd-network-change', 'snapd_network_changed'),
]
def __init__(self, common):
super().__init__(common)
self.model = self.base_model.mirror
def __init__(self, app):
super().__init__(app)
self.model = app.base_model.mirror
self.check_state = CheckState.NOT_STARTED
self.answers = self.all_answers.get('Mirror', {})
if 'country-code' in self.answers:
self.check_state = CheckState.DONE
self.model.set_country(self.answers['country-code'])

View File

@ -25,10 +25,9 @@ log = logging.getLogger('subiquity.controllers.proxy')
class ProxyController(BaseController):
def __init__(self, common):
super().__init__(common)
self.model = self.base_model.proxy
self.answers = self.all_answers.get('Proxy', {})
def __init__(self, app):
super().__init__(app)
self.model = app.base_model.proxy
def default(self):
self.ui.set_body(ProxyView(self.model, self))

View File

@ -50,8 +50,8 @@ class RefreshController(BaseController):
('snapd-network-change', 'snapd_network_changed'),
]
def __init__(self, common):
super().__init__(common)
def __init__(self, app):
super().__init__(app)
self.snap_name = os.environ.get("SNAP_NAME", "subiquity")
self.check_state = CheckState.NOT_STARTED
self.switch_state = SwitchState.NOT_STARTED
@ -62,14 +62,13 @@ class RefreshController(BaseController):
self.view = None
self.offered_first_time = False
self.answers = self.all_answers.get("Refresh", {})
def start(self):
self.switch_state = SwitchState.SWITCHING
self.run_in_bg(self._bg_get_snap_details, self._got_snap_details)
def _bg_get_snap_details(self):
return self.snapd_connection.get(
return self.app.snapd_connection.get(
'v2/snaps/{snap_name}'.format(snap_name=self.snap_name))
def _got_snap_details(self, fut):
@ -121,7 +120,7 @@ class RefreshController(BaseController):
def _bg_switch_snap(self, channel):
log.debug("switching %s to %s", self.snap_name, channel)
try:
response = self.snapd_connection.post(
response = self.app.snapd_connection.post(
'v2/snaps/{}'.format(self.snap_name),
{'action': 'switch', 'channel': channel})
response.raise_for_status()
@ -131,7 +130,7 @@ class RefreshController(BaseController):
change = response.json()["change"]
while True:
try:
response = self.snapd_connection.get(
response = self.app.snapd_connection.get(
'v2/changes/{}'.format(change))
response.raise_for_status()
except requests.exceptions.RequestException:
@ -165,7 +164,7 @@ class RefreshController(BaseController):
if self.network_state == "down":
return
# If we restarted into this version, don't check for a new version.
if self.updated:
if self.app.updated:
return
# If we got an answer, don't check again.
if self.check_state.is_definite():
@ -174,7 +173,7 @@ class RefreshController(BaseController):
self.run_in_bg(self._bg_check_for_update, self._check_result)
def _bg_check_for_update(self):
return self.snapd_connection.get('v2/find', select='refresh')
return self.app.snapd_connection.get('v2/find', select='refresh')
def _check_result(self, fut):
# If we managed to send concurrent requests and one has
@ -206,14 +205,14 @@ class RefreshController(BaseController):
self.view.update_check_state()
def start_update(self, callback):
update_marker = os.path.join(self.application.state_dir, 'updating')
update_marker = os.path.join(self.app.state_dir, 'updating')
open(update_marker, 'w').close()
self.run_in_bg(
self._bg_start_update,
lambda fut: self.update_started(fut, callback))
def _bg_start_update(self):
return self.snapd_connection.post(
return self.app.snapd_connection.post(
'v2/snaps/{}'.format(self.snap_name), {'action': 'refresh'})
def update_started(self, fut, callback):
@ -235,7 +234,7 @@ class RefreshController(BaseController):
lambda fut: self.got_progress(fut, callback))
def _bg_get_progress(self, change):
return self.snapd_connection.get('v2/changes/{}'.format(change))
return self.app.snapd_connection.get('v2/changes/{}'.format(change))
def got_progress(self, fut, callback):
try:
@ -251,7 +250,7 @@ class RefreshController(BaseController):
def default(self, index=1):
from subiquity.ui.views.refresh import RefreshView
if self.updated:
if self.app.updated:
raise Skip()
show = False
if index == 1:

View File

@ -136,14 +136,13 @@ class SnapListController(BaseController):
def _make_loader(self):
return SnapdSnapInfoLoader(
self.model, self.run_in_bg, self.snapd_connection,
self.model, self.run_in_bg, self.app.snapd_connection,
self.opts.snap_section)
def __init__(self, common):
super().__init__(common)
self.model = self.base_model.snaplist
def __init__(self, app):
super().__init__(app)
self.model = app.base_model.snaplist
self.loader = self._make_loader()
self.answers = self.all_answers.get('SnapList', {})
def snapd_network_changed(self):
# If the loader managed to load the list of snaps, the
@ -156,7 +155,7 @@ class SnapListController(BaseController):
self.loader.start()
def default(self):
if self.loader.failed or not self.base_model.network.has_network:
if self.loader.failed or not self.app.base_model.network.has_network:
# If loading snaps failed or the network is disabled, skip the
# screen.
self.signal.emit_signal("installprogress:snap-config-done")

View File

@ -31,10 +31,9 @@ class FetchSSHKeysFailure(Exception):
class SSHController(BaseController):
def __init__(self, common):
super().__init__(common)
self.model = self.base_model.ssh
self.answers = self.all_answers.get('SSH', {})
def __init__(self, app):
super().__init__(app)
self.model = app.base_model.ssh
def default(self):
self.ui.set_body(SSHView(self.model, self))
@ -45,8 +44,8 @@ class SSHController(BaseController):
"pwauth": self.answers.get("pwauth", True),
}
self.done(d)
elif 'ssh-import-id' in self.all_answers.get('Identity', {}):
import_id = self.all_answers['Identity']['ssh-import-id']
elif 'ssh-import-id' in self.app.answers.get('Identity', {}):
import_id = self.app.answers['Identity']['ssh-import-id']
d = {
"install_server": True,
"pwauth": True,
@ -107,7 +106,7 @@ class SSHController(BaseController):
# Happens if the fetch is cancelled.
return
user_spec, ssh_import_id, key_material, fingerprints = result
if 'ssh-import-id' in self.all_answers.get("Identity", {}):
if 'ssh-import-id' in self.app.answers.get("Identity", {}):
user_spec['authorized_keys'] = key_material.splitlines()
self.loop.set_alarm_in(0.0,
lambda loop, ud: self.done(user_spec))

View File

@ -13,7 +13,6 @@
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
from collections import defaultdict
import unittest
from subiquity.controllers.filesystem import (
@ -34,17 +33,19 @@ class Thing:
pass
def make_controller(bootloader=None):
common = defaultdict(type(None))
bm = Thing()
bm.filesystem = make_model(bootloader)
common['base_model'] = bm
common['answers'] = {}
class MiniApplication:
ui = signal = loop = run_in_bg = None
answers = {}
opts = Thing()
opts.dry_run = True
opts.bootloader = None
common['opts'] = opts
controller = FilesystemController(common)
def make_controller(bootloader=None):
app = MiniApplication()
app.base_model = bm = Thing()
bm.filesystem = make_model(bootloader)
controller = FilesystemController(app)
return controller

View File

@ -26,10 +26,9 @@ log = logging.getLogger('subiquity.controllers.welcome')
class WelcomeController(BaseController):
def __init__(self, common):
super().__init__(common)
self.model = self.base_model.locale
self.answers = self.all_answers.get("Welcome", {})
def __init__(self, app):
super().__init__(app)
self.model = app.base_model.locale
log.debug("Welcome: answers=%s", self.answers)
def start(self):

View File

@ -629,9 +629,8 @@ class ZdevInfo:
class ZdevController(BaseController):
def __init__(self, common):
super().__init__(common)
self.answers = self.all_answers.get('Zdev', {})
def __init__(self, app):
super().__init__(app)
if self.opts.dry_run:
if platform.machine() == 's390x':
zdevinfos = self.lszdev()

View File

@ -37,11 +37,11 @@ class Subiquity(Application):
project = "subiquity"
def make_model(self, common):
def make_model(self):
root = '/'
if common['opts'].dry_run:
if self.opts.dry_run:
root = os.path.abspath('.subiquity')
return SubiquityModel(root, common['opts'].sources)
return SubiquityModel(root, self.opts.sources)
controllers = [
"Welcome",
@ -64,8 +64,8 @@ class Subiquity(Application):
self.controllers.remove("Zdev")
super().__init__(ui, opts)
self.common['ui'].progress_completion += 1
self.common['block_log_dir'] = block_log_dir
self.ui.progress_completion += 1
self.block_log_dir = block_log_dir
if opts.snaps_from_examples:
connection = FakeSnapdConnection(
os.path.join(
@ -74,22 +74,19 @@ class Subiquity(Application):
"examples", "snaps"))
else:
connection = SnapdConnection(self.root, self.snapd_socket_path)
self.common['snapd_connection'] = connection
signal = self.common['signal']
signal.connect_signals([
self.snapd_connection = connection
self.signal.connect_signals([
('network-proxy-set', self._proxy_set),
('network-change', self._network_change),
])
def _network_change(self):
self.common['signal'].emit_signal('snapd-network-change')
self.signal.emit_signal('snapd-network-change')
def _proxy_set(self):
proxy_model = self.common['base_model'].proxy
signal = self.common['signal']
conn = self.common['snapd_connection']
self.run_in_bg(
lambda: conn.configure_proxy(proxy_model),
lambda: self.snapd_connection.configure_proxy(
self.base_model.proxy),
lambda fut: (
fut.result(), signal.emit_signal('snapd-network-change')),
fut.result(), self.signal.emit_signal('snapd-network-change')),
)

View File

@ -82,7 +82,8 @@ class KeyboardSetting:
@classmethod
def from_config_file(cls, config_file):
content = open(config_file).read()
with open(config_file) as fp:
content = fp.read()
def optval(opt, default):
match = re.search(r'(?m)^\s*%s=(.*)$' % (opt,), content)

View File

@ -167,7 +167,7 @@ class AutoDetectPressKey(AutoDetectBase):
@property
def input_filter(self):
return self.keyboard_detector.keyboard_view.controller.input_filter
return self.keyboard_detector.keyboard_view.controller.app.input_filter
def start(self):
self.input_filter.enter_keycodes_mode()

View File

@ -25,23 +25,18 @@ class BaseController(ABC):
signals = []
def __init__(self, common):
self.ui = common['ui']
self.signal = common['signal']
self.opts = common['opts']
self.loop = common['loop']
self.prober = common['prober']
self.controllers = common['controllers']
self.pool = common['pool']
self.base_model = common['base_model']
self.all_answers = common['answers']
self.input_filter = common['input_filter']
self.scale_factor = common['scale_factor']
self.run_in_bg = common['run_in_bg']
self.updated = common['updated']
self.application = common['application']
if 'snapd_connection' in common:
self.snapd_connection = common['snapd_connection']
@classmethod
def _controller_name(cls):
return cls.__name__[:-len("Controller")]
def __init__(self, app):
self.ui = app.ui
self.signal = app.signal
self.opts = app.opts
self.loop = app.loop
self.run_in_bg = app.run_in_bg
self.app = app
self.answers = app.answers.get(self._controller_name(), {})
def register_signals(self):
"""Defines signals associated with controller from model."""
@ -99,7 +94,7 @@ class BaseController(ABC):
def _run_iterator(self, it, delay=None):
if delay is None:
delay = 0.2/self.scale_factor
delay = 0.2/self.app.scale_factor
try:
next(it)
except StopIteration:

View File

@ -183,10 +183,9 @@ class NetworkController(BaseController):
root = "/"
def __init__(self, common):
super().__init__(common)
self.model = self.base_model.network
self.answers = self.all_answers.get("Network", {})
def __init__(self, app):
super().__init__(app)
self.model = app.base_model.network
self.view = None
self.view_shown = False
self.dhcp_check_handle = None
@ -213,7 +212,7 @@ class NetworkController(BaseController):
def start(self):
self._observer_handles = []
self.observer, self._observer_fds = (
self.prober.probe_network(self.network_event_receiver))
self.app.prober.probe_network(self.network_event_receiver))
self.start_watching()
def stop_watching(self):
@ -402,7 +401,7 @@ class NetworkController(BaseController):
self.model.parse_netplan_configs(self.root)
if self.opts.dry_run:
delay = 0.1/self.scale_factor
delay = 0.1/self.app.scale_factor
tasks = [
('one', BackgroundProcess(['sleep', str(delay)])),
('two', PythonSleep(delay)),

View File

@ -238,6 +238,8 @@ class Application:
log.exception(err)
raise ApplicationError(err)
self.ui = ui
self.opts = opts
opts.project = self.project
self.root = '/'
@ -246,42 +248,33 @@ class Application:
self.state_dir = os.path.join(self.root, 'run', self.project)
os.makedirs(os.path.join(self.state_dir, 'states'), exist_ok=True)
answers = {}
self.answers = {}
if opts.answers is not None:
answers = yaml.safe_load(opts.answers.read())
log.debug("Loaded answers %s", answers)
self.answers = yaml.safe_load(opts.answers.read())
log.debug("Loaded answers %s", self.answers)
if not opts.dry_run:
open('/run/casper-no-prompt', 'w').close()
if is_linux_tty():
log.debug("is_linux_tty")
input_filter = KeyCodesFilter()
self.input_filter = KeyCodesFilter()
else:
input_filter = DummyKeycodesFilter()
self.input_filter = DummyKeycodesFilter()
scale = float(os.environ.get('SUBIQUITY_REPLAY_TIMESCALE', "1"))
updated = os.path.exists(os.path.join(self.state_dir, 'updating'))
self.common = {
"application": self,
"updated": updated,
"ui": ui,
"opts": opts,
"signal": Signal(),
"prober": prober,
"loop": None,
"pool": futures.ThreadPoolExecutor(10),
"answers": answers,
"input_filter": input_filter,
"scale_factor": scale,
"run_in_bg": self.run_in_bg,
}
self.scale_factor = float(
os.environ.get('SUBIQUITY_REPLAY_TIMESCALE', "1"))
self.updated = os.path.exists(os.path.join(self.state_dir, 'updating'))
self.signal = Signal()
self.prober = prober
self.loop = None
self.pool = futures.ThreadPoolExecutor(10)
if opts.screens:
self.controllers = [c for c in self.controllers
if c in opts.screens]
else:
self.controllers = self.controllers[:]
ui.progress_completion = len(self.controllers)
self.common['controllers'] = dict.fromkeys(self.controllers)
self.controller_instances = dict.fromkeys(self.controllers)
self.controller_index = -1
def run_in_bg(self, func, callback):
@ -291,12 +284,12 @@ class Application:
the result of func(). The result of callback is discarded. An
exception will crash the process so be careful!
"""
fut = self.common['pool'].submit(func)
fut = self.pool.submit(func)
def in_main_thread(ignored):
callback(fut)
pipe = self.common['loop'].watch_pipe(in_main_thread)
pipe = self.loop.watch_pipe(in_main_thread)
def in_random_thread(ignored):
os.write(pipe, b'x')
@ -308,23 +301,23 @@ class Application:
signals = []
signals.append(('quit', self.exit))
if self.common['opts'].dry_run:
if self.opts.dry_run:
signals.append(('control-x-quit', self.exit))
signals.append(('refresh', self.redraw_screen))
signals.append(('next-screen', self.next_screen))
signals.append(('prev-screen', self.prev_screen))
self.common['signal'].connect_signals(signals)
self.signal.connect_signals(signals)
# Registers signals from each controller
for controller, controller_class in self.common['controllers'].items():
for controller_class in self.controller_instances.values():
controller_class.register_signals()
log.debug(self.common['signal'])
log.debug(self.signal)
def save_state(self):
if self.controller_index < 0:
return
cur_controller_name = self.controllers[self.controller_index]
cur_controller = self.common['controllers'][cur_controller_name]
cur_controller = self.controller_instances[cur_controller_name]
state_path = os.path.join(
self.state_dir, 'states', cur_controller_name)
with open(state_path, 'w') as fp:
@ -332,10 +325,10 @@ class Application:
def select_screen(self, index):
self.controller_index = index
self.common['ui'].progress_current = index
self.ui.progress_current = index
controller_name = self.controllers[self.controller_index]
log.debug("moving to screen %s", controller_name)
controller = self.common['controllers'][controller_name]
controller = self.controller_instances[controller_name]
controller.default()
state_path = os.path.join(self.state_dir, 'last-screen')
with open(state_path, 'w') as fp:
@ -371,9 +364,9 @@ class Application:
# EventLoop -------------------------------------------------------------------
def redraw_screen(self):
if self.common['loop'] is not None:
if self.loop is not None:
try:
self.common['loop'].draw_screen()
self.loop.draw_screen()
except AssertionError as e:
log.critical("Redraw screen error: {}".format(e))
@ -394,7 +387,6 @@ class Application:
# wait, above to wait for the button to appear in case it
# takes a while.
from subiquitycore.testing import view_helpers
loop = self.common['loop']
class ScriptState:
def __init__(self):
@ -412,11 +404,10 @@ class Application:
return
ss.scripts = ss.scripts[1:]
if ss.scripts:
loop.set_alarm_in(0.01, _run_script)
self.loop.set_alarm_in(0.01, _run_script)
def c(pat):
but = view_helpers.find_button_matching(self.common['ui'],
'.*' + pat + '.*')
but = view_helpers.find_button_matching(self.ui, '.*' + pat + '.*')
if not but:
ss.wait_count += 1
if ss.wait_count > 10:
@ -438,62 +429,66 @@ class Application:
ss.scripts = ss.scripts[1:]
if ss.scripts:
_run_script()
loop.set_alarm_in(delay, next)
self.loop.set_alarm_in(delay, next)
ss.ns['c'] = c
ss.ns['wait'] = wait
ss.ns['ui'] = self.common['ui']
ss.ns['ui'] = self.ui
self.common['loop'].set_alarm_in(0.06, _run_script)
self.loop.set_alarm_in(0.06, _run_script)
def unhandled_input(self, key):
if key == 'ctrl x':
self.signal.emit_signal('control-x-quit')
def run(self):
if not hasattr(self, 'loop'):
if (self.common['opts'].run_on_serial and
os.ttyname(0) != "/dev/ttysclp0"):
palette = self.STYLES_MONO
screen = urwid.raw_display.Screen()
else:
screen, palette = setup_screen(self.COLORS, self.STYLES)
if (self.opts.run_on_serial and
os.ttyname(0) != "/dev/ttysclp0"):
palette = self.STYLES_MONO
screen = urwid.raw_display.Screen()
else:
screen, palette = setup_screen(self.COLORS, self.STYLES)
self.common['loop'] = urwid.MainLoop(
self.common['ui'], palette=palette, screen=screen,
handle_mouse=False, pop_ups=True,
input_filter=self.common['input_filter'].filter)
self.loop = urwid.MainLoop(
self.ui, palette=palette, screen=screen,
handle_mouse=False, pop_ups=True,
input_filter=self.input_filter.filter,
unhandled_input=self.unhandled_input)
log.debug("Running event loop: {}".format(
self.common['loop'].event_loop))
self.common['base_model'] = self.make_model(self.common)
log.debug("Running event loop: {}".format(
self.loop.event_loop))
self.base_model = self.make_model()
try:
if self.common['opts'].scripts:
self.run_scripts(self.common['opts'].scripts)
if self.opts.scripts:
self.run_scripts(self.opts.scripts)
controllers_mod = __import__('%s.controllers' % self.project,
None, None, [''])
for i, k in enumerate(self.controllers):
if self.common['controllers'][k] is None:
if self.controller_instances[k] is None:
log.debug("Importing controller: {}".format(k))
klass = getattr(controllers_mod, k+"Controller")
self.common['controllers'][k] = klass(self.common)
self.controller_instances[k] = klass(self)
else:
count = 1
for k2 in self.controllers[:i]:
if k2 == k or k2.startswith(k + '-'):
count += 1
orig = self.common['controllers'][k]
orig = self.controller_instances[k]
k += '-' + str(count)
self.controllers[i] = k
self.common['controllers'][k] = RepeatedController(
self.controller_instances[k] = RepeatedController(
orig, count)
log.debug("*** %s", self.common['controllers'])
log.debug("*** %s", self.controller_instances)
initial_controller_index = 0
if self.common['updated']:
if self.updated:
for k in self.controllers:
state_path = os.path.join(self.state_dir, 'states', k)
if not os.path.exists(state_path):
continue
with open(state_path) as fp:
self.common['controllers'][k].deserialize(
self.controller_instances[k].deserialize(
json.load(fp))
last_screen = None
@ -512,16 +507,16 @@ class Application:
except Skip:
self.next_screen()
self.common['loop'].set_alarm_in(
self.loop.set_alarm_in(
0.00, lambda loop, ud: tty.setraw(0))
self.common['loop'].set_alarm_in(
self.loop.set_alarm_in(
0.05, select_initial_screen, initial_controller_index)
self._connect_base_signals()
for k in self.controllers:
self.common['controllers'][k].start()
self.controller_instances[k].start()
self.common['loop'].run()
self.loop.run()
except Exception:
log.exception("Exception in controller.run():")
raise

View File

@ -70,9 +70,6 @@ class BaseView(WidgetWrap):
pass
def keypress(self, size, key):
if key in ['ctrl x']:
self.controller.signal.emit_signal('control-x-quit')
return None
key = super().keypress(size, key)
if key == 'esc':
if hasattr(self._w, 'bottom_w'):