diff options
author | Alexandre Flament <alex@al-f.net> | 2021-04-05 10:43:33 +0200 |
---|---|---|
committer | Alexandre Flament <alex@al-f.net> | 2021-04-12 17:25:56 +0200 |
commit | d14994dc73ba5c95382812581dac146d9eceaafa (patch) | |
tree | 2f7720dbae8f1064fe479f986f0b198aff2beb99 /searx/network/__init__.py | |
parent | eaa694fb7d0e47b943bc6d6edb6cb6a40ab2d85e (diff) | |
download | searxng-d14994dc73ba5c95382812581dac146d9eceaafa.tar.gz searxng-d14994dc73ba5c95382812581dac146d9eceaafa.zip |
[httpx] replace searx.poolrequests by searx.network
settings.yml:
* outgoing.networks:
* can contains network definition
* propertiers: enable_http, verify, http2, max_connections, max_keepalive_connections,
keepalive_expiry, local_addresses, support_ipv4, support_ipv6, proxies, max_redirects, retries
* retries: 0 by default, number of times searx retries to send the HTTP request (using different IP & proxy each time)
* local_addresses can be "192.168.0.1/24" (it supports IPv6)
* support_ipv4 & support_ipv6: both True by default
see https://github.com/searx/searx/pull/1034
* each engine can define a "network" section:
* either a full network description
* either reference an existing network
* all HTTP requests of engine use the same HTTP configuration (it was not the case before, see proxy configuration in master)
Diffstat (limited to 'searx/network/__init__.py')
-rw-r--r-- | searx/network/__init__.py | 189 |
1 files changed, 189 insertions, 0 deletions
diff --git a/searx/network/__init__.py b/searx/network/__init__.py new file mode 100644 index 000000000..dbd31c781 --- /dev/null +++ b/searx/network/__init__.py @@ -0,0 +1,189 @@ +# SPDX-License-Identifier: AGPL-3.0-or-later + +import asyncio +import threading +import concurrent.futures +from time import time + +import httpx +import h2.exceptions + +from .network import get_network, initialize +from .client import LOOP +from .raise_for_httperror import raise_for_httperror + +# queue.SimpleQueue: Support Python 3.6 +try: + from queue import SimpleQueue +except ImportError: + from queue import Empty + from collections import deque + + class SimpleQueue: + """Minimal backport of queue.SimpleQueue""" + + def __init__(self): + self._queue = deque() + self._count = threading.Semaphore(0) + + def put(self, item): + self._queue.append(item) + self._count.release() + + def get(self): + if not self._count.acquire(True): + raise Empty + return self._queue.popleft() + + +THREADLOCAL = threading.local() + + +def reset_time_for_thread(): + THREADLOCAL.total_time = 0 + + +def get_time_for_thread(): + return THREADLOCAL.total_time + + +def set_timeout_for_thread(timeout, start_time=None): + THREADLOCAL.timeout = timeout + THREADLOCAL.start_time = start_time + + +def set_context_network_name(network_name): + THREADLOCAL.network = get_network(network_name) + + +def get_context_network(): + try: + return THREADLOCAL.network + except AttributeError: + return get_network() + + +def request(method, url, **kwargs): + """same as requests/requests/api.py request(...)""" + time_before_request = time() + + # timeout (httpx) + if 'timeout' in kwargs: + timeout = kwargs['timeout'] + else: + timeout = getattr(THREADLOCAL, 'timeout', None) + if timeout is not None: + kwargs['timeout'] = timeout + + # 2 minutes timeout for the requests without timeout + timeout = timeout or 120 + + # ajdust actual timeout + timeout += 0.2 # overhead + start_time = getattr(THREADLOCAL, 'start_time', time_before_request) + if start_time: + timeout -= time() - start_time + + # raise_for_error + check_for_httperror = True + if 'raise_for_httperror' in kwargs: + check_for_httperror = kwargs['raise_for_httperror'] + del kwargs['raise_for_httperror'] + + # requests compatibility + if isinstance(url, bytes): + url = url.decode() + + # network + network = get_context_network() + + # do request + future = asyncio.run_coroutine_threadsafe(network.request(method, url, **kwargs), LOOP) + try: + response = future.result(timeout) + except concurrent.futures.TimeoutError as e: + raise httpx.TimeoutException('Timeout', request=None) from e + + # requests compatibility + # see also https://www.python-httpx.org/compatibility/#checking-for-4xx5xx-responses + response.ok = not response.is_error + + # update total_time. + # See get_time_for_thread() and reset_time_for_thread() + if hasattr(THREADLOCAL, 'total_time'): + time_after_request = time() + THREADLOCAL.total_time += time_after_request - time_before_request + + # raise an exception + if check_for_httperror: + raise_for_httperror(response) + + return response + + +def get(url, **kwargs): + kwargs.setdefault('allow_redirects', True) + return request('get', url, **kwargs) + + +def options(url, **kwargs): + kwargs.setdefault('allow_redirects', True) + return request('options', url, **kwargs) + + +def head(url, **kwargs): + kwargs.setdefault('allow_redirects', False) + return request('head', url, **kwargs) + + +def post(url, data=None, **kwargs): + return request('post', url, data=data, **kwargs) + + +def put(url, data=None, **kwargs): + return request('put', url, data=data, **kwargs) + + +def patch(url, data=None, **kwargs): + return request('patch', url, data=data, **kwargs) + + +def delete(url, **kwargs): + return request('delete', url, **kwargs) + + +async def stream_chunk_to_queue(network, q, method, url, **kwargs): + try: + async with network.stream(method, url, **kwargs) as response: + q.put(response) + async for chunk in response.aiter_bytes(65536): + if len(chunk) > 0: + q.put(chunk) + except (httpx.HTTPError, OSError, h2.exceptions.ProtocolError) as e: + q.put(e) + finally: + q.put(None) + + +def stream(method, url, **kwargs): + """Replace httpx.stream. + + Usage: + stream = poolrequests.stream(...) + response = next(stream) + for chunk in stream: + ... + + httpx.Client.stream requires to write the httpx.HTTPTransport version of the + the httpx.AsyncHTTPTransport declared above. + """ + q = SimpleQueue() + future = asyncio.run_coroutine_threadsafe(stream_chunk_to_queue(get_network(), q, method, url, **kwargs), + LOOP) + chunk_or_exception = q.get() + while chunk_or_exception is not None: + if isinstance(chunk_or_exception, Exception): + raise chunk_or_exception + yield chunk_or_exception + chunk_or_exception = q.get() + return future.result() |