Merge pull request #1446 from ogayot/pr/event-loop-rework

Stop calling deprecated asyncio.get_event_loop() function
This commit is contained in:
Olivier Gayot 2022-10-28 17:50:24 +02:00 committed by GitHub
commit fa351ed7bc
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
19 changed files with 126 additions and 87 deletions

View File

@ -15,6 +15,7 @@
# along with this program. If not, see <http://www.gnu.org/licenses/>. # along with this program. If not, see <http://www.gnu.org/licenses/>.
import argparse import argparse
import asyncio
import sys import sys
import os import os
import logging import logging
@ -78,16 +79,19 @@ def main():
logger.info("Starting console-conf v{}".format(VERSION)) logger.info("Starting console-conf v{}".format(VERSION))
logger.info("Arguments passed: {}".format(sys.argv)) logger.info("Arguments passed: {}".format(sys.argv))
if opts.chooser_systems: async def run_with_loop():
# when running as a chooser, the stdin/stdout streams are set up by the if opts.chooser_systems:
# process that runs us, attempt to restore the tty in/out by looking at # when running as a chooser, the stdin/stdout streams are set up by
# stderr # the process that runs us, attempt to restore the tty in/out by
chooser_input, chooser_output = restore_std_streams_from(sys.stderr) # looking at stderr
interface = RecoveryChooser(opts, chooser_input, chooser_output) chooser_input, chooser_output = restore_std_streams_from(
else: sys.stderr)
interface = ConsoleConf(opts) interface = RecoveryChooser(opts, chooser_input, chooser_output)
else:
interface = ConsoleConf(opts)
await interface.run()
interface.run() asyncio.run(run_with_loop())
def restore_std_streams_from(from_file): def restore_std_streams_from(from_file):

View File

@ -140,23 +140,11 @@ class SubiquityClient(TuiApplication):
except OSError: except OSError:
self.our_tty = "not a tty" self.our_tty = "not a tty"
self.conn = aiohttp.UnixConnector(self.opts.socket)
self.in_make_view_cvar = contextvars.ContextVar( self.in_make_view_cvar = contextvars.ContextVar(
'in_make_view', default=False) 'in_make_view', default=False)
def header_func():
if self.in_make_view_cvar.get():
return {'x-make-view-request': 'yes'}
else:
return None
self.client = make_client_for_conn(API, self.conn, self.resp_hook,
header_func=header_func)
self.error_reporter = ErrorReporter( self.error_reporter = ErrorReporter(
self.context.child("ErrorReporter"), self.opts.dry_run, self.root, self.context.child("ErrorReporter"), self.opts.dry_run, self.root)
self.client)
self.note_data_for_apport("SnapUpdated", str(self.updated)) self.note_data_for_apport("SnapUpdated", str(self.updated))
self.note_data_for_apport("UsingAnswers", str(bool(self.answers))) self.note_data_for_apport("UsingAnswers", str(bool(self.answers)))
@ -325,6 +313,18 @@ class SubiquityClient(TuiApplication):
return status return status
async def start(self): async def start(self):
conn = aiohttp.UnixConnector(self.opts.socket)
def header_func():
if self.in_make_view_cvar.get():
return {'x-make-view-request': 'yes'}
else:
return None
self.client = make_client_for_conn(API, conn, self.resp_hook,
header_func=header_func)
self.error_reporter.client = self.client
status = await self.connect() status = await self.connect()
self.interactive = status.interactive self.interactive = status.interactive
if self.interactive: if self.interactive:
@ -402,9 +402,9 @@ class SubiquityClient(TuiApplication):
def extra_urwid_loop_args(self): def extra_urwid_loop_args(self):
return dict(input_filter=self.input_filter.filter) return dict(input_filter=self.input_filter.filter)
def run(self): async def run(self):
try: try:
super().run() await super().run()
except Exception: except Exception:
print("generating crash report") print("generating crash report")
try: try:
@ -424,10 +424,26 @@ class SubiquityClient(TuiApplication):
signal.pause() signal.pause()
finally: finally:
if self.opts.server_pid: if self.opts.server_pid:
print('killing server {}'.format(self.opts.server_pid)) # If we signal the server with a SIGINT, it will write a
# KeyboardInterrupt exception trace to its standard error
# stream. Integration tests do not appreciate that. Let's leave
# the server up to a second to exit, and then we signal it.
pid = int(self.opts.server_pid) pid = int(self.opts.server_pid)
os.kill(pid, 2)
os.waitpid(pid, 0) print(f'giving the server [{pid}] up to a second to exit')
for unused in range(10):
try:
if os.waitpid(pid, os.WNOHANG) != (0, 0):
break
except ChildProcessError:
# If we attached to an existing server process,
# waitpid will fail.
pass
await asyncio.sleep(.1)
else:
print('killing server {}'.format(pid))
os.kill(pid, 2)
os.waitpid(pid, 0)
async def confirm_install(self): async def confirm_install(self):
await self.client.meta.confirm.POST(self.our_tty) await self.client.meta.confirm.POST(self.our_tty)

