simpler api for doing stuff in a background thread

This commit is contained in:
Michael Hudson-Doyle 2017-01-18 12:01:59 +13:00
parent cfecadcb85
commit a4d6682478
5 changed files with 78 additions and 67 deletions

View File

@ -120,8 +120,7 @@ class InstallProgressController(BaseController):
curtin_cmd = curtin_install_cmd(configs)
log.debug('Curtin install cmd: {}'.format(curtin_cmd))
fut = utils.run_command_async(self.pool, curtin_cmd)
fut.add_done_callback(lambda fut:self.call_from_thread(self.curtin_install_completed, fut))
self.run_in_bg(lambda: utils.run_command(curtin_cmd), self.curtin_install_completed)
def curtin_install_completed(self, fut):
result = fut.result()
@ -163,8 +162,7 @@ class InstallProgressController(BaseController):
curtin_cmd = curtin_install_cmd(configs)
log.debug('Curtin postinstall cmd: {}'.format(curtin_cmd))
fut = utils.run_command_async(self.pool, curtin_cmd)
fut.add_done_callback(lambda fut:self.call_from_thread(self.curtin_postinstall_completed, fut))
self.run_in_bg(lambda: utils.run_command(curtin_cmd), self.curtin_postinstall_completed)
def curtin_postinstall_completed(self, fut):
result = fut.result()

View File

@ -16,6 +16,7 @@
from abc import ABC, abstractmethod
import logging
import os
log = logging.getLogger("subiquitycore.controller")
@ -41,7 +42,6 @@ class BaseController(ABC):
self.prober = common['prober']
self.controllers = common['controllers']
self.pool = common['pool']
self._callfromthread = common['_callfromthread']
self.view_stack = []
def register_signals(self):
@ -56,8 +56,23 @@ class BaseController(ABC):
meth, args, kw = self.view_stack.pop()
meth(*args, **kw)
def call_from_thread(self, func, *args, **kw):
return self._callfromthread.call_from_thread(func, *args, **kw)
def run_in_bg(self, func, callback):
"""Run func() in a thread and call callback on UI thread.
callback will be passed a concurrent.futures.Future containing
the result of func(). The result of callback is discarded. Any
exception will be logged.
"""
fut = self.pool.submit(func)
def in_main_thread(ignored):
try:
callback(fut)
except:
log.exception("callback %s after calling %s failed", callback, func)
pipe = self.loop.watch_pipe(in_main_thread)
def in_random_thread(ignored):
os.write(pipe, b'x')
fut.add_done_callback(in_random_thread)
@abstractmethod
def cancel(self):

View File

