summaryrefslogtreecommitdiff
path: root/qutebrowser/misc/throttle.py
blob: 45ff0f4613778cc993029d0e0e2970a60ffa23ab (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
# Copyright 2018-2021 Jay Kamat <jaygkamat@gmail.com>
#
# SPDX-License-Identifier: GPL-3.0-or-later

"""A throttle for throttling function calls."""

import dataclasses
import time
from typing import Any, Callable, Mapping, Optional, Sequence

from qutebrowser.qt.core import QObject

from qutebrowser.utils import usertypes


@dataclasses.dataclass
class _CallArgs:

    args: Sequence[Any]
    kwargs: Mapping[str, Any]


class Throttle(QObject):

    """A throttle to throttle calls.

    If a request comes in, it will be processed immediately. If another request
    comes in too soon, it is ignored, but will be processed when a timeout
    ends. If another request comes in, it will update the pending request.
    """

    def __init__(self,
                 func: Callable[..., None],
                 delay_ms: int,
                 parent: QObject = None) -> None:
        """Constructor.

        Args:
            delay_ms: The time to wait before allowing another call of the
                         function. -1 disables the wrapper.
            func: The function/method to call on __call__.
            parent: The parent object.
        """
        super().__init__(parent)
        self._delay_ms = delay_ms
        self._func = func
        self._pending_call: Optional[_CallArgs] = None
        self._last_call_ms: Optional[int] = None
        self._timer = usertypes.Timer(self, 'throttle-timer')
        self._timer.setSingleShot(True)

    def _call_pending(self) -> None:
        """Start a pending call."""
        assert self._pending_call is not None
        self._func(*self._pending_call.args, **self._pending_call.kwargs)
        self._pending_call = None
        self._last_call_ms = int(time.monotonic() * 1000)

    def __call__(self, *args: Any, **kwargs: Any) -> Any:
        cur_time_ms = int(time.monotonic() * 1000)
        if self._pending_call is None:
            if (self._last_call_ms is None or
                    cur_time_ms - self._last_call_ms > self._delay_ms):
                # Call right now
                self._last_call_ms = cur_time_ms
                self._func(*args, **kwargs)
                return

            self._timer.setInterval(self._delay_ms -
                                    (cur_time_ms - self._last_call_ms))
            # Disconnect any existing calls, continue if no connections.
            try:
                self._timer.timeout.disconnect()
            except TypeError:
                pass
            self._timer.timeout.connect(self._call_pending)
            self._timer.start()

        # Update arguments for an existing pending call
        self._pending_call = _CallArgs(args=args, kwargs=kwargs)

    def set_delay(self, delay_ms: int) -> None:
        """Set the delay to wait between invocation of this function."""
        self._delay_ms = delay_ms

    def cancel(self) -> None:
        """Cancel any pending instance of this timer."""
        self._timer.stop()