Simplify process watcher

This commit is contained in:
Mathieu Comandon 2021-02-02 22:54:09 -08:00
parent 791830c31e
commit 449a00d390
6 changed files with 120 additions and 178 deletions

View file

@ -83,16 +83,22 @@ class MonitoredCommand:
def stdout(self):
return self._stdout.getvalue()
@property
def wrapper_command(self):
def get_wrapper_command(self):
"""Return launch arguments for the wrapper script"""
return [
wrapper_command = [
WRAPPER_SCRIPT,
self._title,
str(len(self.include_processes)),
str(len(self.exclude_processes)),
] + self.include_processes + self.exclude_processes + self.command
] + self.include_processes + self.exclude_processes
if not self.terminal:
return wrapper_command + self.command
terminal_path = system.find_executable(self.terminal)
if not terminal_path:
raise RuntimeError("Couldn't find terminal %s" % self.terminal)
script_path = get_terminal_script(self.command, self.cwd, self.env)
return wrapper_command + [terminal_path, "-e", script_path]
def set_log_buffer(self, log_buffer):
"""Attach a TextBuffer to this command enables the buffer handler"""
@ -127,20 +133,11 @@ class MonitoredCommand:
def start(self):
"""Run the thread."""
for key, value in self.env.items():
logger.debug("%s=\"%s\"", key, value)
logger.debug(" ".join(self.wrapper_command))
if self.terminal:
terminal = system.find_executable(self.terminal)
if not terminal:
logger.error("Couldn't find terminal %s", self.terminal)
return
script_path = get_terminal_script(self.wrapper_command, self.cwd, self.env)
self.game_process = self.execute_process([terminal, "-e", script_path])
else:
env = self.get_child_environment()
self.game_process = self.execute_process(self.wrapper_command, env)
# for key, value in self.env.items():
# logger.debug("%s=\"%s\"", key, value)
wrapper_command = self.get_wrapper_command()
env = self.get_child_environment()
self.game_process = self.execute_process(wrapper_command, env)
if not self.game_process:
logger.error("No game process available")
@ -177,7 +174,7 @@ class MonitoredCommand:
if self.prevent_on_stop: # stop() already in progress
return False
if self.game_process.returncode is None:
logger.debug("Process hasn't terminated yet")
logger.debug("Waiting for process to give a return code")
self.game_process.wait()
logger.debug("Process %s has terminated with code %s", pid, self.game_process.returncode)
self.is_running = False
@ -239,10 +236,10 @@ class MonitoredCommand:
# process already dead.
pass
if hasattr(self, "stop_func"):
resume_stop = self.stop_func()
if not resume_stop:
return False
resume_stop = self.stop_func()
if not resume_stop:
logger.warning("Stop execution halted by demand of stop_func")
return False
if self.stdout_monitor:
GLib.source_remove(self.stdout_monitor)

View file

@ -18,21 +18,20 @@ class BattleNetService(OnlineService):
@property
def oauth_url(self):
"""Return the URL used for OAuth sign in"""
if self.region == 'cn':
if self.region == "cn":
return "https://www.battlenet.com.cn/oauth"
return f"https://{self.region}.battle.net/oauth"
return "https://%s.battle.net/oauth" % self.region
@property
def api_url(self):
"""Main API endpoint"""
if self.region == 'cn':
if self.region == "cn":
return "https://gateway.battlenet.com.cn"
return f"https://{self.region}.api.blizzard.com"
return "https://%s.api.blizzard.com" % self.region
@property
def login_url(self):
"""Battle.net login URL"""
if self.region == 'cn':
return 'https://www.battlenet.com.cn/login/zh'
else:
return f'https://{self.region}.battle.net/login/en'
if self.region == "cn":
return "https://www.battlenet.com.cn/login/zh"
return "https://%s.battle.net/login/en" % self.region

View file