@ -46,10 +46,36 @@ log = logging.getLogger("subiquitycore.controller.network")
class BackgroundTask:
"""Something that runs without blocking the UI and can be canceled."""
def start(self):
"""Start the task.
This is called on the UI thread, so must not block.
"""
raise NotImplementedError(self.start)
def run(self):
"""Run the task.
This is called on an arbitrary thread so don't do UI stuff!
"""
raise NotImplementedError(self.run)
def end(self, observer, fut):
"""Call task_succeeded or task_failed on observer.
This is called on the UI thread.
fut is a concurrent.futures.Future holding the result of running run.
"""
raise NotImplementedError(self.end)
def cancel(self):
"""Abort the task.
Any calls to task_succeeded or task_failed on the observer will
be ignored after this point so it doesn't really matter what run
returns after this is called.
"""
raise NotImplementedError(self.cancel)
@ -62,10 +88,15 @@ class BackgroundProcess(BackgroundTask):
def __repr__(self):
return 'BackgroundProcess(%r)'%(self.cmd,)
def run(self, observer):
def start(self):
self.proc = run_command_start(self.cmd)
def run(self):
stdout, stderr = self.proc.communicate()
result = run_command_summarize(self.proc, stdout, stderr)
return run_command_summarize(self.proc, stdout, stderr)
def end(self, observer, fut):
result = fut.result()
if result['status'] == 0:
observer.task_succeeded()
else:
@ -89,13 +120,20 @@ class PythonSleep(BackgroundTask):
def __repr__(self):
return 'PythonSleep(%r)'%(self.duration,)
def run(self, observer):
def start(self):
pass
def run(self):
r, _, _ = select.select([self.r], [], [], self.duration)
if not r:
observer.task_succeeded()
return True
os.close(self.r)
os.close(self.w)
def end(self, observer, fut):
if fut.result():
observer.task_succeeded()
def cancel(self):
os.write(self.w, b'x')
@ -104,8 +142,6 @@ class WaitForDefaultRouteTask(BackgroundTask):
def __init__(self, timeout, udev_observer):
self.timeout = timeout
self.fail_r, self.fail_w = os.pipe()
self.success_r, self.success_w = os.pipe()
self.udev_observer = udev_observer
def __repr__(self):
@ -114,28 +150,32 @@ class WaitForDefaultRouteTask(BackgroundTask):
def got_route(self):
os.write(self.success_w, b'x')
def run(self, observer):
def start(self):
self.fail_r, self.fail_w = os.pipe()
self.success_r, self.success_w = os.pipe()
self.udev_observer.add_default_route_waiter(self.got_route)
def run(self):
try:
r, _, _ = select.select([self.fail_r, self.success_r], [], [], self.timeout)
if self.success_r in r:
observer.task_succeeded()
else:
observer.task_failed()
return self.success_r in r
finally:
os.close(self.fail_r)
os.close(self.fail_w)
os.close(self.success_r)
os.close(self.success_w)
def end(self, observer, fut):
if fut.result():
observer.task_succeeded()
def cancel(self):
os.write(self.fail_w, b'x')
class TaskSequence:
def __init__(self, call_from_thread, pool, tasks, watcher):
self.call_from_thread = call_from_thread
self.pool = pool
def __init__(self, run_in_bg, tasks, watcher):
self.run_in_bg = run_in_bg
self.tasks = tasks
self.watcher = watcher
self.canceled = False
@ -155,17 +195,10 @@ class TaskSequence:
self.stage, self.curtask = self.tasks[0]
self.tasks = self.tasks[1:]
log.debug('running %s for stage %s', self.curtask, self.stage)
def cb(fut):
# We do this just so that any exceptions raised don't get lost.
# Vomiting a traceback all over the console is nasty, but not as
# nasty as silently doing nothing.
fut.result()
self.pool.submit(self.curtask.run, self).add_done_callback(cb)
self.curtask.start()
self.run_in_bg(self.curtask.run, lambda fut:self.curtask.end(self, fut))
def task_succeeded(self):
self.call_from_thread(self._task_succeeded)
def _task_succeeded(self):
if self.canceled:
return
self.watcher.task_complete(self.stage)
@ -177,7 +210,7 @@ class TaskSequence:
def task_failed(self, info=None):
if self.canceled:
return
self.call_from_thread(self.watcher.task_error, self.stage, info)
self.watcher.task_error(self.stage, info)
netplan_config_file_name = '00-snapd-config.yaml'
@ -370,7 +403,7 @@ class NetworkController(BaseController):
self.acw = ApplyingConfigWidget(len(tasks), cancel)
self.ui.frame.body.show_overlay(self.acw)
self.cs = TaskSequence(self.call_from_thread, self.pool, tasks, self)
self.cs = TaskSequence(self.run_in_bg, tasks, self)
self.cs.run()
def task_complete(self, stage):

View File

@ -15,9 +15,6 @@
from concurrent import futures
import logging
import os
import queue
import sys
import urwid
@ -32,32 +29,6 @@ class ApplicationError(Exception):
""" Basecontroller exception """
pass
class _CallFromThread(object):
def __init__(self, loop):
self.incoming = queue.Queue()
self.pipe = loop.watch_pipe(self._thread_callback)
def call_from_thread(self, func, *args, **kw):
log.debug('call_from_thread %s %s', func, args)
outgoing = queue.Queue()
self.incoming.put((outgoing, func, args, kw))
os.write(self.pipe, b'x')
result = outgoing.get()
if len(result) == 1:
return result[0]
else:
typ, val, tb = result
raise val.with_traceback(tb)
def _thread_callback(self, ignored):
outgoing, func, args, kw = self.incoming.get()
try:
result = func(*args, **kw)
except BaseException:
outgoing.put(sys.exc_info())
else:
outgoing.put((result,))
class Application:
@ -168,7 +139,6 @@ class Application:
try:
self.common['loop'].set_alarm_in(0.05, self.next_screen)
self.common['_callfromthread'] = _CallFromThread(self.common['loop'])
controllers_mod = __import__('%s.controllers' % self.project, None, None, [''])
for k in self.common['controllers']:
log.debug("Importing controller: {}".format(k))

View File

@ -84,11 +84,6 @@ def environment_check(check):
return env_ok
def run_command_async(pool, cmd, timeout=None, shell=False):
log.debug('calling Async command: {}'.format(cmd))
return pool.submit(run_command, cmd, timeout, shell)
def run_command_start(command, timeout=None, shell=False):
log.debug('run_command called: {}'.format(command))
cmd_env = os.environ.copy()