summaryrefslogtreecommitdiff
path: root/cli/tests/test_cli_web.py
diff options
context:
space:
mode:
authorMicah Lee <micah@micahflee.com>2021-08-20 14:43:21 -0700
committerGitHub <noreply@github.com>2021-08-20 14:43:21 -0700
commitb66e742bc20ae977232fc53f22d485c10173ac2f (patch)
treec6eb66c340c34b45bac5525350c5eb5c43fe33a6 /cli/tests/test_cli_web.py
parent76104671c3eacbef53ad96fffd8a57512ab2d093 (diff)
parent02254b13bb4818745193092f2144fd83726d79e7 (diff)
downloadonionshare-b66e742bc20ae977232fc53f22d485c10173ac2f.tar.gz
onionshare-b66e742bc20ae977232fc53f22d485c10173ac2f.zip
Merge pull request #1395 from onionshare/developv2.3.3
Version 2.3.3, merge develop into stable
Diffstat (limited to 'cli/tests/test_cli_web.py')
-rw-r--r--cli/tests/test_cli_web.py475
1 files changed, 465 insertions, 10 deletions
diff --git a/cli/tests/test_cli_web.py b/cli/tests/test_cli_web.py
index 063bd43c..f8c96f9c 100644
--- a/cli/tests/test_cli_web.py
+++ b/cli/tests/test_cli_web.py
@@ -1,35 +1,57 @@
-import contextlib
-import inspect
-import io
import os
import random
import re
import socket
-import sys
+import subprocess
+import time
import zipfile
import tempfile
import base64
+import shutil
+import sys
+from io import BytesIO
import pytest
+from contextlib import contextmanager
+from multiprocessing import Process
+from urllib.request import urlopen, Request
from werkzeug.datastructures import Headers
+from werkzeug.exceptions import RequestedRangeNotSatisfiable
from onionshare_cli.common import Common
from onionshare_cli.web import Web
+from onionshare_cli.web.share_mode import parse_range_header
from onionshare_cli.settings import Settings
from onionshare_cli.mode_settings import ModeSettings
+import onionshare_cli.web.receive_mode
+
+# Stub requests.post, for receive mode webhook tests
+webhook_url = None
+webhook_data = None
+
+
+def requests_post_stub(url, data, timeout, proxies):
+ global webhook_url, webhook_data
+ webhook_url = url
+ webhook_data = data
+
+
+onionshare_cli.web.receive_mode.requests.post = requests_post_stub
+
DEFAULT_ZW_FILENAME_REGEX = re.compile(r"^onionshare_[a-z2-7]{6}.zip$")
RANDOM_STR_REGEX = re.compile(r"^[a-z2-7]+$")
def web_obj(temp_dir, common_obj, mode, num_files=0):
- """ Creates a Web object, in either share mode or receive mode, ready for testing """
+ """Creates a Web object, in either share mode or receive mode, ready for testing"""
common_obj.settings = Settings(common_obj)
mode_settings = ModeSettings(common_obj)
web = Web(common_obj, False, mode_settings, mode)
web.generate_password()
web.running = True
+ web.cleanup_filenames == []
web.app.testing = True
# Share mode
@@ -81,7 +103,7 @@ class TestWeb:
web = web_obj(temp_dir, common_obj, "share", 3)
web.settings.set("share", "autostop_sharing", True)
- assert web.running == True
+ assert web.running is True
with web.app.test_client() as c:
# Download the first time
@@ -93,7 +115,7 @@ class TestWeb:
or res.mimetype == "application/x-zip-compressed"
)
- assert web.running == False
+ assert web.running is False
def test_share_mode_autostop_sharing_off(
self, temp_dir, common_obj, temp_file_1024
@@ -101,7 +123,7 @@ class TestWeb:
web = web_obj(temp_dir, common_obj, "share", 3)
web.settings.set("share", "autostop_sharing", False)
- assert web.running == True
+ assert web.running is True
with web.app.test_client() as c:
# Download the first time
@@ -112,7 +134,7 @@ class TestWeb:
res.mimetype == "application/zip"
or res.mimetype == "application/x-zip-compressed"
)
- assert web.running == True
+ assert web.running is True
def test_receive_mode(self, temp_dir, common_obj):
web = web_obj(temp_dir, common_obj, "receive")
@@ -134,6 +156,166 @@ class TestWeb:
res.get_data()
assert res.status_code == 200
+ def test_receive_mode_webhook(self, temp_dir, common_obj):
+ global webhook_url, webhook_data
+ webhook_url = None
+ webhook_data = None
+
+ web = web_obj(temp_dir, common_obj, "receive")
+ assert web.mode == "receive"
+ web.settings.set("receive", "webhook_url", "http://127.0.0.1:1337/example")
+ web.proxies = None
+ assert (
+ web.settings.get("receive", "webhook_url")
+ == "http://127.0.0.1:1337/example"
+ )
+
+ with web.app.test_client() as c:
+ res = c.get("/", headers=self._make_auth_headers(web.password))
+ res.get_data()
+ assert res.status_code == 200
+
+ res = c.post(
+ "/upload-ajax",
+ buffered=True,
+ content_type="multipart/form-data",
+ data={"file[]": (BytesIO(b"THIS IS A TEST FILE"), "new_york.jpg")},
+ headers=self._make_auth_headers(web.password),
+ )
+ res.get_data()
+ assert res.status_code == 200
+
+ assert webhook_url == "http://127.0.0.1:1337/example"
+ assert webhook_data == "1 file submitted to OnionShare"
+
+ def test_receive_mode_message_no_files(self, temp_dir, common_obj):
+ web = web_obj(temp_dir, common_obj, "receive")
+
+ data_dir = os.path.join(temp_dir, "OnionShare")
+ os.makedirs(data_dir, exist_ok=True)
+
+ web.settings.set("receive", "data_dir", data_dir)
+
+ with web.app.test_client() as c:
+ res = c.post(
+ "/upload-ajax",
+ buffered=True,
+ content_type="multipart/form-data",
+ data={"text": "you know just sending an anonymous message"},
+ headers=self._make_auth_headers(web.password),
+ )
+ content = res.get_data()
+ assert res.status_code == 200
+ assert b"Message submitted" in content
+
+ # ~/OnionShare should have a folder for the date
+ filenames = os.listdir(data_dir)
+ assert len(filenames) == 1
+ data_dir_date = os.path.join(data_dir, filenames[0])
+
+ # The date folder should have a single message txt file, no folders
+ filenames = os.listdir(data_dir_date)
+ assert len(filenames) == 1
+ assert filenames[0].endswith("-message.txt")
+
+ shutil.rmtree(data_dir)
+
+ def test_receive_mode_message_and_files(self, temp_dir, common_obj):
+ web = web_obj(temp_dir, common_obj, "receive")
+
+ data_dir = os.path.join(temp_dir, "OnionShare")
+ os.makedirs(data_dir, exist_ok=True)
+
+ web.settings.set("receive", "data_dir", data_dir)
+
+ with web.app.test_client() as c:
+ res = c.post(
+ "/upload-ajax",
+ buffered=True,
+ content_type="multipart/form-data",
+ data={
+ "file[]": (BytesIO(b"THIS IS A TEST FILE"), "new_york.jpg"),
+ "text": "you know just sending an anonymous message",
+ },
+ headers=self._make_auth_headers(web.password),
+ )
+ content = res.get_data()
+ assert res.status_code == 200
+ assert b"Message submitted, uploaded new_york.jpg" in content
+
+ # Date folder should have a time folder with new_york.jpg, and a text message file
+ data_dir_date = os.path.join(data_dir, os.listdir(data_dir)[0])
+ filenames = os.listdir(data_dir_date)
+ assert len(filenames) == 2
+ time_str = filenames[0][0:6]
+ assert time_str in filenames
+ assert f"{time_str}-message.txt" in filenames
+ data_dir_time = os.path.join(data_dir_date, time_str)
+ assert os.path.isdir(data_dir_time)
+ assert os.path.exists(os.path.join(data_dir_time, "new_york.jpg"))
+
+ shutil.rmtree(data_dir)
+
+ def test_receive_mode_files_no_message(self, temp_dir, common_obj):
+ web = web_obj(temp_dir, common_obj, "receive")
+
+ data_dir = os.path.join(temp_dir, "OnionShare")
+ os.makedirs(data_dir, exist_ok=True)
+
+ web.settings.set("receive", "data_dir", data_dir)
+
+ with web.app.test_client() as c:
+ res = c.post(
+ "/upload-ajax",
+ buffered=True,
+ content_type="multipart/form-data",
+ data={"file[]": (BytesIO(b"THIS IS A TEST FILE"), "new_york.jpg")},
+ headers=self._make_auth_headers(web.password),
+ )
+ content = res.get_data()
+ assert res.status_code == 200
+ assert b"Uploaded new_york.jpg" in content
+
+ # Date folder should have just a time folder with new_york.jpg
+ data_dir_date = os.path.join(data_dir, os.listdir(data_dir)[0])
+ filenames = os.listdir(data_dir_date)
+ assert len(filenames) == 1
+ time_str = filenames[0][0:6]
+ assert time_str in filenames
+ assert f"{time_str}-message.txt" not in filenames
+ data_dir_time = os.path.join(data_dir_date, time_str)
+ assert os.path.isdir(data_dir_time)
+ assert os.path.exists(os.path.join(data_dir_time, "new_york.jpg"))
+
+ shutil.rmtree(data_dir)
+
+ def test_receive_mode_no_message_no_files(self, temp_dir, common_obj):
+ web = web_obj(temp_dir, common_obj, "receive")
+
+ data_dir = os.path.join(temp_dir, "OnionShare")
+ os.makedirs(data_dir, exist_ok=True)
+
+ web.settings.set("receive", "data_dir", data_dir)
+
+ with web.app.test_client() as c:
+ res = c.post(
+ "/upload-ajax",
+ buffered=True,
+ content_type="multipart/form-data",
+ data={},
+ headers=self._make_auth_headers(web.password),
+ )
+ content = res.get_data()
+ assert res.status_code == 200
+ assert b"Nothing submitted" in content
+
+ # Date folder should be empty
+ data_dir_date = os.path.join(data_dir, os.listdir(data_dir)[0])
+ filenames = os.listdir(data_dir_date)
+ assert len(filenames) == 0
+
+ shutil.rmtree(data_dir)
+
def test_public_mode_on(self, temp_dir, common_obj):
web = web_obj(temp_dir, common_obj, "receive")
web.settings.set("general", "public", True)
@@ -141,7 +323,7 @@ class TestWeb:
with web.app.test_client() as c:
# Loading / should work without auth
res = c.get("/")
- data1 = res.get_data()
+ res.get_data()
assert res.status_code == 200
def test_public_mode_off(self, temp_dir, common_obj):
@@ -164,6 +346,16 @@ class TestWeb:
res.get_data()
assert res.status_code == 200
+ def test_cleanup(self, common_obj, temp_dir_1024, temp_file_1024):
+ web = web_obj(temp_dir_1024, common_obj, "share", 3)
+
+ web.cleanup_filenames = [temp_dir_1024, temp_file_1024]
+ web.cleanup()
+
+ assert os.path.exists(temp_file_1024) is False
+ assert os.path.exists(temp_dir_1024) is False
+ assert web.cleanup_filenames == []
+
def _make_auth_headers(self, password):
auth = base64.b64encode(b"onionshare:" + password.encode()).decode()
h = Headers()
@@ -229,3 +421,266 @@ class TestZipWriterCustom:
def test_custom_callback(self, custom_zw):
assert custom_zw.processed_size_callback(None) == "custom_callback"
+
+
+def check_unsupported(cmd: str, args: list):
+ cmd_args = [cmd]
+ cmd_args.extend(args)
+ skip = False
+
+ try:
+ subprocess.check_call(cmd_args)
+ except Exception:
+ skip = True
+
+ return pytest.mark.skipif(skip, reason="Command {!r} not supported".format(cmd))
+
+
+@contextmanager
+def live_server(web):
+ s = socket.socket()
+ s.bind(("localhost", 0))
+ port = s.getsockname()[1]
+ s.close()
+
+ def run():
+ web.app.run(host="127.0.0.1", port=port, debug=False)
+
+ proc = Process(target=run)
+ proc.start()
+
+ url = "http://127.0.0.1:{}".format(port)
+ auth = base64.b64encode(b"onionshare:" + web.password.encode()).decode()
+ req = Request(url, headers={"Authorization": "Basic {}".format(auth)})
+
+ attempts = 20
+ while True:
+ try:
+ urlopen(req)
+ break
+ except Exception:
+ attempts -= 1
+ if attempts > 0:
+ time.sleep(0.5)
+ else:
+ raise
+
+ yield url + "/download"
+
+ proc.terminate()
+
+
+class TestRangeRequests:
+
+ VALID_RANGES = [
+ (None, 500, [(0, 499)]),
+ ("bytes=0", 500, [(0, 499)]),
+ ("bytes=100", 500, [(100, 499)]),
+ ("bytes=100-", 500, [(100, 499)]), # not in the RFC, but how curl sends
+ ("bytes=0-99", 500, [(0, 99)]),
+ ("bytes=0-599", 500, [(0, 499)]),
+ ("bytes=0-0", 500, [(0, 0)]),
+ ("bytes=-100", 500, [(400, 499)]),
+ ("bytes=0-99,100-199", 500, [(0, 199)]),
+ ("bytes=0-100,100-199", 500, [(0, 199)]),
+ ("bytes=0-99,101-199", 500, [(0, 99), (101, 199)]),
+ ("bytes=0-199,100-299", 500, [(0, 299)]),
+ ("bytes=0-99,200-299", 500, [(0, 99), (200, 299)]),
+ ]
+
+ INVALID_RANGES = [
+ "bytes=200-100",
+ "bytes=0-100,300-200",
+ ]
+
+ def test_parse_ranges(self):
+ for case in self.VALID_RANGES:
+ (header, target_size, expected) = case
+ parsed = parse_range_header(header, target_size)
+ assert parsed == expected, case
+
+ for invalid in self.INVALID_RANGES:
+ with pytest.raises(RequestedRangeNotSatisfiable):
+ parse_range_header(invalid, 500)
+
+ def test_headers(self, temp_dir, common_obj):
+ web = web_obj(temp_dir, common_obj, "share", 3)
+ web.settings.set("share", "autostop_sharing", False)
+ url = "/download"
+
+ with web.app.test_client() as client:
+ resp = client.get(url, headers=self._make_auth_headers(web.password))
+ assert resp.headers["ETag"].startswith('"sha256:')
+ assert resp.headers["Accept-Ranges"] == "bytes"
+ assert resp.headers.get("Last-Modified") is not None
+ assert resp.headers.get("Content-Length") is not None
+ assert "Accept-Encoding" in resp.headers["Vary"]
+
+ def test_basic(self, temp_dir, common_obj):
+ web = web_obj(temp_dir, common_obj, "share", 3)
+ web.settings.set("share", "autostop_sharing", False)
+ url = "/download"
+ with open(web.share_mode.download_filename, "rb") as f:
+ contents = f.read()
+
+ with web.app.test_client() as client:
+ resp = client.get(url, headers=self._make_auth_headers(web.password))
+ assert resp.status_code == 200
+ assert resp.data == contents
+
+ def test_reassemble(self, temp_dir, common_obj):
+ web = web_obj(temp_dir, common_obj, "share", 3)
+ web.settings.set("share", "autostop_sharing", False)
+ url = "/download"
+ with open(web.share_mode.download_filename, "rb") as f:
+ contents = f.read()
+
+ with web.app.test_client() as client:
+ headers = self._make_auth_headers(web.password)
+ headers.extend({"Range": "bytes=0-10"})
+ resp = client.get(url, headers=headers)
+ assert resp.status_code == 206
+ content_range = resp.headers["Content-Range"]
+ assert content_range == "bytes {}-{}/{}".format(
+ 0, 10, web.share_mode.download_filesize
+ )
+ bytes_out = resp.data
+
+ headers.update({"Range": "bytes=11-100000"})
+ resp = client.get(url, headers=headers)
+ assert resp.status_code == 206
+ content_range = resp.headers["Content-Range"]
+ assert content_range == "bytes {}-{}/{}".format(
+ 11,
+ web.share_mode.download_filesize - 1,
+ web.share_mode.download_filesize,
+ )
+ bytes_out += resp.data
+
+ assert bytes_out == contents
+
+ def test_mismatched_etags(self, temp_dir, common_obj):
+ """RFC 7233 Section 3.2
+ The "If-Range" header field allows a client to "short-circuit" the second request.
+ Informally, its meaning is as follows: if the representation is unchanged, send me the
+ part(s) that I am requesting in Range; otherwise, send me the entire representation.
+ """
+ web = web_obj(temp_dir, common_obj, "share", 3)
+ web.settings.set("share", "autostop_sharing", False)
+ url = "/download"
+ with open(web.share_mode.download_filename, "rb") as f:
+ contents = f.read()
+
+ with web.app.test_client() as client:
+ headers = self._make_auth_headers(web.password)
+ resp = client.get(url, headers=headers)
+ assert resp.status_code == 200
+
+ headers.extend({"If-Range": "mismatched etag", "Range": "bytes=10-100"})
+ resp = client.get(url, headers=headers)
+ assert resp.status_code == 200
+ assert resp.data == contents
+
+ def test_if_unmodified_since(self, temp_dir, common_obj):
+ web = web_obj(temp_dir, common_obj, "share", 3)
+ web.settings.set("share", "autostop_sharing", False)
+ url = "/download"
+
+ with web.app.test_client() as client:
+ headers = self._make_auth_headers(web.password)
+ resp = client.get(url, headers=headers)
+ assert resp.status_code == 200
+ last_mod = resp.headers["Last-Modified"]
+
+ headers.extend({"If-Unmodified-Since": last_mod})
+ resp = client.get(url, headers=headers)
+ assert resp.status_code == 304
+
+ def test_firefox_like_behavior(self, temp_dir, common_obj):
+ web = web_obj(temp_dir, common_obj, "share", 3)
+ web.settings.set("share", "autostop_sharing", False)
+ url = "/download"
+
+ with web.app.test_client() as client:
+ headers = self._make_auth_headers(web.password)
+ resp = client.get(url, headers=headers)
+ assert resp.status_code == 200
+
+ # Firefox sends these with all range requests
+ etag = resp.headers["ETag"]
+ last_mod = resp.headers["Last-Modified"]
+
+ # make a request that uses the full header set
+ headers.extend(
+ {
+ "Range": "bytes=0-10",
+ "If-Unmodified-Since": last_mod,
+ "If-Range": etag,
+ }
+ )
+ resp = client.get(url, headers=headers)
+ assert resp.status_code == 206
+
+ def _make_auth_headers(self, password):
+ auth = base64.b64encode(b"onionshare:" + password.encode()).decode()
+ h = Headers()
+ h.add("Authorization", "Basic " + auth)
+ return h
+
+ @pytest.mark.skipif(sys.platform != "Linux", reason="requires Linux")
+ @check_unsupported("curl", ["--version"])
+ def test_curl(self, temp_dir, tmpdir, common_obj):
+ web = web_obj(temp_dir, common_obj, "share", 3)
+ web.settings.set("share", "autostop_sharing", False)
+
+ download = tmpdir.join("download")
+
+ with live_server(web) as url:
+ # Debugging help from `man curl`, on error 33
+ # 33 HTTP range error. The range "command" didn't work.
+ auth_header = self._make_auth_headers(web.password)
+ subprocess.check_call(
+ [
+ "curl",
+ "-H",
+ str(auth_header).strip(),
+ "--output",
+ str(download),
+ "--continue-at",
+ "10",
+ url,
+ ]
+ )
+
+ @pytest.mark.skipif(sys.platform != "Linux", reason="requires Linux")
+ @check_unsupported("wget", ["--version"])
+ def test_wget(self, temp_dir, tmpdir, common_obj):
+ web = web_obj(temp_dir, common_obj, "share", 3)
+ web.settings.set("share", "autostop_sharing", False)
+
+ # wget needs a file to exist to continue
+ download = tmpdir.join("download")
+ download.write("x" * 10)
+
+ with live_server(web) as url:
+ auth_header = self._make_auth_headers(web.password)
+ subprocess.check_call(
+ [
+ "wget",
+ "--header",
+ str(auth_header).strip(),
+ "--continue",
+ "-O",
+ str(download),
+ url,
+ ]
+ )
+
+ @pytest.mark.skipif(sys.platform != "Linux", reason="requires Linux")
+ @check_unsupported("http", ["--version"])
+ def test_httpie(self, temp_dir, common_obj):
+ web = web_obj(temp_dir, common_obj, "share", 3)
+ web.settings.set("share", "autostop_sharing", False)
+
+ with live_server(web) as url:
+ subprocess.check_call(["http", url, "Range: bytes=10"])