summaryrefslogtreecommitdiff
path: root/searx/network/__init__.py
diff options
context:
space:
mode:
authorAlexandre Flament <alex@al-f.net>2021-04-05 10:43:33 +0200
committerAlexandre Flament <alex@al-f.net>2021-04-12 17:25:56 +0200
commitd14994dc73ba5c95382812581dac146d9eceaafa (patch)
tree2f7720dbae8f1064fe479f986f0b198aff2beb99 /searx/network/__init__.py
parenteaa694fb7d0e47b943bc6d6edb6cb6a40ab2d85e (diff)
downloadsearxng-d14994dc73ba5c95382812581dac146d9eceaafa.tar.gz
searxng-d14994dc73ba5c95382812581dac146d9eceaafa.zip
[httpx] replace searx.poolrequests by searx.network
settings.yml: * outgoing.networks: * can contain network definitions * properties: enable_http, verify, http2, max_connections, max_keepalive_connections, keepalive_expiry, local_addresses, support_ipv4, support_ipv6, proxies, max_redirects, retries * retries: 0 by default, number of times searx retries sending the HTTP request (using a different IP & proxy each time) * local_addresses can be "192.168.0.1/24" (IPv6 is supported too) * support_ipv4 & support_ipv6: both True by default, see https://github.com/searx/searx/pull/1034 * each engine can define a "network" section: * either a full network description * or a reference to an existing network * all HTTP requests of an engine use the same HTTP configuration (this was not the case before, see the proxy configuration in master)
Diffstat (limited to 'searx/network/__init__.py')
-rw-r--r--searx/network/__init__.py189
1 files changed, 189 insertions, 0 deletions
diff --git a/searx/network/__init__.py b/searx/network/__init__.py
new file mode 100644
index 000000000..dbd31c781
--- /dev/null
+++ b/searx/network/__init__.py
@@ -0,0 +1,189 @@
+# SPDX-License-Identifier: AGPL-3.0-or-later
+
+import asyncio
+import threading
+import concurrent.futures
+from time import time
+
+import httpx
+import h2.exceptions
+
+from .network import get_network, initialize
+from .client import LOOP
+from .raise_for_httperror import raise_for_httperror
+
# queue.SimpleQueue only exists since Python 3.7: provide a fallback so the
# module keeps working on Python 3.6.
try:
    from queue import SimpleQueue
except ImportError:
    from queue import Empty
    from collections import deque

    class SimpleQueue:
        """Minimal backport of queue.SimpleQueue.

        Unbounded FIFO queue: a deque stores the items and a semaphore
        counts them, so ``get()`` can block until an item is available.
        """

        def __init__(self):
            self._queue = deque()
            self._count = threading.Semaphore(0)

        def put(self, item, block=True, timeout=None):
            """Append ``item`` to the queue.

            ``block`` and ``timeout`` are accepted for compatibility with
            queue.SimpleQueue.put() and ignored: the queue is unbounded, so
            this never blocks.
            """
            self._queue.append(item)
            self._count.release()

        def get(self, block=True, timeout=None):
            """Remove and return the oldest item.

            With ``block`` true (the default) wait until an item is available,
            at most ``timeout`` seconds when ``timeout`` is not None.
            With ``block`` false return an item only if one is immediately
            available (``timeout`` is ignored).

            Raises queue.Empty when no item could be fetched, and ValueError
            for a negative ``timeout``.
            """
            if timeout is not None and timeout < 0:
                raise ValueError("'timeout' must be a non-negative number")
            if block:
                acquired = self._count.acquire(True, timeout)
            else:
                # Semaphore.acquire() rejects a timeout on a non-blocking call
                acquired = self._count.acquire(False)
            if not acquired:
                raise Empty
            return self._queue.popleft()
+
+
# Per-thread storage: the helpers of this module keep `total_time`,
# `timeout`, `start_time` and `network` on it.
THREADLOCAL = threading.local()
+
+
def reset_time_for_thread():
    """Set the calling thread's accumulated HTTP time back to zero."""
    setattr(THREADLOCAL, 'total_time', 0)
+
+
def get_time_for_thread():
    """Return the HTTP time accumulated by the calling thread.

    Returns None when reset_time_for_thread() was never called on this
    thread, instead of raising AttributeError.
    """
    return getattr(THREADLOCAL, 'total_time', None)
+
+
def set_timeout_for_thread(timeout, start_time=None):
    """Remember ``timeout`` and ``start_time`` for the calling thread.

    request() reads both values back when the caller does not pass an
    explicit ``timeout`` keyword argument.
    """
    THREADLOCAL.start_time = start_time
    THREADLOCAL.timeout = timeout
+
+
def set_context_network_name(network_name):
    """Look up the network named ``network_name`` and bind it to the calling thread."""
    selected_network = get_network(network_name)
    THREADLOCAL.network = selected_network
+
+
def get_context_network():
    """Return the network bound to the calling thread.

    Falls back to ``get_network()`` without argument when
    set_context_network_name() was not called on this thread.
    """
    if hasattr(THREADLOCAL, 'network'):
        return THREADLOCAL.network
    return get_network()
