Merge pull request #1872 from kubiko/console-conf-use-snapd-socket

console_conf: identity: use snapd unix socket
This commit is contained in:
Dan Bungert 2024-02-12 20:17:26 -07:00 committed by GitHub
commit d617ae0f1f
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
4 changed files with 108 additions and 24 deletions

View File

@ -13,7 +13,6 @@
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import json
import logging
import os
import pwd
@ -25,7 +24,7 @@ from subiquitycore import snap
from subiquitycore.snapd import SnapdConnection
from subiquitycore.ssh import get_ips_standalone, host_key_info
from subiquitycore.tuicontroller import TuiController
from subiquitycore.utils import disable_console_conf, run_command
from subiquitycore.utils import disable_console_conf
log = logging.getLogger("console_conf.controllers.identity")
@ -52,9 +51,8 @@ def get_core_version():
return version
def get_managed():
def get_managed(con):
"""Check if device is managed"""
con = SnapdConnection("", "/run/snapd.socket")
return con.get("v2/system-info").json()["result"]["managed"]
@ -66,9 +64,8 @@ def get_realname(username):
return info.pw_gecos.split(",", 1)[0]
def get_device_owner():
def get_device_owner(con):
"""Get device owner, if any"""
con = SnapdConnection("", "/run/snapd.socket")
for user in con.get("v2/users").json()["result"]:
if "username" not in user:
continue
@ -130,7 +127,9 @@ def write_login_details(fp, username, ips, state_dir=None):
def write_login_details_standalone():
owner = get_device_owner()
# running in standalone mode
con = SnapdConnection("/", "/run/snapd.socket")
owner = get_device_owner(con)
ips = get_ips_standalone()
if len(ips) == 0:
if owner is None:
@ -160,8 +159,8 @@ class IdentityController(TuiController):
self.model = app.base_model.identity
def make_ui(self):
if get_managed():
device_owner = get_device_owner()
if get_managed(self.app.snapdcon):
device_owner = get_device_owner(self.app.snapdcon)
if device_owner:
self.model.add_user(device_owner)
return self.make_login_view()
@ -178,18 +177,19 @@ class IdentityController(TuiController):
login_details_path = self.opts.output_base + "/login-details.txt"
else:
self.app.urwid_loop.draw_screen()
cp = run_command(["snap", "create-user", "--sudoer", "--json", email])
if cp.returncode != 0:
user_action = {"action": "create", "email": email, "sudoer": True}
res = self.app.snapdcon.post("v2/users", body=user_action)
if res.json()["status"] != "OK":
if isinstance(self.ui.body, IdentityView):
self.ui.body.snap_create_user_failed(
"Creating user failed:", cp.stderr
"Creating user failed:", res.json()["result"]["message"]
)
return
else:
data = json.loads(cp.stdout)
username = res.json()["result"][0]["username"]
result = {
"realname": email,
"username": data["username"],
"username": username,
}
login_details_path = self.app.state_path("login-details.txt")
self.model.add_user(result)

View File

