Merge branch 'master' into maas-option

This commit is contained in:
Michael Hudson-Doyle 2018-02-12 12:40:17 +13:00
commit 0355f2c316
16 changed files with 34473 additions and 123 deletions

View File

@ -1,5 +1,7 @@
Welcome:
lang: en_US
Keyboard:
layout: us
Network:
accept-default: yes
Filesystem:

33600
kbdnames.txt Normal file

File diff suppressed because it is too large Load Diff

View File

@ -25,16 +25,18 @@ parts:
- lsb-release
- pkg-config
- python3-distutils-extra
- python3-urwid
stage-packages:
- curtin
- libsystemd0
- iso-codes
- lsb-release
- python3-distutils-extra
- python3-urwid
python-packages:
- attrs
- pyyaml
- systemd-python
- urwid
#- urwid
source: .
source-type: git
wrappers:
@ -47,6 +49,8 @@ parts:
'bin/subiquity-loadkeys': usr/bin/subiquity-loadkeys
'bin/curtin-journald-forwarder': usr/bin/curtin-journald-forwarder
'bin/started': usr/bin/started
stage:
- usr/bin
prime:
- usr/bin
users-and-groups:
@ -59,6 +63,15 @@ parts:
cut -d ' ' -f 2- > users-and-groups
stage:
- users-and-groups
kbdnames:
plugin: dump
build-packages:
- console-setup
- xkb-data-i18n
prepare: |
/usr/share/console-setup/kbdnames-maker /usr/share/console-setup/KeyboardNames.pl > kbdnames.txt
stage:
- kbdnames.txt
probert:
plugin: python
build-packages: [python-setuptools, libnl-3-dev, libnl-genl-3-dev, libnl-route-3-dev]

View File

@ -20,4 +20,5 @@ from .identity import IdentityController # NOQA
from .installpath import InstallpathController # NOQA
from .installprogress import InstallProgressController # NOQA
from .filesystem import FilesystemController # NOQA
from .keyboard import KeyboardController # NOQA
from .welcome import WelcomeController # NOQA

View File

