Deprecate pga module and create database package

This commit is contained in:
Mathieu Comandon 2020-08-08 20:39:25 -07:00 committed by Mathieu Comandon
parent 1f72e63880
commit d6c81d7977
48 changed files with 931 additions and 991 deletions

View file

@ -65,7 +65,6 @@ disable=
inconsistent-return-statements,
unsubscriptable-object,
not-an-iterable,
no-member,
unused-argument,
bare-except,
too-many-statements,

View file

View file

@ -0,0 +1,60 @@
from lutris import settings
from lutris.database import sql
PGA_DB = settings.PGA_DB
def get_categories():
"""Get the list of every category in database."""
return sql.db_select(PGA_DB, "categories",)
def get_category(name):
"""Return a category by name"""
categories = sql.db_select(PGA_DB, "categories", condition=("name", name))
if categories:
return categories[0]
def get_games_in_category(category_name):
"""Get the ids of games in database."""
query = (
"select game_id from games_categories "
"JOIN categories ON categories.id = games_categories.category_id "
"WHERE categories.name=?"
)
return [
game["game_id"]
for game in sql.db_query(PGA_DB, query, (category_name, ))
]
def get_categories_in_game(game_id):
"""Get the categories of a game in database."""
query = (
"select categories.name from categories "
"JOIN games_categories ON categories.id = games_categories.category_id "
"JOIN games ON games.id = games_categories.game_id "
"WHERE games.id=?"
)
return [
category["name"]
for category in sql.db_query(PGA_DB, query, (game_id,))
]
def add_category(category_name):
"""Add a category to the database"""
return sql.db_insert(PGA_DB, "categories", {"name": category_name})
def add_game_to_category(game_id, category_id):
"""Add a category to a game"""
return sql.db_insert(PGA_DB, "games_categories", {"game_id": game_id, "category_id": category_id})
def remove_category_from_game(game_id, category_id):
"""Remove a category from a game"""
query = "DELETE FROM games_categories WHERE category_id=? AND game_id=?"
with sql.db_cursor(PGA_DB) as cursor:
sql.cursor_execute(cursor, query, (category_id, game_id))

251
lutris/database/games.py Normal file
View file

@ -0,0 +1,251 @@
import math
import time
from itertools import chain
from lutris import settings
from lutris.database import sql
from lutris.util.log import logger
from lutris.util.strings import slugify
PGA_DB = settings.PGA_DB
def get_games(
name_filter=None,
filter_installed=False,
extra_filters=None,
ordering="slug"
):
"""Get the list of every game in database."""
query = "select * from games"
params = []
filters = []
if name_filter:
filters.append("name LIKE ?")
params.append(name_filter)
if filter_installed:
filters.append("installed = 1")
for field in extra_filters or {}:
filters.append("%s = ?" % field)
params.append(extra_filters[field])
if filters:
query += " WHERE " + " AND ".join(filters)
query += " ORDER BY slug"
return sql.db_query(PGA_DB, query, tuple(params))
def get_game_ids():
"""Return a list of ids of games in the database."""
return [game["id"] for game in get_games()]
def get_games_where(**conditions):
"""
Query games table based on conditions
Args:
conditions (dict): named arguments with each field matches its desired value.
Special values for field names can be used:
<field>__isnull will return rows where `field` is NULL if the value is True
<field>__not will invert the condition using `!=` instead of `=`
<field>__in will match rows for every value of `value`, which should be an iterable
Returns:
list: Rows matching the query
"""
query = "select * from games"
condition_fields = []
condition_values = []
for field, value in conditions.items():
field, *extra_conditions = field.split("__")
if extra_conditions:
extra_condition = extra_conditions[0]
if extra_condition == "isnull":
condition_fields.append("{} is {} null".format(field, "" if value else "not"))
if extra_condition == "not":
condition_fields.append("{} != ?".format(field))
condition_values.append(value)
if extra_condition == "in":
if not hasattr(value, "__iter__"):
raise ValueError("Value should be an iterable (%s given)" % value)
if len(value) > 999:
raise ValueError("SQLite limnited to a maximum of 999 parameters.")
if value:
condition_fields.append("{} in ({})".format(field, ", ".join("?" * len(value)) or ""))
condition_values = list(chain(condition_values, value))
else:
condition_fields.append("{} = ?".format(field))
condition_values.append(value)
condition = " AND ".join(condition_fields)
if condition:
query = " WHERE ".join((query, condition))
else:
# Inspect and document why we should return
# an empty list when no condition is present.
return []
return sql.db_query(PGA_DB, query, tuple(condition_values))
def get_games_by_ids(game_ids):
# sqlite limits the number of query parameters to 999, to
# bypass that limitation, divide the query in chunks
size = 999
return list(
chain.from_iterable(
[
get_games_where(id__in=list(game_ids)[page * size:page * size + size])
for page in range(math.ceil(len(game_ids) / size))
]
)
)
def get_game_by_field(value, field="slug"):
"""Query a game based on a database field"""
if field not in ("slug", "installer_slug", "id", "configpath", "steamid"):
raise ValueError("Can't query by field '%s'" % field)
game_result = sql.db_select(PGA_DB, "games", condition=(field, value))
if game_result:
return game_result[0]
return {}
def get_games_by_runner(runner):
"""Return all games using a specific runner"""
return sql.db_select(PGA_DB, "games", condition=("runner", runner))
def get_games_by_slug(slug):
"""Return all games using a specific slug"""
return sql.db_select(PGA_DB, "games", condition=("slug", slug))
def add_game(name, **game_data):
"""Add a game to the PGA database."""
game_data["name"] = name
game_data["installed_at"] = int(time.time())
if "slug" not in game_data:
game_data["slug"] = slugify(name)
return sql.db_insert(PGA_DB, "games", game_data)
def add_games_bulk(games):
"""
Add a list of games to the PGA database.
The dicts must have an identical set of keys.
Args:
games (list): list of games in dict format
Returns:
list: List of inserted game ids
"""
return [sql.db_insert(PGA_DB, "games", game) for game in games]
def add_or_update(**params):
"""Add a game to the PGA or update an existing one
If an 'id' is provided in the parameters then it
will try to match it, otherwise it will try matching
by slug, creating one when possible.
"""
game_id = get_matching_game(params)
if game_id:
params["id"] = game_id
sql.db_update(PGA_DB, "games", params, ("id", game_id))
return game_id
return add_game(**params)
def get_matching_game(params):
"""Tries to match given parameters with an existing game"""
# Always match by ID if provided
if params.get("id"):
game = get_game_by_field(params["id"], "id")
if game:
return game["id"]
logger.warning("Game ID %s provided but couldn't be matched", params["id"])
slug = params.get("slug") or slugify(params.get("name"))
if not slug:
raise ValueError("Can't add or update without an identifier")
for game in get_games_by_slug(slug):
if game["installed"]:
if game["configpath"] == params.get("configpath"):
return game["id"]
else:
if (game["runner"] == params.get("runner") or not all([params.get("runner"), game["runner"]])):
return game["id"]
return None
def delete_game(game_id):
"""Delete a game from the PGA."""
sql.db_delete(PGA_DB, "games", "id", game_id)
def set_uninstalled(game_id):
sql.db_update(PGA_DB, "games", {"installed": 0, "runner": ""}, ("id", game_id))
def get_used_runners():
"""Return a list of the runners in use by installed games."""
with sql.db_cursor(PGA_DB) as cursor:
query = "select distinct runner from games where runner is not null order by runner"
rows = cursor.execute(query)
results = rows.fetchall()
return [result[0] for result in results if result[0]]
def get_used_runners_game_count():
"""Return a dictionary listing for each runner in use, how many games are using it."""
with sql.db_cursor(PGA_DB) as cursor:
query = "select runner, count(*) from games where runner is not null group by runner order by runner"
rows = cursor.execute(query)
results = rows.fetchall()
return {result[0]: result[1] for result in results if result[0]}
def get_used_platforms():
"""Return a list of platforms currently in use"""
with sql.db_cursor(PGA_DB) as cursor:
query = (
"select distinct platform from games "
"where platform is not null and platform is not '' order by platform"
)
rows = cursor.execute(query)
results = rows.fetchall()
return [result[0] for result in results if result[0]]
def get_used_platforms_game_count():
"""Return a dictionary listing for each platform in use, how many games are using it."""
with sql.db_cursor(PGA_DB) as cursor:
# The extra check for 'installed is 1' is needed because
# the platform lists don't show uninstalled games, but the platform of a game
# is remembered even after the game is uninstalled.
query = (
"select platform, count(*) from games "
"where platform is not null and platform is not '' and installed is 1 "
"group by platform "
"order by platform"
)
rows = cursor.execute(query)
results = rows.fetchall()
return {result[0]: result[1] for result in results if result[0]}
def get_hidden_ids():
"""Return a list of game IDs to be excluded from the library view"""
# Load the ignore string and filter out empty strings to prevent issues
ignores_raw = settings.read_setting("library_ignores", section="lutris", default="").split(",")
ignores = [ignore for ignore in ignores_raw if not ignore == ""]
# Turn the strings into integers
return [int(game_id) for game_id in ignores]
def set_hidden_ids(games):
"""Writes a list of game IDs that are to be hidden into the config file"""
ignores_str = [str(game_id) for game_id in games]
settings.write_setting("library_ignores", ','.join(ignores_str), section="lutris")

222
lutris/database/schema.py Normal file
View file

@ -0,0 +1,222 @@
from lutris import settings
from lutris.database import sql
from lutris.util.log import logger
PGA_DB = settings.PGA_DB
DATABASE = {
"games": [
{
"name": "id",
"type": "INTEGER",
"indexed": True
},
{
"name": "name",
"type": "TEXT"
},
{
"name": "slug",
"type": "TEXT"
},
{
"name": "installer_slug",
"type": "TEXT"
},
{
"name": "parent_slug",
"type": "TEXT"
},
{
"name": "platform",
"type": "TEXT"
},
{
"name": "runner",
"type": "TEXT"
},
{
"name": "executable",
"type": "TEXT"
},
{
"name": "directory",
"type": "TEXT"
},
{
"name": "updated",
"type": "DATETIME"
},
{
"name": "lastplayed",
"type": "INTEGER"
},
{
"name": "installed",
"type": "INTEGER"
},
{
"name": "installed_at",
"type": "INTEGER"
},
{
"name": "year",
"type": "INTEGER"
},
{
"name": "steamid",
"type": "INTEGER"
},
{
"name": "gogid",
"type": "INTEGER"
},
{
"name": "humblestoreid",
"type": "TEXT"
},
{
"name": "configpath",
"type": "TEXT"
},
{
"name": "has_custom_banner",
"type": "INTEGER"
},
{
"name": "has_custom_icon",
"type": "INTEGER"
},
{
"name": "playtime",
"type": "REAL"
},
],
"store_games": [
{
"name": "id",
"type": "INTEGER",
"indexed": True
},
{
"name": "store",
"type": "TEXT"
},
{
"name": "appid",
"type": "TEXT"
},
{
"name": "name",
"type": "TEXT"
},
{
"name": "slug",
"type": "TEXT"
},
{
"name": "logo",
"type": "TEXT"
},
{
"name": "url",
"type": "TEXT"
},
{
"name": "details",
"type": "TEXT"
},
{
"name": "lutris_slug",
"type": "TEXT"
},
],
"sources": [
{"name": "id", "type": "INTEGER", "indexed": True},
{"name": "uri", "type": "TEXT UNIQUE"},
],
"categories": [
{"name": "id", "type": "INTEGER", "indexed": True},
{"name": "name", "type": "TEXT", "unique": True},
],
"games_categories": [
{"name": "game_id", "type": "INTEGER", "indexed": False},
{"name": "category_id", "type": "INTEGER", "indexed": False},
]
}
def get_schema(tablename):
"""
Fields:
- position
- name
- type
- not null
- default
- indexed
"""
tables = []
query = "pragma table_info('%s')" % tablename
with sql.db_cursor(PGA_DB) as cursor:
for row in cursor.execute(query).fetchall():
field = {
"name": row[1],
"type": row[2],
"not_null": row[3],
"default": row[4],
"indexed": row[5],
}
tables.append(field)
return tables
def field_to_string(name="", type="", indexed=False, unique=False): # pylint: disable=redefined-builtin
"""Converts a python based table definition to it's SQL statement"""
field_query = "%s %s" % (name, type)
if indexed:
field_query += " PRIMARY KEY"
if unique:
field_query += " UNIQUE"
return field_query
def create_table(name, schema):
"""Creates a new table in the database"""
fields = ", ".join([field_to_string(**f) for f in schema])
query = "CREATE TABLE IF NOT EXISTS %s (%s)" % (name, fields)
logger.debug("[PGAQuery] %s", query)
with sql.db_cursor(PGA_DB) as cursor:
cursor.execute(query)
def migrate(table, schema):
"""Compare a database table with the reference model and make necessary changes
This is very basic and only the needed features have been implemented (adding columns)
Args:
table (str): Name of the table to migrate
schema (dict): Reference schema for the table
Returns:
list: The list of column names that have been added
"""
existing_schema = get_schema(table)
migrated_fields = []
if existing_schema:
columns = [col["name"] for col in existing_schema]
for field in schema:
if field["name"] not in columns:
logger.info("Migrating %s field %s", table, field["name"])
migrated_fields.append(field["name"])
sql.add_field(PGA_DB, table, field)
else:
create_table(table, schema)
return migrated_fields
def syncdb():
"""Update the database to the current version, making necessary changes
for backwards compatibility."""
for table in DATABASE:
migrate(table, DATABASE[table])

