1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
|
# SPDX-License-Identifier: AGPL-3.0-or-later
from collections.abc import Iterable
from json import loads
from urllib.parse import urlencode
from searx.utils import to_string
search_url = None
url_query = None
content_query = None
title_query = None
paging = False
suggestion_query = ''
results_query = ''
# parameters for engines with paging support
#
# number of results on each page
# (only needed if the site requires not a page number, but an offset)
page_size = 1
# number of the first page (usually 0 or 1)
first_page_num = 1
def iterate(iterable):
if type(iterable) == dict:
it = iterable.items()
else:
it = enumerate(iterable)
for index, value in it:
yield str(index), value
def is_iterable(obj):
if type(obj) == str:
return False
return isinstance(obj, Iterable)
def parse(query):
q = []
for part in query.split('/'):
if part == '':
continue
else:
q.append(part)
return q
def do_query(data, q):
ret = []
if not q:
return ret
qkey = q[0]
for key, value in iterate(data):
if len(q) == 1:
if key == qkey:
ret.append(value)
elif is_iterable(value):
ret.extend(do_query(value, q))
else:
if not is_iterable(value):
continue
if key == qkey:
ret.extend(do_query(value, q[1:]))
else:
ret.extend(do_query(value, q))
return ret
def query(data, query_string):
q = parse(query_string)
return do_query(data, q)
def request(query, params):
query = urlencode({'q': query})[2:]
fp = {'query': query}
if paging and search_url.find('{pageno}') >= 0:
fp['pageno'] = (params['pageno'] - 1) * page_size + first_page_num
params['url'] = search_url.format(**fp)
params['query'] = query
return params
def response(resp):
results = []
json = loads(resp.text)
if results_query:
rs = query(json, results_query)
if not len(rs):
return results
for result in rs[0]:
try:
url = query(result, url_query)[0]
title = query(result, title_query)[0]
except:
continue
try:
content = query(result, content_query)[0]
except:
content = ""
results.append({
'url': to_string(url),
'title': to_string(title),
'content': to_string(content),
})
else:
for url, title, content in zip(
query(json, url_query),
query(json, title_query),
query(json, content_query)
):
results.append({
'url': to_string(url),
'title': to_string(title),
'content': to_string(content),
})
if not suggestion_query:
return results
for suggestion in query(json, suggestion_query):
results.append({'suggestion': suggestion})
return results
|