Refactor: move installer.Commands and ScriptingError to new files

This commit is contained in:
Xodetaetl 2015-10-03 20:07:54 +02:00
parent 025f7824e0
commit 5b720d848b
6 changed files with 371 additions and 337 deletions

View file

@ -3,7 +3,8 @@ import time
from gi.repository import Gtk
import yaml
from lutris import installer, settings, shortcuts
from lutris import settings, shortcuts
from lutris.installer import interpreter
from lutris.game import Game
from lutris.gui.widgets import DownloadProgressBox, FileChooserEntry
from lutris.util.log import logger
@ -86,7 +87,7 @@ class InstallerDialog(Gtk.Window):
logger.debug("Opening script: %s", game_ref)
self.scripts = yaml.safe_load(open(game_ref, 'r').read())
else:
self.scripts = installer.fetch_script(self, game_ref)
self.scripts = interpreter.fetch_script(self, game_ref)
if not self.scripts:
self.destroy()
return
@ -157,7 +158,7 @@ class InstallerDialog(Gtk.Window):
def prepare_install(self, script_index):
script = self.scripts[script_index]
self.interpreter = installer.ScriptInterpreter(script, self)
self.interpreter = interpreter.ScriptInterpreter(script, self)
game_name = self.interpreter.game_name.replace('&', '&')
self.title_label.set_markup(u"<b>Installing {}</b>".format(game_name))
self.select_install_folder()

View file

@ -0,0 +1 @@
"""Install script interpreter package."""

View file

