Rename PGA_DB to DB_PATH

This commit is contained in:
Mathieu Comandon 2024-02-17 20:12:46 -08:00
parent c464ece487
commit 7c61eb4184
17 changed files with 54 additions and 63 deletions

View file

@ -20,12 +20,12 @@ def is_reserved_category(name):
def get_categories():
"""Get the list of every category in database."""
return sql.db_select(settings.PGA_DB, "categories", )
return sql.db_select(settings.DB_PATH, "categories", )
def get_category(name):
"""Return a category by name"""
categories = sql.db_select(settings.PGA_DB, "categories", condition=("name", name))
categories = sql.db_select(settings.DB_PATH, "categories", condition=("name", name))
if categories:
return categories[0]
@ -39,7 +39,7 @@ def get_game_ids_for_category(category_name):
)
return [
game["game_id"]
for game in sql.db_query(settings.PGA_DB, query, (category_name,))
for game in sql.db_query(settings.DB_PATH, query, (category_name,))
]
@ -53,24 +53,24 @@ def get_categories_in_game(game_id):
)
return [
category["name"]
for category in sql.db_query(settings.PGA_DB, query, (game_id,))
for category in sql.db_query(settings.DB_PATH, query, (game_id,))
]
def add_category(category_name):
"""Add a category to the database"""
return sql.db_insert(settings.PGA_DB, "categories", {"name": category_name})
return sql.db_insert(settings.DB_PATH, "categories", {"name": category_name})
def add_game_to_category(game_id, category_id):
"""Add a category to a game"""
return sql.db_insert(settings.PGA_DB, "games_categories", {"game_id": game_id, "category_id": category_id})
return sql.db_insert(settings.DB_PATH, "games_categories", {"game_id": game_id, "category_id": category_id})
def remove_category_from_game(game_id, category_id):
"""Remove a category from a game"""
query = "DELETE FROM games_categories WHERE category_id=? AND game_id=?"
with sql.db_cursor(settings.PGA_DB) as cursor:
with sql.db_cursor(settings.DB_PATH) as cursor:
sql.cursor_execute(cursor, query, (category_id, game_id))
@ -82,11 +82,11 @@ def remove_unused_categories():
"WHERE games_categories.category_id IS NULL"
)
empty_categories = sql.db_query(settings.PGA_DB, query)
empty_categories = sql.db_query(settings.DB_PATH, query)
for category in empty_categories:
if category['name'] == 'favorite':
continue
query = "DELETE FROM categories WHERE categories.id=?"
with sql.db_cursor(settings.PGA_DB) as cursor:
with sql.db_cursor(settings.DB_PATH) as cursor:
sql.cursor_execute(cursor, query, (category['id'],))

View file

