Merge pull request #4690 from GoGoOtaku/master

Add itch.io service
This commit is contained in:
Mathieu Comandon 2023-01-28 11:44:29 -08:00 committed by GitHub
commit d5eaf58214
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
10 changed files with 574 additions and 13 deletions

View file

@ -34,6 +34,7 @@ def get_games_where(**conditions):
Args:
conditions (dict): named arguments with each field matches its desired value.
Special values for field names can be used:
<field>__lessthan will return rows where `field` is less than the value
<field>__isnull will return rows where `field` is NULL if the value is True
<field>__not will invert the condition using `!=` instead of `=`
<field>__in will match rows for every value of `value`, which should be an iterable
@ -49,6 +50,9 @@ def get_games_where(**conditions):
field, *extra_conditions = field.split("__")
if extra_conditions:
extra_condition = extra_conditions[0]
if extra_condition == "lessthan":
condition_fields.append("{} < ?".format(field))
condition_values.append(value)
if extra_condition == "isnull":
condition_fields.append("{} is {} null".format(field, "" if value else "not"))
if extra_condition == "not":

View file

@ -176,7 +176,7 @@ class Game(GObject.Object):
@property
def is_updatable(self):
"""Return whether the game can be upgraded"""
return self.service == "gog"
return self.service in ["gog", "itchio"]
@property
def is_favorite(self):

View file

@ -53,7 +53,7 @@ class InstallerFileBox(Gtk.VBox):
"url": self.installer_file.url,
"dest": self.installer_file.dest_file,
"referer": self.installer_file.referer
})
}, downloader=self.installer_file.downloader)
download_progress.connect("complete", self.on_download_complete)
download_progress.connect("cancel", self.on_download_cancelled)
download_progress.show()

View file

@ -634,3 +634,23 @@ class CommandsMixin:
"executable": file_id,
"args": args
})
def install_or_extract(self, file_id):
"""Runs if file is executable or extracts if file is archive"""
file_path = self._get_file_path(file_id)
runner = self.installer.runner
if runner != "wine":
raise ScriptingError(_("install_or_extract only works with wine!"))
if file_path.endswith(".exe"):
params = {
"name": "wineexec",
"executable": file_id
}
return self.task(params)
slug = self.installer.game_slug
params = {
"file": file_id,
"dst": f"$GAMEDIR/drive_c/{slug}"
}
return self.extract(params)

View file

@ -57,6 +57,8 @@ class LutrisInstaller: # pylint: disable=too-many-instance-attributes
return SERVICES["humblebundle"]()
if "gog" in version and "gog" in SERVICES:
return SERVICES["gog"]()
if "itch.io" in version and "itchio" in SERVICES:
return SERVICES["itchio"]()
def get_appid(self, installer, initial=None):
if installer.get("is_dlc"):
@ -65,13 +67,19 @@ class LutrisInstaller: # pylint: disable=too-many-instance-attributes
return initial
if not self.service:
return
service_id = None
if self.service.id == "steam":
return installer.get("steamid") or installer.get("service_id")
service_id = installer.get("steamid") or installer.get("service_id")
game_config = self.script.get("game", {})
if self.service.id == "gog":
return game_config.get("gogid") or installer.get("gogid") or installer.get("service_id")
service_id = game_config.get("gogid") or installer.get("gogid") or installer.get("service_id")
if self.service.id == "humblebundle":
return game_config.get("humbleid") or installer.get("humblestoreid") or installer.get("service_id")
service_id = game_config.get("humbleid") or installer.get("humblestoreid") or installer.get("service_id")
if self.service.id == "itchio":
service_id = game_config.get("itchid") or installer.get("itchid") or installer.get("service_id")
if service_id:
return service_id
return
@property
def script_pretty(self):

View file

@ -65,6 +65,14 @@ class InstallerFile:
if isinstance(self._file_meta, dict):
return self._file_meta.get("referer")
@property
def downloader(self):
if isinstance(self._file_meta, dict):
dl = self._file_meta.get("downloader")
if dl and not dl.dest:
dl.dest = self.dest_file
return dl
@property
def checksum(self):
if isinstance(self._file_meta, dict):

View file

