diff options
author | Alexandre Flament <alex@al-f.net> | 2021-04-05 10:43:33 +0200 |
---|---|---|
committer | Alexandre Flament <alex@al-f.net> | 2021-04-12 17:25:56 +0200 |
commit | d14994dc73ba5c95382812581dac146d9eceaafa (patch) | |
tree | 2f7720dbae8f1064fe479f986f0b198aff2beb99 /searx/network/client.py | |
parent | eaa694fb7d0e47b943bc6d6edb6cb6a40ab2d85e (diff) | |
download | searxng-d14994dc73ba5c95382812581dac146d9eceaafa.tar.gz searxng-d14994dc73ba5c95382812581dac146d9eceaafa.zip |
[httpx] replace searx.poolrequests by searx.network
settings.yml:
* outgoing.networks:
* can contain network definitions
* properties: enable_http, verify, http2, max_connections, max_keepalive_connections,
keepalive_expiry, local_addresses, support_ipv4, support_ipv6, proxies, max_redirects, retries
* retries: 0 by default, number of times searx retries to send the HTTP request (using different IP & proxy each time)
* local_addresses can be "192.168.0.1/24" (it supports IPv6)
* support_ipv4 & support_ipv6: both True by default
see https://github.com/searx/searx/pull/1034
* each engine can define a "network" section:
* either a full network description
* or a reference to an existing network
* all HTTP requests of an engine use the same HTTP configuration (this was not the case before, see the proxy configuration in master)
Diffstat (limited to 'searx/network/client.py')
-rw-r--r-- | searx/network/client.py | 214 |
1 files changed, 214 insertions, 0 deletions
# searx/network/client.py (new file, blob 631e36f8f)
# SPDX-License-Identifier: AGPL-3.0-or-later
"""HTTP client factory for searx.network.

Provides patched httpx / httpx_socks transports and ``new_client`` which
assembles an ``httpx.AsyncClient`` from the outgoing network settings.
Importing this module starts a daemon thread running an asyncio event loop
(see ``init``).
"""

import asyncio
import logging
import threading

import httpcore
import httpx
from httpx_socks import AsyncProxyTransport
from python_socks import parse_proxy_url
import python_socks._errors

from searx import logger

# Optional uvloop (support Python 3.6)
try:
    import uvloop
except ImportError:
    pass
else:
    uvloop.install()


logger = logger.getChild('searx.http.client')
# Event loop used by the transports; assigned by the thread started in init().
LOOP = None
# Keyword arguments passed to every transport created by this module.
TRANSPORT_KWARGS = {
    'backend': 'asyncio',
    'trust_env': False,
}


async def close_connections_for_url(connection_pool: httpcore.AsyncConnectionPool, url: httpcore._utils.URL):
    """Remove from ``connection_pool`` and close every connection to the origin of ``url``.

    A ``httpcore.NetworkError`` raised while closing a connection is logged
    as a warning and does not interrupt the loop.
    """
    origin = httpcore._utils.url_to_origin(url)
    logger.debug('Drop connections for %r', origin)
    connections_to_close = connection_pool._connections_for_origin(origin)
    for connection in connections_to_close:
        await connection_pool._remove_from_pool(connection)
        try:
            await connection.aclose()
        except httpcore.NetworkError as e:
            logger.warning('Error closing an existing connection', exc_info=e)


class AsyncHTTPTransportNoHttp(httpcore.AsyncHTTPTransport):
    """Block HTTP request"""

    async def arequest(self, method, url, headers=None, stream=None, ext=None):
        """Always raise ``httpcore.UnsupportedProtocol``."""
        raise httpcore.UnsupportedProtocol("HTTP protocol is disabled")


