1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
|
#!/usr/bin/env python
# lint: pylint
# SPDX-License-Identifier: AGPL-3.0-or-later
"""
Update searx/data/external_bangs.json using the duckduckgo bangs.
https://duckduckgo.com/newbang loads
* a javascript which provides the bang version ( https://duckduckgo.com/bv1.js )
* a JSON file which contains the bangs ( https://duckduckgo.com/bang.v260.js for example )
This script loads the javascript, then the bangs.
The javascript URL may change in the future ( for example https://duckduckgo.com/bv2.js ),
but that will most probably require updating RE_BANG_VERSION
"""
# pylint: disable=C0116
import json
import re
from os.path import join
import httpx
from searx import searx_dir # pylint: disable=E0401 C0413
# from https://duckduckgo.com/newbang
# Javascript file that get_bang_url() downloads to discover the current bang file version.
URL_BV1 = 'https://duckduckgo.com/bv1.js'
# Captures the numeric version out of a "/bang.v<version>.js" path.
RE_BANG_VERSION = re.compile(r'\/bang\.v([0-9]+)\.js')
# URL scheme prefixes (scheme name plus colon) that parse_ddg_bangs() tests for and strips.
HTTPS_COLON = 'https:'
HTTP_COLON = 'http:'
def get_bang_url():
    """Return the URL of the current DuckDuckGo bang file and its version.

    Downloads ``URL_BV1`` and looks for the first ``/bang.v<version>.js``
    reference in its text, using ``RE_BANG_VERSION``.

    Returns:
        A ``(url, version)`` tuple, where ``version`` is the string of digits
        captured by ``RE_BANG_VERSION`` and ``url`` is the bang file URL built
        from it.

    Raises:
        httpx.HTTPStatusError: if the download answers with an error status.
        IndexError: if the javascript contains no bang file reference.
    """
    response = httpx.get(URL_BV1)
    response.raise_for_status()

    match = RE_BANG_VERSION.search(response.text)
    if match is None:
        # IndexError is what indexing an empty match list raises; keep that
        # type for callers, but say what actually went wrong.
        raise IndexError(
            f'no bang version found in {URL_BV1}: RE_BANG_VERSION may need an update'
        )

    version = match.group(1)
    return f'https://duckduckgo.com/bang.v{version}.js', version
def fetch_ddg_bangs(url):
    """Download ``url`` and return its body parsed as JSON.

    The raw response bytes are decoded as UTF-8 before parsing.  An error
    HTTP status is raised by ``raise_for_status()`` instead of being parsed.
    """
    reply = httpx.get(url)
    reply.raise_for_status()
    payload = reply.content.decode('utf-8')
    return json.loads(payload)
def merge_when_no_leaf(node):
    """Shorten the trie in place by hoisting the children of leafless nodes.

    A "leaf" is a ``'*'`` entry.  For every child B of ``node`` (A):

    * when B has no ``'*'`` entry, each child C of B is moved directly under
      A with the key ``B + C`` and B itself is removed;
    * when B has a ``'*'`` entry, B is kept and processed recursively.

    The pass over A is repeated until nothing is hoisted anymore.

    For example, below a node ``d`` that is kept:

      d -> d -> g -> *  (ddg*)
        -> i -> g -> *  (dig*)

    becomes

      d -> dg -> *
        -> ig -> *

    Values that are not dicts are left untouched.  Nothing is returned.
    """
    if not isinstance(node, dict):
        return

    while True:
        hoisted = False
        # walk a snapshot of the keys: entries are added and removed below
        for prefix in tuple(node):
            if prefix == '*':
                continue
            child = node[prefix]
            suffixes = tuple(child.keys())
            if '*' in suffixes:
                # the child carries a leaf: keep it and only shorten what is below
                merge_when_no_leaf(child)
                continue
            for suffix in suffixes:
                grandchild = child[suffix]
                node[prefix + suffix] = grandchild
                merge_when_no_leaf(grandchild)
            del node[prefix]
            hoisted = True
        if not hoisted:
            return