@ -84,6 +84,8 @@ class ScriptInterpreter(GObject.Object, CommandsMixin):
if not self.installer.script:
raise ScriptingError(_("This installer doesn't have a 'script' section"))
if not self.service and self.installer.service:
self.service = self.installer.service
script_errors = self.installer.get_errors()
if script_errors:
raise ScriptingError(

View file

@ -31,6 +31,7 @@ def get_services():
"gog": GOGService,
"humblebundle": HumbleBundleService,
"egs": EpicGamesStoreService,
"itchio": ItchIoService,
"origin": OriginService,
"ubisoft": UbisoftConnectService,
"amazon": AmazonService,
@ -52,7 +53,6 @@ SERVICES = get_services()
# Those services are not yet ready to be used
WIP_SERVICES = {
"battlenet": BattleNetService,
"itchio": ItchIoService,
"mame": MAMEService,
"flathub": FlathubService
}

View file

@ -1,14 +1,532 @@
"""Itch.io service.
Not ready yet.
"""
"""itch.io service"""
import datetime
import json
import os
from gettext import gettext as _
from urllib.parse import quote_plus, urlencode
from lutris import settings
from lutris.database import games as games_db
from lutris.installer import AUTO_ELF_EXE, AUTO_WIN32_EXE
from lutris.installer.installer_file import InstallerFile
from lutris.services.base import OnlineService
from lutris.services.service_game import ServiceGame
from lutris.services.service_media import ServiceMedia
from lutris.util import linux
from lutris.util.downloader import Downloader
from lutris.util.http import HTTPError, Request
from lutris.util.log import logger
from lutris.util.strings import slugify
class ItchIoCover(ServiceMedia):
"""itch.io game cover"""
service = "itchio"
size = (315, 250)
dest_path = os.path.join(settings.CACHE_DIR, "itchio/cover")
file_pattern = "%s.png"
file_format = "png"
def get_media_url(self, details):
"""Extract cover from API"""
# Animated (gif) covers have an extra field with a png version of the cover
if "still_cover_url" in details:
if details["still_cover_url"]:
return details["still_cover_url"]
if "cover_url" in details:
if details["cover_url"]:
return details["cover_url"]
else:
logger.warning("No field 'cover_url' in API game %s", details)
return
class ItchIoGame(ServiceGame):
"""itch.io Game"""
service = "itchio"
@classmethod
def new(cls, igame):
"""Return a Itch.io game instance from the API info"""
service_game = ItchIoGame()
service_game.appid = str(igame["id"])
service_game.slug = slugify(igame["title"])
service_game.name = igame["title"]
service_game.details = json.dumps(igame)
return service_game
class ItchIoGameTraits():
"""Game Traits Helper Class"""
def __init__(self, traits):
self._traits = traits
self.windows = bool("p_windows" in traits)
self.linux = bool("p_linux" in traits)
self.can_be_bought = bool("can_be_bought" in traits)
self.has_demo = bool("has_demo" in traits)
def has_supported_platform(self):
return self.windows or self.linux
class ItchIoService(OnlineService):
"""Service class for Itch.io"""
"""Service class for itch.io"""
id = "itchio"
name = _("Itch.io (Not implemented)")
# According to their branding, "itch.io" is supposed to be all lowercase
name = _("itch.io")
icon = "itchio"
online = True
drm_free = True
has_extras = True
medias = {
"banner": ItchIoCover,
}
default_format = "banner"
api_url = "https://api.itch.io"
login_url = "https://itch.io/login"
redirect_uri = "https://itch.io/dashboard"
cookies_path = os.path.join(settings.CACHE_DIR, ".itchio.auth")
cache_path = os.path.join(settings.CACHE_DIR, "itchio/api/")
key_cache_file = os.path.join(cache_path, "profile/owned-keys.json")
games_cache_path = os.path.join(cache_path, "games/")
key_cache = {}
supported_platforms = ("p_linux", "p_windows")
extra_types = (
"soundtrack",
"book",
"video",
"documentation",
"mod",
"audio_assets",
"graphical_assets",
"sourcecode",
"other"
)
is_loading = False
def login_callback(self, url):
"""Called after the user has logged in successfully"""
self.emit("service-login")
def is_connected(self):
"""Check if service is connected and can call the API"""
if not self.is_authenticated():
return False
try:
profile = self.fetch_profile()
except HTTPError:
logger.warning("Not connected to itch.io account.")
return False
return profile and "user" in profile
def load(self):
"""Load the user's itch.io library"""
if self.is_loading:
logger.info("itch.io games are already loading")
return
if not self.is_connected():
logger.error("User not connected to itch.io")
return
self.is_loading = True
library = self.get_games()
games = []
seen = set()
for game in library:
if game["title"] in seen:
continue
_game = ItchIoGame.new(game)
games.append(_game)
_game.save()
seen.add(game["title"])
self.is_loading = False
return games
def make_api_request(self, path, query=None):
"""Make API request"""
url = "{}/{}".format(self.api_url, path)
if query is not None and isinstance(query, dict):
url += "?{}".format(urlencode(query, quote_via=quote_plus))
request = Request(url, cookies=self.load_cookies())
request.get()
return request.json
def fetch_profile(self):
"""Do API request to get users online profile"""
return self.make_api_request("profile")
def fetch_owned_keys(self, query=None):
"""Do API request to get games owned by user (paginated)"""
return self.make_api_request("profile/owned-keys", query)
def fetch_game(self, game_id):
"""Do API request to get game info"""
return self.make_api_request(f"games/{game_id}")
def fetch_uploads(self, game_id, dl_key):
"""Do API request to get downloadables of a game."""
query = None
if dl_key is not None:
query = {"download_key_id": dl_key}
return self.make_api_request(f"games/{game_id}/uploads", query)
def fetch_upload(self, upload, dl_key):
"""Do API request to get downloadable of a game"""
query = None
if dl_key is not None:
query = {"download_key_id": dl_key}
return self.make_api_request(f"uploads/{upload}", query)
def fetch_build_patches(self, installed, target, dl_key):
"""Do API request to get game patches"""
query = None
if dl_key is not None:
query = {"download_key_id": dl_key}
return self.make_api_request(f"builds/{installed}/upgrade-paths/{target}", query)
def get_download_link(self, upload_id, dl_key):
"""Create download link for installation"""
url = "{}/{}".format(self.api_url, f"uploads/{upload_id}/download")
if dl_key is not None:
query = {"download_key_id": dl_key}
url += "?{}".format(urlencode(query, quote_via=quote_plus))
return url
def get_game_cache(self, appid):
"""Create basic cache key based on game slug and appid"""
return os.path.join(self.games_cache_path, f"{appid}.json")
def _cache_games(self, games):
"""Store information about owned keys in cache"""
os.makedirs(self.games_cache_path, exist_ok=True)
for game in games:
filename = self.get_game_cache(game["id"])
key_path = os.path.join(self.games_cache_path, filename)
with open(key_path, "w", encoding="utf-8") as cache_file:
json.dump(game, cache_file)
def get_owned_games(self, force_load=False):
"""Get all owned library keys from itch.io"""
owned_keys = []
fresh_data = True
if (not force_load) and os.path.exists(self.key_cache_file):
with open(self.key_cache_file, "r", encoding="utf-8") as key_file:
owned_keys = json.load(key_file)
fresh_data = False
else:
query = {"page": 1}
# Basic security; I'm pretty sure itch.io will block us before that tho
safety = 65507
while safety:
response = self.fetch_owned_keys(query)
if isinstance(response["owned_keys"], list):
owned_keys += response["owned_keys"]
if len(response["owned_keys"]) == int(response["per_page"]):
query["page"] += 1
else:
break
else:
break
safety -= 1
os.makedirs(os.path.join(self.cache_path, "profile/"), exist_ok=True)
with open(self.key_cache_file, "w", encoding="utf-8") as key_file:
json.dump(owned_keys, key_file)
games = []
for key in owned_keys:
game = key.get("game", {})
game["download_key_id"] = key["id"]
games.append(game)
if fresh_data:
self._cache_games(games)
return games
def get_games(self):
"""Return games from the user's library"""
games = self.get_owned_games()
filtered_games = []
for game in games:
traits = game.get("traits", {})
if any(platform in traits for platform in self.supported_platforms):
filtered_games.append(game)
return filtered_games
def get_key(self, appid):
"""Retrieve cache information on a key"""
game_filename = self.get_game_cache(appid)
game = {}
if os.path.exists(game_filename):
with open(game_filename, "r", encoding="utf-8") as game_file:
game = json.load(game_file)
else:
game = self.fetch_game(appid).get("game", {})
self._cache_games([game])
traits = game.get("traits", [])
if "can_be_bought" not in traits:
# If game can not be bought it can not have a key
return
if "download_key_id" in game:
# Return cached key
return game["download_key_id"]
if not game.get("min_price", 0):
# We have no key but the game can be played for free
return
# Reload whole key library to check if a key was added
library = self.get_owned_games(True)
game = next((x for x in library if x["id"] == appid), game)
if "download_key_id" in game:
return game["download_key_id"]
return
def get_extras(self, appid):
"""Return a list of bonus content for itch.io game."""
key = self.get_key(appid)
uploads = self.fetch_uploads(appid, key)
all_extras = {}
extras = []
for upload in uploads["uploads"]:
if upload["type"] not in self.extra_types:
continue
extras.append(
{
"name": upload.get("filename", "").strip().capitalize(),
"type": upload.get("type", "").strip(),
"total_size": upload.get("size", 0),
"id": str(upload["id"]),
}
)
if len(extras) > 0:
all_extras["Bonus Content"] = extras
return all_extras
def generate_installer(self, db_game):
"""Auto generate installer for itch.io game"""
details = json.loads(db_game["details"])
if "p_linux" in details["traits"]:
runner = "linux"
game_config = {"exe": AUTO_ELF_EXE}
script = [
{"extract": {"file": "itchupload", "dst": "$CACHE"}},
{"merge": {"src": "$CACHE", "dst": "$GAMEDIR"}},
]
elif "p_windows" in details["traits"]:
runner = "wine"
game_config = {"exe": AUTO_WIN32_EXE}
script = [
{"task": {"name": "create_prefix"}},
{"install_or_extract": "itchupload"}
]
else:
logger.warning("No supported platforms found")
return {}
return {
"name": db_game["name"],
"version": "itch.io",
"slug": db_game["slug"],
"game_slug": db_game["slug"],
"runner": runner,
"itchid": db_game["appid"],
"script": {
"files": [
{"itchupload": "N/A:Select the installer from itch.io"}
],
"game": game_config,
"installer": script,
}
}
def get_update_installers(self, db_game):
"""Check for updates"""
patch_installers = []
key = self.get_key(db_game["service_id"])
upload = None
stamp = None
patch_url = None
info_filename = os.path.join(db_game["directory"], ".lutrisgame.json")
if os.path.exists(info_filename):
info = {}
with open(info_filename, encoding="utf-8") as info_file:
info = json.load(info_file)
if "upload" in info:
# TODO: Implement wharf patching
# if "build" in info and info["build"]:
# upload = self.fetch_upload(info["upload"], key)
# patches = self.fetch_build_patches(info["build"], upload["build_id"], key)
# patch_urls = []
# for build in patches["upgrade_path"]["builds"]:
# patch_urls.append("builds/{}/download/patch/default".format(build["id"]))
# else:
# Do overinstall of upload / Full build url
patch_url = self.get_download_link(info["upload"], key)
upload = self.fetch_upload(info["upload"], key)
if upload:
uploads = [upload["upload"] if "upload" in upload else upload]
else:
uploads = self.fetch_uploads(db_game["service_id"], key)
if "uploads" in uploads:
uploads = uploads["uploads"]
stamp = 0
for _upload in uploads:
_s = _upload["updated_at"]
# Python does ootb not fully comply with RFC3999; Cut after seconds
_s = datetime.datetime.fromisoformat(_s[:_s.rfind(".")]).timestamp()
if (not stamp) or (_s < stamp):
stamp = _s
outdated = False
if stamp:
dbg = games_db.get_games_where(
installed_at__lessthan=stamp,
service=self.id,
service_id=db_game["service_id"]
)
outdated = len(dbg)
if outdated:
installer = {
"name": db_game["name"],
"slug": db_game["installer_slug"],
"game_slug": db_game["slug"],
"runner": db_game["runner"],
"script": {
"extends": db_game["installer_slug"],
"files": [],
"installer": [
{"extract": {"file": "itchupload", "dst": "$CACHE"}},
]
}
}
if patch_url:
installer["script"]["files"] = [
{"itchupload": patch_url}
]
else:
installer["script"]["files"] = [
{"itchupload": "N/A:Select the installer from itch.io"}
]
if db_game["runner"] == "linux":
installer["script"]["installer"].append(
{"merge": {"src": "$CACHE", "dst": "$GAMEDIR"}},
)
elif db_game["runner"] == "wine":
installer["script"]["installer"].append(
{"merge": {"src": "$CACHE", "dst": "$GAMEDIR/drive_c/%s" % db_game["slug"]}}
)
patch_installers.append(installer)
return patch_installers
def get_dlc_installers_runner(self, db_game, runner, only_owned=True):
"""itch.io does currently not officially support dlc"""
return []
def get_installer_files(self, installer, installer_file_id, selected_extras):
"""Replace the user provided file with download links from itch.io"""
key = self.get_key(installer.service_appid)
uploads = self.fetch_uploads(installer.service_appid, key)
filtered = []
extras = []
files = []
for upload in uploads["uploads"]:
if selected_extras and (upload["type"] in self.extra_types):
extras.append(upload)
continue
# default = games/tools ("executables")
if upload["type"] == "default" and (installer.runner in ("linux", "wine")):
is_linux = installer.runner == "linux" and "p_linux" in upload["traits"]
is_windows = installer.runner == "wine" and "p_windows" in upload["traits"]
is_demo = "demo" in upload["traits"]
if not (is_linux or is_windows):
continue
upload["Weight"] = self.get_file_weight(upload["filename"], is_demo)
if upload["Weight"] == 0xFF:
continue
filtered.append(upload)
continue
# TODO: Implement embedded types: flash, unity, java, html
# I have not found keys for embdedded games
# but people COULD write custom installers.
# So far embedded games can be played directly on itch.io
if len(filtered) > 0:
filtered.sort(key=lambda upload: upload["Weight"])
# Lutris does not support installer selection
upload = filtered[0]
data = {
"service": self.id,
"appid": installer.service_appid,
"upload": str(upload["id"]),
"slug": installer.game_slug,
"runner": installer.runner
}
if "build_id" in upload:
data["build"] = str(upload["build_id"])
# Adding a file with some basic info for e.g. patching
installer.script["installer"].append({"write_json": {
"data": data,
"file": "$GAMEDIR/.lutrisgame.json",
"merge": False
}})
link = self.get_download_link(upload["id"], key)
files.append(
InstallerFile(installer.game_slug, installer_file_id, {
"url": link,
"filename": upload["filename"],
"downloader": Downloader(link, None, overwrite=True, cookies=self.load_cookies()),
})
)
for extra in extras:
if str(extra["id"]) not in selected_extras:
continue
link = self.get_download_link(extra["id"], key)
files.append(
InstallerFile(installer.game_slug, str(extra["id"]), {
"url": link,
"filename": extra["filename"],
"downloader": Downloader(link, None, overwrite=True, cookies=self.load_cookies()),
})
)
return files
def get_file_weight(self, name, demo):
if name.endswith(".rpm"):
return 0xFF # Not supported as an extractor
weight = 0x0
if name.endswith(".deb"):
weight |= 0x01
if linux.LINUX_SYSTEM.is_64_bit:
if "386" in name or "32" in name:
weight |= 0x08
else:
if "64" in name:
weight |= 0x10
if demo:
weight |= 0x40
return weight

View file

@ -32,9 +32,10 @@ class Downloader:
COMPLETED
) = list(range(5))
def __init__(self, url, dest, overwrite=False, referer=None):
def __init__(self, url, dest, overwrite=False, referer=None, cookies=None):
self.url = url
self.dest = dest
self.cookies = cookies
self.overwrite = overwrite
self.referer = referer
self.stop_request = None
@ -137,7 +138,7 @@ class Downloader:
headers["User-Agent"] = "Lutris/%s" % __version__
if self.referer:
headers["Referer"] = self.referer
response = requests.get(self.url, headers=headers, stream=True, timeout=30)
response = requests.get(self.url, headers=headers, stream=True, timeout=30, cookies=self.cookies)
if response.status_code != 200:
logger.info("%s returned a %s error", self.url, response.status_code)
response.raise_for_status()