Merge pull request #1151 from ogayot/FR-1652

Validate UA token & list available services
This commit is contained in:
Dan Bungert 2022-01-10 10:02:41 -07:00 committed by GitHub
commit 5ae52c000b
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 578 additions and 8 deletions

View File

@ -0,0 +1,49 @@
{
"version": "27.4.2~21.10.1",
"effective": null,
"expires": "2010-12-31T00:00:00+00:00",
"services": [
{
"name": "cis",
"description": "Center for Internet Security Audit Tools",
"entitled": "yes",
"auto_enabled": "no",
"available": "yes"
},
{
"name": "esm-apps",
"description": "UA Apps: Extended Security Maintenance (ESM)",
"entitled": "yes",
"auto_enabled": "yes",
"available": "yes"
},
{
"name": "esm-infra",
"description": "UA Infra: Extended Security Maintenance (ESM)",
"entitled": "yes",
"auto_enabled": "yes",
"available": "yes"
},
{
"name": "fips",
"description": "NIST-certified core packages",
"entitled": "yes",
"auto_enabled": "no",
"available": "yes"
},
{
"name": "fips-updates",
"description": "NIST-certified core packages with priority security updates",
"entitled": "yes",
"auto_enabled": "no",
"available": "yes"
},
{
"name": "livepatch",
"description": "Canonical Livepatch service",
"entitled": "yes",
"auto_enabled": "yes",
"available": "yes"
}
]
}

View File

@ -0,0 +1,49 @@
{
"version": "27.4.2~21.10.1",
"effective": null,
"expires": "2035-12-31T00:00:00+00:00",
"services": [
{
"name": "cis",
"description": "Center for Internet Security Audit Tools",
"entitled": "yes",
"auto_enabled": "no",
"available": "yes"
},
{
"name": "esm-apps",
"description": "UA Apps: Extended Security Maintenance (ESM)",
"entitled": "yes",
"auto_enabled": "yes",
"available": "yes"
},
{
"name": "esm-infra",
"description": "UA Infra: Extended Security Maintenance (ESM)",
"entitled": "yes",
"auto_enabled": "yes",
"available": "yes"
},
{
"name": "fips",
"description": "NIST-certified core packages",
"entitled": "yes",
"auto_enabled": "no",
"available": "yes"
},
{
"name": "fips-updates",
"description": "NIST-certified core packages with priority security updates",
"entitled": "yes",
"auto_enabled": "no",
"available": "yes"
},
{
"name": "livepatch",
"description": "Canonical Livepatch service",
"entitled": "yes",
"auto_enabled": "yes",
"available": "yes"
}
]
}

View File

