summaryrefslogtreecommitdiff
path: root/cli/tests/test_cli_web.py
diff options
context:
space:
mode:
Diffstat (limited to 'cli/tests/test_cli_web.py')
-rw-r--r--cli/tests/test_cli_web.py268
1 files changed, 268 insertions, 0 deletions
diff --git a/cli/tests/test_cli_web.py b/cli/tests/test_cli_web.py
index 5856ba9a..edd838c4 100644
--- a/cli/tests/test_cli_web.py
+++ b/cli/tests/test_cli_web.py
@@ -1,16 +1,24 @@
import os
import random
import re
+import socket
+import subprocess
+import time
import zipfile
import tempfile
import base64
from io import BytesIO
import pytest
+from contextlib import contextmanager
+from multiprocessing import Process
+from urllib.request import urlopen, Request
from werkzeug.datastructures import Headers
+from werkzeug.exceptions import RequestedRangeNotSatisfiable
from onionshare_cli.common import Common
from onionshare_cli.web import Web
+from onionshare_cli.web.share_mode import parse_range_header
from onionshare_cli.settings import Settings
from onionshare_cli.mode_settings import ModeSettings
import onionshare_cli.web.receive_mode
@@ -272,3 +280,263 @@ class TestZipWriterCustom:
def test_custom_callback(self, custom_zw):
assert custom_zw.processed_size_callback(None) == "custom_callback"
+
+
+def check_unsupported(cmd: str, args: list):
+ cmd_args = [cmd]
+ cmd_args.extend(args)
+ skip = False
+
+ try:
+ subprocess.check_call(cmd_args)
+ except Exception:
+ skip = True
+
+ return pytest.mark.skipif(skip, reason="Command {!r} not supported".format(cmd))
+
+
+@contextmanager
+def live_server(web):
+ s = socket.socket()
+ s.bind(("localhost", 0))
+ port = s.getsockname()[1]
+ s.close()
+
+ def run():
+ web.app.run(host="127.0.0.1", port=port, debug=False)
+
+ proc = Process(target=run)
+ proc.start()
+
+ url = "http://127.0.0.1:{}".format(port)
+ auth = base64.b64encode(b"onionshare:" + web.password.encode()).decode()
+ req = Request(url, headers={"Authorization": "Basic {}".format(auth)})
+
+ attempts = 20
+ while True:
+ try:
+ urlopen(req)
+ break
+ except Exception:
+ attempts -= 1
+ if attempts > 0:
+ time.sleep(0.5)
+ else:
+ raise
+
+ yield url + "/download"
+
+ proc.terminate()
+
+
+class TestRangeRequests:
+
+ VALID_RANGES = [
+ (None, 500, [(0, 499)]),
+ ("bytes=0", 500, [(0, 499)]),
+ ("bytes=100", 500, [(100, 499)]),
+ ("bytes=100-", 500, [(100, 499)]), # not in the RFC, but how curl sends
+ ("bytes=0-99", 500, [(0, 99)]),
+ ("bytes=0-599", 500, [(0, 499)]),
+ ("bytes=0-0", 500, [(0, 0)]),
+ ("bytes=-100", 500, [(400, 499)]),
+ ("bytes=0-99,100-199", 500, [(0, 199)]),
+ ("bytes=0-100,100-199", 500, [(0, 199)]),
+ ("bytes=0-99,101-199", 500, [(0, 99), (101, 199)]),
+ ("bytes=0-199,100-299", 500, [(0, 299)]),
+ ("bytes=0-99,200-299", 500, [(0, 99), (200, 299)]),
+ ]
+
+ INVALID_RANGES = [
+ "bytes=200-100",
+ "bytes=0-100,300-200",
+ ]
+
+ def test_parse_ranges(self):
+ for case in self.VALID_RANGES:
+ (header, target_size, expected) = case
+ parsed = parse_range_header(header, target_size)
+ assert parsed == expected, case
+
+ for invalid in self.INVALID_RANGES:
+ with pytest.raises(RequestedRangeNotSatisfiable):
+ parse_range_header(invalid, 500)
+
+ def test_headers(self, temp_dir, common_obj):
+ web = web_obj(temp_dir, common_obj, "share", 3)
+ web.settings.set("share", "autostop_sharing", False)
+ url = "/download"
+
+ with web.app.test_client() as client:
+ resp = client.get(url, headers=self._make_auth_headers(web.password))
+ assert resp.headers["ETag"].startswith('"sha256:')
+ assert resp.headers["Accept-Ranges"] == "bytes"
+ assert resp.headers.get("Last-Modified") is not None
+ assert resp.headers.get("Content-Length") is not None
+ assert "Accept-Encoding" in resp.headers["Vary"]
+
+ def test_basic(self, temp_dir, common_obj):
+ web = web_obj(temp_dir, common_obj, "share", 3)
+ web.settings.set("share", "autostop_sharing", False)
+ url = "/download"
+ with open(web.share_mode.download_filename, "rb") as f:
+ contents = f.read()
+
+ with web.app.test_client() as client:
+ resp = client.get(url, headers=self._make_auth_headers(web.password))
+ assert resp.status_code == 200
+ assert resp.data == contents
+
+ def test_reassemble(self, temp_dir, common_obj):
+ web = web_obj(temp_dir, common_obj, "share", 3)
+ web.settings.set("share", "autostop_sharing", False)
+ url = "/download"
+ with open(web.share_mode.download_filename, "rb") as f:
+ contents = f.read()
+
+ with web.app.test_client() as client:
+ headers = self._make_auth_headers(web.password)
+ headers.extend({"Range": "bytes=0-10"})
+ resp = client.get(url, headers=headers)
+ assert resp.status_code == 206
+ content_range = resp.headers["Content-Range"]
+ assert content_range == "bytes {}-{}/{}".format(
+ 0, 10, web.share_mode.download_filesize
+ )
+ bytes_out = resp.data
+
+ headers.update({"Range": "bytes=11-100000"})
+ resp = client.get(url, headers=headers)
+ assert resp.status_code == 206
+ content_range = resp.headers["Content-Range"]
+ assert content_range == "bytes {}-{}/{}".format(
+ 11,
+ web.share_mode.download_filesize - 1,
+ web.share_mode.download_filesize,
+ )
+ bytes_out += resp.data
+
+ assert bytes_out == contents
+
+ def test_mismatched_etags(self, temp_dir, common_obj):
+ """RFC 7233 Section 3.2
+ The "If-Range" header field allows a client to "short-circuit" the second request.
+ Informally, its meaning is as follows: if the representation is unchanged, send me the
+ part(s) that I am requesting in Range; otherwise, send me the entire representation.
+ """
+ web = web_obj(temp_dir, common_obj, "share", 3)
+ web.settings.set("share", "autostop_sharing", False)
+ url = "/download"
+ with open(web.share_mode.download_filename, "rb") as f:
+ contents = f.read()
+
+ with web.app.test_client() as client:
+ headers = self._make_auth_headers(web.password)
+ resp = client.get(url, headers=headers)
+ assert resp.status_code == 200
+
+ headers.extend({"If-Range": "mismatched etag", "Range": "bytes=10-100"})
+ resp = client.get(url, headers=headers)
+ assert resp.status_code == 200
+ assert resp.data == contents
+
+ def test_if_unmodified_since(self, temp_dir, common_obj):
+ web = web_obj(temp_dir, common_obj, "share", 3)
+ web.settings.set("share", "autostop_sharing", False)
+ url = "/download"
+
+ with web.app.test_client() as client:
+ headers = self._make_auth_headers(web.password)
+ resp = client.get(url, headers=headers)
+ assert resp.status_code == 200
+ last_mod = resp.headers["Last-Modified"]
+
+ headers.extend({"If-Unmodified-Since": last_mod})
+ resp = client.get(url, headers=headers)
+ assert resp.status_code == 304
+
+ def test_firefox_like_behavior(self, temp_dir, common_obj):
+ web = web_obj(temp_dir, common_obj, "share", 3)
+ web.settings.set("share", "autostop_sharing", False)
+ url = "/download"
+
+ with web.app.test_client() as client:
+ headers = self._make_auth_headers(web.password)
+ resp = client.get(url, headers=headers)
+ assert resp.status_code == 200
+
+ # Firefox sends these with all range requests
+ etag = resp.headers["ETag"]
+ last_mod = resp.headers["Last-Modified"]
+
+ # make a request that uses the full header set
+ headers.extend(
+ {
+ "Range": "bytes=0-10",
+ "If-Unmodified-Since": last_mod,
+ "If-Range": etag,
+ }
+ )
+ resp = client.get(url, headers=headers)
+ assert resp.status_code == 206
+
+ def _make_auth_headers(self, password):
+ auth = base64.b64encode(b"onionshare:" + password.encode()).decode()
+ h = Headers()
+ h.add("Authorization", "Basic " + auth)
+ return h
+
+ @check_unsupported("curl", ["--version"])
+ def test_curl(self, temp_dir, tmpdir, common_obj):
+ web = web_obj(temp_dir, common_obj, "share", 3)
+ web.settings.set("share", "autostop_sharing", False)
+
+ download = tmpdir.join("download")
+
+ with live_server(web) as url:
+ # Debugging help from `man curl`, on error 33
+ # 33 HTTP range error. The range "command" didn't work.
+ auth_header = self._make_auth_headers(web.password)
+ subprocess.check_call(
+ [
+ "curl",
+ "-H",
+ str(auth_header).strip(),
+ "--output",
+ str(download),
+ "--continue-at",
+ "10",
+ url,
+ ]
+ )
+
+ @check_unsupported("wget", ["--version"])
+ def test_wget(self, temp_dir, tmpdir, common_obj):
+ web = web_obj(temp_dir, common_obj, "share", 3)
+ web.settings.set("share", "autostop_sharing", False)
+
+ # wget needs a file to exist to continue
+ download = tmpdir.join("download")
+ download.write("x" * 10)
+
+ with live_server(web) as url:
+ auth_header = self._make_auth_headers(web.password)
+ subprocess.check_call(
+ [
+ "wget",
+ "--header",
+ str(auth_header).strip(),
+ "--continue",
+ "-O",
+ str(download),
+ url,
+ ]
+ )
+
+ @check_unsupported("http", ["--version"])
+ def test_httpie(self, temp_dir, common_obj):
+ web = web_obj(temp_dir, common_obj, "share", 3)
+ web.settings.set("share", "autostop_sharing", False)
+
+ with live_server(web) as url:
+ subprocess.check_call(["http", url, "Range: bytes=10"])