@ -18,7 +18,7 @@ def get_games(
sorts=None
):
return sql.filtered_query(
settings.PGA_DB,
settings.DB_PATH,
"games",
searches=searches,
filters=filters,
@ -76,7 +76,7 @@ def get_games_where(**conditions):
# Inspect and document why we should return
# an empty list when no condition is present.
return []
return sql.db_query(settings.PGA_DB, query, tuple(condition_values))
return sql.db_query(settings.DB_PATH, query, tuple(condition_values))
def get_games_by_ids(game_ids):
@ -130,7 +130,7 @@ def get_game_by_field(value, field="slug"):
"""Query a game based on a database field"""
if field not in ("slug", "installer_slug", "id", "configpath", "name"):
raise ValueError("Can't query by field '%s'" % field)
game_result = sql.db_select(settings.PGA_DB, "games", condition=(field, value))
game_result = sql.db_select(settings.DB_PATH, "games", condition=(field, value))
if game_result:
return game_result[0]
return {}
@ -138,12 +138,12 @@ def get_game_by_field(value, field="slug"):
def get_games_by_runner(runner):
"""Return all games using a specific runner"""
return sql.db_select(settings.PGA_DB, "games", condition=("runner", runner))
return sql.db_select(settings.DB_PATH, "games", condition=("runner", runner))
def get_games_by_slug(slug):
"""Return all games using a specific slug"""
return sql.db_select(settings.PGA_DB, "games", condition=("slug", slug))
return sql.db_select(settings.DB_PATH, "games", condition=("slug", slug))
def add_game(**game_data):
@ -151,7 +151,7 @@ def add_game(**game_data):
game_data["installed_at"] = int(time.time())
if "slug" not in game_data:
game_data["slug"] = slugify(game_data["name"])
return sql.db_insert(settings.PGA_DB, "games", game_data)
return sql.db_insert(settings.DB_PATH, "games", game_data)
def add_games_bulk(games):
@ -164,7 +164,7 @@ def add_games_bulk(games):
Returns:
list: List of inserted game ids
"""
return [sql.db_insert(settings.PGA_DB, "games", game) for game in games]
return [sql.db_insert(settings.DB_PATH, "games", game) for game in games]
def add_or_update(**params):
@ -187,7 +187,7 @@ def update_existing(**params):
game_id = get_matching_game(params)
if game_id:
params["id"] = game_id
sql.db_update(settings.PGA_DB, "games", params, {"id": game_id})
sql.db_update(settings.DB_PATH, "games", params, {"id": game_id})
return game_id
return None
@ -215,12 +215,12 @@ def get_matching_game(params):
def delete_game(game_id):
"""Delete a game from the PGA."""
sql.db_delete(settings.PGA_DB, "games", "id", game_id)
sql.db_delete(settings.DB_PATH, "games", "id", game_id)
def get_used_runners():
"""Return a list of the runners in use by installed games."""
with sql.db_cursor(settings.PGA_DB) as cursor:
with sql.db_cursor(settings.DB_PATH) as cursor:
query = "select distinct runner from games where runner is not null order by runner"
rows = cursor.execute(query)
results = rows.fetchall()
@ -229,7 +229,7 @@ def get_used_runners():
def get_used_platforms():
"""Return a list of platforms currently in use"""
with sql.db_cursor(settings.PGA_DB) as cursor:
with sql.db_cursor(settings.DB_PATH) as cursor:
query = (
"select distinct platform from games "
"where platform is not null and platform is not '' order by platform"
@ -240,6 +240,6 @@ def get_used_platforms():
def get_game_count(param, value):
res = sql.db_select(settings.PGA_DB, "games", fields=("COUNT(id)",), condition=(param, value))
res = sql.db_select(settings.DB_PATH, "games", fields=("COUNT(id)",), condition=(param, value))
if res:
return res[0]["COUNT(id)"]

View file

@ -172,7 +172,7 @@ def get_schema(tablename):
"""
tables = []
query = "pragma table_info('%s')" % tablename
with sql.db_cursor(settings.PGA_DB) as cursor:
with sql.db_cursor(settings.DB_PATH) as cursor:
for row in cursor.execute(query).fetchall():
field = {
"name": row[1],
@ -200,7 +200,7 @@ def create_table(name, schema):
fields = ", ".join([field_to_string(**f) for f in schema])
query = "CREATE TABLE IF NOT EXISTS %s (%s)" % (name, fields)
logger.debug("[PGAQuery] %s", query)
with sql.db_cursor(settings.PGA_DB) as cursor:
with sql.db_cursor(settings.DB_PATH) as cursor:
cursor.execute(query)
@ -225,7 +225,7 @@ def migrate(table, schema):
if field["name"] not in columns:
logger.info("Migrating %s field %s", table, field["name"])
migrated_fields.append(field["name"])
sql.add_field(settings.PGA_DB, table, field)
sql.add_field(settings.DB_PATH, table, field)
else:
create_table(table, schema)
return migrated_fields

View file

@ -14,7 +14,7 @@ class ServiceGameCollection:
sorts=None
):
return sql.filtered_query(
settings.PGA_DB,
settings.DB_PATH,
"service_games",
searches=searches,
filters=filters,
@ -26,7 +26,7 @@ class ServiceGameCollection:
def get_for_service(cls, service):
if not service:
raise ValueError("No service provided")
return sql.filtered_query(settings.PGA_DB, "service_games", filters={"service": service})
return sql.filtered_query(settings.DB_PATH, "service_games", filters={"service": service})
@classmethod
def get_game(cls, service, appid):
@ -35,7 +35,7 @@ class ServiceGameCollection:
raise ValueError("No service provided")
if not appid:
raise ValueError("No appid provided")
results = sql.filtered_query(settings.PGA_DB, "service_games", filters={"service": service, "appid": appid})
results = sql.filtered_query(settings.DB_PATH, "service_games", filters={"service": service, "appid": appid})
if not results:
return
if len(results) > 1:

View file

@ -7,15 +7,15 @@ from lutris.util.log import logger
def add_source(uri):
sql.db_insert(settings.PGA_DB, "sources", {"uri": uri})
sql.db_insert(settings.DB_PATH, "sources", {"uri": uri})
def delete_source(uri):
sql.db_delete(settings.PGA_DB, "sources", "uri", uri)
sql.db_delete(settings.DB_PATH, "sources", "uri", uri)
def read_sources():
with sql.db_cursor(settings.PGA_DB) as cursor:
with sql.db_cursor(settings.DB_PATH) as cursor:
rows = cursor.execute("select uri from sources")
results = rows.fetchall()
return [row[0] for row in results]
@ -25,10 +25,10 @@ def write_sources(sources):
db_sources = read_sources()
for uri in db_sources:
if uri not in sources:
sql.db_delete(settings.PGA_DB, "sources", "uri", uri)
sql.db_delete(settings.DB_PATH, "sources", "uri", uri)
for uri in sources:
if uri not in db_sources:
sql.db_insert(settings.PGA_DB, "sources", {"uri": uri})
sql.db_insert(settings.DB_PATH, "sources", {"uri": uri})
def check_for_file(game, file_id):

View file

@ -356,7 +356,7 @@ class Game(GObject.Object):
delete_files (bool): Delete the game files
no_signal (bool): Don't emit game-removed signal
"""
sql.db_update(settings.PGA_DB, "games", {"installed": 0, "runner": ""}, {"id": self.id})
sql.db_update(settings.DB_PATH, "games", {"installed": 0, "runner": ""}, {"id": self.id})
if self.config:
self.config.remove()
xdgshortcuts.remove_launcher(self.slug, self.id, desktop=True, menu=True)

View file

@ -192,7 +192,7 @@ class GameStore(GObject.Object):
def on_game_updated(self, game):
if self.service:
db_games = sql.filtered_query(
settings.PGA_DB,
settings.DB_PATH,
"service_games",
filters=({
"service": self.service_media.service,
@ -201,7 +201,7 @@ class GameStore(GObject.Object):
)
else:
db_games = sql.filtered_query(
settings.PGA_DB,
settings.DB_PATH,
"games",
filters=({
"id": game.id

View file

@ -13,7 +13,7 @@ def migrate():
for game in games:
if 'sortname' not in game.keys() or game['sortname'] is None:
sql.db_update(
settings.PGA_DB,
settings.DB_PATH,
"games",
{"sortname": ""},
{"slug": game['slug']}

View file

@ -12,7 +12,7 @@ def migrate():
continue
print("Migrating Steam game %s" % game["name"])
sql.db_update(
settings.PGA_DB,
settings.DB_PATH,
"games",
{"service": "steam", "service_id": game["steamid"]},
{"id": game["id"]}

View file

@ -18,7 +18,7 @@ def migrate():
continue
sql.db_update(
settings.PGA_DB,
settings.DB_PATH,
"games",
{"discord_id": game['discord_id']},
{"slug": game['slug']}

View file

@ -24,8 +24,6 @@ from lutris.util.jobs import AsyncCall
from lutris.util.log import logger
from lutris.util.strings import slugify
PGA_DB = settings.PGA_DB
class AuthTokenExpiredError(Exception):
"""Exception raised when a token is no longer valid; the sidebar will
@ -176,7 +174,7 @@ class BaseService(GObject.Object):
def wipe_game_cache(self):
logger.debug("Deleting games from service-games for %s", self.id)
sql.db_delete(PGA_DB, "service_games", "service", self.id)
sql.db_delete(settings.DB_PATH, "service_games", "service", self.id)
def get_update_installers(self, db_game):
return []
@ -214,7 +212,7 @@ class BaseService(GObject.Object):
if not service_game:
return
sql.db_update(
PGA_DB,
settings.DB_PATH,
"service_games",
{"lutris_slug": lutris_game["slug"]},
conditions={"appid": service_game["appid"], "service": self.id}
@ -227,7 +225,7 @@ class BaseService(GObject.Object):
for game in unmatched_lutris_games:
logger.debug("Updating unmatched game %s", game)
sql.db_update(
PGA_DB,
settings.DB_PATH,
"games",
{"service": self.id, "service_id": service_game["appid"]},
conditions={"id": game["id"]}
@ -260,7 +258,7 @@ class BaseService(GObject.Object):
game.service = self.id
game.save(no_signal=no_signal)
service_game = ServiceGameCollection.get_game(self.id, appid)
sql.db_update(PGA_DB, "service_games", {"lutris_slug": game.slug}, {"id": service_game["id"]})
sql.db_update(settings.DB_PATH, "service_games", {"lutris_slug": game.slug}, {"id": service_game["id"]})
return game
def get_installers_from_api(self, appid):

View file

@ -4,8 +4,6 @@ from lutris.database import sql
from lutris.database.services import ServiceGameCollection
from lutris.services.service_media import ServiceMedia
PGA_DB = settings.PGA_DB
class ServiceGame:
"""Representation of a game from a 3rd party service"""
@ -39,6 +37,6 @@ class ServiceGame:
}
existing_game = ServiceGameCollection.get_game(self.service, self.appid)
if existing_game:
sql.db_update(PGA_DB, "service_games", game_data, {"id": existing_game["id"]})
sql.db_update(settings.DB_PATH, "service_games", game_data, {"id": existing_game["id"]})
else:
sql.db_insert(PGA_DB, "service_games", game_data)
sql.db_insert(settings.DB_PATH, "service_games", game_data)

View file

@ -3,14 +3,11 @@ import os
import random
import time
from lutris import settings
from lutris.database.services import ServiceGameCollection
from lutris.util import system
from lutris.util.http import HTTPError, download_file
from lutris.util.log import logger
PGA_DB = settings.PGA_DB
class ServiceMedia:
"""Information about the service's media format"""

View file

@ -20,8 +20,6 @@ from lutris.util.steam.appmanifest import AppManifest, get_appmanifests
from lutris.util.steam.config import get_active_steamid64, get_steam_library, get_steamapps_dirs
from lutris.util.strings import slugify
PGA_DB = settings.PGA_DB
class SteamBanner(ServiceMedia):
service = "steam"
@ -116,7 +114,7 @@ class SteamService(BaseService):
for game in get_games(filters={"service": self.id, "service_id": service_game["appid"]}):
steam_game_playtime = json.loads(service_game["details"]).get("playtime_forever")
playtime = steam_game_playtime / 60
sql.db_update(PGA_DB, "games", {"playtime": playtime}, conditions={"id": game["id"]})
sql.db_update(settings.DB_PATH, "games", {"playtime": playtime}, conditions={"id": game["id"]})
def get_installer_files(self, installer, _installer_file_id, _selected_extras):
steam_uri = "$STEAM:%s:."

View file

@ -44,9 +44,9 @@ RUNTIME_VERSIONS_PATH = os.path.join(CACHE_DIR, "versions.json")
ICON_PATH = os.path.join(GLib.get_user_data_dir(), "icons", "hicolor", "128x128", "apps")
if "nosetests" in sys.argv[0] or "nose2" in sys.argv[0] or "pytest" in sys.argv[0]:
PGA_DB = "/tmp/pga.db"
DB_PATH = "/tmp/pga.db"
else:
PGA_DB = sio.read_setting("pga_path") or os.path.join(DATA_DIR, "pga.db")
DB_PATH = sio.read_setting("pga_path") or os.path.join(DATA_DIR, "pga.db")
SITE_URL = sio.read_setting("website") or "https://lutris.net"

View file

@ -130,7 +130,7 @@ def init_lutris():
except sqlite3.DatabaseError as err:
raise RuntimeError(
_("Failed to open database file in %s. Try renaming this file and relaunch Lutris") %
settings.PGA_DB
settings.DB_PATH
) from err
for service in DEFAULT_SERVICES:
if not settings.read_setting(service, section="services"):

View file

@ -12,8 +12,8 @@ setup_test_environment()
class DatabaseTester(unittest.TestCase):
def setUp(self):
if os.path.exists(settings.PGA_DB):
os.remove(settings.PGA_DB)
if os.path.exists(settings.DB_PATH):
os.remove(settings.DB_PATH)
schema.syncdb()
@ -75,8 +75,8 @@ class TestDbCreator(DatabaseTester):
{'name': 'name', 'type': 'TEXT'}
]
schema.create_table('testing', fields)
sql.db_insert(settings.PGA_DB, 'testing', {'name': "testok"})
results = sql.db_select(settings.PGA_DB, 'testing',
sql.db_insert(settings.DB_PATH, 'testing', {'name': "testok"})
results = sql.db_select(settings.DB_PATH, 'testing',
fields=['id', 'name'])
self.assertEqual(results[0]['name'], "testok")
@ -114,7 +114,7 @@ class TestMigration(DatabaseTester):
'name': 'counter',
'type': 'INTEGER'
}
sql.add_field(settings.PGA_DB, self.tablename, field)
sql.add_field(settings.DB_PATH, self.tablename, field)
_schema = schema.get_schema(self.tablename)
self.assertEqual(_schema[2]['name'], 'counter')
self.assertEqual(_schema[2]['type'], 'INTEGER')
@ -126,7 +126,7 @@ class TestMigration(DatabaseTester):
'type': 'TEXT'
}
with self.assertRaises(OperationalError):
sql.add_field(settings.PGA_DB, self.tablename, field)
sql.add_field(settings.DB_PATH, self.tablename, field)
def test_cant_create_empty_table(self):
with self.assertRaises(OperationalError):