cloudinit: handle format=json status

Add version checks for cloud-init to know if we can read status in JSON
format or not.  If so, use that for a superior answer to the legacy
format.  Handle legacy code paths also.
This commit is contained in:
Dan Bungert 2024-01-29 12:26:26 -07:00
parent fa8cc0371f
commit b064528ed4
3 changed files with 202 additions and 14 deletions

View File

@ -1,7 +1,12 @@
"""Shared cloudinit utility functions"""
import asyncio
import json
import logging
import re
from typing import Optional
from subiquitycore.utils import arun_command, run_command
log = logging.getLogger("subiquity.cloudinit")
@ -24,3 +29,60 @@ def get_host_combined_cloud_config() -> dict:
except (IOError, OSError, AttributeError, json.decoder.JSONDecodeError) as ex:
log.debug("Failed to load combined-cloud-config: %s", ex)
return {}
def cloud_init_version() -> str:
# looks like 24.1~3gb729a4c4-0ubuntu1
cmd = ["dpkg-query", "-W", "-f=${Version}", "cloud-init"]
sp = run_command(cmd, check=False)
version = re.split("[-~]", sp.stdout)[0]
log.debug(f"cloud-init version: {version}")
return version
def supports_format_json() -> bool:
return cloud_init_version() >= "22.4"
def supports_recoverable_errors() -> bool:
return cloud_init_version() >= "23.4"
def read_json_extended_status(stream):
try:
status = json.loads(stream)
except json.JSONDecodeError:
return None
return status.get("extended_status", status.get("status", None))
def read_legacy_status(stream):
for line in stream.splitlines():
if line.startswith("status:"):
try:
return line.split()[1]
except IndexError:
pass
return None
async def cloud_init_status_wait() -> (bool, Optional[str]):
"""Wait for cloud-init completion, and return if timeout ocurred and best
available status information.
:return: tuple of (ok, status string or None)
"""
cmd = ["cloud-init", "status", "--wait"]
if format_json := supports_format_json():
cmd += ["--format=json"]
status_coro = arun_command(cmd)
try:
sp = await asyncio.wait_for(status_coro, 600)
except asyncio.TimeoutError:
return (False, "timeout")
if format_json:
status = read_json_extended_status(sp.stdout)
else:
status = read_legacy_status(sp.stdout)
return (True, status)

View File

@ -26,7 +26,7 @@ from aiohttp import web
from cloudinit.config.cc_set_passwords import rand_user_password
from systemd import journal
from subiquity.cloudinit import get_host_combined_cloud_config
from subiquity.cloudinit import cloud_init_status_wait, get_host_combined_cloud_config
from subiquity.common.api.server import bind, controller_for_request
from subiquity.common.apidef import API
from subiquity.common.errorreport import ErrorReporter, ErrorReportKind
@ -55,7 +55,7 @@ from subiquitycore.file_util import copy_file_if_exists, write_file
from subiquitycore.prober import Prober
from subiquitycore.snapd import AsyncSnapd, SnapdConnection, get_fake_connection
from subiquitycore.ssh import host_key_fingerprints, user_key_fingerprints
from subiquitycore.utils import arun_command, run_command
from subiquitycore.utils import run_command
NOPROBERARG = "NOPROBER"
@ -556,21 +556,13 @@ class SubiquityServer(Application):
if self.opts.dry_run:
self.cloud_init_ok = True
return
ci_start = time.time()
status_coro = arun_command(["cloud-init", "status", "--wait"])
try:
status_cp = await asyncio.wait_for(status_coro, 600)
except asyncio.TimeoutError:
status_txt = "<timeout>"
self.cloud_init_ok = False
else:
status_txt = status_cp.stdout
self.cloud_init_ok = True
self.cloud_init_ok, status = await cloud_init_status_wait()
log.debug("waited %ss for cloud-init", time.time() - ci_start)
if "status: done" in status_txt:
log.debug("cloud-init status: %r", status)
if self.cloud_init_ok:
self.load_cloud_config()
else:
log.debug("cloud-init status: %r, assumed disabled", status_txt)
def select_autoinstall(self):
# precedence

View File

