diff options
Diffstat (limited to 'searx/network/__init__.py')
-rw-r--r-- | searx/network/__init__.py | 23 |
1 files changed, 22 insertions, 1 deletions
diff --git a/searx/network/__init__.py b/searx/network/__init__.py index aaebb7928..9e80a30a1 100644 --- a/searx/network/__init__.py +++ b/searx/network/__init__.py @@ -5,6 +5,7 @@ import asyncio import threading import concurrent.futures +from types import MethodType from timeit import default_timer import httpx @@ -161,6 +162,7 @@ def patch(url, data=None, **kwargs): def delete(url, **kwargs): return request('delete', url, **kwargs) + async def stream_chunk_to_queue(network, queue, method, url, **kwargs): try: async with network.stream(method, url, **kwargs) as response: @@ -170,12 +172,22 @@ async def stream_chunk_to_queue(network, queue, method, url, **kwargs): async for chunk in response.aiter_raw(65536): if len(chunk) > 0: queue.put(chunk) + except httpx.ResponseClosed as e: + # the response was closed + pass except (httpx.HTTPError, OSError, h2.exceptions.ProtocolError) as e: queue.put(e) finally: queue.put(None) +def _close_response_method(self): + asyncio.run_coroutine_threadsafe( + self.aclose(), + get_loop() + ) + + def stream(method, url, **kwargs): """Replace httpx.stream. @@ -193,10 +205,19 @@ def stream(method, url, **kwargs): stream_chunk_to_queue(get_network(), queue, method, url, **kwargs), get_loop() ) + + # yield response + response = queue.get() + if isinstance(response, Exception): + raise response + response.close = MethodType(_close_response_method, response) + yield response + + # yield chunks chunk_or_exception = queue.get() while chunk_or_exception is not None: if isinstance(chunk_or_exception, Exception): raise chunk_or_exception yield chunk_or_exception chunk_or_exception = queue.get() - return future.result() + future.result() |