@ -0,0 +1,306 @@
import os
import shutil
import shlex
from gi.repository import Gdk
from .errors import ScriptingError
from lutris import runtime
from lutris.util import extract, devices, system
from lutris.util.fileio import EvilConfigParser, MultiOrderedDict
from lutris.util.log import logger
from lutris.runners import wine, import_task, import_runner, InvalidRunner
from lutris.thread import LutrisThread
class Commands(object):
"""The directives for the "installer:" part of the install script."""
def _check_required_params(self, params, command_data, command_name):
"""Verify presence of a list of parameters required by a command."""
if type(params) is str:
params = [params]
for param in params:
if param not in command_data:
raise ScriptingError('The "%s" parameter is mandatory for '
'the %s command' % (param, command_name),
command_data)
def check_md5(self, data):
self._check_required_params(['file', 'value'], data, 'check_md5')
filename = self._get_file(data['file'])
_hash = system.get_md5_hash(filename)
if _hash != data['value']:
raise ScriptingError("MD5 checksum mismatch", data)
def chmodx(self, filename):
filename = self._substitute(filename)
os.popen('chmod +x "%s"' % filename)
def execute(self, data):
"""Run an executable file."""
args = []
if isinstance(data, dict):
self._check_required_params('file', data, 'execute')
file_ref = data['file']
args_string = data.get('args', '')
for arg in shlex.split(args_string):
args.append(self._substitute(arg))
else:
file_ref = data
# Determine whether 'file' value is a file id or a path
exec_path = self._get_file(file_ref) or self._substitute(file_ref)
if not exec_path:
raise ScriptingError("Unable to find file %s" % file_ref,
file_ref)
if not os.path.exists(exec_path):
raise ScriptingError("Unable to find required executable",
exec_path)
self.chmodx(exec_path)
terminal = data.get('terminal')
if terminal:
terminal = system.get_default_terminal()
command = [exec_path] + args
logger.debug("Executing %s" % command)
thread = LutrisThread(command, env=runtime.get_env(), term=terminal)
thread.run()
def extract(self, data):
"""Extract a file, guessing the compression method."""
self._check_required_params('file', data, 'extract')
filename = self._get_file(data['file'])
if not filename:
filename = self._substitute(data['file'])
if not os.path.exists(filename):
raise ScriptingError("%s does not exists" % filename)
if 'dst' in data:
dest_path = self._substitute(data['dst'])
else:
dest_path = self.target_path
msg = "Extracting %s" % os.path.basename(filename)
logger.debug(msg)
self.parent.set_status(msg)
merge_single = 'nomerge' not in data
extractor = data.get('format')
logger.debug("extracting file %s to %s", filename, dest_path)
extract.extract_archive(filename, dest_path, merge_single, extractor)
def input_menu(self, data):
"""Display an input request as a dropdown menu with options."""
self._check_required_params('options', data, 'input_menu')
identifier = data.get('id')
alias = 'INPUT_%s' % identifier if identifier else None
has_entry = data.get('entry')
options = data['options']
preselect = self._substitute(data.get('preselect', ''))
self.parent.input_menu(alias, options, preselect, has_entry,
self._on_input_menu_validated)
return 'STOP'
def _on_input_menu_validated(self, widget, *args):
alias = args[0]
menu = args[1]
choosen_option = menu.get_active_id()
if choosen_option:
self.user_inputs.append({'alias': alias,
'value': choosen_option})
self.parent.continue_button.hide()
self._iter_commands()
def insert_disc(self, data):
self._check_required_params('requires', data, 'insert_disc')
requires = data.get('requires')
message = data.get(
'message',
"Insert game disc or mount disk image and click OK."
)
message += (
"\n\nLutris is looking for a mounted disk drive or image \n"
"containing the following file or folder:\n"
"<i>%s</i>" % requires
)
self.parent.wait_for_user_action(message, self._find_matching_disc,
requires)
return 'STOP'
def _find_matching_disc(self, widget, requires):
drives = devices.get_mounted_discs()
for drive in drives:
mount_point = drive.get_root().get_path()
required_abspath = os.path.join(mount_point, requires)
required_abspath = system.fix_path_case(required_abspath)
if required_abspath:
logger.debug("Found %s on cdrom %s" % (requires, mount_point))
self.game_disc = mount_point
self._iter_commands()
break
def mkdir(self, directory):
directory = self._substitute(directory)
try:
os.makedirs(directory)
except OSError:
logger.debug("Directory %s already exists" % directory)
else:
logger.debug("Created directory %s" % directory)
def merge(self, params):
self._check_required_params(['src', 'dst'], params, 'merge')
src, dst = self._get_move_paths(params)
logger.debug("Merging %s into %s" % (src, dst))
if not os.path.exists(src):
raise ScriptingError("Source does not exist: %s" % src, params)
if not os.path.exists(dst):
os.makedirs(dst)
if os.path.isfile(src):
# If single file, copy it and change reference in game file so it
# can be used as executable. Skip copying if the source is the same
# as destination.
if os.path.dirname(src) != dst:
shutil.copy(src, dst)
if params['src'] in self.game_files.keys():
self.game_files[params['src']] = os.path.join(
dst, os.path.basename(src)
)
return
system.merge_folders(src, dst)
def move(self, params):
"""Move a file or directory."""
self._check_required_params(['src', 'dst'], params, 'move')
src, dst = self._get_move_paths(params)
logger.debug("Moving %s to %s" % (src, dst))
if not os.path.exists(src):
raise ScriptingError("I can't move %s, it does not exist" % src)
if os.path.isfile(src):
src_filename = os.path.basename(src)
src_dir = os.path.dirname(src)
dst_path = os.path.join(dst, src_filename)
if src_dir == dst:
logger.info("Source file is the same as destination, skipping")
elif os.path.exists(dst_path):
# May not be the best choice, but it's the safest.
# Maybe should display confirmation dialog (Overwrite / Skip) ?
logger.info("Destination file exists, skipping")
else:
shutil.move(src, dst)
else:
try:
shutil.move(src, dst)
except shutil.Error:
raise ScriptingError("Can't move %s to destination %s"
% (src, dst))
if os.path.isfile(src) and params['src'] in self.game_files.keys():
# Change game file reference so it can be used as executable
self.game_files['src'] = src
def _get_move_paths(self, params):
"""Convert raw data passed to 'move'."""
src_ref = params['src']
src = (self.game_files.get(src_ref) or self._substitute(src_ref))
if not src:
raise ScriptingError("Wrong value for 'src' param", src_ref)
dst_ref = params['dst']
dst = self._substitute(dst_ref)
if not dst:
raise ScriptingError("Wrong value for 'dst' param", dst_ref)
return (src, dst)
def _get_file(self, fileid):
return self.game_files.get(fileid)
def substitute_vars(self, data):
self._check_required_params('file', data, 'execute')
filename = self._substitute(data['file'])
logger.debug('Substituting variables for file %s', filename)
tmp_filename = filename + '.tmp'
with open(filename, 'r') as source_file:
with open(tmp_filename, 'w') as dest_file:
line = '.'
while line:
line = source_file.readline()
line = self._substitute(line)
dest_file.write(line)
os.rename(tmp_filename, filename)
def task(self, data):
"""Directive triggering another function specific to a runner.
The 'name' parameter is mandatory. If 'args' is provided it will be
passed to the runner task.
"""
self._check_required_params('name', data, 'task')
task_name = data.pop('name')
if '.' in task_name:
# Run a task from a different runner
# than the one for this installer
runner_name, task_name = task_name.split('.')
else:
runner_name = self.script["runner"]
try:
runner_class = import_runner(runner_name)
except InvalidRunner:
raise ScriptingError('Invalid runner provided %s', runner_name)
runner = runner_class()
# Check/install Wine runner at version specified in the script
wine_version = None
if runner_name == 'wine' and self.script.get('wine'):
wine_version = self.script.get('wine').get('version')
# Old lutris versions used a version + arch tuple, we now include
# everything in the version.
# Before that change every wine runner was for i386
if '-' not in wine_version:
wine_version += '-i386'
if wine_version and task_name == 'wineexec':
if not wine.is_version_installed(wine_version):
Gdk.threads_init()
Gdk.threads_enter()
runner.install(wine_version)
Gdk.threads_leave()
data['wine_path'] = wine.get_wine_version_exe(wine_version)
# Check/install other runner
elif not runner.is_installed():
Gdk.threads_init()
Gdk.threads_enter()
runner.install()
Gdk.threads_leave()
for key in data:
data[key] = self._substitute(data[key])
task = import_task(runner_name, task_name)
task(**data)
def write_config(self, params):
self._check_required_params(['file', 'section', 'key', 'value'],
params, 'move')
"""Write a key-value pair into an INI type config file."""
# Get file
config_file = self._get_file(params['file'])
if not config_file:
config_file = self._substitute(params['file'])
# Create it if necessary
basedir = os.path.dirname(config_file)
if not os.path.exists(basedir):
os.makedirs(basedir)
parser = EvilConfigParser(allow_no_value=True,
dict_type=MultiOrderedDict)
parser.optionxform = str # Preserve text case
parser.read(config_file)
if not parser.has_section(params['section']):
parser.add_section(params['section'])
parser.set(params['section'], params['key'], params['value'])
with open(config_file, 'wb') as f:
parser.write(f)

