Adds server side username validation.
- system_reserved_names populated from a reserved-names file. - subiquity.server.IdentityController loads it on init. - validation compares the submitted against it. - also checks if exists on passwd database.
This commit is contained in:
parent
65a701e44b
commit
b06073d86d
|
@ -334,11 +334,13 @@ class API:
|
||||||
class check_token:
|
class check_token:
|
||||||
def GET(token: Payload[str]) \
|
def GET(token: Payload[str]) \
|
||||||
-> UbuntuProCheckTokenAnswer: ...
|
-> UbuntuProCheckTokenAnswer: ...
|
||||||
|
|
||||||
class identity:
|
class identity:
|
||||||
def GET() -> IdentityData: ...
|
def GET() -> IdentityData: ...
|
||||||
def POST(data: Payload[IdentityData]): ...
|
def POST(data: Payload[IdentityData]): ...
|
||||||
class validate:
|
|
||||||
def GET(username: str) ->UsernameValidation: ...
|
class validate_username:
|
||||||
|
def GET(username: str) -> UsernameValidation: ...
|
||||||
|
|
||||||
|
|
||||||
class LinkAction(enum.Enum):
|
class LinkAction(enum.Enum):
|
||||||
|
|
|
@ -354,6 +354,7 @@ class IdentityData:
|
||||||
crypted_password: str = attr.ib(default='', repr=False)
|
crypted_password: str = attr.ib(default='', repr=False)
|
||||||
hostname: str = ''
|
hostname: str = ''
|
||||||
|
|
||||||
|
|
||||||
class UsernameValidation(enum.Enum):
|
class UsernameValidation(enum.Enum):
|
||||||
OK = enum.auto()
|
OK = enum.auto()
|
||||||
ALREADY_IN_USE = enum.auto()
|
ALREADY_IN_USE = enum.auto()
|
||||||
|
|
|
@ -16,20 +16,40 @@
|
||||||
import logging
|
import logging
|
||||||
|
|
||||||
import attr
|
import attr
|
||||||
|
import pwd
|
||||||
|
import os
|
||||||
|
import re
|
||||||
|
|
||||||
from subiquitycore.context import with_context
|
from subiquitycore.context import with_context
|
||||||
|
|
||||||
from subiquity.common.apidef import API
|
from subiquity.common.apidef import API
|
||||||
from subiquity.common.types import IdentityData
|
from subiquity.common.resources import resource_path
|
||||||
|
from subiquity.common.types import IdentityData, UsernameValidation
|
||||||
from subiquity.server.controller import SubiquityController
|
from subiquity.server.controller import SubiquityController
|
||||||
|
|
||||||
log = logging.getLogger('subiquity.server.controllers.identity')
|
log = logging.getLogger('subiquity.server.controllers.identity')
|
||||||
|
|
||||||
|
USERNAME_MAXLEN = 32
|
||||||
|
USERNAME_REGEX = r'[a-z_][a-z0-9_-]*'
|
||||||
|
|
||||||
|
|
||||||
|
def _reserved_names_from_file(path: str) -> set[str]:
|
||||||
|
if os.path.exists(path):
|
||||||
|
with open(path, "r") as f:
|
||||||
|
return {
|
||||||
|
s.split()[0] for line in f.readlines()
|
||||||
|
if (s := line.strip()) and not s.startswith("#")
|
||||||
|
}
|
||||||
|
else:
|
||||||
|
return set()
|
||||||
|
|
||||||
|
|
||||||
class IdentityController(SubiquityController):
|
class IdentityController(SubiquityController):
|
||||||
|
|
||||||
endpoint = API.identity
|
endpoint = API.identity
|
||||||
|
|
||||||
|
_system_reserved_names = set()
|
||||||
|
_existing_usernames = set()
|
||||||
autoinstall_key = model_name = "identity"
|
autoinstall_key = model_name = "identity"
|
||||||
autoinstall_schema = {
|
autoinstall_schema = {
|
||||||
'type': 'object',
|
'type': 'object',
|
||||||
|
@ -43,6 +63,18 @@ class IdentityController(SubiquityController):
|
||||||
'additionalProperties': False,
|
'additionalProperties': False,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
# TODO: Find THE way to initialize application_reserved_names.
|
||||||
|
def __init__(self, app):
|
||||||
|
super().__init__(app)
|
||||||
|
core_reserved_path = resource_path("reserved-usernames")
|
||||||
|
self._system_reserved_names = \
|
||||||
|
_reserved_names_from_file(core_reserved_path)
|
||||||
|
self._system_reserved_names.add('root')
|
||||||
|
|
||||||
|
self._existing_usernames = {
|
||||||
|
u.pw_name for u in pwd.getpwall()
|
||||||
|
}
|
||||||
|
|
||||||
def load_autoinstall_data(self, data):
|
def load_autoinstall_data(self, data):
|
||||||
if data is not None:
|
if data is not None:
|
||||||
identity_data = IdentityData(
|
identity_data = IdentityData(
|
||||||
|
@ -77,4 +109,23 @@ class IdentityController(SubiquityController):
|
||||||
|
|
||||||
async def POST(self, data: IdentityData):
|
async def POST(self, data: IdentityData):
|
||||||
self.model.add_user(data)
|
self.model.add_user(data)
|
||||||
|
if await self.validate_username_GET(data.username) != \
|
||||||
|
UsernameValidation.OK:
|
||||||
|
log.error("Username <%s> is invalid and should not be submitted.",
|
||||||
|
data.username)
|
||||||
await self.configured()
|
await self.configured()
|
||||||
|
|
||||||
|
async def validate_username_GET(self, username: str) -> UsernameValidation:
|
||||||
|
if username in self._existing_usernames:
|
||||||
|
return UsernameValidation.ALREADY_IN_USE
|
||||||
|
|
||||||
|
if username in self._system_reserved_names:
|
||||||
|
return UsernameValidation.SYSTEM_RESERVED
|
||||||
|
|
||||||
|
if not re.match(USERNAME_REGEX, username):
|
||||||
|
return UsernameValidation.INVALID_CHARS
|
||||||
|
|
||||||
|
if len(username) > USERNAME_MAXLEN:
|
||||||
|
return UsernameValidation.TOO_LONG
|
||||||
|
|
||||||
|
return UsernameValidation.OK
|
||||||
|
|
Loading…
Reference in New Issue