def optimize_leaf(parent, parent_key, node):
    """Replace, in place, every dict that only holds a ``'*'`` entry by that entry's value.

    ``parent[parent_key]`` is the slot where ``node`` is stored.  The node
    passed with ``parent=None`` is never replaced; its children are still
    visited.  Values that are not dicts are ignored.
    """
    if not isinstance(node, dict):
        return

    only_a_leaf = parent is not None and len(node) == 1 and '*' in node
    if only_a_leaf:
        parent[parent_key] = node['*']
        return

    # only the values of existing keys are reassigned while iterating
    for child_key, child in node.items():
        optimize_leaf(node, child_key, child)
def parse_ddg_bangs(ddg_bangs):
    """Build the optimized bang trie from the DuckDuckGo bang definitions.

    Each definition is read through three entries: ``u`` (URL template),
    ``r`` (stored as the rank) and ``t`` (bang name).  Definitions whose URL
    has no ``{{{s}}}`` placeholder are skipped.

    The value stored for a bang is ``<url> + chr(1) + <rank>`` where:

    * the ``{{{s}}}`` placeholder of the URL is replaced by ``chr(2)``;
    * a leading ``https:`` is dropped (``https://example.com`` is written
      ``//example.com``);
    * an ``http://`` URL whose ``https://`` twin was already seen reuses the
      value of that twin.

    Args:
        ddg_bangs: iterable of bang definitions (mappings with the ``u``,
            ``r`` and ``t`` entries).

    Returns:
        The trie as nested dicts keyed by fragments of the bang names, after
        ``merge_when_no_leaf`` and ``optimize_leaf`` have been applied.
    """
    # loop invariants
    https_prefix = HTTPS_COLON + '//'
    http_prefix = HTTP_COLON + '//'

    bang_trie = {}
    # URL -> stored value, so that identical URLs share one string
    bang_urls = {}

    for bang_definition in ddg_bangs:
        bang_url = bang_definition['u']
        if '{{{s}}}' not in bang_url:
            # ignore invalid bang
            continue

        bang_url = bang_url.replace('{{{s}}}', chr(2))

        # only for the https protocol: "https://example.com" becomes "//example.com"
        if bang_url.startswith(https_prefix):
            bang_url = bang_url[len(HTTPS_COLON):]

        schemeless_url = None
        if bang_url.startswith(http_prefix):
            schemeless_url = bang_url[len(HTTP_COLON):]

        if schemeless_url is not None and schemeless_url in bang_urls:
            # the same URL exists in https:// (written //example.com):
            # reuse the https:// bang definition
            bang_def_output = bang_urls[schemeless_url]
        else:
            # normal use case: new http:// URL or https:// URL (without "https:", see above)
            bang_rank = str(bang_definition['r'])
            # setdefault both stores a new value and returns the one already
            # stored for this URL, so no further write to bang_urls is needed
            bang_def_output = bang_urls.setdefault(bang_url, bang_url + chr(1) + bang_rank)

        # one trie level per character of the bang name, the value under '*'
        t = bang_trie
        for bang_letter in bang_definition['t']:
            t = t.setdefault(bang_letter, {})
        # the first definition of a bang name wins
        t.setdefault('*', bang_def_output)

    # optimize the trie
    merge_when_no_leaf(bang_trie)
    optimize_leaf(None, None, bang_trie)

    return bang_trie
def get_bangs_filename():
    """Return the path of ``external_bangs.json`` in the ``data`` directory below ``searx_dir``."""
    data_dir = join(searx_dir, "data")
    return join(data_dir, "external_bangs.json")
if __name__ == '__main__':
    # Find the current bang file first: its version is stored next to the trie.
    bangs_url, bangs_version = get_bang_url()
    print(f'fetch bangs from {bangs_url}')

    ddg_bangs = fetch_ddg_bangs(bangs_url)
    output = dict(version=bangs_version, trie=parse_ddg_bangs(ddg_bangs))

    # The target file is only opened (and truncated) once the trie is built.
    with open(get_bangs_filename(), mode='w', encoding="utf8") as fp:
        json.dump(output, fp, ensure_ascii=False, indent=4)
|