summaryrefslogtreecommitdiff
path: root/searx/engines/sqlite.py
blob: 84db74d627ae44d6173acd2bc246efca520e92fe (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
# SPDX-License-Identifier: AGPL-3.0-or-later
# lint: pylint
# pylint: disable=missing-function-docstring

"""SQLite database (Offline)

"""

import sqlite3
import contextlib

from searx import logger


logger = logger.getChild('SQLite engine')

engine_type = 'offline'
database = ""
query_str = ""
limit = 10
paging = True
result_template = 'key-value.html'


def init(engine_settings):
    if 'query_str' not in engine_settings:
        raise ValueError('query_str cannot be empty')

    if not engine_settings['query_str'].lower().startswith('select '):
        raise ValueError('only SELECT query is supported')


@contextlib.contextmanager
def sqlite_cursor():
    """Implements a `Context Manager`_ for a :py:obj:`sqlite3.Cursor`.

    Open database in read only mode: if the database doesn't exist.
    The default mode creates an empty file on the file system.

    see:
    * https://docs.python.org/3/library/sqlite3.html#sqlite3.connect
    * https://www.sqlite.org/uri.html
    """
    global database  # pylint: disable=global-statement
    uri = 'file:' + database + '?mode=ro'
    with contextlib.closing(sqlite3.connect(uri, uri=True)) as connect:
        connect.row_factory = sqlite3.Row
        with contextlib.closing(connect.cursor()) as cursor:
            yield cursor


def search(query, params):
    global query_str, result_template  # pylint: disable=global-statement
    results = []

    query_params = {
        'query': query,
        'wildcard':  r'%' + query.replace(' ', r'%') + r'%',
        'limit': limit,
        'offset': (params['pageno'] - 1) * limit
    }
    query_to_run = query_str + ' LIMIT :limit OFFSET :offset'

    with sqlite_cursor() as cur:

        cur.execute(query_to_run, query_params)
        col_names = [cn[0] for cn in cur.description]

        for row in cur.fetchall():
            item = dict( zip(col_names, map(str, row)) )
            item['template'] = result_template
            logger.debug("append result --> %s", item)
            results.append(item)

    return results