@ -17,7 +17,6 @@ import datetime
import fcntl
import logging
import os
import random
import subprocess
import sys
@ -38,15 +37,13 @@ from subiquity.ui.views import ProgressView
log = logging.getLogger("subiquitycore.controller.installprogress")
TARGET = '/target'
class InstallState:
NOT_STARTED = 0
RUNNING_INSTALL = 1
DONE_INSTALL = 2
RUNNING_POSTINSTALL = 3
DONE_POSTINSTALL = 4
ERROR_INSTALL = -1
ERROR_POSTINSTALL = -2
RUNNING = 1
DONE = 2
ERROR = -1
class InstallProgressController(BaseController):
@ -61,18 +58,19 @@ class InstallProgressController(BaseController):
self.answers.setdefault('reboot', False)
self.progress_view = None
self.install_state = InstallState.NOT_STARTED
self.postinstall_written = False
self.tail_proc = None
self.journald_forwarder_proc = None
self.curtin_event_stack = []
self._identity_config_done = False
def filesystem_config_done(self):
self.curtin_start_install()
def identity_config_done(self):
self.postinstall_written = True
if self.install_state == InstallState.DONE_INSTALL:
self.curtin_start_postinstall()
if self.install_state == InstallState.DONE:
self.curtin_configure_cloud_init()
else:
self._identity_config_done = True
def curtin_error(self):
log.debug('curtin_error')
@ -85,18 +83,16 @@ class InstallProgressController(BaseController):
else:
self.default()
def run_command_logged(self, cmd, logfile_location):
def run_command_logged(self, cmd, logfile_location, env):
with open(logfile_location, 'wb', buffering=0) as logfile:
log.debug("running %s", cmd)
cp = subprocess.run(
cmd, stdout=logfile, stderr=subprocess.STDOUT, stdin=subprocess.DEVNULL)
cmd, env=env, stdout=logfile, stderr=subprocess.STDOUT, stdin=subprocess.DEVNULL)
log.debug("completed %s", cmd)
return cp.returncode
def curtin_event(self, event):
event_type = event.get("CURTIN_EVENT_TYPE")
#if random.randrange(1000) == 0 or len(event) > 0:
# log.debug("got curtin event from journald: %r", event)
if event_type not in ['start', 'finish']:
return
if event_type == 'start':
@ -130,86 +126,63 @@ class InstallProgressController(BaseController):
conf.write(datestr)
conf.write(yaml.dump(config))
def _get_curtin_command(self, install_step):
config_file_name = 'subiquity-curtin-%s.conf' % (install_step,)
def _get_curtin_command(self):
config_file_name = 'subiquity-curtin-install.conf'
if self.opts.dry_run:
config_location = os.path.join('.subiquity/', config_file_name)
log.debug("Installprogress: this is a dry-run")
config_location = os.path.join('.subiquity/', config_file_name)
curtin_cmd = [
"python3", "scripts/replay-curtin-log.py",
self.reporting_url, "examples/curtin-events-%s.json" % (install_step,),
self.reporting_url, "examples/curtin-events.json",
]
else:
config_location = os.path.join('/tmp', config_file_name)
log.debug("Installprogress: this is the *REAL* thing")
configs = [config_location]
curtin_cmd = curtin_install_cmd(configs)
config_location = os.path.join('/var/log/installer', config_file_name)
curtin_cmd = curtin_install_cmd(config_location)
self._write_config(
config_location,
self.base_model.render(
install_step=install_step, reporting_url=self.reporting_url))
self.base_model.render(target=TARGET, reporting_url=self.reporting_url))
return curtin_cmd
def curtin_start_install(self):
log.debug('Curtin Install: calling curtin with '
'storage/net/postinstall config')
self.install_state = InstallState.RUNNING_INSTALL
log.debug('Curtin Install: starting curtin')
self.install_state = InstallState.RUNNING
self.start_journald_forwarder()
self.start_journald_listener("curtin_event", self.curtin_event)
curtin_cmd = self._get_curtin_command("install")
curtin_cmd = self._get_curtin_command()
log.debug('Curtin install cmd: {}'.format(curtin_cmd))
self.run_in_bg(lambda: self.run_command_logged(curtin_cmd, CURTIN_INSTALL_LOG), self.curtin_install_completed)
env = os.environ.copy()
if 'SNAP' in env:
del env['SNAP']
self.run_in_bg(
lambda: self.run_command_logged(curtin_cmd, CURTIN_INSTALL_LOG, env),
self.curtin_install_completed)
def curtin_install_completed(self, fut):
returncode = fut.result()
log.debug('curtin_install: returncode: {}'.format(returncode))
self.stop_tail_proc()
if returncode > 0:
self.install_state = InstallState.ERROR_INSTALL
self.install_state = InstallState.ERROR
self.curtin_error()
return
self.install_state = InstallState.DONE_INSTALL
self.install_state = InstallState.DONE
log.debug('After curtin install OK')
if self.postinstall_written:
self.curtin_start_postinstall()
if self._identity_config_done:
self.loop.set_alarm_in(0.01, lambda loop, userdata: self.curtin_configure_cloud_init())
def cancel(self):
pass
def curtin_start_postinstall(self):
log.debug('Curtin Post Install: calling curtin '
'with postinstall config')
if not self.postinstall_written:
log.error('Attempting to spawn curtin install without a config')
raise Exception('AIEEE!')
self.install_state = InstallState.RUNNING_POSTINSTALL
if self.progress_view is not None:
self.progress_view.clear_log_tail()
self.progress_view.set_status(_("Running postinstall step"))
self.start_tail_proc()
curtin_cmd = self._get_curtin_command("postinstall")
log.debug('Curtin postinstall cmd: {}'.format(curtin_cmd))
self.run_in_bg(lambda: self.run_command_logged(curtin_cmd, CURTIN_POSTINSTALL_LOG), self.curtin_postinstall_completed)
def curtin_postinstall_completed(self, fut):
returncode = fut.result()
log.debug('curtin_postinstall: returncode: {}'.format(returncode))
self.stop_tail_proc()
if returncode > 0:
self.install_state = InstallState.ERROR_POSTINSTALL
self.curtin_error()
return
log.debug('After curtin postinstall OK')
self.install_state = InstallState.DONE_POSTINSTALL
def curtin_configure_cloud_init(self):
# If we need to do anything that takes time here (like running
# dpkg-reconfigure maas-rack-controller, for example...) we
# should switch to doing that work in a background thread.
self.base_model.configure_cloud_init(TARGET)
self.ui.progress_current += 1
self.ui.set_header(_("Installation complete!"), "")
self.ui.set_footer("")
@ -235,16 +208,8 @@ class InstallProgressController(BaseController):
log.debug("curtin journald forwarder listening on %s", self.reporting_url)
def start_tail_proc(self):
if self.install_state == InstallState.ERROR_INSTALL:
install_log = CURTIN_INSTALL_LOG
elif self.install_state == InstallState.ERROR_POSTINSTALL:
install_log = CURTIN_POSTINSTALL_LOG
elif self.install_state < InstallState.RUNNING_POSTINSTALL:
install_log = CURTIN_INSTALL_LOG
else:
install_log = CURTIN_POSTINSTALL_LOG
self.progress_view.clear_log_tail()
tail_cmd = ['tail', '-n', '1000', '-F', install_log]
tail_cmd = ['tail', '-n', '1000', '-F', CURTIN_INSTALL_LOG]
log.debug('tail cmd: {}'.format(" ".join(tail_cmd)))
self.tail_proc = utils.run_command_start(tail_cmd)
stdout_fileno = self.tail_proc.stdout.fileno()
@ -264,6 +229,7 @@ class InstallProgressController(BaseController):
log.debug('dry-run enabled, skipping reboot, quiting instead')
self.signal.emit_signal('quit')
else:
# Should probably run curtin -c $CONFIG unmount -t TARGET first.
utils.run_command(["/sbin/reboot"])
def quit(self):
@ -279,14 +245,11 @@ class InstallProgressController(BaseController):
self.ui.set_header(title, excerpt)
self.ui.set_footer(footer)
self.progress_view = ProgressView(self)
self.start_tail_proc()
if self.install_state < 0:
self.curtin_error()
self.ui.set_body(self.progress_view)
return
if self.install_state < InstallState.RUNNING_POSTINSTALL:
self.progress_view.set_status(_("Running install step"))
else:
self.progress_view.set_status(_("Running postinstall step"))
self.ui.set_body(self.progress_view)
self.start_tail_proc()

