summaryrefslogtreecommitdiff
path: root/searx/botdetection/link_token.py
blob: 4273e22c5b234ba3b1d6b3b636ce9d33a40fc012 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
# SPDX-License-Identifier: AGPL-3.0-or-later
"""
Method ``link_token``
---------------------

The ``link_token`` method evaluates a request as :py:obj:`suspicious
<is_suspicious>` if the URL ``/client<token>.css`` is not requested by the
client.  By adding a random component (the token) to the URL, a bot cannot send
a ping by requesting a static URL.

.. note::

   This method requires a redis DB and needs an HTTP X-Forwarded-For_ header.

To make use of this method, a flask URL route needs to be added:

.. code:: python

   @app.route('/client<token>.css', methods=['GET', 'POST'])
   def client_token(token=None):
       link_token.ping(request, token)
       return Response('', mimetype='text/css')

And in the HTML template from flask a stylesheet link is needed (the value of
``link_token`` comes from :py:obj:`get_token`):

.. code:: html

   <link rel="stylesheet"
         href="{{ url_for('client_token', token=link_token) }}"
         type="text/css" >

.. _X-Forwarded-For:
   https://developer.mozilla.org/en-US/docs/Web/HTTP/Headers/X-Forwarded-For

"""
from __future__ import annotations
from ipaddress import (
    IPv4Network,
    IPv6Network,
    ip_address,
)

import string
import random
import secrets
import flask

from searx import logger
from searx import redisdb
from searx.redislib import secret_hash

from ._helpers import (
    get_network,
    get_real_ip,
)

# Both lifetimes are handed to redis as the ``ex=`` argument of ``set()`` by the
# functions in this module.
TOKEN_LIVE_TIME = 600
"""Lifetime (sec) of limiter's CSS token."""

PING_LIVE_TIME = 3600
"""Lifetime (sec) of the ping-key from a client (request)"""

# Redis key names used by this module.
PING_KEY = 'SearXNG_limiter.ping'
"""Prefix of all ping-keys generated by :py:obj:`get_ping_key`"""

TOKEN_KEY = 'SearXNG_limiter.token'
"""Key for which the current token is stored in the DB"""

# Rebinds the name ``logger`` (imported from ``searx``) to a child logger that
# is used by all functions of this module.
logger = logger.getChild('botdetection.link_token')


def is_suspicious(network: IPv4Network | IPv6Network, request: flask.Request, renew: bool = False):
    """Rates a request as *suspicious* when no valid ping exists for its
    (client) network.

    The ping-key of the request (see :py:obj:`get_ping_key`) is looked up in the
    redis DB:

    - no redis DB available: the request is **not** rated as suspicious
      (``False`` is returned),
    - ping-key is missing: the request is rated as suspicious (``True``),
    - ping-key exists: the request is not suspicious (``False``); if argument
      ``renew`` is ``True`` the expire time of the ping-key is reset to
      :py:obj:`PING_LIVE_TIME`.

    """
    db = redisdb.client()
    if not db:
        # nothing to check against when there is no redis DB
        return False

    key = get_ping_key(network, request)
    has_ping = bool(db.get(key))

    if has_ping:
        if renew:
            # write the key again to restart its expire time
            db.set(key, 1, ex=PING_LIVE_TIME)
        logger.debug("found ping for (client) network %s -> %s", network.compressed, key)
    else:
        logger.info("missing ping (IP: %s) / request: %s", network.compressed, key)

    return not has_ping


def ping(request: flask.Request, token: str):
    """Handler for requests to URL ``/client<token>.css``.

    If a redis client is available and ``token`` is valid (see
    :py:obj:`token_is_valid`), the ping-key of the client (see
    :py:obj:`get_ping_key`) is stored in the DB with an expire time of
    :py:obj:`PING_LIVE_TIME`.  Otherwise nothing is stored.

    """
    from . import redis_client, cfg  # pylint: disable=import-outside-toplevel, cyclic-import

    # the token is only checked when there is a redis client at all
    if not (redis_client and token_is_valid(token)):
        return

    client_ip = ip_address(get_real_ip(request))
    client_net = get_network(client_ip, cfg)
    key = get_ping_key(client_net, request)

    logger.debug("store ping_key for (client) network %s (IP %s) -> %s", client_net.compressed, client_ip, key)
    redis_client.set(key, 1, ex=PING_LIVE_TIME)


def get_ping_key(network: IPv4Network | IPv6Network, request: flask.Request) -> str:
    """Builds the ping-key of a request: :py:obj:`PING_KEY` followed by a hash
    in square brackets.

    The hash (:py:obj:`secret_hash`) is calculated from the compressed notation
    of the network and the values of the HTTP headers ``Accept-Language`` and
    ``User-Agent`` (a missing header counts as empty string).  By this the key
    fits (more or less) to a *WEB-browser session* in a network.

    """
    headers = request.headers
    fingerprint = network.compressed + headers.get('Accept-Language', '') + headers.get('User-Agent', '')
    return "".join((PING_KEY, "[", secret_hash(fingerprint), "]"))


def token_is_valid(token) -> bool:
    """Returns ``True`` if ``token`` is equal to the current token (see
    :py:obj:`get_token`), otherwise ``False``.  The result is logged on debug
    level."""
    current_token = get_token()
    is_valid = token == current_token
    logger.debug("token is valid --> %s", is_valid)
    return is_valid


def get_token() -> str:
    """Returns current token.  If there is no currently active token a new token
    is generated randomly and stored in the redis DB.

    A new token consists of 16 characters (lowercase ASCII letters and digits)
    and is drawn from the :py:mod:`secrets` module, since the token is meant to
    be unpredictable for bots.

    If no redis DB is available the constant string ``'12345678'`` is returned.

    - :py:obj:`TOKEN_LIVE_TIME`
    - :py:obj:`TOKEN_KEY`

    """
    redis_client = redisdb.client()
    if not redis_client:
        # This function is also called when limiter is inactive / no redis DB
        # (see render function in webapp.py)
        return '12345678'

    token = redis_client.get(TOKEN_KEY)
    if token:
        # redis-py returns bytes unless the client was created with
        # decode_responses=True; in that case the value is already a str
        if isinstance(token, bytes):
            token = token.decode('UTF-8')
        return token

    # no active token: create one from a CSPRNG and store it with a limited
    # lifetime
    alphabet = string.ascii_lowercase + string.digits
    token = ''.join(secrets.choice(alphabet) for _ in range(16))
    redis_client.set(TOKEN_KEY, token, ex=TOKEN_LIVE_TIME)
    return token