diff --git a/examples/uaclient-status-expired.json b/examples/uaclient-status-expired.json
new file mode 100644
index 00000000..683337c8
--- /dev/null
+++ b/examples/uaclient-status-expired.json
@@ -0,0 +1,49 @@
+{
+ "version": "27.4.2~21.10.1",
+ "effective": null,
+ "expires": "2010-12-31T00:00:00+00:00",
+ "services": [
+ {
+ "name": "cis",
+ "description": "Center for Internet Security Audit Tools",
+ "entitled": "yes",
+ "auto_enabled": "no",
+ "available": "yes"
+ },
+ {
+ "name": "esm-apps",
+ "description": "UA Apps: Extended Security Maintenance (ESM)",
+ "entitled": "yes",
+ "auto_enabled": "yes",
+ "available": "yes"
+ },
+ {
+ "name": "esm-infra",
+ "description": "UA Infra: Extended Security Maintenance (ESM)",
+ "entitled": "yes",
+ "auto_enabled": "yes",
+ "available": "yes"
+ },
+ {
+ "name": "fips",
+ "description": "NIST-certified core packages",
+ "entitled": "yes",
+ "auto_enabled": "no",
+ "available": "yes"
+ },
+ {
+ "name": "fips-updates",
+ "description": "NIST-certified core packages with priority security updates",
+ "entitled": "yes",
+ "auto_enabled": "no",
+ "available": "yes"
+ },
+ {
+ "name": "livepatch",
+ "description": "Canonical Livepatch service",
+ "entitled": "yes",
+ "auto_enabled": "yes",
+ "available": "yes"
+ }
+ ]
+}
diff --git a/examples/uaclient-status-valid.json b/examples/uaclient-status-valid.json
new file mode 100644
index 00000000..58dd8199
--- /dev/null
+++ b/examples/uaclient-status-valid.json
@@ -0,0 +1,49 @@
+{
+ "version": "27.4.2~21.10.1",
+ "effective": null,
+ "expires": "2035-12-31T00:00:00+00:00",
+ "services": [
+ {
+ "name": "cis",
+ "description": "Center for Internet Security Audit Tools",
+ "entitled": "yes",
+ "auto_enabled": "no",
+ "available": "yes"
+ },
+ {
+ "name": "esm-apps",
+ "description": "UA Apps: Extended Security Maintenance (ESM)",
+ "entitled": "yes",
+ "auto_enabled": "yes",
+ "available": "yes"
+ },
+ {
+ "name": "esm-infra",
+ "description": "UA Infra: Extended Security Maintenance (ESM)",
+ "entitled": "yes",
+ "auto_enabled": "yes",
+ "available": "yes"
+ },
+ {
+ "name": "fips",
+ "description": "NIST-certified core packages",
+ "entitled": "yes",
+ "auto_enabled": "no",
+ "available": "yes"
+ },
+ {
+ "name": "fips-updates",
+ "description": "NIST-certified core packages with priority security updates",
+ "entitled": "yes",
+ "auto_enabled": "no",
+ "available": "yes"
+ },
+ {
+ "name": "livepatch",
+ "description": "Canonical Livepatch service",
+ "entitled": "yes",
+ "auto_enabled": "yes",
+ "available": "yes"
+ }
+ ]
+}
diff --git a/subiquity/client/controllers/ubuntu_advantage.py b/subiquity/client/controllers/ubuntu_advantage.py
index bc09af4b..f7e09bda 100644
--- a/subiquity/client/controllers/ubuntu_advantage.py
+++ b/subiquity/client/controllers/ubuntu_advantage.py
@@ -17,7 +17,18 @@
import logging
+from subiquitycore.async_helpers import schedule_task
+
from subiquity.client.controller import SubiquityTuiController
+from subiquity.common.ubuntu_advantage import (
+ InvalidUATokenError,
+ ExpiredUATokenError,
+ CheckSubscriptionError,
+ UAInterface,
+ UAInterfaceStrategy,
+ MockedUAInterfaceStrategy,
+ UAClientUAInterfaceStrategy,
+)
from subiquity.common.types import UbuntuAdvantageInfo
from subiquity.ui.views.ubuntu_advantage import UbuntuAdvantageView
@@ -32,13 +43,19 @@ class UbuntuAdvantageController(SubiquityTuiController):
endpoint_name = "ubuntu_advantage"
+ def __init__(self, app):
+ """ Initializer for client-side UA controller. """
+ strategy: UAInterfaceStrategy
+ if app.opts.dry_run:
+ strategy = MockedUAInterfaceStrategy(scale_factor=app.scale_factor)
+ else:
+ strategy = UAClientUAInterfaceStrategy()
+ self.ua_interface = UAInterface(strategy)
+ super().__init__(app)
+
async def make_ui(self) -> UbuntuAdvantageView:
""" Generate the UI, based on the data provided by the model. """
- # TODO remove these two lines to enable this screen
- await self.endpoint.skip.POST()
- raise Skip("Hiding the screen until we can validate the token.")
-
dry_run: bool = self.app.opts.dry_run
if "LTS" not in lsb_release(dry_run=dry_run)["description"]:
await self.endpoint.skip.POST()
@@ -51,6 +68,31 @@ class UbuntuAdvantageController(SubiquityTuiController):
if "token" in self.answers:
self.done(self.answers["token"])
+ def check_token(self, token: str):
+ """ Asynchronously check the token passed as an argument. """
+ async def inner() -> None:
+ try:
+ svcs = await self.ua_interface.get_avail_services(token=token)
+ except InvalidUATokenError:
+ if isinstance(self.ui.body, UbuntuAdvantageView):
+ self.ui.body.show_invalid_token()
+ except ExpiredUATokenError:
+ if isinstance(self.ui.body, UbuntuAdvantageView):
+ self.ui.body.show_expired_token()
+ except CheckSubscriptionError:
+ if isinstance(self.ui.body, UbuntuAdvantageView):
+ self.ui.body.show_unknown_error()
+ else:
+ if isinstance(self.ui.body, UbuntuAdvantageView):
+ self.ui.body.show_available_services(svcs)
+
+ self._check_task = schedule_task(inner())
+
+ def cancel_check_token(self) -> None:
+ """ Cancel the asynchronous token check (if started). """
+ if self._check_task is not None:
+ self._check_task.cancel()
+
def cancel(self) -> None:
self.app.prev_screen()
diff --git a/subiquity/common/tests/test_ubuntu_advantage.py b/subiquity/common/tests/test_ubuntu_advantage.py
new file mode 100644
index 00000000..f68f47f4
--- /dev/null
+++ b/subiquity/common/tests/test_ubuntu_advantage.py
@@ -0,0 +1,132 @@
+# Copyright 2021 Canonical, Ltd.
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU Affero General Public License as
+# published by the Free Software Foundation, either version 3 of the
+# License, or (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU Affero General Public License for more details.
+#
+# You should have received a copy of the GNU Affero General Public License
+# along with this program. If not, see .
+
+from subprocess import CalledProcessError, CompletedProcess
+import unittest
+from unittest.mock import patch
+
+from subiquity.common.ubuntu_advantage import (
+ InvalidUATokenError,
+ ExpiredUATokenError,
+ CheckSubscriptionError,
+ UAInterface,
+ MockedUAInterfaceStrategy,
+ UAClientUAInterfaceStrategy,
+ )
+from subiquitycore.tests.util import run_coro
+
+
+class TestMockedUAInterfaceStrategy(unittest.TestCase):
+ def setUp(self):
+ self.strategy = MockedUAInterfaceStrategy(scale_factor=1_000_000)
+
+ def test_query_info_invalid(self):
+ # Tokens starting with "i" in dry-run mode cause the token to be
+ # reported as invalid.
+ with self.assertRaises(InvalidUATokenError):
+ run_coro(self.strategy.query_info(token="invalidToken"))
+
+ def test_query_info_failure(self):
+ # Tokens starting with "f" in dry-run mode simulate an "internal"
+ # error.
+ with self.assertRaises(CheckSubscriptionError):
+ run_coro(self.strategy.query_info(token="failure"))
+
+ def test_query_info_expired(self):
+ # Tokens starting with "x" is dry-run mode simulate an expired token.
+ info = run_coro(self.strategy.query_info(token="xpiredToken"))
+ self.assertEqual(info["expires"], "2010-12-31T00:00:00+00:00")
+
+ def test_query_info_valid(self):
+ # Other tokens are considered valid in dry-run mode.
+ info = run_coro(self.strategy.query_info(token="validToken"))
+ self.assertEqual(info["expires"], "2035-12-31T00:00:00+00:00")
+
+
+class TestUAClientUAInterfaceStrategy(unittest.TestCase):
+ arun_command = "subiquity.common.ubuntu_advantage.utils.arun_command"
+
+ def test_query_info_succeeded(self):
+ strategy = UAClientUAInterfaceStrategy()
+ command = (
+ "ubuntu-advantage",
+ "status",
+ "--format", "json",
+ "--simulate-with-token", "123456789",
+ )
+
+ with patch(self.arun_command) as mock_arun:
+ mock_arun.return_value = CompletedProcess([], 0)
+ mock_arun.return_value.stdout = "{}"
+ run_coro(strategy.query_info(token="123456789"))
+ mock_arun.assert_called_once_with(command, check=True)
+
+ def test_query_info_failed(self):
+ strategy = UAClientUAInterfaceStrategy()
+ command = (
+ "ubuntu-advantage",
+ "status",
+ "--format", "json",
+ "--simulate-with-token", "123456789",
+ )
+
+ with patch(self.arun_command) as mock_arun:
+ mock_arun.side_effect = CalledProcessError(returncode=1,
+ cmd=command)
+ mock_arun.return_value.stdout = "{}"
+ with self.assertRaises(CheckSubscriptionError):
+ run_coro(strategy.query_info(token="123456789"))
+ mock_arun.assert_called_once_with(command, check=True)
+
+ def test_query_info_invalid_json(self):
+ strategy = UAClientUAInterfaceStrategy()
+ command = (
+ "ubuntu-advantage",
+ "status",
+ "--format", "json",
+ "--simulate-with-token", "123456789",
+ )
+
+ with patch(self.arun_command) as mock_arun:
+ mock_arun.return_value = CompletedProcess([], 0)
+ mock_arun.return_value.stdout = "invalid-json"
+ with self.assertRaises(CheckSubscriptionError):
+ run_coro(strategy.query_info(token="123456789"))
+ mock_arun.assert_called_once_with(command, check=True)
+
+
+class TestUAInterface(unittest.TestCase):
+
+ def test_mocked_get_avail_services(self):
+ strategy = MockedUAInterfaceStrategy(scale_factor=1_000_000)
+ interface = UAInterface(strategy)
+
+ with self.assertRaises(InvalidUATokenError):
+ run_coro(interface.get_avail_services(token="invalidToken"))
+ # Tokens starting with "f" in dry-run mode simulate an "internal"
+ # error.
+ with self.assertRaises(CheckSubscriptionError):
+ run_coro(interface.get_avail_services(token="failure"))
+
+ # Tokens starting with "x" is dry-run mode simulate an expired token.
+ with self.assertRaises(ExpiredUATokenError):
+ run_coro(interface.get_avail_services(token="xpiredToken"))
+
+ # Other tokens are considered valid in dry-run mode.
+ services = run_coro(interface.get_avail_services(token="validToken"))
+ for service in services:
+ self.assertIn("name", service)
+ self.assertIn("description", service)
+ self.assertTrue(service["available"])
diff --git a/subiquity/common/ubuntu_advantage.py b/subiquity/common/ubuntu_advantage.py
new file mode 100644
index 00000000..44f76a6b
--- /dev/null
+++ b/subiquity/common/ubuntu_advantage.py
@@ -0,0 +1,151 @@
+# Copyright 2021 Canonical, Ltd.
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU Affero General Public License as
+# published by the Free Software Foundation, either version 3 of the
+# License, or (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU Affero General Public License for more details.
+#
+# You should have received a copy of the GNU Affero General Public License
+# along with this program. If not, see .
+""" This module defines utilities to interface with Ubuntu Advantage
+subscriptions. """
+
+from abc import ABC, abstractmethod
+from datetime import datetime as dt
+import json
+import logging
+from subprocess import CalledProcessError, CompletedProcess
+import asyncio
+
+from subiquitycore import utils
+
+
+log = logging.getLogger("subiquitycore.common.ubuntu_advantage")
+
+
+class InvalidUATokenError(Exception):
+ """ Exception to be raised when the supplied token is invalid. """
+ def __init__(self, token: str, message: str = "") -> None:
+ self.token = token
+ self.message = message
+ super().__init__(message)
+
+
+class ExpiredUATokenError(Exception):
+ """ Exception to be raised when the supplied token has expired. """
+ def __init__(self, token: str, expires: str, message: str = "") -> None:
+ self.token = token
+ self.expires = expires
+ self.message = message
+ super().__init__(message)
+
+
+class CheckSubscriptionError(Exception):
+ """ Exception to be raised when we are unable to fetch information about
+ the Ubuntu Advantage subscription. """
+ def __init__(self, token: str, message: str = "") -> None:
+ self.token = token
+ self.message = message
+ super().__init__(message)
+
+
+class UAInterfaceStrategy(ABC):
+ """ Strategy to query information about a UA subscription. """
+ @abstractmethod
+ async def query_info(token: str) -> dict:
+ """ Return information about the UA subscription based on the token
+ provided. """
+
+
+class MockedUAInterfaceStrategy(UAInterfaceStrategy):
+ """ Mocked version of the Ubuntu Advantage interface strategy. The info it
+ returns is based on example files and appearance of the UA token. """
+ def __init__(self, scale_factor: int = 1):
+ self.scale_factor = scale_factor
+ super().__init__()
+
+ async def query_info(self, token: str) -> dict:
+ """ Return the subscription info associated with the supplied
+ UA token. No actual query is done to the UA servers in this
+ implementation. Instead, we create a response based on the following
+ rules:
+ * Tokens starting with "x" will be considered expired.
+ * Tokens starting with "i" will be considered invalid.
+ * Tokens starting with "f" will generate an internal error.
+ """
+ await asyncio.sleep(1 / self.scale_factor)
+
+ if token[0] == "x":
+ path = "examples/uaclient-status-expired.json"
+ elif token[0] == "i":
+ raise InvalidUATokenError(token)
+ elif token[0] == "f":
+ raise CheckSubscriptionError(token)
+ else:
+ path = "examples/uaclient-status-valid.json"
+
+ with open(path, encoding="utf-8") as stream:
+ return json.load(stream)
+
+
+class UAClientUAInterfaceStrategy(UAInterfaceStrategy):
+ """ Strategy that relies on UA client script to retrieve the information.
+ """
+ async def query_info(self, token: str) -> dict:
+ """ Return the subscription info associated with the supplied
+ UA token. The information will be queried using UA client.
+ """
+ command = (
+ "ubuntu-advantage",
+ "status",
+ "--format", "json",
+ "--simulate-with-token", token,
+ )
+ try:
+ proc: CompletedProcess = await utils.arun_command(command,
+ check=True)
+ # TODO check if we're not returning a string or a list
+ return json.loads(proc.stdout)
+ except CalledProcessError:
+ log.exception("Failed to execute command %r", command)
+ # TODO Check if the command failed because the token is invalid.
+ # Currently, ubuntu-advantage fails with the following error when
+ # the token is invalid:
+ # * Failed to connect to authentication server
+ # * Check your Internet connection and try again.
+ except json.JSONDecodeError:
+ log.exception("Failed to parse output of command %r", command)
+
+ message = "Unable to retrieve subscription information."
+ raise CheckSubscriptionError(token, message=message)
+
+
+class UAInterface:
+ """ Interface to obtain Ubuntu Advantage subscription information. """
+ def __init__(self, strategy: UAInterfaceStrategy):
+ self.strategy = strategy
+
+ async def get_subscription(self, token: str) -> dict:
+ """ Return a dictionary containing the subscription information. """
+ return await self.strategy.query_info(token)
+
+ async def get_avail_services(self, token: str) -> list:
+ """ Return a list of available services for the subscription
+ associated with the token provided.
+ """
+ info = await self.get_subscription(token)
+
+ expiration = dt.fromisoformat(info["expires"])
+ if expiration.timestamp() <= dt.utcnow().timestamp():
+ raise ExpiredUATokenError(token, expires=info["expires"])
+
+ def is_avail_service(service: dict) -> bool:
+ # TODO do we need to check for service["entitled"] as well?
+ return service["available"] == "yes"
+
+ return [svc for svc in info["services"] if is_avail_service(svc)]
diff --git a/subiquity/ui/views/ubuntu_advantage.py b/subiquity/ui/views/ubuntu_advantage.py
index 73cf17f7..4a6f5294 100644
--- a/subiquity/ui/views/ubuntu_advantage.py
+++ b/subiquity/ui/views/ubuntu_advantage.py
@@ -16,15 +16,41 @@
import logging
import re
+from typing import List
-from urwid import connect_signal
+from urwid import (
+ connect_signal,
+ LineBox,
+ Text,
+ Widget,
+ )
from subiquitycore.view import BaseView
+from subiquitycore.ui.buttons import (
+ back_btn,
+ cancel_btn,
+ done_btn,
+ ok_btn,
+ )
+from subiquitycore.ui.container import (
+ Pile,
+ WidgetWrap,
+ )
from subiquitycore.ui.form import (
Form,
simple_field,
WantsToKnowFormField,
-)
+ )
+from subiquitycore.ui.spinner import (
+ Spinner,
+ )
+from subiquitycore.ui.stretchy import (
+ Stretchy,
+ )
+from subiquitycore.ui.utils import (
+ button_pile,
+ SomethingFailed,
+ )
from subiquitycore.ui.interactive import StringEditor
@@ -67,6 +93,31 @@ class UbuntuAdvantageForm(Form):
token = UATokenField(_("Ubuntu Advantage token:"), help=ua_help)
+class CheckingUAToken(WidgetWrap):
+ """ Widget displaying a loading animation while checking ubuntu advantage
+ subscription. """
+ def __init__(self, parent: BaseView):
+ """ Initializes the loading animation widget. """
+ self.parent = parent
+ spinner = Spinner(parent.controller.app.aio_loop, style="dots")
+ spinner.start()
+ text = _("Checking Ubuntu Advantage subscription...")
+ button = cancel_btn(label=_("Cancel"), on_press=self.cancel)
+ self.width = len(text) + 4
+ super().__init__(
+ LineBox(
+ Pile([
+ ('pack', Text(' ' + text)),
+ ('pack', spinner),
+ ('pack', button_pile([button])),
+ ])))
+
+ def cancel(self, sender) -> None:
+ """ Close the loading animation and cancel the check operation. """
+ self.parent.controller.cancel_check_token()
+ self.parent.remove_overlay()
+
+
class UbuntuAdvantageView(BaseView):
""" Represent the view of the Ubuntu Advantage configuration. """
@@ -89,11 +140,107 @@ class UbuntuAdvantageView(BaseView):
super().__init__(self.form.as_screen(excerpt=_(self.excerpt)))
def done(self, form: UbuntuAdvantageForm) -> None:
- """ Called when the user presses the Done button. """
+ """ If no token was supplied, move on to the next screen.
+ If a token was provided, open the loading dialog and
+ asynchronously check if the token is valid. """
log.debug("User input: %r", form.as_data())
- self.controller.done(form.token.value)
+ token: str = form.token.value
+ if token:
+ checking_token_overlay = CheckingUAToken(self)
+ self.show_overlay(checking_token_overlay,
+ width=checking_token_overlay.width,
+ min_width=None)
+ self.controller.check_token(token)
+ else:
+ self.controller.done(token)
def cancel(self) -> None:
""" Called when the user presses the Back button. """
self.controller.cancel()
+
+ def show_invalid_token(self) -> None:
+ """ Display an overlay that indicates that the user-supplied token is
+ invalid. """
+ self.remove_overlay()
+ self.show_stretchy_overlay(
+ SomethingFailed(self,
+ "Invalid token.",
+ "The Ubuntu Advantage token that you provided"
+ " is invalid. Please make sure that you typed"
+ " your token correctly."))
+
+ def show_expired_token(self) -> None:
+ """ Display an overlay that indicates that the user-supplied token has
+ expired. """
+ self.remove_overlay()
+ self.show_stretchy_overlay(
+ SomethingFailed(self,
+ "Token expired.",
+ "The Ubuntu Advantage token that you provided"
+ " has expired. Please use a different token."))
+
+ def show_unknown_error(self) -> None:
+ """ Display an overlay that indicates that we were unable to retrieve
+ the subscription information. Reasons can be multiple include lack of
+ network connection, temporary service unavailability, API issue ...
+ The user is prompted to continue anyway or go back.
+ """
+ self.remove_overlay()
+ self.show_stretchy_overlay(ContinueAnywayWidget(self))
+
+ def show_available_services(self, services: dict) -> None:
+ """ Display an overlay with the list of services that will be enabled
+ via Ubuntu Advantage subscription. After the user confirms, the next we
+ will quit the current view and move on. """
+ self.remove_overlay()
+ self.show_stretchy_overlay(ShowServicesWidget(self, services))
+
+
+class ShowServicesWidget(Stretchy):
+ """ Widget to show the available services for UA subscription. """
+ def __init__(self, parent: UbuntuAdvantageView, services: list):
+ """ Initializes the widget by including the list of services as a
+ bullet-point list. """
+ self.parent = parent
+
+ ok = ok_btn(label=_("OK"), on_press=self.ok)
+
+ title = _("Available Services")
+ header = _("List of services that will be enabled through your "
+ "Ubuntu Advantage subscription:")
+
+ widgets: List[Widget] = [Text(header)]
+ widgets.extend([Text(f"* {svc['description']}") for svc in services])
+ widgets.append(button_pile([ok]))
+
+ super().__init__(title, widgets, 0, 0)
+
+ def ok(self, sender) -> None:
+ """ Close the overlay and submit the token. """
+ self.parent.controller.done(self.parent.form.token.value)
+
+
+class ContinueAnywayWidget(Stretchy):
+ """ Widget that requests the user if he wants to go back or continue
+ anyway. """
+ def __init__(self, parent: UbuntuAdvantageView) -> None:
+ """ Initializes the widget by showing two buttons, one to go back and
+ one to move forward anyway. """
+ self.parent = parent
+ back = back_btn(label=_("Back"), on_press=self.back)
+ cont = done_btn(label=_("Continue anyway"), on_press=self.cont)
+ widgets = [
+ Text("Unable to check your subscription information."
+ " Do you want to go back or continue anyway?"),
+ button_pile([back, cont]),
+ ]
+ super().__init__("Unknown error", widgets, 0, 0)
+
+ def back(self, sender) -> None:
+ """ Close the overlay. """
+ self.parent.remove_overlay()
+
+ def cont(self, sender) -> None:
+ """ Move on to the next screen. """
+ self.parent.controller.done(self.parent.form.token.value)