View File

@ -14,6 +14,7 @@
# You should have received a copy of the GNU Affero General Public License # You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>. # along with this program. If not, see <http://www.gnu.org/licenses/>.
import asyncio
import copy import copy
import json import json
import sys import sys
@ -59,9 +60,12 @@ def make_app():
def main(): def main():
schema = make_schema(make_app()) async def run_with_loop():
jsonschema.validate({"version": 1}, schema) schema = make_schema(make_app())
print(json.dumps(schema, indent=4)) jsonschema.validate({"version": 1}, schema)
print(json.dumps(schema, indent=4))
asyncio.run(run_with_loop())
if __name__ == '__main__': if __name__ == '__main__':

View File

@ -14,6 +14,7 @@
# along with this program. If not, see <http://www.gnu.org/licenses/>. # along with this program. If not, see <http://www.gnu.org/licenses/>.
import argparse import argparse
import asyncio
import logging import logging
import os import os
import pathlib import pathlib
@ -154,14 +155,15 @@ def main():
logger.debug("Kernel commandline: {}".format(opts.kernel_cmdline)) logger.debug("Kernel commandline: {}".format(opts.kernel_cmdline))
logger.debug("Storage version: {}".format(opts.storage_version)) logger.debug("Storage version: {}".format(opts.storage_version))
server = SubiquityServer(opts, block_log_dir) async def run_with_loop():
server = SubiquityServer(opts, block_log_dir)
server.note_file_for_apport(
"InstallerServerLog", logfiles['debug'])
server.note_file_for_apport(
"InstallerServerLogInfo", logfiles['info'])
await server.run()
server.note_file_for_apport( asyncio.run(run_with_loop())
"InstallerServerLog", logfiles['debug'])
server.note_file_for_apport(
"InstallerServerLogInfo", logfiles['info'])
server.run()
if __name__ == '__main__': if __name__ == '__main__':

View File

@ -15,6 +15,7 @@
# along with this program. If not, see <http://www.gnu.org/licenses/>. # along with this program. If not, see <http://www.gnu.org/licenses/>.
import argparse import argparse
import asyncio
import logging import logging
import os import os
import fcntl import fcntl
@ -140,14 +141,15 @@ def main():
opts.answers.close() opts.answers.close()
opts.answers = None opts.answers = None
subiquity_interface = SubiquityClient(opts) async def run_with_loop():
subiquity_interface = SubiquityClient(opts)
subiquity_interface.note_file_for_apport(
"InstallerLog", logfiles['debug'])
subiquity_interface.note_file_for_apport(
"InstallerLogInfo", logfiles['info'])
await subiquity_interface.run()
subiquity_interface.note_file_for_apport( asyncio.run(run_with_loop())
"InstallerLog", logfiles['debug'])
subiquity_interface.note_file_for_apport(
"InstallerLogInfo", logfiles['info'])
subiquity_interface.run()
if __name__ == '__main__': if __name__ == '__main__':

View File

