diff options
author | Alexandre Flament <alex@al-f.net> | 2022-07-15 18:38:32 +0200 |
---|---|---|
committer | Alexandre Flament <alex@al-f.net> | 2022-11-05 12:04:50 +0100 |
commit | fe419e355bf1527c51e3aee98495d08b89510320 (patch) | |
tree | 0ed03d5e3d77e68ac5a5834e5890f5f27fe29fe7 /searx/search | |
parent | d764d94a70b0b10291105a867227975d59af5675 (diff) | |
download | searxng-fe419e355bf1527c51e3aee98495d08b89510320.tar.gz searxng-fe419e355bf1527c51e3aee98495d08b89510320.zip |
The checker requires Redis
Remove the abstraction in searx.shared.SharedDict.
Implement a basic and dedicated scheduler for the checker using a Redis script.
Diffstat (limited to 'searx/search')
-rw-r--r-- | searx/search/checker/__init__.py | 2 | ||||
-rw-r--r-- | searx/search/checker/background.py | 116 | ||||
-rw-r--r-- | searx/search/checker/scheduler.lua | 36 | ||||
-rw-r--r-- | searx/search/checker/scheduler.py | 57 |
4 files changed, 152 insertions, 59 deletions
diff --git a/searx/search/checker/__init__.py b/searx/search/checker/__init__.py index 85b9178df..7d779a282 100644 --- a/searx/search/checker/__init__.py +++ b/searx/search/checker/__init__.py @@ -2,3 +2,5 @@ from .impl import Checker from .background import initialize, get_result + +__all__ = ('Checker', 'initialize', 'get_result') diff --git a/searx/search/checker/background.py b/searx/search/checker/background.py index 3908245f8..e5bd642c0 100644 --- a/searx/search/checker/background.py +++ b/searx/search/checker/background.py @@ -1,26 +1,28 @@ # SPDX-License-Identifier: AGPL-3.0-or-later # lint: pylint # pylint: disable=missing-module-docstring -# pyright: strict +# pyright: basic import json -import random import time import threading import os import signal -from typing import Dict, Union, List, Any, Tuple +from typing import Dict, Union, List, Any, Tuple, Optional from typing_extensions import TypedDict, Literal +import redis.exceptions + from searx import logger, settings, searx_debug +from searx.shared.redisdb import client as get_redis_client from searx.exceptions import SearxSettingsException from searx.search.processors import PROCESSORS from searx.search.checker import Checker -from searx.shared import schedule, storage # pyright: ignore +from searx.search.checker.scheduler import scheduler_function -CHECKER_RESULT = 'CHECKER_RESULT' -running = threading.Lock() +REDIS_RESULT_KEY = 'SearXNG_checker_result' +REDIS_LOCK_KEY = 'SearXNG_checker_lock' CheckerResult = Union['CheckerOk', 'CheckerErr', 'CheckerOther'] @@ -77,20 +79,24 @@ def _get_interval(every: Any, error_msg: str) -> Tuple[int, int]: return (every[0], every[1]) -def _get_every(): - every = settings.get('checker', {}).get('scheduling', {}).get('every', (300, 1800)) - return _get_interval(every, 'checker.scheduling.every is not a int or list') - - def get_result() -> CheckerResult: - serialized_result = storage.get_str(CHECKER_RESULT) - if serialized_result is not None: - return json.loads(serialized_result) - return {'status': 'unknown'} + client = get_redis_client() + if client is None: + # without Redis, the checker is disabled + return {'status': 'disabled'} + serialized_result: Optional[bytes] = client.get(REDIS_RESULT_KEY) + if serialized_result is None: + # the Redis key does not exist + return {'status': 'unknown'} + return json.loads(serialized_result) def _set_result(result: CheckerResult): - storage.set_str(CHECKER_RESULT, json.dumps(result)) + client = get_redis_client() + if client is None: + # without Redis, the function does nothing + return + client.set(REDIS_RESULT_KEY, json.dumps(result)) def _timestamp(): @@ -98,41 +104,29 @@ def _timestamp(): def run(): - if not running.acquire(blocking=False): # pylint: disable=consider-using-with - return try: - logger.info('Starting checker') - result: CheckerOk = {'status': 'ok', 'engines': {}, 'timestamp': _timestamp()} - for name, processor in PROCESSORS.items(): - logger.debug('Checking %s engine', name) - checker = Checker(processor) - checker.run() - if checker.test_results.successful: - result['engines'][name] = {'success': True} - else: - result['engines'][name] = {'success': False, 'errors': checker.test_results.errors} - - _set_result(result) - logger.info('Check done') + # use a Redis lock to make sure there is no checker running at the same time + # (this should not happen, this is a safety measure) + with get_redis_client().lock(REDIS_LOCK_KEY, blocking_timeout=60, timeout=3600): + logger.info('Starting checker') + result: CheckerOk = {'status': 'ok', 'engines': {}, 'timestamp': _timestamp()} + for name, processor in PROCESSORS.items(): + logger.debug('Checking %s engine', name) + checker = Checker(processor) + checker.run() + if checker.test_results.successful: + result['engines'][name] = {'success': True} + else: + result['engines'][name] = {'success': False, 'errors': checker.test_results.errors} + + _set_result(result) + logger.info('Check done') + except redis.exceptions.LockError: + _set_result({'status': 'error', 'timestamp': _timestamp()}) + logger.exception('Error while running the checker') except Exception: # pylint: disable=broad-except _set_result({'status': 'error', 'timestamp': _timestamp()}) logger.exception('Error while running the checker') - finally: - running.release() - - -def _run_with_delay(): - every = _get_every() - delay = random.randint(0, every[1] - every[0]) - logger.debug('Start checker in %i seconds', delay) - time.sleep(delay) - run() - - -def _start_scheduling(): - every = _get_every() - if schedule(every[0], _run_with_delay): - run() def _signal_handler(_signum: int, _frame: Any): @@ -147,27 +141,31 @@ def initialize(): logger.info('Send SIGUSR1 signal to pid %i to start the checker', os.getpid()) signal.signal(signal.SIGUSR1, _signal_handler) - # disabled by default - _set_result({'status': 'disabled'}) - # special case when debug is activate - if searx_debug and settings.get('checker', {}).get('off_when_debug', True): + if searx_debug and settings['checker']['off_when_debug']: logger.info('debug mode: checker is disabled') return # check value of checker.scheduling.every now - scheduling = settings.get('checker', {}).get('scheduling', None) + scheduling = settings['checker']['scheduling'] if scheduling is None or not scheduling: logger.info('Checker scheduler is disabled') return - # - _set_result({'status': 'unknown'}) + # make sure there is a Redis connection + if get_redis_client() is None: + logger.error('The checker requires Redis') + return - start_after = scheduling.get('start_after', (300, 1800)) - start_after = _get_interval(start_after, 'checker.scheduling.start_after is not a int or list') - delay = random.randint(start_after[0], start_after[1]) - logger.info('Start checker in %i seconds', delay) - t = threading.Timer(delay, _start_scheduling) + # start the background scheduler + every_range = _get_interval(scheduling.get('every', (300, 1800)), 'checker.scheduling.every is not a int or list') + start_after_range = _get_interval( + scheduling.get('start_after', (300, 1800)), 'checker.scheduling.start_after is not a int or list' + ) + t = threading.Thread( + target=scheduler_function, + args=(start_after_range[0], start_after_range[1], every_range[0], every_range[1], run), + name='checker_scheduler', + ) t.daemon = True t.start() diff --git a/searx/search/checker/scheduler.lua b/searx/search/checker/scheduler.lua new file mode 100644 index 000000000..b3c6023fe --- /dev/null +++ b/searx/search/checker/scheduler.lua @@ -0,0 +1,36 @@ +-- SPDX-License-Identifier: AGPL-3.0-or-later +-- +-- This script is not a string in scheduler.py, so editors can provide syntax highlighting. + +-- The Redis KEY is defined here and not in Python on purpose: +-- only this LUA script can read and update this key to avoid lock and concurrency issues. +local redis_key = 'SearXNG_checker_next_call_ts' + +local now = redis.call('TIME')[1] +local start_after_from = ARGV[1] +local start_after_to = ARGV[2] +local every_from = ARGV[3] +local every_to = ARGV[4] + +local next_call_ts = redis.call('GET', redis_key) + +if (next_call_ts == false or next_call_ts == nil) then + -- the scheduler has never run on this Redis instance, so: + -- 1/ the scheduler does not run now + -- 2/ the next call is a random time between start_after_from and start_after_to + local delay = start_after_from + math.random(start_after_to - start_after_from) + redis.call('SET', redis_key, now + delay) + return { false, delay } +end + +-- next_call_ts is defined +-- --> if now is lower than next_call_ts then we don't run the embedded checker +-- --> if now is higher then we update next_call_ts and ask to run the embedded checker now. +local call_now = next_call_ts <= now +if call_now then + -- the checker runs now, define the timestamp of the next call: + -- this is a random delay between every_from and every_to + local periodic_delay = every_from + math.random(every_to - every_from) + next_call_ts = redis.call('INCRBY', redis_key, periodic_delay) +end +return { call_now, next_call_ts - now } diff --git a/searx/search/checker/scheduler.py b/searx/search/checker/scheduler.py new file mode 100644 index 000000000..1ae635951 --- /dev/null +++ b/searx/search/checker/scheduler.py @@ -0,0 +1,57 @@ +# SPDX-License-Identifier: AGPL-3.0-or-later +# lint: pylint +# pylint: disable=missing-module-docstring +"""Lame scheduler which use Redis as a source of truth: +* the Redis key SearXNG_checker_next_call_ts contains the next time the embedded checker should run. +* to avoid lock, a unique Redis script reads and updates the Redis key SearXNG_checker_next_call_ts. +* this Redis script returns a list of two elements: + * the first one is a boolean. If True, the embedded checker must run now in this worker. + * the second element is the delay in second to wait before the next call to the Redis script. + +This scheduler is not generic on purpose: if more feature are required, a dedicate scheduler must be used +(= a better scheduler should not use the web workers) +""" + +import logging +import time +import importlib +from typing import Callable + +from searx.shared.redisdb import client as get_redis_client +from searx.redislib import lua_script_storage + + +logger = logging.getLogger('searx.search.checker') + + +def scheduler_function(start_after_from: int, start_after_to: int, every_from: int, every_to: int, callback: Callable): + """Run the checker periodically. The function never returns. + + Parameters: + * start_after_from and start_after_to: when to call "callback" for the first on the Redis instance + * every_from and every_to: after the first call, how often to call "callback" + + There is no issue: + * to call this function is multiple workers + * to kill workers at any time as long there is one at least one worker + """ + scheduler_now_script = importlib.resources.read_text(__package__, "scheduler.lua") + while True: + # ask the Redis script what to do + # the script says + # * if the checker must run now. + # * how to long to way before calling the script again (it can be call earlier, but not later). + script = lua_script_storage(get_redis_client(), scheduler_now_script) + call_now, wait_time = script(args=[start_after_from, start_after_to, every_from, every_to]) + + # does the worker run the checker now? + if call_now: + # run the checker + try: + callback() + except Exception: # pylint: disable=broad-except + logger.exception("Error calling the embedded checker") + # only worker display the wait_time + logger.info("Next call to the checker in %s seconds", wait_time) + # wait until the next call + time.sleep(wait_time) |