View file

@ -0,0 +1,54 @@
import os
from lutris import settings
from lutris.database import sql
from lutris.util import system
from lutris.util.log import logger
PGA_DB = settings.PGA_DB
def add_source(uri):
sql.db_insert(PGA_DB, "sources", {"uri": uri})
def delete_source(uri):
sql.db_delete(PGA_DB, "sources", "uri", uri)
def read_sources():
with sql.db_cursor(PGA_DB) as cursor:
rows = cursor.execute("select uri from sources")
results = rows.fetchall()
return [row[0] for row in results]
def write_sources(sources):
db_sources = read_sources()
for uri in db_sources:
if uri not in sources:
sql.db_delete(PGA_DB, "sources", "uri", uri)
for uri in sources:
if uri not in db_sources:
sql.db_insert(PGA_DB, "sources", {"uri": uri})
def check_for_file(game, file_id):
for source in read_sources():
if source.startswith("file://"):
source = source[7:]
else:
protocol = source[:7]
logger.warning("PGA source protocol %s not implemented", protocol)
continue
if not system.path_exists(source):
logger.info("PGA source %s unavailable", source)
continue
game_dir = os.path.join(source, game)
if not system.path_exists(game_dir):
continue
for game_file in os.listdir(game_dir):
game_base, _ext = os.path.splitext(game_file)
if game_base == file_id:
return os.path.join(game_dir, game_file)
return False

View file

@ -9,9 +9,11 @@ from gettext import gettext as _
from gi.repository import GLib, GObject, Gtk
from lutris import pga, runtime
from lutris import runtime
from lutris.command import MonitoredCommand
from lutris.config import LutrisConfig
from lutris.database import categories as categories_db
from lutris.database import games as games_db
from lutris.discord import DiscordPresence
from lutris.exceptions import GameConfigError, watch_lutris_errors
from lutris.gui import dialogs
@ -58,7 +60,7 @@ class Game(GObject.Object):
self.config = None
# Load attributes from database
game_data = pga.get_game_by_field(game_id, "id")
game_data = games_db.get_game_by_field(game_id, "id")
self.slug = game_data.get("slug") or ""
self.runner_name = game_data.get("runner") or ""
self.directory = game_data.get("directory") or ""
@ -106,7 +108,7 @@ class Game(GObject.Object):
@property
def is_favorite(self):
"""Return whether the game is in the user's favorites"""
categories = pga.get_categories_in_game(self.id)
categories = categories_db.get_categories_in_game(self.id)
for category in categories:
if category == "favorite":
return True
@ -114,36 +116,36 @@ class Game(GObject.Object):
def add_to_favorites(self):
"""Add the game to the 'favorite' category"""
favorite = pga.get_category("favorite")
favorite = categories_db.get_category("favorite")
if not favorite:
favorite = pga.add_category("favorite")
pga.add_game_to_category(self.id, favorite["id"])
favorite = categories_db.add_category("favorite")
categories_db.add_game_to_category(self.id, favorite["id"])
self.emit("game-updated")
def remove_from_favorites(self):
"""Remove game from favorites"""
favorite = pga.get_category("favorite")
pga.remove_category_from_game(self.id, favorite["id"])
favorite = categories_db.get_category("favorite")
categories_db.remove_category_from_game(self.id, favorite["id"])
self.emit("game-updated")
@property
def is_hidden(self):
"""Is the game hidden in the UI?"""
return self.id in pga.get_hidden_ids()
return self.id in games_db.get_hidden_ids()
def hide(self):
"""Do not show this game in the UI"""
# Append the new hidden ID and save it
ignores = pga.get_hidden_ids() + [self.id]
pga.set_hidden_ids(ignores)
ignores = games_db.get_hidden_ids() + [self.id]
games_db.set_hidden_ids(ignores)
self.emit("game-updated")
def unhide(self):
"""Remove the game from hidden games"""
# Remove the ID to unhide and save it
ignores = pga.get_hidden_ids()
ignores = games_db.get_hidden_ids()
ignores.remove(self.id)
pga.set_hidden_ids(ignores)
games_db.set_hidden_ids(ignores)
self.emit("game-updated")
@property
@ -251,15 +253,15 @@ class Game(GObject.Object):
self.runner.remove_game_data(game_path=self.directory)
# Do not keep multiple copies of the same game
existing_games = pga.get_games_where(slug=self.slug)
existing_games = games_db.get_games_where(slug=self.slug)
if len(existing_games) > 1:
from_library = True
if from_library:
logger.debug("Removing game %s from library", self.id)
pga.delete_game(self.id)
games_db.delete_game(self.id)
else:
pga.set_uninstalled(self.id)
games_db.set_uninstalled(self.id)
if self.config:
self.config.remove()
xdgshortcuts.remove_launcher(self.slug, self.id, desktop=True, menu=True)
@ -286,7 +288,7 @@ class Game(GObject.Object):
if not metadata_only:
self.config.save()
self.set_platform_from_runner()
self.id = pga.add_or_update(
self.id = games_db.add_or_update(
name=self.name,
runner=self.runner_name,
slug=self.slug,

View file

@ -24,14 +24,12 @@ import tempfile
from gettext import gettext as _
import gi
gi.require_version("Gdk", "3.0")
gi.require_version("Gtk", "3.0")
gi.require_version("GnomeDesktop", "3.0")
from gi.repository import Gio, GLib, Gtk
from lutris import pga, settings
from lutris import settings
from lutris.api import parse_installer_url
from lutris.command import exec_command
from lutris.database import games as games_db
from lutris.game import Game
from lutris.gui.dialogs import ErrorDialog, InstallOrPlayDialog
from lutris.gui.dialogs.issue import IssueReportWindow
@ -49,6 +47,10 @@ from lutris.util.wine.dxvk import init_dxvk_versions, wait_for_dxvk_init
from .lutriswindow import LutrisWindow
gi.require_version("Gdk", "3.0")
gi.require_version("Gtk", "3.0")
gi.require_version("GnomeDesktop", "3.0")
class Application(Gtk.Application):
@ -296,7 +298,7 @@ class Application(Gtk.Application):
# List game
if options.contains("list-games"):
game_list = pga.get_games()
game_list = games_db.get_games()
if options.contains("installed"):
game_list = [game for game in game_list if game["installed"]]
if options.contains("json"):
@ -372,21 +374,22 @@ class Application(Gtk.Application):
if action == "rungameid":
# Force db_game to use game id
self.run_in_background = True
db_game = pga.get_game_by_field(game_slug, "id")
db_game = games_db.get_game_by_field(game_slug, "id")
elif action == "rungame":
# Force db_game to use game slug
self.run_in_background = True
db_game = pga.get_game_by_field(game_slug, "slug")
db_game = games_db.get_game_by_field(game_slug, "slug")
elif action == "install":
# Installers can use game or installer slugs
self.run_in_background = True
db_game = pga.get_game_by_field(game_slug, "slug") \
or pga.get_game_by_field(game_slug, "installer_slug")
db_game = games_db.get_game_by_field(game_slug, "slug") \
or games_db.get_game_by_field(game_slug, "installer_slug")
else:
# Dazed and confused, try anything that might works
db_game = (
pga.get_game_by_field(game_slug, "id") or pga.get_game_by_field(game_slug, "slug")
or pga.get_game_by_field(game_slug, "installer_slug")
games_db.get_game_by_field(game_slug, "id")
or games_db.get_game_by_field(game_slug, "slug")
or games_db.get_game_by_field(game_slug, "installer_slug")
)
# If reinstall flag is passed, force the action to install

View file

@ -1,19 +1,16 @@
"""
Commonly used dialogs
isort:skip_file
Commonly used dialogs
isort:skip_file
"""
# Standard Library
# pylint: disable=no-member
import os
from gettext import gettext as _
# Third Party Libraries
import gi
gi.require_version("WebKit2", "4.0")
from gi.repository import GLib, GObject, Gtk, WebKit2
from gi.repository import GObject, Gtk, WebKit2
# Lutris Modules
from lutris import api, pga, runtime, settings
from lutris import api, settings
from lutris.database import sources as sources_db
from lutris.gui.widgets.log_text_view import LogTextView
from lutris.util import datapath
from lutris.util.log import logger
@ -39,6 +36,7 @@ class GtkBuilderDialog(GObject.Object):
}
def __init__(self, parent=None, **kwargs):
# pylint: disable=no-member
super().__init__()
ui_filename = os.path.join(datapath.get(), "ui", self.glade_file)
if not os.path.exists(ui_filename):
@ -203,34 +201,6 @@ class InstallOrPlayDialog(Gtk.Dialog):
self.destroy()
class RuntimeUpdateDialog(Gtk.Dialog):
"""Dialog showing the progress of ongoing runtime update."""
def __init__(self, parent=None):
Gtk.Dialog.__init__(self, _("Runtime updating"), parent=parent)
self.set_size_request(360, 104)
self.set_border_width(12)
progress_box = Gtk.Box()
self.progressbar = Gtk.ProgressBar()
self.progressbar.set_margin_top(40)
self.progressbar.set_margin_bottom(40)
self.progressbar.set_margin_right(20)
self.progressbar.set_margin_left(20)
progress_box.pack_start(self.progressbar, True, True, 0)
self.get_content_area().add(progress_box)
GLib.timeout_add(200, self.on_runtime_check)
self.show_all()
def on_runtime_check(self, *args, **kwargs): # pylint: disable=unused-argument
self.progressbar.pulse()
if not runtime.is_updating():
self.response(Gtk.ResponseType.OK)
self.destroy()
return False
return True
class PgaSourceDialog(GtkBuilderDialog):
glade_file = "dialog-pga-sources.ui"
dialog_object = "pga_dialog"
@ -250,7 +220,7 @@ class PgaSourceDialog(GtkBuilderDialog):
uri_column = Gtk.TreeViewColumn("URI", renderer, text=0)
self.sources_treeview.append_column(uri_column)
self.sources_treeview.set_model(self.sources_liststore)
sources = pga.read_sources()
sources = sources_db.read_sources()
for __, source in enumerate(sources):
self.sources_liststore.append((source, ))
@ -262,7 +232,7 @@ class PgaSourceDialog(GtkBuilderDialog):
return [source[0] for source in self.sources_liststore]
def on_apply(self, widget, data=None):
pga.write_sources(self.sources_list)
sources_db.write_sources(self.sources_list)
self.on_close(widget, data)
def on_add_source_button_clicked(self, widget, data=None): # pylint: disable=unused-argument
@ -392,7 +362,7 @@ class WebConnectDialog(Dialog):
self.webview.load_uri(service.login_url)
self.webview.connect("load-changed", self.on_navigation)
self.webview.connect("create", self.on_webview_popup)
self.vbox.pack_start(self.webview, True, True, 0)
self.vbox.pack_start(self.webview, True, True, 0) # pylint: disable=no-member
webkit_settings = self.webview.get_settings()
# Allow popups (Doesn't work...)
@ -433,6 +403,7 @@ class WebPopupDialog(Dialog):
"""Dialog for handling web popups"""
def __init__(self, webview, parent=None):
# pylint: disable=no-member
self.parent = parent
super(WebPopupDialog, self).__init__(title=_('Loading...'), parent=parent)
self.webview = webview
@ -507,7 +478,7 @@ class DontShowAgainDialog(Gtk.MessageDialog):
parent=None,
checkbox_message=None,
):
# pylint: disable=no-member
if settings.read_setting(setting) == "True":
logger.info("Dialog %s dismissed by user", setting)
return