View file

@ -0,0 +1,33 @@
import sys
from lutris.util.log import logger
from lutris.gui.dialogs import ErrorDialog
class ScriptingError(Exception):
"""Custom exception for scripting errors, can be caught by modifying
excepthook."""
def __init__(self, message, faulty_data=None):
self.message = message
self.faulty_data = faulty_data
logger.error(self.message + repr(self.faulty_data))
super(ScriptingError, self).__init__()
def __str__(self):
return self.message + "\n" + repr(self.faulty_data)
def __repr__(self):
return self.message
_excepthook = sys.excepthook
def error_handler(error_type, value, traceback):
if error_type == ScriptingError:
message = value.message
if value.faulty_data:
message += "\n<b>" + str(value.faulty_data) + "</b>"
ErrorDialog(message)
else:
_excepthook(error_type, value, traceback)
sys.excepthook = error_handler

View file

@ -1,60 +1,28 @@
# pylint: disable=E1101, E0611
"""Install a game by following its install script."""
import os
import sys
import yaml
import shutil
import urllib2
import platform
import shlex
import webbrowser
from gi.repository import Gdk, GLib
from gi.repository import GLib
from lutris import pga, runtime, settings
from lutris.util import extract, devices, system
from lutris.util.fileio import EvilConfigParser, MultiOrderedDict
from .errors import ScriptingError
from .commands import Commands
from lutris import pga, settings
from lutris.util import system
from lutris.util.jobs import AsyncCall
from lutris.util.log import logger
from lutris.util.steam import get_app_states
from lutris.game import Game
from lutris.config import LutrisConfig
from lutris.game import Game
from lutris.gui.config_dialogs import AddGameDialog
from lutris.gui.dialogs import ErrorDialog, NoInstallerDialog
from lutris.runners import (
wine, winesteam, steam, import_task, import_runner, InvalidRunner
)
from lutris.thread import LutrisThread
class ScriptingError(Exception):
"""Custom exception for scripting errors, can be caught by modifying
excepthook."""
def __init__(self, message, faulty_data=None):
self.message = message
self.faulty_data = faulty_data
logger.error(self.message + repr(self.faulty_data))
super(ScriptingError, self).__init__()
def __str__(self):
return self.message + "\n" + repr(self.faulty_data)
def __repr__(self):
return self.message
_excepthook = sys.excepthook
def error_handler(error_type, value, traceback):
if error_type == ScriptingError:
message = value.message
if value.faulty_data:
message += "\n<b>" + str(value.faulty_data) + "</b>"
ErrorDialog(message)
else:
_excepthook(error_type, value, traceback)
sys.excepthook = error_handler
from lutris.gui.dialogs import NoInstallerDialog
from lutris.runners import wine, winesteam, steam
def fetch_script(window, game_ref):
@ -81,297 +49,6 @@ def fetch_script(window, game_ref):
return yaml.safe_load(script_contents)
class Commands(object):
"""The directives for the "installer:" part of the script."""
def _check_required_params(self, params, command_data, command_name):
"""Verify presence of a list of parameters required by a command."""
if type(params) is str:
params = [params]
for param in params:
if param not in command_data:
raise ScriptingError('The "%s" parameter is mandatory for '
'the %s command' % (param, command_name),
command_data)
def check_md5(self, data):
self._check_required_params(['file', 'value'], data, 'check_md5')
filename = self._get_file(data['file'])
_hash = system.get_md5_hash(filename)
if _hash != data['value']:
raise ScriptingError("MD5 checksum mismatch", data)
def chmodx(self, filename):
filename = self._substitute(filename)
os.popen('chmod +x "%s"' % filename)
def execute(self, data):
"""Run an executable file."""
args = []
if isinstance(data, dict):
self._check_required_params('file', data, 'execute')
file_ref = data['file']
args_string = data.get('args', '')
for arg in shlex.split(args_string):
args.append(self._substitute(arg))
else:
file_ref = data
# Determine whether 'file' value is a file id or a path
exec_path = self._get_file(file_ref) or self._substitute(file_ref)
if not exec_path:
raise ScriptingError("Unable to find file %s" % file_ref,
file_ref)
if not os.path.exists(exec_path):
raise ScriptingError("Unable to find required executable",
exec_path)
self.chmodx(exec_path)
terminal = data.get('terminal')
if terminal:
terminal = system.get_default_terminal()
command = [exec_path] + args
logger.debug("Executing %s" % command)
thread = LutrisThread(command, env=runtime.get_env(), term=terminal)
thread.run()
def extract(self, data):
"""Extract a file, guessing the compression method."""
self._check_required_params('file', data, 'extract')
filename = self._get_file(data['file'])
if not filename:
filename = self._substitute(data['file'])
if not os.path.exists(filename):
raise ScriptingError("%s does not exists" % filename)
if 'dst' in data:
dest_path = self._substitute(data['dst'])
else:
dest_path = self.target_path
msg = "Extracting %s" % os.path.basename(filename)
logger.debug(msg)
self.parent.set_status(msg)
merge_single = 'nomerge' not in data
extractor = data.get('format')
logger.debug("extracting file %s to %s", filename, dest_path)
extract.extract_archive(filename, dest_path, merge_single, extractor)
def input_menu(self, data):
"""Display an input request as a dropdown menu with options."""
self._check_required_params('options', data, 'input_menu')
identifier = data.get('id')
alias = 'INPUT_%s' % identifier if identifier else None
has_entry = data.get('entry')
options = data['options']
preselect = self._substitute(data.get('preselect', ''))
self.parent.input_menu(alias, options, preselect, has_entry,
self._on_input_menu_validated)
return 'STOP'
def _on_input_menu_validated(self, widget, *args):
alias = args[0]
menu = args[1]
choosen_option = menu.get_active_id()
if choosen_option:
self.user_inputs.append({'alias': alias,
'value': choosen_option})
self.parent.continue_button.hide()
self._iter_commands()
def insert_disc(self, data):
self._check_required_params('requires', data, 'insert_disc')
requires = data.get('requires')
message = data.get(
'message',
"Insert game disc or mount disk image and click OK."
)
message += (
"\n\nLutris is looking for a mounted disk drive or image \n"
"containing the following file or folder:\n"
"<i>%s</i>" % requires
)
self.parent.wait_for_user_action(message, self._find_matching_disc,
requires)
return 'STOP'
def _find_matching_disc(self, widget, requires):
drives = devices.get_mounted_discs()
for drive in drives:
mount_point = drive.get_root().get_path()
required_abspath = os.path.join(mount_point, requires)
required_abspath = system.fix_path_case(required_abspath)
if required_abspath:
logger.debug("Found %s on cdrom %s" % (requires, mount_point))
self.game_disc = mount_point
self._iter_commands()
break
def mkdir(self, directory):
directory = self._substitute(directory)
try:
os.makedirs(directory)
except OSError:
logger.debug("Directory %s already exists" % directory)
else:
logger.debug("Created directory %s" % directory)
def merge(self, params):
self._check_required_params(['src', 'dst'], params, 'merge')
src, dst = self._get_move_paths(params)
logger.debug("Merging %s into %s" % (src, dst))
if not os.path.exists(src):
raise ScriptingError("Source does not exist: %s" % src, params)
if not os.path.exists(dst):
os.makedirs(dst)
if os.path.isfile(src):
# If single file, copy it and change reference in game file so it
# can be used as executable. Skip copying if the source is the same
# as destination.
if os.path.dirname(src) != dst:
shutil.copy(src, dst)
if params['src'] in self.game_files.keys():
self.game_files[params['src']] = os.path.join(
dst, os.path.basename(src)
)
return
system.merge_folders(src, dst)
def move(self, params):
"""Move a file or directory."""
self._check_required_params(['src', 'dst'], params, 'move')
src, dst = self._get_move_paths(params)
logger.debug("Moving %s to %s" % (src, dst))
if not os.path.exists(src):
raise ScriptingError("I can't move %s, it does not exist" % src)
if os.path.isfile(src):
src_filename = os.path.basename(src)
src_dir = os.path.dirname(src)
dst_path = os.path.join(dst, src_filename)
if src_dir == dst:
logger.info("Source file is the same as destination, skipping")
elif os.path.exists(dst_path):
# May not be the best choice, but it's the safest.
# Maybe should display confirmation dialog (Overwrite / Skip) ?
logger.info("Destination file exists, skipping")
else:
shutil.move(src, dst)
else:
try:
shutil.move(src, dst)
except shutil.Error:
raise ScriptingError("Can't move %s to destination %s"
% (src, dst))
if os.path.isfile(src) and params['src'] in self.game_files.keys():
# Change game file reference so it can be used as executable
self.game_files['src'] = src
def _get_move_paths(self, params):
"""Convert raw data passed to 'move'."""
src_ref = params['src']
src = (self.game_files.get(src_ref) or self._substitute(src_ref))
if not src:
raise ScriptingError("Wrong value for 'src' param", src_ref)
dst_ref = params['dst']
dst = self._substitute(dst_ref)
if not dst:
raise ScriptingError("Wrong value for 'dst' param", dst_ref)
return (src, dst)
def _get_file(self, fileid):
return self.game_files.get(fileid)
def substitute_vars(self, data):
self._check_required_params('file', data, 'execute')
filename = self._substitute(data['file'])
logger.debug('Substituting variables for file %s', filename)
tmp_filename = filename + '.tmp'
with open(filename, 'r') as source_file:
with open(tmp_filename, 'w') as dest_file:
line = '.'
while line:
line = source_file.readline()
line = self._substitute(line)
dest_file.write(line)
os.rename(tmp_filename, filename)
def task(self, data):
"""Directive triggering another function specific to a runner.
The 'name' parameter is mandatory. If 'args' is provided it will be
passed to the runner task.
"""
self._check_required_params('name', data, 'task')
task_name = data.pop('name')
if '.' in task_name:
# Run a task from a different runner
# than the one for this installer
runner_name, task_name = task_name.split('.')
else:
runner_name = self.script["runner"]
try:
runner_class = import_runner(runner_name)
except InvalidRunner:
raise ScriptingError('Invalid runner provided %s', runner_name)
runner = runner_class()
# Check/install Wine runner at version specified in the script
wine_version = None
if runner_name == 'wine' and self.script.get('wine'):
wine_version = self.script.get('wine').get('version')
# Old lutris versions used a version + arch tuple, we now include
# everything in the version.
# Before that change every wine runner was for i386
if '-' not in wine_version:
wine_version += '-i386'
if wine_version and task_name == 'wineexec':
if not wine.is_version_installed(wine_version):
Gdk.threads_init()
Gdk.threads_enter()
runner.install(wine_version)
Gdk.threads_leave()
data['wine_path'] = wine.get_wine_version_exe(wine_version)
# Check/install other runner
elif not runner.is_installed():
Gdk.threads_init()
Gdk.threads_enter()
runner.install()
Gdk.threads_leave()
for key in data:
data[key] = self._substitute(data[key])
task = import_task(runner_name, task_name)
task(**data)
def write_config(self, params):
self._check_required_params(['file', 'section', 'key', 'value'],
params, 'move')
"""Write a key-value pair into an INI type config file."""
# Get file
config_file = self._get_file(params['file'])
if not config_file:
config_file = self._substitute(params['file'])
# Create it if necessary
basedir = os.path.dirname(config_file)
if not os.path.exists(basedir):
os.makedirs(basedir)
parser = EvilConfigParser(allow_no_value=True,
dict_type=MultiOrderedDict)
parser.optionxform = str # Preserve text case
parser.read(config_file)
if not parser.has_section(params['section']):
parser.add_section(params['section'])
parser.set(params['section'], params['key'], params['value'])
with open(config_file, 'wb') as f:
parser.write(f)
class ScriptInterpreter(Commands):
"""Convert raw installer script data into actions."""
def __init__(self, script, parent):
@ -380,6 +57,7 @@ class ScriptInterpreter(Commands):
self.files = []
self.target_path = None
self.parent = parent
self.reversion_data = {}
self.game_name = None
self.game_slug = None
self.game_files = {}
@ -452,6 +130,7 @@ class ScriptInterpreter(Commands):
if self.should_create_target:
os.makedirs(self.target_path)
self.reversion_data['created_main_dir'] = True
if len(self.game_files) < len(self.files):
logger.info(
@ -757,6 +436,19 @@ class ScriptInterpreter(Commands):
if os.path.exists(self.download_cache_path):
shutil.rmtree(self.download_cache_path)
# --------------
# Revert install
# --------------
def revert(self):
if os.path.exists(self.download_cache_path):
shutil.rmtree(self.download_cache_path)
if self.reversion_data.get('created_main_dir'):
if os.path.exists(self.target_path):
shutil.rmtree(self.target_path)
return
# -------------
# Utility stuff
# -------------

View file

@ -1,9 +1,10 @@
from unittest import TestCase
from lutris.installer import ScriptInterpreter, ScriptingError
from lutris.installer.interpreter import ScriptInterpreter
from lutris.installer.errors import ScriptingError
class MockInterpreter(ScriptInterpreter):
""" a script interpreter mock """
"""A script interpreter mock."""
script = {'runner': 'linux'}
def is_valid(self):