@ -62,7 +62,7 @@ class Upload(metaclass=urwid.MetaSignals):
def start(self): def start(self):
self.pipe_r, self.pipe_w = os.pipe() self.pipe_r, self.pipe_w = os.pipe()
fcntl.fcntl(self.pipe_r, fcntl.F_SETFL, os.O_NONBLOCK) fcntl.fcntl(self.pipe_r, fcntl.F_SETFL, os.O_NONBLOCK)
asyncio.get_event_loop().add_reader(self.pipe_r, self._progress) asyncio.get_running_loop().add_reader(self.pipe_r, self._progress)
def _progress(self): def _progress(self):
os.read(self.pipe_r, 4096) os.read(self.pipe_r, 4096)
@ -75,7 +75,7 @@ class Upload(metaclass=urwid.MetaSignals):
os.write(self.pipe_w, b'x') os.write(self.pipe_w, b'x')
def stop(self): def stop(self):
asyncio.get_event_loop().remove_reader(self.pipe_r) asyncio.get_running_loop().remove_reader(self.pipe_r)
os.close(self.pipe_w) os.close(self.pipe_w)
os.close(self.pipe_r) os.close(self.pipe_r)
@ -174,7 +174,7 @@ class ErrorReport(metaclass=urwid.MetaSignals):
_bg_add_info() _bg_add_info()
context.description = "written to " + self.path context.description = "written to " + self.path
else: else:
self._info_task = asyncio.get_event_loop().create_task(add_info()) self._info_task = asyncio.create_task(add_info())
async def load(self): async def load(self):
with self._context.child("load"): with self._context.child("load"):
@ -432,7 +432,7 @@ class ErrorReporter(object):
if report is not None: if report is not None:
return report return report
loop = asyncio.get_event_loop() loop = asyncio.get_running_loop()
await self.client.errors.wait.GET(error_ref) await self.client.errors.wait.GET(error_ref)

View File

@ -268,7 +268,7 @@ class SubiquityModel:
async def wait_confirmation(self): async def wait_confirmation(self):
if self._confirmation_task is None: if self._confirmation_task is None:
self._confirmation_task = asyncio.get_event_loop().create_task( self._confirmation_task = asyncio.create_task(
self._confirmation.wait()) self._confirmation.wait())
try: try:
await self._confirmation_task await self._confirmation_task

View File

@ -669,11 +669,11 @@ class FilesystemController(SubiquityController, FilesystemManipulator):
await self._probe_task.start() await self._probe_task.start()
def start_listening_udev(self): def start_listening_udev(self):
loop = asyncio.get_event_loop() loop = asyncio.get_running_loop()
loop.add_reader(self._monitor.fileno(), self._udev_event) loop.add_reader(self._monitor.fileno(), self._udev_event)
def stop_listening_udev(self): def stop_listening_udev(self):
loop = asyncio.get_event_loop() loop = asyncio.get_running_loop()
loop.remove_reader(self._monitor.fileno()) loop.remove_reader(self._monitor.fileno())
def _udev_event(self): def _udev_event(self):
@ -681,7 +681,7 @@ class FilesystemController(SubiquityController, FilesystemManipulator):
if cp.returncode != 0: if cp.returncode != 0:
log.debug("waiting 0.1 to let udev event queue settle") log.debug("waiting 0.1 to let udev event queue settle")
self.stop_listening_udev() self.stop_listening_udev()
loop = asyncio.get_event_loop() loop = asyncio.get_running_loop()
loop.call_later(0.1, self.start_listening_udev) loop.call_later(0.1, self.start_listening_udev)
return return
# Drain the udev events in the queue -- if we stopped listening to # Drain the udev events in the queue -- if we stopped listening to

View File

@ -104,7 +104,7 @@ class _CurtinCommand:
async def start(self, context, **opts): async def start(self, context, **opts):
self._fd = journald_listen( self._fd = journald_listen(
asyncio.get_event_loop(), [self._event_syslog_id], self._event) asyncio.get_running_loop(), [self._event_syslog_id], self._event)
# Yield to the event loop before starting curtin to avoid missing the # Yield to the event loop before starting curtin to avoid missing the
# first couple of events. # first couple of events.
await asyncio.sleep(0) await asyncio.sleep(0)
@ -120,7 +120,7 @@ class _CurtinCommand:
waited += 0.1 waited += 0.1
log.debug("waited %s seconds for events to drain", waited) log.debug("waited %s seconds for events to drain", waited)
self._event_contexts.pop('', None) self._event_contexts.pop('', None)
asyncio.get_event_loop().remove_reader(self._fd) asyncio.get_running_loop().remove_reader(self._fd)
return result return result
async def run(self, context): async def run(self, context):

View File

