diff options
author | Alexandre Flament <alex@al-f.net> | 2021-05-05 18:33:16 +0200 |
---|---|---|
committer | GitHub <noreply@github.com> | 2021-05-05 18:33:16 +0200 |
commit | 5b13786abb194dba3e562854de8fbb5a3fe8c2df (patch) | |
tree | 50002f4c1c31d7e93897b70bc8a8aac946df0b6a /searx | |
parent | d36adfa59f242a8775ad74245c696d62b7727a36 (diff) | |
parent | fa0d05c3131041eb44542e0b505eaed1833bf86e (diff) | |
download | searxng-5b13786abb194dba3e562854de8fbb5a3fe8c2df.tar.gz searxng-5b13786abb194dba3e562854de8fbb5a3fe8c2df.zip |
Merge pull request #58 from searxng/mod-multithreading
[mod] multithreading only in searx.search.* packages
Diffstat (limited to 'searx')
-rw-r--r-- | searx/engines/__init__.py | 23 | ||||
-rw-r--r-- | searx/search/__init__.py | 14 | ||||
-rw-r--r-- | searx/search/checker/__main__.py | 8 | ||||
-rw-r--r-- | searx/search/checker/background.py | 14 | ||||
-rw-r--r-- | searx/search/processors/__init__.py | 37 | ||||
-rw-r--r-- | searx/search/processors/abstract.py | 17 | ||||
-rw-r--r-- | searx/search/processors/online.py | 15 |
7 files changed, 77 insertions, 51 deletions
diff --git a/searx/engines/__init__.py b/searx/engines/__init__.py index 6c3ac7a42..15212afd9 100644 --- a/searx/engines/__init__.py +++ b/searx/engines/__init__.py @@ -167,26 +167,3 @@ def load_engines(engine_list): if engine is not None: engines[engine.name] = engine return engines - - -def initialize_engines(engine_list): - load_engines(engine_list) - initialize_network(engine_list, settings['outgoing']) - - def engine_init(engine_name, init_fn): - try: - set_context_network_name(engine_name) - init_fn(get_engine_from_settings(engine_name)) - except SearxEngineResponseException as exc: - logger.warn('%s engine: Fail to initialize // %s', engine_name, exc) - except Exception: - logger.exception('%s engine: Fail to initialize', engine_name) - else: - logger.debug('%s engine: Initialized', engine_name) - - for engine_name, engine in engines.items(): - if hasattr(engine, 'init'): - init_fn = getattr(engine, 'init') - if init_fn: - logger.debug('%s engine: Starting background initialization', engine_name) - threading.Thread(target=engine_init, args=(engine_name, init_fn)).start() diff --git a/searx/search/__init__.py b/searx/search/__init__.py index 9b26f38de..acc97d1e9 100644 --- a/searx/search/__init__.py +++ b/searx/search/__init__.py @@ -29,9 +29,11 @@ from searx.results import ResultContainer from searx import logger from searx.plugins import plugins from searx.search.models import EngineRef, SearchQuery -from searx.search.processors import processors, initialize as initialize_processors -from searx.search.checker import initialize as initialize_checker +from searx.engines import load_engines +from searx.network import initialize as initialize_network from searx.metrics import initialize as initialize_metrics, counter_inc, histogram_observe_time +from searx.search.processors import PROCESSORS, initialize as initialize_processors +from searx.search.checker import initialize as initialize_checker logger = logger.getChild('search') @@ -50,8 +52,10 @@ else: def initialize(settings_engines=None, enable_checker=False): settings_engines = settings_engines or settings['engines'] - initialize_processors(settings_engines) + load_engines(settings_engines) + initialize_network(settings_engines, settings['outgoing']) initialize_metrics([engine['name'] for engine in settings_engines]) + initialize_processors(settings_engines) if enable_checker: initialize_checker() @@ -106,7 +110,7 @@ class Search: # start search-reqest for all selected engines for engineref in self.search_query.engineref_list: - processor = processors[engineref.name] + processor = PROCESSORS[engineref.name] # stop the request now if the engine is suspend if processor.extend_container_if_suspended(self.result_container): @@ -152,7 +156,7 @@ class Search: for engine_name, query, request_params in requests: th = threading.Thread( - target=processors[engine_name].search, + target=PROCESSORS[engine_name].search, args=(query, request_params, self.result_container, self.start_time, self.actual_timeout), name=search_id, ) diff --git a/searx/search/checker/__main__.py b/searx/search/checker/__main__.py index 0d7d1b8ed..7a85347cc 100644 --- a/searx/search/checker/__main__.py +++ b/searx/search/checker/__main__.py @@ -1,4 +1,6 @@ # SPDX-License-Identifier: AGPL-3.0-or-later +# lint: pylint +# pylint: disable=missing-module-docstring, missing-function-docstring import sys import io @@ -8,7 +10,7 @@ import logging import searx.search import searx.search.checker -from searx.search import processors +from searx.search import PROCESSORS from searx.engines import engine_shortcuts @@ -41,13 +43,13 @@ def iter_processor(engine_name_list): if len(engine_name_list) > 0: for name in engine_name_list: name = engine_shortcuts.get(name, name) - processor = processors.get(name) + processor = PROCESSORS.get(name) if processor is not None: yield name, processor else: stdout.write(f'{BOLD_SEQ}Engine {name:30}{RESET_SEQ}{RED}Engine does not exist{RESET_SEQ}') else: - for name, processor in searx.search.processors.items(): + for name, processor in searx.search.PROCESSORS.items(): yield name, processor diff --git a/searx/search/checker/background.py b/searx/search/checker/background.py index c3292d9ac..4c2750b44 100644 --- a/searx/search/checker/background.py +++ b/searx/search/checker/background.py @@ -1,4 +1,6 @@ # SPDX-License-Identifier: AGPL-3.0-or-later +# lint: pylint +# pylint: disable=missing-module-docstring, missing-function-docstring import json import random @@ -9,7 +11,7 @@ import signal from searx import logger, settings, searx_debug from searx.exceptions import SearxSettingsException -from searx.search.processors import processors +from searx.search.processors import PROCESSORS from searx.search.checker import Checker from searx.shared import schedule, storage @@ -34,7 +36,7 @@ def _get_every(): return _get_interval(every, 'checker.scheduling.every is not a int or list') -def get_result(): +def get_result(): # pylint: disable=inconsistent-return-statements serialized_result = storage.get_str(CHECKER_RESULT) if serialized_result is not None: return json.loads(serialized_result) @@ -47,7 +49,7 @@ def _set_result(result, include_timestamp=True): def run(): - if not running.acquire(blocking=False): + if not running.acquire(blocking=False): # pylint: disable=consider-using-with return try: logger.info('Starting checker') @@ -55,7 +57,7 @@ def run(): 'status': 'ok', 'engines': {} } - for name, processor in processors.items(): + for name, processor in PROCESSORS.items(): logger.debug('Checking %s engine', name) checker = Checker(processor) checker.run() @@ -66,7 +68,7 @@ def run(): _set_result(result) logger.info('Check done') - except Exception: + except Exception: # pylint: disable=broad-except _set_result({'status': 'error'}) logger.exception('Error while running the checker') finally: @@ -87,7 +89,7 @@ def _start_scheduling(): run() -def _signal_handler(signum, frame): +def _signal_handler(_signum, _frame): t = threading.Thread(target=run) t.daemon = True t.start() diff --git a/searx/search/processors/__init__.py b/searx/search/processors/__init__.py index caac74e65..d5ebdb70c 100644 --- a/searx/search/processors/__init__.py +++ b/searx/search/processors/__init__.py @@ -11,9 +11,11 @@ __all__ = [ 'OnlineProcessor', 'OnlineDictionaryProcessor', 'OnlineCurrencyProcessor', - 'processors', + 'PROCESSORS', ] +import threading + from searx import logger import searx.engines as engines @@ -24,7 +26,7 @@ from .online_currency import OnlineCurrencyProcessor from .abstract import EngineProcessor logger = logger.getChild('search.processors') -processors = {} +PROCESSORS = {} """Cache request processores, stored by *engine-name* (:py:func:`initialize`)""" def get_processor_class(engine_type): @@ -34,6 +36,7 @@ def get_processor_class(engine_type): return c return None + def get_processor(engine, engine_name): """Return processor instance that fits to ``engine.engine.type``)""" engine_type = getattr(engine, 'engine_type', 'online') @@ -42,12 +45,26 @@ def get_processor(engine, engine_name): return processor_class(engine, engine_name) return None + +def initialize_processor(processor): + """Initialize one processor + + Call the init function of the engine + """ + if processor.has_initialize_function: + t = threading.Thread(target=processor.initialize, daemon=True) + t.start() + + def initialize(engine_list): - """Initialize all engines and store a processor for each engine in :py:obj:`processors`.""" - engines.initialize_engines(engine_list) - for engine_name, engine in engines.engines.items(): - processor = get_processor(engine, engine_name) - if processor is None: - logger.error('Error get processor for engine %s', engine_name) - else: - processors[engine_name] = processor + """Initialize all engines and store a processor for each engine in :py:obj:`PROCESSORS`.""" + for engine_data in engine_list: + engine_name = engine_data['name'] + engine = engines.engines.get(engine_name) + if engine: + processor = get_processor(engine, engine_name) + initialize_processor(processor) + if processor is None: + logger.error('Error get processor for engine %s', engine_name) + else: + PROCESSORS[engine_name] = processor diff --git a/searx/search/processors/abstract.py b/searx/search/processors/abstract.py index 38811d87c..81724f052 100644 --- a/searx/search/processors/abstract.py +++ b/searx/search/processors/abstract.py @@ -13,7 +13,8 @@ from searx import logger from searx.engines import settings from searx.network import get_time_for_thread, get_network from searx.metrics import histogram_observe, counter_inc, count_exception, count_error -from searx.exceptions import SearxEngineAccessDeniedException +from searx.exceptions import SearxEngineAccessDeniedException, SearxEngineResponseException +from searx.utils import get_engine_from_settings logger = logger.getChild('searx.search.processor') SUSPENDED_STATUS = {} @@ -66,6 +67,20 @@ class EngineProcessor(ABC): key = id(key) if key else self.engine_name self.suspended_status = SUSPENDED_STATUS.setdefault(key, SuspendedStatus()) + def initialize(self): + try: + self.engine.init(get_engine_from_settings(self.engine_name)) + except SearxEngineResponseException as exc: + logger.warn('%s engine: Fail to initialize // %s', self.engine_name, exc) + except Exception: # pylint: disable=broad-except + logger.exception('%s engine: Fail to initialize', self.engine_name) + else: + logger.debug('%s engine: Initialized', self.engine_name) + + @property + def has_initialize_function(self): + return hasattr(self.engine, 'init') + def handle_exception(self, result_container, exception_or_message, suspend=False): # update result_container if isinstance(exception_or_message, BaseException): diff --git a/searx/search/processors/online.py b/searx/search/processors/online.py index 93a9c6cbf..48a514e8a 100644 --- a/searx/search/processors/online.py +++ b/searx/search/processors/online.py @@ -5,7 +5,7 @@ """ -from time import time +from timeit import default_timer import asyncio import httpx @@ -40,6 +40,15 @@ class OnlineProcessor(EngineProcessor): engine_type = 'online' + def initialize(self): + # set timeout for all HTTP requests + searx.network.set_timeout_for_thread(self.engine.timeout, start_time=default_timer()) + # reset the HTTP total time + searx.network.reset_time_for_thread() + # set the network + searx.network.set_context_network_name(self.engine_name) + super().initialize() + def get_params(self, search_query, engine_category): params = super().get_params(search_query, engine_category) if params is None: @@ -139,7 +148,7 @@ class OnlineProcessor(EngineProcessor): self.handle_exception(result_container, e, suspend=True) logger.error("engine {0} : HTTP requests timeout" "(search duration : {1} s, timeout: {2} s) : {3}" - .format(self.engine_name, time() - start_time, + .format(self.engine_name, default_timer() - start_time, timeout_limit, e.__class__.__name__)) except (httpx.HTTPError, httpx.StreamError) as e: @@ -147,7 +156,7 @@ class OnlineProcessor(EngineProcessor): self.handle_exception(result_container, e, suspend=True) logger.exception("engine {0} : requests exception" "(search duration : {1} s, timeout: {2} s) : {3}" - .format(self.engine_name, time() - start_time, + .format(self.engine_name, default_timer() - start_time, timeout_limit, e)) except SearxEngineCaptchaException as e: |