@ -1,9 +1,8 @@
"""Process monitor management"""
# Standard Library
"""Process management"""
import os
import shlex
import sys
# Lutris Modules
from lutris.util.process import Process
# Processes that are considered sufficiently self-managing by the
@ -26,23 +25,18 @@ SYSTEM_PROCESSES = {
}
class ProcessMonitor:
"""Class to keep track of a process and its children status"""
class ProcessWatcher:
"""Keeps track of child processes of the client"""
def __init__(self, include_processes, exclude_processes):
"""Creates a process monitor
All arguments accept a list of process names
Args:
"""Create a process watcher.
Params:
exclude_processes (str or list): list of processes that shouldn't be monitored
include_processes (str or list): list of process that should be forced to be monitored
"""
include_processes = self.parse_process_list(include_processes)
exclude_processes = self.parse_process_list(exclude_processes)
self.unmonitored_processes = (exclude_processes | SYSTEM_PROCESSES) - include_processes
self.unmonitored_processes = (
self.parse_process_list(exclude_processes) | SYSTEM_PROCESSES
) - self.parse_process_list(include_processes)
@staticmethod
def parse_process_list(process_list):
@ -55,28 +49,23 @@ class ProcessMonitor:
return {p[0:15] for p in process_list}
@staticmethod
def iterate_all_processes():
def iterate_children():
"""Iterates through all children process of the lutris client.
This is not accurate since not all processes are started by
lutris but are started by Systemd instead.
"""
return Process(os.getpid()).iter_children()
def iterate_game_processes(self):
for child in self.iterate_all_processes():
def iterate_processes(self):
for child in self.iterate_children():
if child.state == 'Z':
continue
if child.name and child.name not in self.unmonitored_processes:
yield child
def iterate_monitored_processes(self):
for child in self.iterate_all_processes():
if child.state == 'Z':
continue
if child.name not in self.unmonitored_processes:
yield child
def is_game_alive(self):
"""Returns whether at least one nonexcluded process exists"""
return next(self.iterate_game_processes(), None) is not None
def are_monitored_processes_alive(self):
return next(self.iterate_monitored_processes(), None) is not None
def is_alive(self, message=None):
"""Returns whether at least one watched process exists"""
if message:
sys.stdout.write("%s\n" % message)
return next(self.iterate_processes(), None) is not None

View file

@ -24,6 +24,7 @@ def get_terminal_script(command, cwd, env):
cd "%s"
%s
exec %s
exit $?
""" % (cwd, exported_environment, command)
)
)
@ -60,6 +61,8 @@ def get_bash_rc_file(cwd, env, aliases=None):
def get_shell_command(cwd, env, aliases=None):
"""Generates a scripts whichs opens a bash shell configured with given
environment variables and aliases.
"""
bashrc_file = get_bash_rc_file(cwd, env, aliases)
script_path = get_terminal_script(["bash", "--rcfile", bashrc_file], cwd, env)
return script_path
return get_terminal_script(["bash", "--rcfile", bashrc_file], cwd, env)

View file