class AsyncProxyTransportFixed(AsyncProxyTransport):
    """Fix httpx_socks.AsyncProxyTransport

    Map python_socks exceptions to httpcore.ProxyError

    Map socket.gaierror to httpcore.ConnectError

    Note: keepalive_expiry is ignored, AsyncProxyTransport should call:
    * self._keepalive_sweep()
    * self._response_closed(self, connection)

    Note: AsyncProxyTransport inherit from AsyncConnectionPool

    Note: the API is going to change on httpx 0.18.0
    see https://github.com/encode/httpx/pull/1522
    """

    async def arequest(self, method, url, headers=None, stream=None, ext=None):
        """Send the request, retrying once after a ``RemoteProtocolError``.

        Raises:
            httpcore.ProxyError: a python_socks proxy error occurred.
            httpcore.ConnectError: an ``OSError`` occurred (for example
                ``socket.gaierror`` when DNS resolution fails).
            httpcore.RemoteProtocolError: every attempt failed with it.
            httpcore.NetworkError, httpcore.ProtocolError: re-raised after
                the connections for ``url`` have been dropped.
        """
        retry = 2
        last_error = None
        while retry > 0:
            retry -= 1
            try:
                return await super().arequest(method, url, headers, stream, ext)
            except (python_socks._errors.ProxyConnectionError,
                    python_socks._errors.ProxyTimeoutError,
                    python_socks._errors.ProxyError) as e:
                raise httpcore.ProxyError(e) from e
            except OSError as e:
                # socket.gaierror when DNS resolution fails
                # ConnectError is a NetworkError: handlers of NetworkError still match
                raise httpcore.ConnectError(e) from e
            except httpcore.RemoteProtocolError as e:
                # in case of httpcore.RemoteProtocolError: Server disconnected
                await close_connections_for_url(self, url)
                logger.warning('httpcore.RemoteProtocolError: retry', exc_info=e)
                last_error = e
                # retry
            except (httpcore.NetworkError, httpcore.ProtocolError):
                # httpcore.WriteError on HTTP/2 connection leaves a new opened stream
                # then each new request creates a new stream and raise the same WriteError
                await close_connections_for_url(self, url)
                raise
        # all attempts failed: propagate the last error instead of returning None
        raise last_error


class AsyncHTTPTransportFixed(httpx.AsyncHTTPTransport):
    """Fix httpx.AsyncHTTPTransport"""

    async def arequest(self, method, url, headers=None, stream=None, ext=None):
        """Send the request, retrying once after a ``CloseError`` or ``RemoteProtocolError``.

        Raises:
            httpcore.ConnectError: an ``OSError`` occurred (for example
                ``socket.gaierror`` when DNS resolution fails).
            httpcore.CloseError, httpcore.RemoteProtocolError: every attempt
                failed; the error of the last attempt is raised.
            httpcore.ProtocolError, httpcore.NetworkError: re-raised after
                the connections for ``url`` have been dropped.
        """
        retry = 2
        last_error = None
        while retry > 0:
            retry -= 1
            try:
                return await super().arequest(method, url, headers, stream, ext)
            except OSError as e:
                # socket.gaierror when DNS resolution fails
                raise httpcore.ConnectError(e) from e
            except httpcore.CloseError as e:
                # httpcore.CloseError: [Errno 104] Connection reset by peer
                # raised by _keepalive_sweep()
                #   from https://github.com/encode/httpcore/blob/4b662b5c42378a61e54d673b4c949420102379f5/httpcore/_backends/asyncio.py#L198  # noqa
                await close_connections_for_url(self._pool, url)
                logger.warning('httpcore.CloseError: retry', exc_info=e)
                last_error = e
                # retry
            except httpcore.RemoteProtocolError as e:
                # in case of httpcore.RemoteProtocolError: Server disconnected
                await close_connections_for_url(self._pool, url)
                logger.warning('httpcore.RemoteProtocolError: retry', exc_info=e)
                last_error = e
                # retry
            except (httpcore.ProtocolError, httpcore.NetworkError):
                await close_connections_for_url(self._pool, url)
                raise
        # all attempts failed: propagate the last error instead of returning None
        raise last_error