@ -28,7 +28,7 @@ from subiquity.server.server import (
class TestAutoinstallLoad(SubiTestCase): class TestAutoinstallLoad(SubiTestCase):
def setUp(self): async def asyncSetUp(self):
self.tempdir = self.tmp_dir() self.tempdir = self.tmp_dir()
os.makedirs(self.tempdir + '/cdrom', exist_ok=True) os.makedirs(self.tempdir + '/cdrom', exist_ok=True)
opts = Mock() opts = Mock()

View File

@ -120,7 +120,10 @@ class IdentityViewTests(unittest.IsolatedAsyncioTestCase):
crypted_password=CRYPTED) crypted_password=CRYPTED)
view.controller.done.assert_called_once_with(expected) view.controller.done.assert_called_once_with(expected)
def test_can_tab_to_done_when_valid(self): async def test_can_tab_to_done_when_valid(self):
# NOTE: this test needs a running event loop because the username field
# triggers the creation of an asyncio task upon losing focus.
#
# Urwid doesn't distinguish very well between widgets that are # Urwid doesn't distinguish very well between widgets that are
# not currently selectable and widgets that can never be # not currently selectable and widgets that can never be
# selectable. The "button pile" of the identity view is # selectable. The "button pile" of the identity view is

View File

@ -29,7 +29,7 @@ def _done(fut):
def schedule_task(coro, propagate_errors=True): def schedule_task(coro, propagate_errors=True):
loop = asyncio.get_event_loop() loop = asyncio.get_running_loop()
if asyncio.iscoroutine(coro): if asyncio.iscoroutine(coro):
task = asyncio.Task(coro) task = asyncio.Task(coro)
else: else:
@ -41,7 +41,7 @@ def schedule_task(coro, propagate_errors=True):
async def run_in_thread(func, *args): async def run_in_thread(func, *args):
loop = asyncio.get_event_loop() loop = asyncio.get_running_loop()
try: try:
return await loop.run_in_executor(None, func, *args) return await loop.run_in_executor(None, func, *args)
except concurrent.futures.CancelledError: except concurrent.futures.CancelledError:

View File

@ -162,7 +162,7 @@ class BaseNetworkController(BaseController):
def stop_watching(self): def stop_watching(self):
if not self._watching: if not self._watching:
return return
loop = asyncio.get_event_loop() loop = asyncio.get_running_loop()
for fd in self._observer_fds: for fd in self._observer_fds:
loop.remove_reader(fd) loop.remove_reader(fd)
self._watching = False self._watching = False
@ -170,7 +170,7 @@ class BaseNetworkController(BaseController):
def start_watching(self): def start_watching(self):
if self._watching: if self._watching:
return return
loop = asyncio.get_event_loop() loop = asyncio.get_running_loop()
for fd in self._observer_fds: for fd in self._observer_fds:
loop.add_reader(fd, self._data_ready, fd) loop.add_reader(fd, self._data_ready, fd)
self._watching = True self._watching = True
@ -180,7 +180,7 @@ class BaseNetworkController(BaseController):
if cp.returncode != 0: if cp.returncode != 0:
log.debug("waiting 0.1 to let udev event queue settle") log.debug("waiting 0.1 to let udev event queue settle")
self.stop_watching() self.stop_watching()
loop = asyncio.get_event_loop() loop = asyncio.get_running_loop()
loop.call_later(0.1, self.start_watching) loop.call_later(0.1, self.start_watching)
return return
self.observer.data_ready(fd) self.observer.data_ready(fd)

View File

@ -70,10 +70,11 @@ class Application:
os.environ.get('SUBIQUITY_REPLAY_TIMESCALE', "1")) os.environ.get('SUBIQUITY_REPLAY_TIMESCALE', "1"))
self.updated = os.path.exists(self.state_path('updating')) self.updated = os.path.exists(self.state_path('updating'))
self.hub = MessageHub() self.hub = MessageHub()
self.aio_loop = asyncio.get_event_loop() self.aio_loop = asyncio.get_running_loop()
self.aio_loop.set_exception_handler(self._exception_handler) self.aio_loop.set_exception_handler(self._exception_handler)
self.load_controllers(self.controllers) self.load_controllers(self.controllers)
self.context = Context.new(self) self.context = Context.new(self)
self.exit_event = asyncio.Event()
def load_controllers(self, controllers): def load_controllers(self, controllers):
""" Load the corresponding list of controllers """ Load the corresponding list of controllers
@ -86,7 +87,7 @@ class Application:
def _exception_handler(self, loop, context): def _exception_handler(self, loop, context):
exc = context.get('exception') exc = context.get('exception')
if exc: if exc:
loop.stop() self.exit_event.set()
self._exc = exc self._exc = exc
else: else:
loop.default_exception_handler(context) loop.default_exception_handler(context)
@ -114,7 +115,7 @@ class Application:
# EventLoop ------------------------------------------------------------------- # EventLoop -------------------------------------------------------------------
def exit(self): def exit(self):
self.aio_loop.stop() self.exit_event.set()
def start_controllers(self): def start_controllers(self):
log.debug("starting controllers") log.debug("starting controllers")
@ -126,10 +127,10 @@ class Application:
self.controllers.load_all() self.controllers.load_all()
self.start_controllers() self.start_controllers()
def run(self): async def run(self):
self.base_model = self.make_model() self.base_model = self.make_model()
self.aio_loop.create_task(self.start()) self.aio_loop.create_task(self.start())
self.aio_loop.run_forever() await self.exit_event.wait()
if self._exc: if self._exc:
exc, self._exc = self._exc, None exc, self._exc = self._exc, None
raise exc raise exc

