make network view use a plain-old-data view of a nic

The network view code used to crawl all over the network model object,
which isn't really going to work with the upcoming client/server split.
So this adds a much better defined interface between the view and
controller.
This commit is contained in:
Michael Hudson-Doyle 2020-08-28 14:10:51 +12:00
parent 207f46a00b
commit 04426cec15
6 changed files with 737 additions and 485 deletions

View File

@ -27,8 +27,13 @@ from subiquitycore.context import with_context
from subiquitycore.controller import BaseController from subiquitycore.controller import BaseController
from subiquitycore.file_util import write_file from subiquitycore.file_util import write_file
from subiquitycore.models.network import ( from subiquitycore.models.network import (
BondParameters, BondConfig,
DHCPState,
NetDevAction, NetDevAction,
NetDevInfo,
StaticConfig,
VLANConfig,
WLANConfig,
) )
from subiquitycore import netplan from subiquitycore import netplan
from subiquitycore.ui.stretchy import StretchyOverlay from subiquitycore.ui.stretchy import StretchyOverlay
@ -45,24 +50,23 @@ log = logging.getLogger("subiquitycore.controller.network")
class SubiquityNetworkEventReceiver(NetworkEventReceiver): class SubiquityNetworkEventReceiver(NetworkEventReceiver):
def __init__(self, model): def __init__(self, controller):
self.model = model self.controller = controller
self.view = None self.model = controller.model
self.default_route_watchers = []
self.default_routes = set() self.default_routes = set()
self.dhcp_events = {}
def new_link(self, ifindex, link): def new_link(self, ifindex, link):
netdev = self.model.new_link(ifindex, link) netdev = self.model.new_link(ifindex, link)
if self.view is not None and netdev is not None: if netdev is not None:
self.view.new_link(netdev) self.controller.new_link(netdev)
def del_link(self, ifindex): def del_link(self, ifindex):
netdev = self.model.del_link(ifindex) netdev = self.model.del_link(ifindex)
if ifindex in self.default_routes: if ifindex in self.default_routes:
self.default_routes.remove(ifindex) self.default_routes.remove(ifindex)
if self.view is not None and netdev is not None: self.controller.update_default_routes(self.default_routes)
self.view.del_link(netdev) if netdev is not None:
self.controller.del_link(netdev)
def update_link(self, ifindex): def update_link(self, ifindex):
netdev = self.model.update_link(ifindex) netdev = self.model.update_link(ifindex)
@ -71,14 +75,8 @@ class SubiquityNetworkEventReceiver(NetworkEventReceiver):
flags = getattr(netdev.info, "flags", 0) flags = getattr(netdev.info, "flags", 0)
if not (flags & IFF_UP) and ifindex in self.default_routes: if not (flags & IFF_UP) and ifindex in self.default_routes:
self.default_routes.remove(ifindex) self.default_routes.remove(ifindex)
for watcher in self.default_route_watchers: self.controller.update_default_routes(self.default_routes)
watcher(self.default_routes) self.controller.update_link(netdev)
for v, e in netdev.dhcp_events.items():
if netdev.dhcp_addresses()[v]:
e.set()
if self.view is not None:
self.view.update_link(netdev)
def route_change(self, action, data): def route_change(self, action, data):
super().route_change(action, data) super().route_change(action, data)
@ -91,17 +89,8 @@ class SubiquityNetworkEventReceiver(NetworkEventReceiver):
self.default_routes.add(ifindex) self.default_routes.add(ifindex)
elif action == "DEL" and ifindex in self.default_routes: elif action == "DEL" and ifindex in self.default_routes:
self.default_routes.remove(ifindex) self.default_routes.remove(ifindex)
for watcher in self.default_route_watchers:
watcher(self.default_routes)
log.debug('default routes %s', self.default_routes) log.debug('default routes %s', self.default_routes)
self.controller.update_default_routes(self.default_routes)
def add_default_route_watcher(self, watcher):
self.default_route_watchers.append(watcher)
watcher(self.default_routes)
def remove_default_route_watcher(self, watcher):
if watcher in self.default_route_watchers:
self.default_route_watchers.remove(watcher)
default_netplan = ''' default_netplan = '''
@ -157,16 +146,32 @@ class NetworkController(BaseController):
self.parse_netplan_configs() self.parse_netplan_configs()
self._watching = False self._watching = False
self.network_event_receiver = SubiquityNetworkEventReceiver(self.model) self.network_event_receiver = SubiquityNetworkEventReceiver(self)
self.network_event_receiver.add_default_route_watcher(
self.route_watcher)
def parse_netplan_configs(self): def parse_netplan_configs(self):
self.model.parse_netplan_configs(self.root) self.model.parse_netplan_configs(self.root)
def route_watcher(self, routes): def update_default_routes(self, routes):
if routes: if routes:
self.signal.emit_signal('network-change') self.signal.emit_signal('network-change')
if self.view:
self.view.update_default_routes(routes)
def new_link(self, netdev):
if self.view is not None:
self.view.new_link(netdev.netdev_info())
def update_link(self, netdev):
for v, e in netdev.dhcp_events.items():
if netdev.dhcp_addresses()[v]:
netdev.set_dhcp_state(v, DHCPState.CONFIGURED)
e.set()
if self.view is not None:
self.view.update_link(netdev.netdev_info())
def del_link(self, netdev):
if self.view is not None:
self.view.del_link(netdev.netdev_info())
def start(self): def start(self):
self._observer_handles = [] self._observer_handles = []
@ -199,22 +204,6 @@ class NetworkController(BaseController):
loop.call_later(0.1, self.start_watching) loop.call_later(0.1, self.start_watching)
return return
self.observer.data_ready(fd) self.observer.data_ready(fd)
v = self.ui.body
if isinstance(getattr(v, '_w', None), StretchyOverlay):
if hasattr(v._w.stretchy, 'refresh_model_inputs'):
v._w.stretchy.refresh_model_inputs()
def start_scan(self, dev):
self.observer.trigger_scan(dev.ifindex)
def done(self):
log.debug("NetworkController.done next_screen")
self.model.has_network = bool(
self.network_event_receiver.default_routes)
self.app.next_screen()
def cancel(self):
self.app.prev_screen()
def _action_get(self, id): def _action_get(self, id):
dev_spec = id[0].split() dev_spec = id[0].split()
@ -234,19 +223,21 @@ class NetworkController(BaseController):
return dev return dev
raise Exception("could not resolve {}".format(id)) raise Exception("could not resolve {}".format(id))
def _action_clean_devices(self, devices): def _action_clean_interfaces(self, devices):
return [self._action_get(device) for device in devices] r = [self._action_get(device).name for device in devices]
log.debug("%s", r)
return r
def _answers_action(self, action): def _answers_action(self, action):
from subiquitycore.ui.stretchy import StretchyOverlay
log.debug("_answers_action %r", action) log.debug("_answers_action %r", action)
if 'obj' in action: if 'obj' in action:
obj = self._action_get(action['obj']) obj = self._action_get(action['obj']).netdev_info()
meth = getattr( meth = getattr(
self.ui.body, self.ui.body,
"_action_{}".format(action['action'])) "_action_{}".format(action['action']))
action_obj = getattr(NetDevAction, action['action']) action_obj = getattr(NetDevAction, action['action'])
self.ui.body._action(None, (action_obj, meth), obj) table = self.ui.body.dev_name_to_table[obj.name]
self.ui.body._action(None, (action_obj, meth), table)
yield yield
body = self.ui.body._w body = self.ui.body._w
if not isinstance(body, StretchyOverlay): if not isinstance(body, StretchyOverlay):
@ -268,9 +259,12 @@ class NetworkController(BaseController):
self.ui.body._create_bond() self.ui.body._create_bond()
yield yield
body = self.ui.body._w body = self.ui.body._w
data = action['data'].copy()
if 'devices' in data:
data['interfaces'] = data.pop('devices')
yield from self._enter_form_data( yield from self._enter_form_data(
body.stretchy.form, body.stretchy.form,
action['data'], data,
action.get("submit", True)) action.get("submit", True))
elif action['action'] == 'done': elif action['action'] == 'done':
self.ui.body.done() self.ui.body.done()
@ -296,19 +290,6 @@ class NetworkController(BaseController):
log.debug("disabling %s", dev.name) log.debug("disabling %s", dev.name)
dev.disabled_reason = _("autoconfiguration failed") dev.disabled_reason = _("autoconfiguration failed")
def start_ui(self):
if not self.view_shown:
self.update_initial_configs()
self.view = NetworkView(self.model, self)
if not self.view_shown:
self.apply_config(silent=True)
self.view_shown = True
self.network_event_receiver.view = self.view
self.ui.set_body(self.view)
def end_ui(self):
self.view = self.network_event_receiver.view = None
@property @property
def netplan_path(self): def netplan_path(self):
if self.opts.project == "subiquity": if self.opts.project == "subiquity":
@ -371,11 +352,11 @@ class NetworkController(BaseController):
for v in 4, 6: for v in 4, 6:
if dev.dhcp_enabled(v): if dev.dhcp_enabled(v):
if not silent: if not silent:
dev.set_dhcp_state(v, "PENDING") dev.set_dhcp_state(v, DHCPState.PENDING)
self.network_event_receiver.update_link( self.network_event_receiver.update_link(
dev.ifindex) dev.ifindex)
else: else:
dev.set_dhcp_state(v, "RECONFIGURE") dev.set_dhcp_state(v, DHCPState.RECONFIGURE)
dev.dhcp_events[v] = e = asyncio.Event() dev.dhcp_events[v] = e = asyncio.Event()
dhcp_events.add(e) dhcp_events.add(e)
if dev.info is None: if dev.info is None:
@ -466,28 +447,144 @@ class NetworkController(BaseController):
for dev, v in dhcp_device_versions: for dev, v in dhcp_device_versions:
dev.dhcp_events = {} dev.dhcp_events = {}
if not dev.dhcp_addresses()[v]: if not dev.dhcp_addresses()[v]:
dev.set_dhcp_state(v, "TIMEDOUT") dev.set_dhcp_state(v, DHCPState.TIMED_OUT)
self.network_event_receiver.update_link(dev.ifindex) self.network_event_receiver.update_link(dev.ifindex)
def add_vlan(self, device, vlan): def start_ui(self):
return self.model.new_vlan(device, vlan) if not self.view_shown:
self.update_initial_configs()
netdev_infos = [
dev.netdev_info() for dev in self.model.get_all_netdevs()
]
self.view = NetworkView(self, netdev_infos)
if not self.view_shown:
self.apply_config(silent=True)
self.view_shown = True
self.view.update_default_routes(
self.network_event_receiver.default_routes)
self.ui.set_body(self.view)
def add_or_update_bond(self, existing, result): def end_ui(self):
mode = result['mode'] self.view = None
params = {
'mode': mode, def done(self):
} log.debug("NetworkController.done next_screen")
if mode in BondParameters.supports_xmit_hash_policy: self.model.has_network = bool(
params['transmit-hash-policy'] = result['xmit_hash_policy'] self.network_event_receiver.default_routes)
if mode in BondParameters.supports_lacp_rate: self.app.next_screen()
params['lacp-rate'] = result['lacp_rate']
for device in result['devices']: def cancel(self):
device.config = {} self.app.prev_screen()
interfaces = [d.name for d in result['devices']]
if existing is None: def set_static_config(self, dev_info: NetDevInfo, ip_version: int,
return self.model.new_bond(result['name'], interfaces, params) static_config: StaticConfig) -> None:
setattr(dev_info, 'static' + str(ip_version), static_config)
getattr(dev_info, 'dhcp' + str(ip_version)).enabled = False
dev = self.model.get_netdev_by_name(dev_info.name)
dev.remove_ip_networks_for_version(ip_version)
dev.config.setdefault('addresses', []).extend(static_config.addresses)
gwkey = 'gateway{v}'.format(v=ip_version)
if static_config.gateway:
dev.config[gwkey] = static_config.gateway
else: else:
existing.config['interfaces'] = interfaces dev.config.pop(gwkey, None)
existing.config['parameters'] = params ns = dev.config.setdefault('nameservers', {})
existing.name = result['name'] ns.setdefault('addresses', []).extend(static_config.nameservers)
return existing ns.setdefault('search', []).extend(static_config.searchdomains)
self.apply_config()
def enable_dhcp(self, dev_info: NetDevInfo, ip_version: int) -> None:
setattr(dev_info, 'static' + str(ip_version), StaticConfig())
getattr(dev_info, 'dhcp' + str(ip_version)).enabled = True
getattr(dev_info, 'dhcp' + str(ip_version)).state = DHCPState.PENDING
dev = self.model.get_netdev_by_name(dev_info.name)
dev.remove_ip_networks_for_version(ip_version)
dhcpkey = 'dhcp{v}'.format(v=ip_version)
dev.config[dhcpkey] = True
self.apply_config()
def disable_network(self, dev_info: NetDevInfo, ip_version: int) -> None:
setattr(dev_info, 'static' + str(ip_version), StaticConfig())
getattr(dev_info, 'dhcp' + str(ip_version)).enabled = False
dev = self.model.get_netdev_by_name(dev_info.name)
dev.remove_ip_networks_for_version(ip_version)
self.apply_config()
def add_vlan(self, dev_info: NetDevInfo, vlan_config: VLANConfig):
new = self.model.new_vlan(dev_info.name, vlan_config)
dev = self.model.get_netdev_by_name(dev_info.name)
self.update_link(dev)
self.apply_config()
return new.netdev_info()
def delete_link(self, dev_info: NetDevInfo):
touched_devices = set()
if dev_info.type == "bond":
for device_name in dev_info.bond.interfaces:
interface = self.model.get_netdev_by_name(device_name)
touched_devices.add(interface)
elif dev_info.type == "vlan":
link = self.model.get_netdev_by_name(dev_info.vlan.link)
touched_devices.add(link)
dev_info.has_config = False
device = self.model.get_netdev_by_name(dev_info.name)
self.del_link(device)
device.config = None
for dev in touched_devices:
self.update_link(dev)
self.apply_config()
def add_or_update_bond(self, existing_name: NetDevInfo, new_name: str,
new_info: BondConfig) -> None:
get_netdev_by_name = self.model.get_netdev_by_name
touched_devices = set()
for device_name in new_info.interfaces:
device = get_netdev_by_name(device_name)
device.config = {}
touched_devices.add(device)
if existing_name is None:
new_dev = self.model.new_bond(new_name, new_info)
self.new_link(new_dev)
else:
existing = get_netdev_by_name(existing_name)
for interface in existing.config['interfaces']:
touched_devices.add(get_netdev_by_name(interface))
existing.config.update(new_info.to_config())
if existing.name != new_name:
config = existing.config
existing.config = None
self.del_link(existing)
existing.config = config
existing.name = new_name
self.new_link(existing)
else:
touched_devices.add(existing)
self.apply_config()
for dev in touched_devices:
self.update_link(dev)
def get_info_for_netdev(self, dev_info: NetDevInfo) -> str:
device = self.model.get_netdev_by_name(dev_info.name)
if device.info is not None:
return yaml.dump(
device.info.serialize(), default_flow_style=False)
else:
return "Configured but not yet created {type} interface.".format(
type=device.type)
def set_wlan(self, dev_info: NetDevInfo, wlan: WLANConfig) -> None:
dev_info.wlan = wlan
device = self.model.get_netdev_by_name(dev_info.name)
device.set_ssid_psk(wlan.ssid, wlan.psk)
self.update_link(device)
def start_scan(self, dev_info: NetDevInfo) -> None:
device = self.model.get_netdev_by_name(dev_info.name)
self.observer.trigger_scan(device.ifindex)
self.update_link(device)

