summaryrefslogtreecommitdiff
path: root/desktop/onionshare/common.py
diff options
context:
space:
mode:
Diffstat (limited to 'desktop/onionshare/common.py')
-rw-r--r--desktop/onionshare/common.py336
1 files changed, 0 insertions, 336 deletions
diff --git a/desktop/onionshare/common.py b/desktop/onionshare/common.py
deleted file mode 100644
index 27104669..00000000
--- a/desktop/onionshare/common.py
+++ /dev/null
@@ -1,336 +0,0 @@
-# -*- coding: utf-8 -*-
-"""
-OnionShare | https://onionshare.org/
-
-Copyright (C) 2014-2020 Micah Lee, et al. <micah@micahflee.com>
-
-This program is free software: you can redistribute it and/or modify
-it under the terms of the GNU General Public License as published by
-the Free Software Foundation, either version 3 of the License, or
-(at your option) any later version.
-
-This program is distributed in the hope that it will be useful,
-but WITHOUT ANY WARRANTY; without even the implied warranty of
-MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-GNU General Public License for more details.
-
-You should have received a copy of the GNU General Public License
-along with this program. If not, see <http://www.gnu.org/licenses/>.
-"""
-import base64
-import hashlib
-import inspect
-import os
-import platform
-import random
-import socket
-import sys
-import tempfile
-import threading
-import time
-import shutil
-
-from .settings import Settings
-
-
-class Common:
- """
- The Common object is shared amongst all parts of OnionShare.
- """
-
- def __init__(self, verbose=False):
- self.verbose = verbose
-
- # The platform OnionShare is running on
- self.platform = platform.system()
- if self.platform.endswith("BSD") or self.platform == "DragonFly":
- self.platform = "BSD"
-
- # The current version of OnionShare
- with open(self.get_resource_path("version.txt")) as f:
- self.version = f.read().strip()
-
- def load_settings(self, config=None):
- """
- Loading settings, optionally from a custom config json file.
- """
- self.settings = Settings(self, config)
- self.settings.load()
-
- def log(self, module, func, msg=None):
- """
- If verbose mode is on, log error messages to stdout
- """
- if self.verbose:
- timestamp = time.strftime("%b %d %Y %X")
-
- final_msg = f"[{timestamp}] {module}.{func}"
- if msg:
- final_msg = f"{final_msg}: {msg}"
- print(final_msg)
-
- def get_resource_path(self, filename):
- """
- Returns the absolute path of a resource, regardless of whether OnionShare is installed
- systemwide, and whether regardless of platform
- """
- # On Windows, and in Windows dev mode, switch slashes in incoming filename to backslackes
- if self.platform == "Windows":
- filename = filename.replace("/", "\\")
-
- if getattr(sys, "onionshare_dev_mode", False):
- # Look for resources directory relative to python file
- prefix = os.path.join(
- os.path.dirname(
- os.path.dirname(
- os.path.abspath(inspect.getfile(inspect.currentframe()))
- )
- ),
- "share",
- )
- if not os.path.exists(prefix):
- # While running tests during stdeb bdist_deb, look 3 directories up for the share folder
- prefix = os.path.join(
- os.path.dirname(
- os.path.dirname(os.path.dirname(os.path.dirname(prefix)))
- ),
- "share",
- )
-
- elif self.platform == "BSD" or self.platform == "Linux":
- # Look for resources relative to the binary, so if the binary is /usr/bin/onionshare-gui and
- # the resource dir is /usr/share/onionshare, then the resource dir relative to the binary dir
- # is ../share/onionshare
- prefix = os.path.join(
- os.path.dirname(os.path.dirname(sys.argv[0])), "share/onionshare"
- )
-
- elif getattr(sys, "frozen", False):
- # Check if app is "frozen"
- # https://pythonhosted.org/PyInstaller/#run-time-information
- if self.platform == "Darwin":
- prefix = os.path.join(sys._MEIPASS, "share")
- elif self.platform == "Windows":
- prefix = os.path.join(os.path.dirname(sys.executable), "share")
-
- return os.path.join(prefix, filename)
-
- def get_tor_paths(self):
- if self.platform == "Linux":
- tor_path = shutil.which("tor")
- obfs4proxy_file_path = shutil.which("obfs4proxy")
- prefix = os.path.dirname(os.path.dirname(tor_path))
- tor_geo_ip_file_path = os.path.join(prefix, "share/tor/geoip")
- tor_geo_ipv6_file_path = os.path.join(prefix, "share/tor/geoip6")
- elif self.platform == "Windows":
- base_path = os.path.join(
- os.path.dirname(os.path.dirname(self.get_resource_path(""))), "tor"
- )
- tor_path = os.path.join(os.path.join(base_path, "Tor"), "tor.exe")
- obfs4proxy_file_path = os.path.join(
- os.path.join(base_path, "Tor"), "obfs4proxy.exe"
- )
- tor_geo_ip_file_path = os.path.join(
- os.path.join(os.path.join(base_path, "Data"), "Tor"), "geoip"
- )
- tor_geo_ipv6_file_path = os.path.join(
- os.path.join(os.path.join(base_path, "Data"), "Tor"), "geoip6"
- )
- elif self.platform == "Darwin":
- base_path = os.path.dirname(
- os.path.dirname(os.path.dirname(self.get_resource_path("")))
- )
- tor_path = os.path.join(base_path, "Resources", "Tor", "tor")
- tor_geo_ip_file_path = os.path.join(base_path, "Resources", "Tor", "geoip")
- tor_geo_ipv6_file_path = os.path.join(
- base_path, "Resources", "Tor", "geoip6"
- )
- obfs4proxy_file_path = os.path.join(
- base_path, "Resources", "Tor", "obfs4proxy"
- )
- elif self.platform == "BSD":
- tor_path = "/usr/local/bin/tor"
- tor_geo_ip_file_path = "/usr/local/share/tor/geoip"
- tor_geo_ipv6_file_path = "/usr/local/share/tor/geoip6"
- obfs4proxy_file_path = "/usr/local/bin/obfs4proxy"
-
- return (
- tor_path,
- tor_geo_ip_file_path,
- tor_geo_ipv6_file_path,
- obfs4proxy_file_path,
- )
-
- def build_data_dir(self):
- """
- Returns the path of the OnionShare data directory.
- """
- if self.platform == "Windows":
- try:
- appdata = os.environ["APPDATA"]
- onionshare_data_dir = f"{appdata}\\OnionShare"
- except:
- # If for some reason we don't have the 'APPDATA' environment variable
- # (like running tests in Linux while pretending to be in Windows)
- onionshare_data_dir = os.path.expanduser("~/.config/onionshare")
- elif self.platform == "Darwin":
- onionshare_data_dir = os.path.expanduser(
- "~/Library/Application Support/OnionShare"
- )
- else:
- onionshare_data_dir = os.path.expanduser("~/.config/onionshare")
-
- # Modify the data dir if running tests
- if getattr(sys, "onionshare_test_mode", False):
- onionshare_data_dir += "-testdata"
-
- os.makedirs(onionshare_data_dir, 0o700, True)
- return onionshare_data_dir
-
- def build_tmp_dir(self):
- """
- Returns path to a folder that can hold temporary files
- """
- tmp_dir = os.path.join(self.build_data_dir(), "tmp")
- os.makedirs(tmp_dir, 0o700, True)
- return tmp_dir
-
- def build_persistent_dir(self):
- """
- Returns the path to the folder that holds persistent files
- """
- persistent_dir = os.path.join(self.build_data_dir(), "persistent")
- os.makedirs(persistent_dir, 0o700, True)
- return persistent_dir
-
- def build_tor_dir(self):
- """
- Returns path to the tor data directory
- """
- tor_dir = os.path.join(self.build_data_dir(), "tor_data")
- os.makedirs(tor_dir, 0o700, True)
- return tor_dir
-
- def build_password(self, word_count=2):
- """
- Returns a random string made of words from the wordlist, such as "deter-trig".
- """
- with open(self.get_resource_path("wordlist.txt")) as f:
- wordlist = f.read().split()
-
- r = random.SystemRandom()
- return "-".join(r.choice(wordlist) for _ in range(word_count))
-
- def build_username(self, word_count=2):
- """
- Returns a random string made of words from the wordlist, such as "deter-trig".
- """
- with open(self.get_resource_path("wordlist.txt")) as f:
- wordlist = f.read().split()
-
- r = random.SystemRandom()
- return "-".join(r.choice(wordlist) for _ in range(word_count))
-
- @staticmethod
- def random_string(num_bytes, output_len=None):
- """
- Returns a random string with a specified number of bytes.
- """
- b = os.urandom(num_bytes)
- h = hashlib.sha256(b).digest()[:16]
- s = base64.b32encode(h).lower().replace(b"=", b"").decode("utf-8")
- if not output_len:
- return s
- return s[:output_len]
-
- @staticmethod
- def human_readable_filesize(b):
- """
- Returns filesize in a human readable format.
- """
- thresh = 1024.0
- if b < thresh:
- return "{:.1f} B".format(b)
- units = ("KiB", "MiB", "GiB", "TiB", "PiB", "EiB", "ZiB", "YiB")
- u = 0
- b /= thresh
- while b >= thresh:
- b /= thresh
- u += 1
- return "{:.1f} {}".format(b, units[u])
-
- @staticmethod
- def format_seconds(seconds):
- """Return a human-readable string of the format 1d2h3m4s"""
- days, seconds = divmod(seconds, 86400)
- hours, seconds = divmod(seconds, 3600)
- minutes, seconds = divmod(seconds, 60)
-
- human_readable = []
- if days:
- human_readable.append("{:.0f}d".format(days))
- if hours:
- human_readable.append("{:.0f}h".format(hours))
- if minutes:
- human_readable.append("{:.0f}m".format(minutes))
- if seconds or not human_readable:
- human_readable.append("{:.0f}s".format(seconds))
- return "".join(human_readable)
-
- @staticmethod
- def estimated_time_remaining(bytes_downloaded, total_bytes, started):
- now = time.time()
- time_elapsed = now - started # in seconds
- download_rate = bytes_downloaded / time_elapsed
- remaining_bytes = total_bytes - bytes_downloaded
- eta = remaining_bytes / download_rate
- return Common.format_seconds(eta)
-
- @staticmethod
- def get_available_port(min_port, max_port):
- """
- Find a random available port within the given range.
- """
- with socket.socket() as tmpsock:
- while True:
- try:
- tmpsock.bind(("127.0.0.1", random.randint(min_port, max_port)))
- break
- except OSError as e:
- pass
- _, port = tmpsock.getsockname()
- return port
-
- @staticmethod
- def dir_size(start_path):
- """
- Calculates the total size, in bytes, of all of the files in a directory.
- """
- total_size = 0
- for dirpath, dirnames, filenames in os.walk(start_path):
- for f in filenames:
- fp = os.path.join(dirpath, f)
- if not os.path.islink(fp):
- total_size += os.path.getsize(fp)
- return total_size
-
-
-class AutoStopTimer(threading.Thread):
- """
- Background thread sleeps t hours and returns.
- """
-
- def __init__(self, common, time):
- threading.Thread.__init__(self)
-
- self.common = common
-
- self.setDaemon(True)
- self.time = time
-
- def run(self):
- self.common.log(
- "AutoStopTimer", f"Server will shut down after {self.time} seconds"
- )
- time.sleep(self.time)
- return 1