1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
|
# SPDX-License-Identifier: AGPL-3.0-or-later
# pylint: disable=invalid-name, missing-module-docstring, missing-class-docstring
from abc import abstractmethod, ABC
import re
from searx import settings
from searx.sxng_locales import sxng_locales
from searx.engines import categories, engines, engine_shortcuts
from searx.external_bang import get_bang_definition_and_autocomplete
from searx.search import EngineRef
from searx.webutils import VALID_LANGUAGE_CODE
class QueryPartParser(ABC):
    """Abstract base for parsers that handle one whitespace-separated query part.

    A parser instance holds the query object it updates (``raw_text_query``)
    and a flag telling it whether it may add autocomplete suggestions
    (``enable_autocomplete``).
    """

    __slots__ = ("raw_text_query", "enable_autocomplete")

    def __init__(self, raw_text_query, enable_autocomplete):
        self.enable_autocomplete = enable_autocomplete
        self.raw_text_query = raw_text_query

    @staticmethod
    @abstractmethod
    def check(raw_value):
        """Check if raw_value can be parsed"""

    @abstractmethod
    def __call__(self, raw_value):
        """Try to parse raw_value and update ``self.raw_text_query`` accordingly.

        Return True if raw_value has been parsed.

        ``self.raw_text_query.autocomplete_list`` is also modified
        if ``self.enable_autocomplete`` is True.
        """

    def _add_autocomplete(self, value):
        """Append ``value`` to the autocomplete list unless it is already there."""
        suggestions = self.raw_text_query.autocomplete_list
        if value in suggestions:
            return
        suggestions.append(value)
class TimeoutParser(QueryPartParser):
    """Parse a ``<N`` query part that forces the search timeout.

    ``N`` below 100 is read as seconds, ``N`` of 100 or above as milliseconds.
    The result is stored as seconds in ``raw_text_query.timeout_limit``.
    """

    @staticmethod
    def check(raw_value):
        """Return True if raw_value starts with ``<``."""
        return raw_value.startswith('<')

    def __call__(self, raw_value):
        """Parse the digits after ``<``; return True if a timeout was set.

        When autocomplete is enabled and nothing follows the ``<``, example
        timeouts are added to the autocomplete list.
        """
        value = raw_value[1:]
        found = self._parse(value) if value else False
        if self.enable_autocomplete and not value:
            self._autocomplete()
        return found

    def _parse(self, value):
        """Set ``raw_text_query.timeout_limit`` from ``value``.

        Return False, leaving the timeout untouched, when ``value`` is not a
        plain number.
        """
        if not value.isdigit():
            return False
        try:
            raw_timeout_limit = int(value)
        except ValueError:
            # str.isdigit() also accepts characters such as superscript
            # digits ('²') that int() cannot convert: treat them as
            # "not a timeout" instead of letting the exception escape.
            return False
        if raw_timeout_limit < 100:
            # below 100, the unit is the second ( <3 = 3 seconds timeout )
            self.raw_text_query.timeout_limit = float(raw_timeout_limit)
        else:
            # 100 or above, the unit is the millisecond ( <850 = 850 milliseconds timeout )
            self.raw_text_query.timeout_limit = raw_timeout_limit / 1000.0
        return True

    def _autocomplete(self):
        """Suggest one example in seconds and one in milliseconds."""
        for suggestion in ['<3', '<850']:
            self._add_autocomplete(suggestion)