View File

@ -18,6 +18,8 @@ import ipaddress
import logging import logging
import yaml import yaml
from socket import AF_INET, AF_INET6 from socket import AF_INET, AF_INET6
import attr
from typing import List, Optional
from subiquitycore.gettext38 import pgettext from subiquitycore.gettext38 import pgettext
from subiquitycore import netplan from subiquitycore import netplan
@ -50,6 +52,97 @@ class NetDevAction(enum.Enum):
return pgettext(type(self).__name__, self.value) return pgettext(type(self).__name__, self.value)
class DHCPState(enum.Enum):
PENDING = enum.auto()
TIMED_OUT = enum.auto()
RECONFIGURE = enum.auto()
CONFIGURED = enum.auto()
@attr.s(auto_attribs=True)
class DHCPStatus:
enabled: bool
state: Optional[DHCPState]
addresses: List[str]
@attr.s(auto_attribs=True)
class StaticConfig:
addresses: List[str] = attr.Factory(list)
gateway: Optional[str] = None
nameservers: List[str] = attr.Factory(list)
searchdomains: List[str] = attr.Factory(list)
@attr.s(auto_attribs=True)
class VLANConfig:
id: int
link: str
@attr.s(auto_attribs=True)
class WLANConfig:
ssid: str
psk: str
@attr.s(auto_attribs=True)
class WLANStatus:
config: WLANConfig
scan_state: Optional[str]
visible_ssids: List[str]
@attr.s(auto_attribs=True)
class BondConfig:
interfaces: List[str]
mode: str
xmit_hash_policy: Optional[str] = None
lacp_rate: Optional[str] = None
def to_config(self):
mode = self.mode
params = {
'mode': self.mode,
}
if mode in BondParameters.supports_xmit_hash_policy:
params['transmit-hash-policy'] = self.xmit_hash_policy
if mode in BondParameters.supports_lacp_rate:
params['lacp-rate'] = self.lacp_rate
return {
'interfaces': self.interfaces,
'parameters': params,
}
@attr.s(auto_attribs=True)
class NetDevInfo:
"""All the information about a NetworkDev that the view code needs."""
name: str
type: str
is_connected: bool
bond_master: Optional[str]
is_used: bool
disabled_reason: Optional[str]
hwaddr: Optional[str]
vendor: Optional[str]
model: Optional[str]
is_virtual: bool
has_config: bool
vlan: Optional[VLANConfig]
bond: Optional[BondConfig]
wlan: Optional[WLANConfig]
dhcp4: DHCPStatus
dhcp6: DHCPStatus
static4: StaticConfig
static6: StaticConfig
enabled_actions: List[NetDevAction]
class BondParameters: class BondParameters:
# Just a place to hang various data about how bonds can be # Just a place to hang various data about how bonds can be
# configured. # configured.
@ -103,6 +196,88 @@ class NetworkDev(object):
6: None, 6: None,
} }
def netdev_info(self) -> NetDevInfo:
if self.type == 'eth':
is_connected = self.info.is_connected
else:
is_connected = True
bond_master = None
for dev2 in self._model.get_all_netdevs():
if dev2.type != "bond":
continue
if self.name in dev2.config.get('interfaces', []):
bond_master = dev2.name
break
if self.type == 'bond' and self.config is not None:
params = self.config['parameters']
bond = BondConfig(
interfaces=self.config['interfaces'],
mode=params['mode'],
xmit_hash_policy=params.get('xmit-hash-policy'),
lacp_rate=params.get('lacp-rate'))
else:
bond = None
if self.type == 'vlan' and self.config is not None:
vlan = VLANConfig(id=self.config['id'], link=self.config['link'])
else:
vlan = None
if self.type == 'wlan':
ssid, psk = self.configured_ssid
wlan = WLANStatus(
config=WLANConfig(ssid=ssid, psk=psk),
scan_state=self.info.wlan['scan_state'],
visible_ssids=self.info.wlan['visible_ssids'])
else:
wlan = None
dhcp_addresses = self.dhcp_addresses()
configured_addresseses = {4: [], 6: []}
if self.config is not None:
for addr in self.config.get('addresses', []):
configured_addresseses[addr_version(addr)].append(addr)
ns = self.config.get('nameservers', {})
else:
ns = {}
dhcp_statuses = {}
static_configs = {}
for v in 4, 6:
dhcp_statuses[v] = DHCPStatus(
enabled=self.dhcp_enabled(v),
state=self._dhcp_state[v],
addresses=dhcp_addresses[v])
if self.config is not None:
gateway = self.config.get('gateway' + str(v))
else:
gateway = None
static_configs[v] = StaticConfig(
addresses=configured_addresseses[v],
gateway=gateway,
nameservers=ns.get('nameservers', []),
searchdomains=ns.get('search', []))
return NetDevInfo(
name=self.name,
type=self.type,
is_connected=is_connected,
vlan=vlan,
bond_master=bond_master,
bond=bond,
wlan=wlan,
dhcp4=dhcp_statuses[4],
dhcp6=dhcp_statuses[6],
static4=static_configs[4],
static6=static_configs[6],
is_used=self.is_used,
disabled_reason=self.disabled_reason,
enabled_actions=[
action for action in NetDevAction
if self.supports_action(action)
],
hwaddr=getattr(self.info, 'hwaddr', None),
vendor=getattr(self.info, 'vendor', None),
model=getattr(self.info, 'model', None),
is_virtual=self.is_virtual,
has_config=self.config is not None)
def dhcp_addresses(self): def dhcp_addresses(self):
r = {4: [], 6: []} r = {4: [], 6: []}
if self.info is not None: if self.info is not None:
@ -140,17 +315,18 @@ class NetworkDev(object):
# If a virtual device that already exists is renamed, we need # If a virtual device that already exists is renamed, we need
# to create a dummy NetworkDev so that the existing virtual # to create a dummy NetworkDev so that the existing virtual
# device is actually deleted when the config is applied. # device is actually deleted when the config is applied.
if new_name != self.name and self.is_virtual and self.info is not None: if new_name != self.name and self.is_virtual:
if new_name in self.model.devices_by_name: if new_name in self._model.devices_by_name:
raise RuntimeError( raise RuntimeError(
"renaming {old_name} over {new_name}".format( "renaming {old_name} over {new_name}".format(
old_name=self.name, new_name=new_name)) old_name=self.name, new_name=new_name))
self._model.devices_by_name[new_name] = self self._model.devices_by_name[new_name] = self
dead_device = self._model.devices_by_name[self.name] = NetworkDev( if self.info is not None:
self.name, self.type) dead_device = NetworkDev(self._model, self.name, self.type)
dead_device.config = None self._model.devices_by_name[self.name] = dead_device
dead_device.info = self.info dead_device.config = None
self.info = None dead_device.info = self.info
self.info = None
self._name = new_name self._name = new_name
def supports_action(self, action): def supports_action(self, action):
@ -228,28 +404,6 @@ class NetworkDev(object):
else: else:
self.config.pop('addresses', None) self.config.pop('addresses', None)
def add_network(self, version, network):
# result = {
# 'network': self.subnet_input.value,
# 'address': self.address_input.value,
# 'gateway': self.gateway_input.value,
# 'nameserver': [nameservers],
# 'searchdomains': [searchdomains],
# }
address = network['address'].split('/')[0]
address += '/' + network['network'].split('/')[1]
self.config.setdefault('addresses', []).append(address)
gwkey = 'gateway{v}'.format(v=version)
if network['gateway']:
self.config[gwkey] = network['gateway']
else:
self.config.pop(gwkey, None)
ns = self.config.setdefault('nameservers', {})
if network['nameservers']:
ns.setdefault('addresses', []).extend(network['nameservers'])
if network['searchdomains']:
ns.setdefault('search', []).extend(network['searchdomains'])
class NetworkModel(object): class NetworkModel(object):
""" """ """ """
@ -318,21 +472,18 @@ class NetworkModel(object):
del self.devices_by_name[name] del self.devices_by_name[name]
return dev return dev
def new_vlan(self, device, tag): def new_vlan(self, device_name, tag):
name = "{name}.{tag}".format(name=device.name, tag=tag) name = "{name}.{tag}".format(name=device_name, tag=tag)
dev = self.devices_by_name[name] = NetworkDev(self, name, 'vlan') dev = self.devices_by_name[name] = NetworkDev(self, name, 'vlan')
dev.config = { dev.config = {
'link': device.name, 'link': device_name,
'id': tag, 'id': tag,
} }
return dev return dev
def new_bond(self, name, interfaces, params): def new_bond(self, name, bond_config):
dev = self.devices_by_name[name] = NetworkDev(self, name, 'bond') dev = self.devices_by_name[name] = NetworkDev(self, name, 'bond')
dev.config = { dev.config = bond_config.to_config()
'interfaces': interfaces,
'parameters': params,
}
return dev return dev
def get_all_netdevs(self, include_deleted=False): def get_all_netdevs(self, include_deleted=False):

