Merge pull request #595 from mwhudson/remove-run_in_bg

remove run_in_bg

the asyncioification is complete (apart from bug fixes)
This commit is contained in:
Michael Hudson-Doyle 2019-12-15 09:44:09 +13:00 committed by GitHub
commit bd3ede4c21
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 31 additions and 61 deletions

View File

@ -164,12 +164,6 @@ running things in the background and subiquity uses
[trio](https://trio.readthedocs.io/en/stable/) has nicer APIs but is
a bit too new for now.
The older approach which is still present in the codebase is the `run_in_bg`
function, which takes two functions: one that takes no arguments and is called
in a background thread and a callback that takes one argument, and is called
in the main/UI thread with a `concurrent.futures.Future` representing the
result of calling the first function.
A cast-iron rule: Only touch the UI from the main thread.
### Terminal things

View File

@ -157,10 +157,10 @@ class ErrorReport(metaclass=urwid.MetaSignals):
del self.pr['ProcMaps']
self.pr.write(self._file)
def added_info(fut):
log.debug("done adding info for report %s", self.base)
async def add_info():
log.debug("adding info for report %s", self.base)
try:
fut.result()
await run_in_thread(_bg_add_info)
except Exception:
self.state = ErrorReportState.ERROR_GENERATING
log.exception("adding info to problem report failed")
@ -172,7 +172,7 @@ class ErrorReport(metaclass=urwid.MetaSignals):
if wait:
_bg_add_info()
else:
self.controller.run_in_bg(_bg_add_info, added_info)
schedule_task(add_info())
async def load(self):
log.debug("loading report %s", self.base)
@ -238,9 +238,9 @@ class ErrorReport(metaclass=urwid.MetaSignals):
response.raise_for_status()
return response.text.split()[0]
def uploaded(fut):
async def upload():
try:
oops_id = fut.result()
oops_id = await run_in_thread(_bg_upload)
except requests.exceptions.RequestException:
log.exception("upload for %s failed", self.base)
else:
@ -252,7 +252,8 @@ class ErrorReport(metaclass=urwid.MetaSignals):
urwid.emit_signal(self, 'changed')
uploader.start()
self.controller.run_in_bg(_bg_upload, uploaded)
schedule_task(upload())
def _path_with_ext(self, ext):
return os.path.join(

View File

@ -34,7 +34,7 @@ class Thing:
class MiniApplication:
ui = signal = loop = run_in_bg = None
ui = signal = loop = None
answers = {}
opts = Thing()
opts.dry_run = True

View File

@ -21,6 +21,10 @@ import traceback
import apport.hookutils
from subiquitycore.async_helpers import (
run_in_thread,
schedule_task,
)
from subiquitycore.core import Application
from subiquity.controllers.error import (
@ -101,7 +105,7 @@ class Subiquity(Application):
connection = SnapdConnection(self.root, self.snapd_socket_path)
self.snapd = AsyncSnapd(connection)
self.signal.connect_signals([
('network-proxy-set', self._proxy_set),
('network-proxy-set', lambda: schedule_task(self._proxy_set())),
('network-change', self._network_change),
])
self._apport_data = []
@ -130,13 +134,10 @@ class Subiquity(Application):
def _network_change(self):
self.signal.emit_signal('snapd-network-change')
def _proxy_set(self):
self.run_in_bg(
lambda: self.snapd.connection.configure_proxy(
self.base_model.proxy),
lambda fut: (
fut.result(), self.signal.emit_signal('snapd-network-change')),
)
async def _proxy_set(self):
await run_in_thread(
self.snapd.connection.configure_proxy, self.base_model.proxy)
self.signal.emit_signal('snapd-network-change')
def unhandled_input(self, key):
if key == 'f1':
@ -163,7 +164,7 @@ class Subiquity(Application):
print(DEBUG_SHELL_INTRO)
self.run_command_in_foreground(
"bash", before_hook=_before, cwd='/')
["bash"], before_hook=_before, cwd='/')
def note_file_for_apport(self, key, path):
self._apport_files.append((key, path))

View File

@ -46,7 +46,6 @@ class BaseController(ABC):
# subiquity/controllers/installprogress.py
self.debug_flags = os.environ.get('SUBIQUITY_DEBUG', '').split(',')
self.loop = app.loop
self.run_in_bg = app.run_in_bg
self.app = app
self.answers = app.answers.get(self.name, {})

View File

@ -13,19 +13,18 @@
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
from concurrent import futures
import fcntl
import json
import logging
import os
import struct
import subprocess
import sys
import tty
import urwid
import yaml
from subiquitycore.async_helpers import schedule_task
from subiquitycore.controller import (
RepeatedController,
Skip,
@ -33,6 +32,7 @@ from subiquitycore.controller import (
from subiquitycore.signals import Signal
from subiquitycore.prober import Prober
from subiquitycore.ui.frame import SubiquityCoreUI
from subiquitycore.utils import arun_command
log = logging.getLogger('subiquitycore.core')
@ -372,29 +372,8 @@ class Application:
self.signal = Signal()
self.prober = prober
self.loop = None
self.pool = futures.ThreadPoolExecutor(10)
self.controllers = ControllerSet(self, self.controllers)
def run_in_bg(self, func, callback):
"""Run func() in a thread and call callback on UI thread.
callback will be passed a concurrent.futures.Future containing
the result of func(). The result of callback is discarded. An
exception will crash the process so be careful!
"""
fut = self.pool.submit(func)
def in_main_thread(ignored):
self.loop.remove_watch_pipe(pipe)
os.close(pipe)
callback(fut)
pipe = self.loop.watch_pipe(in_main_thread)
def in_random_thread(ignored):
os.write(pipe, b'x')
fut.add_done_callback(in_random_thread)
def run_command_in_foreground(self, cmd, before_hook=None, after_hook=None,
**kw):
screen = self.loop.screen
@ -415,10 +394,9 @@ class Application:
# there the symptom is that we are running in the foreground but not
# listening to stdin! The fix is the same.
def run():
subprocess.run(cmd, **kw)
def restore(fut):
async def _run():
await arun_command(
cmd, stdin=None, stdout=None, stderr=None)
screen.start()
urwid.emit_signal(
screen, urwid.display_common.INPUT_DESCRIPTORS_CHANGED)
@ -431,7 +409,7 @@ class Application:
screen, urwid.display_common.INPUT_DESCRIPTORS_CHANGED)
if before_hook is not None:
before_hook()
self.run_in_bg(run, restore)
schedule_task(_run())
def _connect_base_signals(self):
"""Connect signals used in the core controller."""
@ -652,9 +630,3 @@ class Application:
except Exception:
log.exception("Exception in controller.run():")
raise
finally:
# concurrent.futures.ThreadPoolExecutor tries to join all
# threads before exiting. We don't want that and this
# ghastly hack prevents it.
from concurrent.futures import thread
thread._threads_queues = {}

View File

@ -66,7 +66,8 @@ async def arun_command(cmd, *, stdout=subprocess.PIPE, stderr=subprocess.PIPE,
encoding='utf-8', input=None, errors='replace',
env=None, check=False, **kw):
if input is None:
kw['stdin'] = subprocess.DEVNULL
if 'stdin' not in kw:
kw['stdin'] = subprocess.DEVNULL
else:
kw['stdin'] = subprocess.PIPE
input = input.encode(encoding)
@ -75,8 +76,10 @@ async def arun_command(cmd, *, stdout=subprocess.PIPE, stderr=subprocess.PIPE,
*cmd, stdout=stdout, stderr=stderr, env=_clean_env(env), **kw)
stdout, stderr = await proc.communicate(input=input)
if encoding:
stdout = stdout.decode(encoding)
stderr = stderr.decode(encoding)
if stdout is not None:
stdout = stdout.decode(encoding)
if stderr is not None:
stderr = stderr.decode(encoding)
log.debug("arun_command %s exited with code %s", cmd, proc.returncode)
if check and proc.returncode != 0:
raise subprocess.CalledProcessError(proc.returncode, cmd)