diff options
Diffstat (limited to 'src/or/consdiffmgr.c')
-rw-r--r-- | src/or/consdiffmgr.c | 621 |
1 files changed, 517 insertions, 104 deletions
diff --git a/src/or/consdiffmgr.c b/src/or/consdiffmgr.c index d478101c6b..910f842070 100644 --- a/src/or/consdiffmgr.c +++ b/src/or/consdiffmgr.c @@ -32,6 +32,15 @@ /* The valid-after time for a consensus (or for the target consensus of a * diff), encoded as ISO UTC. */ #define LABEL_VALID_AFTER "consensus-valid-after" +/* The fresh-until time for a consensus (or for the target consensus of a + * diff), encoded as ISO UTC. */ +#define LABEL_FRESH_UNTIL "consensus-fresh-until" +/* The valid-until time for a consensus (or for the target consensus of a + * diff), encoded as ISO UTC. */ +#define LABEL_VALID_UNTIL "consensus-valid-until" +/* Comma-separated list of hex-encoded identity digests for the voting + * authorities. */ +#define LABEL_SIGNATORIES "consensus-signatories" /* A hex encoded SHA3 digest of the object, as compressed (if any) */ #define LABEL_SHA3_DIGEST "sha3-digest" /* A hex encoded SHA3 digest of the object before compression. */ @@ -96,6 +105,36 @@ n_diff_compression_methods(void) return ARRAY_LENGTH(compress_diffs_with); } +/** Which methods do we use for precompressing consensuses? */ +static const compress_method_t compress_consensus_with[] = { + ZLIB_METHOD, +#ifdef HAVE_LZMA + LZMA_METHOD, +#endif +#ifdef HAVE_ZSTD + ZSTD_METHOD, +#endif +}; + +/** How many different methods will we try to use for diff compression? */ +STATIC unsigned +n_consensus_compression_methods(void) +{ + return ARRAY_LENGTH(compress_consensus_with); +} + +/** For which compression method do we retain old consensuses? There's no + * need to keep all of them, since we won't be serving them. We'll + * go with ZLIB_METHOD because it's pretty fast and everyone has it. + */ +#define RETAIN_CONSENSUS_COMPRESSED_WITH_METHOD ZLIB_METHOD + +/** Handles pointing to the latest consensus entries as compressed and + * stored. */ +static consensus_cache_entry_handle_t * + latest_consensus[N_CONSENSUS_FLAVORS] + [ARRAY_LENGTH(compress_consensus_with)]; + /** Hashtable node used to remember the current status of the diff * from a given sha3 digest to the current consensus. */ typedef struct cdm_diff_t { @@ -135,13 +174,12 @@ static consdiff_cfg_t consdiff_cfg = { }; static int consdiffmgr_ensure_space_for_files(int n); +static int consensus_queue_compression_work(const char *consensus, + const networkstatus_t *as_parsed); static int consensus_diff_queue_diff_work(consensus_cache_entry_t *diff_from, consensus_cache_entry_t *diff_to); static void consdiffmgr_set_cache_flags(void); -/* Just gzip consensuses for now. */ -#define COMPRESS_CONSENSUS_WITH GZIP_METHOD - /* ===== * Hashtable setup * ===== */ @@ -410,11 +448,6 @@ cdm_cache_lookup_consensus(consensus_flavor_t flavor, time_t valid_after) consensus_cache_filter_list(matches, LABEL_DOCTYPE, DOCTYPE_CONSENSUS); consensus_cache_entry_t *result = NULL; - if (smartlist_len(matches) > 1) { - log_warn(LD_BUG, "How odd; there appear to be two matching consensuses " - "with flavor %s published at %s.", - flavname, formatted_time); - } if (smartlist_len(matches)) { result = smartlist_get(matches, 0); } @@ -458,59 +491,7 @@ consdiffmgr_add_consensus(const char *consensus, } /* We don't have it. Add it to the cache. */ - consdiffmgr_ensure_space_for_files(1); - - { - size_t bodylen = strlen(consensus); - config_line_t *labels = NULL; - char formatted_time[ISO_TIME_LEN+1]; - format_iso_time_nospace(formatted_time, valid_after); - const char *flavname = networkstatus_get_flavor_name(flavor); - - cdm_labels_prepend_sha3(&labels, LABEL_SHA3_DIGEST_UNCOMPRESSED, - (const uint8_t *)consensus, bodylen); - { - const char *start, *end; - if (router_get_networkstatus_v3_signed_boundaries(consensus, - &start, &end) < 0) { - start = consensus; - end = consensus+bodylen; - } - cdm_labels_prepend_sha3(&labels, LABEL_SHA3_DIGEST_AS_SIGNED, - (const uint8_t *)start, - end - start); - } - - char *body_compressed = NULL; - size_t size_compressed = 0; - if (tor_compress(&body_compressed, &size_compressed, - consensus, bodylen, COMPRESS_CONSENSUS_WITH) < 0) { - config_free_lines(labels); - return -1; - } - cdm_labels_prepend_sha3(&labels, LABEL_SHA3_DIGEST, - (const uint8_t *)body_compressed, size_compressed); - config_line_prepend(&labels, LABEL_COMPRESSION_TYPE, - compression_method_get_name(COMPRESS_CONSENSUS_WITH)); - config_line_prepend(&labels, LABEL_FLAVOR, flavname); - config_line_prepend(&labels, LABEL_VALID_AFTER, formatted_time); - config_line_prepend(&labels, LABEL_DOCTYPE, DOCTYPE_CONSENSUS); - - entry = consensus_cache_add(cdm_cache_get(), - labels, - (const uint8_t *)body_compressed, - size_compressed); - tor_free(body_compressed); - config_free_lines(labels); - } - - if (entry) { - consensus_cache_entry_mark_for_aggressive_release(entry); - consensus_cache_entry_decref(entry); - } - - cdm_cache_dirty = 1; - return entry ? 0 : -1; + return consensus_queue_compression_work(consensus, as_parsed); } /** @@ -543,6 +524,47 @@ sort_and_find_most_recent(smartlist_t *lst) } } +/** Return i such that compress_consensus_with[i] == method. Return + * -1 if no such i exists. */ +static int +consensus_compression_method_pos(compress_method_t method) +{ + unsigned i; + for (i = 0; i < n_consensus_compression_methods(); ++i) { + if (compress_consensus_with[i] == method) { + return i; + } + } + return -1; +} + +/** + * If we know a consensus with the flavor <b>flavor</b> compressed with + * <b>method</b>, set *<b>entry_out</b> to that value. Return values are as + * for consdiffmgr_find_diff_from(). + */ +consdiff_status_t +consdiffmgr_find_consensus(struct consensus_cache_entry_t **entry_out, + consensus_flavor_t flavor, + compress_method_t method) +{ + tor_assert((int)flavor < N_CONSENSUS_FLAVORS); + + int pos = consensus_compression_method_pos(method); + if (pos < 0) { + // We don't compress consensuses with this method. + return CONSDIFF_NOT_FOUND; + } + consensus_cache_entry_handle_t *handle = latest_consensus[flavor][pos]; + if (!handle) + return CONSDIFF_NOT_FOUND; + *entry_out = consensus_cache_entry_handle_get(handle); + if (entry_out) + return CONSDIFF_AVAILABLE; + else + return CONSDIFF_NOT_FOUND; +} + /** * Look up consensus_cache_entry_t for the consensus of type <b>flavor</b>, * from the source consensus with the specified digest (which must be SHA3). @@ -684,6 +706,42 @@ consdiffmgr_cleanup(void) smartlist_clear(diffs); } + // 3. Delete all consensuses except the most recent that are compressed with + // an un-preferred method. + for (int flav = 0; flav < N_CONSENSUS_FLAVORS; ++flav) { + const char *flavname = networkstatus_get_flavor_name(flav); + /* Determine the most recent consensus of this flavor */ + consensus_cache_find_all(consensuses, cdm_cache_get(), + LABEL_DOCTYPE, DOCTYPE_CONSENSUS); + consensus_cache_filter_list(consensuses, LABEL_FLAVOR, flavname); + consensus_cache_entry_t *most_recent = + sort_and_find_most_recent(consensuses); + if (most_recent == NULL) + continue; + const char *most_recent_sha3_uncompressed = + consensus_cache_entry_get_value(most_recent, + LABEL_SHA3_DIGEST_UNCOMPRESSED); + const char *retain_methodname = compression_method_get_name( + RETAIN_CONSENSUS_COMPRESSED_WITH_METHOD); + + if (BUG(most_recent_sha3_uncompressed == NULL)) + continue; + SMARTLIST_FOREACH_BEGIN(consensuses, consensus_cache_entry_t *, ent) { + const char *lv_sha3_uncompressed = + consensus_cache_entry_get_value(ent, LABEL_SHA3_DIGEST_UNCOMPRESSED); + if (BUG(! lv_sha3_uncompressed)) + continue; + if (!strcmp(lv_sha3_uncompressed, most_recent_sha3_uncompressed)) + continue; // This _is_ the most recent. + const char *lv_methodname = + consensus_cache_entry_get_value(ent, LABEL_COMPRESSION_TYPE); + if (! lv_methodname || strcmp(lv_methodname, retain_methodname)) { + consensus_cache_entry_mark_for_removal(ent); + ++n_to_delete; + } + } SMARTLIST_FOREACH_END(ent); + } + smartlist_free(objects); smartlist_free(consensuses); smartlist_free(diffs); @@ -787,10 +845,14 @@ consdiffmgr_rescan_flavor_(consensus_flavor_t flavor) // 1. find the most recent consensus, and the ones that we might want // to diff to it. + const char *methodname = compression_method_get_name( + RETAIN_CONSENSUS_COMPRESSED_WITH_METHOD); + matches = smartlist_new(); consensus_cache_find_all(matches, cdm_cache_get(), LABEL_FLAVOR, flavname); consensus_cache_filter_list(matches, LABEL_DOCTYPE, DOCTYPE_CONSENSUS); + consensus_cache_filter_list(matches, LABEL_COMPRESSION_TYPE, methodname); consensus_cache_entry_t *most_recent = sort_and_find_most_recent(matches); if (!most_recent) { log_info(LD_DIRSERV, "No 'most recent' %s consensus found; " @@ -835,6 +897,10 @@ consdiffmgr_rescan_flavor_(consensus_flavor_t flavor) if (strmap_get(have_diff_from, va) != NULL) continue; /* we already have this one. */ smartlist_add(compute_diffs_from, ent); + /* Since we are not going to serve this as the most recent consensus + * any more, we should stop keeping it mmap'd when it's not in use. + */ + consensus_cache_entry_mark_for_aggressive_release(ent); } SMARTLIST_FOREACH_END(ent); log_info(LD_DIRSERV, @@ -880,6 +946,48 @@ consdiffmgr_rescan_flavor_(consensus_flavor_t flavor) } /** + * Scan the cache for the latest consensuses and add their handles to + * latest_consensus + */ +static void +consdiffmgr_consensus_load(void) +{ + smartlist_t *matches = smartlist_new(); + for (int flav = 0; flav < N_CONSENSUS_FLAVORS; ++flav) { + const char *flavname = networkstatus_get_flavor_name(flav); + smartlist_clear(matches); + consensus_cache_find_all(matches, cdm_cache_get(), + LABEL_FLAVOR, flavname); + consensus_cache_filter_list(matches, LABEL_DOCTYPE, DOCTYPE_CONSENSUS); + consensus_cache_entry_t *most_recent = sort_and_find_most_recent(matches); + if (! most_recent) + continue; // no consensuses. + const char *most_recent_sha3 = + consensus_cache_entry_get_value(most_recent, + LABEL_SHA3_DIGEST_UNCOMPRESSED); + if (BUG(most_recent_sha3 == NULL)) + continue; // LCOV_EXCL_LINE + consensus_cache_filter_list(matches, LABEL_SHA3_DIGEST_UNCOMPRESSED, + most_recent_sha3); + + // Everything that remains matches the most recent consensus of this + // flavor. + SMARTLIST_FOREACH_BEGIN(matches, consensus_cache_entry_t *, ent) { + const char *lv_compression = + consensus_cache_entry_get_value(ent, LABEL_COMPRESSION_TYPE); + compress_method_t method = + compression_method_get_by_name(lv_compression); + int pos = consensus_compression_method_pos(method); + if (pos < 0) + continue; + consensus_cache_entry_handle_free(latest_consensus[flav][pos]); + latest_consensus[flav][pos] = consensus_cache_entry_handle_new(ent); + } SMARTLIST_FOREACH_END(ent); + } + smartlist_free(matches); +} + +/** * Scan the cache for diffs, and add them to the hashtable. */ static void @@ -936,6 +1044,7 @@ consdiffmgr_rescan(void) if (cdm_cache_loaded == 0) { consdiffmgr_diffs_load(); + consdiffmgr_consensus_load(); cdm_cache_loaded = 1; } @@ -1051,6 +1160,14 @@ consdiffmgr_free_all(void) next = HT_NEXT_RMV(cdm_diff_ht, &cdm_diff_ht, diff); cdm_diff_free(this); } + int i; + unsigned j; + for (i = 0; i < N_CONSENSUS_FLAVORS; ++i) { + for (j = 0; j < n_consensus_compression_methods(); ++j) { + consensus_cache_entry_handle_free(latest_consensus[i][j]); + } + } + memset(latest_consensus, 0, sizeof(latest_consensus)); consensus_cache_free(cons_diff_cache); cons_diff_cache = NULL; } @@ -1072,6 +1189,93 @@ typedef struct compressed_result_t { } compressed_result_t; /** + * Compress the bytestring <b>input</b> of length <b>len</b> using the + * <n>n_methods</b> compression methods listed in the array <b>methods</b>. + * + * For each successful compression, set the fields in the <b>results_out</b> + * array in the position corresponding to the compression method. Use + * <b>labels_in</b> as a basis for the labels of the result. + * + * Return 0 if all compression succeeded; -1 if any failed. + */ +static int +compress_multiple(compressed_result_t *results_out, int n_methods, + const compress_method_t *methods, + const uint8_t *input, size_t len, + const config_line_t *labels_in) +{ + int rv = 0; + int i; + for (i = 0; i < n_methods; ++i) { + compress_method_t method = methods[i]; + const char *methodname = compression_method_get_name(method); + char *result; + size_t sz; + if (0 == tor_compress(&result, &sz, (const char*)input, len, method)) { + results_out[i].body = (uint8_t*)result; + results_out[i].bodylen = sz; + results_out[i].labels = config_lines_dup(labels_in); + cdm_labels_prepend_sha3(&results_out[i].labels, LABEL_SHA3_DIGEST, + results_out[i].body, + results_out[i].bodylen); + config_line_prepend(&results_out[i].labels, + LABEL_COMPRESSION_TYPE, + methodname); + } else { + rv = -1; + } + } + return rv; +} + +/** + * Given an array of <b>n</b> compressed_result_t in <b>results</b>, + * as produced by compress_multiple, store them all into the + * consdiffmgr, and store handles to them in the <b>handles_out</b> + * array. + * + * Return CDM_DIFF_PRESENT if any was stored, and CDM_DIFF_ERROR if none + * was stored. + */ +static cdm_diff_status_t +store_multiple(consensus_cache_entry_handle_t **handles_out, + int n, + const compress_method_t *methods, + const compressed_result_t *results, + const char *description) +{ + cdm_diff_status_t status = CDM_DIFF_ERROR; + consdiffmgr_ensure_space_for_files(n); + + int i; + for (i = 0; i < n; ++i) { + compress_method_t method = methods[i]; + uint8_t *body_out = results[i].body; + size_t bodylen_out = results[i].bodylen; + config_line_t *labels = results[i].labels; + const char *methodname = compression_method_get_name(method); + if (body_out && bodylen_out && labels) { + /* Success! Store the results */ + log_info(LD_DIRSERV, "Adding %s, compressed with %s", + description, methodname); + + consensus_cache_entry_t *ent = + consensus_cache_add(cdm_cache_get(), + labels, + body_out, + bodylen_out); + if (BUG(ent == NULL)) + continue; + + status = CDM_DIFF_PRESENT; + handles_out[i] = consensus_cache_entry_handle_new(ent); + consensus_cache_entry_decref(ent); + } + } + return status; +} + +/** * An object passed to a worker thread that will try to produce a consensus * diff. */ @@ -1144,6 +1348,12 @@ consensus_diff_worker_threadfn(void *state_, void *work_) const char *lv_to_valid_after = consensus_cache_entry_get_value(job->diff_to, LABEL_VALID_AFTER); + const char *lv_to_fresh_until = + consensus_cache_entry_get_value(job->diff_to, LABEL_FRESH_UNTIL); + const char *lv_to_valid_until = + consensus_cache_entry_get_value(job->diff_to, LABEL_VALID_UNTIL); + const char *lv_to_signatories = + consensus_cache_entry_get_value(job->diff_to, LABEL_SIGNATORIES); const char *lv_from_valid_after = consensus_cache_entry_get_value(job->diff_from, LABEL_VALID_AFTER); const char *lv_from_digest = @@ -1213,6 +1423,12 @@ consensus_diff_worker_threadfn(void *state_, void *work_) job->out[0].bodylen = difflen; config_line_t *common_labels = NULL; + if (lv_to_valid_until) + config_line_prepend(&common_labels, LABEL_VALID_UNTIL, lv_to_valid_until); + if (lv_to_fresh_until) + config_line_prepend(&common_labels, LABEL_FRESH_UNTIL, lv_to_fresh_until); + if (lv_to_signatories) + config_line_prepend(&common_labels, LABEL_SIGNATORIES, lv_to_signatories); cdm_labels_prepend_sha3(&common_labels, LABEL_SHA3_DIGEST_UNCOMPRESSED, job->out[0].body, @@ -1235,24 +1451,10 @@ consensus_diff_worker_threadfn(void *state_, void *work_) job->out[0].body, job->out[0].bodylen); - unsigned u; - for (u = 1; u < n_diff_compression_methods(); ++u) { - compress_method_t method = compress_diffs_with[u]; - const char *methodname = compression_method_get_name(method); - char *result; - size_t sz; - if (0 == tor_compress(&result, &sz, consensus_diff, difflen, method)) { - job->out[u].body = (uint8_t*)result; - job->out[u].bodylen = sz; - job->out[u].labels = config_lines_dup(common_labels); - cdm_labels_prepend_sha3(&job->out[u].labels, LABEL_SHA3_DIGEST, - job->out[u].body, - job->out[u].bodylen); - config_line_prepend(&job->out[u].labels, - LABEL_COMPRESSION_TYPE, - methodname); - } - } + compress_multiple(job->out+1, + n_diff_compression_methods()-1, + compress_diffs_with+1, + (const uint8_t*)consensus_diff, difflen, common_labels); config_free_lines(common_labels); return WQ_RPL_REPLY; @@ -1318,36 +1520,20 @@ consensus_diff_worker_replyfn(void *work_) cache = 0; } - int status = CDM_DIFF_ERROR; consensus_cache_entry_handle_t *handles[ARRAY_LENGTH(compress_diffs_with)]; memset(handles, 0, sizeof(handles)); - consdiffmgr_ensure_space_for_files(n_diff_compression_methods()); - - unsigned u; - for (u = 0; u < n_diff_compression_methods(); ++u) { - compress_method_t method = compress_diffs_with[u]; - uint8_t *body_out = job->out[u].body; - size_t bodylen_out = job->out[u].bodylen; - config_line_t *labels = job->out[u].labels; - const char *methodname = compression_method_get_name(method); - if (body_out && bodylen_out && labels) { - /* Success! Store the results */ - log_info(LD_DIRSERV, "Adding consensus diff from %s to %s, " - "compressed with %s", - lv_from_digest, lv_to_digest, methodname); + char description[128]; + tor_snprintf(description, sizeof(description), + "consensus diff from %s to %s", + lv_from_digest, lv_to_digest); - consensus_cache_entry_t *ent = - consensus_cache_add(cdm_cache_get(), - labels, - body_out, - bodylen_out); + int status = store_multiple(handles, + n_diff_compression_methods(), + compress_diffs_with, + job->out, + description); - status = CDM_DIFF_PRESENT; - handles[u] = consensus_cache_entry_handle_new(ent); - consensus_cache_entry_decref(ent); - } - } if (status != CDM_DIFF_PRESENT) { /* Failure! Nothing to do but complain */ log_warn(LD_DIRSERV, @@ -1357,6 +1543,7 @@ consensus_diff_worker_replyfn(void *work_) status = CDM_DIFF_ERROR; } + unsigned u; for (u = 0; u < ARRAY_LENGTH(handles); ++u) { compress_method_t method = compress_diffs_with[u]; if (cache) { @@ -1408,3 +1595,229 @@ consensus_diff_queue_diff_work(consensus_cache_entry_t *diff_from, return -1; } +/** + * Holds requests and replies for consensus_compress_workers. + */ +typedef struct consensus_compress_worker_job_t { + char *consensus; + size_t consensus_len; + consensus_flavor_t flavor; + config_line_t *labels_in; + compressed_result_t out[ARRAY_LENGTH(compress_consensus_with)]; +} consensus_compress_worker_job_t; + +/** + * Free all resources held in <b>job</b> + */ +static void +consensus_compress_worker_job_free(consensus_compress_worker_job_t *job) +{ + if (!job) + return; + tor_free(job->consensus); + config_free_lines(job->labels_in); + unsigned u; + for (u = 0; u < n_consensus_compression_methods(); ++u) { + config_free_lines(job->out[u].labels); + tor_free(job->out[u].body); + } + tor_free(job); +} +/** + * Worker function. This function runs inside a worker thread and receives + * a consensus_compress_worker_job_t as its input. + */ +static workqueue_reply_t +consensus_compress_worker_threadfn(void *state_, void *work_) +{ + (void)state_; + consensus_compress_worker_job_t *job = work_; + consensus_flavor_t flavor = job->flavor; + const char *consensus = job->consensus; + size_t bodylen = job->consensus_len; + + config_line_t *labels = config_lines_dup(job->labels_in); + const char *flavname = networkstatus_get_flavor_name(flavor); + + cdm_labels_prepend_sha3(&labels, LABEL_SHA3_DIGEST_UNCOMPRESSED, + (const uint8_t *)consensus, bodylen); + { + const char *start, *end; + if (router_get_networkstatus_v3_signed_boundaries(consensus, + &start, &end) < 0) { + start = consensus; + end = consensus+bodylen; + } + cdm_labels_prepend_sha3(&labels, LABEL_SHA3_DIGEST_AS_SIGNED, + (const uint8_t *)start, + end - start); + } + config_line_prepend(&labels, LABEL_FLAVOR, flavname); + config_line_prepend(&labels, LABEL_DOCTYPE, DOCTYPE_CONSENSUS); + + compress_multiple(job->out, + n_consensus_compression_methods(), + compress_consensus_with, + (const uint8_t*)consensus, bodylen, labels); + config_free_lines(labels); + return WQ_RPL_REPLY; +} + +/** + * Worker function: This function runs in the main thread, and receives + * a consensus_diff_compress_job_t that the worker thread has already + * processed. + */ +static void +consensus_compress_worker_replyfn(void *work_) +{ + consensus_compress_worker_job_t *job = work_; + + consensus_cache_entry_handle_t *handles[ + ARRAY_LENGTH(compress_consensus_with)]; + memset(handles, 0, sizeof(handles)); + + store_multiple(handles, + n_consensus_compression_methods(), + compress_consensus_with, + job->out, + "consensus"); + cdm_cache_dirty = 1; + + unsigned u; + consensus_flavor_t f = job->flavor; + tor_assert((int)f < N_CONSENSUS_FLAVORS); + for (u = 0; u < ARRAY_LENGTH(handles); ++u) { + if (handles[u] == NULL) + continue; + consensus_cache_entry_handle_free(latest_consensus[f][u]); + latest_consensus[f][u] = handles[u]; + } + + consensus_compress_worker_job_free(job); +} + +/** + * If true, we compress in worker threads. + */ +static int background_compression = 0; + +/** + * Queue a job to compress <b>consensus</b> and store its compressed + * text in the cache. + */ +static int +consensus_queue_compression_work(const char *consensus, + const networkstatus_t *as_parsed) +{ + tor_assert(consensus); + tor_assert(as_parsed); + + consensus_compress_worker_job_t *job = tor_malloc_zero(sizeof(*job)); + job->consensus = tor_strdup(consensus); + job->consensus_len = strlen(consensus); + job->flavor = as_parsed->flavor; + + char va_str[ISO_TIME_LEN+1]; + char vu_str[ISO_TIME_LEN+1]; + char fu_str[ISO_TIME_LEN+1]; + format_iso_time_nospace(va_str, as_parsed->valid_after); + format_iso_time_nospace(fu_str, as_parsed->fresh_until); + format_iso_time_nospace(vu_str, as_parsed->valid_until); + config_line_append(&job->labels_in, LABEL_VALID_AFTER, va_str); + config_line_append(&job->labels_in, LABEL_FRESH_UNTIL, fu_str); + config_line_append(&job->labels_in, LABEL_VALID_UNTIL, vu_str); + if (as_parsed->voters) { + smartlist_t *hexvoters = smartlist_new(); + SMARTLIST_FOREACH_BEGIN(as_parsed->voters, + networkstatus_voter_info_t *, vi) { + if (smartlist_len(vi->sigs) == 0) + continue; // didn't sign. + char d[HEX_DIGEST_LEN+1]; + base16_encode(d, sizeof(d), vi->identity_digest, DIGEST_LEN); + smartlist_add_strdup(hexvoters, d); + } SMARTLIST_FOREACH_END(vi); + char *signers = smartlist_join_strings(hexvoters, ",", 0, NULL); + config_line_prepend(&job->labels_in, LABEL_SIGNATORIES, signers); + tor_free(signers); + SMARTLIST_FOREACH(hexvoters, char *, cp, tor_free(cp)); + } + + if (background_compression) { + workqueue_entry_t *work; + work = cpuworker_queue_work(consensus_compress_worker_threadfn, + consensus_compress_worker_replyfn, + job); + if (!work) { + consensus_compress_worker_job_free(job); + return -1; + } + + return 0; + } else { + consensus_compress_worker_threadfn(NULL, job); + consensus_compress_worker_replyfn(job); + return 0; + } +} + +/** + * Tell the consdiffmgr backend to compress consensuses in worker threads. + */ +void +consdiffmgr_enable_background_compression(void) +{ + // This isn't the default behavior because it would break unit tests. + background_compression = 1; +} + +/** Read the set of voters from the cached object <b>ent</b> into + * <b>out</b>, as a list of hex-encoded digests. Return 0 on success, + * -1 if no signatories were recorded. */ +int +consensus_cache_entry_get_voter_id_digests(const consensus_cache_entry_t *ent, + smartlist_t *out) +{ + tor_assert(ent); + tor_assert(out); + const char *s; + s = consensus_cache_entry_get_value(ent, LABEL_SIGNATORIES); + if (s == NULL) + return -1; + smartlist_split_string(out, s, ",", SPLIT_SKIP_SPACE|SPLIT_STRIP_SPACE, 0); + return 0; +} + +/** Read the fresh-until time of cached object <b>ent</b> into *<b>out</b> + * and return 0, or return -1 if no such time was recorded. */ +int +consensus_cache_entry_get_fresh_until(const consensus_cache_entry_t *ent, + time_t *out) +{ + tor_assert(ent); + tor_assert(out); + const char *s; + s = consensus_cache_entry_get_value(ent, LABEL_FRESH_UNTIL); + if (s == NULL || parse_iso_time_nospace(s, out) < 0) + return -1; + else + return 0; +} + +/** Read the valid until timestamp from the cached object <b>ent</b> into + * *<b>out</b> and return 0, or return -1 if no such time was recorded. */ +int +consensus_cache_entry_get_valid_until(const consensus_cache_entry_t *ent, + time_t *out) +{ + tor_assert(ent); + tor_assert(out); + + const char *s; + s = consensus_cache_entry_get_value(ent, LABEL_VALID_UNTIL); + if (s == NULL || parse_iso_time_nospace(s, out) < 0) + return -1; + else + return 0; +} + |