@ -17,7 +17,18 @@
import logging import logging
from subiquitycore.async_helpers import schedule_task
from subiquity.client.controller import SubiquityTuiController from subiquity.client.controller import SubiquityTuiController
from subiquity.common.ubuntu_advantage import (
InvalidUATokenError,
ExpiredUATokenError,
CheckSubscriptionError,
UAInterface,
UAInterfaceStrategy,
MockedUAInterfaceStrategy,
UAClientUAInterfaceStrategy,
)
from subiquity.common.types import UbuntuAdvantageInfo from subiquity.common.types import UbuntuAdvantageInfo
from subiquity.ui.views.ubuntu_advantage import UbuntuAdvantageView from subiquity.ui.views.ubuntu_advantage import UbuntuAdvantageView
@ -32,13 +43,19 @@ class UbuntuAdvantageController(SubiquityTuiController):
endpoint_name = "ubuntu_advantage" endpoint_name = "ubuntu_advantage"
def __init__(self, app):
""" Initializer for client-side UA controller. """
strategy: UAInterfaceStrategy
if app.opts.dry_run:
strategy = MockedUAInterfaceStrategy(scale_factor=app.scale_factor)
else:
strategy = UAClientUAInterfaceStrategy()
self.ua_interface = UAInterface(strategy)
super().__init__(app)
async def make_ui(self) -> UbuntuAdvantageView: async def make_ui(self) -> UbuntuAdvantageView:
""" Generate the UI, based on the data provided by the model. """ """ Generate the UI, based on the data provided by the model. """
# TODO remove these two lines to enable this screen
await self.endpoint.skip.POST()
raise Skip("Hiding the screen until we can validate the token.")
dry_run: bool = self.app.opts.dry_run dry_run: bool = self.app.opts.dry_run
if "LTS" not in lsb_release(dry_run=dry_run)["description"]: if "LTS" not in lsb_release(dry_run=dry_run)["description"]:
await self.endpoint.skip.POST() await self.endpoint.skip.POST()
@ -51,6 +68,31 @@ class UbuntuAdvantageController(SubiquityTuiController):
if "token" in self.answers: if "token" in self.answers:
self.done(self.answers["token"]) self.done(self.answers["token"])
def check_token(self, token: str):
""" Asynchronously check the token passed as an argument. """
async def inner() -> None:
try:
svcs = await self.ua_interface.get_avail_services(token=token)
except InvalidUATokenError:
if isinstance(self.ui.body, UbuntuAdvantageView):
self.ui.body.show_invalid_token()
except ExpiredUATokenError:
if isinstance(self.ui.body, UbuntuAdvantageView):
self.ui.body.show_expired_token()
except CheckSubscriptionError:
if isinstance(self.ui.body, UbuntuAdvantageView):
self.ui.body.show_unknown_error()
else:
if isinstance(self.ui.body, UbuntuAdvantageView):
self.ui.body.show_available_services(svcs)
self._check_task = schedule_task(inner())
def cancel_check_token(self) -> None:
""" Cancel the asynchronous token check (if started). """
if self._check_task is not None:
self._check_task.cancel()
def cancel(self) -> None: def cancel(self) -> None:
self.app.prev_screen() self.app.prev_screen()

View File

@ -0,0 +1,132 @@
# Copyright 2021 Canonical, Ltd.
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as
# published by the Free Software Foundation, either version 3 of the
# License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
from subprocess import CalledProcessError, CompletedProcess
import unittest
from unittest.mock import patch
from subiquity.common.ubuntu_advantage import (
InvalidUATokenError,
ExpiredUATokenError,
CheckSubscriptionError,
UAInterface,
MockedUAInterfaceStrategy,
UAClientUAInterfaceStrategy,
)
from subiquitycore.tests.util import run_coro
class TestMockedUAInterfaceStrategy(unittest.TestCase):
def setUp(self):
self.strategy = MockedUAInterfaceStrategy(scale_factor=1_000_000)
def test_query_info_invalid(self):
# Tokens starting with "i" in dry-run mode cause the token to be
# reported as invalid.
with self.assertRaises(InvalidUATokenError):
run_coro(self.strategy.query_info(token="invalidToken"))
def test_query_info_failure(self):
# Tokens starting with "f" in dry-run mode simulate an "internal"
# error.
with self.assertRaises(CheckSubscriptionError):
run_coro(self.strategy.query_info(token="failure"))
def test_query_info_expired(self):
# Tokens starting with "x" is dry-run mode simulate an expired token.
info = run_coro(self.strategy.query_info(token="xpiredToken"))
self.assertEqual(info["expires"], "2010-12-31T00:00:00+00:00")
def test_query_info_valid(self):
# Other tokens are considered valid in dry-run mode.
info = run_coro(self.strategy.query_info(token="validToken"))
self.assertEqual(info["expires"], "2035-12-31T00:00:00+00:00")
class TestUAClientUAInterfaceStrategy(unittest.TestCase):
arun_command = "subiquity.common.ubuntu_advantage.utils.arun_command"
def test_query_info_succeeded(self):
strategy = UAClientUAInterfaceStrategy()
command = (
"ubuntu-advantage",
"status",
"--format", "json",
"--simulate-with-token", "123456789",
)
with patch(self.arun_command) as mock_arun:
mock_arun.return_value = CompletedProcess([], 0)
mock_arun.return_value.stdout = "{}"
run_coro(strategy.query_info(token="123456789"))
mock_arun.assert_called_once_with(command, check=True)
def test_query_info_failed(self):
strategy = UAClientUAInterfaceStrategy()
command = (
"ubuntu-advantage",
"status",
"--format", "json",
"--simulate-with-token", "123456789",
)
with patch(self.arun_command) as mock_arun:
mock_arun.side_effect = CalledProcessError(returncode=1,
cmd=command)
mock_arun.return_value.stdout = "{}"
with self.assertRaises(CheckSubscriptionError):
run_coro(strategy.query_info(token="123456789"))
mock_arun.assert_called_once_with(command, check=True)
def test_query_info_invalid_json(self):
strategy = UAClientUAInterfaceStrategy()
command = (
"ubuntu-advantage",
"status",
"--format", "json",
"--simulate-with-token", "123456789",
)
with patch(self.arun_command) as mock_arun:
mock_arun.return_value = CompletedProcess([], 0)
mock_arun.return_value.stdout = "invalid-json"
with self.assertRaises(CheckSubscriptionError):
run_coro(strategy.query_info(token="123456789"))
mock_arun.assert_called_once_with(command, check=True)
class TestUAInterface(unittest.TestCase):
def test_mocked_get_avail_services(self):
strategy = MockedUAInterfaceStrategy(scale_factor=1_000_000)
interface = UAInterface(strategy)
with self.assertRaises(InvalidUATokenError):
run_coro(interface.get_avail_services(token="invalidToken"))
# Tokens starting with "f" in dry-run mode simulate an "internal"
# error.
with self.assertRaises(CheckSubscriptionError):
run_coro(interface.get_avail_services(token="failure"))
# Tokens starting with "x" is dry-run mode simulate an expired token.
with self.assertRaises(ExpiredUATokenError):
run_coro(interface.get_avail_services(token="xpiredToken"))
# Other tokens are considered valid in dry-run mode.
services = run_coro(interface.get_avail_services(token="validToken"))
for service in services:
self.assertIn("name", service)
self.assertIn("description", service)
self.assertTrue(service["available"])

