Give up on using iterators; add a TokenReader class that can go backwards, allowing lookahead.

This commit is contained in:
Daniel Johnson 2024-04-14 11:30:04 -04:00 committed by Mathieu Comandon
parent 3c50d7d208
commit 5112d3c60c

View file

@ -85,6 +85,24 @@ def implicitly_join_tokens(tokens: Iterable[str]) -> Iterable[str]:
return filter(lambda t: t and not t.isspace(), _join())
class _TokenReader:
def __init__(self, tokens: Iterator[str]) -> None:
self.tokens = tokens
self.putback_buffer = []
def get_token(self) -> Optional[str]:
if self.putback_buffer:
return self.putback_buffer.pop()
try:
return next(self.tokens)
except StopIteration:
return None
def putback(self, token: str):
self.putback_buffer.append(token)
def clean_token(to_clean: str) -> str:
if to_clean.startswith('"'):
if to_clean.endswith('"'):
@ -128,48 +146,53 @@ class BaseSearch:
def get_predicates(self) -> List[Callable]:
if self.predicates is None:
predicates = []
if self.text:
joined_tokens = implicitly_join_tokens(tokenize_search(self.text))
it = iter(joined_tokens)
token = next(it, None)
if token:
predicates = self._parse_term(token, it)
self.predicates = predicates
tokens = _TokenReader(iter(joined_tokens))
self.predicates = self._parse_term(tokens)
else:
self.predicates = []
return self.predicates
def _parse_term(self, token: str, rest: Iterator[str]) -> List[Callable]:
buffer = self._parse_item(token, rest)
def _parse_term(self, tokens: _TokenReader) -> List[Callable]:
buffer = self._parse_item(tokens)
if not buffer:
return buffer
while True:
token = next(rest, "")
token = tokens.get_token()
if not token:
return buffer
if token == "OR": # case-sensitive!
next_token = next(rest, None)
if next_token:
buffer[-1] = _make_or(buffer[-1:], self._parse_item(next_token, rest))
right = self._parse_item(tokens)
if right:
buffer = [_make_or(buffer, right)] if buffer else right
else:
buffer.extend(self._parse_item(token, rest))
buffer.extend(self._parse_item(tokens))
def _parse_item(self, tokens: _TokenReader) -> List[Callable]:
token = tokens.get_token()
if token is None:
return []
if token == "OR":
tokens.putback(token)
return []
def _parse_item(self, token: str, rest: Iterator[str]) -> List[Callable]:
if token == "-":
next_token = next(rest, "")
next_token = tokens.get_token()
if next_token:
inner = self._parse_item(next_token, rest)
inner = self._parse_item(tokens)
return [lambda *a, i=i: not i(*a) for i in inner]
if token.startswith('"'):
return [self.get_text_predicate(clean_token(token))]
if token.endswith(":"):
arg_token = next(rest, "")
arg_token = tokens.get_token()
if arg_token:
name = token[:-1].casefold()
value = clean_token(arg_token)