Merge pull request #1299 from dbungert/fs-probe-no-udev-loops

filesystem: fix udev infinite loop
This commit is contained in:
Dan Bungert 2022-05-27 11:07:49 -06:00 committed by GitHub
commit d6cbbda685
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 77 additions and 12 deletions

View File

@ -29,6 +29,7 @@ from subiquitycore.async_helpers import (
run_in_thread, run_in_thread,
schedule_task, schedule_task,
SingleInstanceTask, SingleInstanceTask,
TaskAlreadyRunningError,
) )
from subiquitycore.context import with_context from subiquitycore.context import with_context
from subiquitycore.utils import ( from subiquitycore.utils import (
@ -94,7 +95,7 @@ class FilesystemController(SubiquityController, FilesystemManipulator):
self._probe_once_task = SingleInstanceTask( self._probe_once_task = SingleInstanceTask(
self._probe_once, propagate_errors=False) self._probe_once, propagate_errors=False)
self._probe_task = SingleInstanceTask( self._probe_task = SingleInstanceTask(
self._probe, propagate_errors=False) self._probe, propagate_errors=False, cancel_restart=False)
self.supports_resilient_boot = False self.supports_resilient_boot = False
def load_autoinstall_data(self, data): def load_autoinstall_data(self, data):
@ -522,7 +523,12 @@ class FilesystemController(SubiquityController, FilesystemManipulator):
while select.select([self._monitor.fileno()], [], [], 0)[0]: while select.select([self._monitor.fileno()], [], [], 0)[0]:
action, dev = self._monitor.receive_device() action, dev = self._monitor.receive_device()
log.debug("_udev_event %s %s", action, dev) log.debug("_udev_event %s %s", action, dev)
self._probe_task.start_sync() try:
self._probe_task.start_sync()
except TaskAlreadyRunningError:
log.debug('Skipping run of Probert - probe run already active')
else:
log.debug('Triggered Probert run on udev event')
def make_autoinstall(self): def make_autoinstall(self):
rendered = self.model.render() rendered = self.model.render()

View File

@ -48,12 +48,21 @@ async def run_in_thread(func, *args):
raise asyncio.CancelledError raise asyncio.CancelledError
class TaskAlreadyRunningError(Exception):
"""Used to let callers know that a task hasn't been started due to
cancel_restart == False and the task already running."""
pass
class SingleInstanceTask: class SingleInstanceTask:
def __init__(self, func, propagate_errors=True): def __init__(self, func, propagate_errors=True, cancel_restart=True):
self.func = func self.func = func
self.propagate_errors = propagate_errors self.propagate_errors = propagate_errors
self.task = None self.task = None
# if True, allow subsequent start calls to cancel a running task
# raises TaskAlreadyRunningError if we skip starting the task.
self.cancel_restart = cancel_restart
async def _start(self, old): async def _start(self, old):
if old is not None: if old is not None:
@ -69,6 +78,10 @@ class SingleInstanceTask:
return self.task return self.task
def start_sync(self, *args, **kw): def start_sync(self, *args, **kw):
if not self.cancel_restart:
if self.task is not None and not self.task.done():
raise TaskAlreadyRunningError(
'Skipping invocation of task - already running')
old = self.task old = self.task
coro = self.func(*args, **kw) coro = self.func(*args, **kw)
if asyncio.iscoroutine(coro): if asyncio.iscoroutine(coro):

View File

@ -0,0 +1,47 @@
# Copyright 2022 Canonical, Ltd.
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as
# published by the Free Software Foundation, either version 3 of the
# License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import asyncio
import unittest
from unittest.mock import AsyncMock
from parameterized import parameterized
from subiquitycore.async_helpers import (
SingleInstanceTask,
TaskAlreadyRunningError,
)
class TestSingleInstanceTask(unittest.IsolatedAsyncioTestCase):
@parameterized.expand([(True, 2), (False, 1)])
async def test_cancellable(self, cancel_restart, expected_call_count):
async def fn():
await asyncio.sleep(3)
raise Exception('timeout')
mock_fn = AsyncMock(side_effect=fn)
sit = SingleInstanceTask(mock_fn, cancel_restart=cancel_restart)
await sit.start()
await asyncio.sleep(.01)
try:
await sit.start()
except TaskAlreadyRunningError:
restarted = False
else:
restarted = True
sit.task.cancel()
self.assertEqual(expected_call_count, mock_fn.call_count)
self.assertEqual(cancel_restart, restarted)

View File

@ -13,38 +13,37 @@
# You should have received a copy of the GNU Affero General Public License # You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>. # along with this program. If not, see <http://www.gnu.org/licenses/>.
import unittest
from unittest.mock import MagicMock from unittest.mock import MagicMock
from subiquitycore.tests import SubiTestCase
from subiquitycore.pubsub import MessageHub from subiquitycore.pubsub import MessageHub
from subiquitycore.tests.util import run_coro
class TestMessageHub(SubiTestCase): class TestMessageHub(unittest.IsolatedAsyncioTestCase):
def setUp(self): def setUp(self):
self.hub = MessageHub() self.hub = MessageHub()
def test_multicall(self): async def test_multicall(self):
cb = MagicMock() cb = MagicMock()
expected_calls = 3 expected_calls = 3
channel_id = 1234 channel_id = 1234
for _ in range(expected_calls): for _ in range(expected_calls):
self.hub.subscribe(channel_id, cb) self.hub.subscribe(channel_id, cb)
run_coro(self.hub.abroadcast(channel_id)) await self.hub.abroadcast(channel_id)
self.assertEqual(expected_calls, cb.call_count) self.assertEqual(expected_calls, cb.call_count)
def test_multisubscriber(self): async def test_multisubscriber(self):
cbs = [MagicMock() for _ in range(4)] cbs = [MagicMock() for _ in range(4)]
channel_id = 2345 channel_id = 2345
for cb in cbs: for cb in cbs:
self.hub.subscribe(channel_id, cb) self.hub.subscribe(channel_id, cb)
run_coro(self.hub.abroadcast(channel_id)) await self.hub.abroadcast(channel_id)
for cb in cbs: for cb in cbs:
cb.assert_called_once_with() cb.assert_called_once_with()
def test_message_arg(self): async def test_message_arg(self):
cb = MagicMock() cb = MagicMock()
channel_id = 'test-message-arg' channel_id = 'test-message-arg'
self.hub.subscribe(channel_id, cb) self.hub.subscribe(channel_id, cb)
run_coro(self.hub.abroadcast(channel_id, '0', 1, 'two', [3], four=4)) await self.hub.abroadcast(channel_id, '0', 1, 'two', [3], four=4)
cb.assert_called_once_with('0', 1, 'two', [3], four=4) cb.assert_called_once_with('0', 1, 'two', [3], four=4)