aboutsummaryrefslogtreecommitdiff
path: root/desktop/onionshare/common.py
diff options
context:
space:
mode:
Diffstat (limited to 'desktop/onionshare/common.py')
-rw-r--r--desktop/onionshare/common.py336
1 files changed, 336 insertions, 0 deletions
diff --git a/desktop/onionshare/common.py b/desktop/onionshare/common.py
new file mode 100644
index 00000000..27104669
--- /dev/null
+++ b/desktop/onionshare/common.py
@@ -0,0 +1,336 @@
+# -*- coding: utf-8 -*-
+"""
+OnionShare | https://onionshare.org/
+
+Copyright (C) 2014-2020 Micah Lee, et al. <micah@micahflee.com>
+
+This program is free software: you can redistribute it and/or modify
+it under the terms of the GNU General Public License as published by
+the Free Software Foundation, either version 3 of the License, or
+(at your option) any later version.
+
+This program is distributed in the hope that it will be useful,
+but WITHOUT ANY WARRANTY; without even the implied warranty of
+MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+GNU General Public License for more details.
+
+You should have received a copy of the GNU General Public License
+along with this program. If not, see <http://www.gnu.org/licenses/>.
+"""
+import base64
+import hashlib
+import inspect
+import os
+import platform
+import random
+import socket
+import sys
+import tempfile
+import threading
+import time
+import shutil
+
+from .settings import Settings
+
+
+class Common:
+ """
+ The Common object is shared amongst all parts of OnionShare.
+ """
+
+ def __init__(self, verbose=False):
+ self.verbose = verbose
+
+ # The platform OnionShare is running on
+ self.platform = platform.system()
+ if self.platform.endswith("BSD") or self.platform == "DragonFly":
+ self.platform = "BSD"
+
+ # The current version of OnionShare
+ with open(self.get_resource_path("version.txt")) as f:
+ self.version = f.read().strip()
+
+ def load_settings(self, config=None):
+ """
+ Loading settings, optionally from a custom config json file.
+ """
+ self.settings = Settings(self, config)
+ self.settings.load()
+
+ def log(self, module, func, msg=None):
+ """
+ If verbose mode is on, log error messages to stdout
+ """
+ if self.verbose:
+ timestamp = time.strftime("%b %d %Y %X")
+
+ final_msg = f"[{timestamp}] {module}.{func}"
+ if msg:
+ final_msg = f"{final_msg}: {msg}"
+ print(final_msg)
+
+ def get_resource_path(self, filename):
+ """
+ Returns the absolute path of a resource, regardless of whether OnionShare is installed
+ systemwide, and whether regardless of platform
+ """
+ # On Windows, and in Windows dev mode, switch slashes in incoming filename to backslackes
+ if self.platform == "Windows":
+ filename = filename.replace("/", "\\")
+
+ if getattr(sys, "onionshare_dev_mode", False):
+ # Look for resources directory relative to python file
+ prefix = os.path.join(
+ os.path.dirname(
+ os.path.dirname(
+ os.path.abspath(inspect.getfile(inspect.currentframe()))
+ )
+ ),
+ "share",
+ )
+ if not os.path.exists(prefix):
+ # While running tests during stdeb bdist_deb, look 3 directories up for the share folder
+ prefix = os.path.join(
+ os.path.dirname(
+ os.path.dirname(os.path.dirname(os.path.dirname(prefix)))
+ ),
+ "share",
+ )
+
+ elif self.platform == "BSD" or self.platform == "Linux":
+ # Look for resources relative to the binary, so if the binary is /usr/bin/onionshare-gui and
+ # the resource dir is /usr/share/onionshare, then the resource dir relative to the binary dir
+ # is ../share/onionshare
+ prefix = os.path.join(
+ os.path.dirname(os.path.dirname(sys.argv[0])), "share/onionshare"
+ )
+
+ elif getattr(sys, "frozen", False):
+ # Check if app is "frozen"
+ # https://pythonhosted.org/PyInstaller/#run-time-information
+ if self.platform == "Darwin":
+ prefix = os.path.join(sys._MEIPASS, "share")
+ elif self.platform == "Windows":
+ prefix = os.path.join(os.path.dirname(sys.executable), "share")
+
+ return os.path.join(prefix, filename)
+
+ def get_tor_paths(self):
+ if self.platform == "Linux":
+ tor_path = shutil.which("tor")
+ obfs4proxy_file_path = shutil.which("obfs4proxy")
+ prefix = os.path.dirname(os.path.dirname(tor_path))
+ tor_geo_ip_file_path = os.path.join(prefix, "share/tor/geoip")
+ tor_geo_ipv6_file_path = os.path.join(prefix, "share/tor/geoip6")
+ elif self.platform == "Windows":
+ base_path = os.path.join(
+ os.path.dirname(os.path.dirname(self.get_resource_path(""))), "tor"
+ )
+ tor_path = os.path.join(os.path.join(base_path, "Tor"), "tor.exe")
+ obfs4proxy_file_path = os.path.join(
+ os.path.join(base_path, "Tor"), "obfs4proxy.exe"
+ )
+ tor_geo_ip_file_path = os.path.join(
+ os.path.join(os.path.join(base_path, "Data"), "Tor"), "geoip"
+ )
+ tor_geo_ipv6_file_path = os.path.join(
+ os.path.join(os.path.join(base_path, "Data"), "Tor"), "geoip6"
+ )
+ elif self.platform == "Darwin":
+ base_path = os.path.dirname(
+ os.path.dirname(os.path.dirname(self.get_resource_path("")))
+ )
+ tor_path = os.path.join(base_path, "Resources", "Tor", "tor")
+ tor_geo_ip_file_path = os.path.join(base_path, "Resources", "Tor", "geoip")
+ tor_geo_ipv6_file_path = os.path.join(
+ base_path, "Resources", "Tor", "geoip6"
+ )
+ obfs4proxy_file_path = os.path.join(
+ base_path, "Resources", "Tor", "obfs4proxy"
+ )
+ elif self.platform == "BSD":
+ tor_path = "/usr/local/bin/tor"
+ tor_geo_ip_file_path = "/usr/local/share/tor/geoip"
+ tor_geo_ipv6_file_path = "/usr/local/share/tor/geoip6"
+ obfs4proxy_file_path = "/usr/local/bin/obfs4proxy"
+
+ return (
+ tor_path,
+ tor_geo_ip_file_path,
+ tor_geo_ipv6_file_path,
+ obfs4proxy_file_path,
+ )
+
+ def build_data_dir(self):
+ """
+ Returns the path of the OnionShare data directory.
+ """
+ if self.platform == "Windows":
+ try:
+ appdata = os.environ["APPDATA"]
+ onionshare_data_dir = f"{appdata}\\OnionShare"
+ except:
+ # If for some reason we don't have the 'APPDATA' environment variable
+ # (like running tests in Linux while pretending to be in Windows)
+ onionshare_data_dir = os.path.expanduser("~/.config/onionshare")
+ elif self.platform == "Darwin":
+ onionshare_data_dir = os.path.expanduser(
+ "~/Library/Application Support/OnionShare"
+ )
+ else:
+ onionshare_data_dir = os.path.expanduser("~/.config/onionshare")
+
+ # Modify the data dir if running tests
+ if getattr(sys, "onionshare_test_mode", False):
+ onionshare_data_dir += "-testdata"
+
+ os.makedirs(onionshare_data_dir, 0o700, True)
+ return onionshare_data_dir
+
+ def build_tmp_dir(self):
+ """
+ Returns path to a folder that can hold temporary files
+ """
+ tmp_dir = os.path.join(self.build_data_dir(), "tmp")
+ os.makedirs(tmp_dir, 0o700, True)
+ return tmp_dir
+
+ def build_persistent_dir(self):
+ """
+ Returns the path to the folder that holds persistent files
+ """
+ persistent_dir = os.path.join(self.build_data_dir(), "persistent")
+ os.makedirs(persistent_dir, 0o700, True)
+ return persistent_dir
+
+ def build_tor_dir(self):
+ """
+ Returns path to the tor data directory
+ """
+ tor_dir = os.path.join(self.build_data_dir(), "tor_data")
+ os.makedirs(tor_dir, 0o700, True)
+ return tor_dir
+
+ def build_password(self, word_count=2):
+ """
+ Returns a random string made of words from the wordlist, such as "deter-trig".
+ """
+ with open(self.get_resource_path("wordlist.txt")) as f:
+ wordlist = f.read().split()
+
+ r = random.SystemRandom()
+ return "-".join(r.choice(wordlist) for _ in range(word_count))
+
+ def build_username(self, word_count=2):
+ """
+ Returns a random string made of words from the wordlist, such as "deter-trig".
+ """
+ with open(self.get_resource_path("wordlist.txt")) as f:
+ wordlist = f.read().split()
+
+ r = random.SystemRandom()
+ return "-".join(r.choice(wordlist) for _ in range(word_count))
+
+ @staticmethod
+ def random_string(num_bytes, output_len=None):
+ """
+ Returns a random string with a specified number of bytes.
+ """
+ b = os.urandom(num_bytes)
+ h = hashlib.sha256(b).digest()[:16]
+ s = base64.b32encode(h).lower().replace(b"=", b"").decode("utf-8")
+ if not output_len:
+ return s
+ return s[:output_len]
+
+ @staticmethod
+ def human_readable_filesize(b):
+ """
+ Returns filesize in a human readable format.
+ """
+ thresh = 1024.0
+ if b < thresh:
+ return "{:.1f} B".format(b)
+ units = ("KiB", "MiB", "GiB", "TiB", "PiB", "EiB", "ZiB", "YiB")
+ u = 0
+ b /= thresh
+ while b >= thresh:
+ b /= thresh
+ u += 1
+ return "{:.1f} {}".format(b, units[u])
+
+ @staticmethod
+ def format_seconds(seconds):
+ """Return a human-readable string of the format 1d2h3m4s"""
+ days, seconds = divmod(seconds, 86400)
+ hours, seconds = divmod(seconds, 3600)
+ minutes, seconds = divmod(seconds, 60)
+
+ human_readable = []
+ if days:
+ human_readable.append("{:.0f}d".format(days))
+ if hours:
+ human_readable.append("{:.0f}h".format(hours))
+ if minutes:
+ human_readable.append("{:.0f}m".format(minutes))
+ if seconds or not human_readable:
+ human_readable.append("{:.0f}s".format(seconds))
+ return "".join(human_readable)
+
+ @staticmethod
+ def estimated_time_remaining(bytes_downloaded, total_bytes, started):
+ now = time.time()
+ time_elapsed = now - started # in seconds
+ download_rate = bytes_downloaded / time_elapsed
+ remaining_bytes = total_bytes - bytes_downloaded
+ eta = remaining_bytes / download_rate
+ return Common.format_seconds(eta)
+
+ @staticmethod
+ def get_available_port(min_port, max_port):
+ """
+ Find a random available port within the given range.
+ """
+ with socket.socket() as tmpsock:
+ while True:
+ try:
+ tmpsock.bind(("127.0.0.1", random.randint(min_port, max_port)))
+ break
+ except OSError as e:
+ pass
+ _, port = tmpsock.getsockname()
+ return port
+
+ @staticmethod
+ def dir_size(start_path):
+ """
+ Calculates the total size, in bytes, of all of the files in a directory.
+ """
+ total_size = 0
+ for dirpath, dirnames, filenames in os.walk(start_path):
+ for f in filenames:
+ fp = os.path.join(dirpath, f)
+ if not os.path.islink(fp):
+ total_size += os.path.getsize(fp)
+ return total_size
+
+
+class AutoStopTimer(threading.Thread):
+ """
+ Background thread sleeps t hours and returns.
+ """
+
+ def __init__(self, common, time):
+ threading.Thread.__init__(self)
+
+ self.common = common
+
+ self.setDaemon(True)
+ self.time = time
+
+ def run(self):
+ self.common.log(
+ "AutoStopTimer", f"Server will shut down after {self.time} seconds"
+ )
+ time.sleep(self.time)
+ return 1