1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
|
import sys
from time import time
from itertools import cycle
from threading import local
import requests
from searx import settings
from searx import logger
from searx.raise_for_httperror import raise_for_httperror
logger = logger.getChild('poolrequests')

# Start-up sanity checks on the TLS stack: exit when OpenSSL is older than
# 1.0.2, or when SNI is unsupported and pyOpenSSL cannot be imported.
try:
    import ssl
except ImportError:
    ssl = None
else:
    if ssl.OPENSSL_VERSION_INFO[:3] < (1, 0, 2):
        # https://github.com/certifi/python-certifi#1024-bit-root-certificates
        logger.critical('You are using an old openssl version({0}), please upgrade above 1.0.2!'
                        .format(ssl.OPENSSL_VERSION))
        sys.exit(1)

# ``ssl`` may be None here; getattr() then falls back to False.
if not getattr(ssl, "HAS_SNI", False):
    try:
        import OpenSSL  # pylint: disable=unused-import
    except ImportError:
        logger.critical("ssl doesn't support SNI and the pyopenssl module is not installed.\n"
                        "Some HTTPS connections will fail")
        sys.exit(1)
class HTTPAdapterWithConnParams(requests.adapters.HTTPAdapter):
    """HTTP adapter that forwards extra connection parameters to its pool manager.

    Every keyword argument beyond the standard ``HTTPAdapter`` ones (for
    example ``source_address``) is stored in ``_conn_params`` and passed to
    ``init_poolmanager`` each time the pool manager is (re)built.
    """

    # Pickle ``_conn_params`` together with the parent's attributes; without it
    # ``__setstate__`` could not rebuild the pool manager after unpickling.
    __attrs__ = list(requests.adapters.HTTPAdapter.__attrs__) + ['_conn_params']

    def __init__(self, pool_connections=requests.adapters.DEFAULT_POOLSIZE,
                 pool_maxsize=requests.adapters.DEFAULT_POOLSIZE,
                 max_retries=requests.adapters.DEFAULT_RETRIES,
                 pool_block=requests.adapters.DEFAULT_POOLBLOCK,
                 **conn_params):
        """Create the adapter.

        :param pool_connections: number of connection pools to cache.
        :param pool_maxsize: maximum number of connections kept per pool.
        :param max_retries: retry policy, interpreted by ``HTTPAdapter``.
        :param pool_block: whether the pool blocks when no connection is free.
        :param conn_params: extra keyword arguments for the pool manager.
        """
        # Hand the standard arguments to the parent constructor. Calling it
        # without arguments would silently reset ``max_retries`` (and the pool
        # sizes) to the library defaults.
        super().__init__(pool_connections=pool_connections,
                         pool_maxsize=pool_maxsize,
                         max_retries=max_retries,
                         pool_block=pool_block)

        self._conn_params = conn_params
        # The parent built a pool manager without the extra connection
        # parameters; rebuild it with them.
        self.init_poolmanager(pool_connections, pool_maxsize, block=pool_block, **conn_params)

    def __setstate__(self, state):
        """Restore pickled attributes and rebuild the pool manager."""
        # Can't handle by adding 'proxy_manager' to self.__attrs__ because
        # self.poolmanager uses a lambda function, which isn't pickleable.
        self.proxy_manager = {}
        self.config = {}
        # Default for states pickled without the connection parameters.
        self._conn_params = {}

        for attr, value in state.items():
            setattr(self, attr, value)

        self.init_poolmanager(self._pool_connections, self._pool_maxsize,
                              block=self._pool_block, **self._conn_params)
# Per-thread state: timeout, start time, accumulated request time, adapters.
threadLocal = local()

_outgoing = settings['outgoing']
connect = _outgoing.get('pool_connections', 100)  # Magic number kept from previous code
maxsize = _outgoing.get('pool_maxsize', requests.adapters.DEFAULT_POOLSIZE)  # Picked from constructor