View File

@ -27,7 +27,7 @@ from urwid import (
) )
from subiquitycore.models.network import ( from subiquitycore.models.network import (
addr_version, DHCPState,
NetDevAction, NetDevAction,
) )
from subiquitycore.ui.actionmenu import ActionMenu from subiquitycore.ui.actionmenu import ActionMenu
@ -38,6 +38,7 @@ from subiquitycore.ui.buttons import (
) )
from subiquitycore.ui.container import ( from subiquitycore.ui.container import (
Pile, Pile,
WidgetWrap,
) )
from subiquitycore.ui.spinner import Spinner from subiquitycore.ui.spinner import Spinner
from subiquitycore.ui.stretchy import StretchyOverlay from subiquitycore.ui.stretchy import StretchyOverlay
@ -72,17 +73,151 @@ def _stretchy_shower(cls, *args):
return impl return impl
class NetworkDeviceTable(WidgetWrap):
def __init__(self, parent, dev_info):
self.parent = parent
self.dev_info = dev_info
super().__init__(self._create())
def _create(self):
# Create the widget for a nic. This consists of a Pile containing a
# table, an info line and a blank line. The first row of the table is
# the one that can be focused and has a menu for manipulating the nic,
# the other rows summarize its address config.
# [ name type notes ▸ ] \
# address info | <- table
# more address info /
# mac / vendor info / model info
# <blank line>
actions = []
for action in NetDevAction:
meth = getattr(self.parent, '_action_' + action.name)
opens_dialog = getattr(meth, 'opens_dialog', False)
if action in self.dev_info.enabled_actions:
actions.append(
(action.str(), True, (action, meth), opens_dialog))
menu = ActionMenu(actions)
connect_signal(menu, 'action', self.parent._action, self)
trows = [make_action_menu_row([
Text("["),
Text(self.dev_info.name),
Text(self.dev_info.type),
Text(self._notes(), wrap='clip'),
menu,
Text("]"),
], menu)] + self._address_rows()
self.table = TablePile(
trows, colspecs=self.parent.device_colspecs, spacing=2)
self.table.bind(self.parent.heading_table)
if self.dev_info.type == "vlan":
info = _("VLAN {id} on interface {link}").format(
id=self.dev_info.vlan.id, link=self.dev_info.vlan.link)
elif self.dev_info.type == "bond":
info = _("bond master for {interfaces}").format(
interfaces=', '.join(self.dev_info.bond.interfaces))
else:
info = " / ".join([
self.dev_info.hwaddr,
self.dev_info.vendor,
self.dev_info.model,
])
return Pile([
('pack', self.table),
('pack', Color.info_minor(Text(" " + info))),
('pack', Text("")),
])
def _notes(self):
notes = []
if self.dev_info.type == "wlan":
config = self.dev_info.wlan.config
if config.ssid is not None:
notes.append(_("ssid: {ssid}".format(ssid=config.ssid)))
else:
notes.append(_("not connected"))
if not self.dev_info.is_connected:
notes.append(_("not connected"))
if self.dev_info.bond_master:
notes.append(
_("enslaved to {device}").format(
device=self.dev_info.bond_master))
if notes:
notes = ", ".join(notes)
else:
notes = '-'
return notes
def _address_rows(self):
address_info = []
for v, dhcp_status, static_config in (
(4, self.dev_info.dhcp4, self.dev_info.static4),
(6, self.dev_info.dhcp6, self.dev_info.static6),
):
if dhcp_status.enabled:
label = Text("DHCPv{v}".format(v=v))
addrs = dhcp_status.addresses
if addrs:
address_info.extend(
[(label, Text(addr)) for addr in addrs])
elif dhcp_status.state == DHCPState.PENDING:
s = Spinner(
self.parent.controller.app.aio_loop, align='left')
s.rate = 0.3
s.start()
address_info.append((label, s))
elif dhcp_status.state == DHCPState.TIMED_OUT:
address_info.append((label, Text(_("timed out"))))
elif dhcp_status.state == DHCPState.RECONFIGURE:
address_info.append((label, Text("-")))
elif static_config.addresses:
address_info.append((
Text(_('static')),
Text(', '.join(static_config.addresses)),
))
if len(address_info) == 0 and not self.dev_info.is_used:
reason = self.dev_info.disabled_reason
if reason is None:
reason = ""
address_info.append((Text(_("disabled")), Text(reason)))
rows = []
for label, value in address_info:
rows.append(TableRow([Text(""), label, (2, value)]))
return rows
def update(self, dev_info):
# Update the display of dev to represent the current state.
#
# The easiest way of doing this would be to just create a new table
# widget for the device and replace the current one with it. But that
# is jarring if the menu for the current device is open, so instead we
# overwrite the contents of the first (menu) row of the table, and
# replace all other rows of the with new content (which is OK as they
# cannot be focused).
self.dev_info = dev_info
first_row = self.table.table_rows[0].base_widget
first_row.cells[1][1].set_text(dev_info.name)
first_row.cells[2][1].set_text(dev_info.type)
first_row.cells[3][1].set_text(self._notes())
self.table.remove_rows(1, len(self.table.table_rows))
self.table.insert_rows(1, self._address_rows())
class NetworkView(BaseView): class NetworkView(BaseView):
title = _("Network connections") title = _("Network connections")
excerpt = _("Configure at least one interface this server can use to talk " excerpt = _("Configure at least one interface this server can use to talk "
"to other machines, and which preferably provides sufficient " "to other machines, and which preferably provides sufficient "
"access for updates.") "access for updates.")
def __init__(self, model, controller): def __init__(self, controller, netdev_infos):
self.model = model
self.controller = controller self.controller = controller
self.dev_to_table = {} self.dev_name_to_table = {}
self.cur_netdevs = [] self.cur_netdev_names = []
self.error = Text("", align='center') self.error = Text("", align='center')
self.device_colspecs = { self.device_colspecs = {
@ -91,7 +226,19 @@ class NetworkView(BaseView):
4: ColSpec(can_shrink=True, rpad=1), 4: ColSpec(can_shrink=True, rpad=1),
} }
self.device_pile = Pile(self._build_model_inputs()) self.heading_table = TablePile([
TableRow([
Color.info_minor(Text(header)) for header in [
"", "NAME", "TYPE", "NOTES", "",
]
])
],
spacing=2, colspecs=self.device_colspecs)
self.device_pile = Pile([self.heading_table])
for dev_info in netdev_infos:
self.new_link(dev_info)
self._create_bond_btn = menu_btn( self._create_bond_btn = menu_btn(
_("Create bond"), on_press=self._create_bond) _("Create bond"), on_press=self._create_bond)
@ -111,9 +258,6 @@ class NetworkView(BaseView):
('pack', self.buttons), ('pack', self.buttons),
]) ])
self.controller.network_event_receiver.add_default_route_watcher(
self._route_watcher)
self.error_showing = False self.error_showing = False
super().__init__(screen( super().__init__(screen(
@ -126,26 +270,27 @@ class NetworkView(BaseView):
_action_EDIT_WLAN = _stretchy_shower(NetworkConfigureWLANStretchy) _action_EDIT_WLAN = _stretchy_shower(NetworkConfigureWLANStretchy)
_action_EDIT_IPV4 = _stretchy_shower(EditNetworkStretchy, 4) _action_EDIT_IPV4 = _stretchy_shower(EditNetworkStretchy, 4)
_action_EDIT_IPV6 = _stretchy_shower(EditNetworkStretchy, 6) _action_EDIT_IPV6 = _stretchy_shower(EditNetworkStretchy, 6)
_action_EDIT_BOND = _stretchy_shower(BondStretchy)
_action_ADD_VLAN = _stretchy_shower(AddVlanStretchy) _action_ADD_VLAN = _stretchy_shower(AddVlanStretchy)
def _action_DELETE(self, name, device): def _action_EDIT_BOND(self, name, dev_info):
stretchy = BondStretchy(
self, dev_info, self.get_candidate_bond_member_names())
stretchy.attach_context(self.controller.context.child(name))
self.show_stretchy_overlay(stretchy)
_action_EDIT_BOND.opens_dialog = True
def _action_DELETE(self, name, dev_info):
with self.controller.context.child(name): with self.controller.context.child(name):
touched_devs = set() self.controller.delete_link(dev_info)
if device.type == "bond": self.del_link(dev_info)
for name in device.config['interfaces']:
touched_devs.add(self.model.get_netdev_by_name(name))
device.config = None
self.del_link(device)
for dev in touched_devs:
self.update_link(dev)
self.controller.apply_config()
def _action(self, sender, action, device): def _action(self, sender, action, netdev_table):
action, meth = action action, meth = action
meth("{}/{}".format(device.name, action.name), device) dev_info = netdev_table.dev_info
meth("{}/{}".format(dev_info.name, action.name), dev_info)
def _route_watcher(self, routes): def update_default_routes(self, routes):
log.debug('view route_watcher %s', routes) log.debug('view route_watcher %s', routes)
if routes: if routes:
label = _("Done") label = _("Done")
@ -176,111 +321,31 @@ class NetworkView(BaseView):
if len(self.bottom.contents) > 2: if len(self.bottom.contents) > 2:
self.bottom.contents[0:2] = [] self.bottom.contents[0:2] = []
def _notes_for_device(self, dev): def new_link(self, new_dev_info):
notes = []
if dev.type == "eth" and not dev.info.is_connected:
notes.append(_("not connected"))
for dev2 in self.model.get_all_netdevs():
if dev2.type != "bond":
continue
if dev.name in dev2.config.get('interfaces', []):
notes.append(
_("enslaved to {device}").format(device=dev2.name))
break
if notes:
notes = ", ".join(notes)
else:
notes = '-'
return notes
def _address_rows_for_device(self, dev):
address_info = []
dhcp_addresses = dev.dhcp_addresses()
for v in 4, 6:
if dev.dhcp_enabled(v):
label = Text("DHCPv{v}".format(v=v))
addrs = dhcp_addresses.get(v)
if addrs:
address_info.extend(
[(label, Text(addr)) for addr in addrs])
elif dev.dhcp_state(v) == "PENDING":
s = Spinner(self.controller.app.aio_loop, align='left')
s.rate = 0.3
s.start()
address_info.append((label, s))
elif dev.dhcp_state(v) == "TIMEDOUT":
address_info.append((label, Text(_("timed out"))))
elif dev.dhcp_state(v) == "RECONFIGURE":
address_info.append((label, Text("-")))
else:
address_info.append((
label,
Text(
_("unknown state {state}".format(
state=dev.dhcp_state(v))))
))
else:
addrs = []
for ip in dev.config.get('addresses', []):
if addr_version(ip) == v:
addrs.append(str(ip))
if addrs:
address_info.append(
# Network addressing mode (static/dhcp/disabled)
(Text(_('static')), Text(', '.join(addrs))))
if len(address_info) == 0:
# Do not show an interface as disabled if it is part of a bond or
# has a vlan on it.
if not dev.is_used:
reason = dev.disabled_reason
if reason is None:
reason = ""
# Network addressing mode (static/dhcp/disabled)
address_info.append((Text(_("disabled")), Text(reason)))
rows = []
for label, value in address_info:
rows.append(TableRow([Text(""), label, (2, value)]))
return rows
def new_link(self, new_dev):
log.debug( log.debug(
"new_link %s %s %s", "new_link %s %s",
new_dev.name, new_dev.ifindex, (new_dev in self.cur_netdevs)) new_dev_info.name, (new_dev_info.name in self.cur_netdev_names))
if new_dev in self.dev_to_table: if new_dev_info.name in self.dev_name_to_table:
self.update_link(new_dev) self.update_link(new_dev_info)
return return
for i, cur_dev in enumerate(self.cur_netdevs): self.cur_netdev_names.append(new_dev_info.name)
if cur_dev.name > new_dev.name: self.cur_netdev_names.sort()
netdev_i = i netdev_i = self.cur_netdev_names.index(new_dev_info.name)
break device_table = NetworkDeviceTable(self, new_dev_info)
else: self.dev_name_to_table[new_dev_info.name] = device_table
netdev_i = len(self.cur_netdevs)
w = self._device_widget(new_dev, netdev_i)
self.device_pile.contents[netdev_i+1:netdev_i+1] = [ self.device_pile.contents[netdev_i+1:netdev_i+1] = [
(w, self.device_pile.options('pack'))] (device_table, self.device_pile.options('pack'))]
def update_link(self, dev): def update_link(self, dev_info):
if isinstance(self._w, StretchyOverlay):
if hasattr(self._w.stretchy, 'update_link'):
self._w.stretchy.update_link(dev_info)
log.debug( log.debug(
"update_link %s %s %s", "update_link %s %s",
dev.name, dev.ifindex, (dev in self.cur_netdevs)) dev_info.name, (dev_info.name in self.cur_netdev_names))
if dev not in self.cur_netdevs: if dev_info.name not in self.cur_netdev_names:
return return
# Update the display of dev to represent the current state. self.dev_name_to_table[dev_info.name].update(dev_info)
#
# The easiest way of doing this would be to just create a new table
# widget for the device and replace the current one with it. But that
# is jarring if the menu for the current device is open, so instead we
# overwrite the content of the first (menu) row of the old table with
# the contents of the first row of the new table, and replace all other
# rows of the old table with new content (which is OK as they cannot be
# focused).
old_table = self.dev_to_table[dev]
first_row = old_table.table_rows[0].base_widget
first_row.cells[1][1].set_text(dev.name)
first_row.cells[2][1].set_text(dev.type)
first_row.cells[3][1].set_text(self._notes_for_device(dev))
old_table.remove_rows(1, len(old_table.table_rows))
old_table.insert_rows(1, self._address_rows_for_device(dev))
def _remove_row(self, netdev_i): def _remove_row(self, netdev_i):
# MonitoredFocusList clamps the focus position to the new # MonitoredFocusList clamps the focus position to the new
@ -297,94 +362,38 @@ class NetworkView(BaseView):
self.device_pile.focus_position += 1 self.device_pile.focus_position += 1
self.device_pile.focus._select_first_selectable() self.device_pile.focus._select_first_selectable()
def del_link(self, dev): def del_link(self, dev_info):
log.debug( log.debug(
"del_link %s %s %s", "del_link %s %s",
dev.name, dev.ifindex, (dev in self.cur_netdevs)) dev_info.name, (dev_info.name in self.cur_netdev_names))
# If a virtual device disappears while we still have config # If a virtual device disappears while we still have config
# for it, we assume it will be back soon. # for it, we assume it will be back soon.
if dev.is_virtual and dev.config is not None: if dev_info.is_virtual and dev_info.has_config:
return return
if dev in self.cur_netdevs: if dev_info.name in self.cur_netdev_names:
netdev_i = self.cur_netdevs.index(dev) netdev_i = self.cur_netdev_names.index(dev_info.name)
self._remove_row(netdev_i+1) self._remove_row(netdev_i+1)
del self.cur_netdevs[netdev_i] del self.cur_netdev_names[netdev_i]
del self.dev_to_table[dev] del self.dev_name_to_table[dev_info.name]
if isinstance(self._w, StretchyOverlay): if isinstance(self._w, StretchyOverlay):
stretchy = self._w.stretchy stretchy = self._w.stretchy
if getattr(stretchy, 'device', None) is dev: if getattr(stretchy, 'device', None) is dev_info:
self.remove_overlay() self.remove_overlay()
def _device_widget(self, dev, netdev_i=None): def get_candidate_bond_member_names(self):
# Create the widget for a nic. This consists of a Pile containing a names = []
# table, an info line and a blank line. The first row of the table is for table in self.dev_name_to_table.values():
# the one that can be focused and has a menu for manipulating the nic, dev_info = table.dev_info
# the other rows summarize its address config. if dev_info.type in ("vlan", "bond"):
# [ name type notes ▸ ] \ continue
# address info | <- table if dev_info.bond_master is not None:
# more address info / continue
# mac / vendor info / model info names.append(dev_info.name)
# <blank line> return names
if netdev_i is None:
netdev_i = len(self.cur_netdevs)
self.cur_netdevs[netdev_i:netdev_i] = [dev]
actions = []
for action in NetDevAction:
meth = getattr(self, '_action_' + action.name)
opens_dialog = getattr(meth, 'opens_dialog', False)
if dev.supports_action(action):
actions.append(
(action.str(), True, (action, meth), opens_dialog))
menu = ActionMenu(actions)
connect_signal(menu, 'action', self._action, dev)
trows = [make_action_menu_row([
Text("["),
Text(dev.name),
Text(dev.type),
Text(self._notes_for_device(dev), wrap='clip'),
menu,
Text("]"),
], menu)] + self._address_rows_for_device(dev)
table = TablePile(trows, colspecs=self.device_colspecs, spacing=2)
self.dev_to_table[dev] = table
table.bind(self.heading_table)
if dev.type == "vlan":
info = _("VLAN {id} on interface {link}").format(
**dev.config)
elif dev.type == "bond":
info = _("bond master for {interfaces}").format(
interfaces=', '.join(dev.config['interfaces']))
else:
info = " / ".join([
dev.info.hwaddr, dev.info.vendor, dev.info.model])
return Pile([
('pack', table),
('pack', Color.info_minor(Text(" " + info))),
('pack', Text("")),
])
def _build_model_inputs(self):
self.heading_table = TablePile([
TableRow([
Color.info_minor(Text(header)) for header in [
"", "NAME", "TYPE", "NOTES", "",
]
])
],
spacing=2, colspecs=self.device_colspecs)
rows = [self.heading_table]
for dev in self.model.get_all_netdevs():
rows.append(self._device_widget(dev))
return rows
def _create_bond(self, sender=None): def _create_bond(self, sender=None):
stretchy = BondStretchy(self) stretchy = BondStretchy(
self, None, self.get_candidate_bond_member_names())
stretchy.attach_context(self.controller.context.child("add_bond")) stretchy.attach_context(self.controller.context.child("add_bond"))
self.show_stretchy_overlay(stretchy) self.show_stretchy_overlay(stretchy)
@ -417,11 +426,7 @@ class NetworkView(BaseView):
def done(self, result=None): def done(self, result=None):
if self.error_showing: if self.error_showing:
self.bottom.contents[0:2] = [] self.bottom.contents[0:2] = []
self.controller.network_event_receiver.remove_default_route_watcher(
self._route_watcher)
self.controller.done() self.controller.done()
def cancel(self, button=None): def cancel(self, button=None):
self.controller.network_event_receiver.remove_default_route_watcher(
self._route_watcher)
self.controller.cancel() self.controller.cancel()

View File

@ -15,7 +15,6 @@
import logging import logging
import ipaddress import ipaddress
import yaml
from urwid import ( from urwid import (
CheckBox, CheckBox,
@ -25,8 +24,9 @@ from urwid import (
) )
from subiquitycore.models.network import ( from subiquitycore.models.network import (
addr_version, BondConfig,
BondParameters, BondParameters,
StaticConfig,
) )
from subiquitycore.ui.container import Pile, WidgetWrap from subiquitycore.ui.container import Pile, WidgetWrap
from subiquitycore.ui.form import ( from subiquitycore.ui.form import (
@ -155,33 +155,29 @@ class NetworkMethodForm(Form):
class EditNetworkStretchy(Stretchy): class EditNetworkStretchy(Stretchy):
def __init__(self, parent, device, ip_version): def __init__(self, parent, dev_info, ip_version):
self.parent = parent self.parent = parent
self.device = device self.dev_info = dev_info
self.ip_version = ip_version self.ip_version = ip_version
self.method_form = NetworkMethodForm() self.method_form = NetworkMethodForm()
self.method_form.method.caption = _( self.method_form.method.caption = _(
"IPv{v} Method: ").format(v=ip_version) "IPv{v} Method: ").format(v=ip_version)
manual_initial = {} manual_initial = {}
cur_addresses = [] dhcp_status = getattr(dev_info, 'dhcp' + str(ip_version))
for addr in device.config.get('addresses', []): static_config = getattr(dev_info, 'static' + str(ip_version))
if addr_version(addr) == ip_version: if static_config.addresses:
cur_addresses.append(addr)
if cur_addresses:
method = 'manual' method = 'manual'
addr = ipaddress.ip_interface(cur_addresses[0]) addr = ipaddress.ip_interface(static_config.addresses[0])
ns = device.config.get('nameservers', {})
manual_initial = { manual_initial = {
'subnet': str(addr.network), 'subnet': str(addr.network),
'address': str(addr.ip), 'address': str(addr.ip),
'nameservers': ', '.join(ns.get('addresses', [])), 'nameservers': ', '.join(static_config.nameservers),
'searchdomains': ', '.join(ns.get('search', [])), 'searchdomains': ', '.join(static_config.searchdomains),
} }
gw = device.config.get('gateway{v}'.format(v=ip_version)) if static_config.gateway:
if gw: manual_initial['gateway'] = static_config.gateway
manual_initial['gateway'] = str(gw) elif dhcp_status.enabled:
elif self.device.config.get('dhcp{v}'.format(v=ip_version)):
method = 'dhcp' method = 'dhcp'
else: else:
method = 'disable' method = 'disable'
@ -208,7 +204,7 @@ class EditNetworkStretchy(Stretchy):
widgets = [self.form_pile, Text(""), self.bp] widgets = [self.form_pile, Text(""), self.bp]
super().__init__( super().__init__(
"Edit {device} IPv{v} configuration".format( "Edit {device} IPv{v} configuration".format(
device=device.name, v=ip_version), device=dev_info.name, v=ip_version),
widgets, widgets,
0, 0) 0, 0)
@ -229,9 +225,6 @@ class EditNetworkStretchy(Stretchy):
self.form_pile.contents[:] = rows self.form_pile.contents[:] = rows
def done(self, sender): def done(self, sender):
self.device.remove_ip_networks_for_version(self.ip_version)
if self.method_form.method.value == "manual": if self.method_form.method.value == "manual":
form = self.manual_form form = self.manual_form
# XXX this converting from and to and from strings thing is a # XXX this converting from and to and from strings thing is a
@ -239,25 +232,26 @@ class EditNetworkStretchy(Stretchy):
gateway = form.gateway.value gateway = form.gateway.value
if gateway is not None: if gateway is not None:
gateway = str(gateway) gateway = str(gateway)
result = { address = str(form.address.value).split('/')[0]
'network': str(form.subnet.value), address += '/' + str(form.subnet.value).split('/')[1]
'address': str(form.address.value), config = StaticConfig(
'gateway': gateway, addresses=[address],
'nameservers': list(map(str, form.nameservers.value)), gateway=gateway,
'searchdomains': form.searchdomains.value, nameservers=list(map(str, form.nameservers.value)),
} searchdomains=form.searchdomains.value)
log.debug( log.debug(
"EditNetworkStretchy %s manual result=%s", "EditNetworkStretchy %s manual config=%s",
self.ip_version, result) self.ip_version, config)
self.device.config.pop('nameservers', None) self.parent.controller.set_static_config(
self.device.add_network(self.ip_version, result) self.dev_info, self.ip_version, config)
elif self.method_form.method.value == "dhcp": elif self.method_form.method.value == "dhcp":
log.debug("EditNetworkStretchy %s dhcp", self.ip_version) self.parent.controller.enable_dhcp(self.dev_info, self.ip_version)
self.device.config['dhcp{v}'.format(v=self.ip_version)] = True log.debug("EditNetworkStretchy %s, dhcp", self.ip_version)
else: else:
self.parent.controller.disable_network(
self.dev_info, self.ip_version)
log.debug("EditNetworkStretchy %s, disabled", self.ip_version) log.debug("EditNetworkStretchy %s, disabled", self.ip_version)
self.parent.controller.apply_config() self.parent.update_link(self.dev_info)
self.parent.update_link(self.device)
self.parent.remove_overlay() self.parent.remove_overlay()
def cancel(self, sender=None): def cancel(self, sender=None):
@ -268,9 +262,9 @@ class VlanForm(Form):
ok_label = _("Create") ok_label = _("Create")
def __init__(self, parent, device): def __init__(self, parent, dev_info):
self.parent = parent self.parent = parent
self.device = device self.dev_info = dev_info
super().__init__() super().__init__()
vlan = StringField(_("VLAN ID:")) vlan = StringField(_("VLAN ID:"))
@ -286,10 +280,9 @@ class VlanForm(Form):
return vlanid return vlanid
def validate_vlan(self): def validate_vlan(self):
new_name = '%s.%s' % (self.device.name, self.vlan.value) new_name = '%s.%s' % (self.dev_info.name, self.vlan.value)
if new_name in self.parent.model.devices_by_name: if new_name in self.parent.cur_netdev_names:
if self.parent.model.devices_by_name[new_name].config is not None: return _("{netdev} already exists").format(netdev=new_name)
return _("{netdev} already exists").format(netdev=new_name)
class AddVlanStretchy(Stretchy): class AddVlanStretchy(Stretchy):
@ -310,31 +303,24 @@ class AddVlanStretchy(Stretchy):
"AddVlanStretchy.done %s %s", "AddVlanStretchy.done %s %s",
self.device.name, self.form.vlan.value) self.device.name, self.form.vlan.value)
self.parent.remove_overlay() self.parent.remove_overlay()
dev = self.parent.controller.add_vlan( dev_info = self.parent.controller.add_vlan(
self.device, self.form.vlan.value) self.device, self.form.vlan.value)
self.parent.new_link(dev) self.parent.new_link(dev_info)
self.parent.controller.apply_config()
def cancel(self, sender=None): def cancel(self, sender=None):
self.parent.remove_overlay() self.parent.remove_overlay()
class ViewInterfaceInfo(Stretchy): class ViewInterfaceInfo(Stretchy):
def __init__(self, parent, device): def __init__(self, parent, dev_info):
self.parent = parent self.parent = parent
if device.info is not None:
result = yaml.dump(
device.info.serialize(), default_flow_style=False)
else:
result = "Configured but not yet created {type} interface.".format(
type=device.type)
widgets = [ widgets = [
Text(result), Text(self.parent.controller.get_info_for_netdev(dev_info)),
Text(""), Text(""),
button_pile([done_btn(_("Close"), on_press=self.close)]), button_pile([done_btn(_("Close"), on_press=self.close)]),
] ]
# {device} is the name of a network device # {device} is the name of a network device
title = _("Info for {device}").format(device=device.name) title = _("Info for {device}").format(device=dev_info.name)
super().__init__(title, widgets, 0, 2) super().__init__(title, widgets, 0, 2)
def close(self, button=None): def close(self, button=None):
@ -351,7 +337,7 @@ class MultiNetdevChooser(WidgetWrap, WantsToKnowFormField):
@property @property
def value(self): def value(self):
return list(sorted(self.selected, key=lambda x: x.name)) return list(sorted(self.selected))
@value.setter @value.setter
def value(self, value): def value(self, value):
@ -362,7 +348,7 @@ class MultiNetdevChooser(WidgetWrap, WantsToKnowFormField):
def set_bound_form_field(self, bff): def set_bound_form_field(self, bff):
contents = [] contents = []
for d in bff.form.candidate_netdevs: for d in bff.form.candidate_netdevs:
box = CheckBox(d.name, on_state_change=self._state_change) box = CheckBox(d, on_state_change=self._state_change)
self.box_to_device[box] = d self.box_to_device[box] = d
contents.append((box, self.pile.options('pack'))) contents.append((box, self.pile.options('pack')))
self.pile.contents[:] = contents self.pile.contents[:] = contents
@ -389,7 +375,7 @@ class BondForm(Form):
self._select_level(None, self.mode.value) self._select_level(None, self.mode.value)
name = StringField(_("Name:")) name = StringField(_("Name:"))
devices = MultiNetdevField(_("Devices: ")) interfaces = MultiNetdevField(_("Devices: "))
mode = ChoiceField(_("Bond mode:"), choices=BondParameters.modes) mode = ChoiceField(_("Bond mode:"), choices=BondParameters.modes)
xmit_hash_policy = ChoiceField( xmit_hash_policy = ChoiceField(
_("XMIT hash policy:"), choices=BondParameters.xmit_hash_policies) _("XMIT hash policy:"), choices=BondParameters.xmit_hash_policies)
@ -416,12 +402,11 @@ class BondForm(Form):
class BondStretchy(Stretchy): class BondStretchy(Stretchy):
def __init__(self, parent, existing=None): def __init__(self, parent, existing_bond_info, candidate_names):
self.parent = parent self.parent = parent
self.existing = existing all_netdev_names = set(parent.cur_netdev_names)
all_netdev_names = { if existing_bond_info is None:
device.name for device in parent.model.get_all_netdevs()} self.existing_name = None
if existing is None:
title = _('Create bond') title = _('Create bond')
label = _("Create") label = _("Create")
x = 0 x = 0
@ -431,41 +416,29 @@ class BondStretchy(Stretchy):
break break
x += 1 x += 1
initial = { initial = {
'devices': set(), 'interfaces': [],
'name': name, 'name': name,
} }
else: else:
self.existing_name = existing_bond_info.name
bondconfig = existing_bond_info.bond
title = _('Edit bond') title = _('Edit bond')
label = _("Save") label = _("Save")
all_netdev_names.remove(existing.name) all_netdev_names.remove(self.existing_name)
params = existing.config['parameters'] mode = bondconfig.mode
mode = params['mode']
initial = { initial = {
'devices': set([ 'interfaces': bondconfig.interfaces,
parent.model.get_netdev_by_name(name) 'name': self.existing_name,
for name in existing.config['interfaces']]),
'name': existing.name,
'mode': mode, 'mode': mode,
} }
if mode in BondParameters.supports_xmit_hash_policy: if mode in BondParameters.supports_xmit_hash_policy:
initial['xmit_hash_policy'] = params['transmit-hash-policy'] initial['xmit_hash_policy'] = bondconfig.xmit_hash_policy
if mode in BondParameters.supports_lacp_rate: if mode in BondParameters.supports_lacp_rate:
initial['lacp_rate'] = params['lacp-rate'] initial['lacp_rate'] = bondconfig.lacp_rate
def device_ok(device): candidate_names = sorted(candidate_names + initial['interfaces'])
if device is existing:
return False
if device in initial['devices']:
return True
if device.type in ("vlan", "bond"):
return False
return not device.is_bond_slave
candidate_netdevs = [ self.form = BondForm(initial, candidate_names, all_netdev_names)
device for device in parent.model.get_all_netdevs()
if device_ok(device)]
self.form = BondForm(initial, candidate_netdevs, all_netdev_names)
self.form.buttons.base_widget[0].set_label(label) self.form.buttons.base_widget[0].set_label(label)
connect_signal(self.form, 'submit', self.done) connect_signal(self.form, 'submit', self.done)
connect_signal(self.form, 'cancel', self.cancel) connect_signal(self.form, 'cancel', self.cancel)
@ -475,26 +448,13 @@ class BondStretchy(Stretchy):
0, 0) 0, 0)
def done(self, sender): def done(self, sender):
log.debug("BondStretchy.done result=%s", self.form.as_data()) result = self.form.as_data()
touched_devices = set() log.debug("BondStretchy.done result=%s", result)
get_netdev_by_name = self.parent.model.get_netdev_by_name new_name = result.pop('name')
if self.existing: new_config = BondConfig(**result)
for name in self.existing.config['interfaces']: self.parent.controller.add_or_update_bond(
touched_devices.add(get_netdev_by_name(name)) self.existing_name, new_name, new_config)
bond = self.existing
self.parent.controller.add_or_update_bond(
self.existing, self.form.as_data())
self.parent.update_link(bond)
else:
bond = self.parent.controller.add_or_update_bond(
None, self.form.as_data())
self.parent.new_link(bond)
for name in self.form.devices.value:
touched_devices.add(name)
for dev in touched_devices:
self.parent.update_link(dev)
self.parent.remove_overlay() self.parent.remove_overlay()
self.parent.controller.apply_config()
def cancel(self, sender=None): def cancel(self, sender=None):
self.parent.remove_overlay() self.parent.remove_overlay()

View File

@ -7,6 +7,7 @@ from urwid import (
Text, Text,
) )
from subiquitycore.models.network import WLANConfig
from subiquitycore.ui.buttons import cancel_btn, menu_btn from subiquitycore.ui.buttons import cancel_btn, menu_btn
from subiquitycore.ui.container import ( from subiquitycore.ui.container import (
ListBox, ListBox,
@ -63,22 +64,21 @@ class WLANForm(Form):
class NetworkConfigureWLANStretchy(Stretchy): class NetworkConfigureWLANStretchy(Stretchy):
def __init__(self, parent, device): def __init__(self, parent, dev_info):
self.parent = parent self.parent = parent
self.device = device self.dev_info = dev_info
title = _("Network interface {nic} WIFI configuration").format( title = _("Network interface {nic} WIFI configuration").format(
nic=device.name) nic=dev_info.name)
self.form = WLANForm() self.form = WLANForm()
connect_signal(self.form, 'submit', self.done) connect_signal(self.form, 'submit', self.done)
connect_signal(self.form, 'cancel', self.cancel) connect_signal(self.form, 'cancel', self.cancel)
ssid, psk = self.device.configured_ssid if self.dev_info.wlan.config.ssid:
if ssid: self.form.ssid.value = self.dev_info.wlan.config.ssid
self.form.ssid.value = ssid if self.dev_info.wlan.config.psk:
if psk: self.form.psk.value = self.dev_info.wlan.config.psk
self.form.psk.value = psk
self.ssid_row = self.form.ssid._table self.ssid_row = self.form.ssid._table
self.psk_row = self.form.psk._table self.psk_row = self.form.psk._table
@ -96,8 +96,8 @@ class NetworkConfigureWLANStretchy(Stretchy):
def show_ssid_list(self, sender): def show_ssid_list(self, sender):
self.parent.show_overlay( self.parent.show_overlay(
NetworkList( NetworkList(self, self.dev_info.wlan.visible_ssids),
self, self.device.info.wlan['visible_ssids']), width=60) width=60)
def start_scan(self, sender): def start_scan(self, sender):
fp = self.inputs.focus_position - 1 fp = self.inputs.focus_position - 1
@ -105,19 +105,20 @@ class NetworkConfigureWLANStretchy(Stretchy):
fp -= 1 fp -= 1
self.inputs.focus_position = fp self.inputs.focus_position = fp
try: try:
self.parent.controller.start_scan(self.device) self.parent.controller.start_scan(self.dev_info)
except RuntimeError as r: except RuntimeError as r:
log.exception("start_scan failed") log.exception("start_scan failed")
self.error.set_text("%s" % (r,)) self.error.set_text("%s" % (r,))
def _build_iface_inputs(self): def _build_iface_inputs(self):
if len(self.device.info.wlan['visible_ssids']) > 0: visible_ssids = self.dev_info.wlan.visible_ssids
if len(visible_ssids) > 0:
networks_btn = menu_btn("Choose a visible network", networks_btn = menu_btn("Choose a visible network",
on_press=self.show_ssid_list) on_press=self.show_ssid_list)
else: else:
networks_btn = disabled(menu_btn("No visible networks")) networks_btn = disabled(menu_btn("No visible networks"))
if not self.device.info.wlan['scan_state']: if not self.dev_info.wlan.scan_state:
scan_btn = menu_btn("Scan for networks", on_press=self.start_scan) scan_btn = menu_btn("Scan for networks", on_press=self.start_scan)
else: else:
scan_btn = disabled(menu_btn("Scanning for networks")) scan_btn = disabled(menu_btn("Scanning for networks"))
@ -136,22 +137,16 @@ class NetworkConfigureWLANStretchy(Stretchy):
] ]
return col return col
def refresh_model_inputs(self): def update_link(self, dev_info):
try: self.dev_info = dev_info
self.device = self.parent.model.get_netdev_by_name(
self.device.name)
except KeyError:
# The interface is gone
self.parent.remove_overlay()
return
self.inputs.contents = [(obj, ('pack', None)) self.inputs.contents = [(obj, ('pack', None))
for obj in self._build_iface_inputs()] for obj in self._build_iface_inputs()]
def done(self, sender): def done(self, sender):
if self.device.configured_ssid[0] is None and self.form.ssid.value: if self.dev_info.wlan.config.ssid is None and self.form.ssid.value:
# Turn DHCP4 on by default when specifying an SSID for # Turn DHCP4 on by default when specifying an SSID for
# the first time... # the first time...
self.device.config['dhcp4'] = True self.parent.controller.enable_dhcp(self.dev_info, 4)
if self.form.ssid.value: if self.form.ssid.value:
ssid = self.form.ssid.value ssid = self.form.ssid.value
else: else:
@ -160,8 +155,9 @@ class NetworkConfigureWLANStretchy(Stretchy):
psk = self.form.psk.value psk = self.form.psk.value
else: else:
psk = None psk = None
self.device.set_ssid_psk(ssid, psk) self.parent.controller.set_wlan(
self.parent.update_link(self.device) self.dev_info, WLANConfig(ssid=ssid, psk=psk))
self.parent.update_link(self.dev_info)
self.parent.remove_overlay() self.parent.remove_overlay()
def cancel(self, sender=None): def cancel(self, sender=None):

View File

@ -1,10 +1,14 @@
import enum
import typing
import unittest import unittest
from unittest import mock from unittest import mock
import attr
import urwid import urwid
from subiquitycore.controllers.network import NetworkController from subiquitycore.controllers.network import NetworkController
from subiquitycore.models.network import NetworkDev from subiquitycore.models.network import NetDevInfo, StaticConfig
from subiquitycore.testing import view_helpers from subiquitycore.testing import view_helpers
from subiquitycore.ui.views.network_configure_manual_interface import ( from subiquitycore.ui.views.network_configure_manual_interface import (
EditNetworkStretchy, EditNetworkStretchy,
@ -22,15 +26,47 @@ valid_data = {
} }
def create_test_instance(cls, name=(), *, overrides={}):
if '.'.join(name) in overrides:
return overrides['.'.join(name)]
if attr.has(cls):
args = {}
for field in attr.fields(cls):
args[field.name] = create_test_instance(
field.type, name + (field.name,), overrides=overrides)
return cls(**args)
elif hasattr(cls, '__origin__'):
t = cls.__origin__
if t is typing.Union and type(None) in cls.__args__:
return None
elif t in [list, typing.List]:
return [
create_test_instance(
cls.__args__[0], name, overrides=overrides),
]
else:
raise Exception(
"do not understand annotation {} at {}".format(
t, '.'.join(name)))
elif issubclass(cls, enum.Enum):
return next(iter(cls))
else:
try:
return cls()
except Exception:
raise Exception("instantiating {} failed".format(cls))
class TestNetworkConfigureIPv4InterfaceView(unittest.TestCase): class TestNetworkConfigureIPv4InterfaceView(unittest.TestCase):
def make_view(self): def make_view(self, dev_info=None):
device = mock.create_autospec(spec=NetworkDev) if dev_info is None:
device.config = {} dev_info = create_test_instance(
NetDevInfo, overrides={'static4.addresses': ['1.2.3.4/5']})
base_view = BaseView(urwid.Text("")) base_view = BaseView(urwid.Text(""))
base_view.update_link = lambda device: None base_view.update_link = lambda device: None
base_view.controller = mock.create_autospec(spec=NetworkController) base_view.controller = mock.create_autospec(spec=NetworkController)
stretchy = EditNetworkStretchy(base_view, device, 4) stretchy = EditNetworkStretchy(base_view, dev_info, 4)
base_view.show_stretchy_overlay(stretchy) base_view.show_stretchy_overlay(stretchy)
stretchy.method_form.method.value = "manual" stretchy.method_form.method.value = "manual"
return base_view, stretchy return base_view, stretchy
@ -45,10 +81,16 @@ class TestNetworkConfigureIPv4InterfaceView(unittest.TestCase):
self.fail("method widget not focus") self.fail("method widget not focus")
def test_done_initially_disabled(self): def test_done_initially_disabled(self):
_, stretchy = self.make_view() dev_info = create_test_instance(
NetDevInfo, overrides={'static4.addresses': []})
_, stretchy = self.make_view(dev_info)
self.assertFalse(stretchy.manual_form.done_btn.enabled) self.assertFalse(stretchy.manual_form.done_btn.enabled)
def test_done_enabled_for_valid_data(self): def test_done_enabled_for_valid_data(self):
valid_data = {
'subnet': '10.0.2.0/24',
'address': '10.0.2.15',
}
_, stretchy = self.make_view() _, stretchy = self.make_view()
view_helpers.enter_data(stretchy.manual_form, valid_data) view_helpers.enter_data(stretchy.manual_form, valid_data)
self.assertTrue(stretchy.manual_form.done_btn.enabled) self.assertTrue(stretchy.manual_form.done_btn.enabled)
@ -56,20 +98,29 @@ class TestNetworkConfigureIPv4InterfaceView(unittest.TestCase):
def test_click_done(self): def test_click_done(self):
# The ugliness of this test is probably an indication that the # The ugliness of this test is probably an indication that the
# view is doing too much... # view is doing too much...
view, stretchy = self.make_view() dev_info = create_test_instance(
NetDevInfo, overrides={'static4.addresses': []})
view, stretchy = self.make_view(dev_info)
valid_data = {
'subnet': '10.0.2.0/24',
'address': '10.0.2.15',
'gateway': '10.0.2.2',
'nameservers': '8.8.8.8, 1.1.1.1',
'searchdomains': '.custom, .zzz',
}
view_helpers.enter_data(stretchy.manual_form, valid_data) view_helpers.enter_data(stretchy.manual_form, valid_data)
expected = valid_data.copy() expected = StaticConfig(
expected['nameservers'] = [expected['nameservers']] addresses=['10.0.2.15/24'],
expected['searchdomains'] = [expected['searchdomains']] gateway='10.0.2.2',
expected['network'] = expected.pop('subnet') nameservers=['8.8.8.8', '1.1.1.1'],
searchdomains=['.custom', '.zzz'])
but = view_helpers.find_button_matching(view, "^Save$") but = view_helpers.find_button_matching(view, "^Save$")
view_helpers.click(but) view_helpers.click(but)
rinfv = stretchy.device.remove_ip_networks_for_version view.controller.set_static_config.assert_called_once_with(
rinfv.assert_called_once_with(4) stretchy.dev_info, 4, expected)
stretchy.device.add_network.assert_called_once_with(4, expected)
class FakeLink: class FakeLink:
@ -80,12 +131,11 @@ class FakeLink:
class TestViewInterfaceInfo(unittest.TestCase): class TestViewInterfaceInfo(unittest.TestCase):
def make_view(self, *, info): def make_view(self, *, info):
device = mock.create_autospec(spec=NetworkDev) dev_info = create_test_instance(NetDevInfo)
device.config = {}
device.info = info
device.type = "vlan"
base_view = BaseView(urwid.Text("")) base_view = BaseView(urwid.Text(""))
stretchy = ViewInterfaceInfo(base_view, device) base_view.controller = mock.Mock()
base_view.controller.get_info_for_netdev.return_value = "INFO"
stretchy = ViewInterfaceInfo(base_view, dev_info)
base_view.show_stretchy_overlay(stretchy) base_view.show_stretchy_overlay(stretchy)
return base_view, stretchy return base_view, stretchy
@ -94,10 +144,3 @@ class TestViewInterfaceInfo(unittest.TestCase):
text = view_helpers.find_with_pred( text = view_helpers.find_with_pred(
view, lambda w: isinstance(w, urwid.Text) and "INFO" in w.text) view, lambda w: isinstance(w, urwid.Text) and "INFO" in w.text)
self.assertNotEqual(text, None) self.assertNotEqual(text, None)
def test_view_virtual(self):
view, stretchy = self.make_view(info=None)
text = view_helpers.find_with_pred(
view, lambda w: isinstance(
w, urwid.Text) and "Configured but not yet created" in w.text)
self.assertNotEqual(text, None)