Rely more heavily on the TokenReader to handle whitespace and conjoined tokens.

This way 'playtime: 20 hours' can be parsed, since the tag alone can read the rest of the text up to the next stop token.
Daniel Johnson 2024-04-17 18:34:48 -04:00 committed by Mathieu Comandon
parent dc1a9dfebc
commit 67f5078079
2 changed files with 74 additions and 73 deletions
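
As a rough, self-contained sketch of the behavior this commit enables (simplified stand-ins for tokenize_search and TokenReader, not the actual Lutris implementation), 'playtime: 20 hours' can now be read by taking the tag first, consuming the ':' as an isolated token, and letting the reader gather the rest of the text up to the next stop token:

from typing import Iterable, List, Optional, Set

ISOLATED = {":", "-", "(", ")", "<", ">"}

def tokenize(text: str, isolated: Set[str]) -> Iterable[str]:
    """Split text so every character lands in exactly one token:
    words, whitespace runs, and isolated characters."""
    buffer = ""
    for ch in text:
        if ch in isolated:
            if buffer:
                yield buffer
            yield ch
            buffer = ""
        elif buffer and ch.isspace() != buffer.isspace():
            yield buffer
            buffer = ch
        else:
            buffer += ch
    if buffer:
        yield buffer

class Reader:
    """Minimal stand-in for TokenReader: sequential access plus an index."""

    def __init__(self, tokens: List[str]) -> None:
        self.tokens = tokens
        self.index = 0

    def get_token(self, skip_space: bool = True) -> Optional[str]:
        if skip_space:
            while self.index < len(self.tokens) and self.tokens[self.index].isspace():
                self.index += 1
        if self.index >= len(self.tokens):
            return None
        token = self.tokens[self.index]
        self.index += 1
        return token

    def consume(self, expected: str) -> bool:
        if self.index < len(self.tokens) and self.tokens[self.index] == expected:
            self.index += 1
            return True
        return False

    def get_cleaned_token_sequence(self, stop_tokens: Set[str]) -> Optional[str]:
        # Read tokens, whitespace included, until a stop token or the end,
        # then return the concatenated text stripped of surrounding spaces.
        buffer = ""
        while True:
            token = self.get_token(skip_space=False)
            if token is None:
                break
            if token in stop_tokens:
                self.index -= 1
                break
            buffer += token
        return buffer.strip() or None

reader = Reader(list(tokenize("playtime: 20 hours", ISOLATED)))
tag = reader.get_token()                               # 'playtime'
assert reader.consume(":")                             # ':' is its own token now
value = reader.get_cleaned_token_sequence(ISOLATED)    # '20 hours'
print(tag, "->", value)

The key points mirrored from the diff are that whitespace survives tokenization as its own tokens, get_token() skips it by default, and get_cleaned_token_sequence() deliberately keeps it so a multi-word value like '20 hours' comes back as one string.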

View file

@@ -12,12 +12,11 @@ from lutris.util.strings import parse_playtime, strip_accents
from lutris.util.tokenization import (
TokenReader,
clean_token,
implicitly_join_tokens,
tokenize_search,
)
ITEM_STOP_TOKENS = set(["OR", "AND", ")"])
ISOLATED_TOKENS = ITEM_STOP_TOKENS | set(["-", "(", "<", ">"])
ISOLATED_TOKENS = ITEM_STOP_TOKENS | set([":", "-", "(", "<", ">"])
SearchPredicate = Callable[[Any], bool]
@@ -31,8 +30,8 @@ class InvalidSearchTermError(ValueError):
self.message = message
def read_flag_token(reader: TokenReader) -> Optional[bool]:
token = clean_token(reader.get_token())
def read_flag_token(tokens: TokenReader) -> Optional[bool]:
token = tokens.get_cleaned_token() or ""
folded = token.casefold()
if folded in FLAG_TEXTS:
return FLAG_TEXTS[folded]
@@ -79,18 +78,19 @@ class BaseSearch:
def has_component(self, component_name: str) -> bool:
if component_name in self.tags:
match_token = component_name + ":"
for token in tokenize_search(self.text, ISOLATED_TOKENS, self.tags):
if token.casefold() == match_token:
return True
prev_token = None
for token in tokenize_search(self.text, ISOLATED_TOKENS):
if not token.isspace():
if token == ":" and prev_token and prev_token.casefold() == component_name:
return True
prev_token = token
return False
def get_predicate(self) -> SearchPredicate:
if self.predicate is None:
if self.text:
raw_tokens = tokenize_search(self.text, ISOLATED_TOKENS, self.tags)
joined_tokens = implicitly_join_tokens(raw_tokens, ISOLATED_TOKENS)
tokens = TokenReader(list(joined_tokens))
raw_tokens = tokenize_search(self.text, ISOLATED_TOKENS)
tokens = TokenReader(list(raw_tokens))
self.predicate = self._parse_or(tokens) or TRUE_PREDICATE
else:
self.predicate = TRUE_PREDICATE
@@ -133,31 +133,38 @@ class BaseSearch:
if not token or token in ITEM_STOP_TOKENS:
return None
tokens.get_token() # actually consume it
if token.startswith('"'):
tokens.get_token() # consume token
return self.get_text_predicate(clean_token(token))
if token == "(":
if tokens.consume("("):
return self._parse_or(tokens)
if token == "-":
if tokens.consume("-"):
inner = self._parse_items(tokens)
if inner:
return lambda *a: not inner(*a)
if token.startswith('"'):
return self.get_text_predicate(clean_token(token))
saved_index = tokens.index
if token.endswith(":") and not tokens.is_end_of_tokens():
name = token[:-1].casefold()
tokens.get_token() # consume tag name
if tokens.consume(":"):
name = token.casefold()
if name in self.tags:
saved_index = tokens.index
try:
return self.get_part_predicate(name, tokens)
except InvalidSearchTermError:
# If the tag is no good, we'll rewind and fall back on a
# literal text predicate
tokens.index = saved_index
pass
return self.get_text_predicate(token)
# If the tag is no good, we'll rewind and fall back on a
# literal text predicate for the whole thing
tokens.index = saved_index
text_token = tokens.get_cleaned_token_sequence(stop_tokens=ISOLATED_TOKENS)
if text_token:
return self.get_text_predicate(text_token)
return None
def with_predicate(self, predicate: Callable):
old_predicate = self.get_predicate() # force generation of predicate
@@ -190,15 +197,15 @@ class GameSearch(BaseSearch):
def get_part_predicate(self, name: str, tokens: TokenReader) -> Callable:
if name == "category":
category = clean_token(tokens.get_token())
category = tokens.get_cleaned_token() or ""
return self.get_category_predicate(category)
if name == "runner":
runner_name = clean_token(tokens.get_token())
runner_name = tokens.get_cleaned_token() or ""
return self.get_runner_predicate(runner_name)
if name == "platform":
platform = clean_token(tokens.get_token())
platform = tokens.get_cleaned_token() or ""
return self.get_platform_predicate(platform)
if name == "playtime":
@@ -251,11 +258,14 @@ class GameSearch(BaseSearch):
else:
matcher = match_playtime
token = clean_token(tokens.get_token())
playtime_text = tokens.get_cleaned_token_sequence(stop_tokens=ISOLATED_TOKENS)
if not playtime_text:
raise InvalidSearchTermError("A blank is not a valid playtime.")
try:
playtime = parse_playtime(token)
playtime = parse_playtime(playtime_text)
except ValueError as ex:
raise InvalidSearchTermError(f"'{token}' is not a valid playtime.") from ex
raise InvalidSearchTermError(f"'{playtime_text}' is not a valid playtime.") from ex
return matcher

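The has_component() change earlier in this file works the same way: with ':' emitted as its own token, the check looks for a ':' whose preceding non-whitespace token is the component name. A tiny standalone sketch (the token list is written out by hand rather than produced by the real tokenizer):

from typing import Iterable

def has_component(tokens: Iterable[str], component_name: str) -> bool:
    # Mirrors the new has_component() loop: ignore whitespace tokens and
    # report True when a ':' immediately follows the component name.
    prev_token = None
    for token in tokens:
        if not token.isspace():
            if token == ":" and prev_token and prev_token.casefold() == component_name:
                return True
            prev_token = token
    return False

print(has_component(["playtime", ":", " ", "20", " ", "hours"], "playtime"))  # True
print(has_component(["some", " ", "playtime", " ", "text"], "playtime"))      # False
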
View file

@@ -13,14 +13,13 @@ def clean_token(to_clean: Optional[str]) -> str:
return to_clean.strip()
def tokenize_search(text: str, isolated_characters: Set[str], tags: Set[str]) -> Iterable[str]:
def tokenize_search(text: str, isolated_tokens: Set[str]) -> Iterable[str]:
"""Iterates through a text and breaks in into tokens. Every character of the text is present
in exactly one token returned, all in order, so the original text can be reconstructed by concatenating the
tokens.
Tokens are separated by whitespace, but also certain characters (isolated_characters) are kept as separate tokens.
Double-quoted text are protected from further tokenization. Tokens that start with any of the 'tags', followed by
a ':' are also separated."""
Double-quoted text are protected from further tokenization."""
def _tokenize():
buffer = ""
@@ -34,16 +33,12 @@ def tokenize_search(text: str, isolated_characters: Set[str], tags: Set[str]) -> Iterable[str]:
yield buffer
buffer = ""
if ch in isolated_characters:
# TODO: Support longer tokens here
if ch in isolated_tokens:
yield buffer
yield ch
buffer = ""
continue
elif ch == ":" and buffer.casefold() in tags:
buffer += ch
yield buffer
buffer = ""
continue
elif ch == '"':
yield buffer
@@ -69,40 +64,6 @@ def tokenize_search(text: str, isolated_characters: Set[str], tags: Set[str]) -> Iterable[str]:
return filter(lambda t: len(t) > 0, _tokenize())
def implicitly_join_tokens(tokens: Iterable[str], isolated_tokens: Set[str]) -> Iterable[str]:
"""Iterates the tokens, but joins together consecutive tokens that aren't quoted and aren't
'special'; tags (ending with ':') are protected, along with the tag argument (its next token);
any tokens matching 'isolated_tokens' also won't be joined."""
def is_isolated(t: str):
return t.startswith('"') or t in isolated_tokens
def _join():
buffer = ""
isolate_next = False
for token in tokens:
if token.endswith(":"):
# If a tag is found, yield it separately, but remember to
# yield the next token separately too.
yield buffer
yield token
buffer = ""
isolate_next = True
continue
if isolate_next or is_isolated(token):
yield buffer
yield token
buffer = ""
else:
buffer += token
isolate_next = False
yield buffer
# Since we blindly return empty buffers, we must now filter them out
return filter(lambda t: t and not t.isspace(), _join())
class TokenReader:
"""TokenReader reads through a list of tokens, like an iterator. But it can also peek ahead, and you
can save and restore your 'place' in the token list via the 'index' member."""
@@ -115,9 +76,14 @@ class TokenReader:
"""True if get_token() and peek_token() will return None."""
return self.index >= len(self.tokens)
def get_token(self) -> Optional[str]:
def get_token(self, skip_space: bool = True) -> Optional[str]:
"""Returns the next token, and advances one token in the list. Returns None if
the end of tokens has been reached."""
if skip_space:
while self.index < len(self.tokens) and self.tokens[self.index].isspace():
self.index += 1
if self.index >= len(self.tokens):
return None
@@ -125,6 +91,31 @@ class TokenReader:
self.index += 1
return token
def get_cleaned_token(self) -> Optional[str]:
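"""Like get_token(), but runs the result through clean_token(); returns None at the end of tokens."""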
token = self.get_token()
if token:
return clean_token(token)
return None
def get_cleaned_token_sequence(self, stop_tokens: Set[str]) -> Optional[str]:
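"""Reads a run of tokens, whitespace included, up to the next stop token and returns it as a single
cleaned string; a double-quoted token ends the sequence and, if it comes first, is returned by itself.
Returns None if nothing was read."""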
buffer = ""
while True:
token = self.get_token(skip_space=False)
if token is None:
break
if token in stop_tokens:
self.index -= 1
break
if token.startswith('"'):
if buffer:
self.index -= 1
else:
buffer = token
break
buffer += token
return clean_token(buffer) if buffer else None
def peek_token(self) -> Optional[str]:
"""Returns the next token, or None if the end of tokens has been reached. However,
will not advance - repeated calls return the same token."""