View File

@ -0,0 +1,67 @@
# Copyright 2015 Canonical, Ltd.
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as
# published by the Free Software Foundation, either version 3 of the
# License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import logging
from subiquitycore.controller import BaseController
from subiquity.ui.views import KeyboardView
log = logging.getLogger('subiquity.controllers.keyboard')
class KeyboardController(BaseController):
signals = [
('l10n:language-selected', 'language_selected'),
]
def __init__(self, common):
super().__init__(common)
self.model = self.base_model.keyboard
self.answers = self.all_answers.get("Keyboard", {})
def language_selected(self, code):
log.debug("language_selected %s", code)
if not self.model.has_language(code):
code = code.split('_')[0]
if not self.model.has_language(code):
code = 'C'
log.debug("loading launguage %s", code)
self.model.load_language(code)
def default(self):
if self.model.current_lang is None:
self.model.load_language('C')
title = "Keyboard configuration"
if self.opts.run_on_serial:
excerpt = 'Please select the layout of the keyboard directly attached to the system, if any.'
else:
excerpt = 'Please select your keyboard layout below, or select "Identify keyboard" to detect your layout automatically.'
footer = _("Use UP, DOWN and ENTER keys to select your keyboard.")
self.ui.set_header(title, excerpt)
self.ui.set_footer(footer)
view = KeyboardView(self.model, self, self.opts)
self.ui.set_body(view)
if 'layout' in self.answers:
layout = self.answers['layout']
variant = self.answers.get('variant', '')
self.done(layout, variant)
def done(self, layout, variant):
self.model.set_keyboard(layout, variant)
self.signal.emit_signal('next-screen')
def cancel(self):
self.signal.emit_signal('prev-screen')

View File