View file

@ -54,7 +54,7 @@ class LogWindow(Gtk.ApplicationWindow):
self.search_entry.emit("next-match")
def attach_search_entry(self):
if self.search_entry.props.parent is None:
if self.search_entry.props.parent is None: # pylint: disable=no-member
self.vbox.pack_start(self.search_entry, False, False, 0)
self.show_all()
self.search_entry.grab_focus()
@ -62,7 +62,7 @@ class LogWindow(Gtk.ApplicationWindow):
self.logtextview.find_first(self.search_entry)
def dettach_search_entry(self, searched_entry):
if self.search_entry.props.parent is not None:
if self.search_entry.props.parent is not None: # pylint: disable=no-member
self.logtextview.reset_search()
self.vbox.remove(self.search_entry)
# Replace to bottom of log

View file

@ -12,9 +12,9 @@ from gi.repository import GLib, Gtk
# Lutris Modules
from lutris import api, settings
from lutris.database.games import get_games_by_runner
from lutris.game import Game
from lutris.gui.dialogs import Dialog, ErrorDialog, QuestionDialog
from lutris.pga import get_games_by_runner
from lutris.util import jobs, system
from lutris.util.downloader import Downloader
from lutris.util.extract import extract_archive

View file

@ -198,7 +198,7 @@ class RunnersDialog(GtkBuilderDialog):
self.populate_runners()
def on_close_clicked(self, _widget):
self.destroy()
self.destroy() # pylint: disable=no-member
def set_install_state(self, _widget, runner, runner_label):
if runner.is_installed():

View file

@ -9,7 +9,8 @@ from gettext import gettext as _
from gi.repository import Gtk
# Lutris Modules
from lutris import api, pga, settings
from lutris import api, settings
from lutris.database.games import add_game, get_game_by_field
from lutris.game import Game
from lutris.gui.config.add_game import AddGameDialog
from lutris.gui.dialogs import DirectoryDialog, InstallerSourceDialog, NoInstallerDialog, QuestionDialog
@ -140,7 +141,7 @@ class InstallerWindow(BaseApplicationWindow): # pylint: disable=too-many-public
webbrowser.open(settings.GAME_URL % self.game_slug)
def manually_configure_game(self):
game_data = pga.get_game_by_field(self.game_slug, "slug")
game_data = get_game_by_field(self.game_slug, "slug")
if game_data and "slug" in game_data:
# Game data already exist locally.
@ -158,7 +159,7 @@ class InstallerWindow(BaseApplicationWindow): # pylint: disable=too-many-public
"updated": remote_game["updated"],
"steamid": remote_game["steamid"],
}
game = Game(pga.add_game(**game_data))
game = Game(add_game(**game_data))
else:
game = None
AddGameDialog(self.parent, game=game)

View file

