summaryrefslogtreecommitdiff
path: root/tests/test_onionshare_web.py
diff options
context:
space:
mode:
Diffstat (limited to 'tests/test_onionshare_web.py')
-rw-r--r--tests/test_onionshare_web.py150
1 files changed, 85 insertions, 65 deletions
diff --git a/tests/test_onionshare_web.py b/tests/test_onionshare_web.py
index 0c29859b..b971b31a 100644
--- a/tests/test_onionshare_web.py
+++ b/tests/test_onionshare_web.py
@@ -27,16 +27,18 @@ import socket
import sys
import zipfile
import tempfile
+import base64
import pytest
+from werkzeug.datastructures import Headers
from onionshare.common import Common
from onionshare import strings
from onionshare.web import Web
from onionshare.settings import Settings
-DEFAULT_ZW_FILENAME_REGEX = re.compile(r'^onionshare_[a-z2-7]{6}.zip$')
-RANDOM_STR_REGEX = re.compile(r'^[a-z2-7]+$')
+DEFAULT_ZW_FILENAME_REGEX = re.compile(r"^onionshare_[a-z2-7]{6}.zip$")
+RANDOM_STR_REGEX = re.compile(r"^[a-z2-7]+$")
def web_obj(common_obj, mode, num_files=0):
@@ -44,19 +46,19 @@ def web_obj(common_obj, mode, num_files=0):
common_obj.settings = Settings(common_obj)
strings.load_strings(common_obj)
web = Web(common_obj, False, mode)
- web.generate_slug()
+ web.generate_password()
web.stay_open = True
web.running = True
web.app.testing = True
# Share mode
- if mode == 'share':
+ if mode == "share":
# Add files
files = []
for i in range(num_files):
with tempfile.NamedTemporaryFile(delete=False) as tmp_file:
- tmp_file.write(b'*' * 1024)
+ tmp_file.write(b"*" * 1024)
files.append(tmp_file.name)
web.share_mode.set_file_info(files)
# Receive mode
@@ -68,114 +70,130 @@ def web_obj(common_obj, mode, num_files=0):
class TestWeb:
def test_share_mode(self, common_obj):
- web = web_obj(common_obj, 'share', 3)
- assert web.mode is 'share'
+ web = web_obj(common_obj, "share", 3)
+ assert web.mode is "share"
with web.app.test_client() as c:
- # Load 404 pages
- res = c.get('/')
+ # Load / without auth
+ res = c.get("/")
res.get_data()
- assert res.status_code == 404
+ assert res.status_code == 401
- res = c.get('/invalidslug'.format(web.slug))
+ # Load / with invalid auth
+ res = c.get("/", headers=self._make_auth_headers("invalid"))
res.get_data()
- assert res.status_code == 404
+ assert res.status_code == 401
- # Load download page
- res = c.get('/{}'.format(web.slug))
+ # Load / with valid auth
+ res = c.get("/", headers=self._make_auth_headers(web.password))
res.get_data()
assert res.status_code == 200
# Download
- res = c.get('/{}/download'.format(web.slug))
+ res = c.get("/download", headers=self._make_auth_headers(web.password))
res.get_data()
assert res.status_code == 200
- assert res.mimetype == 'application/zip'
+ assert res.mimetype == "application/zip"
def test_share_mode_close_after_first_download_on(self, common_obj, temp_file_1024):
- web = web_obj(common_obj, 'share', 3)
+ web = web_obj(common_obj, "share", 3)
web.stay_open = False
assert web.running == True
with web.app.test_client() as c:
# Download the first time
- res = c.get('/{}/download'.format(web.slug))
+ res = c.get("/download", headers=self._make_auth_headers(web.password))
res.get_data()
assert res.status_code == 200
- assert res.mimetype == 'application/zip'
+ assert res.mimetype == "application/zip"
assert web.running == False
- def test_share_mode_close_after_first_download_off(self, common_obj, temp_file_1024):
- web = web_obj(common_obj, 'share', 3)
+ def test_share_mode_close_after_first_download_off(
+ self, common_obj, temp_file_1024
+ ):
+ web = web_obj(common_obj, "share", 3)
web.stay_open = True
assert web.running == True
with web.app.test_client() as c:
# Download the first time
- res = c.get('/{}/download'.format(web.slug))
+ res = c.get("/download", headers=self._make_auth_headers(web.password))
res.get_data()
assert res.status_code == 200
- assert res.mimetype == 'application/zip'
+ assert res.mimetype == "application/zip"
assert web.running == True
def test_receive_mode(self, common_obj):
- web = web_obj(common_obj, 'receive')
- assert web.mode is 'receive'
+ web = web_obj(common_obj, "receive")
+ assert web.mode is "receive"
with web.app.test_client() as c:
- # Load 404 pages
- res = c.get('/')
+ # Load / without auth
+ res = c.get("/")
res.get_data()
- assert res.status_code == 404
+ assert res.status_code == 401
- res = c.get('/invalidslug'.format(web.slug))
+ # Load / with invalid auth
+ res = c.get("/", headers=self._make_auth_headers("invalid"))
res.get_data()
- assert res.status_code == 404
+ assert res.status_code == 401
- # Load upload page
- res = c.get('/{}'.format(web.slug))
+ # Load / with valid auth
+ res = c.get("/", headers=self._make_auth_headers(web.password))
res.get_data()
assert res.status_code == 200
def test_public_mode_on(self, common_obj):
- web = web_obj(common_obj, 'receive')
- common_obj.settings.set('public_mode', True)
+ web = web_obj(common_obj, "receive")
+ common_obj.settings.set("public_mode", True)
with web.app.test_client() as c:
- # Upload page should be accessible from /
- res = c.get('/')
+ # Loading / should work without auth
+ res = c.get("/")
data1 = res.get_data()
assert res.status_code == 200
- # /[slug] should be a 404
- res = c.get('/{}'.format(web.slug))
- data2 = res.get_data()
- assert res.status_code == 404
-
def test_public_mode_off(self, common_obj):
- web = web_obj(common_obj, 'receive')
- common_obj.settings.set('public_mode', False)
+ web = web_obj(common_obj, "receive")
+ common_obj.settings.set("public_mode", False)
with web.app.test_client() as c:
- # / should be a 404
- res = c.get('/')
- data1 = res.get_data()
- assert res.status_code == 404
+ # Load / without auth
+ res = c.get("/")
+ res.get_data()
+ assert res.status_code == 401
+
+ # But static resources should work without auth
+ res = c.get("{}/css/style.css".format(web.static_url_path))
+ res.get_data()
+ assert res.status_code == 200
- # Upload page should be accessible from /[slug]
- res = c.get('/{}'.format(web.slug))
- data2 = res.get_data()
+ # Load / with valid auth
+ res = c.get("/", headers=self._make_auth_headers(web.password))
+ res.get_data()
assert res.status_code == 200
+ def _make_auth_headers(self, password):
+ auth = base64.b64encode(b"onionshare:" + password.encode()).decode()
+ h = Headers()
+ h.add("Authorization", "Basic " + auth)
+ return h
+
class TestZipWriterDefault:
- @pytest.mark.parametrize('test_input', (
- 'onionshare_{}.zip'.format(''.join(
- random.choice('abcdefghijklmnopqrstuvwxyz234567') for _ in range(6)
- )) for _ in range(50)
- ))
+ @pytest.mark.parametrize(
+ "test_input",
+ (
+ "onionshare_{}.zip".format(
+ "".join(
+ random.choice("abcdefghijklmnopqrstuvwxyz234567") for _ in range(6)
+ )
+ )
+ for _ in range(50)
+ ),
+ )
def test_default_zw_filename_regex(self, test_input):
assert bool(DEFAULT_ZW_FILENAME_REGEX.match(test_input))
@@ -190,15 +208,14 @@ class TestZipWriterDefault:
assert default_zw.z._allowZip64 is True
def test_zipfile_mode(self, default_zw):
- assert default_zw.z.mode == 'w'
+ assert default_zw.z.mode == "w"
def test_callback(self, default_zw):
assert default_zw.processed_size_callback(None) is None
def test_add_file(self, default_zw, temp_file_1024_delete):
default_zw.add_file(temp_file_1024_delete)
- zipfile_info = default_zw.z.getinfo(
- os.path.basename(temp_file_1024_delete))
+ zipfile_info = default_zw.z.getinfo(os.path.basename(temp_file_1024_delete))
assert zipfile_info.compress_type == zipfile.ZIP_DEFLATED
assert zipfile_info.file_size == 1024
@@ -210,12 +227,15 @@ class TestZipWriterDefault:
class TestZipWriterCustom:
- @pytest.mark.parametrize('test_input', (
- Common.random_string(
- random.randint(2, 50),
- random.choice((None, random.randint(2, 50)))
- ) for _ in range(50)
- ))
+ @pytest.mark.parametrize(
+ "test_input",
+ (
+ Common.random_string(
+ random.randint(2, 50), random.choice((None, random.randint(2, 50)))
+ )
+ for _ in range(50)
+ ),
+ )
def test_random_string_regex(self, test_input):
assert bool(RANDOM_STR_REGEX.match(test_input))
@@ -223,4 +243,4 @@ class TestZipWriterCustom:
assert bool(RANDOM_STR_REGEX.match(custom_zw.zip_filename))
def test_custom_callback(self, custom_zw):
- assert custom_zw.processed_size_callback(None) == 'custom_callback'
+ assert custom_zw.processed_size_callback(None) == "custom_callback"