summaryrefslogtreecommitdiff
path: root/onionshare/onion.py
diff options
context:
space:
mode:
Diffstat (limited to 'onionshare/onion.py')
-rw-r--r--onionshare/onion.py442
1 files changed, 312 insertions, 130 deletions
diff --git a/onionshare/onion.py b/onionshare/onion.py
index 2f4ddffd..727cf5f1 100644
--- a/onionshare/onion.py
+++ b/onionshare/onion.py
@@ -28,90 +28,113 @@ from distutils.version import LooseVersion as Version
from . import common, strings
from .settings import Settings
+
class TorErrorAutomatic(Exception):
"""
OnionShare is failing to connect and authenticate to the Tor controller,
using automatic settings that should work with Tor Browser.
"""
+
pass
+
class TorErrorInvalidSetting(Exception):
"""
This exception is raised if the settings just don't make sense.
"""
+
pass
+
class TorErrorSocketPort(Exception):
"""
OnionShare can't connect to the Tor controller using the supplied address and port.
"""
+
pass
+
class TorErrorSocketFile(Exception):
"""
OnionShare can't connect to the Tor controller using the supplied socket file.
"""
+
pass
+
class TorErrorMissingPassword(Exception):
"""
OnionShare connected to the Tor controller, but it requires a password.
"""
+
pass
+
class TorErrorUnreadableCookieFile(Exception):
"""
OnionShare connected to the Tor controller, but your user does not have permission
to access the cookie file.
"""
+
pass
+
class TorErrorAuthError(Exception):
"""
OnionShare connected to the address and port, but can't authenticate. It's possible
that a Tor controller isn't listening on this port.
"""
+
pass
+
class TorErrorProtocolError(Exception):
"""
This exception is raised if onionshare connects to the Tor controller, but it
isn't acting like a Tor controller (such as in Whonix).
"""
+
pass
+
class TorTooOld(Exception):
"""
This exception is raised if onionshare needs to use a feature of Tor or stem
(like stealth ephemeral onion services) but the version you have installed
is too old.
"""
+
pass
+
class BundledTorNotSupported(Exception):
"""
This exception is raised if onionshare is set to use the bundled Tor binary,
but it's not supported on that platform, or in dev mode.
"""
+
class BundledTorTimeout(Exception):
"""
This exception is raised if onionshare is set to use the bundled Tor binary,
but Tor doesn't finish connecting promptly.
"""
+
class BundledTorCanceled(Exception):
"""
This exception is raised if onionshare is set to use the bundled Tor binary,
and the user cancels connecting to Tor
"""
+
class BundledTorBroken(Exception):
"""
This exception is raised if onionshare is set to use the bundled Tor binary,
but the process seems to fail to run.
"""
+
class Onion(object):
"""
Onion is an abstraction layer for connecting to the Tor control port and
@@ -126,10 +149,11 @@ class Onion(object):
call this function and pass in a status string while connecting to tor. This
is necessary for status updates to reach the GUI.
"""
+
def __init__(self, common):
self.common = common
- self.common.log('Onion', '__init__')
+ self.common.log("Onion", "__init__")
self.stealth = False
self.service_id = None
@@ -137,13 +161,20 @@ class Onion(object):
self.scheduled_auth_cookie = None
# Is bundled tor supported?
- if (self.common.platform == 'Windows' or self.common.platform == 'Darwin') and getattr(sys, 'onionshare_dev_mode', False):
+ if (
+ self.common.platform == "Windows" or self.common.platform == "Darwin"
+ ) and getattr(sys, "onionshare_dev_mode", False):
self.bundle_tor_supported = False
else:
self.bundle_tor_supported = True
# Set the path of the tor binary, for bundled tor
- (self.tor_path, self.tor_geo_ip_file_path, self.tor_geo_ipv6_file_path, self.obfs4proxy_file_path) = self.common.get_tor_paths()
+ (
+ self.tor_path,
+ self.tor_geo_ip_file_path,
+ self.tor_geo_ipv6_file_path,
+ self.obfs4proxy_file_path,
+ ) = self.common.get_tor_paths()
# The tor process
self.tor_proc = None
@@ -154,8 +185,14 @@ class Onion(object):
# Start out not connected to Tor
self.connected_to_tor = False
- def connect(self, custom_settings=False, config=False, tor_status_update_func=None, connect_timeout=120):
- self.common.log('Onion', 'connect')
+ def connect(
+ self,
+ custom_settings=False,
+ config=False,
+ tor_status_update_func=None,
+ connect_timeout=120,
+ ):
+ self.common.log("Onion", "connect")
# Either use settings that are passed in, or use them from common
if custom_settings:
@@ -171,95 +208,157 @@ class Onion(object):
# The Tor controller
self.c = None
- if self.settings.get('connection_type') == 'bundled':
+ if self.settings.get("connection_type") == "bundled":
if not self.bundle_tor_supported:
- raise BundledTorNotSupported(strings._('settings_error_bundled_tor_not_supported'))
+ raise BundledTorNotSupported(
+ strings._("settings_error_bundled_tor_not_supported")
+ )
# Create a torrc for this session
- self.tor_data_directory = tempfile.TemporaryDirectory(dir=self.common.build_data_dir())
- self.common.log('Onion', 'connect', 'tor_data_directory={}'.format(self.tor_data_directory.name))
+ self.tor_data_directory = tempfile.TemporaryDirectory(
+ dir=self.common.build_data_dir()
+ )
+ self.common.log(
+ "Onion",
+ "connect",
+ "tor_data_directory={}".format(self.tor_data_directory.name),
+ )
# Create the torrc
- with open(self.common.get_resource_path('torrc_template')) as f:
+ with open(self.common.get_resource_path("torrc_template")) as f:
torrc_template = f.read()
- self.tor_cookie_auth_file = os.path.join(self.tor_data_directory.name, 'cookie')
+ self.tor_cookie_auth_file = os.path.join(
+ self.tor_data_directory.name, "cookie"
+ )
try:
self.tor_socks_port = self.common.get_available_port(1000, 65535)
except:
- raise OSError(strings._('no_available_port'))
- self.tor_torrc = os.path.join(self.tor_data_directory.name, 'torrc')
+ raise OSError(strings._("no_available_port"))
+ self.tor_torrc = os.path.join(self.tor_data_directory.name, "torrc")
- if self.common.platform == 'Windows' or self.common.platform == "Darwin":
+ if self.common.platform == "Windows" or self.common.platform == "Darwin":
# Windows doesn't support unix sockets, so it must use a network port.
# macOS can't use unix sockets either because socket filenames are limited to
# 100 chars, and the macOS sandbox forces us to put the socket file in a place
# with a really long path.
- torrc_template += 'ControlPort {{control_port}}\n'
+ torrc_template += "ControlPort {{control_port}}\n"
try:
self.tor_control_port = self.common.get_available_port(1000, 65535)
except:
- raise OSError(strings._('no_available_port'))
+ raise OSError(strings._("no_available_port"))
self.tor_control_socket = None
else:
# Linux and BSD can use unix sockets
- torrc_template += 'ControlSocket {{control_socket}}\n'
+ torrc_template += "ControlSocket {{control_socket}}\n"
self.tor_control_port = None
- self.tor_control_socket = os.path.join(self.tor_data_directory.name, 'control_socket')
-
- torrc_template = torrc_template.replace('{{data_directory}}', self.tor_data_directory.name)
- torrc_template = torrc_template.replace('{{control_port}}', str(self.tor_control_port))
- torrc_template = torrc_template.replace('{{control_socket}}', str(self.tor_control_socket))
- torrc_template = torrc_template.replace('{{cookie_auth_file}}', self.tor_cookie_auth_file)
- torrc_template = torrc_template.replace('{{geo_ip_file}}', self.tor_geo_ip_file_path)
- torrc_template = torrc_template.replace('{{geo_ipv6_file}}', self.tor_geo_ipv6_file_path)
- torrc_template = torrc_template.replace('{{socks_port}}', str(self.tor_socks_port))
-
- with open(self.tor_torrc, 'w') as f:
+ self.tor_control_socket = os.path.join(
+ self.tor_data_directory.name, "control_socket"
+ )
+
+ torrc_template = torrc_template.replace(
+ "{{data_directory}}", self.tor_data_directory.name
+ )
+ torrc_template = torrc_template.replace(
+ "{{control_port}}", str(self.tor_control_port)
+ )
+ torrc_template = torrc_template.replace(
+ "{{control_socket}}", str(self.tor_control_socket)
+ )
+ torrc_template = torrc_template.replace(
+ "{{cookie_auth_file}}", self.tor_cookie_auth_file
+ )
+ torrc_template = torrc_template.replace(
+ "{{geo_ip_file}}", self.tor_geo_ip_file_path
+ )
+ torrc_template = torrc_template.replace(
+ "{{geo_ipv6_file}}", self.tor_geo_ipv6_file_path
+ )
+ torrc_template = torrc_template.replace(
+ "{{socks_port}}", str(self.tor_socks_port)
+ )
+
+ with open(self.tor_torrc, "w") as f:
f.write(torrc_template)
# Bridge support
- if self.settings.get('tor_bridges_use_obfs4'):
- f.write('ClientTransportPlugin obfs4 exec {}\n'.format(self.obfs4proxy_file_path))
- with open(self.common.get_resource_path('torrc_template-obfs4')) as o:
+ if self.settings.get("tor_bridges_use_obfs4"):
+ f.write(
+ "ClientTransportPlugin obfs4 exec {}\n".format(
+ self.obfs4proxy_file_path
+ )
+ )
+ with open(
+ self.common.get_resource_path("torrc_template-obfs4")
+ ) as o:
for line in o:
f.write(line)
- elif self.settings.get('tor_bridges_use_meek_lite_azure'):
- f.write('ClientTransportPlugin meek_lite exec {}\n'.format(self.obfs4proxy_file_path))
- with open(self.common.get_resource_path('torrc_template-meek_lite_azure')) as o:
+ elif self.settings.get("tor_bridges_use_meek_lite_azure"):
+ f.write(
+ "ClientTransportPlugin meek_lite exec {}\n".format(
+ self.obfs4proxy_file_path
+ )
+ )
+ with open(
+ self.common.get_resource_path("torrc_template-meek_lite_azure")
+ ) as o:
for line in o:
f.write(line)
- if self.settings.get('tor_bridges_use_custom_bridges'):
- if 'obfs4' in self.settings.get('tor_bridges_use_custom_bridges'):
- f.write('ClientTransportPlugin obfs4 exec {}\n'.format(self.obfs4proxy_file_path))
- elif 'meek_lite' in self.settings.get('tor_bridges_use_custom_bridges'):
- f.write('ClientTransportPlugin meek_lite exec {}\n'.format(self.obfs4proxy_file_path))
- f.write(self.settings.get('tor_bridges_use_custom_bridges'))
- f.write('\nUseBridges 1')
+ if self.settings.get("tor_bridges_use_custom_bridges"):
+ if "obfs4" in self.settings.get("tor_bridges_use_custom_bridges"):
+ f.write(
+ "ClientTransportPlugin obfs4 exec {}\n".format(
+ self.obfs4proxy_file_path
+ )
+ )
+ elif "meek_lite" in self.settings.get(
+ "tor_bridges_use_custom_bridges"
+ ):
+ f.write(
+ "ClientTransportPlugin meek_lite exec {}\n".format(
+ self.obfs4proxy_file_path
+ )
+ )
+ f.write(self.settings.get("tor_bridges_use_custom_bridges"))
+ f.write("\nUseBridges 1")
# Execute a tor subprocess
start_ts = time.time()
- if self.common.platform == 'Windows':
+ if self.common.platform == "Windows":
# In Windows, hide console window when opening tor.exe subprocess
startupinfo = subprocess.STARTUPINFO()
startupinfo.dwFlags |= subprocess.STARTF_USESHOWWINDOW
- self.tor_proc = subprocess.Popen([self.tor_path, '-f', self.tor_torrc], stdout=subprocess.PIPE, stderr=subprocess.PIPE, startupinfo=startupinfo)
+ self.tor_proc = subprocess.Popen(
+ [self.tor_path, "-f", self.tor_torrc],
+ stdout=subprocess.PIPE,
+ stderr=subprocess.PIPE,
+ startupinfo=startupinfo,
+ )
else:
- self.tor_proc = subprocess.Popen([self.tor_path, '-f', self.tor_torrc], stdout=subprocess.PIPE, stderr=subprocess.PIPE)
+ self.tor_proc = subprocess.Popen(
+ [self.tor_path, "-f", self.tor_torrc],
+ stdout=subprocess.PIPE,
+ stderr=subprocess.PIPE,
+ )
# Wait for the tor controller to start
time.sleep(2)
# Connect to the controller
try:
- if self.common.platform == 'Windows' or self.common.platform == "Darwin":
+ if (
+ self.common.platform == "Windows"
+ or self.common.platform == "Darwin"
+ ):
self.c = Controller.from_port(port=self.tor_control_port)
self.c.authenticate()
else:
self.c = Controller.from_socket_file(path=self.tor_control_socket)
self.c.authenticate()
except Exception as e:
- raise BundledTorBroken(strings._('settings_error_bundled_tor_broken').format(e.args[0]))
+ raise BundledTorBroken(
+ strings._("settings_error_bundled_tor_broken").format(e.args[0])
+ )
while True:
try:
@@ -268,47 +367,60 @@ class Onion(object):
raise BundledTorCanceled()
res_parts = shlex.split(res)
- progress = res_parts[2].split('=')[1]
- summary = res_parts[4].split('=')[1]
+ progress = res_parts[2].split("=")[1]
+ summary = res_parts[4].split("=")[1]
# "\033[K" clears the rest of the line
- print("Connecting to the Tor network: {}% - {}{}".format(progress, summary, "\033[K"), end="\r")
+ print(
+ "Connecting to the Tor network: {}% - {}{}".format(
+ progress, summary, "\033[K"
+ ),
+ end="\r",
+ )
if callable(tor_status_update_func):
if not tor_status_update_func(progress, summary):
# If the dialog was canceled, stop connecting to Tor
- self.common.log('Onion', 'connect', 'tor_status_update_func returned false, canceling connecting to Tor')
+ self.common.log(
+ "Onion",
+ "connect",
+ "tor_status_update_func returned false, canceling connecting to Tor",
+ )
print()
return False
- if summary == 'Done':
+ if summary == "Done":
print("")
break
time.sleep(0.2)
# If using bridges, it might take a bit longer to connect to Tor
- if self.settings.get('tor_bridges_use_custom_bridges') or \
- self.settings.get('tor_bridges_use_obfs4') or \
- self.settings.get('tor_bridges_use_meek_lite_azure'):
- # Only override timeout if a custom timeout has not been passed in
- if connect_timeout == 120:
- connect_timeout = 150
+ if (
+ self.settings.get("tor_bridges_use_custom_bridges")
+ or self.settings.get("tor_bridges_use_obfs4")
+ or self.settings.get("tor_bridges_use_meek_lite_azure")
+ ):
+ # Only override timeout if a custom timeout has not been passed in
+ if connect_timeout == 120:
+ connect_timeout = 150
if time.time() - start_ts > connect_timeout:
print("")
try:
self.tor_proc.terminate()
- raise BundledTorTimeout(strings._('settings_error_bundled_tor_timeout'))
+ raise BundledTorTimeout(
+ strings._("settings_error_bundled_tor_timeout")
+ )
except FileNotFoundError:
pass
- elif self.settings.get('connection_type') == 'automatic':
+ elif self.settings.get("connection_type") == "automatic":
# Automatically try to guess the right way to connect to Tor Browser
# Try connecting to control port
found_tor = False
# If the TOR_CONTROL_PORT environment variable is set, use that
- env_port = os.environ.get('TOR_CONTROL_PORT')
+ env_port = os.environ.get("TOR_CONTROL_PORT")
if env_port:
try:
self.c = Controller.from_port(port=int(env_port))
@@ -327,11 +439,13 @@ class Onion(object):
pass
# If this still didn't work, try guessing the default socket file path
- socket_file_path = ''
+ socket_file_path = ""
if not found_tor:
try:
- if self.common.platform == 'Darwin':
- socket_file_path = os.path.expanduser('~/Library/Application Support/TorBrowser-Data/Tor/control.socket')
+ if self.common.platform == "Darwin":
+ socket_file_path = os.path.expanduser(
+ "~/Library/Application Support/TorBrowser-Data/Tor/control.socket"
+ )
self.c = Controller.from_socket_file(path=socket_file_path)
found_tor = True
@@ -342,74 +456,108 @@ class Onion(object):
# guessing the socket file name next
if not found_tor:
try:
- if self.common.platform == 'Linux' or self.common.platform == 'BSD':
- socket_file_path = '/run/user/{}/Tor/control.socket'.format(os.geteuid())
- elif self.common.platform == 'Darwin':
- socket_file_path = '/run/user/{}/Tor/control.socket'.format(os.geteuid())
- elif self.common.platform == 'Windows':
+ if self.common.platform == "Linux" or self.common.platform == "BSD":
+ socket_file_path = "/run/user/{}/Tor/control.socket".format(
+ os.geteuid()
+ )
+ elif self.common.platform == "Darwin":
+ socket_file_path = "/run/user/{}/Tor/control.socket".format(
+ os.geteuid()
+ )
+ elif self.common.platform == "Windows":
# Windows doesn't support unix sockets
- raise TorErrorAutomatic(strings._('settings_error_automatic'))
+ raise TorErrorAutomatic(strings._("settings_error_automatic"))
self.c = Controller.from_socket_file(path=socket_file_path)
except:
- raise TorErrorAutomatic(strings._('settings_error_automatic'))
+ raise TorErrorAutomatic(strings._("settings_error_automatic"))
# Try authenticating
try:
self.c.authenticate()
except:
- raise TorErrorAutomatic(strings._('settings_error_automatic'))
+ raise TorErrorAutomatic(strings._("settings_error_automatic"))
else:
# Use specific settings to connect to tor
# Try connecting
try:
- if self.settings.get('connection_type') == 'control_port':
- self.c = Controller.from_port(address=self.settings.get('control_port_address'), port=self.settings.get('control_port_port'))
- elif self.settings.get('connection_type') == 'socket_file':
- self.c = Controller.from_socket_file(path=self.settings.get('socket_file_path'))
+ if self.settings.get("connection_type") == "control_port":
+ self.c = Controller.from_port(
+ address=self.settings.get("control_port_address"),
+ port=self.settings.get("control_port_port"),
+ )
+ elif self.settings.get("connection_type") == "socket_file":
+ self.c = Controller.from_socket_file(
+ path=self.settings.get("socket_file_path")
+ )
else:
raise TorErrorInvalidSetting(strings._("settings_error_unknown"))
except:
- if self.settings.get('connection_type') == 'control_port':
- raise TorErrorSocketPort(strings._("settings_error_socket_port").format(self.settings.get('control_port_address'), self.settings.get('control_port_port')))
+ if self.settings.get("connection_type") == "control_port":
+ raise TorErrorSocketPort(
+ strings._("settings_error_socket_port").format(
+ self.settings.get("control_port_address"),
+ self.settings.get("control_port_port"),
+ )
+ )
else:
- raise TorErrorSocketFile(strings._("settings_error_socket_file").format(self.settings.get('socket_file_path')))
-
+ raise TorErrorSocketFile(
+ strings._("settings_error_socket_file").format(
+ self.settings.get("socket_file_path")
+ )
+ )
# Try authenticating
try:
- if self.settings.get('auth_type') == 'no_auth':
+ if self.settings.get("auth_type") == "no_auth":
self.c.authenticate()
- elif self.settings.get('auth_type') == 'password':
- self.c.authenticate(self.settings.get('auth_password'))
+ elif self.settings.get("auth_type") == "password":
+ self.c.authenticate(self.settings.get("auth_password"))
else:
raise TorErrorInvalidSetting(strings._("settings_error_unknown"))
except MissingPassword:
- raise TorErrorMissingPassword(strings._('settings_error_missing_password'))
+ raise TorErrorMissingPassword(
+ strings._("settings_error_missing_password")
+ )
except UnreadableCookieFile:
- raise TorErrorUnreadableCookieFile(strings._('settings_error_unreadable_cookie_file'))
+ raise TorErrorUnreadableCookieFile(
+ strings._("settings_error_unreadable_cookie_file")
+ )
except AuthenticationFailure:
- raise TorErrorAuthError(strings._('settings_error_auth').format(self.settings.get('control_port_address'), self.settings.get('control_port_port')))
+ raise TorErrorAuthError(
+ strings._("settings_error_auth").format(
+ self.settings.get("control_port_address"),
+ self.settings.get("control_port_port"),
+ )
+ )
# If we made it this far, we should be connected to Tor
self.connected_to_tor = True
# Get the tor version
self.tor_version = self.c.get_version().version_str
- self.common.log('Onion', 'connect', 'Connected to tor {}'.format(self.tor_version))
+ self.common.log(
+ "Onion", "connect", "Connected to tor {}".format(self.tor_version)
+ )
# Do the versions of stem and tor that I'm using support ephemeral onion services?
- list_ephemeral_hidden_services = getattr(self.c, "list_ephemeral_hidden_services", None)
- self.supports_ephemeral = callable(list_ephemeral_hidden_services) and self.tor_version >= '0.2.7.1'
+ list_ephemeral_hidden_services = getattr(
+ self.c, "list_ephemeral_hidden_services", None
+ )
+ self.supports_ephemeral = (
+ callable(list_ephemeral_hidden_services) and self.tor_version >= "0.2.7.1"
+ )
# Do the versions of stem and tor that I'm using support stealth onion services?
try:
- res = self.c.create_ephemeral_hidden_service({1:1}, basic_auth={'onionshare':None}, await_publication=False)
+ res = self.c.create_ephemeral_hidden_service(
+ {1: 1}, basic_auth={"onionshare": None}, await_publication=False
+ )
tmp_service_id = res.service_id
self.c.remove_ephemeral_hidden_service(tmp_service_id)
self.supports_stealth = True
@@ -420,7 +568,7 @@ class Onion(object):
# Does this version of Tor support next-gen ('v3') onions?
# Note, this is the version of Tor where this bug was fixed:
# https://trac.torproject.org/projects/tor/ticket/28619
- self.supports_v3_onions = self.tor_version >= Version('0.3.5.7')
+ self.supports_v3_onions = self.tor_version >= Version("0.3.5.7")
def is_authenticated(self):
"""
@@ -431,37 +579,40 @@ class Onion(object):
else:
return False
-
def start_onion_service(self, port, await_publication, save_scheduled_key=False):
"""
Start a onion service on port 80, pointing to the given port, and
return the onion hostname.
"""
- self.common.log('Onion', 'start_onion_service')
+ self.common.log("Onion", "start_onion_service")
+ # Settings may have changed in the frontend but not updated in our settings object,
+ # such as persistence. Reload the settings now just to be sure.
+ self.settings.load()
+
self.auth_string = None
if not self.supports_ephemeral:
- raise TorTooOld(strings._('error_ephemeral_not_supported'))
+ raise TorTooOld(strings._("error_ephemeral_not_supported"))
if self.stealth and not self.supports_stealth:
- raise TorTooOld(strings._('error_stealth_not_supported'))
+ raise TorTooOld(strings._("error_stealth_not_supported"))
if not save_scheduled_key:
print("Setting up onion service on port {0:d}.".format(int(port)))
if self.stealth:
- if self.settings.get('hidservauth_string'):
- hidservauth_string = self.settings.get('hidservauth_string').split()[2]
- basic_auth = {'onionshare':hidservauth_string}
+ if self.settings.get("hidservauth_string"):
+ hidservauth_string = self.settings.get("hidservauth_string").split()[2]
+ basic_auth = {"onionshare": hidservauth_string}
else:
if self.scheduled_auth_cookie:
- basic_auth = {'onionshare':self.scheduled_auth_cookie}
+ basic_auth = {"onionshare": self.scheduled_auth_cookie}
else:
- basic_auth = {'onionshare':None}
+ basic_auth = {"onionshare": None}
else:
basic_auth = None
- if self.settings.get('private_key'):
- key_content = self.settings.get('private_key')
+ if self.settings.get("private_key"):
+ key_content = self.settings.get("private_key")
if self.is_v2_key(key_content):
key_type = "RSA1024"
else:
@@ -479,7 +630,9 @@ class Onion(object):
else:
key_type = "NEW"
# Work out if we can support v3 onion services, which are preferred
- if self.supports_v3_onions and not self.settings.get('use_legacy_v2_onions'):
+ if self.supports_v3_onions and not self.settings.get(
+ "use_legacy_v2_onions"
+ ):
key_content = "ED25519-V3"
else:
# fall back to v2 onion services
@@ -487,31 +640,48 @@ class Onion(object):
# v3 onions don't yet support basic auth. Our ticket:
# https://github.com/micahflee/onionshare/issues/697
- if key_type == "NEW" and key_content == "ED25519-V3" and not self.settings.get('use_legacy_v2_onions'):
+ if (
+ key_type == "NEW"
+ and key_content == "ED25519-V3"
+ and not self.settings.get("use_legacy_v2_onions")
+ ):
basic_auth = None
self.stealth = False
- debug_message = 'key_type={}'.format(key_type)
+ debug_message = "key_type={}".format(key_type)
if key_type == "NEW":
- debug_message += ', key_content={}'.format(key_content)
- self.common.log('Onion', 'start_onion_service', '{}'.format(debug_message))
+ debug_message += ", key_content={}".format(key_content)
+ self.common.log("Onion", "start_onion_service", "{}".format(debug_message))
try:
if basic_auth != None:
- res = self.c.create_ephemeral_hidden_service({ 80: port }, await_publication=await_publication, basic_auth=basic_auth, key_type=key_type, key_content=key_content)
+ res = self.c.create_ephemeral_hidden_service(
+ {80: port},
+ await_publication=await_publication,
+ basic_auth=basic_auth,
+ key_type=key_type,
+ key_content=key_content,
+ )
else:
# if the stem interface is older than 1.5.0, basic_auth isn't a valid keyword arg
- res = self.c.create_ephemeral_hidden_service({ 80: port }, await_publication=await_publication, key_type=key_type, key_content=key_content)
+ res = self.c.create_ephemeral_hidden_service(
+ {80: port},
+ await_publication=await_publication,
+ key_type=key_type,
+ key_content=key_content,
+ )
except ProtocolError as e:
- raise TorErrorProtocolError(strings._('error_tor_protocol_error').format(e.args[0]))
+ raise TorErrorProtocolError(
+ strings._("error_tor_protocol_error").format(e.args[0])
+ )
self.service_id = res.service_id
- onion_host = self.service_id + '.onion'
+ onion_host = self.service_id + ".onion"
# A new private key was generated and is in the Control port response.
- if self.settings.get('save_private_key'):
- if not self.settings.get('private_key'):
- self.settings.set('private_key', res.private_key)
+ if self.settings.get("save_private_key"):
+ if not self.settings.get("private_key"):
+ self.settings.set("private_key", res.private_key)
# If we were scheduling a future share, register the private key for later re-use
if save_scheduled_key:
@@ -525,24 +695,30 @@ class Onion(object):
# in the first place.
# If we sent the basic_auth (due to a saved hidservauth_string in the settings),
# there is no response here, so use the saved value from settings.
- if self.settings.get('save_private_key'):
- if self.settings.get('hidservauth_string'):
- self.auth_string = self.settings.get('hidservauth_string')
+ if self.settings.get("save_private_key"):
+ if self.settings.get("hidservauth_string"):
+ self.auth_string = self.settings.get("hidservauth_string")
else:
auth_cookie = list(res.client_auth.values())[0]
- self.auth_string = 'HidServAuth {} {}'.format(onion_host, auth_cookie)
- self.settings.set('hidservauth_string', self.auth_string)
+ self.auth_string = "HidServAuth {} {}".format(
+ onion_host, auth_cookie
+ )
+ self.settings.set("hidservauth_string", self.auth_string)
else:
if not self.scheduled_auth_cookie:
auth_cookie = list(res.client_auth.values())[0]
- self.auth_string = 'HidServAuth {} {}'.format(onion_host, auth_cookie)
+ self.auth_string = "HidServAuth {} {}".format(
+ onion_host, auth_cookie
+ )
if save_scheduled_key:
# Register the HidServAuth for the scheduled share
self.scheduled_auth_cookie = auth_cookie
else:
self.scheduled_auth_cookie = None
else:
- self.auth_string = 'HidServAuth {} {}'.format(onion_host, self.scheduled_auth_cookie)
+ self.auth_string = "HidServAuth {} {}".format(
+ onion_host, self.scheduled_auth_cookie
+ )
if not save_scheduled_key:
# We've used the scheduled share's HidServAuth. Reset it to None for future shares
self.scheduled_auth_cookie = None
@@ -551,23 +727,29 @@ class Onion(object):
self.settings.save()
return onion_host
else:
- raise TorErrorProtocolError(strings._('error_tor_protocol_error_unknown'))
+ raise TorErrorProtocolError(strings._("error_tor_protocol_error_unknown"))
def cleanup(self, stop_tor=True):
"""
Stop onion services that were created earlier. If there's a tor subprocess running, kill it.
"""
- self.common.log('Onion', 'cleanup')
+ self.common.log("Onion", "cleanup")
# Cleanup the ephemeral onion services, if we have any
try:
onions = self.c.list_ephemeral_hidden_services()
for onion in onions:
try:
- self.common.log('Onion', 'cleanup', 'trying to remove onion {}'.format(onion))
+ self.common.log(
+ "Onion", "cleanup", "trying to remove onion {}".format(onion)
+ )
self.c.remove_ephemeral_hidden_service(onion)
except:
- self.common.log('Onion', 'cleanup', 'could not remove onion {}.. moving on anyway'.format(onion))
+ self.common.log(
+ "Onion",
+ "cleanup",
+ "could not remove onion {}.. moving on anyway".format(onion),
+ )
pass
except:
pass
@@ -604,14 +786,14 @@ class Onion(object):
"""
Returns a (address, port) tuple for the Tor SOCKS port
"""
- self.common.log('Onion', 'get_tor_socks_port')
+ self.common.log("Onion", "get_tor_socks_port")
- if self.settings.get('connection_type') == 'bundled':
- return ('127.0.0.1', self.tor_socks_port)
- elif self.settings.get('connection_type') == 'automatic':
- return ('127.0.0.1', 9150)
+ if self.settings.get("connection_type") == "bundled":
+ return ("127.0.0.1", self.tor_socks_port)
+ elif self.settings.get("connection_type") == "automatic":
+ return ("127.0.0.1", 9150)
else:
- return (self.settings.get('socks_address'), self.settings.get('socks_port'))
+ return (self.settings.get("socks_address"), self.settings.get("socks_port"))
def is_v2_key(self, key):
"""