summaryrefslogtreecommitdiff
path: root/searx/network/__init__.py
diff options
context:
space:
mode:
Diffstat (limited to 'searx/network/__init__.py')
-rw-r--r--searx/network/__init__.py23
1 files changed, 22 insertions, 1 deletions
diff --git a/searx/network/__init__.py b/searx/network/__init__.py
index aaebb7928..9e80a30a1 100644
--- a/searx/network/__init__.py
+++ b/searx/network/__init__.py
@@ -5,6 +5,7 @@
import asyncio
import threading
import concurrent.futures
+from types import MethodType
from timeit import default_timer
import httpx
@@ -161,6 +162,7 @@ def patch(url, data=None, **kwargs):
def delete(url, **kwargs):
return request('delete', url, **kwargs)
+
async def stream_chunk_to_queue(network, queue, method, url, **kwargs):
try:
async with network.stream(method, url, **kwargs) as response:
@@ -170,12 +172,22 @@ async def stream_chunk_to_queue(network, queue, method, url, **kwargs):
async for chunk in response.aiter_raw(65536):
if len(chunk) > 0:
queue.put(chunk)
+ except httpx.ResponseClosed as e:
+ # the response was closed
+ pass
except (httpx.HTTPError, OSError, h2.exceptions.ProtocolError) as e:
queue.put(e)
finally:
queue.put(None)
+def _close_response_method(self):
+ asyncio.run_coroutine_threadsafe(
+ self.aclose(),
+ get_loop()
+ )
+
+
def stream(method, url, **kwargs):
"""Replace httpx.stream.
@@ -193,10 +205,19 @@ def stream(method, url, **kwargs):
stream_chunk_to_queue(get_network(), queue, method, url, **kwargs),
get_loop()
)
+
+ # yield response
+ response = queue.get()
+ if isinstance(response, Exception):
+ raise response
+ response.close = MethodType(_close_response_method, response)
+ yield response
+
+ # yield chunks
chunk_or_exception = queue.get()
while chunk_or_exception is not None:
if isinstance(chunk_or_exception, Exception):
raise chunk_or_exception
yield chunk_or_exception
chunk_or_exception = queue.get()
- return future.result()
+ future.result()