@ -0,0 +1,134 @@
# Copyright 2024 Canonical, Ltd.
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as
# published by the Free Software Foundation, either version 3 of the
# License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import asyncio
from subprocess import CompletedProcess
from unittest import skipIf
from unittest.mock import Mock, patch
from subiquity.cloudinit import (
cloud_init_status_wait,
cloud_init_version,
read_json_extended_status,
read_legacy_status,
supports_format_json,
supports_recoverable_errors,
)
from subiquitycore.tests import SubiTestCase
from subiquitycore.tests.parameterized import parameterized
class TestCloudInitVersion(SubiTestCase):
@parameterized.expand(
(
("23.4-0ubuntu1~23.10.1", "23.4"),
("24.1~4gd9677655-0ubuntu1", "24.1"),
("23.3.1-1", "23.3.1"),
)
)
def test_split_version(self, pkgver, expected):
with patch("subiquity.cloudinit.run_command") as rc:
rc.return_value = Mock()
rc.return_value.stdout = pkgver
self.assertEqual(expected, cloud_init_version())
def test_cloud_init_not_present(self):
with patch("subiquity.cloudinit.run_command") as rc:
rc.return_value = Mock()
rc.return_value.stdout = ""
self.assertEqual("", cloud_init_version())
@parameterized.expand(
(
("22.3", False),
("22.4", True),
("23.1", True),
)
)
def test_can_status_json(self, civer, expected):
with patch("subiquity.cloudinit.cloud_init_version") as civ:
civ.return_value = civer
self.assertEqual(expected, supports_format_json())
@parameterized.expand(
(
("23.3", False),
("23.4", True),
("24.1", True),
)
)
def test_can_show_warnings(self, civer, expected):
with patch("subiquity.cloudinit.cloud_init_version") as civ:
civ.return_value = civer
self.assertEqual(expected, supports_recoverable_errors())
@skipIf(len(cloud_init_version()) < 1, "cloud-init not found")
def test_ver_compare(self):
# purposefully reads from the host system, as a canary warning that the
# version scheme has changed.
self.assertGreater(cloud_init_version(), "20.0")
def test_read_json_extended_status(self):
jsondata = '{"extended_status": "degraded done", "status": "done"}'
self.assertEqual("degraded done", read_json_extended_status(jsondata))
def test_read_json_extended_status_malformed(self):
self.assertIsNone(read_json_extended_status('{"extended_status"}'))
def test_read_json_extended_status_empty(self):
self.assertIsNone(read_json_extended_status(""))
def test_read_json_status(self):
jsondata = '{"status": "done"}'
self.assertEqual("done", read_json_extended_status(jsondata))
def test_read_legacy_status(self):
self.assertEqual("disabled", read_legacy_status("status: disabled\n"))
def test_read_legacy_status_malformed(self):
self.assertEqual(None, read_legacy_status("status disabled\n"))
def test_read_legacy_status_empty(self):
self.assertEqual(None, read_legacy_status("status:\n"))
def test_read_legacy_status_no_newline(self):
self.assertEqual("done", read_legacy_status("status: done\n"))
@patch("subiquity.cloudinit.arun_command", new=Mock())
@patch("subiquity.cloudinit.asyncio.wait_for")
async def test_cloud_init_status_wait_timeout(self, m_wait_for):
# arun_command mocked with regular Mock because the m_wait_for
# immediate timeout means nobody ever awaits on arun_command.
# Then this test fails with an obtuse looking RuntimeError.
m_wait_for.side_effect = asyncio.TimeoutError()
self.assertEqual((False, "timeout"), await cloud_init_status_wait())
@patch("subiquity.cloudinit.supports_format_json", new=Mock(return_value=True))
@patch("subiquity.cloudinit.arun_command", new=Mock())
@patch("subiquity.cloudinit.asyncio.wait_for")
async def test_cloud_init_status_wait_json(self, m_wait_for):
m_wait_for.return_value = CompletedProcess(
args=[], returncode=0, stdout='{"extended_status": "disabled"}'
)
self.assertEqual((True, "disabled"), await cloud_init_status_wait())
@patch("subiquity.cloudinit.supports_format_json", new=Mock(return_value=False))
@patch("subiquity.cloudinit.arun_command", new=Mock())
@patch("subiquity.cloudinit.asyncio.wait_for")
async def test_cloud_init_status_wait_legacy(self, m_wait_for):
m_wait_for.return_value = CompletedProcess(
args=[], returncode=0, stdout="status: done\n"
)
self.assertEqual((True, "done"), await cloud_init_status_wait())