Fixed instances of unreachable code, inconsistent returns, consider-using-with, consider-using-from-import, use-a-generator, consider-using-generator, redundant-u-string-prefix, arguments-renamed

This commit is contained in:
Alexander Ravenheart 2021-10-25 09:31:53 +03:00 committed by Mathieu Comandon
parent 7cff0a0301
commit 4174f7425c
29 changed files with 82 additions and 76 deletions

View file

@ -10,7 +10,7 @@ jobs:
- run: sudo apt-get install make libdbus-1-dev libgirepository1.0-dev
gir1.2-gnomedesktop-3.0 gir1.2-gtk-3.0 gir1.2-notify-0.7 gir1.2-webkit2-4.0
- run: pip install --upgrade pip wheel poetry
- run: make install
- run: make dev
- run: make isort-check
- run: make flake8
- run: make black || true

View file

@ -27,7 +27,7 @@ class db_cursor(object):
def cursor_execute(cursor, query, params=None):
"""Execute a SQL query, run it in a lock block"""
params = params or ()
lock = DB_LOCK.acquire(timeout=1)
lock = DB_LOCK.acquire(timeout=1) # pylint: disable=consider-using-with
if not lock:
logger.error("Database is busy. Not executing %s", query)
return

View file

@ -349,9 +349,10 @@ class Game(GObject.Object):
def set_keyboard_layout(layout):
setxkbmap_command = ["setxkbmap", "-model", "pc101", layout, "-print"]
xkbcomp_command = ["xkbcomp", "-", os.environ.get("DISPLAY", ":0")]
xkbcomp = subprocess.Popen(xkbcomp_command, stdin=subprocess.PIPE)
subprocess.Popen(setxkbmap_command, env=os.environ, stdout=xkbcomp.stdin).communicate()
xkbcomp.communicate()
with subprocess.Popen(xkbcomp_command, stdin=subprocess.PIPE) as xkbcomp:
with subprocess.Popen(setxkbmap_command, env=os.environ, stdout=xkbcomp.stdin) as setxkbmap:
setxkbmap.communicate()
xkbcomp.communicate()
def start_prelaunch_command(self):
"""Start the prelaunch command specified in the system options"""
@ -649,7 +650,8 @@ class Game(GObject.Object):
self.screen_saver_inhibitor_cookie = None
if self.runner.system_config.get("use_us_layout"):
subprocess.Popen(["setxkbmap"], env=os.environ).communicate()
with subprocess.Popen(["setxkbmap"], env=os.environ) as setxkbmap:
setxkbmap.communicate()
if self.runner.system_config.get("restore_gamma"):
restore_gamma()

View file

@ -173,7 +173,7 @@ class InstallerWindow(BaseApplicationWindow): # pylint: disable=too-many-public
)
self.destroy()
return
self.title_label.set_markup(_(u"<b>Installing {}</b>").format(gtk_safe(self.interpreter.installer.game_name)))
self.title_label.set_markup(_("<b>Installing {}</b>").format(gtk_safe(self.interpreter.installer.game_name)))
self.select_install_folder()
def select_install_folder(self):

View file

@ -136,5 +136,5 @@ class DownloadProgressBox(Gtk.Box):
return True
def _set_text(self, text):
markup = u"<span size='10000'>{}</span>".format(gtk_safe(text))
markup = "<span size='10000'>{}</span>".format(gtk_safe(text))
self.progress_label.set_markup(markup)

View file

@ -35,12 +35,13 @@ def fetch_script(game_slug, revision=None):
def read_script(filename):
"""Return scripts from a local file"""
logger.debug("Loading script(s) from %s", filename)
script = yaml.safe_load(open(filename, "r", encoding='utf-8').read())
if isinstance(script, list):
return script
if "results" in script:
return script["results"]
return [script]
with open(filename, "r", encoding='utf-8') as local_file:
script = yaml.safe_load(local_file.read())
if isinstance(script, list):
return script
if "results" in script:
return script["results"]
return [script]
def get_installers(game_slug, installer_file=None, revision=None):

View file