View File

@ -0,0 +1,151 @@
# Copyright 2021 Canonical, Ltd.
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as
# published by the Free Software Foundation, either version 3 of the
# License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
""" This module defines utilities to interface with Ubuntu Advantage
subscriptions. """
from abc import ABC, abstractmethod
from datetime import datetime as dt
import json
import logging
from subprocess import CalledProcessError, CompletedProcess
import asyncio
from subiquitycore import utils
log = logging.getLogger("subiquitycore.common.ubuntu_advantage")
class InvalidUATokenError(Exception):
""" Exception to be raised when the supplied token is invalid. """
def __init__(self, token: str, message: str = "") -> None:
self.token = token
self.message = message
super().__init__(message)
class ExpiredUATokenError(Exception):
""" Exception to be raised when the supplied token has expired. """
def __init__(self, token: str, expires: str, message: str = "") -> None:
self.token = token
self.expires = expires
self.message = message
super().__init__(message)
class CheckSubscriptionError(Exception):
""" Exception to be raised when we are unable to fetch information about
the Ubuntu Advantage subscription. """
def __init__(self, token: str, message: str = "") -> None:
self.token = token
self.message = message
super().__init__(message)
class UAInterfaceStrategy(ABC):
""" Strategy to query information about a UA subscription. """
@abstractmethod
async def query_info(token: str) -> dict:
""" Return information about the UA subscription based on the token
provided. """
class MockedUAInterfaceStrategy(UAInterfaceStrategy):
""" Mocked version of the Ubuntu Advantage interface strategy. The info it
returns is based on example files and appearance of the UA token. """
def __init__(self, scale_factor: int = 1):
self.scale_factor = scale_factor
super().__init__()
async def query_info(self, token: str) -> dict:
""" Return the subscription info associated with the supplied
UA token. No actual query is done to the UA servers in this
implementation. Instead, we create a response based on the following
rules:
* Tokens starting with "x" will be considered expired.
* Tokens starting with "i" will be considered invalid.
* Tokens starting with "f" will generate an internal error.
"""
await asyncio.sleep(1 / self.scale_factor)
if token[0] == "x":
path = "examples/uaclient-status-expired.json"
elif token[0] == "i":
raise InvalidUATokenError(token)
elif token[0] == "f":
raise CheckSubscriptionError(token)
else:
path = "examples/uaclient-status-valid.json"
with open(path, encoding="utf-8") as stream:
return json.load(stream)
class UAClientUAInterfaceStrategy(UAInterfaceStrategy):
""" Strategy that relies on UA client script to retrieve the information.
"""
async def query_info(self, token: str) -> dict:
""" Return the subscription info associated with the supplied
UA token. The information will be queried using UA client.
"""
command = (
"ubuntu-advantage",
"status",
"--format", "json",
"--simulate-with-token", token,
)
try:
proc: CompletedProcess = await utils.arun_command(command,
check=True)
# TODO check if we're not returning a string or a list
return json.loads(proc.stdout)
except CalledProcessError:
log.exception("Failed to execute command %r", command)
# TODO Check if the command failed because the token is invalid.
# Currently, ubuntu-advantage fails with the following error when
# the token is invalid:
# * Failed to connect to authentication server
# * Check your Internet connection and try again.
except json.JSONDecodeError:
log.exception("Failed to parse output of command %r", command)
message = "Unable to retrieve subscription information."
raise CheckSubscriptionError(token, message=message)
class UAInterface:
""" Interface to obtain Ubuntu Advantage subscription information. """
def __init__(self, strategy: UAInterfaceStrategy):
self.strategy = strategy
async def get_subscription(self, token: str) -> dict:
""" Return a dictionary containing the subscription information. """
return await self.strategy.query_info(token)
async def get_avail_services(self, token: str) -> list:
""" Return a list of available services for the subscription
associated with the token provided.
"""
info = await self.get_subscription(token)
expiration = dt.fromisoformat(info["expires"])
if expiration.timestamp() <= dt.utcnow().timestamp():
raise ExpiredUATokenError(token, expires=info["expires"])
def is_avail_service(service: dict) -> bool:
# TODO do we need to check for service["entitled"] as well?
return service["available"] == "yes"
return [svc for svc in info["services"] if is_avail_service(svc)]