@ -12,7 +12,7 @@ import signal
import logging
import ctypes
from ctypes.util import find_library
from lutris.util.monitor import ProcessMonitor
from lutris.util.process_watcher import ProcessWatcher
from lutris.util.log import logger
try:
@ -21,7 +21,11 @@ except ImportError:
setproctitle = print
PR_SET_CHILD_SUBREAPER = 36
PR_SET_CHILD_SUBREAPER = 36 # Value of the constant in prctl.h
class NoMoreChildren(Exception):
"""Raised when async_reap_children finds no children left"""
def set_child_subreaper():
@ -51,23 +55,36 @@ def set_child_subreaper():
"""
result = ctypes.CDLL(find_library('c')).prctl(PR_SET_CHILD_SUBREAPER, 1, 0, 0, 0, 0)
if result == -1:
print("PR_SET_CHILD_SUBREAPER failed, process watching may fail")
print("PR_SET_CHILD_SUBREAPER is not supported by your kernel (Linux 3.4 and above)")
def log(line):
def log(log_message):
"""Generic log function that can be adjusted for any log output method
(stdout, file, logging, t2s, Discord, ...)
"""
line = str(log_message) + "\n"
try:
sys.stdout.write(line + "\n")
sys.stdout.write(line)
sys.stdout.flush()
except BrokenPipeError:
pass
# File output example
# with open(os.path.expanduser("~/lutris.log"), "a") as logfile:
# logfile.write(line)
# logfile.write("\n")
# File output
with open(os.path.expanduser("~/lutris.log"), "a") as logfile:
logfile.write(line)
def kill_pid(pid, sigkill=False):
"""Attempt to kill a process with SIGTERM or SIGKILL"""
if sigkill:
_signal = signal.SIGKILL
else:
_signal = signal.SIGTERM
log("Killing PID %s with %s" % (pid, "SIGKILL" if sigkill else "SIGTERM"))
try:
os.kill(pid, _signal)
except ProcessLookupError: # process already dead
pass
def main():
@ -75,7 +92,7 @@ def main():
# pylint: disable=too-many-branches,too-many-statements
# TODO: refactor
set_child_subreaper()
_, proc_title, include_proc_count, exclude_proc_count, *args = sys.argv
_script_name, proc_title, include_proc_count, exclude_proc_count, *args = sys.argv
setproctitle("lutris-wrapper: " + proc_title)
@ -87,47 +104,33 @@ def main():
if "PYTHONPATH" in os.environ:
del os.environ["PYTHONPATH"]
monitor = ProcessMonitor(include_procs, exclude_procs)
watcher = ProcessWatcher(include_procs, exclude_procs)
def hard_sig_handler(signum, _frame):
log("Caught another signal, sending SIGKILL.")
for _ in range(3): # just in case we race a new process.
for child in monitor.iterate_all_processes():
try:
os.kill(child.pid, signal.SIGKILL)
except ProcessLookupError: # process already dead
pass
log("--killed processes--")
for child in watcher.iterate_children():
kill_pid(child.pid, sigkill=True)
def sig_handler(signum, _frame):
log("Caught signal %s" % signum)
signal.signal(signal.SIGTERM, hard_sig_handler)
signal.signal(signal.SIGINT, hard_sig_handler)
for child in monitor.iterate_game_processes():
log("passing along signal to PID %s" % child.pid)
try:
os.kill(child.pid, signum)
except ProcessLookupError: # process already dead
pass
log("--terminated processes--")
for child in watcher.iterate_children():
kill_pid(child.pid)
signal.signal(signal.SIGTERM, sig_handler)
signal.signal(signal.SIGINT, sig_handler)
log("Running %s" % " ".join(args))
returncode = None
try:
initial_pid = subprocess.Popen(args).pid
process_pid = subprocess.Popen(args).pid
except FileNotFoundError:
log("Failed to execute process. Check that the file exists")
return
log("Started initial process %d from %s" % (process_pid, " ".join(args)))
log("Initial process has started with pid %d" % initial_pid)
class NoMoreChildren(Exception):
"Raised when async_reap_children finds no children left"
def async_reap_children():
def reap_children(message=None):
"""
Attempts to reap zombie child processes. Thanks to setting
ourselves as a subreaper, we are assigned zombie processes
@ -138,87 +141,39 @@ def main():
code was so that we can forward it to our caller.
"""
nonlocal returncode
if message:
log(message)
while True:
try:
dead_pid, dead_returncode, usage = os.wait3(os.WNOHANG)
child_pid, child_returncode, resource_usage = os.wait3(os.WNOHANG)
except ChildProcessError:
# No processes remain. No need to check monitor.
raise NoMoreChildren from None
raise NoMoreChildren from None # No processes remain.
if child_pid == process_pid:
if returncode:
log("Reassigning returncode, which is very weird but you'll have to deal with it.")
returncode = child_returncode
log("Initial process has exited (return code: %s)" % child_returncode)
if returncode is None and dead_pid == initial_pid:
returncode = dead_returncode
log("Initial process has exited (return code: %s)" % dead_returncode)
if dead_pid == 0:
if child_pid == 0:
break
log("Start monitoring process.")
try:
# While we are inside this try..except, if at the time of any
# call to async_reap_children there are no children left, we
# will skip the rest of our cleanup logic, since with no
# children remaining, there's nothing left to wait for.
#
# This behavior doesn't help with ignoring "system processes",
# so its more of a shortcut out of this code than it is
# essential for correctness.
# The initial wait loop:
# the initial process may have been excluded. Wait for the game
# to be considered "started".
if not monitor.is_game_alive():
log("Waiting for game to start (first non-excluded process started)")
while not monitor.is_game_alive():
async_reap_children()
time.sleep(0.1)
# The main wait loop:
# The game is running. Our process is now just waiting around
# for processes to exit, waiting up every .1s to reap child
# processes.
log("Start monitoring process.")
while monitor.is_game_alive():
async_reap_children()
while watcher.is_alive():
reap_children()
time.sleep(0.1)
log("Monitored process exited.")
async_reap_children()
# The exit wait loop:
# The game is no longer running. We ask monitored processes
# to exit and wait 30 seconds before sending more SIGTERMs.
while monitor.are_monitored_processes_alive():
async_reap_children()
child = None
for child in monitor.iterate_monitored_processes():
log("Sending SIGTERM to PID %s (pid %s)" % (child.name, child.pid))
try:
os.kill(child.pid, signal.SIGTERM)
except ProcessLookupError: # process already dead
pass
# Spend 60 seconds waiting for processes to clean up.
async_reap_children()
for _ in range(600):
if not monitor.are_monitored_processes_alive():
break
if _ == 0:
log("Waiting up to 30sec for processes to exit.")
async_reap_children()
time.sleep(0.1)
async_reap_children()
log("All monitored processes have exited.")
reap_children()
# The game is no longer running. We kill other processes
for child in watcher.iterate_monitored_processes():
kill_pid(child.pid)
reap_children()
except NoMoreChildren:
log("All children have exited.")
log("All processes have quit")
if returncode is None:
returncode = 0
log("Monitored process didn't return an exit code.")
log("Exit with returncode %s" % returncode)
log("Failed to read return code from process")
returncode = -1
else:
log("Process exited with returncode %s" % returncode)
sys.exit(returncode)

View file

@ -36,13 +36,6 @@ class LutrisWrapperTestCase(unittest.TestCase):
try:
# Wait for the "Hello World" message that indicates that the process
# tree has started. This message arrives on stdout.
for line in wrapper_proc.stdout:
if line.strip() == b'Hello World':
# We found the output we're looking for.
break
else:
self.fail("stdout EOF unexpectedly")
# Wait a short while to see if lutris-wrapper will exit (it shouldn't)
try:
wrapper_proc.wait(0.5)
@ -54,8 +47,14 @@ class LutrisWrapperTestCase(unittest.TestCase):
self.fail("Process exited unexpectedly")
finally:
if wrapper_proc.returncode is None:
wrapper_proc.terminate()
wrapper_proc.wait(30)
wrapper_proc.kill()
wrapper_proc.wait(3)
for line in wrapper_proc.stdout:
if line.strip() == b'Hello World':
# We found the output we're looking for.
break
else:
self.fail("stdout EOF unexpectedly")
wrapper_proc.stdout.close()
def test_cleanup_children(self):