@ -32,6 +32,7 @@ class Subiquity(Application):
controllers = [
"Welcome",
"Keyboard",
"Installpath",
"Network",
"Filesystem",

View File

@ -54,17 +54,12 @@ def curtin_find_install_path():
return p
def curtin_install_cmd(configs):
def curtin_install_cmd(config):
'''
curtin -vvv --showtrace install -c $CONFIGS cp:///
curtin -vvv --showtrace install -c $CONFIG cp:///
'''
curtin = curtin_find_curtin()
install_path = curtin_find_install_path()
install_cmd = [curtin, '-vvv', '--showtrace']
for c in configs:
install_cmd += ['-c', '{}'.format(c)]
install_cmd += ['install', 'cp://{}'.format(install_path)]
log.info('curtin install command: {}'.format(" ".join(install_cmd)))
return install_cmd
return [
curtin_find_curtin(),
'-vvv', '--showtrace', '-c', config,
'install', 'cp://' + curtin_find_install_path()]

View File

@ -0,0 +1,103 @@
from collections import defaultdict
import gzip
import io
import logging
import os
import re
from subiquitycore.utils import run_command
log = logging.getLogger("subiquity.models.keyboard")
etc_default_keyboard_template = """\
# KEYBOARD CONFIGURATION FILE
# Consult the keyboard(5) manual page.
XKBMODEL="pc105"
XKBLAYOUT="{layout}"
XKBVARIANT="{variant}"
XKBOPTIONS=""
BACKSPACE="guess"
"""
class KeyboardModel:
def __init__(self, root):
self.root = root
self.layout = 'us'
self.variant = ''
self._kbnames_file = os.path.join(os.environ.get("SNAP", '.'), 'kbdnames.txt')
self._clear()
if os.path.exists(self.config_path):
content = open(self.config_path).read()
pat_tmpl = '(?m)^\s*%s=(.*)$'
log.debug("%r", content)
layout_match = re.search(pat_tmpl%("XKBLAYOUT",), content)
if layout_match:
log.debug("%s", layout_match)
self.layout = layout_match.group(1).strip('"')
variant_match = re.search(pat_tmpl%("XKBVARIANT",), content)
if variant_match:
log.debug("%s", variant_match)
self.variant = variant_match.group(1).strip('"')
if self.variant == '':
self.variant = None
@property
def config_path(self):
return os.path.join(self.root, 'etc', 'default', 'keyboard')
@property
def config_content(self):
return etc_default_keyboard_template.format(layout=self.layout, variant=self.variant)
def has_language(self, code):
self.load_language(code)
return bool(self.layouts)
def load_language(self, code):
if code == self.current_lang:
return
self._clear()
with open(self._kbnames_file) as kbdnames:
self._load_file(code, kbdnames)
self.current_lang = code
def _clear(self):
self.current_lang = None
self.layouts = {}
self.variants = defaultdict(dict)
def _load_file(self, code, kbdnames):
for line in kbdnames:
line = line.rstrip('\n')
got_lang, element, name, value = line.split("*", 3)
if got_lang != code:
continue
if element == "layout":
self.layouts[name] = value
elif element == "variant":
variantname, variantdesc = value.split("*", 1)
self.variants[name][variantname] = variantdesc
def lookup(self, code):
if ':' in code:
layout_code, variant_code = code.split(":", 1)
return self.layouts.get(layout_code, '?'), self.variants.get(layout_code).get(variant_code, '?')
else:
return self.layouts.get(code, '?'), None
def set_keyboard(self, layout, variant):
path = self.config_path
os.makedirs(os.path.dirname(path), exist_ok=True)
self.layout = layout
self.variant = variant
with open(path, 'w') as fp:
fp.write(self.config_content)
if self.root == '/':
run_command(['setupcon', '--save', '--force'])

View File

@ -25,6 +25,9 @@ class LocaleModel(object):
XXX Only represents *language* selection for now.
"""
def __init__(self, signal):
self.signal = signal
supported_languages = [
('en_US', 'English'),
('ca_EN', 'Catalan'),
@ -47,6 +50,7 @@ class LocaleModel(object):
def switch_language(self, code):
self.selected_language = code
self.signal.emit_signal('l10n:language-selected', code)
i18n.switch_language(code)
def __repr__(self):

View File

@ -22,6 +22,7 @@ from subiquitycore.models.network import NetworkModel
from .filesystem import FilesystemModel
from .installpath import InstallpathModel
from .keyboard import KeyboardModel
from .locale import LocaleModel
@ -29,7 +30,11 @@ class SubiquityModel:
"""The overall model for subiquity."""
def __init__(self, common):
self.locale = LocaleModel()
root = '/'
if common['opts'].dry_run:
root = os.path.abspath(".subiquity")
self.locale = LocaleModel(common['signal'])
self.keyboard = KeyboardModel(root)
self.installpath = InstallpathModel()
self.network = NetworkModel()
self.filesystem = FilesystemModel(common['prober'])
@ -61,34 +66,35 @@ class SubiquityModel:
config.update(self.installpath.render_cloudinit())
return config
def _write_files_config(self):
def _cloud_init_files(self):
# TODO, this should be moved to the in-target cloud-config seed so on first
# boot of the target, it reconfigures datasource_list to none for subsequent
# boots.
# (mwhudson does not entirely know what the above means!)
userdata = '#cloud-config\n' + yaml.dump(self._cloud_init_config())
metadata = yaml.dump({'instance-id': str(uuid.uuid4())})
return {
'postinst_metadata': {
'path': 'var/lib/cloud/seed/nocloud-net/meta-data',
'content': metadata,
},
'postinst_userdata': {
'path': 'var/lib/cloud/seed/nocloud-net/user-data',
'content': userdata,
},
'postinst_enable_cloudinit': {
'path': 'etc/cloud/ds-identify.cfg',
'content': 'policy: enabled\n',
},
}
return [
('var/lib/cloud/seed/nocloud-net/meta-data', metadata),
('var/lib/cloud/seed/nocloud-net/user-data', userdata),
('etc/cloud/ds-identify.cfg', 'policy: enabled\n'),
]
def render(self, install_step, reporting_url=None):
disk_actions = self.filesystem.render()
if install_step == "postinstall":
for a in disk_actions:
a['preserve'] = True
def configure_cloud_init(self, target):
for path, content in self._cloud_init_files():
path = os.path.join(target, path)
os.makedirs(os.path.dirname(path), exist_ok=True)
with open(os.path.join(target, path), 'w') as fp:
fp.write(content)
def render(self, target, reporting_url=None):
config = {
'install': {
'target': target,
'unmount': 'disabled',
'save_install_config': '/var/log/installer/curtin-install-cfg.yaml',
'save_install_log': '/var/log/installer/curtin-install.log',
},
'partitioning_commands': {
'builtin': 'curtin block-meta custom',
},
@ -101,14 +107,19 @@ class SubiquityModel:
'storage': {
'version': 1,
'config': disk_actions,
'config': self.filesystem.render(),
},
'write_files': {
'etc_default_keyboard': {
'path': 'etc/default/keyboard',
'content': self.keyboard.config_content,
},
},
}
if install_step == "install":
config.update(self.network.render())
config.update(self.installpath.render())
else:
config['write_files'] = self._write_files_config()
if reporting_url is not None:
config['reporting']['subiquity'] = {

View File

@ -28,4 +28,5 @@ from .lvm import LVMVolumeGroupView # NOQA
from .identity import IdentityView # NOQA
from .installpath import InstallpathView, MAASView # NOQA
from .installprogress import ProgressView # NOQA
from .keyboard import KeyboardView
from .welcome import WelcomeView

View File

@ -0,0 +1,351 @@
# Copyright 2018 Canonical, Ltd.
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as
# published by the Free Software Foundation, either version 3 of the
# License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import logging
from urwid import (
connect_signal,
LineBox,
Text,
WidgetWrap,
)
from subiquitycore.ui.buttons import ok_btn, other_btn
from subiquitycore.ui.container import (
Columns,
ListBox,
Pile,
)
from subiquitycore.ui.form import (
Form,
FormField,
)
from subiquitycore.ui.selector import Option, Selector
from subiquitycore.ui.utils import button_pile, Color, Padding
from subiquitycore.view import BaseView
from subiquity.ui.views import pc105
log = logging.getLogger("subiquity.ui.views.keyboard")
class AutoDetectBase(WidgetWrap):
def __init__(self, keyboard_detector, step):
# step is an instance of pc105.Step
self.keyboard_detector = keyboard_detector
self.step = step
lb = LineBox(self.make_body(), _("Keyboard auto-detection"))
super().__init__(lb)
def start(self):
pass
def stop(self):
pass
def keypress(self, size, key):
if key == 'esc':
self.keyboard_detector.backup()
else:
return super().keypress(size, key)
class AutoDetectIntro(AutoDetectBase):
def ok(self, sender):
self.keyboard_detector.do_step(0)
def cancel(self, sender):
self.keyboard_detector.abort()
def make_body(self):
return Pile([
Text(_("Keyboard detection starting. You will be asked a series of questions about your keyboard. Press escape at any time to go back to the previous screen.")),
Text(""),
button_pile([
ok_btn(label=_("OK"), on_press=self.ok),
ok_btn(label=_("Cancel"), on_press=self.cancel),
]),
])
class AutoDetectFailed(AutoDetectBase):
def ok(self, sender):
self.keyboard_detector.abort()
def make_body(self):
return Pile([
Text(_("Keyboard auto detection failed, sorry")),
Text(""),
button_pile([ok_btn(label="OK", on_press=self.ok)]),
])
class AutoDetectResult(AutoDetectBase):
preamble = _("""\
Keyboard auto detection completed.
Your keyboard was detected as:
""")
postamble = _("""\
If this is correct, select Done on the next screen. If not you can select \
another layout or run the automated detection again.
""")
def ok(self, sender):
self.keyboard_detector.keyboard_view.found_layout(self.step.result)
def make_body(self):
model = self.keyboard_detector.keyboard_view.model
layout, variant = model.lookup(self.step.result)
var_desc = []
if variant is not None:
var_desc = [Text(_(" Variant: ") + variant)]
return Pile([
Text(self.preamble),
Text(_(" Layout: ") + layout),
] + var_desc + [
Text(self.postamble),
button_pile([ok_btn(label=_("OK"), on_press=self.ok)]),
])
class AutoDetectPressKey(AutoDetectBase):
# This is the tricky case. We need access to the "keycodes" not
# the characters that the current keyboard set up maps the
# keycodes to. The heavy lifting is done by the InputFilter class
# in subiquitycore.core.
def selectable(self):
return True
def make_body(self):
self.error_text = Text("", align="center")
return Pile([
Text(_("Please press one of the following keys:")),
Text(""),
Columns([Text(s, align="center") for s in self.step.symbols], dividechars=1),
Text(""),
Color.info_error(self.error_text),
])
@property
def input_filter(self):
return self.keyboard_detector.keyboard_view.controller.input_filter
def start(self):
self.input_filter.enter_keycodes_mode()
def stop(self):
self.input_filter.exit_keycodes_mode()
def keypress(self, size, key):
log.debug('keypress %r %r', size, key)
if key.startswith('release '):
# Escape is key 1 on all keyboards and all layouts except
# amigas and very old Macs so this seems safe enough.
if key == 'release 1':
return super().keypress(size, 'esc')
else:
return
elif key.startswith('press '):
code = int(key[len('press '):])
if code not in self.step.keycodes:
self.error_text.set_text(_("Input was not recognized, try again"))
return
v = self.step.keycodes[code]
else:
# If we're not on a linux tty, the filtering won't have
# happened and so there's no way to get the keycodes. Do
# something literally random instead.
import random
v = random.choice(list(self.step.keycodes.values()))
self.keyboard_detector.do_step(v)
class AutoDetectKeyPresent(AutoDetectBase):
def yes(self, sender):
self.keyboard_detector.do_step(self.step.yes)
def no(self, sender):
self.keyboard_detector.do_step(self.step.no)
def make_body(self):
return Pile([
Text(_("Is the following key present on your keyboard?")),
Text(""),
Text(self.step.symbol, align="center"),
Text(""),
button_pile([
ok_btn(label=_("Yes"), on_press=self.yes),
other_btn(label=_("No"), on_press=self.no),
]),
])
class Detector:
# Encapsulates the state of the autodetection process.
def __init__(self, kview):
self.keyboard_view = kview
self.pc105tree = pc105.PC105Tree()
self.pc105tree.read_steps()
self.seen_steps = []
def start(self):
o = AutoDetectIntro(self, None)
self.keyboard_view.show_overlay(o)
def abort(self):
overlay = self.keyboard_view._w.top_w
overlay.stop()
self.keyboard_view.remove_overlay()
step_cls_to_view_cls = {
pc105.StepResult: AutoDetectResult,
pc105.StepPressKey: AutoDetectPressKey,
pc105.StepKeyPresent: AutoDetectKeyPresent,
}
def backup(self):
if len(self.seen_steps) == 0:
self.seen_steps = []
self.abort()
return
if len(self.seen_steps) == 1:
self.seen_steps = []
self.abort()
self.start()
return
self.seen_steps.pop()
step_index = self.seen_steps.pop()
self.do_step(step_index)
def do_step(self, step_index):
self.abort()
log.debug("moving to step %s", step_index)
try:
step = self.pc105tree.steps[step_index]
except KeyError:
view = AutoDetectFailed(self, None)
else:
self.seen_steps.append(step_index)
log.debug("step: %s", repr(step))
view = self.step_cls_to_view_cls[type(step)](self, step)
view.start()
self.keyboard_view.show_overlay(view)
class ChoiceField(FormField):
def __init__(self, caption=None, help=None, choices=[]):
super().__init__(caption, help)
self.choices = choices
def _make_widget(self, form):
return Selector(self.choices)
class KeyboardForm(Form):
cancel_label = _("Back")
layout = ChoiceField(_("Layout:"), choices=["dummy"])
variant = ChoiceField(_("Variant:"), choices=["dummy"])
class KeyboardView(BaseView):
def __init__(self, model, controller, opts):
self.model = model
self.controller = controller
self.opts = opts
self.form = KeyboardForm()
opts = []
for layout, desc in model.layouts.items():
opts.append(Option((desc, True, layout)))
opts.sort(key=lambda o:o.label)
connect_signal(self.form, 'submit', self.done)
connect_signal(self.form, 'cancel', self.cancel)
connect_signal(self.form.layout.widget, "select", self.select_layout)
self.form.layout.widget._options = opts
try:
self.form.layout.widget.value = model.layout
self.form.variant.widget.value = model.variant
except AttributeError:
# Don't crash on pre-existing invalid config.
pass
self._rows = self.form.as_rows(self)
lb_contents = [self._rows]
if not self.opts.run_on_serial:
lb_contents.extend([
Text(""),
button_pile([
other_btn(label=_("Identify keyboard"), on_press=self.detect)]),
])
lb = ListBox(lb_contents)
pile = Pile([
('pack', Text("")),
Padding.center_90(lb),
('pack', Pile([
Text(""),
self.form.buttons,
Text(""),
])),
])
lb._select_last_selectable()
pile.focus_position = 2
super().__init__(pile)
def detect(self, sender):
detector = Detector(self)
detector.start()
def found_layout(self, result):
self.remove_overlay()
log.debug("found_layout %s", result)
if ':' in result:
layout, variant = result.split(':')
else:
layout, variant = result, None
self.form.layout.widget.value = layout
self.form.variant.widget.value = variant
self._w.focus_position = 2
def done(self, result):
layout = self.form.layout.widget.value
variant = ''
if self.form.variant.widget.value is not None:
variant = self.form.variant.widget.value
self.controller.done(layout, variant)
def cancel(self, result=None):
self.controller.cancel()
def select_layout(self, sender, layout):
log.debug("%s", layout)
opts = []
for variant, variant_desc in self.model.variants[layout].items():
opts.append(Option((variant_desc, True, variant)))
opts.sort(key=lambda o:o.label)
opts.insert(0, Option(("default", True, None)))
self.form.variant.widget._options = opts
self.form.variant.widget.index = 0
self.form.variant.enabled = len(opts) > 1

133
subiquity/ui/views/pc105.py Normal file
View File

@ -0,0 +1,133 @@
# Copyright 2018 Canonical, Ltd.
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as
# published by the Free Software Foundation, either version 3 of the
# License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
# The keyboard autodetection process is driven by the data in
# /usr/share/console-setup/pc105.tree. This code parses that data into
# subclasses of Step.
class Step:
def __repr__(self):
kvs = []
for k, v in self.__dict__.items():
kvs.append("%s=%r" % (k, v))
return "%s(%s)" % (self.__class__.__name__, ", ".join(sorted(kvs)))
def check(self):
pass
class StepPressKey(Step):
# "Press one of the following keys"
def __init__(self):
self.symbols = []
self.keycodes = {}
def check(self):
if len(self.symbols) == 0 or len(self.keycodes) == 0:
raise Exception
class StepKeyPresent(Step):
# "Is this symbol present on your keyboard"
def __init__(self, symbol):
self.symbol = symbol
self.yes = None
self.no = None
def check(self):
if self.yes is None or self.no is None:
raise Exception
class StepResult(Step):
# "This is the autodetected layout"
def __init__(self, result):
self.result = result
class PC105Tree:
"""Parses the pc105.tree file into subclasses of Step"""
# This is adapted (quite heavily) from the code in ubiquity.
def __init__(self):
self.steps = {}
def _add_step_from_lines(self, lines):
step = None
step_index = -1
for line in lines:
if line.startswith('STEP '):
step_index = int(line[5:])
elif line.startswith('PRESS '):
# Ask the user to press a character on the keyboard.
if step is None:
step = StepPressKey()
elif not isinstance(step, StepPressKey):
raise Exception
step.symbols.append(line[6:].strip())
elif line.startswith('CODE '):
# Direct the evaluating code to process step ## next if the
# user has pressed a key which returned that keycode.
if not isinstance(step, StepPressKey):
raise Exception
keycode = int(line[5:line.find(' ', 5)])
s = int(line[line.find(' ', 5) + 1:])
step.keycodes[keycode] = s
elif line.startswith('FIND '):
# Ask the user whether that character is present on their
# keyboard.
if step is None:
step = StepKeyPresent(line[5:].strip())
else:
raise Exception
elif line.startswith('FINDP '):
# Equivalent to FIND, except that the user is asked to consider
# only the primary symbols (i.e. Plain and Shift).
if step is None:
step = StepKeyPresent(line[6:].strip())
else:
raise Exception
elif line.startswith('YES '):
# Direct the evaluating code to process step ## next if the
# user does have this key.
if not isinstance(step, StepKeyPresent):
raise Exception
step.yes = int(line[4:].strip())
elif line.startswith('NO '):
# Direct the evaluating code to process step ## next if the
# user does not have this key.
if not isinstance(step, StepKeyPresent):
raise Exception
step.no = int(line[3:].strip())
elif line.startswith('MAP '):
# This step uniquely identifies a keymap.
if step is None:
step = StepResult(line[4:].strip())
else:
raise Exception
else:
raise Exception
if step is None or step_index == -1:
raise Exception
step.check()
self.steps[step_index] = step
def read_steps(self):
cur_step_lines = []
with open('/usr/share/console-setup/pc105.tree') as fp:
for line in fp:
if line.startswith('STEP '):
if cur_step_lines:
self._add_step_from_lines(cur_step_lines)
cur_step_lines = [line]
else:
cur_step_lines.append(line)
if cur_step_lines:
self._add_step_from_lines(cur_step_lines)

View File

@ -36,6 +36,7 @@ class BaseController(ABC):
self.pool = common['pool']
self.base_model = common['base_model']
self.all_answers = common['answers']
self.input_filter = common['input_filter']
def register_signals(self):
"""Defines signals associated with controller from model."""

View File

@ -16,7 +16,11 @@
from concurrent import futures
import fcntl
import logging
import os
import struct
import sys
import termios
import tty
import urwid
import yaml
@ -31,15 +35,23 @@ class ApplicationError(Exception):
""" Basecontroller exception """
pass
# The next little bit is cribbed from
# https://github.com/EvanPurkhiser/linux-vt-setcolors/blob/master/setcolors.c:
# From uapi/linux/kd.h:
KDGKBTYPE = 0x4B33 # get keyboard type
GIO_CMAP = 0x4B70 # gets colour palette on VGA+
PIO_CMAP = 0x4B71 # sets colour palette on VGA+
UO_R, UO_G, UO_B = 0xe9, 0x54, 0x20
# /usr/include/linux/kd.h
K_RAW = 0x00
K_XLATE = 0x01
K_MEDIUMRAW = 0x02
K_UNICODE = 0x03
K_OFF = 0x04
KDGKBMODE = 0x4B44 # gets current keyboard mode
KDSKBMODE = 0x4B45 # sets current keyboard mode
class ISO_8613_3_Screen(urwid.raw_display.Screen):
@ -116,6 +128,92 @@ def setup_screen(colors, styles):
return ISO_8613_3_Screen(_urwid_name_to_rgb), urwid_palette
class KeyCodesFilter:
"""input_filter that can pass (medium) raw keycodes to the application
See http://lct.sourceforge.net/lct/x60.html for terminology.
Call enter_keycodes_mode()/exit_keycodes_mode() to switch into and
out of keycodes mode. In keycodes mode, the only events passed to
the application are "press $N" / "release $N" where $N is the
keycode the user pressed or released.
Much of this is cribbed from the source of the "showkeys" utility.
"""
def __init__(self):
self._fd = os.open("/proc/self/fd/0", os.O_RDWR)
self.filtering = False
def enter_keycodes_mode(self):
log.debug("enter_keycodes_mode")
self.filtering = True
# Read the old keyboard mode (it will proably always be K_UNICODE but well).
o = bytearray(4)
fcntl.ioctl(self._fd, KDGKBMODE, o)
self._old_mode = struct.unpack('i', o)[0]
# Make some changes to the terminal settings.
# If you don't do this, sometimes writes to the terminal hang (and no,
# I don't know exactly why).
self._old_settings = termios.tcgetattr(self._fd)
new_settings = termios.tcgetattr(self._fd)
new_settings[tty.IFLAG] = 0
new_settings[tty.LFLAG] = new_settings[tty.LFLAG] & ~(termios.ECHO | termios.ICANON | termios.ISIG)
new_settings[tty.CC][termios.VMIN] = 0
new_settings[tty.CC][termios.VTIME] = 0
termios.tcsetattr(self._fd, termios.TCSAFLUSH, new_settings)
# Finally, set the keyboard mode to K_MEDIUMRAW, which causes
# the keyboard driver in the kernel to pass us keycodes.
log.debug("old mode was %s, setting mode to %s", self._old_mode, K_MEDIUMRAW)
fcntl.ioctl(self._fd, KDSKBMODE, K_MEDIUMRAW)
def exit_keycodes_mode(self):
log.debug("exit_keycodes_mode")
self.filtering = False
log.debug("setting mode back to %s", self._old_mode)
fcntl.ioctl(self._fd, KDSKBMODE, self._old_mode)
termios.tcsetattr(self._fd, termios.TCSANOW, self._old_settings)
def filter(self, keys, codes):
# Luckily urwid passes us the raw results from read() we can
# turn into keycodes.
if self.filtering:
i = 0
r = []
n = len(codes)
while i < len(codes):
# This is straight from showkeys.c.
if codes[i] & 0x80:
p = 'release '
else:
p = 'press '
if i + 2 < n and (codes[i] & 0x7f) == 0 and (codes[i + 1] & 0x80) != 0 and (codes[i + 2] & 0x80) != 0:
kc = ((codes[i + 1] & 0x7f) << 7) | (codes[i + 2] & 0x7f)
i += 3
else:
kc = codes[i] & 0x7f
i += 1
r.append(p + str(kc))
return r
else:
return keys
class DummyKeycodesFilter:
# A dummy implementation of the same interface as KeyCodesFilter
# we can use when not running in a linux tty.
def enter_keycodes_mode(self):
pass
def exit_keycodes_mode(self):
pass
def filter(self, keys, codes):
return keys
class Application:
# A concrete subclass must set project and controllers attributes, e.g.:
@ -151,6 +249,12 @@ class Application:
if not opts.dry_run:
open('/run/casper-no-prompt', 'w').close()
if is_linux_tty():
log.debug("is_linux_tty")
input_filter = KeyCodesFilter()
else:
input_filter = DummyKeycodesFilter()
self.common = {
"ui": ui,
"opts": opts,
@ -159,6 +263,7 @@ class Application:
"loop": None,
"pool": futures.ThreadPoolExecutor(1),
"answers": answers,
"input_filter": input_filter,
}
if opts.screens:
self.controllers = [c for c in self.controllers if c in opts.screens]
@ -226,12 +331,11 @@ class Application:
self.common['loop'] = urwid.MainLoop(
self.common['ui'], palette=palette, screen=screen,
handle_mouse=False, pop_ups=True)
handle_mouse=False, pop_ups=True, input_filter=self.common['input_filter'].filter)
log.debug("Running event loop: {}".format(
self.common['loop'].event_loop))
self.common['base_model'] = self.model_class(self.common)
try:
self.common['loop'].set_alarm_in(0.05, self.next_screen)
controllers_mod = __import__('%s.controllers' % self.project, None, None, [''])