summaryrefslogtreecommitdiff
path: root/searx/engines/mysql_server.py
blob: 369b2b7fec2760bbc5d47a9ef15bb4bbd149804f (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
# SPDX-License-Identifier: AGPL-3.0-or-later
"""MySQL is said to be the most popular open source database.  Before enabling
MySQL engine, you must install the package ``mysql-connector-python``.

The authentication plugin is configurable by setting ``auth_plugin`` in the
attributes.  By default it is set to ``caching_sha2_password``.

Example
=======

This is an example configuration for querying a MySQL server:

.. code:: yaml

   - name: my_database
     engine: mysql_server
     database: my_database
     username: searxng
     password: password
     limit: 5
     query_str: 'SELECT * from my_table WHERE my_column=%(query)s'

Implementations
===============

"""

try:
    import mysql.connector  # type: ignore
except ImportError:
    # import error is ignored because the admin has to install mysql manually to use
    # the engine
    pass

engine_type = 'offline'
auth_plugin = 'caching_sha2_password'
host = "127.0.0.1"
port = 3306
database = ""
username = ""
password = ""
query_str = ""
limit = 10
paging = True
result_template = 'key-value.html'
_connection = None


def init(engine_settings):
    global _connection  # pylint: disable=global-statement

    if 'query_str' not in engine_settings:
        raise ValueError('query_str cannot be empty')

    if not engine_settings['query_str'].lower().startswith('select '):
        raise ValueError('only SELECT query is supported')

    _connection = mysql.connector.connect(
        database=database,
        user=username,
        password=password,
        host=host,
        port=port,
        auth_plugin=auth_plugin,
    )


def search(query, params):
    query_params = {'query': query}
    query_to_run = query_str + ' LIMIT {0} OFFSET {1}'.format(limit, (params['pageno'] - 1) * limit)

    with _connection.cursor() as cur:
        cur.execute(query_to_run, query_params)

        return _fetch_results(cur)


def _fetch_results(cur):
    results = []
    for res in cur:
        result = dict(zip(cur.column_names, map(str, res)))
        result['template'] = result_template
        results.append(result)

    return results