def get_transport_for_socks_proxy(verify, http2, local_address, proxy_url, limit, retries):
    """Return an ``AsyncProxyTransportFixed`` configured for the SOCKS proxy ``proxy_url``.

    ``limit`` must provide ``max_connections``, ``max_keepalive_connections``
    and ``keepalive_expiry`` attributes.
    """
    # support socks5h (requests compatibility):
    # https://requests.readthedocs.io/en/master/user/advanced/#socks
    # socks5://   hostname is resolved on client side
    # socks5h://  hostname is resolved on proxy side
    rdns = False
    socks5h = 'socks5h://'
    if proxy_url.startswith(socks5h):
        proxy_url = 'socks5://' + proxy_url[len(socks5h):]
        rdns = True

    proxy_type, proxy_host, proxy_port, proxy_username, proxy_password = parse_proxy_url(proxy_url)

    return AsyncProxyTransportFixed(proxy_type=proxy_type, proxy_host=proxy_host, proxy_port=proxy_port,
                                    username=proxy_username, password=proxy_password,
                                    rdns=rdns,
                                    loop=LOOP,
                                    verify=verify,
                                    http2=http2,
                                    local_address=local_address,
                                    max_connections=limit.max_connections,
                                    max_keepalive_connections=limit.max_keepalive_connections,
                                    keepalive_expiry=limit.keepalive_expiry,
                                    retries=retries,
                                    **TRANSPORT_KWARGS)


def get_transport(verify, http2, local_address, proxy_url, limit, retries):
    """Return an ``AsyncHTTPTransportFixed``; a falsy ``proxy_url`` means no proxy."""
    return AsyncHTTPTransportFixed(verify=verify,
                                   http2=http2,
                                   local_address=local_address,
                                   proxy=httpx._config.Proxy(proxy_url) if proxy_url else None,
                                   limits=limit,
                                   retries=retries,
                                   **TRANSPORT_KWARGS)


def iter_proxies(proxies):
    """Yield ``(pattern, proxy_url)`` pairs from ``proxies``.

    A string is yielded once under the ``'all://'`` pattern, a dict yields
    its items; any other value yields nothing.
    """
    # https://www.python-httpx.org/compatibility/#proxy-keys
    if isinstance(proxies, str):
        yield 'all://', proxies
    elif isinstance(proxies, dict):
        for pattern, proxy_url in proxies.items():
            yield pattern, proxy_url


def new_client(enable_http, verify, enable_http2,
               max_connections, max_keepalive_connections, keepalive_expiry,
               proxies, local_address, retries, max_redirects):
    """Build an ``httpx.AsyncClient`` with one mounted transport per proxy pattern.

    When ``enable_http`` is false, proxy patterns targeting plain HTTP are
    skipped and ``'http://'`` is mounted on a transport that rejects every
    request.
    """
    limit = httpx.Limits(max_connections=max_connections,
                         max_keepalive_connections=max_keepalive_connections,
                         keepalive_expiry=keepalive_expiry)
    # See https://www.python-httpx.org/advanced/#routing
    mounts = {}
    for pattern, proxy_url in iter_proxies(proxies):
        if not enable_http and (pattern == 'http' or pattern.startswith('http://')):
            continue
        if proxy_url.startswith(('socks4://', 'socks5://', 'socks5h://')):
            mounts[pattern] = get_transport_for_socks_proxy(verify, enable_http2, local_address, proxy_url, limit,
                                                            retries)
        else:
            mounts[pattern] = get_transport(verify, enable_http2, local_address, proxy_url, limit, retries)

    if not enable_http:
        mounts['http://'] = AsyncHTTPTransportNoHttp()

    transport = get_transport(verify, enable_http2, local_address, None, limit, retries)
    return httpx.AsyncClient(transport=transport, mounts=mounts, max_redirects=max_redirects)


def init():
    """Lower the hpack loggers to WARNING and start the daemon event-loop thread."""
    # log
    for logger_name in ('hpack.hpack', 'hpack.table'):
        logging.getLogger(logger_name).setLevel(logging.WARNING)

    # loop
    def loop_thread():
        # NOTE(review): LOOP is assigned inside the thread, so it may still be
        # None right after init() returns - confirm callers tolerate this.
        global LOOP
        LOOP = asyncio.new_event_loop()
        LOOP.run_forever()

    th = threading.Thread(
        target=loop_thread,
        name='asyncio_loop',
        daemon=True,
    )
    th.start()


init()