+
+
def request(method, url, **kwargs):
    """same as requests/requests/api.py request(...)

    Runs ``network.request(method, url, **kwargs)`` of the thread's network on
    the shared event loop and blocks until the response is available.

    Extra keyword arguments handled here:

    * ``timeout``: when missing, the value stored by set_timeout_for_thread()
      is used (if any).
    * ``raise_for_httperror``: True by default; when true,
      raise_for_httperror(response) is called before returning. This key is
      removed from ``kwargs`` before they are forwarded to the network.

    Raises httpx.TimeoutException when the response is not available in time.

    The elapsed time is added to the thread's ``total_time`` (see
    get_time_for_thread() and reset_time_for_thread()), including when the
    request times out or fails.
    """
    time_before_request = time()

    try:
        # timeout (httpx)
        if 'timeout' in kwargs:
            timeout = kwargs['timeout']
        else:
            timeout = getattr(THREADLOCAL, 'timeout', None)
            if timeout is not None:
                kwargs['timeout'] = timeout

        # 2 minutes timeout for the requests without timeout
        timeout = timeout or 120

        # adjust actual timeout
        timeout += 0.2  # overhead
        start_time = getattr(THREADLOCAL, 'start_time', time_before_request)
        if start_time:
            # the time already spent since start_time counts against the timeout
            timeout -= time() - start_time

        # raise_for_error: must not be forwarded to the network
        check_for_httperror = kwargs.pop('raise_for_httperror', True)

        # requests compatibility
        if isinstance(url, bytes):
            url = url.decode()

        # network
        network = get_context_network()

        # do request
        future = asyncio.run_coroutine_threadsafe(network.request(method, url, **kwargs), LOOP)
        try:
            response = future.result(timeout)
        except concurrent.futures.TimeoutError as e:
            raise httpx.TimeoutException('Timeout', request=None) from e

        # requests compatibility
        # see also https://www.python-httpx.org/compatibility/#checking-for-4xx5xx-responses
        response.ok = not response.is_error
    finally:
        # update total_time, also for the requests that failed or timed out.
        # See get_time_for_thread() and reset_time_for_thread()
        if hasattr(THREADLOCAL, 'total_time'):
            THREADLOCAL.total_time += time() - time_before_request

    # raise an exception
    if check_for_httperror:
        raise_for_httperror(response)

    return response
+
+
def get(url, **kwargs):
    """Send a GET request through request(); ``allow_redirects`` defaults to True."""
    if 'allow_redirects' not in kwargs:
        kwargs['allow_redirects'] = True
    return request('get', url, **kwargs)
+
+
def options(url, **kwargs):
    """Send an OPTIONS request through request(); ``allow_redirects`` defaults to True."""
    if 'allow_redirects' not in kwargs:
        kwargs['allow_redirects'] = True
    return request('options', url, **kwargs)
+
+
def head(url, **kwargs):
    """Send a HEAD request through request(); ``allow_redirects`` defaults to False."""
    if 'allow_redirects' not in kwargs:
        kwargs['allow_redirects'] = False
    return request('head', url, **kwargs)
+
+
def post(url, data=None, **kwargs):
    """Send a POST request through request(), forwarding ``data``."""
    kwargs['data'] = data
    return request('post', url, **kwargs)
+
+
def put(url, data=None, **kwargs):
    """Send a PUT request through request(), forwarding ``data``."""
    kwargs['data'] = data
    return request('put', url, **kwargs)
+
+
def patch(url, data=None, **kwargs):
    """Send a PATCH request through request(), forwarding ``data``."""
    kwargs['data'] = data
    return request('patch', url, **kwargs)
+
+
def delete(url, **kwargs):
    """Send a DELETE request through request()."""
    response = request('delete', url, **kwargs)
    return response
+
+
async def stream_chunk_to_queue(network, q, method, url, **kwargs):
    """Stream an HTTP response into the queue ``q``.

    Items are queued in this order: the response object, then every
    non-empty chunk of the body (read by blocks of 65536 bytes), and
    finally None as end marker. When an httpx.HTTPError, OSError or
    h2.exceptions.ProtocolError occurs, the exception itself is queued
    just before the None marker.
    """
    try:
        async with network.stream(method, url, **kwargs) as resp:
            q.put(resp)
            async for data in resp.aiter_bytes(65536):
                if not data:
                    # skip the empty chunks
                    continue
                q.put(data)
    except (httpx.HTTPError, OSError, h2.exceptions.ProtocolError) as exc:
        q.put(exc)
    finally:
        # always signal the end of the stream to the consumer
        q.put(None)
+
+
def stream(method, url, **kwargs):
    """Replace httpx.stream.

    Usage:
    stream = searx.network.stream(...)
    response = next(stream)
    for chunk in stream:
        ...

    The first item yielded is the response object, the following ones are the
    chunks of the body. An exception queued by stream_chunk_to_queue() is
    re-raised here, in the calling thread.

    The request uses the network bound to the calling thread (see
    get_context_network()), like request() does.

    httpx.Client.stream is not used: it would require writing an
    httpx.HTTPTransport version of the httpx.AsyncHTTPTransport.
    """
    q = SimpleQueue()
    future = asyncio.run_coroutine_threadsafe(
        stream_chunk_to_queue(get_context_network(), q, method, url, **kwargs), LOOP
    )
    try:
        chunk_or_exception = q.get()
        # None is the end marker queued by stream_chunk_to_queue()
        while chunk_or_exception is not None:
            if isinstance(chunk_or_exception, Exception):
                raise chunk_or_exception
            yield chunk_or_exception
            chunk_or_exception = q.get()
        # propagate an exception that was not sent through the queue
        return future.result()
    finally:
        # When the consumer stops early (generator closed, exception raised),
        # stop the background coroutine instead of letting it fill the queue.
        # This is a no-op once the coroutine is done.
        future.cancel()