summaryrefslogtreecommitdiff
path: root/searx/external_bang.py
blob: fde2f8eb5621315550dd0c1ca07b955f6cbb3749 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
# SPDX-License-Identifier: AGPL-3.0-or-later
# pylint: disable=missing-module-docstring

from urllib.parse import quote_plus, urlparse
from searx.data import EXTERNAL_BANGS

# Key under which a dict node of the bang trie stores the definition of the
# bang that ends exactly at that node; all other keys of the node are
# continuations towards longer bangs.
LEAF_KEY = chr(16)


def get_node(external_bangs_db, bang):
    """
    Walk the bang trie as far as ``bang`` allows.

    Letters of ``bang`` are accumulated into a fragment; whenever the fragment
    is a key of the current dict node, the walk descends into that child and
    the fragment is reset.

    :param external_bangs_db: mapping whose ``'trie'`` entry is the root node.
    :param bang: the bang string to look up.
    :return: a tuple ``(node, before, after)`` with the node that was reached,
             the part of ``bang`` consumed to reach it, and the trailing part
             that could not be matched.
    """
    current = external_bangs_db['trie']
    consumed = []
    fragment = ''
    for letter in bang:
        fragment += letter
        # On a str node the membership test is a substring check; the
        # isinstance test that follows discards its result, so only dict
        # nodes are ever descended into.
        if fragment in current and isinstance(current, dict):
            current = current[fragment]
            consumed.append(fragment)
            fragment = ''
    return current, ''.join(consumed), fragment


def get_bang_definition_and_ac(external_bangs_db, bang):
    """
    Look up ``bang`` and list the bangs that directly continue it.

    :param external_bangs_db: mapping whose ``'trie'`` entry is the root node.
    :param bang: the bang string to look up.
    :return: a tuple ``(bang_definition, bang_ac_list)``. ``bang_definition``
             is the definition string stored for ``bang`` or ``None`` when
             there is none; ``bang_ac_list`` holds the bangs reachable through
             one more key of the trie node where the lookup stopped.
    """
    node, before, after = get_node(external_bangs_db, bang)

    if after != '':
        # Part of the bang is unmatched, so there is no definition. Only the
        # keys of a dict node can complete it: a str node is a definition, and
        # iterating it would yield its characters instead of trie keys.
        if not isinstance(node, dict):
            return None, []
        return None, [before + k for k in node if k.startswith(after)]

    if isinstance(node, dict):
        # The definition (if any) lives under LEAF_KEY; every other key is a
        # continuation towards a longer bang.
        return node.get(LEAF_KEY), [before + k for k in node if k != LEAF_KEY]

    if isinstance(node, str):
        # A str node is itself the definition and has no continuations.
        return node, []

    return None, []


def resolve_bang_definition(bang_definition, query):
    """
    Turn a stored bang definition into a concrete URL and a rank.

    The definition is ``<url>`` and ``<rank>`` joined by ``chr(1)``; inside the
    URL, ``chr(2)`` marks where the URL-quoted query is inserted.

    :param bang_definition: the definition string.
    :param query: the search terms; when empty, the URL is reduced to
                  ``scheme://netloc`` instead of a search page.
    :return: a tuple ``(url, rank)``; ``rank`` is ``0`` when the definition
             carries no rank.
    """
    target, raw_rank = bang_definition.split(chr(1))

    # Protocol-relative URLs are resolved to https.
    if target.startswith('//'):
        target = 'https:' + target

    if not query:
        # No search terms: go to the main page instead of the search page.
        parts = urlparse(target)
        target = f'{parts.scheme}://{parts.netloc}'
    else:
        target = target.replace(chr(2), quote_plus(query))

    return (target, int(raw_rank) if raw_rank else 0)


def get_bang_definition_and_autocomplete(bang, external_bangs_db=None):  # pylint: disable=invalid-name
    """
    Return the definition of ``bang`` together with all bangs that extend it.

    Starting from the direct continuations of ``bang``, the trie is explored
    transitively; every continuation that has a definition is collected.

    :param bang: the bang string to look up.
    :param external_bangs_db: bang database to use; defaults to
                              ``EXTERNAL_BANGS`` when ``None``.
    :return: a tuple ``(bang_definition, autocomplete)`` where ``autocomplete``
             is a list of bang names ordered by descending rank, then by name.
    """
    if external_bangs_db is None:
        external_bangs_db = EXTERNAL_BANGS

    bang_definition, bang_ac_list = get_bang_definition_and_ac(external_bangs_db, bang)

    # ``pending`` is used as a stack (O(1) pop) and ``seen`` records every
    # candidate ever queued (O(1) membership). The visiting order does not
    # matter because the result is fully sorted below.
    pending = list(dict.fromkeys(bang_ac_list))
    seen = set(pending)

    ranked = []
    while pending:
        bang_ac = pending.pop()

        current_bang_definition, current_bang_ac_list = get_bang_definition_and_ac(external_bangs_db, bang_ac)
        if current_bang_definition:
            # Only the rank is needed here, hence the empty query.
            _, order = resolve_bang_definition(current_bang_definition, '')
            ranked.append((bang_ac, order))
        for new_bang in current_bang_ac_list:
            if new_bang not in seen:
                seen.add(new_bang)
                pending.append(new_bang)

    # Highest rank first; ties are broken by bang name.
    ranked.sort(key=lambda t: (-t[1], t[0]))

    return bang_definition, [name for name, _ in ranked]


def get_bang_url(search_query, external_bangs_db=None):
    """
    Build the redirect URL for the external bang of a search query.

    :param search_query: object providing the ``external_bang`` and ``query``
                         attributes of the submitted search.
    :param external_bangs_db: bang database to use; defaults to
                              ``EXTERNAL_BANGS`` when ``None``.
    :return: None if there is no bang or it has no definition, else a string
             of the redirect url.
    """
    bangs_db = EXTERNAL_BANGS if external_bangs_db is None else external_bangs_db

    requested_bang = search_query.external_bang
    if not requested_bang:
        return None

    definition, _ = get_bang_definition_and_ac(bangs_db, requested_bang)
    if not definition or not isinstance(definition, str):
        return None

    return resolve_bang_definition(definition, search_query.query)[0]