use snapd apis to check for a user and (minimally) handle a managed system with no owner

This commit is contained in:
Michael Hudson-Doyle 2020-07-03 15:35:32 +12:00
parent 9271f17dd2
commit 164c575f2a
2 changed files with 57 additions and 37 deletions

View File

@ -16,11 +16,13 @@
import json import json
import logging import logging
import os import os
import pwd
import shlex import shlex
import sys import sys
from subiquitycore.controller import BaseController from subiquitycore.controller import BaseController
from subiquitycore.ssh import host_key_info, get_ips_standalone from subiquitycore.ssh import host_key_info, get_ips_standalone
from subiquitycore.snapd import SnapdConnection
from subiquitycore.utils import disable_console_conf, run_command from subiquitycore.utils import disable_console_conf, run_command
from console_conf.ui.views import IdentityView, LoginView from console_conf.ui.views import IdentityView, LoginView
@ -50,24 +52,31 @@ def get_core_version():
return version return version
def get_device_owner(): def get_managed():
""" Check if device is owned """ """ Check if device is managed """
con = SnapdConnection('', '/run/snapd.socket')
return con.get('v2/system-info').json()['result']['managed']
# TODO: use proper snap APIs.
def get_realname(username):
try: try:
extrausers_fp = open('/var/lib/extrausers/passwd', 'r') info = pwd.getpwnam(username)
except FileNotFoundError: except KeyError:
return None return ''
with extrausers_fp: return info.pw_gecos.split(',', 1)[0]
passwd_line = extrausers_fp.readline()
if passwd_line and len(passwd_line) > 0:
passwd = passwd_line.split(':') def get_device_owner():
result = { """ Get device owner, if any """
'realname': passwd[4].split(',')[0], con = SnapdConnection('', '/run/snapd.socket')
'username': passwd[0], for user in con.get('v2/users').json()['result']:
'homedir': passwd[5], user = user['email'].split('@')[0]
if os.path.isdir('/home/' + user):
return {
'username': user,
'realname': get_realname(user),
'homedir': '/home/' + user,
} }
return result
return None return None
@ -111,17 +120,21 @@ def write_login_details(fp, username, ips):
def write_login_details_standalone(): def write_login_details_standalone():
owner = get_device_owner() owner = get_device_owner()
if owner is None:
print("No device owner details found.")
return 0
ips = get_ips_standalone() ips = get_ips_standalone()
if len(ips) == 0: if len(ips) == 0:
tty_name = os.ttyname(0)[5:] if owner is None:
version = get_core_version() or "16" print("device managed without user")
print(login_details_tmpl_no_ip.format(tty_name=tty_name, return 2
version=version)) else:
return 2 tty_name = os.ttyname(0)[5:]
write_login_details(sys.stdout, owner['username'], ips) version = get_core_version() or "16"
print(login_details_tmpl_no_ip.format(
tty_name=tty_name, version=version))
return 2
if owner is None:
print("device managed without user @ {}".format(', '.join(ips)))
else:
write_login_details(sys.stdout, owner['username'], ips)
return 0 return 0
@ -133,14 +146,15 @@ class IdentityController(BaseController):
def start_ui(self): def start_ui(self):
self.ui.set_body(IdentityView(self.model, self)) self.ui.set_body(IdentityView(self.model, self))
device_owner = get_device_owner() if get_managed():
if device_owner is not None: device_owner = get_device_owner()
self.model.add_user(device_owner) if device_owner:
key_file = os.path.join(device_owner['homedir'], self.model.add_user(device_owner)
".ssh/authorized_keys") key_file = os.path.join(device_owner['homedir'],
self.model.user.fingerprints = ( ".ssh/authorized_keys")
run_command(['ssh-keygen', '-lf', cp = run_command(['ssh-keygen', '-lf', key_file])
key_file]).stdout.replace('\r', '').splitlines()) self.model.user.fingerprints = (
cp.stdout.replace('\r', '').splitlines())
self.login() self.login()
def identity_done(self, email): def identity_done(self, email):

View File

@ -57,6 +57,17 @@ class LoginView(BaseView):
] ]
def _build_model_inputs(self): def _build_model_inputs(self):
user = self.model.user
ips = []
for dev in self.netdevs:
for addr in dev.actual_global_ip_addresses:
ips.append(addr)
if not user:
sl = []
sl.append(Text("no owner"))
return sl
local_tpl = ( local_tpl = (
"This device is registered to {realname}.") "This device is registered to {realname}.")
@ -67,17 +78,12 @@ class LoginView(BaseView):
"device via SSH:") "device via SSH:")
sl = [] sl = []
user = self.model.user
login_info = { login_info = {
'realname': user.realname, 'realname': user.realname,
'username': user.username, 'username': user.username,
} }
login_text = local_tpl.format(**login_info) login_text = local_tpl.format(**login_info)
login_text += remote_tpl.format(**login_info) login_text += remote_tpl.format(**login_info)
ips = []
for dev in self.netdevs:
for addr in dev.actual_global_ip_addresses:
ips.append(addr)
sl += [Text(login_text), Padding.line_break("")] sl += [Text(login_text), Padding.line_break("")]
for ip in ips: for ip in ips: