diff options
author | Micah Lee <micah@micahflee.com> | 2021-08-20 14:43:21 -0700 |
---|---|---|
committer | GitHub <noreply@github.com> | 2021-08-20 14:43:21 -0700 |
commit | b66e742bc20ae977232fc53f22d485c10173ac2f (patch) | |
tree | c6eb66c340c34b45bac5525350c5eb5c43fe33a6 /cli/tests/test_cli_web.py | |
parent | 76104671c3eacbef53ad96fffd8a57512ab2d093 (diff) | |
parent | 02254b13bb4818745193092f2144fd83726d79e7 (diff) | |
download | onionshare-b66e742bc20ae977232fc53f22d485c10173ac2f.tar.gz onionshare-b66e742bc20ae977232fc53f22d485c10173ac2f.zip |
Merge pull request #1395 from onionshare/developv2.3.3
Version 2.3.3, merge develop into stable
Diffstat (limited to 'cli/tests/test_cli_web.py')
-rw-r--r-- | cli/tests/test_cli_web.py | 475 |
1 files changed, 465 insertions, 10 deletions
diff --git a/cli/tests/test_cli_web.py b/cli/tests/test_cli_web.py index 063bd43c..f8c96f9c 100644 --- a/cli/tests/test_cli_web.py +++ b/cli/tests/test_cli_web.py @@ -1,35 +1,57 @@ -import contextlib -import inspect -import io import os import random import re import socket -import sys +import subprocess +import time import zipfile import tempfile import base64 +import shutil +import sys +from io import BytesIO import pytest +from contextlib import contextmanager +from multiprocessing import Process +from urllib.request import urlopen, Request from werkzeug.datastructures import Headers +from werkzeug.exceptions import RequestedRangeNotSatisfiable from onionshare_cli.common import Common from onionshare_cli.web import Web +from onionshare_cli.web.share_mode import parse_range_header from onionshare_cli.settings import Settings from onionshare_cli.mode_settings import ModeSettings +import onionshare_cli.web.receive_mode + +# Stub requests.post, for receive mode webhook tests +webhook_url = None +webhook_data = None + + +def requests_post_stub(url, data, timeout, proxies): + global webhook_url, webhook_data + webhook_url = url + webhook_data = data + + +onionshare_cli.web.receive_mode.requests.post = requests_post_stub + DEFAULT_ZW_FILENAME_REGEX = re.compile(r"^onionshare_[a-z2-7]{6}.zip$") RANDOM_STR_REGEX = re.compile(r"^[a-z2-7]+$") def web_obj(temp_dir, common_obj, mode, num_files=0): - """ Creates a Web object, in either share mode or receive mode, ready for testing """ + """Creates a Web object, in either share mode or receive mode, ready for testing""" common_obj.settings = Settings(common_obj) mode_settings = ModeSettings(common_obj) web = Web(common_obj, False, mode_settings, mode) web.generate_password() web.running = True + web.cleanup_filenames == [] web.app.testing = True # Share mode @@ -81,7 +103,7 @@ class TestWeb: web = web_obj(temp_dir, common_obj, "share", 3) web.settings.set("share", "autostop_sharing", True) - assert web.running == True + assert web.running is True with web.app.test_client() as c: # Download the first time @@ -93,7 +115,7 @@ class TestWeb: or res.mimetype == "application/x-zip-compressed" ) - assert web.running == False + assert web.running is False def test_share_mode_autostop_sharing_off( self, temp_dir, common_obj, temp_file_1024 @@ -101,7 +123,7 @@ class TestWeb: web = web_obj(temp_dir, common_obj, "share", 3) web.settings.set("share", "autostop_sharing", False) - assert web.running == True + assert web.running is True with web.app.test_client() as c: # Download the first time @@ -112,7 +134,7 @@ class TestWeb: res.mimetype == "application/zip" or res.mimetype == "application/x-zip-compressed" ) - assert web.running == True + assert web.running is True def test_receive_mode(self, temp_dir, common_obj): web = web_obj(temp_dir, common_obj, "receive") @@ -134,6 +156,166 @@ class TestWeb: res.get_data() assert res.status_code == 200 + def test_receive_mode_webhook(self, temp_dir, common_obj): + global webhook_url, webhook_data + webhook_url = None + webhook_data = None + + web = web_obj(temp_dir, common_obj, "receive") + assert web.mode == "receive" + web.settings.set("receive", "webhook_url", "http://127.0.0.1:1337/example") + web.proxies = None + assert ( + web.settings.get("receive", "webhook_url") + == "http://127.0.0.1:1337/example" + ) + + with web.app.test_client() as c: + res = c.get("/", headers=self._make_auth_headers(web.password)) + res.get_data() + assert res.status_code == 200 + + res = c.post( + "/upload-ajax", + buffered=True, + content_type="multipart/form-data", + data={"file[]": (BytesIO(b"THIS IS A TEST FILE"), "new_york.jpg")}, + headers=self._make_auth_headers(web.password), + ) + res.get_data() + assert res.status_code == 200 + + assert webhook_url == "http://127.0.0.1:1337/example" + assert webhook_data == "1 file submitted to OnionShare" + + def test_receive_mode_message_no_files(self, temp_dir, common_obj): + web = web_obj(temp_dir, common_obj, "receive") + + data_dir = os.path.join(temp_dir, "OnionShare") + os.makedirs(data_dir, exist_ok=True) + + web.settings.set("receive", "data_dir", data_dir) + + with web.app.test_client() as c: + res = c.post( + "/upload-ajax", + buffered=True, + content_type="multipart/form-data", + data={"text": "you know just sending an anonymous message"}, + headers=self._make_auth_headers(web.password), + ) + content = res.get_data() + assert res.status_code == 200 + assert b"Message submitted" in content + + # ~/OnionShare should have a folder for the date + filenames = os.listdir(data_dir) + assert len(filenames) == 1 + data_dir_date = os.path.join(data_dir, filenames[0]) + + # The date folder should have a single message txt file, no folders + filenames = os.listdir(data_dir_date) + assert len(filenames) == 1 + assert filenames[0].endswith("-message.txt") + + shutil.rmtree(data_dir) + + def test_receive_mode_message_and_files(self, temp_dir, common_obj): + web = web_obj(temp_dir, common_obj, "receive") + + data_dir = os.path.join(temp_dir, "OnionShare") + os.makedirs(data_dir, exist_ok=True) + + web.settings.set("receive", "data_dir", data_dir) + + with web.app.test_client() as c: + res = c.post( + "/upload-ajax", + buffered=True, + content_type="multipart/form-data", + data={ + "file[]": (BytesIO(b"THIS IS A TEST FILE"), "new_york.jpg"), + "text": "you know just sending an anonymous message", + }, + headers=self._make_auth_headers(web.password), + ) + content = res.get_data() + assert res.status_code == 200 + assert b"Message submitted, uploaded new_york.jpg" in content + + # Date folder should have a time folder with new_york.jpg, and a text message file + data_dir_date = os.path.join(data_dir, os.listdir(data_dir)[0]) + filenames = os.listdir(data_dir_date) + assert len(filenames) == 2 + time_str = filenames[0][0:6] + assert time_str in filenames + assert f"{time_str}-message.txt" in filenames + data_dir_time = os.path.join(data_dir_date, time_str) + assert os.path.isdir(data_dir_time) + assert os.path.exists(os.path.join(data_dir_time, "new_york.jpg")) + + shutil.rmtree(data_dir) + + def test_receive_mode_files_no_message(self, temp_dir, common_obj): + web = web_obj(temp_dir, common_obj, "receive") + + data_dir = os.path.join(temp_dir, "OnionShare") + os.makedirs(data_dir, exist_ok=True) + + web.settings.set("receive", "data_dir", data_dir) + + with web.app.test_client() as c: + res = c.post( + "/upload-ajax", + buffered=True, + content_type="multipart/form-data", + data={"file[]": (BytesIO(b"THIS IS A TEST FILE"), "new_york.jpg")}, + headers=self._make_auth_headers(web.password), + ) + content = res.get_data() + assert res.status_code == 200 + assert b"Uploaded new_york.jpg" in content + + # Date folder should have just a time folder with new_york.jpg + data_dir_date = os.path.join(data_dir, os.listdir(data_dir)[0]) + filenames = os.listdir(data_dir_date) + assert len(filenames) == 1 + time_str = filenames[0][0:6] + assert time_str in filenames + assert f"{time_str}-message.txt" not in filenames + data_dir_time = os.path.join(data_dir_date, time_str) + assert os.path.isdir(data_dir_time) + assert os.path.exists(os.path.join(data_dir_time, "new_york.jpg")) + + shutil.rmtree(data_dir) + + def test_receive_mode_no_message_no_files(self, temp_dir, common_obj): + web = web_obj(temp_dir, common_obj, "receive") + + data_dir = os.path.join(temp_dir, "OnionShare") + os.makedirs(data_dir, exist_ok=True) + + web.settings.set("receive", "data_dir", data_dir) + + with web.app.test_client() as c: + res = c.post( + "/upload-ajax", + buffered=True, + content_type="multipart/form-data", + data={}, + headers=self._make_auth_headers(web.password), + ) + content = res.get_data() + assert res.status_code == 200 + assert b"Nothing submitted" in content + + # Date folder should be empty + data_dir_date = os.path.join(data_dir, os.listdir(data_dir)[0]) + filenames = os.listdir(data_dir_date) + assert len(filenames) == 0 + + shutil.rmtree(data_dir) + def test_public_mode_on(self, temp_dir, common_obj): web = web_obj(temp_dir, common_obj, "receive") web.settings.set("general", "public", True) @@ -141,7 +323,7 @@ class TestWeb: with web.app.test_client() as c: # Loading / should work without auth res = c.get("/") - data1 = res.get_data() + res.get_data() assert res.status_code == 200 def test_public_mode_off(self, temp_dir, common_obj): @@ -164,6 +346,16 @@ class TestWeb: res.get_data() assert res.status_code == 200 + def test_cleanup(self, common_obj, temp_dir_1024, temp_file_1024): + web = web_obj(temp_dir_1024, common_obj, "share", 3) + + web.cleanup_filenames = [temp_dir_1024, temp_file_1024] + web.cleanup() + + assert os.path.exists(temp_file_1024) is False + assert os.path.exists(temp_dir_1024) is False + assert web.cleanup_filenames == [] + def _make_auth_headers(self, password): auth = base64.b64encode(b"onionshare:" + password.encode()).decode() h = Headers() @@ -229,3 +421,266 @@ class TestZipWriterCustom: def test_custom_callback(self, custom_zw): assert custom_zw.processed_size_callback(None) == "custom_callback" + + +def check_unsupported(cmd: str, args: list): + cmd_args = [cmd] + cmd_args.extend(args) + skip = False + + try: + subprocess.check_call(cmd_args) + except Exception: + skip = True + + return pytest.mark.skipif(skip, reason="Command {!r} not supported".format(cmd)) + + +@contextmanager +def live_server(web): + s = socket.socket() + s.bind(("localhost", 0)) + port = s.getsockname()[1] + s.close() + + def run(): + web.app.run(host="127.0.0.1", port=port, debug=False) + + proc = Process(target=run) + proc.start() + + url = "http://127.0.0.1:{}".format(port) + auth = base64.b64encode(b"onionshare:" + web.password.encode()).decode() + req = Request(url, headers={"Authorization": "Basic {}".format(auth)}) + + attempts = 20 + while True: + try: + urlopen(req) + break + except Exception: + attempts -= 1 + if attempts > 0: + time.sleep(0.5) + else: + raise + + yield url + "/download" + + proc.terminate() + + +class TestRangeRequests: + + VALID_RANGES = [ + (None, 500, [(0, 499)]), + ("bytes=0", 500, [(0, 499)]), + ("bytes=100", 500, [(100, 499)]), + ("bytes=100-", 500, [(100, 499)]), # not in the RFC, but how curl sends + ("bytes=0-99", 500, [(0, 99)]), + ("bytes=0-599", 500, [(0, 499)]), + ("bytes=0-0", 500, [(0, 0)]), + ("bytes=-100", 500, [(400, 499)]), + ("bytes=0-99,100-199", 500, [(0, 199)]), + ("bytes=0-100,100-199", 500, [(0, 199)]), + ("bytes=0-99,101-199", 500, [(0, 99), (101, 199)]), + ("bytes=0-199,100-299", 500, [(0, 299)]), + ("bytes=0-99,200-299", 500, [(0, 99), (200, 299)]), + ] + + INVALID_RANGES = [ + "bytes=200-100", + "bytes=0-100,300-200", + ] + + def test_parse_ranges(self): + for case in self.VALID_RANGES: + (header, target_size, expected) = case + parsed = parse_range_header(header, target_size) + assert parsed == expected, case + + for invalid in self.INVALID_RANGES: + with pytest.raises(RequestedRangeNotSatisfiable): + parse_range_header(invalid, 500) + + def test_headers(self, temp_dir, common_obj): + web = web_obj(temp_dir, common_obj, "share", 3) + web.settings.set("share", "autostop_sharing", False) + url = "/download" + + with web.app.test_client() as client: + resp = client.get(url, headers=self._make_auth_headers(web.password)) + assert resp.headers["ETag"].startswith('"sha256:') + assert resp.headers["Accept-Ranges"] == "bytes" + assert resp.headers.get("Last-Modified") is not None + assert resp.headers.get("Content-Length") is not None + assert "Accept-Encoding" in resp.headers["Vary"] + + def test_basic(self, temp_dir, common_obj): + web = web_obj(temp_dir, common_obj, "share", 3) + web.settings.set("share", "autostop_sharing", False) + url = "/download" + with open(web.share_mode.download_filename, "rb") as f: + contents = f.read() + + with web.app.test_client() as client: + resp = client.get(url, headers=self._make_auth_headers(web.password)) + assert resp.status_code == 200 + assert resp.data == contents + + def test_reassemble(self, temp_dir, common_obj): + web = web_obj(temp_dir, common_obj, "share", 3) + web.settings.set("share", "autostop_sharing", False) + url = "/download" + with open(web.share_mode.download_filename, "rb") as f: + contents = f.read() + + with web.app.test_client() as client: + headers = self._make_auth_headers(web.password) + headers.extend({"Range": "bytes=0-10"}) + resp = client.get(url, headers=headers) + assert resp.status_code == 206 + content_range = resp.headers["Content-Range"] + assert content_range == "bytes {}-{}/{}".format( + 0, 10, web.share_mode.download_filesize + ) + bytes_out = resp.data + + headers.update({"Range": "bytes=11-100000"}) + resp = client.get(url, headers=headers) + assert resp.status_code == 206 + content_range = resp.headers["Content-Range"] + assert content_range == "bytes {}-{}/{}".format( + 11, + web.share_mode.download_filesize - 1, + web.share_mode.download_filesize, + ) + bytes_out += resp.data + + assert bytes_out == contents + + def test_mismatched_etags(self, temp_dir, common_obj): + """RFC 7233 Section 3.2 + The "If-Range" header field allows a client to "short-circuit" the second request. + Informally, its meaning is as follows: if the representation is unchanged, send me the + part(s) that I am requesting in Range; otherwise, send me the entire representation. + """ + web = web_obj(temp_dir, common_obj, "share", 3) + web.settings.set("share", "autostop_sharing", False) + url = "/download" + with open(web.share_mode.download_filename, "rb") as f: + contents = f.read() + + with web.app.test_client() as client: + headers = self._make_auth_headers(web.password) + resp = client.get(url, headers=headers) + assert resp.status_code == 200 + + headers.extend({"If-Range": "mismatched etag", "Range": "bytes=10-100"}) + resp = client.get(url, headers=headers) + assert resp.status_code == 200 + assert resp.data == contents + + def test_if_unmodified_since(self, temp_dir, common_obj): + web = web_obj(temp_dir, common_obj, "share", 3) + web.settings.set("share", "autostop_sharing", False) + url = "/download" + + with web.app.test_client() as client: + headers = self._make_auth_headers(web.password) + resp = client.get(url, headers=headers) + assert resp.status_code == 200 + last_mod = resp.headers["Last-Modified"] + + headers.extend({"If-Unmodified-Since": last_mod}) + resp = client.get(url, headers=headers) + assert resp.status_code == 304 + + def test_firefox_like_behavior(self, temp_dir, common_obj): + web = web_obj(temp_dir, common_obj, "share", 3) + web.settings.set("share", "autostop_sharing", False) + url = "/download" + + with web.app.test_client() as client: + headers = self._make_auth_headers(web.password) + resp = client.get(url, headers=headers) + assert resp.status_code == 200 + + # Firefox sends these with all range requests + etag = resp.headers["ETag"] + last_mod = resp.headers["Last-Modified"] + + # make a request that uses the full header set + headers.extend( + { + "Range": "bytes=0-10", + "If-Unmodified-Since": last_mod, + "If-Range": etag, + } + ) + resp = client.get(url, headers=headers) + assert resp.status_code == 206 + + def _make_auth_headers(self, password): + auth = base64.b64encode(b"onionshare:" + password.encode()).decode() + h = Headers() + h.add("Authorization", "Basic " + auth) + return h + + @pytest.mark.skipif(sys.platform != "Linux", reason="requires Linux") + @check_unsupported("curl", ["--version"]) + def test_curl(self, temp_dir, tmpdir, common_obj): + web = web_obj(temp_dir, common_obj, "share", 3) + web.settings.set("share", "autostop_sharing", False) + + download = tmpdir.join("download") + + with live_server(web) as url: + # Debugging help from `man curl`, on error 33 + # 33 HTTP range error. The range "command" didn't work. + auth_header = self._make_auth_headers(web.password) + subprocess.check_call( + [ + "curl", + "-H", + str(auth_header).strip(), + "--output", + str(download), + "--continue-at", + "10", + url, + ] + ) + + @pytest.mark.skipif(sys.platform != "Linux", reason="requires Linux") + @check_unsupported("wget", ["--version"]) + def test_wget(self, temp_dir, tmpdir, common_obj): + web = web_obj(temp_dir, common_obj, "share", 3) + web.settings.set("share", "autostop_sharing", False) + + # wget needs a file to exist to continue + download = tmpdir.join("download") + download.write("x" * 10) + + with live_server(web) as url: + auth_header = self._make_auth_headers(web.password) + subprocess.check_call( + [ + "wget", + "--header", + str(auth_header).strip(), + "--continue", + "-O", + str(download), + url, + ] + ) + + @pytest.mark.skipif(sys.platform != "Linux", reason="requires Linux") + @check_unsupported("http", ["--version"]) + def test_httpie(self, temp_dir, common_obj): + web = web_obj(temp_dir, common_obj, "share", 3) + web.settings.set("share", "autostop_sharing", False) + + with live_server(web) as url: + subprocess.check_call(["http", url, "Range: bytes=10"]) |