diff --git a/console_conf/controllers/identity.py b/console_conf/controllers/identity.py index 8ae54fb3..c15943c1 100644 --- a/console_conf/controllers/identity.py +++ b/console_conf/controllers/identity.py @@ -13,7 +13,6 @@ # You should have received a copy of the GNU Affero General Public License # along with this program. If not, see . -import json import logging import os import pwd @@ -25,7 +24,7 @@ from subiquitycore import snap from subiquitycore.snapd import SnapdConnection from subiquitycore.ssh import get_ips_standalone, host_key_info from subiquitycore.tuicontroller import TuiController -from subiquitycore.utils import disable_console_conf, run_command +from subiquitycore.utils import disable_console_conf log = logging.getLogger("console_conf.controllers.identity") @@ -52,9 +51,8 @@ def get_core_version(): return version -def get_managed(): +def get_managed(con): """Check if device is managed""" - con = SnapdConnection("", "/run/snapd.socket") return con.get("v2/system-info").json()["result"]["managed"] @@ -66,9 +64,8 @@ def get_realname(username): return info.pw_gecos.split(",", 1)[0] -def get_device_owner(): +def get_device_owner(con): """Get device owner, if any""" - con = SnapdConnection("", "/run/snapd.socket") for user in con.get("v2/users").json()["result"]: if "username" not in user: continue @@ -130,7 +127,9 @@ def write_login_details(fp, username, ips, state_dir=None): def write_login_details_standalone(): - owner = get_device_owner() + # running in standalone mode + con = SnapdConnection("/", "/run/snapd.socket") + owner = get_device_owner(con) ips = get_ips_standalone() if len(ips) == 0: if owner is None: @@ -160,8 +159,8 @@ class IdentityController(TuiController): self.model = app.base_model.identity def make_ui(self): - if get_managed(): - device_owner = get_device_owner() + if get_managed(self.app.snapdcon): + device_owner = get_device_owner(self.app.snapdcon) if device_owner: self.model.add_user(device_owner) return self.make_login_view() @@ -178,18 +177,19 @@ class IdentityController(TuiController): login_details_path = self.opts.output_base + "/login-details.txt" else: self.app.urwid_loop.draw_screen() - cp = run_command(["snap", "create-user", "--sudoer", "--json", email]) - if cp.returncode != 0: + user_action = {"action": "create", "email": email, "sudoer": True} + res = self.app.snapdcon.post("v2/users", body=user_action) + if res.json()["status"] != "OK": if isinstance(self.ui.body, IdentityView): self.ui.body.snap_create_user_failed( - "Creating user failed:", cp.stderr + "Creating user failed:", res.json()["result"]["message"] ) return else: - data = json.loads(cp.stdout) + username = res.json()["result"][0]["username"] result = { "realname": email, - "username": data["username"], + "username": username, } login_details_path = self.app.state_path("login-details.txt") self.model.add_user(result) diff --git a/console_conf/controllers/tests/test_identity.py b/console_conf/controllers/tests/test_identity.py index ff3c3b8b..06269b8c 100644 --- a/console_conf/controllers/tests/test_identity.py +++ b/console_conf/controllers/tests/test_identity.py @@ -18,24 +18,22 @@ import unittest from unittest.mock import MagicMock, patch from console_conf.controllers.identity import IdentityController +from console_conf.ui.views import IdentityView, LoginView from subiquitycore.models.network import NetworkDev +from subiquitycore.snapd import MemoryResponseSet, get_fake_connection from subiquitycore.tests.mocks import make_app class TestIdentityController(unittest.TestCase): @patch("os.ttyname", return_value="/dev/tty1") @patch("console_conf.controllers.identity.get_core_version", return_value="24") - @patch("console_conf.controllers.identity.run_command") - def test_snap_integration(self, run_command, core_version, ttyname): + def test_snap_integration(self, core_version, ttyname): with tempfile.TemporaryDirectory(suffix="console-conf-test") as statedir: - proc_mock = MagicMock() - run_command.return_value = proc_mock - proc_mock.returncode = 0 - proc_mock.stdout = '{"username":"foo"}' - app = make_app() app.state_dir = statedir app.opts.dry_run = False + app.snapdcon = get_fake_connection() + app.state_dir = statedir network_model = MagicMock() mock_devs = [MagicMock(spec=NetworkDev)] network_model.get_all_netdevs.return_value = mock_devs @@ -48,12 +46,77 @@ class TestIdentityController(unittest.TestCase): app.state_path = MagicMock(side_effect=state_path) + create_user_calls = 0 + + def create_user_cb(path, body, **args): + nonlocal create_user_calls + create_user_calls += 1 + self.assertEqual(path, "v2/users") + self.assertEqual( + body, {"action": "create", "email": "foo@bar.com", "sudoer": True} + ) + return { + "status": "OK", + "result": [ + { + "username": "foo", + } + ], + } + + # fake POST handlers + app.snapdcon.post_cb["v2/users"] = create_user_cb + c = IdentityController(app) c.identity_done("foo@bar.com") - run_command.assert_called_with( - ["snap", "create-user", "--sudoer", "--json", "foo@bar.com"] - ) + + self.assertEqual(create_user_calls, 1) with open(os.path.join(statedir, "login-details.txt")) as inf: data = inf.read() self.assertIn("Ubuntu Core 24 on 1.2.3.4 (tty1)\n", data) + + @patch("pwd.getpwnam") + @patch("os.path.isdir", return_value=True) + def test_make_ui_managed_with_user(self, isdir, getpwnam): + pwinfo = MagicMock() + pwinfo.pw_gecos = "Foo,Bar" + getpwnam.return_value = pwinfo + + app = make_app() + app.opts.dry_run = False + app.snapdcon = get_fake_connection() + # app.state_dir = statedir + network_model = MagicMock() + mock_devs = [MagicMock(spec=NetworkDev)] + network_model.get_all_netdevs.return_value = mock_devs + mock_devs[0].actual_global_ip_addresses = ["1.2.3.4"] + app.base_model.network = network_model + + app.snapdcon.response_sets = { + "v2-system-info": MemoryResponseSet([{"result": {"managed": True}}]), + "v2-users": MemoryResponseSet( + [ + # no "username" for first entry + {"result": [{}, {"username": "foo"}]} + ] + ), + } + + c = IdentityController(app) + ui = c.make_ui() + self.assertIsInstance(ui, LoginView) + getpwnam.assert_called_with("foo") + + def test_make_ui_unmanaged(self): + app = make_app() + app.opts.dry_run = False + app.snapdcon = get_fake_connection() + + app.snapdcon.response_sets = { + "v2-system-info": MemoryResponseSet([{"result": {"managed": False}}]), + } + + c = IdentityController(app) + ui = c.make_ui() + self.assertIsInstance(ui, IdentityView) diff --git a/console_conf/core.py b/console_conf/core.py index 257c5e1f..a7bcd6e5 100644 --- a/console_conf/core.py +++ b/console_conf/core.py @@ -19,6 +19,7 @@ from console_conf.models.console_conf import ConsoleConfModel from console_conf.models.systems import RecoverySystemsModel from subiquitycore.prober import Prober from subiquitycore.snap import snap_name +from subiquitycore.snapd import SnapdConnection from subiquitycore.tui import TuiApplication log = logging.getLogger("console_conf.core") @@ -46,6 +47,9 @@ class ConsoleConf(TuiApplication): def __init__(self, opts): super().__init__(opts) self.prober = Prober(opts.machine_config, self.debug_flags) + # we're talking to snapd over the main socket, this may require + # snapd-control if executing inside a snap + self.snapdcon = SnapdConnection(self.root, "/run/snapd.socket") class RecoveryChooser(TuiApplication): diff --git a/subiquitycore/snapd.py b/subiquitycore/snapd.py index 108e8d37..97bba5b2 100644 --- a/subiquitycore/snapd.py +++ b/subiquitycore/snapd.py @@ -121,12 +121,26 @@ class ResponseSet: return _FakeFileResponse(f) +class MemoryResponseSet: + """Set of response for an endpoint which returns data stored in memory.""" + + def __init__(self, data): + self.data = data + self.index = 0 + + def next(self): + d = self.data[self.index] + self.index += 1 + return _FakeMemoryResponse(d) + + class FakeSnapdConnection: def __init__(self, snap_data_dir, scale_factor, output_base): self.snap_data_dir = snap_data_dir self.scale_factor = scale_factor self.response_sets = {} self.output_base = output_base + self.post_cb = {} def configure_proxy(self, proxy): log.debug("pretending to restart snapd to pick up proxy config") @@ -167,6 +181,9 @@ class FakeSnapdConnection: "status": "Accepted", } ) + if path in self.post_cb: + return _FakeMemoryResponse(self.post_cb[path](path, body, **args)) + raise Exception( "Don't know how to fake POST response to {}".format((path, args)) )