@ -518,13 +518,13 @@ class CommandsMixin:
def _killable_process(self, func, *args, **kwargs):
"""Run function `func` in a separate, killable process."""
process = multiprocessing.Pool(1)
result_obj = process.apply_async(func, args, kwargs)
self.abort_current_task = process.terminate
result = result_obj.get() # Wait process end & reraise exceptions
self.abort_current_task = None
logger.debug("Process %s returned: %s", func, result)
return result
with multiprocessing.Pool(1) as process:
result_obj = process.apply_async(func, args, kwargs)
self.abort_current_task = process.terminate
result = result_obj.get() # Wait process end & re-raise exceptions
self.abort_current_task = None
logger.debug("Process %s returned: %s", func, result)
return result
def _extract_gog_game(self, file_id):
self.extract({

View file

@ -26,7 +26,8 @@ def get_libretro_cores():
if not os.path.exists(info_path):
req = requests.get("http://buildbot.libretro.com/assets/frontend/info.zip", allow_redirects=True)
if req.status_code == requests.codes.ok: # pylint: disable=no-member
open(get_default_config_path('info.zip'), 'wb').write(req.content)
with open(get_default_config_path('info.zip'), 'wb') as info_zip:
info_zip.write(req.content)
with ZipFile(get_default_config_path('info.zip'), 'r') as info_zip:
info_zip.extractall(info_path)
else:
@ -170,9 +171,9 @@ class libretro(Runner):
# TODO: review later
# Create retroarch.cfg if it doesn't exist.
if not system.path_exists(config_file):
f = open(config_file, "w", encoding='utf-8')
f.write("# Lutris RetroArch Configuration")
f.close()
with open(config_file, "w", encoding='utf-8') as f:
f.write("# Lutris RetroArch Configuration")
f.close()
# Build the default config settings.
retro_config = RetroConfig(config_file)

View file

@ -157,15 +157,15 @@ class mednafen(Runner):
joy_ids = []
if not self.is_installed:
return []
output = subprocess.Popen(
with subprocess.Popen(
[self.get_executable(), "dummy"],
stdout=subprocess.PIPE,
universal_newlines=True,
).communicate()[0]
ouput = output.split("\n")
) as mednafen_process:
output = mednafen_process.communicate()[0].split("\n")
found = False
joy_list = []
for line in ouput:
for line in output:
if found and "Joystick" in line:
joy_list.append(line)
else:

View file

@ -103,9 +103,10 @@ class residualvm(Runner):
def get_game_list(self):
"""Return the entire list of games supported by ResidualVM."""
residual_output = subprocess.Popen([self.get_executable(), "--list-games"],
stdout=subprocess.PIPE).communicate()[0]
game_list = str.split(residual_output, "\n")
with subprocess.Popen([self.get_executable(), "--list-games"],
stdout=subprocess.PIPE, encoding="utf-8", universal_newlines=True) as residualvm_process:
residual_output = residualvm_process.communicate()[0]
game_list = str.split(residual_output, "\n")
game_array = []
game_list_start = False
for game in game_list:

View file

@ -401,7 +401,7 @@ class Runner: # pylint: disable=too-many-public-methods
callback()
@staticmethod
def remove_game_data(game_path=None):
def remove_game_data(app_id=None, game_path=None):
system.remove_folder(game_path)
def can_uninstall(self):

View file

@ -150,9 +150,10 @@ class scummvm(Runner):
def get_game_list(self):
"""Return the entire list of games supported by ScummVM."""
scumm_output = subprocess.Popen([self.get_executable(), "--list-games"],
stdout=subprocess.PIPE).communicate()[0]
game_list = str.split(scumm_output, "\n")
with subprocess.Popen([self.get_executable(), "--list-games"],
stdout=subprocess.PIPE, encoding="utf-8", universal_newlines=True) as scummvm_process:
scumm_output = scummvm_process.communicate()[0]
game_list = str.split(scumm_output, "\n")
game_array = []
game_list_start = False
for game in game_list:

View file

@ -66,7 +66,8 @@ class snes9x(Runner):
def set_option(self, option, value):
config_file = os.path.expanduser("~/.snes9x/snes9x.xml")
if not system.path_exists(config_file):
subprocess.Popen([self.get_executable(), "-help"])
with subprocess.Popen([self.get_executable(), "-help"]) as snes9x_process:
snes9x_process.communicate()
if not system.path_exists(config_file):
logger.error("Snes9x config file creation failed")
return

View file

@ -284,7 +284,7 @@ class steam(Runner):
shutdown()
time.sleep(5)
command = [self.get_executable(), "steam://install/%s" % appid]
subprocess.Popen(command)
subprocess.Popen(command) # pylint: disable=consider-using-with
def prelaunch(self):
def has_steam_shutdown(times=10):

View file

@ -257,7 +257,7 @@ class OnlineService(BaseService):
def is_authenticated(self):
"""Return whether the service is authenticated"""
return all([system.path_exists(path) for path in self.credential_files])
return all(system.path_exists(path) for path in self.credential_files)
def wipe_game_cache(self):
"""Wipe the game cache, allowing it to be reloaded"""

View file

@ -59,8 +59,8 @@ class DieselGameMedia(ServiceMedia):
thumb_image = thumb_image.convert("RGB")
thumb_image.save(thumb_path)
def get_media_url(self, detail):
for image in detail.get("keyImages", []):
def get_media_url(self, details):
for image in details.get("keyImages", []):
if image["type"] == self.api_field:
return image["url"] + "?w=%s&resize=1&h=%s" % (
self.remote_size[0],

View file

@ -8,7 +8,6 @@ from urllib.parse import parse_qsl, urlencode, urlparse
from lxml import etree
import lutris.util.i18n as i18n
from lutris import settings
from lutris.exceptions import AuthenticationError, UnavailableGame
from lutris.installer import AUTO_ELF_EXE, AUTO_WIN32_EXE
@ -16,7 +15,7 @@ from lutris.installer.installer_file import InstallerFile
from lutris.services.base import OnlineService
from lutris.services.service_game import ServiceGame
from lutris.services.service_media import ServiceMedia
from lutris.util import system
from lutris.util import i18n, system
from lutris.util.http import HTTPError, Request, UnauthorizedAccess
from lutris.util.log import logger
from lutris.util.strings import slugify

View file

@ -2,6 +2,7 @@ import json
import os
import random
import time
from typing import Optional
from lutris import settings
from lutris.database.services import ServiceGameCollection
@ -67,7 +68,7 @@ class ServiceMedia:
medias[game["slug"]] = media_url
return medias
def download(self, slug, url):
def download(self, slug: str, url: str) -> Optional[str]:
"""Downloads the banner if not present"""
if not url:
return
@ -78,17 +79,13 @@ class ServiceMedia:
cache_stats = os.stat(cache_path)
# Empty files have a life time between 1 and 2 weeks, retry them after
if time.time() - cache_stats.st_mtime < 3600 * 24 * random.choice(range(7, 15)):
return
return cache_path
os.unlink(cache_path)
try:
return download_file(url, cache_path, raise_errors=True)
except HTTPError as ex:
if ex.code == 404:
open(cache_path, "a", encoding='utf-8').close()
else:
logger.error(ex.code)
return None
return cache_path
logger.error(ex)
return
def render(self):
"""Used if the media requires extra processing"""

View file

@ -30,7 +30,7 @@ def restore_gamma():
"""Restores gamma to a normal level."""
xgamma_path = system.find_executable("xgamma")
try:
subprocess.Popen([xgamma_path, "-gamma", "1.0"])
subprocess.Popen([xgamma_path, "-gamma", "1.0"]) # pylint: disable=consider-using-with
except (FileNotFoundError, TypeError):
logger.warning("xgamma is not available on your system")
except PermissionError:

View file

@ -67,7 +67,7 @@ class Downloader:
self.last_check_time = get_time()
if self.overwrite and os.path.isfile(self.dest):
os.remove(self.dest)
self.file_pointer = open(self.dest, "wb")
self.file_pointer = open(self.dest, "wb") # pylint: disable=consider-using-with
self.thread = jobs.AsyncCall(self.async_download, self.download_cb)
self.stop_request = self.thread.stop_request

View file

@ -218,9 +218,9 @@ def extract_deb(archive, dest):
if not os.path.exists(data_file):
data_file = os.path.join(dest, "data.tar.xz")
extractor = "r:xz"
handler = tarfile.open(data_file, extractor)
handler.extractall(dest)
handler.close()
with tarfile.open(data_file, extractor) as handler:
handler.extractall(dest)
handler.close()
os.remove(data_file)

View file

@ -76,7 +76,8 @@ def turn_off_except(display):
for output in get_outputs():
if output.name != display:
logger.info("Turning off %s", output[0])
subprocess.Popen(["xrandr", "--output", output.name, "--off"])
with subprocess.Popen(["xrandr", "--output", output.name, "--off"]) as xrandr:
xrandr.communicate()
def get_resolutions():
@ -111,7 +112,8 @@ def change_resolution(resolution):
logger.warning("Resolution %s doesn't exist.", resolution)
else:
logger.info("Changing resolution to %s", resolution)
subprocess.Popen(["xrandr", "-s", resolution])
with subprocess.Popen(["xrandr", "-s", resolution]) as xrandr:
xrandr.communicate()
else:
for display in resolution:
logger.debug("Switching to %s on %s", display.mode, display.name)
@ -126,7 +128,7 @@ def change_resolution(resolution):
else:
rotation = "normal"
logger.info("Switching resolution of %s to %s", display.name, display.mode)
subprocess.Popen(
with subprocess.Popen(
[
"xrandr",
"--output",
@ -140,7 +142,8 @@ def change_resolution(resolution):
"--rate",
display.rate,
]
).communicate()
) as xrandr:
xrandr.communicate()
class LegacyDisplayManager: # pylint: disable=too-few-public-methods

View file

@ -87,7 +87,7 @@ class Request:
if self.opener:
request = self.opener.open(req, timeout=self.timeout)
else:
request = urllib.request.urlopen(req, timeout=self.timeout)
request = urllib.request.urlopen(req, timeout=self.timeout) # pylint: disable=consider-using-with
except (urllib.error.HTTPError, CertificateError) as error:
if error.code == 401:
raise UnauthorizedAccess("Access to %s denied" % self.url) from error

View file

@ -79,4 +79,4 @@ class RetroConfig:
self._config.append((key, self.serialize_value(value)))
def keys(self):
return list([key for (key, _value) in self.config])
return [key for (key, _value) in self.config]

View file

@ -298,7 +298,7 @@ class LinuxSystem: # pylint: disable=too-many-public-methods
# Ignore paths where 64-bit path is link to supposed 32-bit path
if os.path.realpath(lib_paths[0]) == os.path.realpath(lib_paths[1]):
continue
if all([os.path.exists(path) for path in lib_paths]):
if all(os.path.exists(path) for path in lib_paths):
if lib_paths[0] not in exported_lib_folders:
yield lib_paths[0]
if len(lib_paths) != 1:

View file

@ -104,7 +104,7 @@ def unpack_dependencies(string):
dependencies = [dep.strip() for dep in string.split(",") if dep.strip()]
for index, dependency in enumerate(dependencies):
if "|" in dependency:
dependencies[index] = tuple([option.strip() for option in dependency.split("|") if option.strip()])
dependencies[index] = tuple(option.strip() for option in dependency.split("|") if option.strip())
return [dependency for dependency in dependencies if dependency]

View file

@ -59,14 +59,16 @@ def execute(command, env=None, cwd=None, log_errors=False, quiet=False, shell=Fa
# Piping stderr can cause slowness in the programs, use carefully
# (especially when using regedit with wine)
try:
stdout, stderr = subprocess.Popen(
with subprocess.Popen(
command,
shell=shell,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE if log_errors else subprocess.DEVNULL,
env=existing_env,
cwd=cwd,
).communicate(timeout=timeout)
errors="replace"
) as command_process:
stdout, stderr = command_process.communicate(timeout=timeout)
except (OSError, TypeError) as ex:
logger.error("Could not run command %s (env: %s): %s", command, env, ex)
return ""
@ -75,7 +77,7 @@ def execute(command, env=None, cwd=None, log_errors=False, quiet=False, shell=Fa
return ""
if stderr and log_errors:
logger.error(stderr)
return stdout.decode(errors="replace").strip()
return stdout.strip()
def read_process_output(command, timeout=5):

View file

@ -87,14 +87,12 @@ class DLLManager:
def dll_exists(self, dll_name):
"""Check if the dll is provided by the component
The DLL might not be available for all archs so
only check if one exists for the supported architectures
The DLL might not be available for all architectures so
only check if one exists for the supported ones
"""
return any(
[
system.path_exists(os.path.join(self.path, arch, dll_name + ".dll"))
for arch in self.archs.values()
]
system.path_exists(os.path.join(self.path, arch, dll_name + ".dll"))
for arch in self.archs.values()
)
def get_download_url(self):

View file

@ -61,9 +61,9 @@ def create_launcher(game_slug, game_id, game_name, desktop=False, menu=False):
launcher_filename = get_xdg_basename(game_slug, game_id)
tmp_launcher_path = os.path.join(CACHE_DIR, launcher_filename)
tmp_launcher = open(tmp_launcher_path, "w", encoding='utf-8')
tmp_launcher.write(launcher_content)
tmp_launcher.close()
with open(tmp_launcher_path, "w", encoding='utf-8') as tmp_launcher:
tmp_launcher.write(launcher_content)
tmp_launcher.close()
os.chmod(
tmp_launcher_path,
stat.S_IREAD