summaryrefslogtreecommitdiff
path: root/qutebrowser/misc/httpclient.py
blob: d401c83c166a89deac38df41e4f7bc9ec0b23db6 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
# vim: ft=python fileencoding=utf-8 sts=4 sw=4 et:

# Copyright 2014-2021 Florian Bruhin (The Compiler) <mail@qutebrowser.org>
#
# This file is part of qutebrowser.
#
# qutebrowser is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# qutebrowser is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with qutebrowser.  If not, see <https://www.gnu.org/licenses/>.

"""An HTTP client based on QNetworkAccessManager."""

import functools
import urllib.parse
from typing import MutableMapping

from qutebrowser.utils import log
from qutebrowser.qt import QtNetwork, QtCore


class HTTPRequest(QtNetwork.QNetworkRequest):
    """A QNetworkRequest preconfigured with the NoLessSafeRedirectPolicy.

    All constructor arguments are passed through to QNetworkRequest
    unchanged; the redirect policy attribute is set afterwards.
    """

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        # Short alias for the namespace both enum values live in.
        request_cls = QtNetwork.QNetworkRequest
        policy_attribute = request_cls.RedirectPolicyAttribute
        redirect_policy = request_cls.NoLessSafeRedirectPolicy
        self.setAttribute(policy_attribute, redirect_policy)


class HTTPClient(QtCore.QObject):

    """An HTTP client based on QNetworkAccessManager.

    Intended for APIs, automatically decodes data.

    Every request started via get()/post() ends with exactly one of the
    success/error signals being emitted.

    Attributes:
        _nam: The QNetworkAccessManager used.
        _timers: A {QNetworkReply: QTimer} dict with the timeout timer of
                 every reply which was still pending when it was handed to
                 _handle_reply.

    Signals:
        success: Emitted when the operation succeeded.
                 arg: The received data.
        error: Emitted when the request failed.
               arg: The error message, as string.
    """

    success = QtCore.pyqtSignal(str)
    error = QtCore.pyqtSignal(str)

    def __init__(self, parent=None):
        """Set up the network access manager and the timer bookkeeping.

        Args:
            parent: The parent object passed on to QObject.
        """
        super().__init__(parent)
        with log.disable_qt_msghandler():
            # WORKAROUND for a hang when messages are printed, see our
            # NetworkAccessManager subclass for details.
            self._nam = QtNetwork.QNetworkAccessManager(self)
        self._timers: MutableMapping[QtNetwork.QNetworkReply, QtCore.QTimer] = {}

    def post(self, url, data=None):
        """Create a new POST request.

        The data is sent form-urlencoded (UTF-8).

        Emits success/error when done.

        Args:
            url: The URL to post to, as QUrl.
            data: A dict of data to send.
        """
        if data is None:
            data = {}
        encoded_data = urllib.parse.urlencode(data).encode('utf-8')
        request = HTTPRequest(url)
        request.setHeader(QtNetwork.QNetworkRequest.ContentTypeHeader,
                          'application/x-www-form-urlencoded;charset=utf-8')
        reply = self._nam.post(request, encoded_data)
        self._handle_reply(reply)

    def get(self, url):
        """Create a new GET request.

        Emits success/error when done.

        Args:
            url: The URL to access, as QUrl.
        """
        request = HTTPRequest(url)
        reply = self._nam.get(request)
        self._handle_reply(reply)

    def _handle_reply(self, reply):
        """Handle a new QNetworkReply.

        An already finished reply is processed right away. Otherwise a
        timer is registered which aborts the reply after 10 seconds, and
        the reply is processed once its finished signal fires.

        Args:
            reply: The QNetworkReply returned by the QNetworkAccessManager.
        """
        if reply.isFinished():
            # No timer is created on this path, so on_reply_finished must
            # cope with the reply being absent from self._timers.
            self.on_reply_finished(reply)
        else:
            timer = QtCore.QTimer(self)
            timer.setInterval(10000)  # milliseconds
            timer.timeout.connect(reply.abort)
            timer.start()
            self._timers[reply] = timer
            reply.finished.connect(functools.partial(
                self.on_reply_finished, reply))

    def on_reply_finished(self, reply):
        """Read the data and finish when the reply finished.

        Stops and discards the reply's timeout timer (if it has one), then
        emits error for a failed reply or undecodable data, and success
        with the UTF-8 decoded body otherwise.

        Args:
            reply: The QNetworkReply which finished.
        """
        # Replies which were already finished when _handle_reply saw them
        # never got a timer; a plain pop(reply) would raise KeyError there.
        timer = self._timers.pop(reply, None)
        if timer is not None:
            timer.stop()
            timer.deleteLater()
        if reply.error() != QtNetwork.QNetworkReply.NoError:
            self.error.emit(reply.errorString())
            return
        try:
            data = bytes(reply.readAll()).decode('utf-8')
        except UnicodeDecodeError:
            self.error.emit("Invalid UTF-8 data received in reply!")
            return
        self.success.emit(data)