# Copyright 2015 Canonical, Ltd. # # This program is free software: you can redistribute it and/or modify # it under the terms of the GNU Affero General Public License as # published by the Free Software Foundation, either version 3 of the # License, or (at your option) any later version. # # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU Affero General Public License for more details. # # You should have received a copy of the GNU Affero General Public License # along with this program. If not, see . """ Filesystem Provides storage device selection and additional storage configuration. """ import logging import re from urwid import (WidgetWrap, ListBox, Pile, BoxAdapter, Text, Columns) from subiquity.ui.lists import SimpleList from subiquity.ui.buttons import (done_btn, reset_btn, cancel_btn, menu_btn) from subiquity.ui.utils import Padding, Color from subiquity.ui.interactive import (StringEditor, IntegerEditor, Selector, MountEditor) from subiquity.models.filesystem import (_humanize_size, _dehumanize_size, HUMAN_UNITS) from subiquity.view import ViewPolicy INVALID_PARTITION_SIZE = 'Invalid Partition Size' PARTITION_SIZE_TOO_BIG = 'Requested size too big' PARTITION_ERRORS = [ INVALID_PARTITION_SIZE, PARTITION_SIZE_TOO_BIG, ] log = logging.getLogger('subiquity.ui.filesystem') class DiskInfoView(ViewPolicy): def __init__(self, model, signal, selected_device, hdinfo): log.debug('DiskInfoView: {}'.format(selected_device)) self.model = model self.signal = signal self.selected_device = selected_device hdinfo = hdinfo.split("\n") body = [] for h in hdinfo: body.append(Text(h)) body.append(Padding.center_20(self._build_buttons())) super().__init__(Padding.center_79(SimpleList(body))) def _build_buttons(self): done = done_btn(on_press=self.done) buttons = [ Color.button(done, focus_map='button focus'), ] return Pile(buttons) def keypress(self, size, key): if key in ['tab', 'n', 'N', 'j', 'J']: log.debug('keypress: [{}]'.format(key)) self.signal.emit_signal('filesystem:show-disk-info-next', self.selected_device) return None if key in ['shift tab', 'p', 'P', 'k', 'K']: log.debug('keypress: [{}]'.format(key)) self.signal.emit_signal('filesystem:show-disk-info-prev', self.selected_device) return None return super().keypress(size, key) def done(self, result): ''' Return to FilesystemView ''' self.signal.emit_signal('filesystem:show') def cancel(self, button): self.signal.emit_signal('filesystem:show') class AddFormatView(WidgetWrap): def __init__(self, model, signal, selected_disk): self.model = model self.signal = signal self.selected_disk = selected_disk self.disk_obj = self.model.get_disk(selected_disk) self.mountpoint = MountEditor(caption="", edit_text="/") self.fstype = Selector(opts=self.model.supported_filesystems) body = [ Padding.line_break(""), self._container(), Padding.line_break(""), Padding.center_20(self._build_buttons()) ] format_box = Padding.center_50(ListBox(body)) super().__init__(format_box) def _build_buttons(self): cancel = cancel_btn(on_press=self.cancel) done = done_btn(on_press=self.done) buttons = [ Color.button(done, focus_map='button focus'), Color.button(cancel, focus_map='button focus') ] return Pile(buttons) def _format_edit(self): return Pile(self.fstype.group) def _container(self): total_items = [ Columns( [ ("weight", 0.2, Text("Format", align="right")), ("weight", 0.3, Color.string_input(self._format_edit(), focus_map="string_input focus")) ], dividechars=4 ), Columns( [ ("weight", 0.2, Text("Mount", align="right")), ("weight", 0.3, Color.string_input(self.mountpoint, focus_map="string_input focs")) ], dividechars=4 ) ] return Pile(total_items) def cancel(self, button): self.signal.emit_signal('filesystem:show') def done(self, result): """ format spec { 'format' Str(ext4|btrfs.., 'mountpoint': Str } """ result = { "fstype": self.fstype.value, "mountpoint": self.mountpoint.value } try: valid = self.model.valid_mount(result) except ValueError as e: log.exception('Invalid mount point') self.mountpoint.set_error('Error: {}'.format(str(e))) log.debug('Invalid mountpoint, try again') return log.debug("Add Format Result: {}".format(result)) self.signal.emit_signal('filesystem:finish-add-disk-format', self.disk_obj.devpath, result) class AddPartitionView(WidgetWrap): def __init__(self, model, signal, selected_disk): self.model = model self.signal = signal self.selected_disk = selected_disk self.disk_obj = self.model.get_disk(selected_disk) self.partnum = IntegerEditor( caption="", default=self.disk_obj.lastpartnumber + 1) self.size_str = _humanize_size(self.disk_obj.freespace) self.size = StringEditor( caption="".format(self.size_str)) self.mountpoint = MountEditor(caption="", edit_text="/") self.fstype = Selector(opts=self.model.supported_filesystems) body = [ Columns( [ ("weight", 0.2, Text("Adding partition to {}".format( self.disk_obj.devpath), align="right")), ("weight", 0.3, Text("")) ] ), Padding.line_break(""), self._container(), Padding.line_break(""), Padding.center_20(self._build_buttons()) ] partition_box = Padding.center_50(ListBox(body)) super().__init__(partition_box) def _build_buttons(self): cancel = cancel_btn(on_press=self.cancel) done = done_btn(on_press=self.done) buttons = [ Color.button(done, focus_map='button focus'), Color.button(cancel, focus_map='button focus') ] return Pile(buttons) def _format_edit(self): return Pile(self.fstype.group) def _container(self): total_items = [ Columns( [ ("weight", 0.2, Text("Partition number", align="right")), ("weight", 0.3, Color.string_input(self.partnum, focus_map="string_input focus")) ], dividechars=4 ), Columns( [ ("weight", 0.2, Text("Size (max {})".format(self.size_str), align="right")), ("weight", 0.3, Color.string_input(self.size, focus_map="string_input focus")), ], dividechars=4 ), Columns( [ ("weight", 0.2, Text("Format", align="right")), ("weight", 0.3, Color.string_input(self._format_edit(), focus_map="string_input focus")) ], dividechars=4 ), Columns( [ ("weight", 0.2, Text("Mount", align="right")), ("weight", 0.3, Color.string_input(self.mountpoint, focus_map="string_input focs")) ], dividechars=4 ) ] return Pile(total_items) def cancel(self, button): self.signal.emit_signal('filesystem:show') def done(self, result): """ partition spec { 'partition_number': Int, 'size': Int(M|G), 'format' Str(ext4|btrfs.., 'mountpoint': Str } """ def __get_valid_size(size_str): r = '(\d*)(\d+[\.]?\d*)[{}]*$'.format(''.join(HUMAN_UNITS)) match = re.match(r, size_str) log.debug('valid_size: input:{} match:{}'.format(size_str, match)) if match: return match.group(0) return '' def __append_unit(input_size): ''' examine the input for a unit string. if not present, use the unit string from the displayed maximum size returns: number string with unit size ''' unit_regex = '[{}]$'.format(''.join(HUMAN_UNITS)) input_has_unit = re.findall(unit_regex, input_size) log.debug('input:{} re:{}'.format(input_size, input_has_unit)) if len(input_has_unit) == 0: # input does not have unit string displayed_unit = re.search(unit_regex, self.size_str) log.debug('input:{} re:{}'.format(self.size_str, displayed_unit)) input_size += displayed_unit.group(0) return input_size def __get_size(): log.debug('Getting partition size') log.debug('size.value={} size_str={} freespace={}'.format( self.size.value, self.size_str, self.disk_obj.freespace)) if self.size.value == '' or \ self.size.value == self.size_str: log.debug('Using default value: {}'.format( self.disk_obj.freespace)) return int(self.disk_obj.freespace) else: # 120B 120 valid_size = __get_valid_size(self.size.value) if len(valid_size) == 0: return INVALID_PARTITION_SIZE self.size.value = __append_unit(valid_size) log.debug('dehumanize_size({})'.format(self.size.value)) sz = _dehumanize_size(self.size.value) if sz > self.disk_obj.freespace: log.debug( 'Input size too big for device: ({} > {})'.format( sz, self.disk_obj.freespace)) log.warn('Capping size @ max freespace: {}'.format( self.disk_obj.freespace)) sz = self.disk_obj.freespace return sz result = { "partnum": self.partnum.value, "raw_size": self.size.value, "bytes": __get_size(), "fstype": self.fstype.value, "mountpoint": self.mountpoint.value } # Validate size (bytes) input if result['bytes'] in PARTITION_ERRORS: log.error(result['bytes']) self.size.set_error('ERROR: {}'.format(result['bytes'])) return # Validate mountpoint input valid = False try: valid = self.model.valid_mount(result) except ValueError as e: log.exception('Invalid mount point') self.mountpoint.set_error('Error: {}'.format(str(e))) log.debug("Invalid mountpoint, try again") return log.debug("Add Partition Result: {}".format(result)) self.signal.emit_signal('filesystem:finish-add-disk-partition', self.disk_obj.devpath, result) class DiskPartitionView(WidgetWrap): def __init__(self, model, signal, selected_disk): self.model = model self.signal = signal self.selected_disk = selected_disk self.disk_obj = self.model.get_disk(self.selected_disk) self.body = [ Padding.center_79(self._build_model_inputs()), Padding.line_break(""), Padding.center_79(self._build_menu()), Padding.line_break(""), Padding.center_20(self._build_buttons()), ] super().__init__(ListBox(self.body)) def _build_buttons(self): cancel = cancel_btn(on_press=self.cancel) done = done_btn(on_press=self.done) buttons = [ Color.button(done, focus_map='button focus'), Color.button(cancel, focus_map='button focus') ] return Pile(buttons) def _build_model_inputs(self): partitioned_disks = [] for mnt, size, fstype, path in self.disk_obj.get_fs_table(): mnt = Text(mnt) size = Text("{}".format(_humanize_size(size))) fstype = Text(fstype) if fstype else '-' path = Text(path) if path else '-' partition_column = Columns([ (15, path), size, fstype, mnt ], 4) partitioned_disks.append(partition_column) free_space = _humanize_size(self.disk_obj.freespace) partitioned_disks.append(Columns([ (15, Text("FREE SPACE")), Text(free_space), Text(""), Text("") ], 4)) return BoxAdapter(SimpleList(partitioned_disks, is_selectable=False), height=len(partitioned_disks)) def _build_menu(self): """ Builds the add partition menu with user visible changes to the button depending on if existing partitions exist or not. """ return Pile([self.add_partition_w(), self.create_swap_w(), self.show_disk_info_w()]) def show_disk_info_w(self): """ Runs hdparm against device and displays its output """ text = ("Show disk information") return Color.menu_button( menu_btn( label=text, on_press=self.show_disk_info ), focus_map='menu_button focus') def create_swap_w(self): """ Handles presenting an enabled create swap on entire device button if no partition exists, otherwise it is disabled. """ text = ("Format or create swap on entire " "device (unusual, advanced)") if len(self.disk_obj.partitions) == 0: return Color.menu_button(menu_btn(label=text, on_press=self.create_swap), focus_map='menu_button focus') return Color.info_minor(Text(text)) def add_partition_w(self): """ Handles presenting the add partition widget button depending on if partitions exist already or not. """ text = "Add first GPT partition" if len(self.disk_obj.partitions) > 0: text = "Add partition (max size {})".format( _humanize_size(self.disk_obj.freespace)) return Color.menu_button(menu_btn(label=text, on_press=self.add_partition), focus_map='menu_button focus') def show_disk_info(self, result): self.signal.emit_signal('filesystem:show-disk-information', self.selected_disk) def add_partition(self, result): log.debug('add_partition: result={}'.format(result)) self.signal.emit_signal('filesystem:add-disk-partition', self.selected_disk) def create_swap(self, result): log.debug('create_swap: result={}'.format(result)) self.signal.emit_signal('filesystem:create-swap-entire-device', self.selected_disk) def done(self, result): ''' Return to FilesystemView ''' self.signal.emit_signal('filesystem:show') def cancel(self, button): self.signal.emit_signal('filesystem:show') class FilesystemView(ViewPolicy): def __init__(self, model, signal): log.debug('FileSystemView init start()') self.model = model self.signal = signal self.items = [] self.model.probe_storage() # probe before we complete self.body = [ Padding.center_79(Text("FILE SYSTEM")), Padding.center_79(self._build_partition_list()), Padding.line_break(""), Padding.center_79(Text("AVAILABLE DISKS")), Padding.center_79(self._build_model_inputs()), Padding.line_break(""), Padding.center_79(self._build_menu()), Padding.line_break(""), Padding.center_79(self._build_used_disks()), Padding.center_15(self._build_buttons()), ] super().__init__(ListBox(self.body)) log.debug('FileSystemView init complete()') def _build_used_disks(self): log.debug('FileSystemView: building used disks') pl = [] for disk in self.model.get_used_disk_names(): log.debug('used disk: {}'.format(disk)) pl.append(Text(disk)) if len(pl): return Pile( [Text("USED DISKS"), Padding.line_break("")] + pl + [Padding.line_break("")] ) return Pile(pl) def _build_partition_list(self): log.debug('FileSystemView: building part list') pl = [] if (len(self.model.get_partitions()) == 0 and len(self.model.get_filesystems()) == 0): pl.append(Color.info_minor( Text("No disks or partitions mounted"))) log.debug('FileSystemView: no partitions') return Pile(pl) log.debug('FileSystemView: weve got partitions!') for dev in self.model.devices.values(): for mnt, size, fstype, path in dev.get_fs_table(): mnt = Text(mnt) size = Text("{}".format(_humanize_size(size))) fstype = Text(fstype) if fstype else '-' path = Text(path) if path else '-' partition_column = Columns([ (15, path), size, fstype, mnt ], 4) pl.append(partition_column) log.debug('FileSystemView: build-part-list done') return Pile(pl) def _build_buttons(self): log.debug('FileSystemView: building buttons') buttons = [] # don't enable done botton if we can't install if self.model.installable(): buttons.append( Color.button(done_btn(on_press=self.done), focus_map='button focus')) buttons.append(Color.button(reset_btn(on_press=self.reset), focus_map='button focus')) return Pile(buttons) def _get_percent_free(self, device): ''' return the device free space and percentage of the whole device''' percent = "%d" % ( int((1.0 - (device.usedspace / device.size)) * 100)) free = _humanize_size(device.freespace) rounded = "{}{}".format(int(float(free[:-1])), free[-1]) return (rounded, percent) def _build_model_inputs(self): log.debug('FileSystemView: building model inputs') col_1 = [] col_2 = [] avail_disks = self.model.get_available_disk_names() if len(avail_disks) == 0: return Pile([Color.info_minor(Text("No available disks."))]) for dname in avail_disks: disk = self.model.get_disk_info(dname) device = self.model.get_disk(dname) btn = menu_btn(label=disk.name, on_press=self.show_disk_partition_view) col_1.append( Color.menu_button(btn, focus_map='menu_button focus')) disk_sz = _humanize_size(disk.size) log.debug('device partitions: {}'.format(len(device.partitions))) # if we've consumed some of the device, show # the remaining space and percentage of the whole if len(device.partitions) > 0: free, percent = self._get_percent_free(device) disk_sz = "{} ({}%) free".format(free, percent) col_2.append(Text(disk_sz)) col_1 = BoxAdapter(SimpleList(col_1), height=len(col_1)) col_2 = BoxAdapter(SimpleList(col_2, is_selectable=False), height=len(col_2)) return Columns([(15, col_1), col_2], 2) def _build_menu(self): log.debug('FileSystemView: building menu') opts = [] for opt, sig, _ in self.model.get_menu(): opts.append( Color.menu_button( menu_btn(label=opt, on_press=self.on_fs_menu_press), focus_map='menu_button focus')) return Pile(opts) def on_fs_menu_press(self, result): self.signal.emit_signal( self.model.get_signal_by_name(result.label)) def cancel(self, button): self.signal.emit_signal(self.model.get_previous_signal) def reset(self, button): self.signal.emit_signal('filesystem:show', True) def done(self, button): actions = self.model.get_actions() self.signal.emit_signal('filesystem:finish', False, actions) def show_disk_partition_view(self, partition): self.signal.emit_signal('filesystem:show-disk-partition', partition.label)