@ -1,15 +1,13 @@
"""Main window for the Lutris interface."""
# Standard Library
# pylint: disable=no-member
import os
from collections import namedtuple
from gettext import gettext as _
# Third Party Libraries
from gi.repository import Gdk, Gio, GLib, GObject, Gtk
# Lutris Modules
from lutris import api, pga, settings
from lutris import api, settings
from lutris.database import games as games_db
from lutris.game import Game
from lutris.game_actions import GameActions
from lutris.gui import dialogs
@ -94,9 +92,7 @@ class LutrisWindow(Gtk.ApplicationWindow): # pylint: disable=too-many-public-me
# Window initialization
self.game_actions = GameActions(application=application, window=self)
self.filter_type = None # Type of filter corresponding to the selected sidebar element
self.filter_value = None
self.search_terms = None
self.filters = {} # Type of filter corresponding to the selected sidebar element
self.search_timer_id = None
self.search_mode = "local"
self.game_store = None
@ -246,19 +242,18 @@ class LutrisWindow(Gtk.ApplicationWindow): # pylint: disable=too-many-public-me
self.sync_services()
def hidden_state_change(self, action, value):
"""Hides or shows the hidden games"""
action.set_state(value)
# Add or remove hidden games
ignores = pga.get_hidden_ids()
ignores = games_db.get_hidden_ids()
settings.write_setting("show_hidden_games", str(self.show_hidden_games).lower(), section="lutris")
# If we have to show the hidden games now, we need to add them back to
# the view. If we need to hide them, we just remove them from the view
if value:
self.game_store.add_games(pga.get_games_by_ids(ignores))
self.game_store.add_games(games_db.get_games_by_ids(ignores))
else:
for game_id in ignores:
self.game_store.remove_game(game_id)
@ -323,20 +318,23 @@ class LutrisWindow(Gtk.ApplicationWindow): # pylint: disable=too-many-public-me
return settings.read_setting("show_hidden_games").lower() == "true"
def get_games_from_filters(self):
if self.filter_type == "dynamic_category":
if "dynamic_category" in self.filters:
raise NotImplementedError
if not self.filter_type:
filters = {}
if "category" in self.filters:
raise NotImplementedError
if "text" in self.filters:
search_query = self.filters.pop("text")
else:
filters = {self.filter_type: self.filter_value}
games = pga.get_games(extra_filters=filters)
search_query = None
games = games_db.get_games(
name_filter=search_query,
extra_filters=self.filters
)
logger.info("Returned %s games from filters", len(games))
return games
def get_store(self):
"""Return an instance of the game store"""
print("get store")
# games = self.get_games_from_filters()
game_store = GameStore(
[],
self.icon_type,
@ -350,20 +348,16 @@ class LutrisWindow(Gtk.ApplicationWindow): # pylint: disable=too-many-public-me
return game_store
def update_store(self, games=None):
print("update store")
self.view.set_model(Gtk.ListStore())
self.game_store.games = []
self.game_store.modelfilter.clear_cache()
self.game_store.store.clear()
self.game_store.modelfilter.refilter()
games = games or self.get_games_from_filters()
for game in games:
self.game_store.add_game(game)
return self.game_store
return False
def update_game_by_id(self, game_id):
"""Update the view by DB ID"""
pga_game = pga.get_game_by_field(game_id, "id")
pga_game = games_db.get_game_by_field(game_id, "id")
if pga_game:
return self.game_store.update(pga_game)
return self.game_store.remove_game(game_id)
@ -389,7 +383,7 @@ class LutrisWindow(Gtk.ApplicationWindow): # pylint: disable=too-many-public-me
try:
self.update_game_by_id(game_id)
except ValueError:
self.game_store.add_games(pga.get_games_by_ids([game_id]))
self.game_store.add_games(games_db.get_games_by_ids([game_id]))
for game_id in removed_games:
self.update_game_by_id(game_id)
@ -405,7 +399,7 @@ class LutrisWindow(Gtk.ApplicationWindow): # pylint: disable=too-many-public-me
# self.running_game.notify_steam_game_changed(appmanifest)
runner_name = appmanifest.get_runner_name()
games = pga.get_games_where(steamid=appmanifest.steamid)
games = games_db.get_games_where(steamid=appmanifest.steamid)
if operation == Gio.FileMonitorEvent.DELETED:
for game in games:
if game["runner"] == runner_name:
@ -502,11 +496,6 @@ class LutrisWindow(Gtk.ApplicationWindow): # pylint: disable=too-many-public-me
self.icon_type = default
return self.icon_type
def reload_view(self):
logger.info("Reloading view")
self.update_store()
self.view.model = self.game_store
def switch_view(self, view_type=None):
"""Switch between grid view and list view."""
print("switch view")
@ -523,7 +512,7 @@ class LutrisWindow(Gtk.ApplicationWindow): # pylint: disable=too-many-public-me
self._connect_signals()
self.invalidate_game_filter()
GLib.idle_add(self.reload_view)
GLib.idle_add(self.update_store)
self.set_show_installed_state(self.filter_installed)
self.zoom_adjustment.props.value = list(IMAGE_SIZES.keys()).index(self.icon_type)
@ -552,7 +541,7 @@ class LutrisWindow(Gtk.ApplicationWindow): # pylint: disable=too-many-public-me
return
if result:
added_ids, updated_ids = result
self.game_store.add_games(pga.get_games_by_ids(added_ids))
self.game_store.add_games(games_db.get_games_by_ids(added_ids))
for game_id in updated_ids.difference(added_ids):
self.update_game_by_id(game_id)
else:
@ -666,8 +655,8 @@ class LutrisWindow(Gtk.ApplicationWindow): # pylint: disable=too-many-public-me
def invalidate_game_filter(self):
"""Refilter the game view based on current filters"""
self.game_store.modelfilter.refilter()
self.game_store.modelsort.clear_cache()
self.game_store.sort_view(self.view_sorting, self.view_sorting_ascending)
# self.game_store.modelsort.clear_cache()
# self.game_store.sort_view(self.view_sorting, self.view_sorting_ascending)
self.no_results_overlay.props.visible = not bool(self.game_store.games)
def on_show_installed_first_state_change(self, action, value):
@ -696,8 +685,8 @@ class LutrisWindow(Gtk.ApplicationWindow): # pylint: disable=too-many-public-me
def on_search_entry_changed(self, entry):
"""Callback for the search input keypresses"""
if self.search_mode == "local":
self.game_store.filters["text"] = entry.get_text()
self.invalidate_game_filter()
self.filters["text"] = entry.get_text().strip()
self.update_store()
elif self.search_mode == "website":
search_terms = entry.get_text().lower().strip()
self.search_spinner.props.active = True
@ -719,13 +708,12 @@ class LutrisWindow(Gtk.ApplicationWindow): # pylint: disable=too-many-public-me
@GtkTemplate.Callback
def on_website_search_toggle_toggled(self, toggle_button):
self.search_terms = self.search_entry.props.text
if toggle_button.props.active:
self.search_mode = "website"
self.search_entry.set_placeholder_text(_("Search Lutris.net"))
self.search_entry.set_icon_from_icon_name(Gtk.EntryIconPosition.PRIMARY, "folder-download-symbolic")
self.game_store.search_mode = True
self.search_games(self.search_terms)
self.search_games(self.search_entry.props.text)
else:
self.search_mode = "local"
self.search_entry.set_placeholder_text(_("Filter the list of games"))
@ -758,9 +746,9 @@ class LutrisWindow(Gtk.ApplicationWindow): # pylint: disable=too-many-public-me
self.game_selection_changed(None, None)
game.load_config()
try:
self.game_store.update_game_by_id(game.id)
self.update_game_by_id(game.id)
except ValueError:
self.game_store.add_game_by_id(game.id)
self.game_store.add_games(games_db.get_games_by_ids([game.id]))
self.game_panel.refresh()
return True
@ -774,15 +762,10 @@ class LutrisWindow(Gtk.ApplicationWindow): # pylint: disable=too-many-public-me
def search_games(self, query):
"""Search for games from the website API"""
logger.debug("%s search for :%s", self.search_mode, query)
self.search_terms = query
self.view.destroy()
self.game_store = self.get_store(api.search_games(query) if query else None)
self.filters["text"] = query
self.game_store.set_icon_type(self.icon_type)
self.game_store.load(from_search=bool(query))
self.game_store.filters["text"] = self.search_entry.props.text
self.search_spinner.props.active = False
self.switch_view(self.get_view_type())
self.invalidate_game_filter()
self.update_store()
def game_selection_changed(self, _widget, game):
"""Callback to handle the selection of a game in the view"""
@ -805,20 +788,24 @@ class LutrisWindow(Gtk.ApplicationWindow): # pylint: disable=too-many-public-me
self.game_selection_changed(panel, None)
def update_game(self, slug):
for pga_game in pga.get_games_where(slug=slug):
for pga_game in games_db.get_games_where(slug=slug):
self.game_store.update(pga_game)
@GtkTemplate.Callback
def on_add_game_button_clicked(self, *_args):
"""Add a new game manually with the AddGameDialog."""
self.add_popover.hide()
AddGameDialog(self, runner=self.selected_runner)
if "runner" in self.filters:
runner = self.filters["runner"]
else:
runner = None
AddGameDialog(self, runner=runner)
return True
def remove_game_from_view(self, game_id, from_library=False):
"""Remove a game from the view"""
self.game_store.update_game_by_id(game_id)
self.sidebar_listbox.update()
self.update_game_by_id(game_id)
self.sidebar.update()
def on_toggle_viewtype(self, *args):
self.switch_view("list" if self.current_view_type == "grid" else "grid")
@ -839,7 +826,7 @@ class LutrisWindow(Gtk.ApplicationWindow): # pylint: disable=too-many-public-me
elif self.current_view_type == "list":
settings.write_setting("icon_type_listview", self.icon_type)
self.game_store.set_icon_type(self.icon_type)
self.switch_view(self.get_view_type())
self.switch_view()
def on_icontype_state_change(self, action, value):
action.set_state(value)
@ -883,24 +870,9 @@ class LutrisWindow(Gtk.ApplicationWindow): # pylint: disable=too-many-public-me
def on_sidebar_changed(self, widget):
row = widget.get_selected_row()
if row is None:
self.set_selected_filter(None, None, None)
elif row.type == "runner":
self.set_selected_filter(row.id, None, None)
elif row.type == "category":
self.set_selected_filter(None, None, row.id)
else:
self.set_selected_filter(None, row.id, None)
def set_selected_filter(self, runner, platform, category):
"""Filter the view to a given runner and platform"""
self.selected_runner = runner
self.selected_platform = platform
self.selected_category = category
self.game_store.filters["runner"] = self.selected_runner
self.game_store.filters["platform"] = self.selected_platform
self.game_store.filters["category"] = self.selected_category
self.invalidate_game_filter()
if row:
self.filters[row.type] = row.id
GLib.idle_add(self.update_store)
def show_invalid_credential_warning(self):
dialogs.ErrorDialog(_("Could not connect to your Lutris account. Please sign in again."))

View file

@ -1,14 +1,13 @@
# Third Party Libraries
from gi.repository import Gdk, GObject
# Lutris Modules
from lutris import pga
from lutris.database.games import get_games_by_slug
from lutris.game import Game
from lutris.gui.views import COL_ID, COL_INSTALLED, COL_NAME, COL_SLUG
from lutris.util.log import logger
class GameView:
# pylint: disable=no-member
__gsignals__ = {
"game-selected": (GObject.SIGNAL_RUN_FIRST, None, (Game, )),
"game-activated": (GObject.SIGNAL_RUN_FIRST, None, (Game, )),
@ -41,15 +40,15 @@ class GameView:
model = self.get_model()
game_id = model.get_value(selected_item, COL_ID)
game_slug = model.get_value(selected_item, COL_SLUG)
pga_game = pga.get_games_by_slug(game_slug)
pga_game = get_games_by_slug(game_slug)
if game_id > 0:
selected_game = Game(game_id)
elif pga_game:
selected_game = Game(pga_game[0]["id"])
else:
logger.debug("Don't query the game from anywhere")
selected_game = Game(game_id)
selected_game.id = game_id
selected_game.slug = game_slug
selected_game.slug = model.get_value(selected_item, COL_SLUG)
selected_game.name = model.get_value(selected_item, COL_NAME)
selected_game.installed = model.get_value(selected_item, COL_INSTALLED)
return selected_game
@ -63,4 +62,5 @@ class GameView:
return
key = event.keyval
if key == Gdk.KEY_Delete:
logger.debug("Emit remove-game")
self.emit("remove-game")

View file

@ -15,8 +15,9 @@ class GameGridView(Gtk.IconView, GameView):
def __init__(self, store):
self.game_store = store
self.model = self.game_store.modelsort
super().__init__(model=self.model)
self.model = self.game_store.store
super().__init__(model=self.game_store.store)
GameView.__init__(self)
self.set_column_spacing(1)
self.set_pixbuf_column(COL_ICON)

View file

@ -23,7 +23,7 @@ class GameListView(Gtk.TreeView, GameView):
def __init__(self, store):
self.game_store = store
self.model = self.game_store.modelsort
self.model = self.game_store.store
super().__init__(self.model)
self.set_rules_hint(True)
@ -81,7 +81,9 @@ class GameListView(Gtk.TreeView, GameView):
def set_column_sort(self, col):
"""Sort a column and fallback to sorting by name and runner."""
self.model.set_sort_func(col, sort_func, col)
model = self.get_model()
if model:
model.set_sort_func(col, sort_func, col)
def set_sort_with_column(self, col, sort_col):
"""Sort a column by using another column's data"""

View file

@ -8,7 +8,7 @@ from lutris.util.log import logger
from lutris.util.strings import get_formatted_playtime, gtk_safe
class PgaGame:
class GameItem:
"""Representation of a game for views
TODO: Fix overlap with Game class
"""
@ -22,7 +22,7 @@ class PgaGame:
return self.name
def __repr__(self):
return "<PgaGame id=%s slug=%s>" % (self.id, self.slug)
return "<GameItem id=%s slug=%s>" % (self.id, self.slug)
@property
def id(self): # pylint: disable=invalid-name

View file

@ -1,20 +1,19 @@
"""Store object for a list of games"""
# Standard Library
# pylint: disable=not-an-iterable
import concurrent.futures
# Third Party Libraries
from gi.repository import GLib, GObject, Gtk
from gi.repository.GdkPixbuf import Pixbuf
# Lutris Modules
from lutris import api, pga
from lutris.gui.views.pga_game import PgaGame
from lutris import api
from lutris.database.games import get_games_by_slug
from lutris.gui.views.pga_game import GameItem
from lutris.gui.widgets.utils import get_pixbuf_for_game
from lutris.util import system
from lutris.util.jobs import AsyncCall
from lutris.util.log import logger
from lutris.util.resources import download_media, get_icon_path, update_desktop_icons
from lutris.util.strings import gtk_safe
from . import (
COL_ICON, COL_ID, COL_INSTALLED, COL_INSTALLED_AT, COL_INSTALLED_AT_TEXT, COL_LASTPLAYED, COL_LASTPLAYED_TEXT,
@ -91,11 +90,11 @@ class GameStore(GObject.Object):
show_installed_first=False,
):
super(GameStore, self).__init__()
self.games = games or pga.get_games(show_installed_first=show_installed_first)
if not show_hidden_games:
# Check if the PGA contains game IDs that the user does not
# want to see
self.games = [game for game in self.games if game["id"] not in pga.get_hidden_ids()]
self.games = games
# if not show_hidden_games:
# # Check if the PGA contains game IDs that the user does not
# # want to see
# # self.games = [game for game in self.games if game["id"] not in pga.get_hidden_ids()]
self.search_mode = False
self.games_to_refresh = set()
@ -103,11 +102,9 @@ class GameStore(GObject.Object):
self.filters = {
"installed": filter_installed,
"text": None,
"runner": None,
"platform": None,
"category": None
}
self.show_installed_first = show_installed_first
self.games_in_category = [] # pga.get_games_in_category(self.filters["category"])
self.store = Gtk.ListStore(
int,
str,
@ -125,23 +122,16 @@ class GameStore(GObject.Object):
float,
str,
)
sort_col = COL_NAME
if show_installed_first:
sort_col = COL_INSTALLED
self.store.set_sort_column_id(sort_col, Gtk.SortType.DESCENDING)
self.sort_col = COL_INSTALLED
self.store.set_sort_column_id(self.sort_col, Gtk.SortType.DESCENDING)
else:
self.store.set_sort_column_id(sort_col, Gtk.SortType.ASCENDING)
self.prevent_sort_update = False # prevent recursion with signals
self.sort_col = COL_NAME
self.store.set_sort_column_id(self.sort_col, Gtk.SortType.ASCENDING)
self.modelfilter = self.store.filter_new()
self.modelfilter.set_visible_func(self.filter_view)
try:
self.modelsort = Gtk.TreeModelSort.sort_new_with_model(self.modelfilter)
except AttributeError:
# Apparently some API breaking changes on GTK minor versions.
self.modelsort = Gtk.TreeModelSort.new_with_model(self.modelfilter) # pylint: disable=no-member # NOQA
self.modelsort.connect("sort-column-changed", self.on_sort_column_changed)
self.modelsort.set_sort_func(sort_col, sort_func, sort_col)
self.sort_view(sort_key, sort_ascending)
self.prevent_sort_update = False # prevent recursion with signals
# self.sort_view(sort_key, sort_ascending)
self.medias = {"banner": {}, "icon": {}}
self.banner_misses = set()
self.icon_misses = set()
@ -215,49 +205,39 @@ class GameStore(GObject.Object):
filter_defs = {
"installed": lambda: not model.get_value(_iter, COL_INSTALLED),
"text": lambda: self.filters["text"].lower() not in model.get_value(_iter, COL_NAME).lower(),
"runner": lambda: self.filters["runner"] != model.get_value(_iter, COL_RUNNER),
"platform": lambda: self.filters["platform"] != model.get_value(_iter, COL_PLATFORM),
"category": lambda: (
model.get_value(_iter, COL_ID)
not in pga.get_games_in_category(self.filters["category"])
),
}
for filter_key in self.filters:
if self.filters[filter_key] and filter_defs[filter_key]():
return False
return True
def sort_view(self, key="name", ascending=True):
"""Sort the model on a given column name"""
try:
sort_column = self.sort_columns[key]
except KeyError:
logger.error("Invalid column name '%s'", key)
sort_column = COL_NAME
self.modelsort.set_sort_column_id(
sort_column,
Gtk.SortType.ASCENDING if ascending else Gtk.SortType.DESCENDING,
)
# def sort_view(self, key="name", ascending=True):
# """Sort the model on a given column name"""
# try:
# sort_column = self.sort_columns[key]
# except KeyError:
# logger.error("Invalid column name '%s'", key)
# sort_column = COL_NAME
# self.modelsort.set_sort_column_id(
# sort_column,
# Gtk.SortType.ASCENDING if ascending else Gtk.SortType.DESCENDING,
# )
def on_sort_column_changed(self, model):
if self.prevent_sort_update:
return
(col, direction) = model.get_sort_column_id()
key = next((c for c, k in self.sort_columns.items() if k == col), None)
ascending = direction == Gtk.SortType.ASCENDING
self.prevent_sort_update = True
if not key:
raise ValueError("Invalid sort key for col %s" % col)
self.sort_view(key, ascending)
self.prevent_sort_update = False
self.emit("sorting-changed", key, ascending)
# def on_sort_column_changed(self, model):
# if self.prevent_sort_update:
# return
# (col, direction) = model.get_sort_column_id()
# key = next((c for c, k in self.sort_columns.items() if k == col), None)
# ascending = direction == Gtk.SortType.ASCENDING
# self.prevent_sort_update = True
# if not key:
# raise ValueError("Invalid sort key for col %s" % col)
# self.sort_view(key, ascending)
# self.prevent_sort_update = False
# self.emit("sorting-changed", key, ascending)
def get_row_by_id(self, game_id, filtered=False):
if filtered:
store = self.modelsort
else:
store = self.store
for model_row in store:
for model_row in self.store:
if model_row[COL_ID] == int(game_id):
return model_row
@ -286,15 +266,9 @@ class GameStore(GObject.Object):
if row:
self.store.remove(row.iter)
def update_game_by_id(self, game_id):
pga_game = pga.get_game_by_field(game_id, "id")
if pga_game:
return self.update(pga_game)
return self.remove_game(game_id)
def update(self, pga_game):
"""Update game informations."""
game = PgaGame(pga_game)
game = GameItem(pga_game)
if self.search_mode:
row = self.get_row_by_slug(game.slug)
else:
@ -303,12 +277,12 @@ class GameStore(GObject.Object):
raise ValueError("No existing row for game %s" % game.slug)
row[COL_ID] = game.id
row[COL_SLUG] = game.slug
row[COL_NAME] = game.name
row[COL_NAME] = gtk_safe(game.name)
row[COL_ICON] = game.get_pixbuf(self.icon_type)
row[COL_YEAR] = game.year
row[COL_RUNNER] = game.runner
row[COL_RUNNER_HUMAN_NAME] = game.runner_text
row[COL_PLATFORM] = game.platform
row[COL_RUNNER_HUMAN_NAME] = gtk_safe(game.runner_text)
row[COL_PLATFORM] = gtk_safe(game.platform)
row[COL_LASTPLAYED] = game.lastplayed
row[COL_LASTPLAYED_TEXT] = game.lastplayed_text
row[COL_INSTALLED] = game.installed
@ -331,7 +305,7 @@ class GameStore(GObject.Object):
if self.search_mode:
GLib.idle_add(self.update_icon, game_slug)
return
for pga_game in pga.get_games_by_slug(game_slug):
for pga_game in get_games_by_slug(game_slug):
logger.debug("Updating %s", pga_game["id"])
GLib.idle_add(self.update, pga_game)
@ -388,16 +362,9 @@ class GameStore(GObject.Object):
if media_type == "icon":
update_desktop_icons()
def add_games_by_ids(self, game_ids):
self.add_games(pga.get_games_by_ids(game_ids))
def add_game_by_id(self, game_id):
"""Add a game into the store."""
return self.add_games_by_ids([game_id])
def add_game(self, pga_game):
"""Add a PGA game to the store"""
game = PgaGame(pga_game)
game = GameItem(pga_game)
self.games.append(pga_game)
self.store.append(
(
@ -408,7 +375,7 @@ class GameStore(GObject.Object):
game.year,
game.runner,
game.runner_text,
game.platform,
gtk_safe(game.platform),
game.lastplayed,
game.lastplayed_text,
game.installed,
@ -421,12 +388,6 @@ class GameStore(GObject.Object):
if not self.has_icon(game.slug):
self.refresh_icon(game.slug)
def add_or_update(self, game_id):
try:
self.update_game_by_id(game_id)
except ValueError:
self.add_game_by_id(game_id)
def set_icon_type(self, icon_type):
"""Change the icon type"""
if icon_type == self.icon_type:

View file

@ -2,7 +2,8 @@
from gi.repository import Gtk
class LogTextView(Gtk.TextView): # pylint: disable=no-member
class LogTextView(Gtk.TextView):
# pylint: disable=no-member
def __init__(self, buffer=None, autoscroll=True):
super().__init__()

View file

@ -185,6 +185,7 @@ class ServiceSyncBox(Gtk.Box):
else:
added_message = _("No games were added. ")
# XXX This is the problematic part, don't worry about this code
if skipped_import:
skipped_message = gettext.ngettext(
"%s game is already in the library",
@ -195,7 +196,10 @@ class ServiceSyncBox(Gtk.Box):
send_notification(_("Games imported"), added_message + skipped_message)
for game_id in added_games:
window.game_store.add_or_update(game_id)
try:
window.update_game_by_id(game_id)
except ValueError:
window.game_store.add_games([])
def on_switch_changed(self, switch, _data):
write_setting("sync_at_startup", switch.get_active(), self.identifier)

View file

@ -3,7 +3,9 @@ from gettext import gettext as _
from gi.repository import GObject, Gtk, Pango
from lutris import pga, platforms, runners
from lutris import platforms, runners
from lutris.database import categories as categories_db
from lutris.database import games as games_db
from lutris.game import Game
from lutris.gui.config.runner import RunnerConfigDialog
from lutris.gui.dialogs.runner_install import RunnerInstallDialog
@ -18,8 +20,20 @@ GAMECOUNT = 4
class SidebarRow(Gtk.ListBoxRow):
"""A row in the sidebar containing possible action buttons"""
MARGIN = 9
SPACING = 6
def __init__(self, id_, type_, name, icon, application=None):
"""Initialize the row
Parameters:
id_: identifier of the row
type: type of row to display (still used?)
name (str): Text displayed on the row
icon (GtkImage): icon displayed next to the label
application (GtkApplication): reference to the running application
"""
super().__init__()
self.application = application
self.type = type_
@ -27,30 +41,30 @@ class SidebarRow(Gtk.ListBoxRow):
self.btn_box = None
self.runner = None
self.box = Gtk.Box(spacing=6, margin_start=9, margin_end=9)
# Construct the left column icon space.
if icon:
self.box.add(icon)
else:
# Place a spacer if there is no loaded icon.
icon = Gtk.Box(spacing=6, margin_start=9, margin_end=9)
self.box.add(icon)
label = Gtk.Label(
label=name,
halign=Gtk.Align.START,
hexpand=True,
margin_top=6,
margin_bottom=6,
ellipsize=Pango.EllipsizeMode.END,
)
self.box.add(label)
self.box = Gtk.Box(spacing=self.SPACING, margin_start=self.MARGIN, margin_end=self.MARGIN)
self.add(self.box)
if not icon:
icon = Gtk.Box(spacing=self.SPACING, margin_start=self.MARGIN, margin_end=self.MARGIN)
self.box.add(icon)
self.box.add(
Gtk.Label(
label=name,
halign=Gtk.Align.START,
hexpand=True,
margin_top=self.SPACING,
margin_bottom=self.SPACING,
ellipsize=Pango.EllipsizeMode.END,
)
)
class RunnerSidebarRow(SidebarRow):
def _create_button_box(self):
self.btn_box = Gtk.Box(spacing=3, no_show_all=True, valign=Gtk.Align.CENTER, homogeneous=True)
self.box.add(self.btn_box)
# Creation is delayed because only installed runners can be imported
# and all visible boxes should be installed.
@ -73,8 +87,6 @@ class SidebarRow(Gtk.ListBoxRow):
btn.connect("clicked", entry[2])
self.btn_box.add(btn)
self.box.add(self.btn_box)
def on_configure_runner(self, *_args):
self.application.show_window(RunnerConfigDialog, runner=self.runner)
@ -95,6 +107,7 @@ class SidebarRow(Gtk.ListBoxRow):
class SidebarHeader(Gtk.Box):
"""Header shown on top of each sidebar section"""
def __init__(self, name):
super().__init__(orientation=Gtk.Orientation.VERTICAL)
@ -120,7 +133,7 @@ class SidebarHeader(Gtk.Box):
self.show_all()
class SidebarListBox(Gtk.ListBox):
class LutrisSidebar(Gtk.ListBox):
__gtype_name__ = "LutrisSidebar"
def __init__(self, application):
@ -128,10 +141,10 @@ class SidebarListBox(Gtk.ListBox):
self.application = application
self.get_style_context().add_class("sidebar")
self.installed_runners = []
self.active_platforms = pga.get_used_platforms()
self.active_platforms = games_db.get_used_platforms()
self.runners = sorted(runners.__all__)
self.platforms = sorted(platforms.__all__)
self.categories = pga.get_categories()
self.categories = categories_db.get_categories()
GObject.add_emission_hook(RunnersDialog, "runner-installed", self.update)
GObject.add_emission_hook(RunnersDialog, "runner-removed", self.update)
@ -140,17 +153,32 @@ class SidebarListBox(Gtk.ListBox):
load_icon_theme()
icon = Gtk.Image.new_from_icon_name("favorite-symbolic", Gtk.IconSize.MENU)
self.add(SidebarRow("favorite", "category", _("Favorites"), icon))
self.add(
SidebarRow(
"running",
"dynamic_category",
_("Running"),
Gtk.Image.new_from_icon_name("media-playback-start-symbolic", Gtk.IconSize.MENU)
)
)
all_row = SidebarRow(None, "runner", _("All"), None)
self.add(
SidebarRow(
"favorite",
"category",
_("Favorites"),
Gtk.Image.new_from_icon_name("favorite-symbolic", Gtk.IconSize.MENU)
)
)
all_row = RunnerSidebarRow(None, "runner", _("All"), None)
self.add(all_row)
self.select_row(all_row)
for runner_name in self.runners:
icon_name = runner_name.lower().replace(" ", "") + "-symbolic"
icon = Gtk.Image.new_from_icon_name(icon_name, Gtk.IconSize.MENU)
runner = runners.import_runner(runner_name)()
self.add(SidebarRow(runner_name, "runner", runner.human_name, icon, application=self.application))
self.add(RunnerSidebarRow(runner_name, "runner", runner.human_name, icon, application=self.application))
self.add(SidebarRow(None, "platform", _("All"), None))
for platform in self.platforms:
@ -164,7 +192,7 @@ class SidebarListBox(Gtk.ListBox):
self.show_all()
def _filter_func(self, row):
if not row or not row.id or row.type == "category":
if not row or not row.id or row.type in ("category", "dynamic_category"):
return True
if row.type == "runner":
if row.id is None:
@ -184,6 +212,6 @@ class SidebarListBox(Gtk.ListBox):
def update(self, *_args):
self.installed_runners = [runner.name for runner in runners.get_installed()]
self.active_platforms = pga.get_used_platforms()
self.active_platforms = games_db.get_used_platforms()
self.invalidate_filter()
return True

View file

@ -1,12 +1,10 @@
"""AppIndicator based tray icon"""
from gettext import gettext as _
# Third Party Libraries
import gi
from gi.repository import Gtk
# Lutris Modules
from lutris import pga
from lutris.database.games import get_games
from lutris.game import Game
from lutris.gui.widgets.utils import get_pixbuf_for_game
@ -101,7 +99,7 @@ class LutrisStatusIcon:
@staticmethod
def add_games():
"""Adds installed games in order of last use"""
installed_games = pga.get_games(filter_installed=True)
installed_games = get_games(filter_installed=True)
installed_games.sort(
key=lambda game: max(game["lastplayed"] or 0, game["installed_at"] or 0),
reverse=True,

View file

@ -4,8 +4,9 @@ import os
import yaml
from lutris import pga, settings
from lutris import settings
from lutris.config import LutrisConfig, make_game_config_id
from lutris.database.games import add_or_update, get_game_by_field
from lutris.game import Game
from lutris.installer.errors import ScriptingError
from lutris.installer.installer_file import InstallerFile
@ -51,7 +52,7 @@ class LutrisInstaller: # pylint: disable=too-many-instance-attributes
"""Return the ID of the game in the local DB if one exists"""
# If the game is in the library and uninstalled, the first installation
# updates it
existing_game = pga.get_game_by_field(self.game_slug, "slug")
existing_game = get_game_by_field(self.game_slug, "slug")
if existing_game and not existing_game["installed"]:
return existing_game["id"]
@ -250,7 +251,7 @@ class LutrisInstaller: # pylint: disable=too-many-instance-attributes
if self.requires:
# Load the base game config
required_game = pga.get_game_by_field(self.requires, field="installer_slug")
required_game = get_game_by_field(self.requires, field="installer_slug")
base_config = LutrisConfig(
runner_slug=self.runner, game_config_id=required_game["configpath"]
)
@ -258,7 +259,7 @@ class LutrisInstaller: # pylint: disable=too-many-instance-attributes
else:
config = {"game": {}}
self.game_id = pga.add_or_update(
self.game_id = add_or_update(
name=self.game_name,
runner=self.runner,
slug=self.game_slug,

View file

@ -1,9 +1,8 @@
"""Manipulates installer files"""
# Standard Library
import os
from urllib.parse import urlparse
from lutris import cache, pga, settings
from lutris import cache, settings
from lutris.installer.errors import ScriptingError
from lutris.util import system
from lutris.util.log import logger
@ -137,12 +136,6 @@ class InstallerFile:
if not system.path_exists(self.cache_path):
os.makedirs(self.cache_path)
def pga_uri(self):
"""Return the URI of the file stored in the PGA
This isn't used yet, it looks in the PGA sources
"""
return pga.check_for_file(self.game_slug, self.id)
def check_hash(self):
"""Checks the checksum of `file` and compare it to `value`

View file

@ -4,8 +4,9 @@ import os
import yaml
from gi.repository import GLib, GObject
from lutris import pga, settings
from lutris import settings
from lutris.config import LutrisConfig
from lutris.database.games import get_game_by_field
from lutris.gui.dialogs import WineNotInstalledWarning
from lutris.gui.dialogs.download import simple_downloader
from lutris.installer.commands import CommandsMixin
@ -115,10 +116,10 @@ class ScriptInterpreter(GObject.Object, CommandsMixin):
@staticmethod
def _get_installed_dependency(dependency):
"""Return whether a dependency is installed"""
game = pga.get_game_by_field(dependency, field="installer_slug")
game = get_game_by_field(dependency, field="installer_slug")
if not game:
game = pga.get_game_by_field(dependency, "slug")
game = get_game_by_field(dependency, "slug")
if bool(game) and bool(game["directory"]):
return game

View file

@ -1,6 +1,6 @@
# Lutris Modules
from lutris.database.games import get_games
from lutris.game import Game
from lutris.pga import get_games
def migrate():

View file

@ -1,6 +1,6 @@
# Lutris Modules
from lutris.pga import PGA_DB, get_games
from lutris.util import sql
from lutris.database import sql
from lutris.database.games import PGA_DB, get_games
from lutris.util.log import logger

View file

@ -1,6 +1,5 @@
# Lutris Modules
from lutris.pga import PGA_DB
from lutris.util.sql import cursor_execute, db_cursor
from lutris.database.schema import PGA_DB
from lutris.database.sql import cursor_execute, db_cursor
SQL_STATEMENTS = [
"""

View file

@ -1,7 +1,6 @@
"""Migrate MESS games to MAME"""
# Lutris Modules
from lutris.database.games import get_games
from lutris.game import Game
from lutris.pga import get_games
def migrate():

View file

@ -1,585 +0,0 @@
"""Personnal Game Archive module. Handle local database of user's games."""
# Standard Library
import math
import os
import time
from itertools import chain
# Lutris Modules
from lutris import settings
from lutris.util import sql, system
from lutris.util.log import logger
from lutris.util.strings import slugify
PGA_DB = settings.PGA_DB
DATABASE = {
"games": [
{
"name": "id",
"type": "INTEGER",
"indexed": True
},
{
"name": "name",
"type": "TEXT"
},
{
"name": "slug",
"type": "TEXT"
},
{
"name": "installer_slug",
"type": "TEXT"
},
{
"name": "parent_slug",
"type": "TEXT"
},
{
"name": "platform",
"type": "TEXT"
},
{
"name": "runner",
"type": "TEXT"
},
{
"name": "executable",
"type": "TEXT"
},
{
"name": "directory",
"type": "TEXT"
},
{
"name": "updated",
"type": "DATETIME"
},
{
"name": "lastplayed",
"type": "INTEGER"
},
{
"name": "installed",
"type": "INTEGER"
},
{
"name": "installed_at",
"type": "INTEGER"
},
{
"name": "year",
"type": "INTEGER"
},
{
"name": "steamid",
"type": "INTEGER"
},
{
"name": "gogid",
"type": "INTEGER"
},
{
"name": "humblestoreid",
"type": "TEXT"
},
{
"name": "configpath",
"type": "TEXT"
},
{
"name": "has_custom_banner",
"type": "INTEGER"
},
{
"name": "has_custom_icon",
"type": "INTEGER"
},
{
"name": "playtime",
"type": "REAL"
},
],
"store_games": [
{
"name": "id",
"type": "INTEGER",
"indexed": True
},
{
"name": "store",
"type": "TEXT"
},
{
"name": "appid",
"type": "TEXT"
},
{
"name": "name",
"type": "TEXT"
},
{
"name": "slug",
"type": "TEXT"
},
{
"name": "logo",
"type": "TEXT"
},
{
"name": "url",
"type": "TEXT"
},
{
"name": "details",
"type": "TEXT"
},
{
"name": "lutris_slug",
"type": "TEXT"
},
],
"sources": [
{"name": "id", "type": "INTEGER", "indexed": True},
{"name": "uri", "type": "TEXT UNIQUE"},
],
"categories": [
{"name": "id", "type": "INTEGER", "indexed": True},
{"name": "name", "type": "TEXT", "unique": True},
],
"games_categories": [
{"name": "game_id", "type": "INTEGER", "indexed": False},
{"name": "category_id", "type": "INTEGER", "indexed": False},
]
}
def get_schema(tablename):
"""
Fields:
- position
- name
- type
- not null
- default
- indexed
"""
tables = []
query = "pragma table_info('%s')" % tablename
with sql.db_cursor(PGA_DB) as cursor:
for row in cursor.execute(query).fetchall():
field = {
"name": row[1],
"type": row[2],
"not_null": row[3],
"default": row[4],
"indexed": row[5],
}
tables.append(field)
return tables
def field_to_string(name="", type="", indexed=False, unique=False): # pylint: disable=redefined-builtin
"""Converts a python based table definition to it's SQL statement"""
field_query = "%s %s" % (name, type)
if indexed:
field_query += " PRIMARY KEY"
if unique:
field_query += " UNIQUE"
return field_query
def create_table(name, schema):
"""Creates a new table in the database"""
fields = ", ".join([field_to_string(**f) for f in schema])
query = "CREATE TABLE IF NOT EXISTS %s (%s)" % (name, fields)
logger.debug("[PGAQuery] %s", query)
with sql.db_cursor(PGA_DB) as cursor:
cursor.execute(query)
def migrate(table, schema):
"""Compare a database table with the reference model and make necessary changes
This is very basic and only the needed features have been implemented (adding columns)
Args:
table (str): Name of the table to migrate
schema (dict): Reference schema for the table
Returns:
list: The list of column names that have been added
"""
existing_schema = get_schema(table)
migrated_fields = []
if existing_schema:
columns = [col["name"] for col in existing_schema]
for field in schema:
if field["name"] not in columns:
logger.info("Migrating %s field %s", table, field["name"])
migrated_fields.append(field["name"])
sql.add_field(PGA_DB, table, field)
else:
create_table(table, schema)
return migrated_fields
def syncdb():
"""Update the database to the current version, making necessary changes
for backwards compatibility."""
for table in DATABASE:
migrate(table, DATABASE[table])
def get_games(
name_filter=None,
filter_installed=False,
filter_runner=None,
select=None,
show_installed_first=False,
):
"""Get the list of every game in database."""
query = "select * from games"
params = []
filters = []
if select:
query = "select ? from games"
params.append(select)
if name_filter:
params.append(name_filter)
filters.append("name LIKE ?")
if filter_installed:
filters.append("installed = 1")
if filter_runner:
params.append(filter_runner)
filters.append("runner = ?")
if filters:
query += " WHERE " + " AND ".join(filters)
if show_installed_first:
query += " ORDER BY installed DESC, slug"
else:
query += " ORDER BY slug"
return sql.db_query(PGA_DB, query, tuple(params))
def get_game_ids():
"""Return a list of ids of games in the database."""
games = get_games()
return [game["id"] for game in games]
def get_games_where(**conditions):
"""
Query games table based on conditions
Args:
conditions (dict): named arguments with each field matches its desired value.
Special values for field names can be used:
<field>__isnull will return rows where `field` is NULL if the value is True
<field>__not will invert the condition using `!=` instead of `=`
<field>__in will match rows for every value of `value`, which should be an iterable
Returns:
list: Rows matching the query
"""
query = "select * from games"
condition_fields = []
condition_values = []
for field, value in conditions.items():
field, *extra_conditions = field.split("__")
if extra_conditions:
extra_condition = extra_conditions[0]
if extra_condition == "isnull":
condition_fields.append("{} is {} null".format(field, "" if value else "not"))
if extra_condition == "not":
condition_fields.append("{} != ?".format(field))
condition_values.append(value)
if extra_condition == "in":
if not hasattr(value, "__iter__"):
raise ValueError("Value should be an iterable (%s given)" % value)
if len(value) > 999:
raise ValueError("SQLite limnited to a maximum of 999 parameters.")
if value:
condition_fields.append("{} in ({})".format(field, ", ".join("?" * len(value)) or ""))
condition_values = list(chain(condition_values, value))
else:
condition_fields.append("{} = ?".format(field))
condition_values.append(value)
condition = " AND ".join(condition_fields)
if condition:
query = " WHERE ".join((query, condition))
else:
# Inspect and document why we should return
# an empty list when no condition is present.
return []
return sql.db_query(PGA_DB, query, tuple(condition_values))
def get_games_by_ids(game_ids):
# sqlite limits the number of query parameters to 999, to
# bypass that limitation, divide the query in chunks
size = 999
return list(
chain.from_iterable(
[
get_games_where(id__in=list(game_ids)[page * size:page * size + size])
for page in range(math.ceil(len(game_ids) / size))
]
)
)
def get_game_by_field(value, field="slug"):
"""Query a game based on a database field"""
if field not in ("slug", "installer_slug", "id", "configpath", "steamid"):
raise ValueError("Can't query by field '%s'" % field)
game_result = sql.db_select(PGA_DB, "games", condition=(field, value))
if game_result:
return game_result[0]
return {}
def get_games_by_runner(runner):
"""Return all games using a specific runner"""
return sql.db_select(PGA_DB, "games", condition=("runner", runner))
def get_games_by_slug(slug):
"""Return all games using a specific slug"""
return sql.db_select(PGA_DB, "games", condition=("slug", slug))
def add_game(name, **game_data):
"""Add a game to the PGA database."""
game_data["name"] = name
game_data["installed_at"] = int(time.time())
if "slug" not in game_data:
game_data["slug"] = slugify(name)
return sql.db_insert(PGA_DB, "games", game_data)
def add_games_bulk(games):
"""
Add a list of games to the PGA database.
The dicts must have an identical set of keys.
Args:
games (list): list of games in dict format
Returns:
list: List of inserted game ids
"""
return [sql.db_insert(PGA_DB, "games", game) for game in games]
def add_or_update(**params):
"""Add a game to the PGA or update an existing one
If an 'id' is provided in the parameters then it
will try to match it, otherwise it will try matching
by slug, creating one when possible.
"""
game_id = get_matching_game(params)
if game_id:
params["id"] = game_id
sql.db_update(PGA_DB, "games", params, ("id", game_id))
return game_id
return add_game(**params)
def get_matching_game(params):
"""Tries to match given parameters with an existing game"""
# Always match by ID if provided
if params.get("id"):
game = get_game_by_field(params["id"], "id")
if game:
return game["id"]
logger.warning("Game ID %s provided but couldn't be matched", params["id"])
slug = params.get("slug") or slugify(params.get("name"))
if not slug:
raise ValueError("Can't add or update without an identifier")
for game in get_games_by_slug(slug):
if game["installed"]:
if game["configpath"] == params.get("configpath"):
return game["id"]
else:
if (game["runner"] == params.get("runner") or not all([params.get("runner"), game["runner"]])):
return game["id"]
return None
def delete_game(game_id):
"""Delete a game from the PGA."""
sql.db_delete(PGA_DB, "games", "id", game_id)
def set_uninstalled(game_id):
sql.db_update(PGA_DB, "games", {"installed": 0, "runner": ""}, ("id", game_id))
def add_source(uri):
sql.db_insert(PGA_DB, "sources", {"uri": uri})
def delete_source(uri):
sql.db_delete(PGA_DB, "sources", "uri", uri)
def read_sources():
with sql.db_cursor(PGA_DB) as cursor:
rows = cursor.execute("select uri from sources")
results = rows.fetchall()
return [row[0] for row in results]
def write_sources(sources):
db_sources = read_sources()
for uri in db_sources:
if uri not in sources:
sql.db_delete(PGA_DB, "sources", "uri", uri)
for uri in sources:
if uri not in db_sources:
sql.db_insert(PGA_DB, "sources", {"uri": uri})
def check_for_file(game, file_id):
for source in read_sources():
if source.startswith("file://"):
source = source[7:]
else:
protocol = source[:7]
logger.warning("PGA source protocol %s not implemented", protocol)
continue
if not system.path_exists(source):
logger.info("PGA source %s unavailable", source)
continue
game_dir = os.path.join(source, game)
if not system.path_exists(game_dir):
continue
for game_file in os.listdir(game_dir):
game_base, _ext = os.path.splitext(game_file)
if game_base == file_id:
return os.path.join(game_dir, game_file)
return False
def get_used_runners():
"""Return a list of the runners in use by installed games."""
with sql.db_cursor(PGA_DB) as cursor:
query = "select distinct runner from games where runner is not null order by runner"
rows = cursor.execute(query)
results = rows.fetchall()
return [result[0] for result in results if result[0]]
def get_used_runners_game_count():
"""Return a dictionary listing for each runner in use, how many games are using it."""
with sql.db_cursor(PGA_DB) as cursor:
query = "select runner, count(*) from games where runner is not null group by runner order by runner"
rows = cursor.execute(query)
results = rows.fetchall()
return {result[0]: result[1] for result in results if result[0]}
def get_used_platforms():
"""Return a list of platforms currently in use"""
with sql.db_cursor(PGA_DB) as cursor:
query = (
"select distinct platform from games "
"where platform is not null and platform is not '' order by platform"
)
rows = cursor.execute(query)
results = rows.fetchall()
return [result[0] for result in results if result[0]]
def get_used_platforms_game_count():
"""Return a dictionary listing for each platform in use, how many games are using it."""
with sql.db_cursor(PGA_DB) as cursor:
# The extra check for 'installed is 1' is needed because
# the platform lists don't show uninstalled games, but the platform of a game
# is remembered even after the game is uninstalled.
query = (
"select platform, count(*) from games "
"where platform is not null and platform is not '' and installed is 1 "
"group by platform "
"order by platform"
)
rows = cursor.execute(query)
results = rows.fetchall()
return {result[0]: result[1] for result in results if result[0]}
def get_hidden_ids():
"""Return a list of game IDs to be excluded from the library view"""
# Load the ignore string and filter out empty strings to prevent issues
ignores_raw = settings.read_setting("library_ignores", section="lutris", default="").split(",")
ignores = [ignore for ignore in ignores_raw if not ignore == ""]
# Turn the strings into integers
return [int(game_id) for game_id in ignores]
def set_hidden_ids(games):
"""Writes a list of game IDs that are to be hidden into the config file"""
ignores_str = [str(game_id) for game_id in games]
settings.write_setting("library_ignores", ','.join(ignores_str), section="lutris")
def get_categories():
"""Get the list of every category in database."""
return sql.db_select(PGA_DB, "categories",)
def get_category(name):
"""Return a category by name"""
categories = sql.db_select(PGA_DB, "categories", condition=("name", name))
if categories:
return categories[0]
def get_games_in_category(category_name):
"""Get the ids of games in database."""
query = (
"select game_id from games_categories "
"JOIN categories ON categories.id = games_categories.category_id "
"WHERE categories.name=?"
)
return [
game["game_id"]
for game in sql.db_query(PGA_DB, query, (category_name, ))
]
def get_categories_in_game(game_id):
"""Get the categories of a game in database."""
query = (
"select categories.name from categories "
"JOIN games_categories ON categories.id = games_categories.category_id "
"JOIN games ON games.id = games_categories.game_id "
"WHERE games.id=?"
)
return [
category["name"]
for category in sql.db_query(PGA_DB, query, (game_id,))
]
def add_category(category_name):
"""Add a category to the database"""
return sql.db_insert(PGA_DB, "categories", {"name": category_name})
def add_game_to_category(game_id, category_id):
"""Add a category to a game"""
return sql.db_insert(PGA_DB, "games_categories", {"game_id": game_id, "category_id": category_id})
def remove_category_from_game(game_id, category_id):
"""Remove a category from a game"""
query = "DELETE FROM games_categories WHERE category_id=? AND game_id=?"
with sql.db_cursor(PGA_DB) as cursor:
sql.cursor_execute(cursor, query, (category_id, game_id))

View file

@ -31,7 +31,7 @@ if os.path.exists(runner_path):
info_path = get_default_config_path("info")
if not os.path.exists(info_path):
req = requests.get("http://buildbot.libretro.com/assets/frontend/info.zip", allow_redirects=True)
if req.status_code == requests.codes.ok:
if req.status_code == requests.codes.ok: # pylint: disable=no-member
open(get_default_config_path('info.zip'), 'wb').write(req.content)
with ZipFile(get_default_config_path('info.zip'), 'r') as info_zip:
info_zip.extractall(info_path)
@ -197,7 +197,7 @@ class libretro(Runner):
except (ValueError, TypeError):
firmware_count = 0
system_path = self.get_system_directory(retro_config)
notes = retro_config["notes"] or ""
notes = str(retro_config["notes"] or "")
checksums = {}
if notes.startswith("Suggested md5sums:"):
parts = notes.split("|")

View file

@ -1,5 +1,4 @@
"""Runner for the PICO-8 fantasy console"""
# Standard Library
import json
import math
import os
@ -7,10 +6,10 @@ import shutil
from gettext import gettext as _
from time import sleep
# Lutris Modules
from lutris import pga, settings
from lutris import settings
from lutris.database.games import get_game_by_field
from lutris.runners.runner import Runner
from lutris.util import datapath, downloader, system
from lutris.util import downloader, system
from lutris.util.log import logger
from lutris.util.strings import split_arguments
@ -233,7 +232,7 @@ class pico8(Runner):
launch_info = {}
launch_info["env"] = self.get_env(os_env=False)
game_data = pga.get_game_by_field(self.config.game_config_id, "configpath")
game_data = get_game_by_field(self.config.game_config_id, "configpath")
command = self.launch_args
@ -249,10 +248,10 @@ class pico8(Runner):
command.append("--name")
command.append(game_data.get("name") + " - PICO-8")
icon = datapath.get_icon_path(game_data.get("slug"))
if icon:
command.append("--icon")
command.append(icon)
# icon = datapath.get_icon_path(game_data.get("slug"))
# if icon:
# command.append("--icon")
# command.append(icon)
webargs = {
"cartridge": self.cart_path,

View file

@ -1,15 +1,13 @@
"""Base module for runners"""
# Standard Library
import os
from gettext import gettext as _
# Third Party Libraries
from gi.repository import Gtk
# Lutris Modules
from lutris import pga, runtime, settings
from lutris import runtime, settings
from lutris.command import MonitoredCommand
from lutris.config import LutrisConfig
from lutris.database.games import get_game_by_field
from lutris.exceptions import UnavailableLibraries
from lutris.gui import dialogs
from lutris.runners import RunnerInstallationError
@ -41,7 +39,7 @@ class Runner: # pylint: disable=too-many-public-methods
"""Initialize runner."""
self.config = config
if config:
self.game_data = pga.get_game_by_field(self.config.game_config_id, "configpath")
self.game_data = get_game_by_field(self.config.game_config_id, "configpath")
else:
self.game_data = {}
@ -300,7 +298,8 @@ class Runner: # pylint: disable=too-many-public-methods
from lutris.gui.dialogs import ErrorDialog
try:
if hasattr(self, "get_version"):
self.install(downloader=simple_downloader, version=self.get_version(use_default=False))
version = self.get_version(use_default=False) # pylint: disable=no-member
self.install(downloader=simple_downloader, version=version)
else:
self.install(downloader=simple_downloader)
except RunnerInstallationError as ex:

View file

@ -1,12 +1,11 @@
"""Run web based games"""
# Standard Library
import os
import string
from gettext import gettext as _
from urllib.parse import urlparse
# Lutris Modules
from lutris import pga, settings
from lutris import settings
from lutris.database.games import get_game_by_field
from lutris.runners.runner import Runner
from lutris.util import datapath, resources, system
from lutris.util.strings import split_arguments
@ -205,7 +204,7 @@ class web(Runner):
}
url = "file://" + url
game_data = pga.get_game_by_field(self.config.game_config_id, "configpath")
game_data = get_game_by_field(self.config.game_config_id, "configpath")
# keep the old behavior from browser runner, but with support for extra arguments!
if self.runner_config.get("external_browser"):

View file

@ -1,13 +1,12 @@
"""Module for handling the GOG service"""
# Standard Library
import json
import os
import time
from gettext import gettext as _
from urllib.parse import parse_qsl, urlencode, urlparse
# Lutris Modules
from lutris import api, pga, settings
from lutris import api, settings
from lutris.database.games import add_or_update, get_game_by_field
from lutris.gui.dialogs import WebConnectDialog
from lutris.services import AuthenticationError, UnavailableGame
from lutris.services.base import OnlineService
@ -335,7 +334,7 @@ class GOGSyncer:
lutris_games = api.get_api_games(gog_ids, query_type="gogid")
added_games = []
for game in lutris_games:
lutris_data = pga.get_game_by_field(game["slug"], field="slug") or {}
lutris_data = get_game_by_field(game["slug"], field="slug") or {}
game_data = {
"name": game["name"],
"slug": game["slug"],
@ -345,7 +344,7 @@ class GOGSyncer:
"updated": game["updated"],
"gogid": game.get("gogid"), # GOG IDs will be added at a later stage in the API
}
added_games.append(pga.add_or_update(**game_data))
added_games.append(add_or_update(**game_data))
if not full:
return added_games, games
return added_games, []

View file

@ -1,12 +1,11 @@
"""Manage Humble Bundle libraries"""
# Standard Library
import json
import os
from gettext import gettext as _
from urllib.parse import urlparse
# Lutris Modules
from lutris import api, pga, settings
from lutris import api, settings
from lutris.database.games import add_or_update
from lutris.gui.dialogs import WebConnectDialog
from lutris.services.base import OnlineService
from lutris.services.service_game import ServiceGame
@ -228,7 +227,7 @@ class HumbleBundleSyncer:
"updated": game["updated"],
"humblestoreid": game["humblestoreid"],
}
added_games.append(pga.add_or_update(**game_data))
added_games.append(add_or_update(**game_data))
if not full:
return added_games, games
return added_games, []

View file

@ -5,9 +5,9 @@ import re
from configparser import ConfigParser
from gettext import gettext as _
# Lutris Modules
from lutris import pga
from lutris.config import LutrisConfig, make_game_config_id
# Lutris Modules
from lutris.database.games import add_or_update, get_games_where
from lutris.util import system
from lutris.util.log import logger
from lutris.util.strings import slugify
@ -24,7 +24,7 @@ def mark_as_installed(scummvm_id, name, path):
logger.info("Setting %s as installed", name)
slug = slugify(name)
config_id = make_game_config_id(slug)
game_id = pga.add_or_update(
game_id = add_or_update(
name=name,
runner="scummvm",
installer_slug=INSTALLER_SLUG,
@ -60,7 +60,7 @@ def sync_with_lutris():
"""Sync the ScummVM games to Lutris"""
scummvm_games = {
game["slug"]: game
for game in pga.get_games_where(runner="scummvm", installer_slug=INSTALLER_SLUG, installed=1)
for game in get_games_where(runner="scummvm", installer_slug=INSTALLER_SLUG, installed=1)
}
seen = set()
@ -70,4 +70,4 @@ def sync_with_lutris():
if slug not in scummvm_games.keys():
mark_as_installed(scummvm_id, name, path)
for slug in set(scummvm_games.keys()).difference(seen):
return pga.add_or_update(id=scummvm_games[slug]["id"], installed=0)
return add_or_update(id=scummvm_games[slug]["id"], installed=0)

View file

@ -1,6 +1,5 @@
"""Communicates between third party services games and Lutris games"""
# Lutris Modules
from lutris import pga
from lutris.database.games import add_or_update
class ServiceGame:
@ -52,7 +51,7 @@ class ServiceGame:
else:
name = self.name
slug = self.slug
self.game_id = pga.add_or_update(
self.game_id = add_or_update(
id=self.game_id,
name=name,
runner=self.runner,
@ -67,7 +66,7 @@ class ServiceGame:
def uninstall(self):
"""Uninstall a game from Lutris"""
return pga.add_or_update(id=self.game_id, installed=0)
return add_or_update(id=self.game_id, installed=0)
def create_config(self):
"""Implement this in subclasses to properly create the game config"""

View file

@ -1,12 +1,10 @@
"""Steam service"""
# Standard Library
import os
import re
from gettext import gettext as _
# Lutris Modules
from lutris import pga
from lutris.config import LutrisConfig, make_game_config_id
from lutris.database.games import get_games_where
from lutris.services.service_game import ServiceGame
from lutris.util.steam.appmanifest import AppManifest, get_appmanifests
from lutris.util.steam.config import get_steamapps_paths
@ -84,7 +82,7 @@ class SteamSyncer:
def lutris_games(self):
"""Return all Steam games present in the Lutris library"""
if not self._lutris_games:
self._lutris_games = pga.get_games_where(steamid__isnull=False, steamid__not="")
self._lutris_games = get_games_where(steamid__isnull=False, steamid__not="")
return self._lutris_games
@property

View file

@ -1,18 +1,14 @@
"""XDG applications service"""
# Standard Library
import os
import re
import shlex
import subprocess
from gettext import gettext as _
# Third Party Libraries
from gi.repository import Gio
# Lutris Modules
from lutris import pga
from lutris.config import LutrisConfig
from lutris.database.games import get_games_where
from lutris.services.service_game import ServiceGame
from lutris.util import system
from lutris.util.log import logger
@ -137,7 +133,7 @@ class XDGSyncer:
@property
def lutris_games(self):
"""Iterates through Lutris games imported from XDG"""
for game in pga.get_games_where(runner=XDGGame.runner, installer_slug=XDGGame.installer_slug, installed=1):
for game in get_games_where(runner=XDGGame.runner, installer_slug=XDGGame.installer_slug, installed=1):
yield game
@classmethod

View file

@ -1,9 +1,10 @@
"""Check to run at program start"""
# pylint: disable=no-member
import os
from gettext import gettext as _
from lutris import pga, runners, settings
from lutris import runners, settings
from lutris.database.games import get_games
from lutris.database.schema import syncdb
from lutris.game import Game
from lutris.gui.dialogs import DontShowAgainDialog
from lutris.runners.json import load_json_runners
@ -37,7 +38,7 @@ def init_dirs():
def init_db():
"""Initialize the SQLite DB"""
pga.syncdb()
syncdb()
def init_lutris():
@ -61,6 +62,7 @@ def check_driver():
gpu_info = drivers.get_nvidia_gpu_info(gpu_id)
logger.info("GPU: %s", gpu_info.get("Model"))
elif LINUX_SYSTEM.glxinfo:
# pylint: disable=no-member
logger.info("Using %s", LINUX_SYSTEM.glxinfo.opengl_vendor)
if hasattr(LINUX_SYSTEM.glxinfo, "GLX_MESA_query_renderer"):
logger.info(
@ -134,7 +136,7 @@ def fill_missing_platforms():
"""Sets the platform on games where it's missing.
This should never happen.
"""
pga_games = pga.get_games(filter_installed=True)
pga_games = get_games(filter_installed=True)
for pga_game in pga_games:
if pga_game.get("platform") or not pga_game["runner"]:
continue

View file

@ -1,6 +1,6 @@
"""Synchronization of the game library with server and local data."""
# Lutris Modules
from lutris import api, pga
from lutris import api
from lutris.database.games import add_games_bulk, add_or_update, get_game_by_field, get_games
from lutris.util import resources
from lutris.util.log import logger
@ -27,7 +27,7 @@ def sync_missing_games(not_in_local, remote_library):
"steamid": remote_game["steamid"],
}
)
missing_ids = pga.add_games_bulk(missing)
missing_ids = add_games_bulk(missing)
logger.debug("%d games added", len(missing))
return set(missing_ids)
@ -44,7 +44,7 @@ def sync_game_details(remote_library):
for remote_game in remote_library:
slug = remote_game["slug"]
sync_required = False
local_game = pga.get_game_by_field(slug, "slug")
local_game = get_game_by_field(slug, "slug")
if not local_game:
continue
if local_game["updated"] and remote_game["updated"] > local_game["updated"]:
@ -55,7 +55,7 @@ def sync_game_details(remote_library):
continue
logger.debug("Syncing details for %s", slug)
game_id = pga.add_or_update(
game_id = add_or_update(
id=local_game["id"],
name=local_game["name"],
runner=local_game["runner"],
@ -88,7 +88,7 @@ def sync_from_remote():
remote_library = api.get_library()
remote_slugs = {game["slug"] for game in remote_library}
local_slugs = {game["slug"] for game in pga.get_games()}
local_slugs = {game["slug"] for game in get_games()}
missing_slugs = remote_slugs.difference(local_slugs)
added = sync_missing_games(missing_slugs, remote_library)

View file

@ -28,6 +28,7 @@ def read_button(device):
"""Reference function for reading controller buttons and axis values.
Not to be used as is.
"""
# pylint: disable=no-member
for event in device.read_loop():
if event.type == evdev.ecodes.EV_KEY and event.value == 0:
print("button %s (%s): %s" % (event.code, hex(event.code), event.value))

View file

@ -720,6 +720,15 @@
<property name="visible">True</property>
<property name="can_focus">True</property>
<property name="vexpand">True</property>
<child>
<object class="GtkViewport" id="games_viewport">
<property name="visible">True</property>
<property name="can_focus">False</property>
<child>
<placeholder/>
</child>
</object>
</child>
</object>
<packing>
<property name="index">-1</property>

View file

@ -5,10 +5,10 @@ import gi
gi.require_version('Gtk', '3.0')
from gi.repository import Gtk
from lutris.database import games as games_db
from lutris.game import Game
from lutris.startup import init_lutris
# from lutris import settings
from lutris import pga
from lutris.gui.config.common import GameDialogCommon
from lutris.gui.config.add_game import AddGameDialog
from lutris.gui.application import Application
@ -98,7 +98,7 @@ class TestGameDialog(TestCase):
add_button = self.get_buttons().get_children()[1]
add_button.clicked()
pga_game = pga.get_game_by_field('test-game', 'slug')
pga_game = games_db.get_game_by_field('test-game', 'slug')
self.assertTrue(pga_game)
game = Game(pga_game['id'])
self.assertEqual(game.name, 'Test game')

View file

@ -1,18 +1,20 @@
import unittest
import os
from sqlite3 import OperationalError
from lutris import pga
from lutris.util import sql
from lutris.database import schema
from lutris.database import games as games_db
from lutris.database import sql
TEST_PGA_PATH = os.path.join(os.path.dirname(__file__), 'pga.db')
class DatabaseTester(unittest.TestCase):
def setUp(self):
pga.PGA_DB = TEST_PGA_PATH
schema.PGA_DB = TEST_PGA_PATH
games_db.PGA_DB = TEST_PGA_PATH
if os.path.exists(TEST_PGA_PATH):
os.remove(TEST_PGA_PATH)
pga.syncdb()
schema.syncdb()
def tearDown(self):
if os.path.exists(TEST_PGA_PATH):
@ -22,55 +24,55 @@ class DatabaseTester(unittest.TestCase):
class TestPersonnalGameArchive(DatabaseTester):
def setUp(self):
super(TestPersonnalGameArchive, self).setUp()
self.game_id = pga.add_game(name="LutrisTest", runner="Linux")
self.game_id = games_db.add_game(name="LutrisTest", runner="Linux")
def test_add_game(self):
game_list = pga.get_games()
game_list = games_db.get_games()
game_names = [item['name'] for item in game_list]
self.assertTrue("LutrisTest" in game_names)
def test_delete_game(self):
pga.delete_game(self.game_id)
game_list = pga.get_games()
games_db.delete_game(self.game_id)
game_list = games_db.get_games()
self.assertEqual(len(game_list), 0)
self.game_id = pga.add_game(name="LutrisTest", runner="Linux")
self.game_id = games_db.add_game(name="LutrisTest", runner="Linux")
def test_get_game_list(self):
game_list = pga.get_games()
game_list = games_db.get_games()
self.assertEqual(game_list[0]['id'], self.game_id)
self.assertEqual(game_list[0]['slug'], 'lutristest')
self.assertEqual(game_list[0]['name'], 'LutrisTest')
self.assertEqual(game_list[0]['runner'], 'Linux')
def test_filter(self):
pga.add_game(name="foobar", runner="Linux")
pga.add_game(name="bang", runner="Linux")
game_list = pga.get_games(name_filter='bang')
games_db.add_game(name="foobar", runner="Linux")
games_db.add_game(name="bang", runner="Linux")
game_list = games_db.get_games(name_filter='bang')
self.assertEqual(len(game_list), 1)
self.assertEqual(game_list[0]['name'], 'bang')
def test_can_filter_by_installed_games(self):
pga.add_game(name="installed_game", runner="Linux", installed=1)
pga.add_game(name="bang", runner="Linux", installed=0)
game_list = pga.get_games(filter_installed=True)
games_db.add_game(name="installed_game", runner="Linux", installed=1)
games_db.add_game(name="bang", runner="Linux", installed=0)
game_list = games_db.get_games(filter_installed=True)
self.assertEqual(len(game_list), 1)
self.assertEqual(game_list[0]['name'], 'installed_game')
def test_game_with_same_slug_is_updated(self):
pga.add_game(name="some game", runner="linux")
game = pga.get_game_by_field("some-game", "slug")
games_db.add_game(name="some game", runner="linux")
game = games_db.get_game_by_field("some-game", "slug")
self.assertFalse(game['directory'])
pga.add_or_update(name="some game", runner='linux', directory="/foo")
game = pga.get_game_by_field("some-game", "slug")
games_db.add_or_update(name="some game", runner='linux', directory="/foo")
game = games_db.get_game_by_field("some-game", "slug")
self.assertEqual(game['directory'], '/foo')
class TestDbCreator(DatabaseTester):
def test_can_generate_fields(self):
text_field = pga.field_to_string('name', 'TEXT')
text_field = schema.field_to_string('name', 'TEXT')
self.assertEqual(text_field, "name TEXT")
id_field = pga.field_to_string('id', 'INTEGER', indexed=True)
id_field = schema.field_to_string('id', 'INTEGER', indexed=True)
self.assertEqual(id_field, "id INTEGER PRIMARY KEY")
def test_can_create_table(self):
@ -78,7 +80,7 @@ class TestDbCreator(DatabaseTester):
{'name': 'id', 'type': 'INTEGER', 'indexed': True},
{'name': 'name', 'type': 'TEXT'}
]
pga.create_table('testing', fields)
schema.create_table('testing', fields)
sql.db_insert(TEST_PGA_PATH, 'testing', {'name': "testok"})
results = sql.db_select(TEST_PGA_PATH, 'testing',
fields=['id', 'name'])
@ -88,7 +90,7 @@ class TestDbCreator(DatabaseTester):
class TestMigration(DatabaseTester):
def setUp(self):
super(TestMigration, self).setUp()
pga.syncdb()
schema.syncdb()
self.tablename = "basetable"
self.schema = [
{
@ -103,15 +105,15 @@ class TestMigration(DatabaseTester):
]
def create_table(self):
pga.create_table(self.tablename, self.schema)
schema.create_table(self.tablename, self.schema)
def test_get_schema(self):
self.create_table()
schema = pga.get_schema(self.tablename)
self.assertEqual(schema[0]['name'], 'id')
self.assertEqual(schema[0]['type'], 'INTEGER')
self.assertEqual(schema[1]['name'], 'name')
self.assertEqual(schema[1]['type'], 'TEXT')
_schema = schema.get_schema(self.tablename)
self.assertEqual(_schema[0]['name'], 'id')
self.assertEqual(_schema[0]['type'], 'INTEGER')
self.assertEqual(_schema[1]['name'], 'name')
self.assertEqual(_schema[1]['type'], 'TEXT')
def test_add_field(self):
self.create_table()
@ -120,9 +122,9 @@ class TestMigration(DatabaseTester):
'type': 'INTEGER'
}
sql.add_field(TEST_PGA_PATH, self.tablename, field)
schema = pga.get_schema(self.tablename)
self.assertEqual(schema[2]['name'], 'counter')
self.assertEqual(schema[2]['type'], 'INTEGER')
_schema = schema.get_schema(self.tablename)
self.assertEqual(_schema[2]['name'], 'counter')
self.assertEqual(_schema[2]['type'], 'INTEGER')
def test_cant_add_existing_field(self):
self.create_table()
@ -135,17 +137,17 @@ class TestMigration(DatabaseTester):
def test_cant_create_empty_table(self):
with self.assertRaises(OperationalError):
pga.create_table('emptytable', [])
schema.create_table('emptytable', [])
def test_can_know_if_table_exists(self):
self.create_table()
self.assertTrue(pga.get_schema(self.tablename))
self.assertFalse(pga.get_schema('notatable'))
self.assertTrue(schema.get_schema(self.tablename))
self.assertFalse(schema.get_schema('notatable'))
def test_can_migrate(self):
self.create_table()
self.schema.append({'name': 'new_field', 'type': 'TEXT'})
migrated = pga.migrate(self.tablename, self.schema)
schema = pga.get_schema(self.tablename)
self.assertEqual(schema[2]['name'], 'new_field')
migrated = schema.migrate(self.tablename, self.schema)
_schema = schema.get_schema(self.tablename)
self.assertEqual(_schema[2]['name'], 'new_field')
self.assertEqual(migrated, ['new_field'])