View File

@ -16,15 +16,41 @@
import logging import logging
import re import re
from typing import List
from urwid import connect_signal from urwid import (
connect_signal,
LineBox,
Text,
Widget,
)
from subiquitycore.view import BaseView from subiquitycore.view import BaseView
from subiquitycore.ui.buttons import (
back_btn,
cancel_btn,
done_btn,
ok_btn,
)
from subiquitycore.ui.container import (
Pile,
WidgetWrap,
)
from subiquitycore.ui.form import ( from subiquitycore.ui.form import (
Form, Form,
simple_field, simple_field,
WantsToKnowFormField, WantsToKnowFormField,
) )
from subiquitycore.ui.spinner import (
Spinner,
)
from subiquitycore.ui.stretchy import (
Stretchy,
)
from subiquitycore.ui.utils import (
button_pile,
SomethingFailed,
)
from subiquitycore.ui.interactive import StringEditor from subiquitycore.ui.interactive import StringEditor
@ -67,6 +93,31 @@ class UbuntuAdvantageForm(Form):
token = UATokenField(_("Ubuntu Advantage token:"), help=ua_help) token = UATokenField(_("Ubuntu Advantage token:"), help=ua_help)
class CheckingUAToken(WidgetWrap):
""" Widget displaying a loading animation while checking ubuntu advantage
subscription. """
def __init__(self, parent: BaseView):
""" Initializes the loading animation widget. """
self.parent = parent
spinner = Spinner(parent.controller.app.aio_loop, style="dots")
spinner.start()
text = _("Checking Ubuntu Advantage subscription...")
button = cancel_btn(label=_("Cancel"), on_press=self.cancel)
self.width = len(text) + 4
super().__init__(
LineBox(
Pile([
('pack', Text(' ' + text)),
('pack', spinner),
('pack', button_pile([button])),
])))
def cancel(self, sender) -> None:
""" Close the loading animation and cancel the check operation. """
self.parent.controller.cancel_check_token()
self.parent.remove_overlay()
class UbuntuAdvantageView(BaseView): class UbuntuAdvantageView(BaseView):
""" Represent the view of the Ubuntu Advantage configuration. """ """ Represent the view of the Ubuntu Advantage configuration. """
@ -89,11 +140,107 @@ class UbuntuAdvantageView(BaseView):
super().__init__(self.form.as_screen(excerpt=_(self.excerpt))) super().__init__(self.form.as_screen(excerpt=_(self.excerpt)))
def done(self, form: UbuntuAdvantageForm) -> None: def done(self, form: UbuntuAdvantageForm) -> None:
""" Called when the user presses the Done button. """ """ If no token was supplied, move on to the next screen.
If a token was provided, open the loading dialog and
asynchronously check if the token is valid. """
log.debug("User input: %r", form.as_data()) log.debug("User input: %r", form.as_data())
self.controller.done(form.token.value) token: str = form.token.value
if token:
checking_token_overlay = CheckingUAToken(self)
self.show_overlay(checking_token_overlay,
width=checking_token_overlay.width,
min_width=None)
self.controller.check_token(token)
else:
self.controller.done(token)
def cancel(self) -> None: def cancel(self) -> None:
""" Called when the user presses the Back button. """ """ Called when the user presses the Back button. """
self.controller.cancel() self.controller.cancel()
def show_invalid_token(self) -> None:
""" Display an overlay that indicates that the user-supplied token is
invalid. """
self.remove_overlay()
self.show_stretchy_overlay(
SomethingFailed(self,
"Invalid token.",
"The Ubuntu Advantage token that you provided"
" is invalid. Please make sure that you typed"
" your token correctly."))
def show_expired_token(self) -> None:
""" Display an overlay that indicates that the user-supplied token has
expired. """
self.remove_overlay()
self.show_stretchy_overlay(
SomethingFailed(self,
"Token expired.",
"The Ubuntu Advantage token that you provided"
" has expired. Please use a different token."))
def show_unknown_error(self) -> None:
""" Display an overlay that indicates that we were unable to retrieve
the subscription information. Reasons can be multiple include lack of
network connection, temporary service unavailability, API issue ...
The user is prompted to continue anyway or go back.
"""
self.remove_overlay()
self.show_stretchy_overlay(ContinueAnywayWidget(self))
def show_available_services(self, services: dict) -> None:
""" Display an overlay with the list of services that will be enabled
via Ubuntu Advantage subscription. After the user confirms, the next we
will quit the current view and move on. """
self.remove_overlay()
self.show_stretchy_overlay(ShowServicesWidget(self, services))
class ShowServicesWidget(Stretchy):
""" Widget to show the available services for UA subscription. """
def __init__(self, parent: UbuntuAdvantageView, services: list):
""" Initializes the widget by including the list of services as a
bullet-point list. """
self.parent = parent
ok = ok_btn(label=_("OK"), on_press=self.ok)
title = _("Available Services")
header = _("List of services that will be enabled through your "
"Ubuntu Advantage subscription:")
widgets: List[Widget] = [Text(header)]
widgets.extend([Text(f"* {svc['description']}") for svc in services])
widgets.append(button_pile([ok]))
super().__init__(title, widgets, 0, 0)
def ok(self, sender) -> None:
""" Close the overlay and submit the token. """
self.parent.controller.done(self.parent.form.token.value)
class ContinueAnywayWidget(Stretchy):
""" Widget that requests the user if he wants to go back or continue
anyway. """
def __init__(self, parent: UbuntuAdvantageView) -> None:
""" Initializes the widget by showing two buttons, one to go back and
one to move forward anyway. """
self.parent = parent
back = back_btn(label=_("Back"), on_press=self.back)
cont = done_btn(label=_("Continue anyway"), on_press=self.cont)
widgets = [
Text("Unable to check your subscription information."
" Do you want to go back or continue anyway?"),
button_pile([back, cont]),
]
super().__init__("Unknown error", widgets, 0, 0)
def back(self, sender) -> None:
""" Close the overlay. """
self.parent.remove_overlay()
def cont(self, sender) -> None:
""" Move on to the next screen. """
self.parent.controller.done(self.parent.form.token.value)