Move tokenization code into its own module, down in 'lutris.util'.

Daniel Johnson 2024-04-14 18:40:33 -04:00 committed by Mathieu Comandon
parent 05e85634c3
commit c43b890982
2 changed files with 137 additions and 131 deletions


@@ -1,5 +1,5 @@
import copy
from typing import Any, Callable, Dict, Iterable, Iterator, List, Optional
from typing import Any, Callable, Dict, Iterator, List, Optional
from lutris.database import games
from lutris.database.categories import (
@@ -9,127 +9,15 @@ from lutris.database.categories import (
)
from lutris.runners.runner import Runner
from lutris.util.strings import strip_accents
from lutris.util.tokenization import (
    TokenReader,
    clean_token,
    implicitly_join_tokens,
    tokenize_search,
)
def tokenize_search(text: str, tags: Iterable[str]) -> Iterable[str]:
    tag_set = set(tags)

    def _tokenize():
        buffer = ""
        it = iter(text)
        while True:
            ch = next(it, None)
            if ch is None:
                break
            if ch.isspace() != buffer.isspace():
                yield buffer
                buffer = ""
            if ch == "-" or ch == "(" or ch == ")":
                yield buffer
                yield ch
                buffer = ""
                continue
            elif ch == ":" and buffer.casefold() in tag_set:
                buffer += ch
                yield buffer
                buffer = ""
                continue
            elif ch == '"':
                yield buffer
                buffer = ch
                while True:
                    ch = next(it, None)
                    if ch is None:
                        break
                    buffer += ch
                    if ch == '"':
                        break
                yield buffer
                buffer = ""
                continue
            buffer += ch
        yield buffer

    return filter(lambda t: len(t) > 0, _tokenize())
ITEM_STOP_TOKENS = ["OR", "AND", ")"]
ISOLATED_TOKENS = ITEM_STOP_TOKENS + ["-", "("]
def implicitly_join_tokens(tokens: Iterable[str]) -> Iterable[str]:
    def is_isolated(t: str):
        return t.startswith('"') or t in ISOLATED_TOKENS

    def _join():
        buffer = ""
        isolate_next = False
        for token in tokens:
            if token.endswith(":"):
                yield buffer
                yield token
                buffer = ""
                isolate_next = True
                continue
            if isolate_next or is_isolated(token):
                yield buffer
                yield token
                buffer = ""
            else:
                buffer += token
            isolate_next = False
        yield buffer

    return filter(lambda t: t and not t.isspace(), _join())
class _TokenReader:
    def __init__(self, tokens: List[str]) -> None:
        self.tokens = tokens
        self.index = 0

    def is_end_of_tokens(self):
        return self.index >= len(self.tokens)

    def get_token(self) -> Optional[str]:
        if self.index >= len(self.tokens):
            return None
        token = self.tokens[self.index]
        self.index += 1
        return token

    def peek_token(self) -> Optional[str]:
        if self.index >= len(self.tokens):
            return None
        return self.tokens[self.index]

    def consume(self, candidate: str) -> bool:
        token = self.peek_token()
        if token == candidate:
            self.index += 1
            return True
        return False


def clean_token(to_clean: Optional[str]) -> str:
    if to_clean is None:
        return ""
    if to_clean.startswith('"'):
        return to_clean[1:-1] if to_clean.endswith('"') else to_clean[1:]
    return to_clean.strip()
ITEM_STOP_TOKENS = set(["OR", "AND", ")"])
ISOLATED_TOKENS = ITEM_STOP_TOKENS | set(["-", "("])
SearchPredicate = Callable[[Any], bool]
@@ -183,22 +71,23 @@ class BaseSearch:

    def get_predicate(self) -> SearchPredicate:
        if self.predicate is None:
            if self.text:
                joined_tokens = implicitly_join_tokens(tokenize_search(self.text, self.tags))
                tokens = _TokenReader(list(joined_tokens))
                raw_tokens = tokenize_search(self.text, self.tags)
                joined_tokens = implicitly_join_tokens(raw_tokens, ISOLATED_TOKENS)
                tokens = TokenReader(list(joined_tokens))
                self.predicate = self._parse_or(tokens) or TRUE_PREDICATE
            else:
                self.predicate = TRUE_PREDICATE
        return self.predicate

    def _parse_or(self, tokens: _TokenReader) -> Optional[SearchPredicate]:
    def _parse_or(self, tokens: TokenReader) -> Optional[SearchPredicate]:
        parts = list(self._parse_chain("OR", self._parse_and, tokens))
        return or_predicates(parts)

    def _parse_and(self, tokens: _TokenReader) -> Optional[SearchPredicate]:
    def _parse_and(self, tokens: TokenReader) -> Optional[SearchPredicate]:
        parts = list(self._parse_chain("AND", self._parse_items, tokens))
        return and_predicates(parts)

    def _parse_chain(self, conjunction: str, next_parser: Callable, tokens: _TokenReader) -> Iterator[SearchPredicate]:
    def _parse_chain(self, conjunction: str, next_parser: Callable, tokens: TokenReader) -> Iterator[SearchPredicate]:
        parsed = next_parser(tokens)
        if parsed:
            yield parsed
@@ -210,7 +99,7 @@ class BaseSearch:

            yield more

    def _parse_items(self, tokens: _TokenReader) -> Optional[SearchPredicate]:
    def _parse_items(self, tokens: TokenReader) -> Optional[SearchPredicate]:
        buffer = []
        while True:
            parsed = self._parse_item(tokens)
@@ -221,7 +110,7 @@ class BaseSearch:

        return and_predicates(buffer)

    def _parse_item(self, tokens: _TokenReader) -> Optional[SearchPredicate]:
    def _parse_item(self, tokens: TokenReader) -> Optional[SearchPredicate]:
        token = tokens.peek_token()
        if not token or token in ITEM_STOP_TOKENS:
@@ -257,7 +146,7 @@ class BaseSearch:

        new_search.predicate = lambda *a: old_predicate(*a) and predicate(*a)
        return new_search

    def get_part_predicate(self, name: str, tokens: _TokenReader) -> Optional[Callable]:
    def get_part_predicate(self, name: str, tokens: TokenReader) -> Optional[Callable]:
        return None

    def matches(self, candidate: Any) -> bool:
@@ -284,7 +173,7 @@ class GameSearch(BaseSearch):

    def get_candidate_text(self, candidate: Any) -> str:
        return candidate["name"]

    def get_part_predicate(self, name: str, tokens: _TokenReader) -> Optional[Callable]:
    def get_part_predicate(self, name: str, tokens: TokenReader) -> Optional[Callable]:
        if name == "category":
            category = clean_token(tokens.get_token())
            return self.get_category_predicate(category)
@@ -389,7 +278,7 @@ class RunnerSearch(BaseSearch):

    def get_candidate_text(self, candidate: Any) -> str:
        return f"{candidate.name}\n{candidate.description}"

    def get_part_predicate(self, name: str, tokens: _TokenReader) -> Optional[Callable]:
    def get_part_predicate(self, name: str, tokens: TokenReader) -> Optional[Callable]:
        if name == "installed":
            value = clean_token(tokens.get_token())
            if value not in self.flag_texts:

lutris/util/tokenization.py (new file, 117 lines)

@@ -0,0 +1,117 @@
from typing import Iterable, List, Optional, Set
def clean_token(to_clean: Optional[str]) -> str:
    if to_clean is None:
        return ""
    if to_clean.startswith('"'):
        return to_clean[1:-1] if to_clean.endswith('"') else to_clean[1:]
    return to_clean.strip()
def tokenize_search(text: str, tags: Iterable[str]) -> Iterable[str]:
    tag_set = set(tags)

    def _tokenize():
        buffer = ""
        it = iter(text)
        while True:
            ch = next(it, None)
            if ch is None:
                break
            if ch.isspace() != buffer.isspace():
                yield buffer
                buffer = ""
            if ch == "-" or ch == "(" or ch == ")":
                yield buffer
                yield ch
                buffer = ""
                continue
            elif ch == ":" and buffer.casefold() in tag_set:
                buffer += ch
                yield buffer
                buffer = ""
                continue
            elif ch == '"':
                yield buffer
                buffer = ch
                while True:
                    ch = next(it, None)
                    if ch is None:
                        break
                    buffer += ch
                    if ch == '"':
                        break
                yield buffer
                buffer = ""
                continue
            buffer += ch
        yield buffer

    return filter(lambda t: len(t) > 0, _tokenize())
def implicitly_join_tokens(tokens: Iterable[str], isolated_tokens: Set[str]) -> Iterable[str]:
    def is_isolated(t: str):
        return t.startswith('"') or t in isolated_tokens

    def _join():
        buffer = ""
        isolate_next = False
        for token in tokens:
            if token.endswith(":"):
                yield buffer
                yield token
                buffer = ""
                isolate_next = True
                continue
            if isolate_next or is_isolated(token):
                yield buffer
                yield token
                buffer = ""
            else:
                buffer += token
            isolate_next = False
        yield buffer

    return filter(lambda t: t and not t.isspace(), _join())
class TokenReader:
    def __init__(self, tokens: List[str]) -> None:
        self.tokens = tokens
        self.index = 0

    def is_end_of_tokens(self):
        return self.index >= len(self.tokens)

    def get_token(self) -> Optional[str]:
        if self.index >= len(self.tokens):
            return None
        token = self.tokens[self.index]
        self.index += 1
        return token

    def peek_token(self) -> Optional[str]:
        if self.index >= len(self.tokens):
            return None
        return self.tokens[self.index]

    def consume(self, candidate: str) -> bool:
        token = self.peek_token()
        if token == candidate:
            self.index += 1
            return True
        return False
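
For reference, a minimal usage sketch of the relocated helpers (not part of this commit). It mirrors how BaseSearch.get_predicate wires the pieces together in the hunk above; the query string and tag names are made-up examples, and ISOLATED_TOKENS is reproduced locally because that constant stays in the search module rather than moving into lutris.util.tokenization:

from lutris.util.tokenization import TokenReader, clean_token, implicitly_join_tokens, tokenize_search

# Mirrors the constants kept in the search module above.
ITEM_STOP_TOKENS = set(["OR", "AND", ")"])
ISOLATED_TOKENS = ITEM_STOP_TOKENS | set(["-", "("])

# Hypothetical query; "category" and "installed" stand in for whatever tags the caller supports.
raw_tokens = tokenize_search('category:"Action Games" -installed:true', ["category", "installed"])
joined_tokens = implicitly_join_tokens(raw_tokens, ISOLATED_TOKENS)

reader = TokenReader(list(joined_tokens))
while not reader.is_end_of_tokens():
    # clean_token() strips surrounding quotes or whitespace from each token.
    print(clean_token(reader.get_token()))
# Expected to print, roughly: category: / Action Games / - / installed: / true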