summaryrefslogtreecommitdiff
path: root/searx
diff options
context:
space:
mode:
Diffstat (limited to 'searx')
-rw-r--r--searx/search/checker/__main__.py54
-rw-r--r--searx/search/checker/impl.py46
2 files changed, 67 insertions, 33 deletions
diff --git a/searx/search/checker/__main__.py b/searx/search/checker/__main__.py
index 37b7e6cda..75b37e6c5 100644
--- a/searx/search/checker/__main__.py
+++ b/searx/search/checker/__main__.py
@@ -1,8 +1,10 @@
# SPDX-License-Identifier: AGPL-3.0-or-later
import sys
+import io
import os
import argparse
+import logging
import searx.search
import searx.search.checker
@@ -10,6 +12,14 @@ from searx.search import processors
from searx.engines import engine_shortcuts
+# configure logging
+root = logging.getLogger()
+handler = logging.StreamHandler(sys.stdout)
+for h in root.handlers:
+ root.removeHandler(h)
+root.addHandler(handler)
+
+# color only for a valid terminal
if sys.stdout.isatty() and os.environ.get('TERM') not in ['dumb', 'unknown']:
RESET_SEQ = "\033[0m"
COLOR_SEQ = "\033[1;%dm"
@@ -21,7 +31,12 @@ else:
BOLD_SEQ = ""
BLACK, RED, GREEN, YELLOW, BLUE, MAGENTA, CYAN, WHITE = "", "", "", "", "", "", "", ""
+# equivalent of 'python -u' (unbuffered stdout, stderr)
+stdout = io.TextIOWrapper(open(sys.stdout.fileno(), 'wb', 0), write_through=True)
+stderr = io.TextIOWrapper(open(sys.stderr.fileno(), 'wb', 0), write_through=True)
+
+# iterator of processors
def iter_processor(engine_name_list):
if len(engine_name_list) > 0:
for name in engine_name_list:
@@ -30,38 +45,49 @@ def iter_processor(engine_name_list):
if processor is not None:
yield name, processor
else:
- print(BOLD_SEQ, 'Engine ', '%-30s' % name, RESET_SEQ, RED, ' Not found ', RESET_SEQ)
+ stdout.write(f'{BOLD_SEQ}Engine {name:30}{RESET_SEQ}{RED}Engine does not exist{RESET_SEQ}')
else:
for name, processor in searx.search.processors.items():
yield name, processor
-def run(engine_name_list):
+# actual check & display
+def run(engine_name_list, verbose):
searx.search.initialize()
- broken_urls = []
for name, processor in iter_processor(engine_name_list):
- if sys.stdout.isatty():
- print(BOLD_SEQ, 'Engine ', '%-30s' % name, RESET_SEQ, WHITE, ' Checking', RESET_SEQ)
+ stdout.write(f'{BOLD_SEQ}Engine {name:30}{RESET_SEQ}Checking\n')
+ if not sys.stdout.isatty():
+ stderr.write(f'{BOLD_SEQ}Engine {name:30}{RESET_SEQ}Checking\n')
checker = searx.search.checker.Checker(processor)
checker.run()
if checker.test_results.succesfull:
- print(BOLD_SEQ, 'Engine ', '%-30s' % name, RESET_SEQ, GREEN, ' OK', RESET_SEQ)
+ stdout.write(f'{BOLD_SEQ}Engine {name:30}{RESET_SEQ}{GREEN}OK{RESET_SEQ}\n')
+ if verbose:
+ stdout.write(f' {"found languages":15}: {" ".join(sorted(list(checker.test_results.languages)))}\n')
else:
- errors = [test_name + ': ' + error for test_name, error in checker.test_results]
- print(BOLD_SEQ, 'Engine ', '%-30s' % name, RESET_SEQ, RED, ' Error ', str(errors), RESET_SEQ)
-
- broken_urls += checker.test_results.broken_urls
-
- for url in broken_urls:
- print('Error fetching', url)
+ stdout.write(f'{BOLD_SEQ}Engine {name:30}{RESET_SEQ}{RESET_SEQ}{RED}Error{RESET_SEQ}')
+ if not verbose:
+ errors = [test_name + ': ' + error for test_name, error in checker.test_results]
+ stdout.write(f'{RED}Error {str(errors)}{RESET_SEQ}\n')
+ else:
+ stdout.write('\n')
+ stdout.write(f' {"found languages":15}: {" ".join(sorted(list(checker.test_results.languages)))}\n')
+ for test_name, logs in checker.test_results.logs.items():
+ for log in logs:
+ stdout.write(f' {test_name:15}: {RED}{" ".join(log)}{RESET_SEQ}\n')
+# call by setup.py
def main():
parser = argparse.ArgumentParser(description='Check searx engines.')
parser.add_argument('engine_name_list', metavar='engine name', type=str, nargs='*',
help='engines name or shortcut list. Empty for all engines.')
+ parser.add_argument('--verbose', '-v',
+ action='store_true', dest='verbose',
+ help='Display details about the test results',
+ default=False)
args = parser.parse_args()
- run(args.engine_name_list)
+ run(args.engine_name_list, args.verbose)
if __name__ == '__main__':
diff --git a/searx/search/checker/impl.py b/searx/search/checker/impl.py
index abef5f8e9..71a941f73 100644
--- a/searx/search/checker/impl.py
+++ b/searx/search/checker/impl.py
@@ -17,6 +17,8 @@ from searx.search.models import SearchQuery, EngineRef
from searx.search.processors import EngineProcessor
+logger = logger.getChild('searx.search.checker')
+
HTML_TAGS = [
'embed', 'iframe', 'object', 'param', 'picture', 'source', 'svg', 'math', 'canvas', 'noscript', 'script',
'del', 'ins', 'area', 'audio', 'img', 'map', 'track', 'video', 'a', 'abbr', 'b', 'bdi', 'bdo', 'br', 'cite',
@@ -121,20 +123,25 @@ def _search_query_diff(sq1: SearchQuery, sq2: SearchQuery)\
class TestResults:
- __slots__ = 'errors', 'broken_urls'
+ __slots__ = 'errors', 'logs', 'languages'
def __init__(self):
self.errors: typing.Dict[str, typing.List[str]] = {}
- self.broken_urls = []
+ self.logs: typing.Dict[str, typing.List[typing.Any]] = {}
+ self.languages: typing.Set[str] = set()
- def add_error(self, test, message):
+ def add_error(self, test, message, *args):
+ # message to self.errors
errors_for_test = self.errors.setdefault(test, [])
if message not in errors_for_test:
errors_for_test.append(message)
+ # (message, *args) to self.logs
+ logs_for_test = self.logs.setdefault(test, [])
+ if (message, *args) not in logs_for_test:
+ logs_for_test.append((message, *args))
- def add_broken_url(self, url):
- if url not in self.broken_urls:
- self.broken_urls.append(url)
+ def add_language(self, language):
+ self.languages.add(language)
@property
def succesfull(self):
@@ -167,20 +174,23 @@ class ResultContainerTests:
results = self.result_container.get_ordered_results()
return [result['url'] for result in results]
- def _record_error(self, message: str) -> None:
- self.test_results.add_error(self.test_name, message)
+ def _record_error(self, message: str, *args) -> None:
+ sq = _search_query_to_dict(self.search_query)
+ sqstr = ' '.join(['{}={!r}'.format(k, v) for k, v in sq.items()])
+ self.test_results.add_error(self.test_name, message, *args, '(' + sqstr + ')')
def _add_language(self, text: str) -> typing.Optional[str]:
r = cld3.get_language(str(text)) # pylint: disable=E1101
- if r is not None and r.probability >= 0.9 and r.is_reliable:
+ if r is not None and r.probability >= 0.98 and r.is_reliable:
self.languages.add(r.language)
+ self.test_results.add_language(r.language)
return None
def _check_result(self, result):
if not _check_no_html(result.get('title', '')):
- self._record_error('HTML in title')
+ self._record_error('HTML in title', repr(result.get('title', '')))
if not _check_no_html(result.get('content', '')):
- self._record_error('HTML in content')
+ self._record_error('HTML in content', repr(result.get('content', '')))
self._add_language(result.get('title', ''))
self._add_language(result.get('content', ''))
@@ -198,13 +208,11 @@ class ResultContainerTests:
thumbnail_src = result.get('thumbnail_src')
if thumbnail_src is not None:
if not _is_url_image(thumbnail_src):
- self.test_results.add_broken_url(thumbnail_src)
- self._record_error('thumbnail_src URL is invalid')
+ self._record_error('thumbnail_src URL is invalid', thumbnail_src)
elif not _is_url_image(result.get('img_src')):
- self.test_results.add_broken_url(result.get('img_src'))
- self._record_error('img_src URL is invalid')
+ self._record_error('img_src URL is invalid', result.get('img_src'))
if template == 'videos.html' and not _is_url_image(result.get('thumbnail')):
- self._record_error('thumbnail URL is invalid')
+ self._record_error('thumbnail URL is invalid', result.get('img_src'))
def _check_results(self, results: list):
for result in results:
@@ -213,16 +221,16 @@ class ResultContainerTests:
def _check_answers(self, answers):
for answer in answers:
if not _check_no_html(answer):
- self._record_error('HTML in answer')
+ self._record_error('HTML in answer', answer)
def _check_infoboxes(self, infoboxes):
for infobox in infoboxes:
if not _check_no_html(infobox.get('content', '')):
- self._record_error('HTML in infobox content')
+ self._record_error('HTML in infobox content', infobox.get('content', ''))
self._add_language(infobox.get('content', ''))
for attribute in infobox.get('attributes', {}):
if not _check_no_html(attribute.get('value', '')):
- self._record_error('HTML in infobox attribute value')
+ self._record_error('HTML in infobox attribute value', attribute.get('value', ''))
def check_basic(self):
if len(self.result_container.unresponsive_engines) > 0: