diff options
author | Alexandre Flament <alex@al-f.net> | 2021-05-05 13:08:54 +0200 |
---|---|---|
committer | Alexandre Flament <alex@al-f.net> | 2021-05-05 13:12:42 +0200 |
commit | 8c1a65d32fb6a0859c0052d668d01f08325f11ad (patch) | |
tree | 8837e952d67fb8a4755ce2c732ada76474da75c2 /searx/search | |
parent | d36adfa59f242a8775ad74245c696d62b7727a36 (diff) | |
download | searxng-8c1a65d32fb6a0859c0052d668d01f08325f11ad.tar.gz searxng-8c1a65d32fb6a0859c0052d668d01f08325f11ad.zip |
[mod] multithreading only in searx.search.* packages
it prepares the new architecture change,
everything about multithreading is moved into the searx.search.* packages
previously the call to the "init" function of the engines was done in searx.engines:
* the network was not set (requests were not sent using the defined proxy)
* it required monkey patching the code to avoid HTTP requests during the tests
Diffstat (limited to 'searx/search')
-rw-r--r-- | searx/search/__init__.py | 14 | ||||
-rw-r--r-- | searx/search/checker/__main__.py | 6 | ||||
-rw-r--r-- | searx/search/checker/background.py | 4 | ||||
-rw-r--r-- | searx/search/processors/__init__.py | 37 | ||||
-rw-r--r-- | searx/search/processors/abstract.py | 17 | ||||
-rw-r--r-- | searx/search/processors/online.py | 15 |
6 files changed, 69 insertions, 24 deletions
diff --git a/searx/search/__init__.py b/searx/search/__init__.py index 9b26f38de..acc97d1e9 100644 --- a/searx/search/__init__.py +++ b/searx/search/__init__.py @@ -29,9 +29,11 @@ from searx.results import ResultContainer from searx import logger from searx.plugins import plugins from searx.search.models import EngineRef, SearchQuery -from searx.search.processors import processors, initialize as initialize_processors -from searx.search.checker import initialize as initialize_checker +from searx.engines import load_engines +from searx.network import initialize as initialize_network from searx.metrics import initialize as initialize_metrics, counter_inc, histogram_observe_time +from searx.search.processors import PROCESSORS, initialize as initialize_processors +from searx.search.checker import initialize as initialize_checker logger = logger.getChild('search') @@ -50,8 +52,10 @@ else: def initialize(settings_engines=None, enable_checker=False): settings_engines = settings_engines or settings['engines'] - initialize_processors(settings_engines) + load_engines(settings_engines) + initialize_network(settings_engines, settings['outgoing']) initialize_metrics([engine['name'] for engine in settings_engines]) + initialize_processors(settings_engines) if enable_checker: initialize_checker() @@ -106,7 +110,7 @@ class Search: # start search-reqest for all selected engines for engineref in self.search_query.engineref_list: - processor = processors[engineref.name] + processor = PROCESSORS[engineref.name] # stop the request now if the engine is suspend if processor.extend_container_if_suspended(self.result_container): @@ -152,7 +156,7 @@ class Search: for engine_name, query, request_params in requests: th = threading.Thread( - target=processors[engine_name].search, + target=PROCESSORS[engine_name].search, args=(query, request_params, self.result_container, self.start_time, self.actual_timeout), name=search_id, ) diff --git a/searx/search/checker/__main__.py 
b/searx/search/checker/__main__.py index 0d7d1b8ed..7f6de8f8b 100644 --- a/searx/search/checker/__main__.py +++ b/searx/search/checker/__main__.py @@ -8,7 +8,7 @@ import logging import searx.search import searx.search.checker -from searx.search import processors +from searx.search import PROCESSORS from searx.engines import engine_shortcuts @@ -41,13 +41,13 @@ def iter_processor(engine_name_list): if len(engine_name_list) > 0: for name in engine_name_list: name = engine_shortcuts.get(name, name) - processor = processors.get(name) + processor = PROCESSORS.get(name) if processor is not None: yield name, processor else: stdout.write(f'{BOLD_SEQ}Engine {name:30}{RESET_SEQ}{RED}Engine does not exist{RESET_SEQ}') else: - for name, processor in searx.search.processors.items(): + for name, processor in searx.search.PROCESSORS.items(): yield name, processor diff --git a/searx/search/checker/background.py b/searx/search/checker/background.py index c3292d9ac..276426fa7 100644 --- a/searx/search/checker/background.py +++ b/searx/search/checker/background.py @@ -9,7 +9,7 @@ import signal from searx import logger, settings, searx_debug from searx.exceptions import SearxSettingsException -from searx.search.processors import processors +from searx.search.processors import PROCESSORS from searx.search.checker import Checker from searx.shared import schedule, storage @@ -55,7 +55,7 @@ def run(): 'status': 'ok', 'engines': {} } - for name, processor in processors.items(): + for name, processor in PROCESSORS.items(): logger.debug('Checking %s engine', name) checker = Checker(processor) checker.run() diff --git a/searx/search/processors/__init__.py b/searx/search/processors/__init__.py index caac74e65..d5ebdb70c 100644 --- a/searx/search/processors/__init__.py +++ b/searx/search/processors/__init__.py @@ -11,9 +11,11 @@ __all__ = [ 'OnlineProcessor', 'OnlineDictionaryProcessor', 'OnlineCurrencyProcessor', - 'processors', + 'PROCESSORS', ] +import threading + from searx import logger 
import searx.engines as engines @@ -24,7 +26,7 @@ from .online_currency import OnlineCurrencyProcessor from .abstract import EngineProcessor logger = logger.getChild('search.processors') -processors = {} +PROCESSORS = {} """Cache request processores, stored by *engine-name* (:py:func:`initialize`)""" def get_processor_class(engine_type): @@ -34,6 +36,7 @@ def get_processor_class(engine_type): return c return None + def get_processor(engine, engine_name): """Return processor instance that fits to ``engine.engine.type``)""" engine_type = getattr(engine, 'engine_type', 'online') @@ -42,12 +45,26 @@ def get_processor(engine, engine_name): return processor_class(engine, engine_name) return None + +def initialize_processor(processor): + """Initialize one processor + + Call the init function of the engine + """ + if processor.has_initialize_function: + t = threading.Thread(target=processor.initialize, daemon=True) + t.start() + + def initialize(engine_list): - """Initialize all engines and store a processor for each engine in :py:obj:`processors`.""" - engines.initialize_engines(engine_list) - for engine_name, engine in engines.engines.items(): - processor = get_processor(engine, engine_name) - if processor is None: - logger.error('Error get processor for engine %s', engine_name) - else: - processors[engine_name] = processor + """Initialize all engines and store a processor for each engine in :py:obj:`PROCESSORS`.""" + for engine_data in engine_list: + engine_name = engine_data['name'] + engine = engines.engines.get(engine_name) + if engine: + processor = get_processor(engine, engine_name) + initialize_processor(processor) + if processor is None: + logger.error('Error get processor for engine %s', engine_name) + else: + PROCESSORS[engine_name] = processor diff --git a/searx/search/processors/abstract.py b/searx/search/processors/abstract.py index 38811d87c..81724f052 100644 --- a/searx/search/processors/abstract.py +++ b/searx/search/processors/abstract.py @@ -13,7 +13,8 
@@ from searx import logger from searx.engines import settings from searx.network import get_time_for_thread, get_network from searx.metrics import histogram_observe, counter_inc, count_exception, count_error -from searx.exceptions import SearxEngineAccessDeniedException +from searx.exceptions import SearxEngineAccessDeniedException, SearxEngineResponseException +from searx.utils import get_engine_from_settings logger = logger.getChild('searx.search.processor') SUSPENDED_STATUS = {} @@ -66,6 +67,20 @@ class EngineProcessor(ABC): key = id(key) if key else self.engine_name self.suspended_status = SUSPENDED_STATUS.setdefault(key, SuspendedStatus()) + def initialize(self): + try: + self.engine.init(get_engine_from_settings(self.engine_name)) + except SearxEngineResponseException as exc: + logger.warn('%s engine: Fail to initialize // %s', self.engine_name, exc) + except Exception: # pylint: disable=broad-except + logger.exception('%s engine: Fail to initialize', self.engine_name) + else: + logger.debug('%s engine: Initialized', self.engine_name) + + @property + def has_initialize_function(self): + return hasattr(self.engine, 'init') + def handle_exception(self, result_container, exception_or_message, suspend=False): # update result_container if isinstance(exception_or_message, BaseException): diff --git a/searx/search/processors/online.py b/searx/search/processors/online.py index 93a9c6cbf..48a514e8a 100644 --- a/searx/search/processors/online.py +++ b/searx/search/processors/online.py @@ -5,7 +5,7 @@ """ -from time import time +from timeit import default_timer import asyncio import httpx @@ -40,6 +40,15 @@ class OnlineProcessor(EngineProcessor): engine_type = 'online' + def initialize(self): + # set timeout for all HTTP requests + searx.network.set_timeout_for_thread(self.engine.timeout, start_time=default_timer()) + # reset the HTTP total time + searx.network.reset_time_for_thread() + # set the network + searx.network.set_context_network_name(self.engine_name) + 
super().initialize() + def get_params(self, search_query, engine_category): params = super().get_params(search_query, engine_category) if params is None: @@ -139,7 +148,7 @@ class OnlineProcessor(EngineProcessor): self.handle_exception(result_container, e, suspend=True) logger.error("engine {0} : HTTP requests timeout" "(search duration : {1} s, timeout: {2} s) : {3}" - .format(self.engine_name, time() - start_time, + .format(self.engine_name, default_timer() - start_time, timeout_limit, e.__class__.__name__)) except (httpx.HTTPError, httpx.StreamError) as e: @@ -147,7 +156,7 @@ class OnlineProcessor(EngineProcessor): self.handle_exception(result_container, e, suspend=True) logger.exception("engine {0} : requests exception" "(search duration : {1} s, timeout: {2} s) : {3}" - .format(self.engine_name, time() - start_time, + .format(self.engine_name, default_timer() - start_time, timeout_limit, e)) except SearxEngineCaptchaException as e: |