class LanguageParser(QueryPartParser):
    """Parse a ``:<language>`` query part that forces the search language.

    The text after the colon is lower-cased and ``_`` is replaced by ``-``
    before it is compared with the entries of ``sxng_locales``.
    """

    @staticmethod
    def check(raw_value):
        """Return True if raw_value starts with ``:``."""
        return raw_value[0] == ':'

    def __call__(self, raw_value):
        """Parse the language; return True if at least one language was added.

        Suggestions are added only when autocomplete is enabled and nothing
        was recognised.
        """
        normalized = raw_value[1:].lower().replace('_', '-')
        found = False
        if normalized:
            found = self._parse(normalized)
        if not found and self.enable_autocomplete:
            self._autocomplete(normalized)
        return found

    def _parse(self, value):
        """Append the languages matching ``value`` to ``raw_text_query.languages``.

        Return True if any language was appended.
        """
        selected = self.raw_text_query.languages
        # countries are stored with spaces, while the query part uses '-'
        country_form = value.replace('-', ' ')
        found = False

        # compare the value with every declared locale
        for locale in sxng_locales:
            lang_id, lang_name, country, english_name, _flag = map(str.lower, locale)

            matches = value in (lang_id, lang_name, english_name) or country_form == country
            if not matches or value in selected:
                continue

            # a matching locale becomes a new search language
            found = True
            parts = lang_id.split('-')
            if len(parts) == 2:
                selected.append(f'{parts[0]}-{parts[1].upper()}')
            else:
                selected.append(lang_id)

            # an exact language-id match is the best one: stop searching
            # (the first match is not necessarily the best one)
            if value == lang_id:
                break

        # user may set a valid, yet not selectable language
        if value == 'auto' or VALID_LANGUAGE_CODE.match(value):
            parts = value.split('-')
            if len(parts) > 1:
                value = parts[0].lower() + '-' + parts[1].upper()
            if value not in selected:
                selected.append(value)
                found = True

        return found

    def _autocomplete(self, value):
        """Add language suggestions for the (possibly empty) prefix ``value``."""
        enabled_languages = settings['search']['languages']

        if not value:
            # show some example queries
            if len(enabled_languages) < 10:
                examples = [':' + lang for lang in enabled_languages]
            else:
                examples = [":en", ":en_us", ":english", ":united_kingdom"]
            self.raw_text_query.autocomplete_list.extend(examples)
            return

        # here "new_zealand" is "new-zealand" (see __call__)
        country_prefix = value.replace('-', ' ')
        short_prefix = len(value) <= 2

        for locale in sxng_locales:
            # only suggest languages enabled in the settings
            if locale[0] not in enabled_languages:
                continue
            lang_id, lang_name, country, english_name, _flag = map(str.lower, locale)

            # the prefix matches the language-id
            if lang_id.startswith(value):
                suggestion = lang_id.split('-')[0] if short_prefix else lang_id
                self._add_autocomplete(':' + suggestion)

            # the prefix matches the native or English language name
            if lang_name.startswith(value) or english_name.startswith(value):
                self._add_autocomplete(':' + lang_name)

            # the prefix matches the country
            if country.startswith(country_prefix):
                self._add_autocomplete(':' + country.replace(' ', '_'))
class ExternalBangParser(QueryPartParser):
    """Parse a ``!!<bang>`` query part that selects an external bang."""

    @staticmethod
    def check(raw_value):
        """Return True if raw_value is ``!!`` followed by at least one character."""
        return len(raw_value) > 2 and raw_value.startswith('!!')

    def __call__(self, raw_value):
        """Look up the bang; return True if a definition exists for it."""
        bang = raw_value[2:]
        if bang:
            found, suggestions = self._parse(bang)
        else:
            found, suggestions = False, []
        if self.enable_autocomplete:
            self._autocomplete(suggestions)
        return found

    def _parse(self, value):
        """Return ``(found, autocomplete list)`` for the bang ``value``.

        ``raw_text_query.external_bang`` is set only when a definition is found.
        """
        bang_definition, bang_ac_list = get_bang_definition_and_autocomplete(value)
        if bang_definition is None:
            return False, bang_ac_list
        self.raw_text_query.external_bang = value
        return True, bang_ac_list

    def _autocomplete(self, bang_ac_list):
        """Suggest the given bangs, or a few examples when the list is empty."""
        for external_bang in bang_ac_list or ['g', 'ddg', 'bing']:
            self._add_autocomplete('!!' + external_bang)
class BangParser(QueryPartParser):
    """Parse a ``!<name>`` query part that forces an engine or a category."""

    @staticmethod
    def check(raw_value):
        """Return True for a single leading ``!`` (a leading ``!!`` is rejected)."""
        if raw_value[0] != '!':
            return False
        # make sure it's not any bang with double '!!'
        return len(raw_value) < 2 or raw_value[1] != '!'

    def __call__(self, raw_value):
        """Resolve the name after ``!``; return True if it selected engines.

        ``-`` and ``_`` in the name are read as spaces.
        """
        prefix = raw_value[0]
        value = raw_value[1:].replace('-', ' ').replace('_', ' ')
        found = self._parse(value) if value else False
        if found and prefix == '!':
            self.raw_text_query.specific = True
        if self.enable_autocomplete:
            self._autocomplete(prefix, value)
        return found

    def _parse(self, value):
        """Add engine references for ``value`` to ``raw_text_query.enginerefs``.

        ``value`` is resolved as an engine shortcut, then as an engine name,
        then as a category name. Return False if none of them matches.
        """
        # an engine shortcut stands for the engine name
        name = engine_shortcuts.get(value, value)

        # the name is an engine name
        if name in engines:
            self.raw_text_query.enginerefs.append(EngineRef(name, 'none'))
            return True

        # the name is neither an engine nor a category
        if name not in categories:
            return False

        # use all engines declared under that category name,
        # except the disabled ones
        disabled = self.raw_text_query.disabled_engines
        for engine in categories[name]:
            if (engine.name, name) not in disabled:
                self.raw_text_query.enginerefs.append(EngineRef(engine.name, name))
        return True

    def _autocomplete(self, first_char, value):
        """Suggest categories, engines and shortcuts starting with ``value``."""
        if not value:
            # show some example queries
            for suggestion in ['images', 'wikipedia', 'osm']:
                if suggestion not in self.raw_text_query.disabled_engines or suggestion in categories:
                    self._add_autocomplete(first_char + suggestion)
            return

        # category names first, then engine names; spaces become '_'
        for names in (categories, engines):
            for name in names:
                if name.startswith(value):
                    self._add_autocomplete(first_char + name.replace(' ', '_'))

        # engine shortcuts are suggested unchanged
        for shortcut in engine_shortcuts:
            if shortcut.startswith(value):
                self._add_autocomplete(first_char + shortcut)
