diff options
Diffstat (limited to 'onionshare/web/send_base_mode.py')
-rw-r--r-- | onionshare/web/send_base_mode.py | 139 |
1 files changed, 84 insertions, 55 deletions
diff --git a/onionshare/web/send_base_mode.py b/onionshare/web/send_base_mode.py index 24ad55d7..86d34016 100644 --- a/onionshare/web/send_base_mode.py +++ b/onionshare/web/send_base_mode.py @@ -12,6 +12,7 @@ class SendBaseModeWeb: """ All of the web logic shared between share and website mode (modes where the user sends files) """ + def __init__(self, common, web): super(SendBaseModeWeb, self).__init__() self.common = common @@ -41,20 +42,24 @@ class SendBaseModeWeb: """ # If there's just one folder, replace filenames with a list of files inside that folder if len(filenames) == 1 and os.path.isdir(filenames[0]): - filenames = [os.path.join(filenames[0], x) for x in os.listdir(filenames[0])] + filenames = [ + os.path.join(filenames[0], x) for x in os.listdir(filenames[0]) + ] # Re-initialize - self.files = {} # Dictionary mapping file paths to filenames on disk - self.root_files = {} # This is only the root files and dirs, as opposed to all of them + self.files = {} # Dictionary mapping file paths to filenames on disk + self.root_files = ( + {} + ) # This is only the root files and dirs, as opposed to all of them self.cleanup_filenames = [] self.cur_history_id = 0 - self.file_info = {'files': [], 'dirs': []} + self.file_info = {"files": [], "dirs": []} self.gzip_individual_files = {} self.init() # Build the file list for filename in filenames: - basename = os.path.basename(filename.rstrip('/')) + basename = os.path.basename(filename.rstrip("/")) # If it's a filename, add it if os.path.isfile(filename): @@ -63,42 +68,50 @@ class SendBaseModeWeb: # If it's a directory, add it recursively elif os.path.isdir(filename): - self.root_files[basename + '/'] = filename + self.root_files[basename + "/"] = filename for root, _, nested_filenames in os.walk(filename): # Normalize the root path. So if the directory name is "/home/user/Documents/some_folder", # and it has a nested folder foobar, the root is "/home/user/Documents/some_folder/foobar". # The normalized_root should be "some_folder/foobar" - normalized_root = os.path.join(basename, root[len(filename):].lstrip('/')).rstrip('/') + normalized_root = os.path.join( + basename, root[len(filename) :].lstrip("/") + ).rstrip("/") # Add the dir itself - self.files[normalized_root + '/'] = root + self.files[normalized_root + "/"] = root # Add the files in this dir for nested_filename in nested_filenames: - self.files[os.path.join(normalized_root, nested_filename)] = os.path.join(root, nested_filename) + self.files[ + os.path.join(normalized_root, nested_filename) + ] = os.path.join(root, nested_filename) self.set_file_info_custom(filenames, processed_size_callback) - def directory_listing(self, filenames, path='', filesystem_path=None): + def directory_listing(self, filenames, path="", filesystem_path=None): # Tell the GUI about the directory listing history_id = self.cur_history_id self.cur_history_id += 1 - self.web.add_request(self.web.REQUEST_INDIVIDUAL_FILE_STARTED, '/{}'.format(path), { - 'id': history_id, - 'method': request.method, - 'status_code': 200 - }) - - breadcrumbs = [('☗', '/')] - parts = path.split('/')[:-1] + self.web.add_request( + self.web.REQUEST_INDIVIDUAL_FILE_STARTED, + "/{}".format(path), + {"id": history_id, "method": request.method, "status_code": 200}, + ) + + breadcrumbs = [("☗", "/")] + parts = path.split("/")[:-1] for i in range(len(parts)): - breadcrumbs.append(('{}'.format(parts[i]), '/{}/'.format('/'.join(parts[0:i+1])))) + breadcrumbs.append( + ("{}".format(parts[i]), "/{}/".format("/".join(parts[0 : i + 1]))) + ) breadcrumbs_leaf = breadcrumbs.pop()[0] # If filesystem_path is None, this is the root directory listing files, dirs = self.build_directory_listing(filenames, filesystem_path) - r = self.directory_listing_template(path, files, dirs, breadcrumbs, breadcrumbs_leaf) + r = self.directory_listing_template( + path, files, dirs, breadcrumbs, breadcrumbs_leaf + ) return self.web.add_security_headers(r) def build_directory_listing(self, filenames, filesystem_path): @@ -114,16 +127,11 @@ class SendBaseModeWeb: is_dir = os.path.isdir(this_filesystem_path) if is_dir: - dirs.append({ - 'basename': filename - }) + dirs.append({"basename": filename}) else: size = os.path.getsize(this_filesystem_path) size_human = self.common.human_readable_filesize(size) - files.append({ - 'basename': filename, - 'size_human': size_human - }) + files.append({"basename": filename, "size_human": size_human}) return files, dirs def stream_individual_file(self, filesystem_path): @@ -136,7 +144,7 @@ class SendBaseModeWeb: # gzip compress the individual file, if it hasn't already been compressed if use_gzip: if filesystem_path not in self.gzip_individual_files: - gzip_filename = tempfile.mkstemp('wb+')[1] + gzip_filename = tempfile.mkstemp("wb+")[1] self._gzip_compress(filesystem_path, gzip_filename, 6, None) self.gzip_individual_files[filesystem_path] = gzip_filename @@ -154,10 +162,11 @@ class SendBaseModeWeb: # Tell GUI the individual file started history_id = self.cur_history_id self.cur_history_id += 1 - self.web.add_request(self.web.REQUEST_INDIVIDUAL_FILE_STARTED, path, { - 'id': history_id, - 'filesize': filesize - }) + self.web.add_request( + self.web.REQUEST_INDIVIDUAL_FILE_STARTED, + path, + {"id": history_id, "filesize": filesize}, + ) # Only GET requests are allowed, any other method should fail if request.method != "GET": @@ -166,11 +175,11 @@ class SendBaseModeWeb: def generate(): chunk_size = 102400 # 100kb - fp = open(file_to_download, 'rb') + fp = open(file_to_download, "rb") done = False while not done: chunk = fp.read(chunk_size) - if chunk == b'': + if chunk == b"": done = True else: try: @@ -179,59 +188,79 @@ class SendBaseModeWeb: # Tell GUI the progress downloaded_bytes = fp.tell() percent = (1.0 * downloaded_bytes / filesize) * 100 - if not self.web.is_gui or self.common.platform == 'Linux' or self.common.platform == 'BSD': + if ( + not self.web.is_gui + or self.common.platform == "Linux" + or self.common.platform == "BSD" + ): sys.stdout.write( - "\r{0:s}, {1:.2f}% ".format(self.common.human_readable_filesize(downloaded_bytes), percent)) + "\r{0:s}, {1:.2f}% ".format( + self.common.human_readable_filesize( + downloaded_bytes + ), + percent, + ) + ) sys.stdout.flush() - self.web.add_request(self.web.REQUEST_INDIVIDUAL_FILE_PROGRESS, path, { - 'id': history_id, - 'bytes': downloaded_bytes, - 'filesize': filesize - }) + self.web.add_request( + self.web.REQUEST_INDIVIDUAL_FILE_PROGRESS, + path, + { + "id": history_id, + "bytes": downloaded_bytes, + "filesize": filesize, + }, + ) done = False except: # Looks like the download was canceled done = True # Tell the GUI the individual file was canceled - self.web.add_request(self.web.REQUEST_INDIVIDUAL_FILE_CANCELED, path, { - 'id': history_id - }) + self.web.add_request( + self.web.REQUEST_INDIVIDUAL_FILE_CANCELED, + path, + {"id": history_id}, + ) fp.close() - if self.common.platform != 'Darwin': + if self.common.platform != "Darwin": sys.stdout.write("\n") basename = os.path.basename(filesystem_path) r = Response(generate()) if use_gzip: - r.headers.set('Content-Encoding', 'gzip') - r.headers.set('Content-Length', filesize) - r.headers.set('Content-Disposition', 'inline', filename=basename) + r.headers.set("Content-Encoding", "gzip") + r.headers.set("Content-Length", filesize) + r.headers.set("Content-Disposition", "inline", filename=basename) r = self.web.add_security_headers(r) (content_type, _) = mimetypes.guess_type(basename, strict=False) if content_type is not None: - r.headers.set('Content-Type', content_type) + r.headers.set("Content-Type", content_type) return r def should_use_gzip(self): """ Should we use gzip for this browser? """ - return (not self.is_zipped) and ('gzip' in request.headers.get('Accept-Encoding', '').lower()) + return (not self.is_zipped) and ( + "gzip" in request.headers.get("Accept-Encoding", "").lower() + ) - def _gzip_compress(self, input_filename, output_filename, level, processed_size_callback=None): + def _gzip_compress( + self, input_filename, output_filename, level, processed_size_callback=None + ): """ Compress a file with gzip, without loading the whole thing into memory Thanks: https://stackoverflow.com/questions/27035296/python-how-to-gzip-a-large-text-file-without-memoryerror """ bytes_processed = 0 - blocksize = 1 << 16 # 64kB - with open(input_filename, 'rb') as input_file: - output_file = gzip.open(output_filename, 'wb', level) + blocksize = 1 << 16 # 64kB + with open(input_filename, "rb") as input_file: + output_file = gzip.open(output_filename, "wb", level) while True: if processed_size_callback is not None: processed_size_callback(bytes_processed) @@ -269,7 +298,7 @@ class SendBaseModeWeb: """ pass - def render_logic(self, path=''): + def render_logic(self, path=""): """ Inherited class will implement this. """ |