View File

@ -36,5 +36,4 @@ class MessageHub:
await v await v
def broadcast(self, channel, *args, **kwargs): def broadcast(self, channel, *args, **kwargs):
loop = asyncio.get_event_loop() return asyncio.create_task(self.abroadcast(channel, *args, **kwargs))
return loop.create_task(self.abroadcast(channel, *args, **kwargs))

View File

@ -374,11 +374,11 @@ class TuiApplication(Application):
if start_urwid: if start_urwid:
self.start_urwid() self.start_urwid()
def run(self): async def run(self):
if self.opts.scripts: if self.opts.scripts:
self.run_scripts(self.opts.scripts) self.run_scripts(self.opts.scripts)
try: try:
super().run() await super().run()
finally: finally:
if self.urwid_loop is not None: if self.urwid_loop is not None:
self.urwid_loop.stop() self.urwid_loop.stop()

View File

@ -14,6 +14,7 @@
# You should have received a copy of the GNU Affero General Public License # You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>. # along with this program. If not, see <http://www.gnu.org/licenses/>.
import asyncio
import json import json
import sys import sys
@ -38,9 +39,12 @@ def make_app():
def main(): def main():
schema = make_schema(make_app()) async def run_with_loop():
jsonschema.validate({"version": 1}, schema) schema = make_schema(make_app())
print(json.dumps(schema, indent=4)) jsonschema.validate({"version": 1}, schema)
print(json.dumps(schema, indent=4))
asyncio.run(run_with_loop())
if __name__ == '__main__': if __name__ == '__main__':

View File

@ -14,6 +14,7 @@
# along with this program. If not, see <http://www.gnu.org/licenses/>. # along with this program. If not, see <http://www.gnu.org/licenses/>.
import argparse import argparse
import asyncio
import logging import logging
import os import os
import sys import sys
@ -96,14 +97,15 @@ def main():
.format(prefillFile)) .format(prefillFile))
opts.prefill = None opts.prefill = None
server = SystemSetupServer(opts, block_log_dir) async def run_with_loop():
server = SystemSetupServer(opts, block_log_dir)
server.note_file_for_apport(
"InstallerServerLog", logfiles['debug'])
server.note_file_for_apport(
"InstallerServerLogInfo", logfiles['info'])
await server.run()
server.note_file_for_apport( asyncio.run(run_with_loop())
"InstallerServerLog", logfiles['debug'])
server.note_file_for_apport(
"InstallerServerLogInfo", logfiles['info'])
server.run()
if __name__ == '__main__': if __name__ == '__main__':

View File

@ -15,6 +15,7 @@
# along with this program. If not, see <http://www.gnu.org/licenses/>. # along with this program. If not, see <http://www.gnu.org/licenses/>.
import argparse import argparse
import asyncio
import logging import logging
import os import os
import fcntl import fcntl
@ -146,14 +147,15 @@ def main():
opts.answers.close() opts.answers.close()
opts.answers = None opts.answers = None
subiquity_interface = SystemSetupClient(opts) async def run_with_loop():
subiquity_interface = SystemSetupClient(opts)
subiquity_interface.note_file_for_apport(
"InstallerLog", logfiles['debug'])
subiquity_interface.note_file_for_apport(
"InstallerLogInfo", logfiles['info'])
await subiquity_interface.run()
subiquity_interface.note_file_for_apport( asyncio.run(run_with_loop())
"InstallerLog", logfiles['debug'])
subiquity_interface.note_file_for_apport(
"InstallerLogInfo", logfiles['info'])
subiquity_interface.run()
if __name__ == '__main__': if __name__ == '__main__':