diff options
Diffstat (limited to 'src/common')
-rw-r--r-- | src/common/address.c | 14 | ||||
-rw-r--r-- | src/common/compat.c | 32 | ||||
-rw-r--r-- | src/common/compat.h | 4 | ||||
-rw-r--r-- | src/common/compat_rust.c | 39 | ||||
-rw-r--r-- | src/common/compat_rust.h | 28 | ||||
-rw-r--r-- | src/common/compress.c | 58 | ||||
-rw-r--r-- | src/common/compress_none.c | 2 | ||||
-rw-r--r-- | src/common/compress_zstd.c | 49 | ||||
-rw-r--r-- | src/common/confline.c | 225 | ||||
-rw-r--r-- | src/common/confline.h | 4 | ||||
-rw-r--r-- | src/common/crypto.c | 6 | ||||
-rw-r--r-- | src/common/include.am | 6 | ||||
-rw-r--r-- | src/common/sandbox.c | 21 | ||||
-rw-r--r-- | src/common/storagedir.c | 61 | ||||
-rw-r--r-- | src/common/tortls.c | 39 | ||||
-rw-r--r-- | src/common/tortls.h | 1 | ||||
-rw-r--r-- | src/common/util.c | 66 | ||||
-rw-r--r-- | src/common/util.h | 1 | ||||
-rw-r--r-- | src/common/util_bug.h | 13 | ||||
-rw-r--r-- | src/common/util_format.h | 8 | ||||
-rw-r--r-- | src/common/workqueue.c | 145 | ||||
-rw-r--r-- | src/common/workqueue.h | 14 |
22 files changed, 717 insertions, 119 deletions
diff --git a/src/common/address.c b/src/common/address.c index c5d0e918ca..894e15114c 100644 --- a/src/common/address.c +++ b/src/common/address.c @@ -564,8 +564,8 @@ tor_addr_parse_PTR_name(tor_addr_t *result, const char *address, /** Convert <b>addr</b> to an in-addr.arpa name or a .ip6.arpa name, * and store the result in the <b>outlen</b>-byte buffer at - * <b>out</b>. Return the number of chars written to <b>out</b>, not - * including the trailing \0, on success. Returns -1 on failure. */ + * <b>out</b>. Returns a non-negative integer on success. + * Returns -1 on failure. */ int tor_addr_to_PTR_name(char *out, size_t outlen, const tor_addr_t *addr) @@ -1781,9 +1781,10 @@ free_interface_address6_list(smartlist_t *addrs) * Returns NULL on failure. * Use free_interface_address6_list to free the returned list. */ -MOCK_IMPL(smartlist_t *,get_interface_address6_list,(int severity, - sa_family_t family, - int include_internal)) +MOCK_IMPL(smartlist_t *, +get_interface_address6_list,(int severity, + sa_family_t family, + int include_internal)) { smartlist_t *addrs; tor_addr_t addr; @@ -2051,7 +2052,8 @@ parse_port_range(const char *port, uint16_t *port_min_out, /** Given an IPv4 in_addr struct *<b>in</b> (in network order, as usual), * write it as a string into the <b>buf_len</b>-byte buffer in - * <b>buf</b>. + * <b>buf</b>. Returns a non-negative integer on success. + * Returns -1 on failure. */ int tor_inet_ntoa(const struct in_addr *in, char *buf, size_t buf_len) diff --git a/src/common/compat.c b/src/common/compat.c index 390cff7dba..4d110aba35 100644 --- a/src/common/compat.c +++ b/src/common/compat.c @@ -1732,19 +1732,24 @@ set_max_file_descriptors(rlim_t limit, int *max_out) if (setrlimit(RLIMIT_NOFILE, &rlim) != 0) { int bad = 1; #ifdef OPEN_MAX - if (errno == EINVAL && OPEN_MAX < rlim.rlim_cur) { + uint64_t try_limit = OPEN_MAX - ULIMIT_BUFFER; + if (errno == EINVAL && try_limit < (uint64_t) rlim.rlim_cur) { /* On some platforms, OPEN_MAX is the real limit, and getrlimit() is * full of nasty lies. I'm looking at you, OSX 10.5.... */ - rlim.rlim_cur = OPEN_MAX; + rlim.rlim_cur = try_limit; if (setrlimit(RLIMIT_NOFILE, &rlim) == 0) { if (rlim.rlim_cur < (rlim_t)limit) { log_warn(LD_CONFIG, "We are limited to %lu file descriptors by " - "OPEN_MAX, and ConnLimit is %lu. Changing ConnLimit; sorry.", - (unsigned long)OPEN_MAX, (unsigned long)limit); + "OPEN_MAX (%lu), and ConnLimit is %lu. Changing " + "ConnLimit; sorry.", + (unsigned long)try_limit, (unsigned long)OPEN_MAX, + (unsigned long)limit); } else { - log_info(LD_CONFIG, "Dropped connection limit to OPEN_MAX (%lu); " - "Apparently, %lu was too high and rlimit lied to us.", - (unsigned long)OPEN_MAX, (unsigned long)rlim.rlim_max); + log_info(LD_CONFIG, "Dropped connection limit to %lu based on " + "OPEN_MAX (%lu); Apparently, %lu was too high and rlimit " + "lied to us.", + (unsigned long)try_limit, (unsigned long)OPEN_MAX, + (unsigned long)rlim.rlim_max); } bad = 0; } @@ -2593,8 +2598,12 @@ tor_inet_pton(int af, const char *src, void *dst) char *next; ssize_t len; long r = strtol(src, &next, 16); - tor_assert(next != NULL); - tor_assert(next != src); + if (next == NULL || next == src) { + /* The 'next == src' error case can happen on versions of openbsd + * where treats "0xfoo" as an error, rather than as "0" followed by + * "xfoo". */ + return 0; + } len = *next == '\0' ? eow - src : next - src; if (len > 4) @@ -2678,7 +2687,8 @@ static int uname_result_is_set = 0; /** Return a pointer to a description of our platform. */ -MOCK_IMPL(const char *, get_uname, (void)) +MOCK_IMPL(const char *, +get_uname,(void)) { #ifdef HAVE_UNAME struct utsname u; @@ -3251,7 +3261,7 @@ format_win32_error(DWORD err) FORMAT_MESSAGE_FROM_SYSTEM | FORMAT_MESSAGE_IGNORE_INSERTS, NULL, err, - MAKELANGID(LANG_NEUTRAL, SUBLANG_DEFAULT), + MAKELANGID(LANG_ENGLISH, SUBLANG_DEFAULT), (LPVOID)&str, 0, NULL); diff --git a/src/common/compat.h b/src/common/compat.h index 68fad9b598..473ad2b957 100644 --- a/src/common/compat.h +++ b/src/common/compat.h @@ -414,10 +414,10 @@ struct tm *tor_gmtime_r(const time_t *timep, struct tm *result); #endif #ifndef timercmp -/** Replacement for timersub on platforms that do not have it: returns true +/** Replacement for timercmp on platforms that do not have it: returns true * iff the relational operator "op" makes the expression tv1 op tv2 true. * - * Note that while this definition should work for all boolean opeators, some + * Note that while this definition should work for all boolean operators, some * platforms' native timercmp definitions do not support >=, <=, or ==. So * don't use those. */ diff --git a/src/common/compat_rust.c b/src/common/compat_rust.c new file mode 100644 index 0000000000..366fd4037b --- /dev/null +++ b/src/common/compat_rust.c @@ -0,0 +1,39 @@ +/* Copyright (c) 2017, The Tor Project, Inc. */ +/* See LICENSE for licensing information */ + +/** + * \file rust_compat.c + * \brief Rust FFI compatibility functions and helpers. This file is only built + * if Rust is not used. + **/ + +#include "compat_rust.h" +#include "util.h" + +/** + * Free storage pointed to by <b>str</b>, and itself. + */ +void +rust_str_free(rust_str_t str) +{ + char *s = (char *)str; + tor_free(s); +} + +/** + * Return zero-terminated contained string. + */ +const char * +rust_str_get(const rust_str_t str) +{ + return (const char *)str; +} + +/* If we were using Rust, we'd say so on startup. */ +rust_str_t +rust_welcome_string(void) +{ + char *s = tor_malloc_zero(1); + return (rust_str_t)s; +} + diff --git a/src/common/compat_rust.h b/src/common/compat_rust.h new file mode 100644 index 0000000000..752a29b56c --- /dev/null +++ b/src/common/compat_rust.h @@ -0,0 +1,28 @@ +/* Copyright (c) 2017, The Tor Project, Inc. */ +/* See LICENSE for licensing information */ + +/** + * \file rust_compat.h + * \brief Headers for rust_compat.c + **/ + +#ifndef TOR_RUST_COMPAT_H +#define TOR_RUST_COMPAT_H + +#include "torint.h" + +/** + * Strings allocated in Rust must be freed from Rust code again. Let's make + * it less likely to accidentally mess up and call tor_free() on it, because + * currently it'll just work but might break at any time. + */ +typedef uintptr_t rust_str_t; + +void rust_str_free(rust_str_t); + +const char *rust_str_get(const rust_str_t); + +rust_str_t rust_welcome_string(void); + +#endif + diff --git a/src/common/compress.c b/src/common/compress.c index 6513029f9c..7926faaa60 100644 --- a/src/common/compress.c +++ b/src/common/compress.c @@ -102,8 +102,13 @@ tor_compress_impl(int compress, stream = tor_compress_new(compress, method, compression_level); - if (stream == NULL) + if (stream == NULL) { + log_warn(LD_GENERAL, "NULL stream while %scompressing", + compress?"":"de"); + log_debug(LD_GENERAL, "method: %d level: %d at len: %lu", + method, compression_level, (unsigned long)in_len); return -1; + } size_t in_len_orig = in_len; size_t out_remaining, out_alloc; @@ -128,13 +133,26 @@ tor_compress_impl(int compress, // inputs. tor_compress_free(stream); stream = tor_compress_new(compress, method, compression_level); + if (stream == NULL) { + log_warn(LD_GENERAL, "NULL stream while %scompressing", + compress?"":"de"); + goto err; + } } break; case TOR_COMPRESS_OK: if (compress || complete_only) { + log_fn(protocol_warn_level, LD_PROTOCOL, + "Unexpected %s while %scompressing", + complete_only?"end of input":"result", + compress?"":"de"); + log_debug(LD_GENERAL, "method: %d level: %d at len: %lu", + method, compression_level, (unsigned long)in_len); goto err; } else { - goto done; + if (in_len == 0) { + goto done; + } } break; case TOR_COMPRESS_BUFFER_FULL: { @@ -524,28 +542,42 @@ tor_compress_process(tor_compress_state_t *state, int finish) { tor_assert(state != NULL); + const size_t in_len_orig = *in_len; + const size_t out_len_orig = *out_len; + tor_compress_output_t rv; switch (state->method) { case GZIP_METHOD: case ZLIB_METHOD: - return tor_zlib_compress_process(state->u.zlib_state, - out, out_len, in, in_len, - finish); + rv = tor_zlib_compress_process(state->u.zlib_state, + out, out_len, in, in_len, + finish); + break; case LZMA_METHOD: - return tor_lzma_compress_process(state->u.lzma_state, - out, out_len, in, in_len, - finish); + rv =tor_lzma_compress_process(state->u.lzma_state, + out, out_len, in, in_len, + finish); + break; case ZSTD_METHOD: - return tor_zstd_compress_process(state->u.zstd_state, - out, out_len, in, in_len, - finish); + rv = tor_zstd_compress_process(state->u.zstd_state, + out, out_len, in, in_len, + finish); + break; case NO_METHOD: - return tor_cnone_compress_process(out, out_len, in, in_len, - finish); + rv = tor_cnone_compress_process(out, out_len, in, in_len, + finish); + break; + default: case UNKNOWN_METHOD: goto err; } + if (BUG((rv == TOR_COMPRESS_OK) && + *in_len == in_len_orig && + *out_len == out_len_orig)) { + return TOR_COMPRESS_ERROR; + } + return rv; err: return TOR_COMPRESS_ERROR; } diff --git a/src/common/compress_none.c b/src/common/compress_none.c index b76e6010ec..34314e4af7 100644 --- a/src/common/compress_none.c +++ b/src/common/compress_none.c @@ -4,7 +4,7 @@ /* See LICENSE for licensing information */ /** - * \file compress_lzma.c + * \file compress_none.c * \brief Compression backend for identity compression. * * We actually define this backend so that we can treat the identity transform diff --git a/src/common/compress_zstd.c b/src/common/compress_zstd.c index f54c4e1b31..5c5026c37d 100644 --- a/src/common/compress_zstd.c +++ b/src/common/compress_zstd.c @@ -98,6 +98,8 @@ struct tor_zstd_compress_state_t { #endif // HAVE_ZSTD. int compress; /**< True if we are compressing; false if we are inflating */ + int have_called_end; /**< True if we are compressing and we've called + * ZSTD_endStream */ /** Number of bytes read so far. Used to detect compression bombs. */ size_t input_so_far; @@ -280,9 +282,16 @@ tor_zstd_compress_process(tor_zstd_compress_state_t *state, ZSTD_inBuffer input = { *in, *in_len, 0 }; ZSTD_outBuffer output = { *out, *out_len, 0 }; + if (BUG(finish == 0 && state->have_called_end)) { + finish = 1; + } + if (state->compress) { - retval = ZSTD_compressStream(state->u.compress_stream, - &output, &input); + if (! state->have_called_end) + retval = ZSTD_compressStream(state->u.compress_stream, + &output, &input); + else + retval = 0; } else { retval = ZSTD_decompressStream(state->u.decompress_stream, &output, &input); @@ -312,7 +321,7 @@ tor_zstd_compress_process(tor_zstd_compress_state_t *state, // LCOV_EXCL_STOP } - if (state->compress && !finish) { + if (state->compress && !state->have_called_end) { retval = ZSTD_flushStream(state->u.compress_stream, &output); *out = (char *)output.dst + output.pos; @@ -326,16 +335,26 @@ tor_zstd_compress_process(tor_zstd_compress_state_t *state, // LCOV_EXCL_STOP } - if (retval > 0) + // ZSTD_flushStream returns 0 if the frame is done, or >0 if it + // is incomplete. + if (retval > 0) { return TOR_COMPRESS_BUFFER_FULL; + } } if (!finish) { - // We're not done with the input, so no need to flush. + // The caller says we're not done with the input, so no need to write an + // epilogue. return TOR_COMPRESS_OK; - } else if (state->compress && finish) { - retval = ZSTD_endStream(state->u.compress_stream, &output); + } else if (state->compress) { + if (*in_len) { + // We say that we're not done with the input, so we can't write an + // epilogue. + return TOR_COMPRESS_OK; + } + retval = ZSTD_endStream(state->u.compress_stream, &output); + state->have_called_end = 1; *out = (char *)output.dst + output.pos; *out_len = output.size - output.pos; @@ -354,10 +373,20 @@ tor_zstd_compress_process(tor_zstd_compress_state_t *state, return TOR_COMPRESS_BUFFER_FULL; return TOR_COMPRESS_DONE; - } else { - // ZSTD_flushStream returns 0 if the frame is done, or >0 if it + } else /* if (!state->compress) */ { + // ZSTD_decompressStream returns 0 if the frame is done, or >0 if it // is incomplete. - return (retval == 0) ? TOR_COMPRESS_DONE : TOR_COMPRESS_OK; + // We check this above. + tor_assert_nonfatal(!ZSTD_isError(retval)); + // Start a new frame if this frame is done + if (retval == 0) + return TOR_COMPRESS_DONE; + // Don't check out_len, it might have some space left if the next output + // chunk is larger than the remaining space + else if (*in_len > 0) + return TOR_COMPRESS_BUFFER_FULL; + else + return TOR_COMPRESS_OK; } #else // HAVE_ZSTD. diff --git a/src/common/confline.c b/src/common/confline.c index d4468f80ea..691cbf8c6f 100644 --- a/src/common/confline.c +++ b/src/common/confline.c @@ -8,6 +8,19 @@ #include "confline.h" #include "torlog.h" #include "util.h" +#include "container.h" + +static int config_get_lines_aux(const char *string, config_line_t **result, + int extended, int allow_include, + int *has_include, int recursion_level, + config_line_t **last); +static smartlist_t *config_get_file_list(const char *path); +static int config_get_included_list(const char *path, int recursion_level, + int extended, config_line_t **list, + config_line_t **list_last); +static int config_process_include(const char *path, int recursion_level, + int extended, config_line_t **list, + config_line_t **list_last); /** Helper: allocate a new configuration option mapping 'key' to 'val', * append it to *<b>lst</b>. */ @@ -65,19 +78,25 @@ config_line_find(const config_line_t *lines, return NULL; } -/** Helper: parse the config string and strdup into key/value - * strings. Set *result to the list, or NULL if parsing the string - * failed. Return 0 on success, -1 on failure. Warn and ignore any - * misformatted lines. - * - * If <b>extended</b> is set, then treat keys beginning with / and with + as - * indicating "clear" and "append" respectively. */ -int -config_get_lines(const char *string, config_line_t **result, int extended) +/** Auxiliary function that does all the work of config_get_lines. + * <b>recursion_level</b> is the count of how many nested %includes we have. + * Returns the a pointer to the last element of the <b>result</b> in + * <b>last</b>. */ +static int +config_get_lines_aux(const char *string, config_line_t **result, int extended, + int allow_include, int *has_include, int recursion_level, + config_line_t **last) { - config_line_t *list = NULL, **next; + config_line_t *list = NULL, **next, *list_last = NULL; char *k, *v; const char *parse_err; + int include_used = 0; + + if (recursion_level > MAX_INCLUDE_RECURSION_LEVEL) { + log_warn(LD_CONFIG, "Error while parsing configuration: more than %d " + "nested %%includes.", MAX_INCLUDE_RECURSION_LEVEL); + return -1; + } next = &list; do { @@ -108,25 +127,193 @@ config_get_lines(const char *string, config_line_t **result, int extended) command = CONFIG_LINE_CLEAR; } } - /* This list can get long, so we keep a pointer to the end of it - * rather than using config_line_append over and over and getting - * n^2 performance. */ - *next = tor_malloc_zero(sizeof(config_line_t)); - (*next)->key = k; - (*next)->value = v; - (*next)->next = NULL; - (*next)->command = command; - next = &((*next)->next); + + if (allow_include && !strcmp(k, "%include")) { + tor_free(k); + include_used = 1; + + config_line_t *include_list; + if (config_process_include(v, recursion_level, extended, &include_list, + &list_last) < 0) { + log_warn(LD_CONFIG, "Error reading included configuration " + "file or directory: \"%s\".", v); + config_free_lines(list); + tor_free(v); + return -1; + } + *next = include_list; + if (list_last) + next = &list_last->next; + tor_free(v); + } else { + /* This list can get long, so we keep a pointer to the end of it + * rather than using config_line_append over and over and getting + * n^2 performance. */ + *next = tor_malloc_zero(sizeof(**next)); + (*next)->key = k; + (*next)->value = v; + (*next)->next = NULL; + (*next)->command = command; + list_last = *next; + next = &((*next)->next); + } } else { tor_free(k); tor_free(v); } } while (*string); + if (last) { + *last = list_last; + } + if (has_include) { + *has_include = include_used; + } *result = list; return 0; } +/** Helper: parse the config string and strdup into key/value + * strings. Set *result to the list, or NULL if parsing the string + * failed. Set *has_include to 1 if <b>result</b> has values from + * %included files. Return 0 on success, -1 on failure. Warn and ignore any + * misformatted lines. + * + * If <b>extended</b> is set, then treat keys beginning with / and with + as + * indicating "clear" and "append" respectively. */ +int +config_get_lines_include(const char *string, config_line_t **result, + int extended, int *has_include) +{ + return config_get_lines_aux(string, result, extended, 1, has_include, 1, + NULL); +} + +/** Same as config_get_lines_include but does not allow %include */ +int +config_get_lines(const char *string, config_line_t **result, int extended) +{ + return config_get_lines_aux(string, result, extended, 0, NULL, 1, NULL); +} + +/** Adds a list of configuration files present on <b>path</b> to + * <b>file_list</b>. <b>path</b> can be a file or a directory. If it is a file, + * only that file will be added to <b>file_list</b>. If it is a directory, + * all paths for files on that directory root (no recursion) except for files + * whose name starts with a dot will be added to <b>file_list</b>. + * Return 0 on success, -1 on failure. Ignores empty files. + */ +static smartlist_t * +config_get_file_list(const char *path) +{ + smartlist_t *file_list = smartlist_new(); + file_status_t file_type = file_status(path); + if (file_type == FN_FILE) { + smartlist_add_strdup(file_list, path); + return file_list; + } else if (file_type == FN_DIR) { + smartlist_t *all_files = tor_listdir(path); + if (!all_files) { + smartlist_free(file_list); + return NULL; + } + smartlist_sort_strings(all_files); + SMARTLIST_FOREACH_BEGIN(all_files, char *, f) { + if (f[0] == '.') { + tor_free(f); + continue; + } + + char *fullname; + tor_asprintf(&fullname, "%s"PATH_SEPARATOR"%s", path, f); + tor_free(f); + + if (file_status(fullname) != FN_FILE) { + tor_free(fullname); + continue; + } + smartlist_add(file_list, fullname); + } SMARTLIST_FOREACH_END(f); + smartlist_free(all_files); + return file_list; + } else if (file_type == FN_EMPTY) { + return file_list; + } else { + smartlist_free(file_list); + return NULL; + } +} + +/** Creates a list of config lines present on included <b>path</b>. + * Set <b>list</b> to the list and <b>list_last</b> to the last element of + * <b>list</b>. Return 0 on success, -1 on failure. */ +static int +config_get_included_list(const char *path, int recursion_level, int extended, + config_line_t **list, config_line_t **list_last) +{ + char *included_conf = read_file_to_str(path, 0, NULL); + if (!included_conf) { + return -1; + } + + if (config_get_lines_aux(included_conf, list, extended, 1, NULL, + recursion_level+1, list_last) < 0) { + tor_free(included_conf); + return -1; + } + + tor_free(included_conf); + return 0; +} + +/** Process an %include <b>path</b> in a config file. Set <b>list</b> to the + * list of configuration settings obtained and <b>list_last</b> to the last + * element of the same list. Return 0 on success, -1 on failure. */ +static int +config_process_include(const char *path, int recursion_level, int extended, + config_line_t **list, config_line_t **list_last) +{ + config_line_t *ret_list = NULL; + config_line_t **next = &ret_list; +#if 0 + // Disabled -- we already unescape_string() on the result. */ + char *unquoted_path = get_unquoted_path(path); + if (!unquoted_path) { + return -1; + } + + smartlist_t *config_files = config_get_file_list(unquoted_path); + if (!config_files) { + tor_free(unquoted_path); + return -1; + } + tor_free(unquoted_path); +#endif + smartlist_t *config_files = config_get_file_list(path); + if (!config_files) { + return -1; + } + + SMARTLIST_FOREACH_BEGIN(config_files, char *, config_file) { + config_line_t *included_list = NULL; + if (config_get_included_list(config_file, recursion_level, extended, + &included_list, list_last) < 0) { + SMARTLIST_FOREACH(config_files, char *, f, tor_free(f)); + smartlist_free(config_files); + return -1; + } + tor_free(config_file); + + *next = included_list; + if (*list_last) + next = &(*list_last)->next; + + } SMARTLIST_FOREACH_END(config_file); + smartlist_free(config_files); + *list = ret_list; + return 0; +} + /** * Free all the configuration lines on the linked list <b>front</b>. */ diff --git a/src/common/confline.h b/src/common/confline.h index 477c6929a2..eb863e8fd8 100644 --- a/src/common/confline.h +++ b/src/common/confline.h @@ -15,6 +15,8 @@ /* Removes all previous configuration for an option. */ #define CONFIG_LINE_CLEAR 2 +#define MAX_INCLUDE_RECURSION_LEVEL 31 + /** A linked list of lines in a config file, or elsewhere */ typedef struct config_line_t { char *key; @@ -41,6 +43,8 @@ const config_line_t *config_line_find(const config_line_t *lines, int config_lines_eq(config_line_t *a, config_line_t *b); int config_count_key(const config_line_t *a, const char *key); int config_get_lines(const char *string, config_line_t **result, int extended); +int config_get_lines_include(const char *string, config_line_t **result, + int extended, int *has_include); void config_free_lines(config_line_t *front); const char *parse_config_line_from_str_verbose(const char *line, char **key_out, char **value_out, diff --git a/src/common/crypto.c b/src/common/crypto.c index a68510103e..0fc8474832 100644 --- a/src/common/crypto.c +++ b/src/common/crypto.c @@ -479,7 +479,7 @@ crypto_pk_get_rsa_(crypto_pk_t *env) * private is set, include the private-key portion of the key. Return a valid * pointer on success, and NULL on failure. */ MOCK_IMPL(EVP_PKEY *, - crypto_pk_get_evp_pkey_,(crypto_pk_t *env, int private)) +crypto_pk_get_evp_pkey_,(crypto_pk_t *env, int private)) { RSA *key = NULL; EVP_PKEY *pkey = NULL; @@ -516,7 +516,7 @@ crypto_dh_get_dh_(crypto_dh_t *dh) * be set. */ MOCK_IMPL(crypto_pk_t *, - crypto_pk_new,(void)) +crypto_pk_new,(void)) { RSA *rsa; @@ -606,7 +606,7 @@ crypto_cipher_free(crypto_cipher_t *env) * Return 0 on success, -1 on failure. */ MOCK_IMPL(int, - crypto_pk_generate_key_with_bits,(crypto_pk_t *env, int bits)) +crypto_pk_generate_key_with_bits,(crypto_pk_t *env, int bits)) { tor_assert(env); diff --git a/src/common/include.am b/src/common/include.am index 51b7da65f5..1253888815 100644 --- a/src/common/include.am +++ b/src/common/include.am @@ -100,6 +100,11 @@ LIBOR_A_SRC = \ $(threads_impl_source) \ $(readpassphrase_source) +if USE_RUST +else +LIBOR_A_SRC += src/common/compat_rust.c +endif + src/common/src_common_libor_testing_a-log.$(OBJEXT) \ src/common/log.$(OBJEXT): micro-revision.i @@ -147,6 +152,7 @@ COMMONHEADERS = \ src/common/compat.h \ src/common/compat_libevent.h \ src/common/compat_openssl.h \ + src/common/compat_rust.h \ src/common/compat_threads.h \ src/common/compat_time.h \ src/common/compress.h \ diff --git a/src/common/sandbox.c b/src/common/sandbox.c index 7826b2d40c..5063717355 100644 --- a/src/common/sandbox.c +++ b/src/common/sandbox.c @@ -19,8 +19,14 @@ #define _LARGEFILE64_SOURCE #endif -/** Malloc mprotect limit in bytes. */ -#define MALLOC_MP_LIM (16*1024*1024) +/** Malloc mprotect limit in bytes. + * + * 28/06/2017: This value was increased from 16 MB to 20 MB after we introduced + * LZMA support in Tor (0.3.1.1-alpha). We limit our LZMA coder to 16 MB, but + * liblzma have a small overhead that we need to compensate for to avoid being + * killed by the sandbox. + */ +#define MALLOC_MP_LIM (20*1024*1024) #include <stdio.h> #include <string.h> @@ -136,6 +142,9 @@ static int filter_nopar_gen[] = { #ifdef HAVE_PIPE SCMP_SYS(pipe), #endif +#ifdef __NR_fchmod + SCMP_SYS(fchmod), +#endif SCMP_SYS(fcntl), SCMP_SYS(fstat), #ifdef __NR_fstat64 @@ -725,6 +734,14 @@ sb_setsockopt(scmp_filter_ctx ctx, sandbox_cfg_t *filter) return rc; #endif +#ifdef IPV6_V6ONLY + rc = seccomp_rule_add_2(ctx, SCMP_ACT_ALLOW, SCMP_SYS(setsockopt), + SCMP_CMP(1, SCMP_CMP_EQ, IPPROTO_IPV6), + SCMP_CMP(2, SCMP_CMP_EQ, IPV6_V6ONLY)); + if (rc) + return rc; +#endif + return 0; } diff --git a/src/common/storagedir.c b/src/common/storagedir.c index 309d42db17..31933f64c2 100644 --- a/src/common/storagedir.c +++ b/src/common/storagedir.c @@ -76,8 +76,8 @@ storage_dir_free(storage_dir_t *d) * operations that <b>d</b> will need. * * The presence of this function is why we need an upper limit on the - * number of filers in a storage_dir_t: we need to approve file - * operaitons one by one. + * number of files in a storage_dir_t: we need to approve file operations + * one by one. */ int storage_dir_register_with_sandbox(storage_dir_t *d, sandbox_cfg_t **cfg) @@ -119,7 +119,8 @@ storage_dir_clean_tmpfiles(storage_dir_t *d) char *path = NULL; tor_asprintf(&path, "%s/%s", d->directory, fname); if (unlink(sandbox_intern_string(path))) { - log_warn(LD_FS, "Unable to unlink %s", escaped(path)); + log_warn(LD_FS, "Unable to unlink %s while cleaning " + "temporary files: %s", escaped(path), strerror(errno)); tor_free(path); continue; } @@ -210,7 +211,9 @@ storage_dir_read(storage_dir_t *d, const char *fname, int bin, size_t *sz_out) char *contents = read_file_to_str(path, flags, &st); if (contents && sz_out) { // it fits in RAM, so we know its size is less than SIZE_MAX +#if UINT64_MAX > SIZE_MAX tor_assert((uint64_t)st.st_size <= SIZE_MAX); +#endif *sz_out = (size_t) st.st_size; } @@ -309,9 +312,8 @@ storage_dir_save_string_to_file(storage_dir_t *d, /** * As storage_dir_save_bytes_to_file, but associates the data with the - * key-value pairs in <b>labels</b>. Files - * stored in this format can be recovered with storage_dir_map_labeled - * or storage_dir_read_labeled(). + * key-value pairs in <b>labels</b>. Files stored in this format can be + * recovered with storage_dir_map_labeled() or storage_dir_read_labeled(). */ int storage_dir_save_labeled_to_file(storage_dir_t *d, @@ -356,12 +358,12 @@ storage_dir_save_labeled_to_file(storage_dir_t *d, } /** - * Map a file that was created with storage_dir_save_labeled(). On failure, - * return NULL. On success, write a set of newly allocated labels into to - * *<b>labels_out</b>, a pointer to the into *<b>data_out</b>, and the data's - * into *<b>sz_out</b>. On success, also return a tor_mmap_t object whose - * contents should not be used -- it needs to be kept around, though, for as - * long as <b>data_out</b> is going to be valid. + * Map a file that was created with storage_dir_save_labeled_to_file(). On + * failure, return NULL. On success, write a set of newly allocated labels + * into *<b>labels_out</b>, a pointer to the data into *<b>data_out</b>, and + * the data's size into *<b>sz_out</b>. On success, also return a tor_mmap_t + * object whose contents should not be used -- it needs to be kept around, + * though, for as long as <b>data_out</b> is going to be valid. */ tor_mmap_t * storage_dir_map_labeled(storage_dir_t *dir, @@ -407,6 +409,32 @@ storage_dir_read_labeled(storage_dir_t *dir, return result; } +/* Reduce the cached usage amount in <b>d</b> by <b>removed_file_size</b>. + * This function is a no-op if <b>d->usage_known</b> is 0. */ +static void +storage_dir_reduce_usage(storage_dir_t *d, uint64_t removed_file_size) +{ + if (d->usage_known) { + if (! BUG(d->usage < removed_file_size)) { + /* This bug can also be triggered if an external process resized a file + * between the call to storage_dir_get_usage() that last checked + * actual usage (rather than relaying on cached usage), and the call to + * this function. */ + d->usage -= removed_file_size; + } else { + /* If we underflowed the cached directory size, re-check the sizes of all + * the files in the directory. This makes storage_dir_shrink() quadratic, + * but only if a process is continually changing file sizes in the + * storage directory (in which case, we have bigger issues). + * + * We can't just reset usage_known, because storage_dir_shrink() relies + * on knowing the usage. */ + storage_dir_rescan(d); + (void)storage_dir_get_usage(d); + } + } +} + /** * Remove the file called <b>fname</b> from <b>d</b>. */ @@ -426,9 +454,10 @@ storage_dir_remove_file(storage_dir_t *d, } } if (unlink(ipath) == 0) { - d->usage -= size; + storage_dir_reduce_usage(d, size); } else { - log_warn(LD_FS, "Unable to unlink %s", escaped(path)); + log_warn(LD_FS, "Unable to unlink %s while removing file: %s", + escaped(path), strerror(errno)); tor_free(path); return; } @@ -505,9 +534,7 @@ storage_dir_shrink(storage_dir_t *d, int idx = 0; while ((d->usage > target_size || min_to_remove > 0) && idx < n) { if (unlink(sandbox_intern_string(ents[idx].path)) == 0) { - if (! BUG(d->usage < ents[idx].size)) { - d->usage -= ents[idx].size; - } + storage_dir_reduce_usage(d, ents[idx].size); --min_to_remove; } ++idx; diff --git a/src/common/tortls.c b/src/common/tortls.c index fadf52fa0a..44db3aec58 100644 --- a/src/common/tortls.c +++ b/src/common/tortls.c @@ -460,11 +460,11 @@ tor_x509_name_new(const char *cname) * Return a certificate on success, NULL on failure. */ MOCK_IMPL(STATIC X509 *, - tor_tls_create_certificate,(crypto_pk_t *rsa, - crypto_pk_t *rsa_sign, - const char *cname, - const char *cname_sign, - unsigned int cert_lifetime)) +tor_tls_create_certificate,(crypto_pk_t *rsa, + crypto_pk_t *rsa_sign, + const char *cname, + const char *cname_sign, + unsigned int cert_lifetime)) { /* OpenSSL generates self-signed certificates with random 64-bit serial * numbers, so let's do that too. */ @@ -662,7 +662,7 @@ tor_x509_cert_free(tor_x509_cert_t *cert) * Steals a reference to x509_cert. */ MOCK_IMPL(STATIC tor_x509_cert_t *, - tor_x509_cert_new,(X509 *x509_cert)) +tor_x509_cert_new,(X509 *x509_cert)) { tor_x509_cert_t *cert; EVP_PKEY *pkey; @@ -705,11 +705,13 @@ MOCK_IMPL(STATIC tor_x509_cert_t *, return cert; } -/** Return a copy of <b>cert</b> */ +/** Return a new copy of <b>cert</b>. */ tor_x509_cert_t * tor_x509_cert_dup(const tor_x509_cert_t *cert) { - return tor_x509_cert_new(X509_dup(cert->cert)); + tor_assert(cert); + X509 *x509 = cert->cert; + return tor_x509_cert_new(X509_dup(x509)); } /** Read a DER-encoded X509 cert, of length exactly <b>certificate_len</b>, @@ -2047,7 +2049,8 @@ tor_tls_peer_has_cert(tor_tls_t *tls) return 1; } -/** Return the peer certificate, or NULL if there isn't one. */ +/** Return a newly allocated copy of the peer certificate, or NULL if there + * isn't one. */ MOCK_IMPL(tor_x509_cert_t *, tor_tls_get_peer_cert,(tor_tls_t *tls)) { @@ -2059,6 +2062,24 @@ tor_tls_get_peer_cert,(tor_tls_t *tls)) return tor_x509_cert_new(cert); } +/** Return a newly allocated copy of the cerficate we used on the connection, + * or NULL if somehow we didn't use one. */ +MOCK_IMPL(tor_x509_cert_t *, +tor_tls_get_own_cert,(tor_tls_t *tls)) +{ + X509 *cert = SSL_get_certificate(tls->ssl); + tls_log_errors(tls, LOG_WARN, LD_HANDSHAKE, + "getting own-connection certificate"); + if (!cert) + return NULL; + /* Fun inconsistency: SSL_get_peer_certificate increments the reference + * count, but SSL_get_certificate does not. */ + X509 *duplicate = X509_dup(cert); + if (BUG(duplicate == NULL)) + return NULL; + return tor_x509_cert_new(duplicate); +} + /** Warn that a certificate lifetime extends through a certain range. */ static void log_cert_lifetime(int severity, const X509 *cert, const char *problem, diff --git a/src/common/tortls.h b/src/common/tortls.h index fd0186cf90..f430aff70b 100644 --- a/src/common/tortls.h +++ b/src/common/tortls.h @@ -219,6 +219,7 @@ int tor_tls_is_server(tor_tls_t *tls); void tor_tls_free(tor_tls_t *tls); int tor_tls_peer_has_cert(tor_tls_t *tls); MOCK_DECL(tor_x509_cert_t *,tor_tls_get_peer_cert,(tor_tls_t *tls)); +MOCK_DECL(tor_x509_cert_t *,tor_tls_get_own_cert,(tor_tls_t *tls)); int tor_tls_verify(int severity, tor_tls_t *tls, crypto_pk_t **identity); int tor_tls_check_lifetime(int severity, tor_tls_t *tls, time_t now, diff --git a/src/common/util.c b/src/common/util.c index ca2a0c4a9c..5b47028097 100644 --- a/src/common/util.c +++ b/src/common/util.c @@ -1953,7 +1953,7 @@ parse_http_time(const char *date, struct tm *tm) /** Given an <b>interval</b> in seconds, try to write it to the * <b>out_len</b>-byte buffer in <b>out</b> in a human-readable form. - * Return 0 on success, -1 on failure. + * Returns a non-negative integer on success, -1 on failure. */ int format_time_interval(char *out, size_t out_len, long interval) @@ -3045,6 +3045,41 @@ unescape_string(const char *s, char **result, size_t *size_out) } } +/** Removes enclosing quotes from <b>path</b> and unescapes quotes between the + * enclosing quotes. Backslashes are not unescaped. Return the unquoted + * <b>path</b> on sucess or 0 if <b>path</b> is not quoted correctly. */ +char * +get_unquoted_path(const char *path) +{ + size_t len = strlen(path); + + if (len == 0) { + return tor_strdup(""); + } + + int has_start_quote = (path[0] == '\"'); + int has_end_quote = (len > 0 && path[len-1] == '\"'); + if (has_start_quote != has_end_quote || (len == 1 && has_start_quote)) { + return NULL; + } + + char *unquoted_path = tor_malloc(len - has_start_quote - has_end_quote + 1); + char *s = unquoted_path; + size_t i; + for (i = has_start_quote; i < len - has_end_quote; i++) { + if (path[i] == '\"' && (i > 0 && path[i-1] == '\\')) { + *(s-1) = path[i]; + } else if (path[i] != '\"') { + *s++ = path[i]; + } else { /* unescaped quote */ + tor_free(unquoted_path); + return NULL; + } + } + *s = '\0'; + return unquoted_path; +} + /** Expand any homedir prefix on <b>filename</b>; return a newly allocated * string. */ char * @@ -5563,6 +5598,28 @@ clamp_double_to_int64(double number) { int exponent; +#if (defined(__MINGW32__) || defined(__MINGW64__)) && GCC_VERSION >= 409 +/* + Mingw's math.h uses gcc's __builtin_choose_expr() facility to declare + isnan, isfinite, and signbit. But as implemented in at least some + versions of gcc, __builtin_choose_expr() can generate type warnings + even from branches that are not taken. So, suppress those warnings. +*/ +#define PROBLEMATIC_FLOAT_CONVERSION_WARNING +DISABLE_GCC_WARNING(float-conversion) +#endif + +/* + With clang 4.0 we apparently run into "double promotion" warnings here, + since clang thinks we're promoting a double to a long double. + */ +#if defined(__clang__) +#if __has_warning("-Wdouble-promotion") +#define PROBLEMATIC_DOUBLE_PROMOTION_WARNING +DISABLE_GCC_WARNING(double-promotion) +#endif +#endif + /* NaN is a special case that can't be used with the logic below. */ if (isnan(number)) { return 0; @@ -5588,6 +5645,13 @@ clamp_double_to_int64(double number) /* Handle infinities and finite numbers with magnitude >= 2^63. */ return signbit(number) ? INT64_MIN : INT64_MAX; + +#ifdef PROBLEMATIC_DOUBLE_PROMOTION_WARNING +ENABLE_GCC_WARNING(double-promotion) +#endif +#ifdef PROBLEMATIC_FLOAT_CONVERSION_WARNING +ENABLE_GCC_WARNING(float-conversion) +#endif } /** Return a uint64_t value from <b>a</b> in network byte order. */ diff --git a/src/common/util.h b/src/common/util.h index 18eb57f1bc..d56abcee2e 100644 --- a/src/common/util.h +++ b/src/common/util.h @@ -389,6 +389,7 @@ char *read_file_to_str_until_eof(int fd, size_t max_bytes_to_read, size_t *sz_out) ATTR_MALLOC; const char *unescape_string(const char *s, char **result, size_t *size_out); +char *get_unquoted_path(const char *path); char *expand_filename(const char *filename); MOCK_DECL(struct smartlist_t *, tor_listdir, (const char *dirname)); int path_is_relative(const char *filename); diff --git a/src/common/util_bug.h b/src/common/util_bug.h index 7879f880ec..ae7e7a37fd 100644 --- a/src/common/util_bug.h +++ b/src/common/util_bug.h @@ -58,6 +58,19 @@ * return -1; */ +#ifdef __COVERITY__ +#undef BUG +// Coverity defines this in global headers; let's override it. This is a +// magic coverity-only preprocessor thing. +#nodef BUG(x) ((x)?(__coverity_panic__(),1):0) +#endif + +#if defined(__COVERITY__) || defined(__clang_analyzer__) +// We're running with a static analysis tool: let's treat even nonfatal +// assertion failures as something that we need to avoid. +#define ALL_BUGS_ARE_FATAL +#endif + #ifdef ALL_BUGS_ARE_FATAL #define tor_assert_nonfatal_unreached() tor_assert(0) #define tor_assert_nonfatal(cond) tor_assert((cond)) diff --git a/src/common/util_format.h b/src/common/util_format.h index adf48c0077..4af8832bbe 100644 --- a/src/common/util_format.h +++ b/src/common/util_format.h @@ -23,11 +23,11 @@ #define BASE32_BUFSIZE(n) (BASE32_LEN(n) + 1) #define BASE16_BUFSIZE(n) (BASE16_LEN(n) + 1) -#define BASE64_NOPAD_LEN(n) (CEIL_DIV((n) * 4, 3) -#define BASE32_NOPAD_LEN(n) (CEIL_DIV((n) * 8, 5) +#define BASE64_NOPAD_LEN(n) (CEIL_DIV((n) * 4, 3)) +#define BASE32_NOPAD_LEN(n) (CEIL_DIV((n) * 8, 5)) -#define BASE64_NOPAD_BUFSIZE(n) (BASE64_NOPAD_LEN(n) + 1)) -#define BASE32_NOPAD_BUFSIZE(n) (BASE32_NOPAD_LEN(n) + 1)) +#define BASE64_NOPAD_BUFSIZE(n) (BASE64_NOPAD_LEN(n) + 1) +#define BASE32_NOPAD_BUFSIZE(n) (BASE32_NOPAD_LEN(n) + 1) /** @} */ #define BASE64_ENCODE_MULTILINE 1 diff --git a/src/common/workqueue.c b/src/common/workqueue.c index ec6e257b4c..42723224d3 100644 --- a/src/common/workqueue.c +++ b/src/common/workqueue.c @@ -25,11 +25,19 @@ #include "orconfig.h" #include "compat.h" #include "compat_threads.h" +#include "crypto.h" #include "util.h" #include "workqueue.h" #include "tor_queue.h" #include "torlog.h" +#define WORKQUEUE_PRIORITY_FIRST WQ_PRI_HIGH +#define WORKQUEUE_PRIORITY_LAST WQ_PRI_LOW +#define WORKQUEUE_N_PRIORITIES (((int) WORKQUEUE_PRIORITY_LAST)+1) + +TOR_TAILQ_HEAD(work_tailq_t, workqueue_entry_s); +typedef struct work_tailq_t work_tailq_t; + struct threadpool_s { /** An array of pointers to workerthread_t: one for each running worker * thread. */ @@ -38,8 +46,12 @@ struct threadpool_s { /** Condition variable that we wait on when we have no work, and which * gets signaled when our queue becomes nonempty. */ tor_cond_t condition; - /** Queue of pending work that we have to do. */ - TOR_TAILQ_HEAD(, workqueue_entry_s) work; + /** Queues of pending work that we have to do. The queue with priority + * <b>p</b> is work[p]. */ + work_tailq_t work[WORKQUEUE_N_PRIORITIES]; + + /** Weak RNG, used to decide when to ignore priority. */ + tor_weak_rng_t weak_rng; /** The current 'update generation' of the threadpool. Any thread that is * at an earlier generation needs to run the update function. */ @@ -66,6 +78,11 @@ struct threadpool_s { void *new_thread_state_arg; }; +/** Used to put a workqueue_priority_t value into a bitfield. */ +#define workqueue_priority_bitfield_t ENUM_BF(workqueue_priority_t) +/** Number of bits needed to hold all legal values of workqueue_priority_t */ +#define WORKQUEUE_PRIORITY_BITS 2 + struct workqueue_entry_s { /** The next workqueue_entry_t that's pending on the same thread or * reply queue. */ @@ -76,6 +93,8 @@ struct workqueue_entry_s { struct threadpool_s *on_pool; /** True iff this entry is waiting for a worker to start processing it. */ uint8_t pending; + /** Priority of this entry. */ + workqueue_priority_bitfield_t priority : WORKQUEUE_PRIORITY_BITS; /** Function to run in the worker thread. */ workqueue_reply_t (*fn)(void *state, void *arg); /** Function to run while processing the reply queue. */ @@ -94,9 +113,7 @@ struct replyqueue_s { alert_sockets_t alert; }; -/** A worker thread represents a single thread in a thread pool. To avoid - * contention, each gets its own queue. This breaks the guarantee that that - * queued work will get executed strictly in order. */ +/** A worker thread represents a single thread in a thread pool. */ typedef struct workerthread_s { /** Which thread it this? In range 0..in_pool->n_threads-1 */ int index; @@ -109,6 +126,8 @@ typedef struct workerthread_s { replyqueue_t *reply_queue; /** The current update generation of this thread */ unsigned generation; + /** One over the probability of taking work from a lower-priority queue. */ + int32_t lower_priority_chance; } workerthread_t; static void queue_reply(replyqueue_t *queue, workqueue_entry_t *work); @@ -125,6 +144,7 @@ workqueue_entry_new(workqueue_reply_t (*fn)(void*, void*), ent->fn = fn; ent->reply_fn = reply_fn; ent->arg = arg; + ent->priority = WQ_PRI_HIGH; return ent; } @@ -161,8 +181,9 @@ workqueue_entry_cancel(workqueue_entry_t *ent) int cancelled = 0; void *result = NULL; tor_mutex_acquire(&ent->on_pool->lock); + workqueue_priority_t prio = ent->priority; if (ent->pending) { - TOR_TAILQ_REMOVE(&ent->on_pool->work, ent, next_work); + TOR_TAILQ_REMOVE(&ent->on_pool->work[prio], ent, next_work); cancelled = 1; result = ent->arg; } @@ -180,8 +201,46 @@ workqueue_entry_cancel(workqueue_entry_t *ent) static int worker_thread_has_work(workerthread_t *thread) { - return !TOR_TAILQ_EMPTY(&thread->in_pool->work) || - thread->generation != thread->in_pool->generation; + unsigned i; + for (i = WORKQUEUE_PRIORITY_FIRST; i <= WORKQUEUE_PRIORITY_LAST; ++i) { + if (!TOR_TAILQ_EMPTY(&thread->in_pool->work[i])) + return 1; + } + return thread->generation != thread->in_pool->generation; +} + +/** Extract the next workqueue_entry_t from the the thread's pool, removing + * it from the relevant queues and marking it as non-pending. + * + * The caller must hold the lock. */ +static workqueue_entry_t * +worker_thread_extract_next_work(workerthread_t *thread) +{ + threadpool_t *pool = thread->in_pool; + work_tailq_t *queue = NULL, *this_queue; + unsigned i; + for (i = WORKQUEUE_PRIORITY_FIRST; i <= WORKQUEUE_PRIORITY_LAST; ++i) { + this_queue = &pool->work[i]; + if (!TOR_TAILQ_EMPTY(this_queue)) { + queue = this_queue; + if (! tor_weak_random_one_in_n(&pool->weak_rng, + thread->lower_priority_chance)) { + /* Usually we'll just break now, so that we can get out of the loop + * and use the queue where we found work. But with a small + * probability, we'll keep looking for lower priority work, so that + * we don't ignore our low-priority queues entirely. */ + break; + } + } + } + + if (queue == NULL) + return NULL; + + workqueue_entry_t *work = TOR_TAILQ_FIRST(queue); + TOR_TAILQ_REMOVE(queue, work, next_work); + work->pending = 0; + return work; } /** @@ -217,9 +276,9 @@ worker_thread_main(void *thread_) tor_mutex_acquire(&pool->lock); continue; } - work = TOR_TAILQ_FIRST(&pool->work); - TOR_TAILQ_REMOVE(&pool->work, work, next_work); - work->pending = 0; + work = worker_thread_extract_next_work(thread); + if (BUG(work == NULL)) + break; tor_mutex_release(&pool->lock); /* We run the work function without holding the thread lock. This @@ -268,12 +327,14 @@ queue_reply(replyqueue_t *queue, workqueue_entry_t *work) /** Allocate and start a new worker thread to use state object <b>state</b>, * and send responses to <b>replyqueue</b>. */ static workerthread_t * -workerthread_new(void *state, threadpool_t *pool, replyqueue_t *replyqueue) +workerthread_new(int32_t lower_priority_chance, + void *state, threadpool_t *pool, replyqueue_t *replyqueue) { workerthread_t *thr = tor_malloc_zero(sizeof(workerthread_t)); thr->state = state; thr->reply_queue = replyqueue; thr->in_pool = pool; + thr->lower_priority_chance = lower_priority_chance; if (spawn_func(worker_thread_main, thr) < 0) { //LCOV_EXCL_START @@ -299,24 +360,34 @@ workerthread_new(void *state, threadpool_t *pool, replyqueue_t *replyqueue) * function's responsibility to free the work object. * * On success, return a workqueue_entry_t object that can be passed to - * workqueue_entry_cancel(). On failure, return NULL. + * workqueue_entry_cancel(). On failure, return NULL. (Failure is not + * currently possible, but callers should check anyway.) + * + * Items are executed in a loose priority order -- each thread will usually + * take from the queued work with the highest prioirity, but will occasionally + * visit lower-priority queues to keep them from starving completely. * - * Note that because each thread has its own work queue, work items may not + * Note that because of priorities and thread behavior, work items may not * be executed strictly in order. */ workqueue_entry_t * -threadpool_queue_work(threadpool_t *pool, - workqueue_reply_t (*fn)(void *, void *), - void (*reply_fn)(void *), - void *arg) +threadpool_queue_work_priority(threadpool_t *pool, + workqueue_priority_t prio, + workqueue_reply_t (*fn)(void *, void *), + void (*reply_fn)(void *), + void *arg) { + tor_assert(((int)prio) >= WORKQUEUE_PRIORITY_FIRST && + ((int)prio) <= WORKQUEUE_PRIORITY_LAST); + workqueue_entry_t *ent = workqueue_entry_new(fn, reply_fn, arg); ent->on_pool = pool; ent->pending = 1; + ent->priority = prio; tor_mutex_acquire(&pool->lock); - TOR_TAILQ_INSERT_TAIL(&pool->work, ent, next_work); + TOR_TAILQ_INSERT_TAIL(&pool->work[prio], ent, next_work); tor_cond_signal_one(&pool->condition); @@ -325,6 +396,16 @@ threadpool_queue_work(threadpool_t *pool, return ent; } +/** As threadpool_queue_work_priority(), but assumes WQ_PRI_HIGH */ +workqueue_entry_t * +threadpool_queue_work(threadpool_t *pool, + workqueue_reply_t (*fn)(void *, void *), + void (*reply_fn)(void *), + void *arg) +{ + return threadpool_queue_work_priority(pool, WQ_PRI_HIGH, fn, reply_fn, arg); +} + /** * Queue a copy of a work item for every thread in a pool. This can be used, * for example, to tell the threads to update some parameter in their states. @@ -388,6 +469,14 @@ threadpool_queue_update(threadpool_t *pool, /** Don't have more than this many threads per pool. */ #define MAX_THREADS 1024 +/** For half of our threads, choose lower priority queues with probability + * 1/N for each of these values. Both are chosen somewhat arbitrarily. If + * CHANCE_PERMISSIVE is too low, then we have a risk of low-priority tasks + * stalling forever. If it's too high, we have a risk of low-priority tasks + * grabbing half of the threads. */ +#define CHANCE_PERMISSIVE 37 +#define CHANCE_STRICT INT32_MAX + /** Launch threads until we have <b>n</b>. */ static int threadpool_start_threads(threadpool_t *pool, int n) @@ -404,8 +493,14 @@ threadpool_start_threads(threadpool_t *pool, int n) sizeof(workerthread_t*), n); while (pool->n_threads < n) { + /* For half of our threads, we'll choose lower priorities permissively; + * for the other half, we'll stick more strictly to higher priorities. + * This keeps slow low-priority tasks from taking over completely. */ + int32_t chance = (pool->n_threads & 1) ? CHANCE_STRICT : CHANCE_PERMISSIVE; + void *state = pool->new_thread_state_fn(pool->new_thread_state_arg); - workerthread_t *thr = workerthread_new(state, pool, pool->reply_queue); + workerthread_t *thr = workerthread_new(chance, + state, pool, pool->reply_queue); if (!thr) { //LCOV_EXCL_START @@ -441,7 +536,15 @@ threadpool_new(int n_threads, pool = tor_malloc_zero(sizeof(threadpool_t)); tor_mutex_init_nonrecursive(&pool->lock); tor_cond_init(&pool->condition); - TOR_TAILQ_INIT(&pool->work); + unsigned i; + for (i = WORKQUEUE_PRIORITY_FIRST; i <= WORKQUEUE_PRIORITY_LAST; ++i) { + TOR_TAILQ_INIT(&pool->work[i]); + } + { + unsigned seed; + crypto_rand((void*)&seed, sizeof(seed)); + tor_init_weak_random(&pool->weak_rng, seed); + } pool->new_thread_state_fn = new_thread_state_fn; pool->new_thread_state_arg = arg; diff --git a/src/common/workqueue.h b/src/common/workqueue.h index 7b483eb7ac..d2508f5329 100644 --- a/src/common/workqueue.h +++ b/src/common/workqueue.h @@ -22,6 +22,20 @@ typedef enum workqueue_reply_t { WQ_RPL_SHUTDOWN = 2, /** indicates thread is shutting down */ } workqueue_reply_t; +/** Possible priorities for work. Lower numeric values are more important. */ +typedef enum workqueue_priority_t { + WQ_PRI_HIGH = 0, + WQ_PRI_MED = 1, + WQ_PRI_LOW = 2, +} workqueue_priority_t; + +workqueue_entry_t *threadpool_queue_work_priority(threadpool_t *pool, + workqueue_priority_t prio, + workqueue_reply_t (*fn)(void *, + void *), + void (*reply_fn)(void *), + void *arg); + workqueue_entry_t *threadpool_queue_work(threadpool_t *pool, workqueue_reply_t (*fn)(void *, void *), |