SingleInstanceTask: add cancel_restart

cancel_restart is a mode for SingleInstanceTask that changes the
behavior when starting the task - if the task is already running, do not
cancel it to start another.
This commit is contained in:
Dan Bungert 2022-05-24 13:31:51 -06:00
parent 884f1c87ae
commit 77119a8e0b
2 changed files with 61 additions and 1 deletions

View File

@ -48,12 +48,21 @@ async def run_in_thread(func, *args):
raise asyncio.CancelledError
class TaskAlreadyRunningError(Exception):
"""Used to let callers know that a task hasn't been started due to
cancel_restart == False and the task already running."""
pass
class SingleInstanceTask:
def __init__(self, func, propagate_errors=True):
def __init__(self, func, propagate_errors=True, cancel_restart=True):
self.func = func
self.propagate_errors = propagate_errors
self.task = None
# if True, allow subsequent start calls to cancel a running task
# raises TaskAlreadyRunningError if we skip starting the task.
self.cancel_restart = cancel_restart
async def _start(self, old):
if old is not None:
@ -69,6 +78,10 @@ class SingleInstanceTask:
return self.task
def start_sync(self, *args, **kw):
if not self.cancel_restart:
if self.task is not None and not self.task.done():
raise TaskAlreadyRunningError(
'Skipping invocation of task - already running')
old = self.task
coro = self.func(*args, **kw)
if asyncio.iscoroutine(coro):

View File

@ -0,0 +1,47 @@
# Copyright 2022 Canonical, Ltd.
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as
# published by the Free Software Foundation, either version 3 of the
# License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import asyncio
import unittest
from unittest.mock import AsyncMock
from parameterized import parameterized
from subiquitycore.async_helpers import (
SingleInstanceTask,
TaskAlreadyRunningError,
)
class TestSingleInstanceTask(unittest.IsolatedAsyncioTestCase):
@parameterized.expand([(True, 2), (False, 1)])
async def test_cancellable(self, cancel_restart, expected_call_count):
async def fn():
await asyncio.sleep(3)
raise Exception('timeout')
mock_fn = AsyncMock(side_effect=fn)
sit = SingleInstanceTask(mock_fn, cancel_restart=cancel_restart)
await sit.start()
await asyncio.sleep(.01)
try:
await sit.start()
except TaskAlreadyRunningError:
restarted = False
else:
restarted = True
sit.task.cancel()
self.assertEqual(expected_call_count, mock_fn.call_count)
self.assertEqual(cancel_restart, restarted)