From 04426cec15740969c6131b3ac67602a5b71c2d75 Mon Sep 17 00:00:00 2001 From: Michael Hudson-Doyle Date: Fri, 28 Aug 2020 14:10:51 +1200 Subject: [PATCH] make network view use a plain-old-data view of a nic The network view code used to crawl all over the network model object, which isn't really going to work with the upcoming client/server split. So this adds a much better defined interface between the view and controller. --- subiquitycore/controllers/network.py | 277 ++++++++---- subiquitycore/models/network.py | 225 ++++++++-- subiquitycore/ui/views/network.py | 411 +++++++++--------- .../network_configure_manual_interface.py | 168 +++---- .../views/network_configure_wlan_interface.py | 46 +- ...test_network_configure_manual_interface.py | 95 ++-- 6 files changed, 737 insertions(+), 485 deletions(-) diff --git a/subiquitycore/controllers/network.py b/subiquitycore/controllers/network.py index 793f7da6..75e320b3 100644 --- a/subiquitycore/controllers/network.py +++ b/subiquitycore/controllers/network.py @@ -27,8 +27,13 @@ from subiquitycore.context import with_context from subiquitycore.controller import BaseController from subiquitycore.file_util import write_file from subiquitycore.models.network import ( - BondParameters, + BondConfig, + DHCPState, NetDevAction, + NetDevInfo, + StaticConfig, + VLANConfig, + WLANConfig, ) from subiquitycore import netplan from subiquitycore.ui.stretchy import StretchyOverlay @@ -45,24 +50,23 @@ log = logging.getLogger("subiquitycore.controller.network") class SubiquityNetworkEventReceiver(NetworkEventReceiver): - def __init__(self, model): - self.model = model - self.view = None - self.default_route_watchers = [] + def __init__(self, controller): + self.controller = controller + self.model = controller.model self.default_routes = set() - self.dhcp_events = {} def new_link(self, ifindex, link): netdev = self.model.new_link(ifindex, link) - if self.view is not None and netdev is not None: - self.view.new_link(netdev) + if netdev is not None: + self.controller.new_link(netdev) def del_link(self, ifindex): netdev = self.model.del_link(ifindex) if ifindex in self.default_routes: self.default_routes.remove(ifindex) - if self.view is not None and netdev is not None: - self.view.del_link(netdev) + self.controller.update_default_routes(self.default_routes) + if netdev is not None: + self.controller.del_link(netdev) def update_link(self, ifindex): netdev = self.model.update_link(ifindex) @@ -71,14 +75,8 @@ class SubiquityNetworkEventReceiver(NetworkEventReceiver): flags = getattr(netdev.info, "flags", 0) if not (flags & IFF_UP) and ifindex in self.default_routes: self.default_routes.remove(ifindex) - for watcher in self.default_route_watchers: - watcher(self.default_routes) - for v, e in netdev.dhcp_events.items(): - if netdev.dhcp_addresses()[v]: - e.set() - - if self.view is not None: - self.view.update_link(netdev) + self.controller.update_default_routes(self.default_routes) + self.controller.update_link(netdev) def route_change(self, action, data): super().route_change(action, data) @@ -91,17 +89,8 @@ class SubiquityNetworkEventReceiver(NetworkEventReceiver): self.default_routes.add(ifindex) elif action == "DEL" and ifindex in self.default_routes: self.default_routes.remove(ifindex) - for watcher in self.default_route_watchers: - watcher(self.default_routes) log.debug('default routes %s', self.default_routes) - - def add_default_route_watcher(self, watcher): - self.default_route_watchers.append(watcher) - watcher(self.default_routes) - - def remove_default_route_watcher(self, watcher): - if watcher in self.default_route_watchers: - self.default_route_watchers.remove(watcher) + self.controller.update_default_routes(self.default_routes) default_netplan = ''' @@ -157,16 +146,32 @@ class NetworkController(BaseController): self.parse_netplan_configs() self._watching = False - self.network_event_receiver = SubiquityNetworkEventReceiver(self.model) - self.network_event_receiver.add_default_route_watcher( - self.route_watcher) + self.network_event_receiver = SubiquityNetworkEventReceiver(self) def parse_netplan_configs(self): self.model.parse_netplan_configs(self.root) - def route_watcher(self, routes): + def update_default_routes(self, routes): if routes: self.signal.emit_signal('network-change') + if self.view: + self.view.update_default_routes(routes) + + def new_link(self, netdev): + if self.view is not None: + self.view.new_link(netdev.netdev_info()) + + def update_link(self, netdev): + for v, e in netdev.dhcp_events.items(): + if netdev.dhcp_addresses()[v]: + netdev.set_dhcp_state(v, DHCPState.CONFIGURED) + e.set() + if self.view is not None: + self.view.update_link(netdev.netdev_info()) + + def del_link(self, netdev): + if self.view is not None: + self.view.del_link(netdev.netdev_info()) def start(self): self._observer_handles = [] @@ -199,22 +204,6 @@ class NetworkController(BaseController): loop.call_later(0.1, self.start_watching) return self.observer.data_ready(fd) - v = self.ui.body - if isinstance(getattr(v, '_w', None), StretchyOverlay): - if hasattr(v._w.stretchy, 'refresh_model_inputs'): - v._w.stretchy.refresh_model_inputs() - - def start_scan(self, dev): - self.observer.trigger_scan(dev.ifindex) - - def done(self): - log.debug("NetworkController.done next_screen") - self.model.has_network = bool( - self.network_event_receiver.default_routes) - self.app.next_screen() - - def cancel(self): - self.app.prev_screen() def _action_get(self, id): dev_spec = id[0].split() @@ -234,19 +223,21 @@ class NetworkController(BaseController): return dev raise Exception("could not resolve {}".format(id)) - def _action_clean_devices(self, devices): - return [self._action_get(device) for device in devices] + def _action_clean_interfaces(self, devices): + r = [self._action_get(device).name for device in devices] + log.debug("%s", r) + return r def _answers_action(self, action): - from subiquitycore.ui.stretchy import StretchyOverlay log.debug("_answers_action %r", action) if 'obj' in action: - obj = self._action_get(action['obj']) + obj = self._action_get(action['obj']).netdev_info() meth = getattr( self.ui.body, "_action_{}".format(action['action'])) action_obj = getattr(NetDevAction, action['action']) - self.ui.body._action(None, (action_obj, meth), obj) + table = self.ui.body.dev_name_to_table[obj.name] + self.ui.body._action(None, (action_obj, meth), table) yield body = self.ui.body._w if not isinstance(body, StretchyOverlay): @@ -268,9 +259,12 @@ class NetworkController(BaseController): self.ui.body._create_bond() yield body = self.ui.body._w + data = action['data'].copy() + if 'devices' in data: + data['interfaces'] = data.pop('devices') yield from self._enter_form_data( body.stretchy.form, - action['data'], + data, action.get("submit", True)) elif action['action'] == 'done': self.ui.body.done() @@ -296,19 +290,6 @@ class NetworkController(BaseController): log.debug("disabling %s", dev.name) dev.disabled_reason = _("autoconfiguration failed") - def start_ui(self): - if not self.view_shown: - self.update_initial_configs() - self.view = NetworkView(self.model, self) - if not self.view_shown: - self.apply_config(silent=True) - self.view_shown = True - self.network_event_receiver.view = self.view - self.ui.set_body(self.view) - - def end_ui(self): - self.view = self.network_event_receiver.view = None - @property def netplan_path(self): if self.opts.project == "subiquity": @@ -371,11 +352,11 @@ class NetworkController(BaseController): for v in 4, 6: if dev.dhcp_enabled(v): if not silent: - dev.set_dhcp_state(v, "PENDING") + dev.set_dhcp_state(v, DHCPState.PENDING) self.network_event_receiver.update_link( dev.ifindex) else: - dev.set_dhcp_state(v, "RECONFIGURE") + dev.set_dhcp_state(v, DHCPState.RECONFIGURE) dev.dhcp_events[v] = e = asyncio.Event() dhcp_events.add(e) if dev.info is None: @@ -466,28 +447,144 @@ class NetworkController(BaseController): for dev, v in dhcp_device_versions: dev.dhcp_events = {} if not dev.dhcp_addresses()[v]: - dev.set_dhcp_state(v, "TIMEDOUT") + dev.set_dhcp_state(v, DHCPState.TIMED_OUT) self.network_event_receiver.update_link(dev.ifindex) - def add_vlan(self, device, vlan): - return self.model.new_vlan(device, vlan) + def start_ui(self): + if not self.view_shown: + self.update_initial_configs() + netdev_infos = [ + dev.netdev_info() for dev in self.model.get_all_netdevs() + ] + self.view = NetworkView(self, netdev_infos) + if not self.view_shown: + self.apply_config(silent=True) + self.view_shown = True + self.view.update_default_routes( + self.network_event_receiver.default_routes) + self.ui.set_body(self.view) - def add_or_update_bond(self, existing, result): - mode = result['mode'] - params = { - 'mode': mode, - } - if mode in BondParameters.supports_xmit_hash_policy: - params['transmit-hash-policy'] = result['xmit_hash_policy'] - if mode in BondParameters.supports_lacp_rate: - params['lacp-rate'] = result['lacp_rate'] - for device in result['devices']: - device.config = {} - interfaces = [d.name for d in result['devices']] - if existing is None: - return self.model.new_bond(result['name'], interfaces, params) + def end_ui(self): + self.view = None + + def done(self): + log.debug("NetworkController.done next_screen") + self.model.has_network = bool( + self.network_event_receiver.default_routes) + self.app.next_screen() + + def cancel(self): + self.app.prev_screen() + + def set_static_config(self, dev_info: NetDevInfo, ip_version: int, + static_config: StaticConfig) -> None: + setattr(dev_info, 'static' + str(ip_version), static_config) + getattr(dev_info, 'dhcp' + str(ip_version)).enabled = False + + dev = self.model.get_netdev_by_name(dev_info.name) + dev.remove_ip_networks_for_version(ip_version) + dev.config.setdefault('addresses', []).extend(static_config.addresses) + gwkey = 'gateway{v}'.format(v=ip_version) + if static_config.gateway: + dev.config[gwkey] = static_config.gateway else: - existing.config['interfaces'] = interfaces - existing.config['parameters'] = params - existing.name = result['name'] - return existing + dev.config.pop(gwkey, None) + ns = dev.config.setdefault('nameservers', {}) + ns.setdefault('addresses', []).extend(static_config.nameservers) + ns.setdefault('search', []).extend(static_config.searchdomains) + self.apply_config() + + def enable_dhcp(self, dev_info: NetDevInfo, ip_version: int) -> None: + setattr(dev_info, 'static' + str(ip_version), StaticConfig()) + getattr(dev_info, 'dhcp' + str(ip_version)).enabled = True + getattr(dev_info, 'dhcp' + str(ip_version)).state = DHCPState.PENDING + + dev = self.model.get_netdev_by_name(dev_info.name) + dev.remove_ip_networks_for_version(ip_version) + dhcpkey = 'dhcp{v}'.format(v=ip_version) + dev.config[dhcpkey] = True + self.apply_config() + + def disable_network(self, dev_info: NetDevInfo, ip_version: int) -> None: + setattr(dev_info, 'static' + str(ip_version), StaticConfig()) + getattr(dev_info, 'dhcp' + str(ip_version)).enabled = False + + dev = self.model.get_netdev_by_name(dev_info.name) + dev.remove_ip_networks_for_version(ip_version) + + self.apply_config() + + def add_vlan(self, dev_info: NetDevInfo, vlan_config: VLANConfig): + new = self.model.new_vlan(dev_info.name, vlan_config) + dev = self.model.get_netdev_by_name(dev_info.name) + self.update_link(dev) + self.apply_config() + return new.netdev_info() + + def delete_link(self, dev_info: NetDevInfo): + touched_devices = set() + if dev_info.type == "bond": + for device_name in dev_info.bond.interfaces: + interface = self.model.get_netdev_by_name(device_name) + touched_devices.add(interface) + elif dev_info.type == "vlan": + link = self.model.get_netdev_by_name(dev_info.vlan.link) + touched_devices.add(link) + dev_info.has_config = False + + device = self.model.get_netdev_by_name(dev_info.name) + self.del_link(device) + device.config = None + for dev in touched_devices: + self.update_link(dev) + self.apply_config() + + def add_or_update_bond(self, existing_name: NetDevInfo, new_name: str, + new_info: BondConfig) -> None: + get_netdev_by_name = self.model.get_netdev_by_name + touched_devices = set() + for device_name in new_info.interfaces: + device = get_netdev_by_name(device_name) + device.config = {} + touched_devices.add(device) + if existing_name is None: + new_dev = self.model.new_bond(new_name, new_info) + self.new_link(new_dev) + else: + existing = get_netdev_by_name(existing_name) + for interface in existing.config['interfaces']: + touched_devices.add(get_netdev_by_name(interface)) + existing.config.update(new_info.to_config()) + if existing.name != new_name: + config = existing.config + existing.config = None + self.del_link(existing) + existing.config = config + existing.name = new_name + self.new_link(existing) + else: + touched_devices.add(existing) + self.apply_config() + for dev in touched_devices: + self.update_link(dev) + + def get_info_for_netdev(self, dev_info: NetDevInfo) -> str: + device = self.model.get_netdev_by_name(dev_info.name) + if device.info is not None: + return yaml.dump( + device.info.serialize(), default_flow_style=False) + else: + return "Configured but not yet created {type} interface.".format( + type=device.type) + + def set_wlan(self, dev_info: NetDevInfo, wlan: WLANConfig) -> None: + dev_info.wlan = wlan + + device = self.model.get_netdev_by_name(dev_info.name) + device.set_ssid_psk(wlan.ssid, wlan.psk) + self.update_link(device) + + def start_scan(self, dev_info: NetDevInfo) -> None: + device = self.model.get_netdev_by_name(dev_info.name) + self.observer.trigger_scan(device.ifindex) + self.update_link(device) diff --git a/subiquitycore/models/network.py b/subiquitycore/models/network.py index bd22c107..921964f7 100644 --- a/subiquitycore/models/network.py +++ b/subiquitycore/models/network.py @@ -18,6 +18,8 @@ import ipaddress import logging import yaml from socket import AF_INET, AF_INET6 +import attr +from typing import List, Optional from subiquitycore.gettext38 import pgettext from subiquitycore import netplan @@ -50,6 +52,97 @@ class NetDevAction(enum.Enum): return pgettext(type(self).__name__, self.value) +class DHCPState(enum.Enum): + PENDING = enum.auto() + TIMED_OUT = enum.auto() + RECONFIGURE = enum.auto() + CONFIGURED = enum.auto() + + +@attr.s(auto_attribs=True) +class DHCPStatus: + enabled: bool + state: Optional[DHCPState] + addresses: List[str] + + +@attr.s(auto_attribs=True) +class StaticConfig: + addresses: List[str] = attr.Factory(list) + gateway: Optional[str] = None + nameservers: List[str] = attr.Factory(list) + searchdomains: List[str] = attr.Factory(list) + + +@attr.s(auto_attribs=True) +class VLANConfig: + id: int + link: str + + +@attr.s(auto_attribs=True) +class WLANConfig: + ssid: str + psk: str + + +@attr.s(auto_attribs=True) +class WLANStatus: + config: WLANConfig + scan_state: Optional[str] + visible_ssids: List[str] + + +@attr.s(auto_attribs=True) +class BondConfig: + interfaces: List[str] + mode: str + xmit_hash_policy: Optional[str] = None + lacp_rate: Optional[str] = None + + def to_config(self): + mode = self.mode + params = { + 'mode': self.mode, + } + if mode in BondParameters.supports_xmit_hash_policy: + params['transmit-hash-policy'] = self.xmit_hash_policy + if mode in BondParameters.supports_lacp_rate: + params['lacp-rate'] = self.lacp_rate + return { + 'interfaces': self.interfaces, + 'parameters': params, + } + + +@attr.s(auto_attribs=True) +class NetDevInfo: + """All the information about a NetworkDev that the view code needs.""" + name: str + type: str + + is_connected: bool + bond_master: Optional[str] + is_used: bool + disabled_reason: Optional[str] + hwaddr: Optional[str] + vendor: Optional[str] + model: Optional[str] + is_virtual: bool + has_config: bool + + vlan: Optional[VLANConfig] + bond: Optional[BondConfig] + wlan: Optional[WLANConfig] + + dhcp4: DHCPStatus + dhcp6: DHCPStatus + static4: StaticConfig + static6: StaticConfig + + enabled_actions: List[NetDevAction] + + class BondParameters: # Just a place to hang various data about how bonds can be # configured. @@ -103,6 +196,88 @@ class NetworkDev(object): 6: None, } + def netdev_info(self) -> NetDevInfo: + if self.type == 'eth': + is_connected = self.info.is_connected + else: + is_connected = True + bond_master = None + for dev2 in self._model.get_all_netdevs(): + if dev2.type != "bond": + continue + if self.name in dev2.config.get('interfaces', []): + bond_master = dev2.name + break + if self.type == 'bond' and self.config is not None: + params = self.config['parameters'] + bond = BondConfig( + interfaces=self.config['interfaces'], + mode=params['mode'], + xmit_hash_policy=params.get('xmit-hash-policy'), + lacp_rate=params.get('lacp-rate')) + else: + bond = None + if self.type == 'vlan' and self.config is not None: + vlan = VLANConfig(id=self.config['id'], link=self.config['link']) + else: + vlan = None + if self.type == 'wlan': + ssid, psk = self.configured_ssid + wlan = WLANStatus( + config=WLANConfig(ssid=ssid, psk=psk), + scan_state=self.info.wlan['scan_state'], + visible_ssids=self.info.wlan['visible_ssids']) + else: + wlan = None + + dhcp_addresses = self.dhcp_addresses() + configured_addresseses = {4: [], 6: []} + if self.config is not None: + for addr in self.config.get('addresses', []): + configured_addresseses[addr_version(addr)].append(addr) + ns = self.config.get('nameservers', {}) + else: + ns = {} + dhcp_statuses = {} + static_configs = {} + for v in 4, 6: + dhcp_statuses[v] = DHCPStatus( + enabled=self.dhcp_enabled(v), + state=self._dhcp_state[v], + addresses=dhcp_addresses[v]) + if self.config is not None: + gateway = self.config.get('gateway' + str(v)) + else: + gateway = None + static_configs[v] = StaticConfig( + addresses=configured_addresseses[v], + gateway=gateway, + nameservers=ns.get('nameservers', []), + searchdomains=ns.get('search', [])) + return NetDevInfo( + name=self.name, + type=self.type, + is_connected=is_connected, + vlan=vlan, + bond_master=bond_master, + bond=bond, + wlan=wlan, + dhcp4=dhcp_statuses[4], + dhcp6=dhcp_statuses[6], + static4=static_configs[4], + static6=static_configs[6], + is_used=self.is_used, + disabled_reason=self.disabled_reason, + enabled_actions=[ + action for action in NetDevAction + if self.supports_action(action) + ], + hwaddr=getattr(self.info, 'hwaddr', None), + vendor=getattr(self.info, 'vendor', None), + model=getattr(self.info, 'model', None), + is_virtual=self.is_virtual, + has_config=self.config is not None) + def dhcp_addresses(self): r = {4: [], 6: []} if self.info is not None: @@ -140,17 +315,18 @@ class NetworkDev(object): # If a virtual device that already exists is renamed, we need # to create a dummy NetworkDev so that the existing virtual # device is actually deleted when the config is applied. - if new_name != self.name and self.is_virtual and self.info is not None: - if new_name in self.model.devices_by_name: + if new_name != self.name and self.is_virtual: + if new_name in self._model.devices_by_name: raise RuntimeError( "renaming {old_name} over {new_name}".format( old_name=self.name, new_name=new_name)) self._model.devices_by_name[new_name] = self - dead_device = self._model.devices_by_name[self.name] = NetworkDev( - self.name, self.type) - dead_device.config = None - dead_device.info = self.info - self.info = None + if self.info is not None: + dead_device = NetworkDev(self._model, self.name, self.type) + self._model.devices_by_name[self.name] = dead_device + dead_device.config = None + dead_device.info = self.info + self.info = None self._name = new_name def supports_action(self, action): @@ -228,28 +404,6 @@ class NetworkDev(object): else: self.config.pop('addresses', None) - def add_network(self, version, network): - # result = { - # 'network': self.subnet_input.value, - # 'address': self.address_input.value, - # 'gateway': self.gateway_input.value, - # 'nameserver': [nameservers], - # 'searchdomains': [searchdomains], - # } - address = network['address'].split('/')[0] - address += '/' + network['network'].split('/')[1] - self.config.setdefault('addresses', []).append(address) - gwkey = 'gateway{v}'.format(v=version) - if network['gateway']: - self.config[gwkey] = network['gateway'] - else: - self.config.pop(gwkey, None) - ns = self.config.setdefault('nameservers', {}) - if network['nameservers']: - ns.setdefault('addresses', []).extend(network['nameservers']) - if network['searchdomains']: - ns.setdefault('search', []).extend(network['searchdomains']) - class NetworkModel(object): """ """ @@ -318,21 +472,18 @@ class NetworkModel(object): del self.devices_by_name[name] return dev - def new_vlan(self, device, tag): - name = "{name}.{tag}".format(name=device.name, tag=tag) + def new_vlan(self, device_name, tag): + name = "{name}.{tag}".format(name=device_name, tag=tag) dev = self.devices_by_name[name] = NetworkDev(self, name, 'vlan') dev.config = { - 'link': device.name, + 'link': device_name, 'id': tag, } return dev - def new_bond(self, name, interfaces, params): + def new_bond(self, name, bond_config): dev = self.devices_by_name[name] = NetworkDev(self, name, 'bond') - dev.config = { - 'interfaces': interfaces, - 'parameters': params, - } + dev.config = bond_config.to_config() return dev def get_all_netdevs(self, include_deleted=False): diff --git a/subiquitycore/ui/views/network.py b/subiquitycore/ui/views/network.py index 64205bb2..8ac5f7c9 100644 --- a/subiquitycore/ui/views/network.py +++ b/subiquitycore/ui/views/network.py @@ -27,7 +27,7 @@ from urwid import ( ) from subiquitycore.models.network import ( - addr_version, + DHCPState, NetDevAction, ) from subiquitycore.ui.actionmenu import ActionMenu @@ -38,6 +38,7 @@ from subiquitycore.ui.buttons import ( ) from subiquitycore.ui.container import ( Pile, + WidgetWrap, ) from subiquitycore.ui.spinner import Spinner from subiquitycore.ui.stretchy import StretchyOverlay @@ -72,17 +73,151 @@ def _stretchy_shower(cls, *args): return impl +class NetworkDeviceTable(WidgetWrap): + def __init__(self, parent, dev_info): + self.parent = parent + self.dev_info = dev_info + super().__init__(self._create()) + + def _create(self): + # Create the widget for a nic. This consists of a Pile containing a + # table, an info line and a blank line. The first row of the table is + # the one that can be focused and has a menu for manipulating the nic, + # the other rows summarize its address config. + # [ name type notes ▸ ] \ + # address info | <- table + # more address info / + # mac / vendor info / model info + # + + actions = [] + for action in NetDevAction: + meth = getattr(self.parent, '_action_' + action.name) + opens_dialog = getattr(meth, 'opens_dialog', False) + if action in self.dev_info.enabled_actions: + actions.append( + (action.str(), True, (action, meth), opens_dialog)) + + menu = ActionMenu(actions) + connect_signal(menu, 'action', self.parent._action, self) + + trows = [make_action_menu_row([ + Text("["), + Text(self.dev_info.name), + Text(self.dev_info.type), + Text(self._notes(), wrap='clip'), + menu, + Text("]"), + ], menu)] + self._address_rows() + + self.table = TablePile( + trows, colspecs=self.parent.device_colspecs, spacing=2) + self.table.bind(self.parent.heading_table) + + if self.dev_info.type == "vlan": + info = _("VLAN {id} on interface {link}").format( + id=self.dev_info.vlan.id, link=self.dev_info.vlan.link) + elif self.dev_info.type == "bond": + info = _("bond master for {interfaces}").format( + interfaces=', '.join(self.dev_info.bond.interfaces)) + else: + info = " / ".join([ + self.dev_info.hwaddr, + self.dev_info.vendor, + self.dev_info.model, + ]) + + return Pile([ + ('pack', self.table), + ('pack', Color.info_minor(Text(" " + info))), + ('pack', Text("")), + ]) + + def _notes(self): + notes = [] + if self.dev_info.type == "wlan": + config = self.dev_info.wlan.config + if config.ssid is not None: + notes.append(_("ssid: {ssid}".format(ssid=config.ssid))) + else: + notes.append(_("not connected")) + if not self.dev_info.is_connected: + notes.append(_("not connected")) + if self.dev_info.bond_master: + notes.append( + _("enslaved to {device}").format( + device=self.dev_info.bond_master)) + if notes: + notes = ", ".join(notes) + else: + notes = '-' + return notes + + def _address_rows(self): + address_info = [] + for v, dhcp_status, static_config in ( + (4, self.dev_info.dhcp4, self.dev_info.static4), + (6, self.dev_info.dhcp6, self.dev_info.static6), + ): + if dhcp_status.enabled: + label = Text("DHCPv{v}".format(v=v)) + addrs = dhcp_status.addresses + if addrs: + address_info.extend( + [(label, Text(addr)) for addr in addrs]) + elif dhcp_status.state == DHCPState.PENDING: + s = Spinner( + self.parent.controller.app.aio_loop, align='left') + s.rate = 0.3 + s.start() + address_info.append((label, s)) + elif dhcp_status.state == DHCPState.TIMED_OUT: + address_info.append((label, Text(_("timed out")))) + elif dhcp_status.state == DHCPState.RECONFIGURE: + address_info.append((label, Text("-"))) + elif static_config.addresses: + address_info.append(( + Text(_('static')), + Text(', '.join(static_config.addresses)), + )) + if len(address_info) == 0 and not self.dev_info.is_used: + reason = self.dev_info.disabled_reason + if reason is None: + reason = "" + address_info.append((Text(_("disabled")), Text(reason))) + rows = [] + for label, value in address_info: + rows.append(TableRow([Text(""), label, (2, value)])) + return rows + + def update(self, dev_info): + # Update the display of dev to represent the current state. + # + # The easiest way of doing this would be to just create a new table + # widget for the device and replace the current one with it. But that + # is jarring if the menu for the current device is open, so instead we + # overwrite the contents of the first (menu) row of the table, and + # replace all other rows of the with new content (which is OK as they + # cannot be focused). + self.dev_info = dev_info + first_row = self.table.table_rows[0].base_widget + first_row.cells[1][1].set_text(dev_info.name) + first_row.cells[2][1].set_text(dev_info.type) + first_row.cells[3][1].set_text(self._notes()) + self.table.remove_rows(1, len(self.table.table_rows)) + self.table.insert_rows(1, self._address_rows()) + + class NetworkView(BaseView): title = _("Network connections") excerpt = _("Configure at least one interface this server can use to talk " "to other machines, and which preferably provides sufficient " "access for updates.") - def __init__(self, model, controller): - self.model = model + def __init__(self, controller, netdev_infos): self.controller = controller - self.dev_to_table = {} - self.cur_netdevs = [] + self.dev_name_to_table = {} + self.cur_netdev_names = [] self.error = Text("", align='center') self.device_colspecs = { @@ -91,7 +226,19 @@ class NetworkView(BaseView): 4: ColSpec(can_shrink=True, rpad=1), } - self.device_pile = Pile(self._build_model_inputs()) + self.heading_table = TablePile([ + TableRow([ + Color.info_minor(Text(header)) for header in [ + "", "NAME", "TYPE", "NOTES", "", + ] + ]) + ], + spacing=2, colspecs=self.device_colspecs) + + self.device_pile = Pile([self.heading_table]) + + for dev_info in netdev_infos: + self.new_link(dev_info) self._create_bond_btn = menu_btn( _("Create bond"), on_press=self._create_bond) @@ -111,9 +258,6 @@ class NetworkView(BaseView): ('pack', self.buttons), ]) - self.controller.network_event_receiver.add_default_route_watcher( - self._route_watcher) - self.error_showing = False super().__init__(screen( @@ -126,26 +270,27 @@ class NetworkView(BaseView): _action_EDIT_WLAN = _stretchy_shower(NetworkConfigureWLANStretchy) _action_EDIT_IPV4 = _stretchy_shower(EditNetworkStretchy, 4) _action_EDIT_IPV6 = _stretchy_shower(EditNetworkStretchy, 6) - _action_EDIT_BOND = _stretchy_shower(BondStretchy) _action_ADD_VLAN = _stretchy_shower(AddVlanStretchy) - def _action_DELETE(self, name, device): + def _action_EDIT_BOND(self, name, dev_info): + stretchy = BondStretchy( + self, dev_info, self.get_candidate_bond_member_names()) + stretchy.attach_context(self.controller.context.child(name)) + self.show_stretchy_overlay(stretchy) + + _action_EDIT_BOND.opens_dialog = True + + def _action_DELETE(self, name, dev_info): with self.controller.context.child(name): - touched_devs = set() - if device.type == "bond": - for name in device.config['interfaces']: - touched_devs.add(self.model.get_netdev_by_name(name)) - device.config = None - self.del_link(device) - for dev in touched_devs: - self.update_link(dev) - self.controller.apply_config() + self.controller.delete_link(dev_info) + self.del_link(dev_info) - def _action(self, sender, action, device): + def _action(self, sender, action, netdev_table): action, meth = action - meth("{}/{}".format(device.name, action.name), device) + dev_info = netdev_table.dev_info + meth("{}/{}".format(dev_info.name, action.name), dev_info) - def _route_watcher(self, routes): + def update_default_routes(self, routes): log.debug('view route_watcher %s', routes) if routes: label = _("Done") @@ -176,111 +321,31 @@ class NetworkView(BaseView): if len(self.bottom.contents) > 2: self.bottom.contents[0:2] = [] - def _notes_for_device(self, dev): - notes = [] - if dev.type == "eth" and not dev.info.is_connected: - notes.append(_("not connected")) - for dev2 in self.model.get_all_netdevs(): - if dev2.type != "bond": - continue - if dev.name in dev2.config.get('interfaces', []): - notes.append( - _("enslaved to {device}").format(device=dev2.name)) - break - if notes: - notes = ", ".join(notes) - else: - notes = '-' - return notes - - def _address_rows_for_device(self, dev): - address_info = [] - dhcp_addresses = dev.dhcp_addresses() - for v in 4, 6: - if dev.dhcp_enabled(v): - label = Text("DHCPv{v}".format(v=v)) - addrs = dhcp_addresses.get(v) - if addrs: - address_info.extend( - [(label, Text(addr)) for addr in addrs]) - elif dev.dhcp_state(v) == "PENDING": - s = Spinner(self.controller.app.aio_loop, align='left') - s.rate = 0.3 - s.start() - address_info.append((label, s)) - elif dev.dhcp_state(v) == "TIMEDOUT": - address_info.append((label, Text(_("timed out")))) - elif dev.dhcp_state(v) == "RECONFIGURE": - address_info.append((label, Text("-"))) - else: - address_info.append(( - label, - Text( - _("unknown state {state}".format( - state=dev.dhcp_state(v)))) - )) - else: - addrs = [] - for ip in dev.config.get('addresses', []): - if addr_version(ip) == v: - addrs.append(str(ip)) - if addrs: - address_info.append( - # Network addressing mode (static/dhcp/disabled) - (Text(_('static')), Text(', '.join(addrs)))) - if len(address_info) == 0: - # Do not show an interface as disabled if it is part of a bond or - # has a vlan on it. - if not dev.is_used: - reason = dev.disabled_reason - if reason is None: - reason = "" - # Network addressing mode (static/dhcp/disabled) - address_info.append((Text(_("disabled")), Text(reason))) - rows = [] - for label, value in address_info: - rows.append(TableRow([Text(""), label, (2, value)])) - return rows - - def new_link(self, new_dev): + def new_link(self, new_dev_info): log.debug( - "new_link %s %s %s", - new_dev.name, new_dev.ifindex, (new_dev in self.cur_netdevs)) - if new_dev in self.dev_to_table: - self.update_link(new_dev) + "new_link %s %s", + new_dev_info.name, (new_dev_info.name in self.cur_netdev_names)) + if new_dev_info.name in self.dev_name_to_table: + self.update_link(new_dev_info) return - for i, cur_dev in enumerate(self.cur_netdevs): - if cur_dev.name > new_dev.name: - netdev_i = i - break - else: - netdev_i = len(self.cur_netdevs) - w = self._device_widget(new_dev, netdev_i) + self.cur_netdev_names.append(new_dev_info.name) + self.cur_netdev_names.sort() + netdev_i = self.cur_netdev_names.index(new_dev_info.name) + device_table = NetworkDeviceTable(self, new_dev_info) + self.dev_name_to_table[new_dev_info.name] = device_table self.device_pile.contents[netdev_i+1:netdev_i+1] = [ - (w, self.device_pile.options('pack'))] + (device_table, self.device_pile.options('pack'))] - def update_link(self, dev): + def update_link(self, dev_info): + if isinstance(self._w, StretchyOverlay): + if hasattr(self._w.stretchy, 'update_link'): + self._w.stretchy.update_link(dev_info) log.debug( - "update_link %s %s %s", - dev.name, dev.ifindex, (dev in self.cur_netdevs)) - if dev not in self.cur_netdevs: + "update_link %s %s", + dev_info.name, (dev_info.name in self.cur_netdev_names)) + if dev_info.name not in self.cur_netdev_names: return - # Update the display of dev to represent the current state. - # - # The easiest way of doing this would be to just create a new table - # widget for the device and replace the current one with it. But that - # is jarring if the menu for the current device is open, so instead we - # overwrite the content of the first (menu) row of the old table with - # the contents of the first row of the new table, and replace all other - # rows of the old table with new content (which is OK as they cannot be - # focused). - old_table = self.dev_to_table[dev] - first_row = old_table.table_rows[0].base_widget - first_row.cells[1][1].set_text(dev.name) - first_row.cells[2][1].set_text(dev.type) - first_row.cells[3][1].set_text(self._notes_for_device(dev)) - old_table.remove_rows(1, len(old_table.table_rows)) - old_table.insert_rows(1, self._address_rows_for_device(dev)) + self.dev_name_to_table[dev_info.name].update(dev_info) def _remove_row(self, netdev_i): # MonitoredFocusList clamps the focus position to the new @@ -297,94 +362,38 @@ class NetworkView(BaseView): self.device_pile.focus_position += 1 self.device_pile.focus._select_first_selectable() - def del_link(self, dev): + def del_link(self, dev_info): log.debug( - "del_link %s %s %s", - dev.name, dev.ifindex, (dev in self.cur_netdevs)) + "del_link %s %s", + dev_info.name, (dev_info.name in self.cur_netdev_names)) # If a virtual device disappears while we still have config # for it, we assume it will be back soon. - if dev.is_virtual and dev.config is not None: + if dev_info.is_virtual and dev_info.has_config: return - if dev in self.cur_netdevs: - netdev_i = self.cur_netdevs.index(dev) + if dev_info.name in self.cur_netdev_names: + netdev_i = self.cur_netdev_names.index(dev_info.name) self._remove_row(netdev_i+1) - del self.cur_netdevs[netdev_i] - del self.dev_to_table[dev] + del self.cur_netdev_names[netdev_i] + del self.dev_name_to_table[dev_info.name] if isinstance(self._w, StretchyOverlay): stretchy = self._w.stretchy - if getattr(stretchy, 'device', None) is dev: + if getattr(stretchy, 'device', None) is dev_info: self.remove_overlay() - def _device_widget(self, dev, netdev_i=None): - # Create the widget for a nic. This consists of a Pile containing a - # table, an info line and a blank line. The first row of the table is - # the one that can be focused and has a menu for manipulating the nic, - # the other rows summarize its address config. - # [ name type notes ▸ ] \ - # address info | <- table - # more address info / - # mac / vendor info / model info - # - if netdev_i is None: - netdev_i = len(self.cur_netdevs) - self.cur_netdevs[netdev_i:netdev_i] = [dev] - - actions = [] - for action in NetDevAction: - meth = getattr(self, '_action_' + action.name) - opens_dialog = getattr(meth, 'opens_dialog', False) - if dev.supports_action(action): - actions.append( - (action.str(), True, (action, meth), opens_dialog)) - - menu = ActionMenu(actions) - connect_signal(menu, 'action', self._action, dev) - - trows = [make_action_menu_row([ - Text("["), - Text(dev.name), - Text(dev.type), - Text(self._notes_for_device(dev), wrap='clip'), - menu, - Text("]"), - ], menu)] + self._address_rows_for_device(dev) - - table = TablePile(trows, colspecs=self.device_colspecs, spacing=2) - self.dev_to_table[dev] = table - table.bind(self.heading_table) - - if dev.type == "vlan": - info = _("VLAN {id} on interface {link}").format( - **dev.config) - elif dev.type == "bond": - info = _("bond master for {interfaces}").format( - interfaces=', '.join(dev.config['interfaces'])) - else: - info = " / ".join([ - dev.info.hwaddr, dev.info.vendor, dev.info.model]) - - return Pile([ - ('pack', table), - ('pack', Color.info_minor(Text(" " + info))), - ('pack', Text("")), - ]) - - def _build_model_inputs(self): - self.heading_table = TablePile([ - TableRow([ - Color.info_minor(Text(header)) for header in [ - "", "NAME", "TYPE", "NOTES", "", - ] - ]) - ], - spacing=2, colspecs=self.device_colspecs) - rows = [self.heading_table] - for dev in self.model.get_all_netdevs(): - rows.append(self._device_widget(dev)) - return rows + def get_candidate_bond_member_names(self): + names = [] + for table in self.dev_name_to_table.values(): + dev_info = table.dev_info + if dev_info.type in ("vlan", "bond"): + continue + if dev_info.bond_master is not None: + continue + names.append(dev_info.name) + return names def _create_bond(self, sender=None): - stretchy = BondStretchy(self) + stretchy = BondStretchy( + self, None, self.get_candidate_bond_member_names()) stretchy.attach_context(self.controller.context.child("add_bond")) self.show_stretchy_overlay(stretchy) @@ -417,11 +426,7 @@ class NetworkView(BaseView): def done(self, result=None): if self.error_showing: self.bottom.contents[0:2] = [] - self.controller.network_event_receiver.remove_default_route_watcher( - self._route_watcher) self.controller.done() def cancel(self, button=None): - self.controller.network_event_receiver.remove_default_route_watcher( - self._route_watcher) self.controller.cancel() diff --git a/subiquitycore/ui/views/network_configure_manual_interface.py b/subiquitycore/ui/views/network_configure_manual_interface.py index be2d1a4f..2c13edfb 100644 --- a/subiquitycore/ui/views/network_configure_manual_interface.py +++ b/subiquitycore/ui/views/network_configure_manual_interface.py @@ -15,7 +15,6 @@ import logging import ipaddress -import yaml from urwid import ( CheckBox, @@ -25,8 +24,9 @@ from urwid import ( ) from subiquitycore.models.network import ( - addr_version, + BondConfig, BondParameters, + StaticConfig, ) from subiquitycore.ui.container import Pile, WidgetWrap from subiquitycore.ui.form import ( @@ -155,33 +155,29 @@ class NetworkMethodForm(Form): class EditNetworkStretchy(Stretchy): - def __init__(self, parent, device, ip_version): + def __init__(self, parent, dev_info, ip_version): self.parent = parent - self.device = device + self.dev_info = dev_info self.ip_version = ip_version self.method_form = NetworkMethodForm() self.method_form.method.caption = _( "IPv{v} Method: ").format(v=ip_version) manual_initial = {} - cur_addresses = [] - for addr in device.config.get('addresses', []): - if addr_version(addr) == ip_version: - cur_addresses.append(addr) - if cur_addresses: + dhcp_status = getattr(dev_info, 'dhcp' + str(ip_version)) + static_config = getattr(dev_info, 'static' + str(ip_version)) + if static_config.addresses: method = 'manual' - addr = ipaddress.ip_interface(cur_addresses[0]) - ns = device.config.get('nameservers', {}) + addr = ipaddress.ip_interface(static_config.addresses[0]) manual_initial = { 'subnet': str(addr.network), 'address': str(addr.ip), - 'nameservers': ', '.join(ns.get('addresses', [])), - 'searchdomains': ', '.join(ns.get('search', [])), + 'nameservers': ', '.join(static_config.nameservers), + 'searchdomains': ', '.join(static_config.searchdomains), } - gw = device.config.get('gateway{v}'.format(v=ip_version)) - if gw: - manual_initial['gateway'] = str(gw) - elif self.device.config.get('dhcp{v}'.format(v=ip_version)): + if static_config.gateway: + manual_initial['gateway'] = static_config.gateway + elif dhcp_status.enabled: method = 'dhcp' else: method = 'disable' @@ -208,7 +204,7 @@ class EditNetworkStretchy(Stretchy): widgets = [self.form_pile, Text(""), self.bp] super().__init__( "Edit {device} IPv{v} configuration".format( - device=device.name, v=ip_version), + device=dev_info.name, v=ip_version), widgets, 0, 0) @@ -229,9 +225,6 @@ class EditNetworkStretchy(Stretchy): self.form_pile.contents[:] = rows def done(self, sender): - - self.device.remove_ip_networks_for_version(self.ip_version) - if self.method_form.method.value == "manual": form = self.manual_form # XXX this converting from and to and from strings thing is a @@ -239,25 +232,26 @@ class EditNetworkStretchy(Stretchy): gateway = form.gateway.value if gateway is not None: gateway = str(gateway) - result = { - 'network': str(form.subnet.value), - 'address': str(form.address.value), - 'gateway': gateway, - 'nameservers': list(map(str, form.nameservers.value)), - 'searchdomains': form.searchdomains.value, - } + address = str(form.address.value).split('/')[0] + address += '/' + str(form.subnet.value).split('/')[1] + config = StaticConfig( + addresses=[address], + gateway=gateway, + nameservers=list(map(str, form.nameservers.value)), + searchdomains=form.searchdomains.value) log.debug( - "EditNetworkStretchy %s manual result=%s", - self.ip_version, result) - self.device.config.pop('nameservers', None) - self.device.add_network(self.ip_version, result) + "EditNetworkStretchy %s manual config=%s", + self.ip_version, config) + self.parent.controller.set_static_config( + self.dev_info, self.ip_version, config) elif self.method_form.method.value == "dhcp": - log.debug("EditNetworkStretchy %s dhcp", self.ip_version) - self.device.config['dhcp{v}'.format(v=self.ip_version)] = True + self.parent.controller.enable_dhcp(self.dev_info, self.ip_version) + log.debug("EditNetworkStretchy %s, dhcp", self.ip_version) else: + self.parent.controller.disable_network( + self.dev_info, self.ip_version) log.debug("EditNetworkStretchy %s, disabled", self.ip_version) - self.parent.controller.apply_config() - self.parent.update_link(self.device) + self.parent.update_link(self.dev_info) self.parent.remove_overlay() def cancel(self, sender=None): @@ -268,9 +262,9 @@ class VlanForm(Form): ok_label = _("Create") - def __init__(self, parent, device): + def __init__(self, parent, dev_info): self.parent = parent - self.device = device + self.dev_info = dev_info super().__init__() vlan = StringField(_("VLAN ID:")) @@ -286,10 +280,9 @@ class VlanForm(Form): return vlanid def validate_vlan(self): - new_name = '%s.%s' % (self.device.name, self.vlan.value) - if new_name in self.parent.model.devices_by_name: - if self.parent.model.devices_by_name[new_name].config is not None: - return _("{netdev} already exists").format(netdev=new_name) + new_name = '%s.%s' % (self.dev_info.name, self.vlan.value) + if new_name in self.parent.cur_netdev_names: + return _("{netdev} already exists").format(netdev=new_name) class AddVlanStretchy(Stretchy): @@ -310,31 +303,24 @@ class AddVlanStretchy(Stretchy): "AddVlanStretchy.done %s %s", self.device.name, self.form.vlan.value) self.parent.remove_overlay() - dev = self.parent.controller.add_vlan( + dev_info = self.parent.controller.add_vlan( self.device, self.form.vlan.value) - self.parent.new_link(dev) - self.parent.controller.apply_config() + self.parent.new_link(dev_info) def cancel(self, sender=None): self.parent.remove_overlay() class ViewInterfaceInfo(Stretchy): - def __init__(self, parent, device): + def __init__(self, parent, dev_info): self.parent = parent - if device.info is not None: - result = yaml.dump( - device.info.serialize(), default_flow_style=False) - else: - result = "Configured but not yet created {type} interface.".format( - type=device.type) widgets = [ - Text(result), + Text(self.parent.controller.get_info_for_netdev(dev_info)), Text(""), button_pile([done_btn(_("Close"), on_press=self.close)]), ] # {device} is the name of a network device - title = _("Info for {device}").format(device=device.name) + title = _("Info for {device}").format(device=dev_info.name) super().__init__(title, widgets, 0, 2) def close(self, button=None): @@ -351,7 +337,7 @@ class MultiNetdevChooser(WidgetWrap, WantsToKnowFormField): @property def value(self): - return list(sorted(self.selected, key=lambda x: x.name)) + return list(sorted(self.selected)) @value.setter def value(self, value): @@ -362,7 +348,7 @@ class MultiNetdevChooser(WidgetWrap, WantsToKnowFormField): def set_bound_form_field(self, bff): contents = [] for d in bff.form.candidate_netdevs: - box = CheckBox(d.name, on_state_change=self._state_change) + box = CheckBox(d, on_state_change=self._state_change) self.box_to_device[box] = d contents.append((box, self.pile.options('pack'))) self.pile.contents[:] = contents @@ -389,7 +375,7 @@ class BondForm(Form): self._select_level(None, self.mode.value) name = StringField(_("Name:")) - devices = MultiNetdevField(_("Devices: ")) + interfaces = MultiNetdevField(_("Devices: ")) mode = ChoiceField(_("Bond mode:"), choices=BondParameters.modes) xmit_hash_policy = ChoiceField( _("XMIT hash policy:"), choices=BondParameters.xmit_hash_policies) @@ -416,12 +402,11 @@ class BondForm(Form): class BondStretchy(Stretchy): - def __init__(self, parent, existing=None): + def __init__(self, parent, existing_bond_info, candidate_names): self.parent = parent - self.existing = existing - all_netdev_names = { - device.name for device in parent.model.get_all_netdevs()} - if existing is None: + all_netdev_names = set(parent.cur_netdev_names) + if existing_bond_info is None: + self.existing_name = None title = _('Create bond') label = _("Create") x = 0 @@ -431,41 +416,29 @@ class BondStretchy(Stretchy): break x += 1 initial = { - 'devices': set(), + 'interfaces': [], 'name': name, } else: + self.existing_name = existing_bond_info.name + bondconfig = existing_bond_info.bond title = _('Edit bond') label = _("Save") - all_netdev_names.remove(existing.name) - params = existing.config['parameters'] - mode = params['mode'] + all_netdev_names.remove(self.existing_name) + mode = bondconfig.mode initial = { - 'devices': set([ - parent.model.get_netdev_by_name(name) - for name in existing.config['interfaces']]), - 'name': existing.name, + 'interfaces': bondconfig.interfaces, + 'name': self.existing_name, 'mode': mode, } if mode in BondParameters.supports_xmit_hash_policy: - initial['xmit_hash_policy'] = params['transmit-hash-policy'] + initial['xmit_hash_policy'] = bondconfig.xmit_hash_policy if mode in BondParameters.supports_lacp_rate: - initial['lacp_rate'] = params['lacp-rate'] + initial['lacp_rate'] = bondconfig.lacp_rate - def device_ok(device): - if device is existing: - return False - if device in initial['devices']: - return True - if device.type in ("vlan", "bond"): - return False - return not device.is_bond_slave + candidate_names = sorted(candidate_names + initial['interfaces']) - candidate_netdevs = [ - device for device in parent.model.get_all_netdevs() - if device_ok(device)] - - self.form = BondForm(initial, candidate_netdevs, all_netdev_names) + self.form = BondForm(initial, candidate_names, all_netdev_names) self.form.buttons.base_widget[0].set_label(label) connect_signal(self.form, 'submit', self.done) connect_signal(self.form, 'cancel', self.cancel) @@ -475,26 +448,13 @@ class BondStretchy(Stretchy): 0, 0) def done(self, sender): - log.debug("BondStretchy.done result=%s", self.form.as_data()) - touched_devices = set() - get_netdev_by_name = self.parent.model.get_netdev_by_name - if self.existing: - for name in self.existing.config['interfaces']: - touched_devices.add(get_netdev_by_name(name)) - bond = self.existing - self.parent.controller.add_or_update_bond( - self.existing, self.form.as_data()) - self.parent.update_link(bond) - else: - bond = self.parent.controller.add_or_update_bond( - None, self.form.as_data()) - self.parent.new_link(bond) - for name in self.form.devices.value: - touched_devices.add(name) - for dev in touched_devices: - self.parent.update_link(dev) + result = self.form.as_data() + log.debug("BondStretchy.done result=%s", result) + new_name = result.pop('name') + new_config = BondConfig(**result) + self.parent.controller.add_or_update_bond( + self.existing_name, new_name, new_config) self.parent.remove_overlay() - self.parent.controller.apply_config() def cancel(self, sender=None): self.parent.remove_overlay() diff --git a/subiquitycore/ui/views/network_configure_wlan_interface.py b/subiquitycore/ui/views/network_configure_wlan_interface.py index 800b218b..cc876449 100644 --- a/subiquitycore/ui/views/network_configure_wlan_interface.py +++ b/subiquitycore/ui/views/network_configure_wlan_interface.py @@ -7,6 +7,7 @@ from urwid import ( Text, ) +from subiquitycore.models.network import WLANConfig from subiquitycore.ui.buttons import cancel_btn, menu_btn from subiquitycore.ui.container import ( ListBox, @@ -63,22 +64,21 @@ class WLANForm(Form): class NetworkConfigureWLANStretchy(Stretchy): - def __init__(self, parent, device): + def __init__(self, parent, dev_info): self.parent = parent - self.device = device + self.dev_info = dev_info title = _("Network interface {nic} WIFI configuration").format( - nic=device.name) + nic=dev_info.name) self.form = WLANForm() connect_signal(self.form, 'submit', self.done) connect_signal(self.form, 'cancel', self.cancel) - ssid, psk = self.device.configured_ssid - if ssid: - self.form.ssid.value = ssid - if psk: - self.form.psk.value = psk + if self.dev_info.wlan.config.ssid: + self.form.ssid.value = self.dev_info.wlan.config.ssid + if self.dev_info.wlan.config.psk: + self.form.psk.value = self.dev_info.wlan.config.psk self.ssid_row = self.form.ssid._table self.psk_row = self.form.psk._table @@ -96,8 +96,8 @@ class NetworkConfigureWLANStretchy(Stretchy): def show_ssid_list(self, sender): self.parent.show_overlay( - NetworkList( - self, self.device.info.wlan['visible_ssids']), width=60) + NetworkList(self, self.dev_info.wlan.visible_ssids), + width=60) def start_scan(self, sender): fp = self.inputs.focus_position - 1 @@ -105,19 +105,20 @@ class NetworkConfigureWLANStretchy(Stretchy): fp -= 1 self.inputs.focus_position = fp try: - self.parent.controller.start_scan(self.device) + self.parent.controller.start_scan(self.dev_info) except RuntimeError as r: log.exception("start_scan failed") self.error.set_text("%s" % (r,)) def _build_iface_inputs(self): - if len(self.device.info.wlan['visible_ssids']) > 0: + visible_ssids = self.dev_info.wlan.visible_ssids + if len(visible_ssids) > 0: networks_btn = menu_btn("Choose a visible network", on_press=self.show_ssid_list) else: networks_btn = disabled(menu_btn("No visible networks")) - if not self.device.info.wlan['scan_state']: + if not self.dev_info.wlan.scan_state: scan_btn = menu_btn("Scan for networks", on_press=self.start_scan) else: scan_btn = disabled(menu_btn("Scanning for networks")) @@ -136,22 +137,16 @@ class NetworkConfigureWLANStretchy(Stretchy): ] return col - def refresh_model_inputs(self): - try: - self.device = self.parent.model.get_netdev_by_name( - self.device.name) - except KeyError: - # The interface is gone - self.parent.remove_overlay() - return + def update_link(self, dev_info): + self.dev_info = dev_info self.inputs.contents = [(obj, ('pack', None)) for obj in self._build_iface_inputs()] def done(self, sender): - if self.device.configured_ssid[0] is None and self.form.ssid.value: + if self.dev_info.wlan.config.ssid is None and self.form.ssid.value: # Turn DHCP4 on by default when specifying an SSID for # the first time... - self.device.config['dhcp4'] = True + self.parent.controller.enable_dhcp(self.dev_info, 4) if self.form.ssid.value: ssid = self.form.ssid.value else: @@ -160,8 +155,9 @@ class NetworkConfigureWLANStretchy(Stretchy): psk = self.form.psk.value else: psk = None - self.device.set_ssid_psk(ssid, psk) - self.parent.update_link(self.device) + self.parent.controller.set_wlan( + self.dev_info, WLANConfig(ssid=ssid, psk=psk)) + self.parent.update_link(self.dev_info) self.parent.remove_overlay() def cancel(self, sender=None): diff --git a/subiquitycore/ui/views/tests/test_network_configure_manual_interface.py b/subiquitycore/ui/views/tests/test_network_configure_manual_interface.py index de0f18f2..b8e18737 100644 --- a/subiquitycore/ui/views/tests/test_network_configure_manual_interface.py +++ b/subiquitycore/ui/views/tests/test_network_configure_manual_interface.py @@ -1,10 +1,14 @@ +import enum +import typing import unittest from unittest import mock +import attr + import urwid from subiquitycore.controllers.network import NetworkController -from subiquitycore.models.network import NetworkDev +from subiquitycore.models.network import NetDevInfo, StaticConfig from subiquitycore.testing import view_helpers from subiquitycore.ui.views.network_configure_manual_interface import ( EditNetworkStretchy, @@ -22,15 +26,47 @@ valid_data = { } +def create_test_instance(cls, name=(), *, overrides={}): + if '.'.join(name) in overrides: + return overrides['.'.join(name)] + if attr.has(cls): + args = {} + for field in attr.fields(cls): + args[field.name] = create_test_instance( + field.type, name + (field.name,), overrides=overrides) + return cls(**args) + elif hasattr(cls, '__origin__'): + t = cls.__origin__ + if t is typing.Union and type(None) in cls.__args__: + return None + elif t in [list, typing.List]: + return [ + create_test_instance( + cls.__args__[0], name, overrides=overrides), + ] + else: + raise Exception( + "do not understand annotation {} at {}".format( + t, '.'.join(name))) + elif issubclass(cls, enum.Enum): + return next(iter(cls)) + else: + try: + return cls() + except Exception: + raise Exception("instantiating {} failed".format(cls)) + + class TestNetworkConfigureIPv4InterfaceView(unittest.TestCase): - def make_view(self): - device = mock.create_autospec(spec=NetworkDev) - device.config = {} + def make_view(self, dev_info=None): + if dev_info is None: + dev_info = create_test_instance( + NetDevInfo, overrides={'static4.addresses': ['1.2.3.4/5']}) base_view = BaseView(urwid.Text("")) base_view.update_link = lambda device: None base_view.controller = mock.create_autospec(spec=NetworkController) - stretchy = EditNetworkStretchy(base_view, device, 4) + stretchy = EditNetworkStretchy(base_view, dev_info, 4) base_view.show_stretchy_overlay(stretchy) stretchy.method_form.method.value = "manual" return base_view, stretchy @@ -45,10 +81,16 @@ class TestNetworkConfigureIPv4InterfaceView(unittest.TestCase): self.fail("method widget not focus") def test_done_initially_disabled(self): - _, stretchy = self.make_view() + dev_info = create_test_instance( + NetDevInfo, overrides={'static4.addresses': []}) + _, stretchy = self.make_view(dev_info) self.assertFalse(stretchy.manual_form.done_btn.enabled) def test_done_enabled_for_valid_data(self): + valid_data = { + 'subnet': '10.0.2.0/24', + 'address': '10.0.2.15', + } _, stretchy = self.make_view() view_helpers.enter_data(stretchy.manual_form, valid_data) self.assertTrue(stretchy.manual_form.done_btn.enabled) @@ -56,20 +98,29 @@ class TestNetworkConfigureIPv4InterfaceView(unittest.TestCase): def test_click_done(self): # The ugliness of this test is probably an indication that the # view is doing too much... - view, stretchy = self.make_view() + dev_info = create_test_instance( + NetDevInfo, overrides={'static4.addresses': []}) + view, stretchy = self.make_view(dev_info) + valid_data = { + 'subnet': '10.0.2.0/24', + 'address': '10.0.2.15', + 'gateway': '10.0.2.2', + 'nameservers': '8.8.8.8, 1.1.1.1', + 'searchdomains': '.custom, .zzz', + } view_helpers.enter_data(stretchy.manual_form, valid_data) - expected = valid_data.copy() - expected['nameservers'] = [expected['nameservers']] - expected['searchdomains'] = [expected['searchdomains']] - expected['network'] = expected.pop('subnet') + expected = StaticConfig( + addresses=['10.0.2.15/24'], + gateway='10.0.2.2', + nameservers=['8.8.8.8', '1.1.1.1'], + searchdomains=['.custom', '.zzz']) but = view_helpers.find_button_matching(view, "^Save$") view_helpers.click(but) - rinfv = stretchy.device.remove_ip_networks_for_version - rinfv.assert_called_once_with(4) - stretchy.device.add_network.assert_called_once_with(4, expected) + view.controller.set_static_config.assert_called_once_with( + stretchy.dev_info, 4, expected) class FakeLink: @@ -80,12 +131,11 @@ class FakeLink: class TestViewInterfaceInfo(unittest.TestCase): def make_view(self, *, info): - device = mock.create_autospec(spec=NetworkDev) - device.config = {} - device.info = info - device.type = "vlan" + dev_info = create_test_instance(NetDevInfo) base_view = BaseView(urwid.Text("")) - stretchy = ViewInterfaceInfo(base_view, device) + base_view.controller = mock.Mock() + base_view.controller.get_info_for_netdev.return_value = "INFO" + stretchy = ViewInterfaceInfo(base_view, dev_info) base_view.show_stretchy_overlay(stretchy) return base_view, stretchy @@ -94,10 +144,3 @@ class TestViewInterfaceInfo(unittest.TestCase): text = view_helpers.find_with_pred( view, lambda w: isinstance(w, urwid.Text) and "INFO" in w.text) self.assertNotEqual(text, None) - - def test_view_virtual(self): - view, stretchy = self.make_view(info=None) - text = view_helpers.find_with_pred( - view, lambda w: isinstance( - w, urwid.Text) and "Configured but not yet created" in w.text) - self.assertNotEqual(text, None)