@ -18,24 +18,22 @@ import unittest
from unittest.mock import MagicMock, patch
from console_conf.controllers.identity import IdentityController
from console_conf.ui.views import IdentityView, LoginView
from subiquitycore.models.network import NetworkDev
from subiquitycore.snapd import MemoryResponseSet, get_fake_connection
from subiquitycore.tests.mocks import make_app
class TestIdentityController(unittest.TestCase):
@patch("os.ttyname", return_value="/dev/tty1")
@patch("console_conf.controllers.identity.get_core_version", return_value="24")
@patch("console_conf.controllers.identity.run_command")
def test_snap_integration(self, run_command, core_version, ttyname):
def test_snap_integration(self, core_version, ttyname):
with tempfile.TemporaryDirectory(suffix="console-conf-test") as statedir:
proc_mock = MagicMock()
run_command.return_value = proc_mock
proc_mock.returncode = 0
proc_mock.stdout = '{"username":"foo"}'
app = make_app()
app.state_dir = statedir
app.opts.dry_run = False
app.snapdcon = get_fake_connection()
app.state_dir = statedir
network_model = MagicMock()
mock_devs = [MagicMock(spec=NetworkDev)]
network_model.get_all_netdevs.return_value = mock_devs
@ -48,12 +46,77 @@ class TestIdentityController(unittest.TestCase):
app.state_path = MagicMock(side_effect=state_path)
create_user_calls = 0
def create_user_cb(path, body, **args):
nonlocal create_user_calls
create_user_calls += 1
self.assertEqual(path, "v2/users")
self.assertEqual(
body, {"action": "create", "email": "foo@bar.com", "sudoer": True}
)
return {
"status": "OK",
"result": [
{
"username": "foo",
}
],
}
# fake POST handlers
app.snapdcon.post_cb["v2/users"] = create_user_cb
c = IdentityController(app)
c.identity_done("foo@bar.com")
run_command.assert_called_with(
["snap", "create-user", "--sudoer", "--json", "foo@bar.com"]
)
self.assertEqual(create_user_calls, 1)
with open(os.path.join(statedir, "login-details.txt")) as inf:
data = inf.read()
self.assertIn("Ubuntu Core 24 on 1.2.3.4 (tty1)\n", data)
@patch("pwd.getpwnam")
@patch("os.path.isdir", return_value=True)
def test_make_ui_managed_with_user(self, isdir, getpwnam):
pwinfo = MagicMock()
pwinfo.pw_gecos = "Foo,Bar"
getpwnam.return_value = pwinfo
app = make_app()
app.opts.dry_run = False
app.snapdcon = get_fake_connection()
# app.state_dir = statedir
network_model = MagicMock()
mock_devs = [MagicMock(spec=NetworkDev)]
network_model.get_all_netdevs.return_value = mock_devs
mock_devs[0].actual_global_ip_addresses = ["1.2.3.4"]
app.base_model.network = network_model
app.snapdcon.response_sets = {
"v2-system-info": MemoryResponseSet([{"result": {"managed": True}}]),
"v2-users": MemoryResponseSet(
[
# no "username" for first entry
{"result": [{}, {"username": "foo"}]}
]
),
}
c = IdentityController(app)
ui = c.make_ui()
self.assertIsInstance(ui, LoginView)
getpwnam.assert_called_with("foo")
def test_make_ui_unmanaged(self):
app = make_app()
app.opts.dry_run = False
app.snapdcon = get_fake_connection()
app.snapdcon.response_sets = {
"v2-system-info": MemoryResponseSet([{"result": {"managed": False}}]),
}
c = IdentityController(app)
ui = c.make_ui()
self.assertIsInstance(ui, IdentityView)

View File

@ -19,6 +19,7 @@ from console_conf.models.console_conf import ConsoleConfModel
from console_conf.models.systems import RecoverySystemsModel
from subiquitycore.prober import Prober
from subiquitycore.snap import snap_name
from subiquitycore.snapd import SnapdConnection
from subiquitycore.tui import TuiApplication
log = logging.getLogger("console_conf.core")
@ -46,6 +47,9 @@ class ConsoleConf(TuiApplication):
def __init__(self, opts):
super().__init__(opts)
self.prober = Prober(opts.machine_config, self.debug_flags)
# we're talking to snapd over the main socket, this may require
# snapd-control if executing inside a snap
self.snapdcon = SnapdConnection(self.root, "/run/snapd.socket")
class RecoveryChooser(TuiApplication):

View File

@ -121,12 +121,26 @@ class ResponseSet:
return _FakeFileResponse(f)
class MemoryResponseSet:
"""Set of response for an endpoint which returns data stored in memory."""
def __init__(self, data):
self.data = data
self.index = 0
def next(self):
d = self.data[self.index]
self.index += 1
return _FakeMemoryResponse(d)
class FakeSnapdConnection:
def __init__(self, snap_data_dir, scale_factor, output_base):
self.snap_data_dir = snap_data_dir
self.scale_factor = scale_factor
self.response_sets = {}
self.output_base = output_base
self.post_cb = {}
def configure_proxy(self, proxy):
log.debug("pretending to restart snapd to pick up proxy config")
@ -167,6 +181,9 @@ class FakeSnapdConnection:
"status": "Accepted",
}
)
if path in self.post_cb:
return _FakeMemoryResponse(self.post_cb[path](path, body, **args))
raise Exception(
"Don't know how to fake POST response to {}".format((path, args))
)