def _new_adapter(**conn_params):
    """Build one adapter using the configured pool sizes."""
    return HTTPAdapterWithConnParams(pool_connections=connect, pool_maxsize=maxsize, **conn_params)


# One endless iterator of adapters per scheme. With ``source_ips`` configured
# there is one adapter per source address, otherwise a single adapter.
if _outgoing.get('source_ips'):
    http_adapters = cycle(_new_adapter(source_address=(source_ip, 0))
                          for source_ip in _outgoing['source_ips'])
    https_adapters = cycle(_new_adapter(source_address=(source_ip, 0))
                           for source_ip in _outgoing['source_ips'])
else:
    http_adapters = cycle((_new_adapter(), ))
    https_adapters = cycle((_new_adapter(), ))
class SessionSinglePool(requests.Session):
    """Session that mounts the shared, module-level adapters.

    Each thread picks one adapter per scheme from the global cycles the first
    time it needs it and keeps using that adapter for every later session.
    """

    def __init__(self):
        super().__init__()

        # reuse the same adapters
        self.adapters.clear()

        self.mount('https://', self._thread_adapter('https_adapter', https_adapters))
        if get_enable_http_protocol():
            self.mount('http://', self._thread_adapter('http_adapter', http_adapters))

    @staticmethod
    def _thread_adapter(key, adapters):
        """Return this thread's adapter stored under *key*, picking one if needed.

        The cycle is advanced only when the thread has no adapter yet.
        ``setdefault(key, next(adapters))`` would evaluate ``next()`` on every
        call and consume an adapter from the rotation each time.
        """
        thread_state = threadLocal.__dict__
        if key not in thread_state:
            thread_state[key] = next(adapters)
        return thread_state[key]

    def close(self):
        """Call super, but clear adapters first since they are managed globally"""
        self.adapters.clear()
        super().close()
def set_timeout_for_thread(timeout, start_time=None):
    """Store the request timeout and the start time for the current thread.

    Both values are written to the thread-local state as ``timeout`` and
    ``start_time``.
    """
    threadLocal.start_time = start_time
    threadLocal.timeout = timeout
def set_enable_http_protocol(enable_http):
    """Record for the current thread whether the ``http://`` adapter is mounted."""
    setattr(threadLocal, 'enable_http', enable_http)
def get_enable_http_protocol():
    """Return the current thread's ``enable_http`` flag, or False when unset."""
    return getattr(threadLocal, 'enable_http', False)
def reset_time_for_thread():
    """Reset the current thread's accumulated request time to zero."""
    setattr(threadLocal, 'total_time', 0)
def get_time_for_thread():
    """Return the current thread's accumulated request time.

    Returns None when the counter was never initialised on this thread (no
    ``total_time`` attribute in the thread-local state) instead of raising
    AttributeError.
    """
    return getattr(threadLocal, 'total_time', None)
def get_proxy_cycles(proxy_settings):
    """Turn a proxy configuration into per-protocol round-robin iterators.

    :param proxy_settings: mapping of protocol to either a single proxy URL
        (string) or an iterable of proxy URLs; may be None or empty.
    :returns: a new dict mapping each protocol to an ``itertools.cycle`` over
        its proxies, or None when *proxy_settings* is empty or None.

    The input mapping is left untouched, so calling this function twice on the
    same settings does not wrap a cycle inside another cycle.
    """
    if not proxy_settings:
        return None

    proxy_cycles = {}
    for protocol, proxies in proxy_settings.items():
        # Backwards compatibility for single proxy in settings.yml
        if isinstance(proxies, str):
            proxies = [proxies]
        proxy_cycles[protocol] = cycle(proxies)
    return proxy_cycles
# Proxy cycles built once at import time from the ``outgoing.proxies`` setting
# (None when no proxies are configured).
_global_proxy_settings = settings['outgoing'].get('proxies')
GLOBAL_PROXY_CYCLES = get_proxy_cycles(_global_proxy_settings)
def get_proxies(proxy_cycles):
    """Pick the next proxy of every protocol.

    :param proxy_cycles: mapping of protocol to an iterator of proxies, or a
        falsy value.
    :returns: dict mapping each protocol to the next item of its iterator, or
        None when *proxy_cycles* is falsy.
    """
    if not proxy_cycles:
        return None

    selected = {}
    for protocol, proxy_cycle in proxy_cycles.items():
        selected[protocol] = next(proxy_cycle)
    return selected
