-rw-r--r--  src/common/compress.c          |  26
-rw-r--r--  src/common/compress.h          |   1
-rw-r--r--  src/or/consdiffmgr.c           | 621
-rw-r--r--  src/or/consdiffmgr.h           |  18
-rw-r--r--  src/or/directory.c             | 381
-rw-r--r--  src/or/directory.h             |   1
-rw-r--r--  src/or/dirserv.c               |  12
-rw-r--r--  src/or/main.c                  |   1
-rw-r--r--  src/or/networkstatus.c         |  18
-rw-r--r--  src/or/networkstatus.h         |   4
-rw-r--r--  src/or/or.h                    |   4
-rw-r--r--  src/test/test_consdiffmgr.c    |   6
-rw-r--r--  src/test/test_dir_handle_get.c |  71
13 files changed, 899 insertions(+), 265 deletions(-)
diff --git a/src/common/compress.c b/src/common/compress.c index 6fe4569868..6513029f9c 100644 --- a/src/common/compress.c +++ b/src/common/compress.c @@ -343,6 +343,32 @@ compression_method_get_name(compress_method_t method) return NULL; } +/** Table of compression human readable method names. */ +static const struct { + compress_method_t method; + const char *name; +} compression_method_human_names[] = { + { NO_METHOD, "uncompressed" }, + { GZIP_METHOD, "gzipped" }, + { ZLIB_METHOD, "deflated" }, + { LZMA_METHOD, "LZMA compressed" }, + { ZSTD_METHOD, "Zstandard compressed" }, + { UNKNOWN_METHOD, "unknown encoding" }, +}; + +/** Return a human readable string representation of the compression method + * <b>method</b>, or NULL if the method isn't recognized. */ +const char * +compression_method_get_human_name(compress_method_t method) +{ + unsigned i; + for (i = 0; i < ARRAY_LENGTH(compression_method_human_names); ++i) { + if (method == compression_method_human_names[i].method) + return compression_method_human_names[i].name; + } + return NULL; +} + /** Return the compression method represented by the string <b>name</b>, or * UNKNOWN_METHOD if the string isn't recognized. */ compress_method_t diff --git a/src/common/compress.h b/src/common/compress.h index 5b47c5d458..7c0dc14061 100644 --- a/src/common/compress.h +++ b/src/common/compress.h @@ -50,6 +50,7 @@ int tor_compress_is_compression_bomb(size_t size_in, size_t size_out); int tor_compress_supports_method(compress_method_t method); unsigned tor_compress_get_supported_method_bitmask(void); const char * compression_method_get_name(compress_method_t method); +const char *compression_method_get_human_name(compress_method_t method); compress_method_t compression_method_get_by_name(const char *name); const char *tor_compress_version_str(compress_method_t method); diff --git a/src/or/consdiffmgr.c b/src/or/consdiffmgr.c index d478101c6b..910f842070 100644 --- a/src/or/consdiffmgr.c +++ b/src/or/consdiffmgr.c @@ -32,6 +32,15 @@ /* The valid-after time for a consensus (or for the target consensus of a * diff), encoded as ISO UTC. */ #define LABEL_VALID_AFTER "consensus-valid-after" +/* The fresh-until time for a consensus (or for the target consensus of a + * diff), encoded as ISO UTC. */ +#define LABEL_FRESH_UNTIL "consensus-fresh-until" +/* The valid-until time for a consensus (or for the target consensus of a + * diff), encoded as ISO UTC. */ +#define LABEL_VALID_UNTIL "consensus-valid-until" +/* Comma-separated list of hex-encoded identity digests for the voting + * authorities. */ +#define LABEL_SIGNATORIES "consensus-signatories" /* A hex encoded SHA3 digest of the object, as compressed (if any) */ #define LABEL_SHA3_DIGEST "sha3-digest" /* A hex encoded SHA3 digest of the object before compression. */ @@ -96,6 +105,36 @@ n_diff_compression_methods(void) return ARRAY_LENGTH(compress_diffs_with); } +/** Which methods do we use for precompressing consensuses? */ +static const compress_method_t compress_consensus_with[] = { + ZLIB_METHOD, +#ifdef HAVE_LZMA + LZMA_METHOD, +#endif +#ifdef HAVE_ZSTD + ZSTD_METHOD, +#endif +}; + +/** How many different methods will we try to use for diff compression? */ +STATIC unsigned +n_consensus_compression_methods(void) +{ + return ARRAY_LENGTH(compress_consensus_with); +} + +/** For which compression method do we retain old consensuses? There's no + * need to keep all of them, since we won't be serving them. We'll + * go with ZLIB_METHOD because it's pretty fast and everyone has it. 
+ */ +#define RETAIN_CONSENSUS_COMPRESSED_WITH_METHOD ZLIB_METHOD + +/** Handles pointing to the latest consensus entries as compressed and + * stored. */ +static consensus_cache_entry_handle_t * + latest_consensus[N_CONSENSUS_FLAVORS] + [ARRAY_LENGTH(compress_consensus_with)]; + /** Hashtable node used to remember the current status of the diff * from a given sha3 digest to the current consensus. */ typedef struct cdm_diff_t { @@ -135,13 +174,12 @@ static consdiff_cfg_t consdiff_cfg = { }; static int consdiffmgr_ensure_space_for_files(int n); +static int consensus_queue_compression_work(const char *consensus, + const networkstatus_t *as_parsed); static int consensus_diff_queue_diff_work(consensus_cache_entry_t *diff_from, consensus_cache_entry_t *diff_to); static void consdiffmgr_set_cache_flags(void); -/* Just gzip consensuses for now. */ -#define COMPRESS_CONSENSUS_WITH GZIP_METHOD - /* ===== * Hashtable setup * ===== */ @@ -410,11 +448,6 @@ cdm_cache_lookup_consensus(consensus_flavor_t flavor, time_t valid_after) consensus_cache_filter_list(matches, LABEL_DOCTYPE, DOCTYPE_CONSENSUS); consensus_cache_entry_t *result = NULL; - if (smartlist_len(matches) > 1) { - log_warn(LD_BUG, "How odd; there appear to be two matching consensuses " - "with flavor %s published at %s.", - flavname, formatted_time); - } if (smartlist_len(matches)) { result = smartlist_get(matches, 0); } @@ -458,59 +491,7 @@ consdiffmgr_add_consensus(const char *consensus, } /* We don't have it. Add it to the cache. */ - consdiffmgr_ensure_space_for_files(1); - - { - size_t bodylen = strlen(consensus); - config_line_t *labels = NULL; - char formatted_time[ISO_TIME_LEN+1]; - format_iso_time_nospace(formatted_time, valid_after); - const char *flavname = networkstatus_get_flavor_name(flavor); - - cdm_labels_prepend_sha3(&labels, LABEL_SHA3_DIGEST_UNCOMPRESSED, - (const uint8_t *)consensus, bodylen); - { - const char *start, *end; - if (router_get_networkstatus_v3_signed_boundaries(consensus, - &start, &end) < 0) { - start = consensus; - end = consensus+bodylen; - } - cdm_labels_prepend_sha3(&labels, LABEL_SHA3_DIGEST_AS_SIGNED, - (const uint8_t *)start, - end - start); - } - - char *body_compressed = NULL; - size_t size_compressed = 0; - if (tor_compress(&body_compressed, &size_compressed, - consensus, bodylen, COMPRESS_CONSENSUS_WITH) < 0) { - config_free_lines(labels); - return -1; - } - cdm_labels_prepend_sha3(&labels, LABEL_SHA3_DIGEST, - (const uint8_t *)body_compressed, size_compressed); - config_line_prepend(&labels, LABEL_COMPRESSION_TYPE, - compression_method_get_name(COMPRESS_CONSENSUS_WITH)); - config_line_prepend(&labels, LABEL_FLAVOR, flavname); - config_line_prepend(&labels, LABEL_VALID_AFTER, formatted_time); - config_line_prepend(&labels, LABEL_DOCTYPE, DOCTYPE_CONSENSUS); - - entry = consensus_cache_add(cdm_cache_get(), - labels, - (const uint8_t *)body_compressed, - size_compressed); - tor_free(body_compressed); - config_free_lines(labels); - } - - if (entry) { - consensus_cache_entry_mark_for_aggressive_release(entry); - consensus_cache_entry_decref(entry); - } - - cdm_cache_dirty = 1; - return entry ? 0 : -1; + return consensus_queue_compression_work(consensus, as_parsed); } /** @@ -543,6 +524,47 @@ sort_and_find_most_recent(smartlist_t *lst) } } +/** Return i such that compress_consensus_with[i] == method. Return + * -1 if no such i exists. 
*/ +static int +consensus_compression_method_pos(compress_method_t method) +{ + unsigned i; + for (i = 0; i < n_consensus_compression_methods(); ++i) { + if (compress_consensus_with[i] == method) { + return i; + } + } + return -1; +} + +/** + * If we know a consensus with the flavor <b>flavor</b> compressed with + * <b>method</b>, set *<b>entry_out</b> to that value. Return values are as + * for consdiffmgr_find_diff_from(). + */ +consdiff_status_t +consdiffmgr_find_consensus(struct consensus_cache_entry_t **entry_out, + consensus_flavor_t flavor, + compress_method_t method) +{ + tor_assert((int)flavor < N_CONSENSUS_FLAVORS); + + int pos = consensus_compression_method_pos(method); + if (pos < 0) { + // We don't compress consensuses with this method. + return CONSDIFF_NOT_FOUND; + } + consensus_cache_entry_handle_t *handle = latest_consensus[flavor][pos]; + if (!handle) + return CONSDIFF_NOT_FOUND; + *entry_out = consensus_cache_entry_handle_get(handle); + if (entry_out) + return CONSDIFF_AVAILABLE; + else + return CONSDIFF_NOT_FOUND; +} + /** * Look up consensus_cache_entry_t for the consensus of type <b>flavor</b>, * from the source consensus with the specified digest (which must be SHA3). @@ -684,6 +706,42 @@ consdiffmgr_cleanup(void) smartlist_clear(diffs); } + // 3. Delete all consensuses except the most recent that are compressed with + // an un-preferred method. + for (int flav = 0; flav < N_CONSENSUS_FLAVORS; ++flav) { + const char *flavname = networkstatus_get_flavor_name(flav); + /* Determine the most recent consensus of this flavor */ + consensus_cache_find_all(consensuses, cdm_cache_get(), + LABEL_DOCTYPE, DOCTYPE_CONSENSUS); + consensus_cache_filter_list(consensuses, LABEL_FLAVOR, flavname); + consensus_cache_entry_t *most_recent = + sort_and_find_most_recent(consensuses); + if (most_recent == NULL) + continue; + const char *most_recent_sha3_uncompressed = + consensus_cache_entry_get_value(most_recent, + LABEL_SHA3_DIGEST_UNCOMPRESSED); + const char *retain_methodname = compression_method_get_name( + RETAIN_CONSENSUS_COMPRESSED_WITH_METHOD); + + if (BUG(most_recent_sha3_uncompressed == NULL)) + continue; + SMARTLIST_FOREACH_BEGIN(consensuses, consensus_cache_entry_t *, ent) { + const char *lv_sha3_uncompressed = + consensus_cache_entry_get_value(ent, LABEL_SHA3_DIGEST_UNCOMPRESSED); + if (BUG(! lv_sha3_uncompressed)) + continue; + if (!strcmp(lv_sha3_uncompressed, most_recent_sha3_uncompressed)) + continue; // This _is_ the most recent. + const char *lv_methodname = + consensus_cache_entry_get_value(ent, LABEL_COMPRESSION_TYPE); + if (! lv_methodname || strcmp(lv_methodname, retain_methodname)) { + consensus_cache_entry_mark_for_removal(ent); + ++n_to_delete; + } + } SMARTLIST_FOREACH_END(ent); + } + smartlist_free(objects); smartlist_free(consensuses); smartlist_free(diffs); @@ -787,10 +845,14 @@ consdiffmgr_rescan_flavor_(consensus_flavor_t flavor) // 1. find the most recent consensus, and the ones that we might want // to diff to it. 
+ const char *methodname = compression_method_get_name( + RETAIN_CONSENSUS_COMPRESSED_WITH_METHOD); + matches = smartlist_new(); consensus_cache_find_all(matches, cdm_cache_get(), LABEL_FLAVOR, flavname); consensus_cache_filter_list(matches, LABEL_DOCTYPE, DOCTYPE_CONSENSUS); + consensus_cache_filter_list(matches, LABEL_COMPRESSION_TYPE, methodname); consensus_cache_entry_t *most_recent = sort_and_find_most_recent(matches); if (!most_recent) { log_info(LD_DIRSERV, "No 'most recent' %s consensus found; " @@ -835,6 +897,10 @@ consdiffmgr_rescan_flavor_(consensus_flavor_t flavor) if (strmap_get(have_diff_from, va) != NULL) continue; /* we already have this one. */ smartlist_add(compute_diffs_from, ent); + /* Since we are not going to serve this as the most recent consensus + * any more, we should stop keeping it mmap'd when it's not in use. + */ + consensus_cache_entry_mark_for_aggressive_release(ent); } SMARTLIST_FOREACH_END(ent); log_info(LD_DIRSERV, @@ -880,6 +946,48 @@ consdiffmgr_rescan_flavor_(consensus_flavor_t flavor) } /** + * Scan the cache for the latest consensuses and add their handles to + * latest_consensus + */ +static void +consdiffmgr_consensus_load(void) +{ + smartlist_t *matches = smartlist_new(); + for (int flav = 0; flav < N_CONSENSUS_FLAVORS; ++flav) { + const char *flavname = networkstatus_get_flavor_name(flav); + smartlist_clear(matches); + consensus_cache_find_all(matches, cdm_cache_get(), + LABEL_FLAVOR, flavname); + consensus_cache_filter_list(matches, LABEL_DOCTYPE, DOCTYPE_CONSENSUS); + consensus_cache_entry_t *most_recent = sort_and_find_most_recent(matches); + if (! most_recent) + continue; // no consensuses. + const char *most_recent_sha3 = + consensus_cache_entry_get_value(most_recent, + LABEL_SHA3_DIGEST_UNCOMPRESSED); + if (BUG(most_recent_sha3 == NULL)) + continue; // LCOV_EXCL_LINE + consensus_cache_filter_list(matches, LABEL_SHA3_DIGEST_UNCOMPRESSED, + most_recent_sha3); + + // Everything that remains matches the most recent consensus of this + // flavor. + SMARTLIST_FOREACH_BEGIN(matches, consensus_cache_entry_t *, ent) { + const char *lv_compression = + consensus_cache_entry_get_value(ent, LABEL_COMPRESSION_TYPE); + compress_method_t method = + compression_method_get_by_name(lv_compression); + int pos = consensus_compression_method_pos(method); + if (pos < 0) + continue; + consensus_cache_entry_handle_free(latest_consensus[flav][pos]); + latest_consensus[flav][pos] = consensus_cache_entry_handle_new(ent); + } SMARTLIST_FOREACH_END(ent); + } + smartlist_free(matches); +} + +/** * Scan the cache for diffs, and add them to the hashtable. */ static void @@ -936,6 +1044,7 @@ consdiffmgr_rescan(void) if (cdm_cache_loaded == 0) { consdiffmgr_diffs_load(); + consdiffmgr_consensus_load(); cdm_cache_loaded = 1; } @@ -1051,6 +1160,14 @@ consdiffmgr_free_all(void) next = HT_NEXT_RMV(cdm_diff_ht, &cdm_diff_ht, diff); cdm_diff_free(this); } + int i; + unsigned j; + for (i = 0; i < N_CONSENSUS_FLAVORS; ++i) { + for (j = 0; j < n_consensus_compression_methods(); ++j) { + consensus_cache_entry_handle_free(latest_consensus[i][j]); + } + } + memset(latest_consensus, 0, sizeof(latest_consensus)); consensus_cache_free(cons_diff_cache); cons_diff_cache = NULL; } @@ -1072,6 +1189,93 @@ typedef struct compressed_result_t { } compressed_result_t; /** + * Compress the bytestring <b>input</b> of length <b>len</b> using the + * <n>n_methods</b> compression methods listed in the array <b>methods</b>. 
+ * + * For each successful compression, set the fields in the <b>results_out</b> + * array in the position corresponding to the compression method. Use + * <b>labels_in</b> as a basis for the labels of the result. + * + * Return 0 if all compression succeeded; -1 if any failed. + */ +static int +compress_multiple(compressed_result_t *results_out, int n_methods, + const compress_method_t *methods, + const uint8_t *input, size_t len, + const config_line_t *labels_in) +{ + int rv = 0; + int i; + for (i = 0; i < n_methods; ++i) { + compress_method_t method = methods[i]; + const char *methodname = compression_method_get_name(method); + char *result; + size_t sz; + if (0 == tor_compress(&result, &sz, (const char*)input, len, method)) { + results_out[i].body = (uint8_t*)result; + results_out[i].bodylen = sz; + results_out[i].labels = config_lines_dup(labels_in); + cdm_labels_prepend_sha3(&results_out[i].labels, LABEL_SHA3_DIGEST, + results_out[i].body, + results_out[i].bodylen); + config_line_prepend(&results_out[i].labels, + LABEL_COMPRESSION_TYPE, + methodname); + } else { + rv = -1; + } + } + return rv; +} + +/** + * Given an array of <b>n</b> compressed_result_t in <b>results</b>, + * as produced by compress_multiple, store them all into the + * consdiffmgr, and store handles to them in the <b>handles_out</b> + * array. + * + * Return CDM_DIFF_PRESENT if any was stored, and CDM_DIFF_ERROR if none + * was stored. + */ +static cdm_diff_status_t +store_multiple(consensus_cache_entry_handle_t **handles_out, + int n, + const compress_method_t *methods, + const compressed_result_t *results, + const char *description) +{ + cdm_diff_status_t status = CDM_DIFF_ERROR; + consdiffmgr_ensure_space_for_files(n); + + int i; + for (i = 0; i < n; ++i) { + compress_method_t method = methods[i]; + uint8_t *body_out = results[i].body; + size_t bodylen_out = results[i].bodylen; + config_line_t *labels = results[i].labels; + const char *methodname = compression_method_get_name(method); + if (body_out && bodylen_out && labels) { + /* Success! Store the results */ + log_info(LD_DIRSERV, "Adding %s, compressed with %s", + description, methodname); + + consensus_cache_entry_t *ent = + consensus_cache_add(cdm_cache_get(), + labels, + body_out, + bodylen_out); + if (BUG(ent == NULL)) + continue; + + status = CDM_DIFF_PRESENT; + handles_out[i] = consensus_cache_entry_handle_new(ent); + consensus_cache_entry_decref(ent); + } + } + return status; +} + +/** * An object passed to a worker thread that will try to produce a consensus * diff. 
*/ @@ -1144,6 +1348,12 @@ consensus_diff_worker_threadfn(void *state_, void *work_) const char *lv_to_valid_after = consensus_cache_entry_get_value(job->diff_to, LABEL_VALID_AFTER); + const char *lv_to_fresh_until = + consensus_cache_entry_get_value(job->diff_to, LABEL_FRESH_UNTIL); + const char *lv_to_valid_until = + consensus_cache_entry_get_value(job->diff_to, LABEL_VALID_UNTIL); + const char *lv_to_signatories = + consensus_cache_entry_get_value(job->diff_to, LABEL_SIGNATORIES); const char *lv_from_valid_after = consensus_cache_entry_get_value(job->diff_from, LABEL_VALID_AFTER); const char *lv_from_digest = @@ -1213,6 +1423,12 @@ consensus_diff_worker_threadfn(void *state_, void *work_) job->out[0].bodylen = difflen; config_line_t *common_labels = NULL; + if (lv_to_valid_until) + config_line_prepend(&common_labels, LABEL_VALID_UNTIL, lv_to_valid_until); + if (lv_to_fresh_until) + config_line_prepend(&common_labels, LABEL_FRESH_UNTIL, lv_to_fresh_until); + if (lv_to_signatories) + config_line_prepend(&common_labels, LABEL_SIGNATORIES, lv_to_signatories); cdm_labels_prepend_sha3(&common_labels, LABEL_SHA3_DIGEST_UNCOMPRESSED, job->out[0].body, @@ -1235,24 +1451,10 @@ consensus_diff_worker_threadfn(void *state_, void *work_) job->out[0].body, job->out[0].bodylen); - unsigned u; - for (u = 1; u < n_diff_compression_methods(); ++u) { - compress_method_t method = compress_diffs_with[u]; - const char *methodname = compression_method_get_name(method); - char *result; - size_t sz; - if (0 == tor_compress(&result, &sz, consensus_diff, difflen, method)) { - job->out[u].body = (uint8_t*)result; - job->out[u].bodylen = sz; - job->out[u].labels = config_lines_dup(common_labels); - cdm_labels_prepend_sha3(&job->out[u].labels, LABEL_SHA3_DIGEST, - job->out[u].body, - job->out[u].bodylen); - config_line_prepend(&job->out[u].labels, - LABEL_COMPRESSION_TYPE, - methodname); - } - } + compress_multiple(job->out+1, + n_diff_compression_methods()-1, + compress_diffs_with+1, + (const uint8_t*)consensus_diff, difflen, common_labels); config_free_lines(common_labels); return WQ_RPL_REPLY; @@ -1318,36 +1520,20 @@ consensus_diff_worker_replyfn(void *work_) cache = 0; } - int status = CDM_DIFF_ERROR; consensus_cache_entry_handle_t *handles[ARRAY_LENGTH(compress_diffs_with)]; memset(handles, 0, sizeof(handles)); - consdiffmgr_ensure_space_for_files(n_diff_compression_methods()); - - unsigned u; - for (u = 0; u < n_diff_compression_methods(); ++u) { - compress_method_t method = compress_diffs_with[u]; - uint8_t *body_out = job->out[u].body; - size_t bodylen_out = job->out[u].bodylen; - config_line_t *labels = job->out[u].labels; - const char *methodname = compression_method_get_name(method); - if (body_out && bodylen_out && labels) { - /* Success! Store the results */ - log_info(LD_DIRSERV, "Adding consensus diff from %s to %s, " - "compressed with %s", - lv_from_digest, lv_to_digest, methodname); + char description[128]; + tor_snprintf(description, sizeof(description), + "consensus diff from %s to %s", + lv_from_digest, lv_to_digest); - consensus_cache_entry_t *ent = - consensus_cache_add(cdm_cache_get(), - labels, - body_out, - bodylen_out); + int status = store_multiple(handles, + n_diff_compression_methods(), + compress_diffs_with, + job->out, + description); - status = CDM_DIFF_PRESENT; - handles[u] = consensus_cache_entry_handle_new(ent); - consensus_cache_entry_decref(ent); - } - } if (status != CDM_DIFF_PRESENT) { /* Failure! 
Nothing to do but complain */ log_warn(LD_DIRSERV, @@ -1357,6 +1543,7 @@ consensus_diff_worker_replyfn(void *work_) status = CDM_DIFF_ERROR; } + unsigned u; for (u = 0; u < ARRAY_LENGTH(handles); ++u) { compress_method_t method = compress_diffs_with[u]; if (cache) { @@ -1408,3 +1595,229 @@ consensus_diff_queue_diff_work(consensus_cache_entry_t *diff_from, return -1; } +/** + * Holds requests and replies for consensus_compress_workers. + */ +typedef struct consensus_compress_worker_job_t { + char *consensus; + size_t consensus_len; + consensus_flavor_t flavor; + config_line_t *labels_in; + compressed_result_t out[ARRAY_LENGTH(compress_consensus_with)]; +} consensus_compress_worker_job_t; + +/** + * Free all resources held in <b>job</b> + */ +static void +consensus_compress_worker_job_free(consensus_compress_worker_job_t *job) +{ + if (!job) + return; + tor_free(job->consensus); + config_free_lines(job->labels_in); + unsigned u; + for (u = 0; u < n_consensus_compression_methods(); ++u) { + config_free_lines(job->out[u].labels); + tor_free(job->out[u].body); + } + tor_free(job); +} +/** + * Worker function. This function runs inside a worker thread and receives + * a consensus_compress_worker_job_t as its input. + */ +static workqueue_reply_t +consensus_compress_worker_threadfn(void *state_, void *work_) +{ + (void)state_; + consensus_compress_worker_job_t *job = work_; + consensus_flavor_t flavor = job->flavor; + const char *consensus = job->consensus; + size_t bodylen = job->consensus_len; + + config_line_t *labels = config_lines_dup(job->labels_in); + const char *flavname = networkstatus_get_flavor_name(flavor); + + cdm_labels_prepend_sha3(&labels, LABEL_SHA3_DIGEST_UNCOMPRESSED, + (const uint8_t *)consensus, bodylen); + { + const char *start, *end; + if (router_get_networkstatus_v3_signed_boundaries(consensus, + &start, &end) < 0) { + start = consensus; + end = consensus+bodylen; + } + cdm_labels_prepend_sha3(&labels, LABEL_SHA3_DIGEST_AS_SIGNED, + (const uint8_t *)start, + end - start); + } + config_line_prepend(&labels, LABEL_FLAVOR, flavname); + config_line_prepend(&labels, LABEL_DOCTYPE, DOCTYPE_CONSENSUS); + + compress_multiple(job->out, + n_consensus_compression_methods(), + compress_consensus_with, + (const uint8_t*)consensus, bodylen, labels); + config_free_lines(labels); + return WQ_RPL_REPLY; +} + +/** + * Worker function: This function runs in the main thread, and receives + * a consensus_diff_compress_job_t that the worker thread has already + * processed. + */ +static void +consensus_compress_worker_replyfn(void *work_) +{ + consensus_compress_worker_job_t *job = work_; + + consensus_cache_entry_handle_t *handles[ + ARRAY_LENGTH(compress_consensus_with)]; + memset(handles, 0, sizeof(handles)); + + store_multiple(handles, + n_consensus_compression_methods(), + compress_consensus_with, + job->out, + "consensus"); + cdm_cache_dirty = 1; + + unsigned u; + consensus_flavor_t f = job->flavor; + tor_assert((int)f < N_CONSENSUS_FLAVORS); + for (u = 0; u < ARRAY_LENGTH(handles); ++u) { + if (handles[u] == NULL) + continue; + consensus_cache_entry_handle_free(latest_consensus[f][u]); + latest_consensus[f][u] = handles[u]; + } + + consensus_compress_worker_job_free(job); +} + +/** + * If true, we compress in worker threads. + */ +static int background_compression = 0; + +/** + * Queue a job to compress <b>consensus</b> and store its compressed + * text in the cache. 
+ */ +static int +consensus_queue_compression_work(const char *consensus, + const networkstatus_t *as_parsed) +{ + tor_assert(consensus); + tor_assert(as_parsed); + + consensus_compress_worker_job_t *job = tor_malloc_zero(sizeof(*job)); + job->consensus = tor_strdup(consensus); + job->consensus_len = strlen(consensus); + job->flavor = as_parsed->flavor; + + char va_str[ISO_TIME_LEN+1]; + char vu_str[ISO_TIME_LEN+1]; + char fu_str[ISO_TIME_LEN+1]; + format_iso_time_nospace(va_str, as_parsed->valid_after); + format_iso_time_nospace(fu_str, as_parsed->fresh_until); + format_iso_time_nospace(vu_str, as_parsed->valid_until); + config_line_append(&job->labels_in, LABEL_VALID_AFTER, va_str); + config_line_append(&job->labels_in, LABEL_FRESH_UNTIL, fu_str); + config_line_append(&job->labels_in, LABEL_VALID_UNTIL, vu_str); + if (as_parsed->voters) { + smartlist_t *hexvoters = smartlist_new(); + SMARTLIST_FOREACH_BEGIN(as_parsed->voters, + networkstatus_voter_info_t *, vi) { + if (smartlist_len(vi->sigs) == 0) + continue; // didn't sign. + char d[HEX_DIGEST_LEN+1]; + base16_encode(d, sizeof(d), vi->identity_digest, DIGEST_LEN); + smartlist_add_strdup(hexvoters, d); + } SMARTLIST_FOREACH_END(vi); + char *signers = smartlist_join_strings(hexvoters, ",", 0, NULL); + config_line_prepend(&job->labels_in, LABEL_SIGNATORIES, signers); + tor_free(signers); + SMARTLIST_FOREACH(hexvoters, char *, cp, tor_free(cp)); + } + + if (background_compression) { + workqueue_entry_t *work; + work = cpuworker_queue_work(consensus_compress_worker_threadfn, + consensus_compress_worker_replyfn, + job); + if (!work) { + consensus_compress_worker_job_free(job); + return -1; + } + + return 0; + } else { + consensus_compress_worker_threadfn(NULL, job); + consensus_compress_worker_replyfn(job); + return 0; + } +} + +/** + * Tell the consdiffmgr backend to compress consensuses in worker threads. + */ +void +consdiffmgr_enable_background_compression(void) +{ + // This isn't the default behavior because it would break unit tests. + background_compression = 1; +} + +/** Read the set of voters from the cached object <b>ent</b> into + * <b>out</b>, as a list of hex-encoded digests. Return 0 on success, + * -1 if no signatories were recorded. */ +int +consensus_cache_entry_get_voter_id_digests(const consensus_cache_entry_t *ent, + smartlist_t *out) +{ + tor_assert(ent); + tor_assert(out); + const char *s; + s = consensus_cache_entry_get_value(ent, LABEL_SIGNATORIES); + if (s == NULL) + return -1; + smartlist_split_string(out, s, ",", SPLIT_SKIP_SPACE|SPLIT_STRIP_SPACE, 0); + return 0; +} + +/** Read the fresh-until time of cached object <b>ent</b> into *<b>out</b> + * and return 0, or return -1 if no such time was recorded. */ +int +consensus_cache_entry_get_fresh_until(const consensus_cache_entry_t *ent, + time_t *out) +{ + tor_assert(ent); + tor_assert(out); + const char *s; + s = consensus_cache_entry_get_value(ent, LABEL_FRESH_UNTIL); + if (s == NULL || parse_iso_time_nospace(s, out) < 0) + return -1; + else + return 0; +} + +/** Read the valid until timestamp from the cached object <b>ent</b> into + * *<b>out</b> and return 0, or return -1 if no such time was recorded. 
*/ +int +consensus_cache_entry_get_valid_until(const consensus_cache_entry_t *ent, + time_t *out) +{ + tor_assert(ent); + tor_assert(out); + + const char *s; + s = consensus_cache_entry_get_value(ent, LABEL_VALID_UNTIL); + if (s == NULL || parse_iso_time_nospace(s, out) < 0) + return -1; + else + return 0; +} + diff --git a/src/or/consdiffmgr.h b/src/or/consdiffmgr.h index 048dae432c..0325a00bf5 100644 --- a/src/or/consdiffmgr.h +++ b/src/or/consdiffmgr.h @@ -23,6 +23,11 @@ struct consensus_cache_entry_t; // from conscache.h int consdiffmgr_add_consensus(const char *consensus, const networkstatus_t *as_parsed); +consdiff_status_t consdiffmgr_find_consensus( + struct consensus_cache_entry_t **entry_out, + consensus_flavor_t flavor, + compress_method_t method); + consdiff_status_t consdiffmgr_find_diff_from( struct consensus_cache_entry_t **entry_out, consensus_flavor_t flavor, @@ -30,8 +35,20 @@ consdiff_status_t consdiffmgr_find_diff_from( const uint8_t *digest, size_t digestlen, compress_method_t method); + +int consensus_cache_entry_get_voter_id_digests( + const struct consensus_cache_entry_t *ent, + smartlist_t *out); +int consensus_cache_entry_get_fresh_until( + const struct consensus_cache_entry_t *ent, + time_t *out); +int consensus_cache_entry_get_valid_until( + const struct consensus_cache_entry_t *ent, + time_t *out); + void consdiffmgr_rescan(void); int consdiffmgr_cleanup(void); +void consdiffmgr_enable_background_compression(void); void consdiffmgr_configure(const consdiff_cfg_t *cfg); struct sandbox_cfg_elem; int consdiffmgr_register_with_sandbox(struct sandbox_cfg_elem **cfg); @@ -40,6 +57,7 @@ int consdiffmgr_validate(void); #ifdef CONSDIFFMGR_PRIVATE STATIC unsigned n_diff_compression_methods(void); +STATIC unsigned n_consensus_compression_methods(void); STATIC consensus_cache_t *cdm_cache_get(void); STATIC consensus_cache_entry_t *cdm_cache_lookup_consensus( consensus_flavor_t flavor, time_t valid_after); diff --git a/src/or/directory.c b/src/or/directory.c index da92e0f897..d2ac42c3ff 100644 --- a/src/or/directory.c +++ b/src/or/directory.c @@ -13,6 +13,7 @@ #include "config.h" #include "connection.h" #include "connection_edge.h" +#include "conscache.h" #include "consdiff.h" #include "consdiffmgr.h" #include "control.h" @@ -117,7 +118,8 @@ static void dir_routerdesc_download_failed(smartlist_t *failed, int was_descriptor_digests); static void dir_microdesc_download_failed(smartlist_t *failed, int status_code); -static int client_likes_consensus(networkstatus_t *v, const char *want_url); +static int client_likes_consensus(const struct consensus_cache_entry_t *ent, + const char *want_url); static void connection_dir_close_consensus_fetches( dir_connection_t *except_this_one, const char *resource); @@ -1667,6 +1669,7 @@ directory_send_command(dir_connection_t *conn, char decorated_address[128]; smartlist_t *headers = smartlist_new(); char *url; + char *accept_encoding; size_t url_len; char request[8192]; size_t request_len, total_request_len = 0; @@ -1723,6 +1726,12 @@ directory_send_command(dir_connection_t *conn, proxystring[0] = 0; } + /* Add Accept-Encoding. 
*/ + accept_encoding = accept_encoding_header(); + smartlist_add_asprintf(headers, "Accept-Encoding: %s\r\n", + accept_encoding); + tor_free(accept_encoding); + /* Add additional headers, if any */ { config_line_t *h; @@ -2050,16 +2059,15 @@ parse_http_response(const char *headers, int *code, time_t *date, if (!strcmpstart(s, "Content-Encoding: ")) { enc = s+18; break; }); - if (!enc || !strcmp(enc, "identity")) { + + if (enc == NULL) *compression = NO_METHOD; - } else if (!strcmp(enc, "deflate") || !strcmp(enc, "x-deflate")) { - *compression = ZLIB_METHOD; - } else if (!strcmp(enc, "gzip") || !strcmp(enc, "x-gzip")) { - *compression = GZIP_METHOD; - } else { - log_info(LD_HTTP, "Unrecognized content encoding: %s. Trying to deal.", - escaped(enc)); - *compression = UNKNOWN_METHOD; + else { + *compression = compression_method_get_by_name(enc); + + if (*compression == UNKNOWN_METHOD) + log_info(LD_HTTP, "Unrecognized content encoding: %s. Trying to deal.", + escaped(enc)); } } SMARTLIST_FOREACH(parsed_headers, char *, s, tor_free(s)); @@ -2301,37 +2309,31 @@ connection_dir_client_reached_eof(dir_connection_t *conn) if (compression == UNKNOWN_METHOD || guessed != compression) { /* Tell the user if we don't believe what we're told about compression.*/ const char *description1, *description2; - if (compression == ZLIB_METHOD) - description1 = "as deflated"; - else if (compression == GZIP_METHOD) - description1 = "as gzipped"; - else if (compression == NO_METHOD) - description1 = "as uncompressed"; - else - description1 = "with an unknown Content-Encoding"; - if (guessed == ZLIB_METHOD) - description2 = "deflated"; - else if (guessed == GZIP_METHOD) - description2 = "gzipped"; - else if (!plausible) + + description1 = compression_method_get_human_name(compression); + + if (BUG(description1 == NULL)) + description1 = compression_method_get_human_name(UNKNOWN_METHOD); + + if (guessed == UNKNOWN_METHOD && !plausible) description2 = "confusing binary junk"; else - description2 = "uncompressed"; + description2 = compression_method_get_human_name(guessed); - log_info(LD_HTTP, "HTTP body from server '%s:%d' was labeled %s, " + log_info(LD_HTTP, "HTTP body from server '%s:%d' was labeled as %s, " "but it seems to be %s.%s", conn->base_.address, conn->base_.port, description1, description2, (compression>0 && guessed>0)?" Trying both.":""); } - /* Try declared compression first if we can. */ - if (compression == GZIP_METHOD || compression == ZLIB_METHOD) + /* Try declared compression first if we can. + * tor_compress_supports_method() also returns true for NO_METHOD. */ + if (tor_compress_supports_method(compression)) tor_uncompress(&new_body, &new_len, body, body_len, compression, !allow_partial, LOG_PROTOCOL_WARN); /* Okay, if that didn't work, and we think that it was compressed * differently, try that. */ - if (!new_body && - (guessed == GZIP_METHOD || guessed == ZLIB_METHOD) && + if (!new_body && tor_compress_supports_method(guessed) && compression != guessed) tor_uncompress(&new_body, &new_len, body, body_len, guessed, !allow_partial, LOG_PROTOCOL_WARN); @@ -3296,6 +3298,15 @@ static compress_method_t srv_meth_pref_precompressed[] = { NO_METHOD }; +/** Array of compression methods to use (if supported) for serving + * streamed data, ordered from best to worst. 
*/ +static compress_method_t srv_meth_pref_streaming_compression[] = { + ZSTD_METHOD, + ZLIB_METHOD, + GZIP_METHOD, + NO_METHOD +}; + /** Parse the compression methods listed in an Accept-Encoding header <b>h</b>, * and convert them to a bitfield where compression method x is supported if * and only if 1 << x is set in the bitfield. */ @@ -3321,6 +3332,38 @@ parse_accept_encoding_header(const char *h) return result; } +/** Array of compression methods to use (if supported) for requesting + * compressed data, ordered from best to worst. */ +static compress_method_t client_meth_pref[] = { + LZMA_METHOD, + ZSTD_METHOD, + ZLIB_METHOD, + GZIP_METHOD, + NO_METHOD +}; + +/** Return a newly allocated string containing a comma separated list of + * supported encodings. */ +STATIC char * +accept_encoding_header(void) +{ + smartlist_t *methods = smartlist_new(); + char *header = NULL; + compress_method_t method; + unsigned i; + + for (i = 0; i < ARRAY_LENGTH(client_meth_pref); ++i) { + method = client_meth_pref[i]; + if (tor_compress_supports_method(method)) + smartlist_add(methods, (char *)compression_method_get_name(method)); + } + + header = smartlist_join_strings(methods, ", ", 0, NULL); + smartlist_free(methods); + + return header; +} + /** Decide whether a client would accept the consensus we have. * * Clients can say they only want a consensus if it's signed by more @@ -3336,42 +3379,39 @@ parse_accept_encoding_header(const char *h) * consensus, 0 otherwise. */ int -client_likes_consensus(networkstatus_t *v, const char *want_url) +client_likes_consensus(const struct consensus_cache_entry_t *ent, + const char *want_url) { - smartlist_t *want_authorities = smartlist_new(); + smartlist_t *voters = smartlist_new(); int need_at_least; int have = 0; + if (consensus_cache_entry_get_voter_id_digests(ent, voters) != 0) { + return 1; // We don't know the voters; assume the client won't mind. */ + } + + smartlist_t *want_authorities = smartlist_new(); dir_split_resource_into_fingerprints(want_url, want_authorities, NULL, 0); need_at_least = smartlist_len(want_authorities)/2+1; - SMARTLIST_FOREACH_BEGIN(want_authorities, const char *, d) { - char want_digest[DIGEST_LEN]; - size_t want_len = strlen(d)/2; - if (want_len > DIGEST_LEN) - want_len = DIGEST_LEN; - - if (base16_decode(want_digest, DIGEST_LEN, d, want_len*2) - != (int) want_len) { - log_fn(LOG_PROTOCOL_WARN, LD_DIR, - "Failed to decode requested authority digest %s.", escaped(d)); - continue; - }; - SMARTLIST_FOREACH_BEGIN(v->voters, networkstatus_voter_info_t *, vi) { - if (smartlist_len(vi->sigs) && - tor_memeq(vi->identity_digest, want_digest, want_len)) { + SMARTLIST_FOREACH_BEGIN(want_authorities, const char *, want_digest) { + + SMARTLIST_FOREACH_BEGIN(voters, const char *, digest) { + if (!strcasecmpstart(digest, want_digest)) { have++; break; }; - } SMARTLIST_FOREACH_END(vi); + } SMARTLIST_FOREACH_END(digest); /* early exit, if we already have enough */ if (have >= need_at_least) break; - } SMARTLIST_FOREACH_END(d); + } SMARTLIST_FOREACH_END(want_digest); SMARTLIST_FOREACH(want_authorities, char *, d, tor_free(d)); smartlist_free(want_authorities); + SMARTLIST_FOREACH(voters, char *, cp, tor_free(cp)); + smartlist_free(voters); return (have >= need_at_least); } @@ -3578,20 +3618,25 @@ handle_get_frontpage(dir_connection_t *conn, const get_handler_args_t *args) return 0; } -/** Warn that the consensus <b>v</b> of type <b>flavor</b> is too old and will - * not be served to clients. 
Rate-limit the warning to avoid logging an entry - * on every request. +/** Warn that the cached consensus <b>consensus</b> of type + * <b>flavor</b> is too old and will not be served to clients. Rate-limit the + * warning to avoid logging an entry on every request. */ static void -warn_consensus_is_too_old(networkstatus_t *v, const char *flavor, time_t now) +warn_consensus_is_too_old(const struct consensus_cache_entry_t *consensus, + const char *flavor, time_t now) { #define TOO_OLD_WARNING_INTERVAL (60*60) static ratelim_t warned = RATELIM_INIT(TOO_OLD_WARNING_INTERVAL); char timestamp[ISO_TIME_LEN+1]; + time_t valid_until; char *dupes; + if (consensus_cache_entry_get_valid_until(consensus, &valid_until)) + return; + if ((dupes = rate_limit_log(&warned, now))) { - format_local_iso_time(timestamp, v->valid_until); + format_local_iso_time(timestamp, valid_until); log_warn(LD_DIRSERV, "Our %s%sconsensus is too old, so we will not " "serve it to clients. It was valid until %s local time and we " "continued to serve it for up to 24 hours after it expired.%s", @@ -3632,6 +3677,13 @@ parse_or_diff_from_header(smartlist_t **digests_out, const char *headers) return 0; } +/** Fallback compression method. The fallback compression method is used in + * case a client requests a non-compressed document. We only store compressed + * documents, so we use this compression method to fetch the document and let + * the spooling system do the streaming decompression. + */ +#define FALLBACK_COMPRESS_METHOD ZLIB_METHOD + /** * Try to find the best consensus diff possible in order to serve a client * request for a diff from one of the consensuses in <b>digests</b> to the @@ -3661,9 +3713,82 @@ find_best_diff(const smartlist_t *digests, int flav, } } } SMARTLIST_FOREACH_END(diff_from); + + SMARTLIST_FOREACH_BEGIN(digests, const uint8_t *, diff_from) { + if (consdiffmgr_find_diff_from(&result, flav, DIGEST_SHA3_256, diff_from, + DIGEST256_LEN, FALLBACK_COMPRESS_METHOD) == CONSDIFF_AVAILABLE) { + tor_assert_nonfatal(result); + *compression_used_out = FALLBACK_COMPRESS_METHOD; + return result; + } + } SMARTLIST_FOREACH_END(diff_from); + return NULL; } +/** Lookup the cached consensus document by the flavor found in <b>flav</b>. + * The prefered set of compression methods should be listed in the + * <b>compression_methods</b> bitfield. The compression method chosen (if any) + * is stored in <b>compression_used_out</b>. */ +static struct consensus_cache_entry_t * +find_best_consensus(int flav, + unsigned compression_methods, + compress_method_t *compression_used_out) +{ + struct consensus_cache_entry_t *result = NULL; + unsigned u; + + for (u = 0; u < ARRAY_LENGTH(srv_meth_pref_precompressed); ++u) { + compress_method_t method = srv_meth_pref_precompressed[u]; + + if (0 == (compression_methods & (1u<<method))) + continue; + + if (consdiffmgr_find_consensus(&result, flav, + method) == CONSDIFF_AVAILABLE) { + tor_assert_nonfatal(result); + *compression_used_out = method; + return result; + } + } + + if (consdiffmgr_find_consensus(&result, flav, + FALLBACK_COMPRESS_METHOD) == CONSDIFF_AVAILABLE) { + tor_assert_nonfatal(result); + *compression_used_out = FALLBACK_COMPRESS_METHOD; + return result; + } + + return NULL; +} + +/** Try to find the best supported compression method possible from a given + * <b>compression_methods</b>. Return NO_METHOD if no mutually supported + * compression method could be found. 
*/ +static compress_method_t +find_best_compression_method(unsigned compression_methods, int stream) +{ + unsigned u; + compress_method_t *methods; + size_t length; + + if (stream) { + methods = srv_meth_pref_streaming_compression; + length = ARRAY_LENGTH(srv_meth_pref_streaming_compression); + } else { + methods = srv_meth_pref_precompressed; + length = ARRAY_LENGTH(srv_meth_pref_precompressed); + } + + for (u = 0; u < length; ++u) { + compress_method_t method = methods[u]; + if (compression_methods & (1u<<method)) + return method; + } + + return NO_METHOD; +} + /** Helper function for GET /tor/status-vote/current/consensus */ static int @@ -3671,14 +3796,14 @@ handle_get_current_consensus(dir_connection_t *conn, const get_handler_args_t *args) { const char *url = args->url; - const int compressed = args->compression_supported & (1u << ZLIB_METHOD); + const compress_method_t compress_method = + find_best_compression_method(args->compression_supported, 0); const time_t if_modified_since = args->if_modified_since; int clear_spool = 0; /* v3 network status fetch. */ long lifetime = NETWORKSTATUS_CACHE_LIFETIME; - networkstatus_t *v; time_t now = time(NULL); const char *want_fps = NULL; char *flavor = NULL; @@ -3704,18 +3829,44 @@ handle_get_current_consensus(dir_connection_t *conn, want_fps = url+strlen(CONSENSUS_URL_PREFIX); } - v = networkstatus_get_latest_consensus_by_flavor(flav); + struct consensus_cache_entry_t *cached_consensus = NULL; + smartlist_t *diff_from_digests = NULL; + compress_method_t compression_used = NO_METHOD; + if (!parse_or_diff_from_header(&diff_from_digests, args->headers)) { + tor_assert(diff_from_digests); + cached_consensus = find_best_diff(diff_from_digests, flav, + args->compression_supported, + &compression_used); + SMARTLIST_FOREACH(diff_from_digests, uint8_t *, d, tor_free(d)); + smartlist_free(diff_from_digests); + } + + if (! 
cached_consensus) { + cached_consensus = find_best_consensus(flav, + args->compression_supported, + &compression_used); + } - if (v && !networkstatus_consensus_reasonably_live(v, now)) { + time_t fresh_until, valid_until; + int have_fresh_until = 0, have_valid_until = 0; + if (cached_consensus) { + have_fresh_until = + !consensus_cache_entry_get_fresh_until(cached_consensus, &fresh_until); + have_valid_until = + !consensus_cache_entry_get_valid_until(cached_consensus, &valid_until); + } + + if (cached_consensus && have_valid_until && + !networkstatus_valid_until_is_reasonably_live(valid_until, now)) { write_http_status_line(conn, 404, "Consensus is too old"); - warn_consensus_is_too_old(v, flavor, now); + warn_consensus_is_too_old(cached_consensus, flavor, now); geoip_note_ns_response(GEOIP_REJECT_NOT_FOUND); tor_free(flavor); goto done; } - if (v && want_fps && - !client_likes_consensus(v, want_fps)) { + if (cached_consensus && want_fps && + !client_likes_consensus(cached_consensus, want_fps)) { write_http_status_line(conn, 404, "Consensus not signed by sufficient " "number of requested authorities"); geoip_note_ns_response(GEOIP_REJECT_NOT_ENOUGH_SIGS); @@ -3723,48 +3874,23 @@ handle_get_current_consensus(dir_connection_t *conn, goto done; } - struct consensus_cache_entry_t *cached_diff = NULL; - smartlist_t *diff_from_digests = NULL; - compress_method_t compression_used = NO_METHOD; - if (!parse_or_diff_from_header(&diff_from_digests, args->headers)) { - tor_assert(diff_from_digests); - cached_diff = find_best_diff(diff_from_digests, flav, - args->compression_supported, - &compression_used); - SMARTLIST_FOREACH(diff_from_digests, uint8_t *, d, tor_free(d)); - smartlist_free(diff_from_digests); - } - conn->spool = smartlist_new(); clear_spool = 1; { spooled_resource_t *spooled; - if (cached_diff) { - spooled = spooled_resource_new_from_cache_entry(cached_diff); - } else if (flavor) { - spooled = spooled_resource_new(DIR_SPOOL_NETWORKSTATUS, - (uint8_t*)flavor, strlen(flavor)); - compression_used = compressed ? ZLIB_METHOD : NO_METHOD; - } else { - spooled = spooled_resource_new(DIR_SPOOL_NETWORKSTATUS, - NULL, 0); - compression_used = compressed ? ZLIB_METHOD : NO_METHOD; + if (cached_consensus) { + spooled = spooled_resource_new_from_cache_entry(cached_consensus); + smartlist_add(conn->spool, spooled); } tor_free(flavor); - smartlist_add(conn->spool, spooled); } - lifetime = (v && v->fresh_until > now) ? v->fresh_until - now : 0; - if (!smartlist_len(conn->spool)) { /* we failed to create/cache cp */ - write_http_status_line(conn, 503, "Network status object unavailable"); - geoip_note_ns_response(GEOIP_REJECT_UNAVAILABLE); - goto done; - } + lifetime = (have_fresh_until && fresh_until > now) ? fresh_until - now : 0; size_t size_guess = 0; int n_expired = 0; dirserv_spool_remove_missing_and_guess_size(conn, if_modified_since, - compressed, + compress_method != NO_METHOD, &size_guess, &n_expired); @@ -3803,11 +3929,17 @@ handle_get_current_consensus(dir_connection_t *conn, } clear_spool = 0; + + // The compress_method might have been NO_METHOD, but we store the data + // compressed. Decompress them using `compression_used`. See fallback code in + // find_best_consensus() and find_best_diff(). write_http_response_header(conn, -1, - compression_used, + compress_method == NO_METHOD ? + NO_METHOD : compression_used, smartlist_len(conn->spool) == 1 ? lifetime : 0); - if (! 
compressed) - conn->compress_state = tor_compress_new(0, ZLIB_METHOD, + + if (compress_method == NO_METHOD) + conn->compress_state = tor_compress_new(0, compression_used, HIGH_COMPRESSION); /* Prime the connection with some data. */ @@ -3828,7 +3960,8 @@ static int handle_get_status_vote(dir_connection_t *conn, const get_handler_args_t *args) { const char *url = args->url; - const int compressed = args->compression_supported & (1u << ZLIB_METHOD); + const compress_method_t compress_method = + find_best_compression_method(args->compression_supported, 1); { int current; ssize_t body_len = 0; @@ -3885,11 +4018,12 @@ handle_get_status_vote(dir_connection_t *conn, const get_handler_args_t *args) goto vote_done; } SMARTLIST_FOREACH(dir_items, cached_dir_t *, d, - body_len += compressed ? d->dir_z_len : d->dir_len); + body_len += compress_method != NO_METHOD ? + d->dir_compressed_len : d->dir_len); estimated_len += body_len; SMARTLIST_FOREACH(items, const char *, item, { size_t ln = strlen(item); - if (compressed) { + if (compress_method != NO_METHOD) { estimated_len += ln/2; } else { body_len += ln; estimated_len += ln; @@ -3901,12 +4035,12 @@ handle_get_status_vote(dir_connection_t *conn, const get_handler_args_t *args) goto vote_done; } write_http_response_header(conn, body_len ? body_len : -1, - compressed ? ZLIB_METHOD : NO_METHOD, + compress_method, lifetime); if (smartlist_len(items)) { - if (compressed) { - conn->compress_state = tor_compress_new(1, ZLIB_METHOD, + if (compress_method != NO_METHOD) { + conn->compress_state = tor_compress_new(1, compress_method, choose_compression_level(estimated_len)); SMARTLIST_FOREACH(items, const char *, c, connection_write_to_buf_compress(c, strlen(c), conn, 0)); @@ -3917,8 +4051,10 @@ handle_get_status_vote(dir_connection_t *conn, const get_handler_args_t *args) } } else { SMARTLIST_FOREACH(dir_items, cached_dir_t *, d, - connection_write_to_buf(compressed ? d->dir_z : d->dir, - compressed ? d->dir_z_len : d->dir_len, + connection_write_to_buf(compress_method != NO_METHOD ? + d->dir_compressed : d->dir, + compress_method != NO_METHOD ? + d->dir_compressed_len : d->dir_len, TO_CONN(conn))); } vote_done: @@ -3936,7 +4072,8 @@ static int handle_get_microdesc(dir_connection_t *conn, const get_handler_args_t *args) { const char *url = args->url; - const int compressed = args->compression_supported & (1u << ZLIB_METHOD); + const compress_method_t compress_method = + find_best_compression_method(args->compression_supported, 1); int clear_spool = 1; { conn->spool = smartlist_new(); @@ -3947,7 +4084,8 @@ handle_get_microdesc(dir_connection_t *conn, const get_handler_args_t *args) DSR_DIGEST256|DSR_BASE64|DSR_SORT_UNIQ); size_t size_guess = 0; - dirserv_spool_remove_missing_and_guess_size(conn, 0, compressed, + dirserv_spool_remove_missing_and_guess_size(conn, 0, + compress_method != NO_METHOD, &size_guess, NULL); if (smartlist_len(conn->spool) == 0) { write_http_status_line(conn, 404, "Not found"); @@ -3963,11 +4101,11 @@ handle_get_microdesc(dir_connection_t *conn, const get_handler_args_t *args) clear_spool = 0; write_http_response_header(conn, -1, - compressed ? 
ZLIB_METHOD : NO_METHOD, + compress_method, MICRODESC_CACHE_LIFETIME); - if (compressed) - conn->compress_state = tor_compress_new(1, ZLIB_METHOD, + if (compress_method != NO_METHOD) + conn->compress_state = tor_compress_new(1, compress_method, choose_compression_level(size_guess)); const int initial_flush_result = connection_dirserv_flushed_some(conn); @@ -3988,7 +4126,8 @@ static int handle_get_descriptor(dir_connection_t *conn, const get_handler_args_t *args) { const char *url = args->url; - const int compressed = args->compression_supported & (1u << ZLIB_METHOD); + const compress_method_t compress_method = + find_best_compression_method(args->compression_supported, 1); const or_options_t *options = get_options(); int clear_spool = 1; if (!strcmpstart(url,"/tor/server/") || @@ -4027,8 +4166,8 @@ handle_get_descriptor(dir_connection_t *conn, const get_handler_args_t *args) size_t size_guess = 0; int n_expired = 0; dirserv_spool_remove_missing_and_guess_size(conn, publish_cutoff, - compressed, &size_guess, - &n_expired); + compress_method != NO_METHOD, + &size_guess, &n_expired); /* If we are the bridge authority and the descriptor is a bridge * descriptor, remember that we served this descriptor for desc stats. */ @@ -4058,11 +4197,9 @@ handle_get_descriptor(dir_connection_t *conn, const get_handler_args_t *args) dir_conn_clear_spool(conn); goto done; } - write_http_response_header(conn, -1, - compressed ? ZLIB_METHOD : NO_METHOD, - cache_lifetime); - if (compressed) - conn->compress_state = tor_compress_new(1, ZLIB_METHOD, + write_http_response_header(conn, -1, compress_method, cache_lifetime); + if (compress_method != NO_METHOD) + conn->compress_state = tor_compress_new(1, compress_method, choose_compression_level(size_guess)); clear_spool = 0; /* Prime the connection with some data. */ @@ -4083,7 +4220,8 @@ static int handle_get_keys(dir_connection_t *conn, const get_handler_args_t *args) { const char *url = args->url; - const int compressed = args->compression_supported & (1u << ZLIB_METHOD); + const compress_method_t compress_method = + find_best_compression_method(args->compression_supported, 1); const time_t if_modified_since = args->if_modified_since; { smartlist_t *certs = smartlist_new(); @@ -4146,16 +4284,19 @@ handle_get_keys(dir_connection_t *conn, const get_handler_args_t *args) SMARTLIST_FOREACH(certs, authority_cert_t *, c, len += c->cache_info.signed_descriptor_len); - if (global_write_bucket_low(TO_CONN(conn), compressed?len/2:len, 2)) { + if (global_write_bucket_low(TO_CONN(conn), + compress_method != NO_METHOD ? len/2 : len, + 2)) { write_http_status_line(conn, 503, "Directory busy, try again later"); goto keys_done; } - write_http_response_header(conn, compressed?-1:len, - compressed ? ZLIB_METHOD : NO_METHOD, + write_http_response_header(conn, + compress_method != NO_METHOD ? 
-1 : len, + compress_method, 60*60); - if (compressed) { - conn->compress_state = tor_compress_new(1, ZLIB_METHOD, + if (compress_method != NO_METHOD) { + conn->compress_state = tor_compress_new(1, compress_method, choose_compression_level(len)); SMARTLIST_FOREACH(certs, authority_cert_t *, c, connection_write_to_buf_compress( diff --git a/src/or/directory.h b/src/or/directory.h index 125333da37..a015c7045d 100644 --- a/src/or/directory.h +++ b/src/or/directory.h @@ -160,6 +160,7 @@ struct get_handler_args_t; STATIC int handle_get_hs_descriptor_v3(dir_connection_t *conn, const struct get_handler_args_t *args); STATIC int directory_handle_command(dir_connection_t *conn); +STATIC char *accept_encoding_header(void); #endif diff --git a/src/or/dirserv.c b/src/or/dirserv.c index 77e99fbc17..2b10a09ead 100644 --- a/src/or/dirserv.c +++ b/src/or/dirserv.c @@ -1184,8 +1184,8 @@ new_cached_dir(char *s, time_t published) d->dir = s; d->dir_len = strlen(s); d->published = published; - if (tor_compress(&(d->dir_z), &(d->dir_z_len), d->dir, d->dir_len, - ZLIB_METHOD)) { + if (tor_compress(&(d->dir_compressed), &(d->dir_compressed_len), + d->dir, d->dir_len, ZLIB_METHOD)) { log_warn(LD_BUG, "Error compressing directory"); } return d; @@ -1196,7 +1196,7 @@ static void clear_cached_dir(cached_dir_t *d) { tor_free(d->dir); - tor_free(d->dir_z); + tor_free(d->dir_compressed); memset(d, 0, sizeof(cached_dir_t)); } @@ -3513,7 +3513,7 @@ spooled_resource_estimate_size(const spooled_resource_t *spooled, if (cached == NULL) { return 0; } - size_t result = compressed ? cached->dir_z_len : cached->dir_len; + size_t result = compressed ? cached->dir_compressed_len : cached->dir_len; return result; } } @@ -3572,8 +3572,8 @@ spooled_resource_flush_some(spooled_resource_t *spooled, int64_t total_len; const char *ptr; if (cached) { - total_len = cached->dir_z_len; - ptr = cached->dir_z; + total_len = cached->dir_compressed_len; + ptr = cached->dir_compressed; } else { total_len = spooled->cce_len; ptr = (const char *)spooled->cce_body; diff --git a/src/or/main.c b/src/or/main.c index f1a8cfb96e..9f0c29cf0b 100644 --- a/src/or/main.c +++ b/src/or/main.c @@ -2482,6 +2482,7 @@ do_main_loop(void) /* launch cpuworkers. Need to do this *after* we've read the onion key. */ cpu_init(); } + consdiffmgr_enable_background_compression(); /* Setup shared random protocol subsystem. */ if (authdir_mode_v3(get_options())) { diff --git a/src/or/networkstatus.c b/src/or/networkstatus.c index bd106fd9a6..fffd1078be 100644 --- a/src/or/networkstatus.c +++ b/src/or/networkstatus.c @@ -1407,16 +1407,24 @@ networkstatus_get_live_consensus,(time_t now)) * Return 1 if the consensus is reasonably live, or 0 if it is too old. */ int -networkstatus_consensus_reasonably_live(networkstatus_t *consensus, time_t now) +networkstatus_consensus_reasonably_live(const networkstatus_t *consensus, + time_t now) { -#define REASONABLY_LIVE_TIME (24*60*60) if (BUG(!consensus)) return 0; - if (now <= consensus->valid_until + REASONABLY_LIVE_TIME) - return 1; + return networkstatus_valid_until_is_reasonably_live(consensus->valid_until, + now); +} - return 0; +/** As networkstatus_consensus_reasonably_live, but takes a valid_until + * time rather than an entire consensus. */ +int +networkstatus_valid_until_is_reasonably_live(time_t valid_until, + time_t now) +{ +#define REASONABLY_LIVE_TIME (24*60*60) + return (now <= valid_until + REASONABLY_LIVE_TIME); } /* XXXX remove this in favor of get_live_consensus. 
But actually, diff --git a/src/or/networkstatus.h b/src/or/networkstatus.h index 37a5a3b7a0..e774c4d266 100644 --- a/src/or/networkstatus.h +++ b/src/or/networkstatus.h @@ -81,8 +81,10 @@ MOCK_DECL(networkstatus_t *,networkstatus_get_latest_consensus,(void)); MOCK_DECL(networkstatus_t *,networkstatus_get_latest_consensus_by_flavor, (consensus_flavor_t f)); MOCK_DECL(networkstatus_t *, networkstatus_get_live_consensus,(time_t now)); -int networkstatus_consensus_reasonably_live(networkstatus_t *consensus, +int networkstatus_consensus_reasonably_live(const networkstatus_t *consensus, time_t now); +int networkstatus_valid_until_is_reasonably_live(time_t valid_until, + time_t now); networkstatus_t *networkstatus_get_reasonably_live_consensus(time_t now, int flavor); MOCK_DECL(int, networkstatus_consensus_is_bootstrapping,(time_t now)); diff --git a/src/or/or.h b/src/or/or.h index 297ec47fc1..acbf8cebbb 100644 --- a/src/or/or.h +++ b/src/or/or.h @@ -1934,9 +1934,9 @@ typedef struct addr_policy_t { * compressed form. */ typedef struct cached_dir_t { char *dir; /**< Contents of this object, NUL-terminated. */ - char *dir_z; /**< Compressed contents of this object. */ + char *dir_compressed; /**< Compressed contents of this object. */ size_t dir_len; /**< Length of <b>dir</b> (not counting its NUL). */ - size_t dir_z_len; /**< Length of <b>dir_z</b>. */ + size_t dir_compressed_len; /**< Length of <b>dir_compressed</b>. */ time_t published; /**< When was this object published. */ common_digests_t digests; /**< Digests of this object (networkstatus only) */ /** Sha3 digest (also ns only) */ diff --git a/src/test/test_consdiffmgr.c b/src/test/test_consdiffmgr.c index 9d0c5b5b71..2e5dd59292 100644 --- a/src/test/test_consdiffmgr.c +++ b/src/test/test_consdiffmgr.c @@ -759,9 +759,11 @@ test_consdiffmgr_cleanup_old_diffs(void *arg) consensus_cache_entry_incref(hold_ent); // incref, so it is preserved. 
/* Now add an even-more-recent consensus; this should make all previous - * diffs deletable */ + * diffs deletable, and make delete */ tt_int_op(0, OP_EQ, consdiffmgr_add_consensus(md_body[3], md_ns[3])); - tt_int_op(2 * n_diff_compression_methods(), OP_EQ, consdiffmgr_cleanup()); + tt_int_op(2 * n_diff_compression_methods() + + (n_consensus_compression_methods() - 1) , OP_EQ, + consdiffmgr_cleanup()); tt_int_op(CONSDIFF_NOT_FOUND, OP_EQ, lookup_diff_from(&ent, FLAV_MICRODESC, md_body[0])); diff --git a/src/test/test_dir_handle_get.c b/src/test/test_dir_handle_get.c index c98938b2db..7d40db4543 100644 --- a/src/test/test_dir_handle_get.c +++ b/src/test/test_dir_handle_get.c @@ -12,6 +12,7 @@ #include "or.h" #include "config.h" #include "connection.h" +#include "consdiffmgr.h" #include "directory.h" #include "test.h" #include "compress.h" @@ -465,6 +466,8 @@ init_mock_options(void) mock_options = tor_malloc(sizeof(or_options_t)); memset(mock_options, 0, sizeof(or_options_t)); mock_options->TestingTorNetwork = 1; + mock_options->DataDirectory = tor_strdup(get_fname_rnd("datadir_tmp")); + check_private_dir(mock_options->DataDirectory, CPD_CREATE, NULL); } static const or_options_t * @@ -501,14 +504,6 @@ test_dir_handle_get_micro_d(void *data) /* SETUP */ init_mock_options(); - const char *fn = get_fname("dir_handle_datadir_test1"); - mock_options->DataDirectory = tor_strdup(fn); - -#ifdef _WIN32 - tt_int_op(0, OP_EQ, mkdir(mock_options->DataDirectory)); -#else - tt_int_op(0, OP_EQ, mkdir(mock_options->DataDirectory, 0700)); -#endif /* Add microdesc to cache */ crypto_digest256(digest, microdesc, strlen(microdesc), DIGEST_SHA256); @@ -568,14 +563,6 @@ test_dir_handle_get_micro_d_server_busy(void *data) /* SETUP */ init_mock_options(); - const char *fn = get_fname("dir_handle_datadir_test2"); - mock_options->DataDirectory = tor_strdup(fn); - -#ifdef _WIN32 - tt_int_op(0, OP_EQ, mkdir(mock_options->DataDirectory)); -#else - tt_int_op(0, OP_EQ, mkdir(mock_options->DataDirectory, 0700)); -#endif /* Add microdesc to cache */ crypto_digest256(digest, microdesc, strlen(microdesc), DIGEST_SHA256); @@ -1621,8 +1608,13 @@ test_dir_handle_get_status_vote_current_consensus_ns_not_enough_sigs(void* d) /* init mock */ mock_ns_val = tor_malloc_zero(sizeof(networkstatus_t)); mock_ns_val->flavor = FLAV_NS; + mock_ns_val->type = NS_TYPE_CONSENSUS; mock_ns_val->voters = smartlist_new(); - mock_ns_val->valid_until = time(NULL); + mock_ns_val->valid_after = time(NULL) - 1800; + mock_ns_val->valid_until = time(NULL) - 60; + + #define NETWORK_STATUS "some network status string" + consdiffmgr_add_consensus(NETWORK_STATUS, mock_ns_val); /* init mock */ init_mock_options(); @@ -1710,15 +1702,23 @@ test_dir_handle_get_status_vote_current_consensus_too_old(void *data) (void)data; mock_ns_val = tor_malloc_zero(sizeof(networkstatus_t)); + mock_ns_val->type = NS_TYPE_CONSENSUS; mock_ns_val->flavor = FLAV_MICRODESC; - mock_ns_val->valid_until = time(NULL) - (60 * 60 * 24) - 1; + mock_ns_val->valid_after = time(NULL) - (24 * 60 * 60 + 1800); + mock_ns_val->fresh_until = time(NULL) - (24 * 60 * 60 + 900); + mock_ns_val->valid_until = time(NULL) - (24 * 60 * 60 + 20); + + #define NETWORK_STATUS "some network status string" + consdiffmgr_add_consensus(NETWORK_STATUS, mock_ns_val); init_mock_options(); + MOCK(get_options, mock_get_options); MOCK(connection_write_to_buf_impl_, connection_write_to_buf_mock); MOCK(networkstatus_get_latest_consensus_by_flavor, mock_ns_get_by_flavor); conn = new_dir_conn(); + TO_CONN(conn)->address = 
tor_strdup("127.0.0.1"); setup_capture_of_logs(LOG_WARN); @@ -1734,6 +1734,17 @@ test_dir_handle_get_status_vote_current_consensus_too_old(void *data) tor_free(header); teardown_capture_of_logs(); + tor_free(mock_ns_val); + + mock_ns_val = tor_malloc_zero(sizeof(networkstatus_t)); + mock_ns_val->type = NS_TYPE_CONSENSUS; + mock_ns_val->flavor = FLAV_NS; + mock_ns_val->valid_after = time(NULL) - (24 * 60 * 60 + 1800); + mock_ns_val->fresh_until = time(NULL) - (24 * 60 * 60 + 900); + mock_ns_val->valid_until = time(NULL) - (24 * 60 * 60 + 20); + + #define NETWORK_STATUS "some network status string" + consdiffmgr_add_consensus(NETWORK_STATUS, mock_ns_val); setup_capture_of_logs(LOG_WARN); @@ -1772,16 +1783,26 @@ static void status_vote_current_consensus_ns_test(char **header, char **body, size_t *body_len) { - common_digests_t digests; - uint8_t sha3[DIGEST256_LEN]; dir_connection_t *conn = NULL; #define NETWORK_STATUS "some network status string" +#if 0 + common_digests_t digests; + uint8_t sha3[DIGEST256_LEN]; memset(&digests, 0x60, sizeof(digests)); memset(sha3, 0x06, sizeof(sha3)); dirserv_set_cached_consensus_networkstatus(NETWORK_STATUS, "ns", &digests, sha3, time(NULL)); +#endif + networkstatus_t *ns = tor_malloc_zero(sizeof(networkstatus_t)); + ns->type = NS_TYPE_CONSENSUS; + ns->flavor = FLAV_NS; + ns->valid_after = time(NULL) - 1800; + ns->fresh_until = time(NULL) - 900; + ns->valid_until = time(NULL) - 60; + consdiffmgr_add_consensus(NETWORK_STATUS, ns); + networkstatus_vote_free(ns); MOCK(connection_write_to_buf_impl_, connection_write_to_buf_mock); @@ -2592,11 +2613,11 @@ struct testcase_t dir_handle_get_tests[] = { DIR_HANDLE_CMD(status_vote_current_authority, 0), DIR_HANDLE_CMD(status_vote_next_authority_not_found, 0), DIR_HANDLE_CMD(status_vote_next_authority, 0), - DIR_HANDLE_CMD(status_vote_current_consensus_ns_not_enough_sigs, 0), - DIR_HANDLE_CMD(status_vote_current_consensus_ns_not_found, 0), - DIR_HANDLE_CMD(status_vote_current_consensus_too_old, 0), - DIR_HANDLE_CMD(status_vote_current_consensus_ns_busy, 0), - DIR_HANDLE_CMD(status_vote_current_consensus_ns, 0), + DIR_HANDLE_CMD(status_vote_current_consensus_ns_not_enough_sigs, TT_FORK), + DIR_HANDLE_CMD(status_vote_current_consensus_ns_not_found, TT_FORK), + DIR_HANDLE_CMD(status_vote_current_consensus_too_old, TT_FORK), + DIR_HANDLE_CMD(status_vote_current_consensus_ns_busy, TT_FORK), + DIR_HANDLE_CMD(status_vote_current_consensus_ns, TT_FORK), DIR_HANDLE_CMD(status_vote_current_d_not_found, 0), DIR_HANDLE_CMD(status_vote_next_d_not_found, 0), DIR_HANDLE_CMD(status_vote_d, 0), |
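
Note on the content-negotiation scheme introduced above: parse_accept_encoding_header() folds the client's Accept-Encoding names into a bitfield in which compression method x is accepted if and only if (1u << x) is set, and find_best_compression_method() then walks a server-side preference array (precompressed or streaming, best first) and returns the first method whose bit is set, falling back to NO_METHOD. The following is a minimal standalone sketch of that idea, not the Tor API itself: the demo_ enum values, demo_ helper names, and the fixed-size token buffer are all hypothetical stand-ins chosen only to illustrate the bitfield negotiation.

#include <stdio.h>
#include <string.h>

/* Toy stand-ins for the compression-method enum; the real compress_method_t
 * lives in src/common/compress.h and its numeric values may differ. */
typedef enum {
  DEMO_NONE = 0, DEMO_GZIP = 1, DEMO_ZLIB = 2, DEMO_LZMA = 3, DEMO_ZSTD = 4
} demo_method_t;

/* Encoding-name table; the string names mirror the ones compress.c uses,
 * but the mapping here is illustrative only. */
static const struct { const char *name; demo_method_t method; } demo_names[] = {
  { "identity", DEMO_NONE },
  { "gzip", DEMO_GZIP },
  { "deflate", DEMO_ZLIB },
  { "x-tor-lzma", DEMO_LZMA },
  { "x-zstd", DEMO_ZSTD },
};

/* Fold a comma-separated Accept-Encoding value into a bitfield:
 * bit (1u << m) is set iff method m was listed by the client. */
static unsigned
demo_parse_accept_encoding(const char *header)
{
  unsigned result = 0;
  char buf[256];
  char *tok;
  unsigned i;
  strncpy(buf, header, sizeof(buf) - 1);
  buf[sizeof(buf) - 1] = '\0';
  for (tok = strtok(buf, ", "); tok != NULL; tok = strtok(NULL, ", ")) {
    for (i = 0; i < sizeof(demo_names) / sizeof(demo_names[0]); ++i) {
      if (!strcmp(tok, demo_names[i].name))
        result |= (1u << demo_names[i].method);
    }
  }
  return result;
}

/* Walk a server-side preference list (best first) and return the first
 * method whose bit the client set; fall back to "no compression". */
static demo_method_t
demo_best_method(unsigned client_bits, const demo_method_t *prefs,
                 size_t n_prefs)
{
  size_t i;
  for (i = 0; i < n_prefs; ++i) {
    if (client_bits & (1u << prefs[i]))
      return prefs[i];
  }
  return DEMO_NONE;
}

int
main(void)
{
  /* Preference order modeled on srv_meth_pref_streaming_compression:
   * zstd, then zlib, then gzip, then uncompressed. */
  const demo_method_t streaming_prefs[] =
    { DEMO_ZSTD, DEMO_ZLIB, DEMO_GZIP, DEMO_NONE };
  unsigned bits = demo_parse_accept_encoding("deflate, gzip, identity");
  demo_method_t chosen = demo_best_method(bits, streaming_prefs, 4);
  printf("chosen method id: %d\n", chosen); /* prints 2 == DEMO_ZLIB (deflate) */
  return 0;
}

The design keeps the per-request cost low: the Accept-Encoding header is parsed once into a single unsigned bitmask, and every later preference check in the handlers is one bit test against that mask.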