summaryrefslogtreecommitdiff
path: root/searx/query.py
diff options
context:
space:
mode:
authorAlexandre Flament <alex@al-f.net>2021-02-22 18:13:50 +0100
committerAlexandre Flament <alex@al-f.net>2021-03-01 19:12:32 +0100
commit63f17d2e4c735767bfcb2d4fdb77d8c9ad8d9265 (patch)
tree4a4b3cae4bc9223949694d77c98e6e7443c04a65 /searx/query.py
parent4fa1290c11402f173c39a89324fe45c1b659fd9d (diff)
downloadsearxng-63f17d2e4c735767bfcb2d4fdb77d8c9ad8d9265.tar.gz
searxng-63f17d2e4c735767bfcb2d4fdb77d8c9ad8d9265.zip
[enh] autocomplete refactoring, autocomplete on external bangs
Diffstat (limited to 'searx/query.py')
-rw-r--r--searx/query.py404
1 files changed, 286 insertions, 118 deletions
diff --git a/searx/query.py b/searx/query.py
index 38cb03ffe..2e6a2aa4c 100644
--- a/searx/query.py
+++ b/searx/query.py
@@ -1,162 +1,330 @@
-#!/usr/bin/env python
-
-'''
-searx is free software: you can redistribute it and/or modify
-it under the terms of the GNU Affero General Public License as published by
-the Free Software Foundation, either version 3 of the License, or
-(at your option) any later version.
-
-searx is distributed in the hope that it will be useful,
-but WITHOUT ANY WARRANTY; without even the implied warranty of
-MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-GNU Affero General Public License for more details.
-
-You should have received a copy of the GNU Affero General Public License
-along with searx. If not, see < http://www.gnu.org/licenses/ >.
-
-(C) 2014 by Thomas Pointhuber, <thomas.pointhuber@gmx.at>
-'''
+# SPDX-License-Identifier: AGPL-3.0-or-later
+from abc import abstractmethod, ABC
import re
from searx.languages import language_codes
from searx.engines import categories, engines, engine_shortcuts
+from searx.external_bang import get_bang_definition_and_autocomplete
from searx.search import EngineRef
from searx.webutils import VALID_LANGUAGE_CODE
+class QueryPartParser(ABC):
+
+ __slots__ = "raw_text_query", "enable_autocomplete"
+
+ @staticmethod
+ @abstractmethod
+ def check(raw_value):
+ """Check if raw_value can be parsed"""
+
+ def __init__(self, raw_text_query, enable_autocomplete):
+ self.raw_text_query = raw_text_query
+ self.enable_autocomplete = enable_autocomplete
+
+ @abstractmethod
+ def __call__(self, raw_value):
+ """Try to parse raw_value: set the self.raw_text_query properties
+
+ return True if raw_value has been parsed
+
+ self.raw_text_query.autocomplete_list is also modified
+ if self.enable_autocomplete is True
+ """
+
+ def _add_autocomplete(self, value):
+ if value not in self.raw_text_query.autocomplete_list:
+ self.raw_text_query.autocomplete_list.append(value)
+
+
+class TimeoutParser(QueryPartParser):
+
+ @staticmethod
+ def check(raw_value):
+ return raw_value[0] == '<'
+
+ def __call__(self, raw_value):
+ value = raw_value[1:]
+ found = self._parse(value) if len(value) > 0 else False
+ if self.enable_autocomplete and not value:
+ self._autocomplete()
+ return found
+
+ def _parse(self, value):
+ if not value.isdigit():
+ return False
+ raw_timeout_limit = int(value)
+ if raw_timeout_limit < 100:
+ # below 100, the unit is the second ( <3 = 3 seconds timeout )
+ self.raw_text_query.timeout_limit = float(raw_timeout_limit)
+ else:
+ # 100 or above, the unit is the millisecond ( <850 = 850 milliseconds timeout )
+ self.raw_text_query.timeout_limit = raw_timeout_limit / 1000.0
+ return True
+
+ def _autocomplete(self):
+ for suggestion in ['<3', '<850']:
+ self._add_autocomplete(suggestion)
+
+
+class LanguageParser(QueryPartParser):
+
+ @staticmethod
+ def check(raw_value):
+ return raw_value[0] == ':'
+
+ def __call__(self, raw_value):
+ value = raw_value[1:].lower().replace('_', '-')
+ found = self._parse(value) if len(value) > 0 else False
+ if self.enable_autocomplete and not found:
+ self._autocomplete(value)
+ return found
+
+ def _parse(self, value):
+ found = False
+ # check if any language-code is equal with
+ # declared language-codes
+ for lc in language_codes:
+ lang_id, lang_name, country, english_name = map(str.lower, lc)
+
+ # if correct language-code is found
+ # set it as new search-language
+
+ if (value == lang_id
+ or value == lang_name
+ or value == english_name
+ or value.replace('-', ' ') == country)\
+ and value not in self.raw_text_query.languages:
+ found = True
+ lang_parts = lang_id.split('-')
+ if len(lang_parts) == 2:
+ self.raw_text_query.languages.append(lang_parts[0] + '-' + lang_parts[1].upper())
+ else:
+ self.raw_text_query.languages.append(lang_id)
+ # to ensure best match (first match is not necessarily the best one)
+ if value == lang_id:
+ break
+
+ # user may set a valid, yet not selectable language
+ if VALID_LANGUAGE_CODE.match(value):
+ lang_parts = value.split('-')
+ if len(lang_parts) > 1:
+ value = lang_parts[0].lower() + '-' + lang_parts[1].upper()
+ if value not in self.raw_text_query.languages:
+ self.raw_text_query.languages.append(value)
+ found = True
+
+ return found
+
+ def _autocomplete(self, value):
+ if not value:
+ # show some example queries
+ for lang in [":en", ":en_us", ":english", ":united_kingdom"]:
+ self.raw_text_query.autocomplete_list.append(lang)
+ return
+
+ for lc in language_codes:
+ lang_id, lang_name, country, english_name = map(str.lower, lc)
+
+ # check if query starts with language-id
+ if lang_id.startswith(value):
+ if len(value) <= 2:
+ self._add_autocomplete(':' + lang_id.split('-')[0])
+ else:
+ self._add_autocomplete(':' + lang_id)
+
+ # check if query starts with language name
+ if lang_name.startswith(value) or english_name.startswith(value):
+ self._add_autocomplete(':' + lang_name)
+
+ # check if query starts with country
+ # here "new_zealand" is "new-zealand" (see __call__)
+ if country.startswith(value.replace('-', ' ')):
+ self._add_autocomplete(':' + country.replace(' ', '_'))
+
+
+class ExternalBangParser(QueryPartParser):
+
+ @staticmethod
+ def check(raw_value):
+ return raw_value.startswith('!!')
+
+ def __call__(self, raw_value):
+ value = raw_value[2:]
+ found, bang_ac_list = self._parse(value) if len(value) > 0 else (False, [])
+ if self.enable_autocomplete:
+ self._autocomplete(bang_ac_list)
+ return found
+
+ def _parse(self, value):
+ found = False
+ bang_definition, bang_ac_list = get_bang_definition_and_autocomplete(value)
+ if bang_definition is not None:
+ self.raw_text_query.external_bang = value
+ found = True
+ return found, bang_ac_list
+
+ def _autocomplete(self, bang_ac_list):
+ if not bang_ac_list:
+ bang_ac_list = ['g', 'ddg', 'bing']
+ for external_bang in bang_ac_list:
+ self._add_autocomplete('!!' + external_bang)
+
+
+class BangParser(QueryPartParser):
+
+ @staticmethod
+ def check(raw_value):
+ return raw_value[0] == '!' or raw_value[0] == '?'
+
+ def __call__(self, raw_value):
+ value = raw_value[1:].replace('-', ' ').replace('_', ' ')
+ found = self._parse(value) if len(value) > 0 else False
+ if found and raw_value[0] == '!':
+ self.raw_text_query.specific = True
+ if self.enable_autocomplete:
+ self._autocomplete(raw_value[0], value)
+ return found
+
+ def _parse(self, value):
+ # check if prefix is equal with engine shortcut
+ if value in engine_shortcuts:
+ value = engine_shortcuts[value]
+
+ # check if prefix is equal with engine name
+ if value in engines:
+ self.raw_text_query.enginerefs.append(EngineRef(value, 'none'))
+ return True
+
+ # check if prefix is equal with categorie name
+ if value in categories:
+ # using all engines for that search, which
+ # are declared under that categorie name
+ self.raw_text_query.enginerefs.extend(EngineRef(engine.name, value)
+ for engine in categories[value]
+ if (engine.name, value) not in self.raw_text_query.disabled_engines)
+ return True
+
+ return False
+
+ def _autocomplete(self, first_char, value):
+ if not value:
+ # show some example queries
+ for suggestion in ['images', 'wikipedia', 'osm']:
+ if suggestion not in self.raw_text_query.disabled_engines or suggestion in categories:
+ self._add_autocomplete(first_char + suggestion)
+ return
+
+ # check if query starts with categorie name
+ for category in categories:
+ if category.startswith(value):
+ self._add_autocomplete(first_char + category)
+
+ # check if query starts with engine name
+ for engine in engines:
+ if engine.startswith(value):
+ self._add_autocomplete(first_char + engine.replace(' ', '_'))
+
+ # check if query starts with engine shortcut
+ for engine_shortcut in engine_shortcuts:
+ if engine_shortcut.startswith(value):
+ self._add_autocomplete(first_char + engine_shortcut)
+
+
class RawTextQuery:
"""parse raw text query (the value from the html input)"""
+ PARSER_CLASSES = [
+ TimeoutParser, # this force the timeout
+ LanguageParser, # this force a language
+ ExternalBangParser, # external bang (must be before BangParser)
+ BangParser # this force a engine or category
+ ]
+
def __init__(self, query, disabled_engines):
assert isinstance(query, str)
+ # input parameters
self.query = query
- self.disabled_engines = []
-
- if disabled_engines:
- self.disabled_engines = disabled_engines
-
- self.query_parts = []
- self.user_query_parts = []
+ self.disabled_engines = disabled_engines if disabled_engines else []
+ # parsed values
self.enginerefs = []
self.languages = []
self.timeout_limit = None
self.external_bang = None
self.specific = False
+ self.autocomplete_list = []
+ # internal properties
+ self.query_parts = [] # use self.getFullQuery()
+ self.user_query_parts = [] # use self.getQuery()
+ self.autocomplete_location = None
self._parse_query()
- # parse query, if tags are set, which
- # change the search engine or search-language
def _parse_query(self):
- self.query_parts = []
+ """
+ parse self.query, if tags are set, which
+ change the search engine or search-language
+ """
# split query, including whitespaces
raw_query_parts = re.split(r'(\s+)', self.query)
- for query_part in raw_query_parts:
- searx_query_part = False
+ last_index_location = None
+ autocomplete_index = len(raw_query_parts) - 1
+ for i, query_part in enumerate(raw_query_parts):
# part does only contain spaces, skip
if query_part.isspace()\
or query_part == '':
continue
- # this force the timeout
- if query_part[0] == '<':
- try:
- raw_timeout_limit = int(query_part[1:])
- if raw_timeout_limit < 100:
- # below 100, the unit is the second ( <3 = 3 seconds timeout )
- self.timeout_limit = float(raw_timeout_limit)
- else:
- # 100 or above, the unit is the millisecond ( <850 = 850 milliseconds timeout )
- self.timeout_limit = raw_timeout_limit / 1000.0
- searx_query_part = True
- except ValueError:
- # error not reported to the user
- pass
-
- # this force a language
- if query_part[0] == ':' and len(query_part) > 1:
- lang = query_part[1:].lower().replace('_', '-')
-
- # check if any language-code is equal with
- # declared language-codes
- for lc in language_codes:
- lang_id, lang_name, country, english_name = map(str.lower, lc)
-
- # if correct language-code is found
- # set it as new search-language
- if (lang == lang_id
- or lang == lang_name
- or lang == english_name
- or lang.replace('-', ' ') == country)\
- and lang not in self.languages:
- searx_query_part = True
- lang_parts = lang_id.split('-')
- if len(lang_parts) == 2:
- self.languages.append(lang_parts[0] + '-' + lang_parts[1].upper())
- else:
- self.languages.append(lang_id)
- # to ensure best match (first match is not necessarily the best one)
- if lang == lang_id:
- break
-
- # user may set a valid, yet not selectable language
- if VALID_LANGUAGE_CODE.match(lang):
- lang_parts = lang.split('-')
- if len(lang_parts) > 1:
- lang = lang_parts[0].lower() + '-' + lang_parts[1].upper()
- if lang not in self.languages:
- self.languages.append(lang)
- searx_query_part = True
-
- # external bang
- if query_part[0:2] == "!!":
- self.external_bang = query_part[2:]
- searx_query_part = True
- continue
- # this force a engine or category
- if query_part[0] == '!' or query_part[0] == '?':
- prefix = query_part[1:].replace('-', ' ').replace('_', ' ')
-
- # check if prefix is equal with engine shortcut
- if prefix in engine_shortcuts:
- searx_query_part = True
- engine_name = engine_shortcuts[prefix]
- if engine_name in engines:
- self.enginerefs.append(EngineRef(engine_name, 'none'))
-
- # check if prefix is equal with engine name
- elif prefix in engines:
- searx_query_part = True
- self.enginerefs.append(EngineRef(prefix, 'none'))
-
- # check if prefix is equal with categorie name
- elif prefix in categories:
- # using all engines for that search, which
- # are declared under that categorie name
- searx_query_part = True
- self.enginerefs.extend(EngineRef(engine.name, prefix)
- for engine in categories[prefix]
- if (engine.name, prefix) not in self.disabled_engines)
-
- if query_part[0] == '!':
- self.specific = True
+ # parse special commands
+ special_part = False
+ for parser_class in RawTextQuery.PARSER_CLASSES:
+ if parser_class.check(query_part):
+ special_part = parser_class(self, i == autocomplete_index)(query_part)
+ break
# append query part to query_part list
- if searx_query_part:
- self.query_parts.append(query_part)
- else:
- self.user_query_parts.append(query_part)
+ qlist = self.query_parts if special_part else self.user_query_parts
+ qlist.append(query_part)
+ last_index_location = (qlist, len(qlist) - 1)
+
+ self.autocomplete_location = last_index_location
+
+ def get_autocomplete_full_query(self, text):
+ qlist, position = self.autocomplete_location
+ qlist[position] = text
+ return self.getFullQuery()
def changeQuery(self, query):
self.user_query_parts = query.strip().split()
+ self.query = self.getFullQuery()
+ self.autocomplete_location = (self.user_query_parts, len(self.user_query_parts) - 1)
+ self.autocomplete_list = []
return self
def getQuery(self):
return ' '.join(self.user_query_parts)
def getFullQuery(self):
- # get full querry including whitespaces
- return '{0} {1}'.format(''.join(self.query_parts), self.getQuery()).strip()
+ """
+ get full querry including whitespaces
+ """
+ return '{0} {1}'.format(' '.join(self.query_parts), self.getQuery()).strip()
+
+ def __str__(self):
+ return self.getFullQuery()
+
+ def __repr__(self):
+ return f"<{self.__class__.__name__} " \
+ + f"query={self.query!r} " \
+ + f"disabled_engines={self.disabled_engines!r}\n " \
+ + f"languages={self.languages!r} " \
+ + f"timeout_limit={self.timeout_limit!r} "\
+ + f"external_bang={self.external_bang!r} " \
+ + f"specific={self.specific!r} " \
+ + f"enginerefs={self.enginerefs!r}\n " \
+ + f"autocomplete_list={self.autocomplete_list!r}\n " \
+ + f"query_parts={self.query_parts!r}\n " \
+ + f"user_query_parts={self.user_query_parts!r} >"