def get_global_proxies():
    """Return the next proxy per protocol from the global proxy cycles."""
    proxy_cycles = GLOBAL_PROXY_CYCLES
    return get_proxies(proxy_cycles)
def request(method, url, **kwargs):
    """same as requests/requests/api.py request(...)

    Differences from the plain ``requests`` call:

    * when no ``proxies`` are given, the global proxies are used;
    * when no ``timeout`` is given, the current thread's timeout (if any) is
      used;
    * the extra keyword ``raise_for_httperror`` (default True) controls
      whether ``raise_for_httperror(response)`` is called on the response;
    * the time spent is added to the thread's ``total_time`` when that counter
      exists.

    :raises requests.exceptions.Timeout: when a timeout applies and the time
        elapsed since the thread's start time (or since this call started, if
        no start time is set) exceeds the timeout plus 0.2 seconds.
    """
    time_before_request = time()

    # proxies
    if not kwargs.get('proxies'):
        kwargs['proxies'] = get_global_proxies()

    # timeout
    if 'timeout' in kwargs:
        timeout = kwargs['timeout']
    else:
        timeout = getattr(threadLocal, 'timeout', None)
        if timeout is not None:
            kwargs['timeout'] = timeout

    # raise_for_error: not a requests argument, so it must be removed
    check_for_httperror = kwargs.pop('raise_for_httperror', True)

    # session start
    session = SessionSinglePool()
    try:
        # do request
        response = session.request(method=method, url=url, **kwargs)

        time_after_request = time()

        # is there a timeout for this engine ?
        if timeout is not None:
            timeout_overhead = 0.2  # seconds
            # start_time = when the user request started.
            # set_timeout_for_thread() stores None when no start time is
            # given, so a None value must fall back like a missing attribute.
            start_time = getattr(threadLocal, 'start_time', None)
            if start_time is None:
                start_time = time_before_request
            search_duration = time_after_request - start_time
            if search_duration > timeout + timeout_overhead:
                raise requests.exceptions.Timeout(response=response)
    finally:
        # session end: also reached when the request fails or times out
        session.close()

    if hasattr(threadLocal, 'total_time'):
        threadLocal.total_time += time_after_request - time_before_request

    # raise an exception
    if check_for_httperror:
        raise_for_httperror(response)

    return response
def get(url, **kwargs):
    """Send a GET request; ``allow_redirects`` defaults to True."""
    if 'allow_redirects' not in kwargs:
        kwargs['allow_redirects'] = True
    return request('get', url, **kwargs)
def options(url, **kwargs):
    """Send an OPTIONS request; ``allow_redirects`` defaults to True."""
    if 'allow_redirects' not in kwargs:
        kwargs['allow_redirects'] = True
    return request('options', url, **kwargs)
def head(url, **kwargs):
    """Send a HEAD request; ``allow_redirects`` defaults to False."""
    if 'allow_redirects' not in kwargs:
        kwargs['allow_redirects'] = False
    return request('head', url, **kwargs)
def post(url, data=None, **kwargs):
    """Send a POST request with *data* as the request body."""
    kwargs['data'] = data
    return request('post', url, **kwargs)
def put(url, data=None, **kwargs):
    """Send a PUT request with *data* as the request body."""
    kwargs['data'] = data
    return request('put', url, **kwargs)
def patch(url, data=None, **kwargs):
    """Send a PATCH request with *data* as the request body."""
    kwargs['data'] = data
    return request('patch', url, **kwargs)
def delete(url, **kwargs):
    """Send a DELETE request."""
    http_method = 'delete'
    return request(http_method, url, **kwargs)
|