class FeelingLuckyParser(QueryPartParser):
    """Parse the bare ``!!`` query part (redirect to the first result)."""

    @staticmethod
    def check(raw_value):
        """Return True only when raw_value is exactly ``!!``."""
        return raw_value == '!!'

    def __call__(self, raw_value):
        """Set ``redirect_to_first_result`` on the query; always return True."""
        query = self.raw_text_query
        query.redirect_to_first_result = True
        return True
class RawTextQuery:
    """parse raw text query (the value from the html input)

    The query is split on whitespace. Each part is offered to the parsers in
    ``PARSER_CLASSES``; parts accepted by a parser are kept in
    ``query_parts``, all others in ``user_query_parts``.
    """

    PARSER_CLASSES = [
        TimeoutParser,  # force the timeout
        LanguageParser,  # force a language
        ExternalBangParser,  # external bang (must be before BangParser)
        BangParser,  # force an engine or category
        FeelingLuckyParser,  # redirect to the first link in the results list
    ]

    def __init__(self, query, disabled_engines):
        assert isinstance(query, str)
        # input parameters
        self.query = query
        self.disabled_engines = disabled_engines if disabled_engines else []

        # parsed values
        self.enginerefs = []
        self.languages = []
        self.timeout_limit = None
        self.external_bang = None
        self.specific = False
        self.autocomplete_list = []

        # internal properties
        self.query_parts = []  # use self.getFullQuery()
        self.user_query_parts = []  # use self.getQuery()
        self.autocomplete_location = None
        self.redirect_to_first_result = False

        self._parse_query()

    def _parse_query(self):
        """
        parse self.query, if tags are set, which
        change the search engine or search-language
        """

        # split query, keeping the whitespace runs as separate items
        raw_query_parts = re.split(r'(\s+)', self.query)

        last_index_location = None
        # only the very last item of the split may trigger autocompletion
        autocomplete_index = len(raw_query_parts) - 1

        for i, query_part in enumerate(raw_query_parts):
            # part does only contain spaces, skip
            if query_part.isspace() or query_part == '':
                continue

            # parse special commands: the first parser whose check() accepts
            # the part decides whether the part is special
            special_part = False
            for parser_class in RawTextQuery.PARSER_CLASSES:
                if parser_class.check(query_part):
                    special_part = parser_class(self, i == autocomplete_index)(query_part)
                    break

            # append query part to query_part list
            qlist = self.query_parts if special_part else self.user_query_parts
            qlist.append(query_part)
            last_index_location = (qlist, len(qlist) - 1)

        # (list, index) of the last non-blank part, or None if there is none
        self.autocomplete_location = last_index_location

    def get_autocomplete_full_query(self, text):
        """Replace the last query part with ``text`` and return the full query.

        If the query has no part to replace, ``text`` becomes the only user
        query part.
        """
        location = self.autocomplete_location
        if location is None or not location[0]:
            # nothing to replace (empty query): unpacking None or indexing an
            # empty list would raise, so add the text as a new user part
            self.user_query_parts.append(text)
            self.autocomplete_location = (self.user_query_parts, len(self.user_query_parts) - 1)
        else:
            qlist, position = location
            qlist[position] = text
        return self.getFullQuery()

    def changeQuery(self, query):
        """Replace the user part of the query with ``query``; return self.

        The special parts in ``query_parts`` are kept and the autocomplete
        list is reset.
        """
        self.user_query_parts = query.strip().split()
        self.query = self.getFullQuery()
        self.autocomplete_location = (self.user_query_parts, len(self.user_query_parts) - 1)
        self.autocomplete_list = []
        return self

    def getQuery(self):
        """Return the user query parts joined by single spaces."""
        return ' '.join(self.user_query_parts)

    def getFullQuery(self):
        """
        get full query: the special parts followed by the user query,
        all joined by single spaces
        """
        return '{0} {1}'.format(' '.join(self.query_parts), self.getQuery()).strip()

    def __str__(self):
        return self.getFullQuery()

    def __repr__(self):
        # the closing '>' comes after the last field so that every field is
        # inside the brackets
        return (
            f"<{self.__class__.__name__} "
            + f"query={self.query!r} "
            + f"disabled_engines={self.disabled_engines!r}\n "
            + f"languages={self.languages!r} "
            + f"timeout_limit={self.timeout_limit!r} "
            + f"external_bang={self.external_bang!r} "
            + f"specific={self.specific!r} "
            + f"enginerefs={self.enginerefs!r}\n "
            + f"autocomplete_list={self.autocomplete_list!r}\n "
            + f"query_parts={self.query_parts!r}\n "
            + f"user_query_parts={self.user_query_parts!r}\n "
            + f"redirect_to_first_result={self.redirect_to_first_result!r}>"
        )
|