1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
|
# SPDX-License-Identifier: AGPL-3.0-or-later
from collections.abc import Iterable
from json import loads
from urllib.parse import urlencode
from searx.utils import to_string, html_to_text
search_url = None
url_query = None
content_query = None
title_query = None
content_html_to_text = False
title_html_to_text = False
paging = False
suggestion_query = ''
results_query = ''
cookies = {}
headers = {}
'''Some engines might offer different result based on cookies or headers.
Possible use-case: To set safesearch cookie or header to moderate.'''
# parameters for engines with paging support
#
# number of results on each page
# (only needed if the site requires not a page number, but an offset)
page_size = 1
# number of the first page (usually 0 or 1)
first_page_num = 1
def iterate(iterable):
if type(iterable) == dict:
it = iterable.items()
else:
it = enumerate(iterable)
for index, value in it:
yield str(index), value
def is_iterable(obj):
if type(obj) == str:
return False
return isinstance(obj, Iterable)
def parse(query):
q = []
for part in query.split('/'):
if part == '':
continue
else:
q.append(part)
return q
def do_query(data, q):
ret = []
if not q:
return ret
qkey = q[0]
for key, value in iterate(data):
if len(q) == 1:
if key == qkey:
ret.append(value)
elif is_iterable(value):
ret.extend(do_query(value, q))
else:
if not is_iterable(value):
continue
if key == qkey:
ret.extend(do_query(value, q[1:]))
else:
ret.extend(do_query(value, q))
return ret
def query(data, query_string):
q = parse(query_string)
return do_query(data, q)
def request(query, params):
query = urlencode({'q': query})[2:]
fp = {'query': query}
if paging and search_url.find('{pageno}') >= 0:
fp['pageno'] = (params['pageno'] - 1) * page_size + first_page_num
params['cookies'].update(cookies)
params['headers'].update(headers)
params['url'] = search_url.format(**fp)
params['query'] = query
return params
def identity(arg):
return arg
def response(resp):
results = []
json = loads(resp.text)
title_filter = html_to_text if title_html_to_text else identity
content_filter = html_to_text if content_html_to_text else identity
if results_query:
rs = query(json, results_query)
if not len(rs):
return results
for result in rs[0]:
try:
url = query(result, url_query)[0]
title = query(result, title_query)[0]
except:
continue
try:
content = query(result, content_query)[0]
except:
content = ""
results.append(
{
'url': to_string(url),
'title': title_filter(to_string(title)),
'content': content_filter(to_string(content)),
}
)
else:
for url, title, content in zip(query(json, url_query), query(json, title_query), query(json, content_query)):
results.append(
{
'url': to_string(url),
'title': title_filter(to_string(title)),
'content': content_filter(to_string(content)),
}
)
if not suggestion_query:
return results
for suggestion in query(json, suggestion_query):
results.append({'suggestion': suggestion})
return results
|