From 77119a8e0bf39bee69a265bde532fbb0fd215c7f Mon Sep 17 00:00:00 2001 From: Dan Bungert Date: Tue, 24 May 2022 13:31:51 -0600 Subject: [PATCH] SingleInstanceTask: add cancel_restart cancel_restart is a mode for SingleInstanceTask that changes the behavior when starting the task - if the task is already running, do not cancel it to start another. --- subiquitycore/async_helpers.py | 15 +++++++- subiquitycore/tests/test_async_helpers.py | 47 +++++++++++++++++++++++ 2 files changed, 61 insertions(+), 1 deletion(-) create mode 100644 subiquitycore/tests/test_async_helpers.py diff --git a/subiquitycore/async_helpers.py b/subiquitycore/async_helpers.py index 73ead494..d20cb6c1 100644 --- a/subiquitycore/async_helpers.py +++ b/subiquitycore/async_helpers.py @@ -48,12 +48,21 @@ async def run_in_thread(func, *args): raise asyncio.CancelledError +class TaskAlreadyRunningError(Exception): + """Used to let callers know that a task hasn't been started due to + cancel_restart == False and the task already running.""" + pass + + class SingleInstanceTask: - def __init__(self, func, propagate_errors=True): + def __init__(self, func, propagate_errors=True, cancel_restart=True): self.func = func self.propagate_errors = propagate_errors self.task = None + # if True, allow subsequent start calls to cancel a running task + # raises TaskAlreadyRunningError if we skip starting the task. + self.cancel_restart = cancel_restart async def _start(self, old): if old is not None: @@ -69,6 +78,10 @@ class SingleInstanceTask: return self.task def start_sync(self, *args, **kw): + if not self.cancel_restart: + if self.task is not None and not self.task.done(): + raise TaskAlreadyRunningError( + 'Skipping invocation of task - already running') old = self.task coro = self.func(*args, **kw) if asyncio.iscoroutine(coro): diff --git a/subiquitycore/tests/test_async_helpers.py b/subiquitycore/tests/test_async_helpers.py new file mode 100644 index 00000000..0dca6b24 --- /dev/null +++ b/subiquitycore/tests/test_async_helpers.py @@ -0,0 +1,47 @@ +# Copyright 2022 Canonical, Ltd. +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU Affero General Public License as +# published by the Free Software Foundation, either version 3 of the +# License, or (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Affero General Public License for more details. +# +# You should have received a copy of the GNU Affero General Public License +# along with this program. If not, see . + +import asyncio +import unittest +from unittest.mock import AsyncMock + +from parameterized import parameterized + +from subiquitycore.async_helpers import ( + SingleInstanceTask, + TaskAlreadyRunningError, +) + + +class TestSingleInstanceTask(unittest.IsolatedAsyncioTestCase): + @parameterized.expand([(True, 2), (False, 1)]) + async def test_cancellable(self, cancel_restart, expected_call_count): + async def fn(): + await asyncio.sleep(3) + raise Exception('timeout') + + mock_fn = AsyncMock(side_effect=fn) + sit = SingleInstanceTask(mock_fn, cancel_restart=cancel_restart) + await sit.start() + await asyncio.sleep(.01) + try: + await sit.start() + except TaskAlreadyRunningError: + restarted = False + else: + restarted = True + sit.task.cancel() + self.assertEqual(expected_call_count, mock_fn.call_count) + self.assertEqual(cancel_restart, restarted)