summaryrefslogtreecommitdiff
path: root/src/or/consdiffmgr.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/or/consdiffmgr.c')
-rw-r--r--src/or/consdiffmgr.c621
1 files changed, 517 insertions, 104 deletions
diff --git a/src/or/consdiffmgr.c b/src/or/consdiffmgr.c
index d478101c6b..910f842070 100644
--- a/src/or/consdiffmgr.c
+++ b/src/or/consdiffmgr.c
@@ -32,6 +32,15 @@
/* The valid-after time for a consensus (or for the target consensus of a
* diff), encoded as ISO UTC. */
#define LABEL_VALID_AFTER "consensus-valid-after"
+/* The fresh-until time for a consensus (or for the target consensus of a
+ * diff), encoded as ISO UTC. */
+#define LABEL_FRESH_UNTIL "consensus-fresh-until"
+/* The valid-until time for a consensus (or for the target consensus of a
+ * diff), encoded as ISO UTC. */
+#define LABEL_VALID_UNTIL "consensus-valid-until"
+/* Comma-separated list of hex-encoded identity digests for the voting
+ * authorities. */
+#define LABEL_SIGNATORIES "consensus-signatories"
/* A hex encoded SHA3 digest of the object, as compressed (if any) */
#define LABEL_SHA3_DIGEST "sha3-digest"
/* A hex encoded SHA3 digest of the object before compression. */
@@ -96,6 +105,36 @@ n_diff_compression_methods(void)
return ARRAY_LENGTH(compress_diffs_with);
}
+/** Which methods do we use for precompressing consensuses? */
+static const compress_method_t compress_consensus_with[] = {
+ ZLIB_METHOD,
+#ifdef HAVE_LZMA
+ LZMA_METHOD,
+#endif
+#ifdef HAVE_ZSTD
+ ZSTD_METHOD,
+#endif
+};
+
+/** How many different methods will we try to use for diff compression? */
+STATIC unsigned
+n_consensus_compression_methods(void)
+{
+ return ARRAY_LENGTH(compress_consensus_with);
+}
+
+/** For which compression method do we retain old consensuses? There's no
+ * need to keep all of them, since we won't be serving them. We'll
+ * go with ZLIB_METHOD because it's pretty fast and everyone has it.
+ */
+#define RETAIN_CONSENSUS_COMPRESSED_WITH_METHOD ZLIB_METHOD
+
+/** Handles pointing to the latest consensus entries as compressed and
+ * stored. */
+static consensus_cache_entry_handle_t *
+ latest_consensus[N_CONSENSUS_FLAVORS]
+ [ARRAY_LENGTH(compress_consensus_with)];
+
/** Hashtable node used to remember the current status of the diff
* from a given sha3 digest to the current consensus. */
typedef struct cdm_diff_t {
@@ -135,13 +174,12 @@ static consdiff_cfg_t consdiff_cfg = {
};
static int consdiffmgr_ensure_space_for_files(int n);
+static int consensus_queue_compression_work(const char *consensus,
+ const networkstatus_t *as_parsed);
static int consensus_diff_queue_diff_work(consensus_cache_entry_t *diff_from,
consensus_cache_entry_t *diff_to);
static void consdiffmgr_set_cache_flags(void);
-/* Just gzip consensuses for now. */
-#define COMPRESS_CONSENSUS_WITH GZIP_METHOD
-
/* =====
* Hashtable setup
* ===== */
@@ -410,11 +448,6 @@ cdm_cache_lookup_consensus(consensus_flavor_t flavor, time_t valid_after)
consensus_cache_filter_list(matches, LABEL_DOCTYPE, DOCTYPE_CONSENSUS);
consensus_cache_entry_t *result = NULL;
- if (smartlist_len(matches) > 1) {
- log_warn(LD_BUG, "How odd; there appear to be two matching consensuses "
- "with flavor %s published at %s.",
- flavname, formatted_time);
- }
if (smartlist_len(matches)) {
result = smartlist_get(matches, 0);
}
@@ -458,59 +491,7 @@ consdiffmgr_add_consensus(const char *consensus,
}
/* We don't have it. Add it to the cache. */
- consdiffmgr_ensure_space_for_files(1);
-
- {
- size_t bodylen = strlen(consensus);
- config_line_t *labels = NULL;
- char formatted_time[ISO_TIME_LEN+1];
- format_iso_time_nospace(formatted_time, valid_after);
- const char *flavname = networkstatus_get_flavor_name(flavor);
-
- cdm_labels_prepend_sha3(&labels, LABEL_SHA3_DIGEST_UNCOMPRESSED,
- (const uint8_t *)consensus, bodylen);
- {
- const char *start, *end;
- if (router_get_networkstatus_v3_signed_boundaries(consensus,
- &start, &end) < 0) {
- start = consensus;
- end = consensus+bodylen;
- }
- cdm_labels_prepend_sha3(&labels, LABEL_SHA3_DIGEST_AS_SIGNED,
- (const uint8_t *)start,
- end - start);
- }
-
- char *body_compressed = NULL;
- size_t size_compressed = 0;
- if (tor_compress(&body_compressed, &size_compressed,
- consensus, bodylen, COMPRESS_CONSENSUS_WITH) < 0) {
- config_free_lines(labels);
- return -1;
- }
- cdm_labels_prepend_sha3(&labels, LABEL_SHA3_DIGEST,
- (const uint8_t *)body_compressed, size_compressed);
- config_line_prepend(&labels, LABEL_COMPRESSION_TYPE,
- compression_method_get_name(COMPRESS_CONSENSUS_WITH));
- config_line_prepend(&labels, LABEL_FLAVOR, flavname);
- config_line_prepend(&labels, LABEL_VALID_AFTER, formatted_time);
- config_line_prepend(&labels, LABEL_DOCTYPE, DOCTYPE_CONSENSUS);
-
- entry = consensus_cache_add(cdm_cache_get(),
- labels,
- (const uint8_t *)body_compressed,
- size_compressed);
- tor_free(body_compressed);
- config_free_lines(labels);
- }
-
- if (entry) {
- consensus_cache_entry_mark_for_aggressive_release(entry);
- consensus_cache_entry_decref(entry);
- }
-
- cdm_cache_dirty = 1;
- return entry ? 0 : -1;
+ return consensus_queue_compression_work(consensus, as_parsed);
}
/**
@@ -543,6 +524,47 @@ sort_and_find_most_recent(smartlist_t *lst)
}
}
+/** Return i such that compress_consensus_with[i] == method. Return
+ * -1 if no such i exists. */
+static int
+consensus_compression_method_pos(compress_method_t method)
+{
+ unsigned i;
+ for (i = 0; i < n_consensus_compression_methods(); ++i) {
+ if (compress_consensus_with[i] == method) {
+ return i;
+ }
+ }
+ return -1;
+}
+
+/**
+ * If we know a consensus with the flavor <b>flavor</b> compressed with
+ * <b>method</b>, set *<b>entry_out</b> to that value. Return values are as
+ * for consdiffmgr_find_diff_from().
+ */
+consdiff_status_t
+consdiffmgr_find_consensus(struct consensus_cache_entry_t **entry_out,
+ consensus_flavor_t flavor,
+ compress_method_t method)
+{
+ tor_assert((int)flavor < N_CONSENSUS_FLAVORS);
+
+ int pos = consensus_compression_method_pos(method);
+ if (pos < 0) {
+ // We don't compress consensuses with this method.
+ return CONSDIFF_NOT_FOUND;
+ }
+ consensus_cache_entry_handle_t *handle = latest_consensus[flavor][pos];
+ if (!handle)
+ return CONSDIFF_NOT_FOUND;
+ *entry_out = consensus_cache_entry_handle_get(handle);
+ if (entry_out)
+ return CONSDIFF_AVAILABLE;
+ else
+ return CONSDIFF_NOT_FOUND;
+}
+
/**
* Look up consensus_cache_entry_t for the consensus of type <b>flavor</b>,
* from the source consensus with the specified digest (which must be SHA3).
@@ -684,6 +706,42 @@ consdiffmgr_cleanup(void)
smartlist_clear(diffs);
}
+ // 3. Delete all consensuses except the most recent that are compressed with
+ // an un-preferred method.
+ for (int flav = 0; flav < N_CONSENSUS_FLAVORS; ++flav) {
+ const char *flavname = networkstatus_get_flavor_name(flav);
+ /* Determine the most recent consensus of this flavor */
+ consensus_cache_find_all(consensuses, cdm_cache_get(),
+ LABEL_DOCTYPE, DOCTYPE_CONSENSUS);
+ consensus_cache_filter_list(consensuses, LABEL_FLAVOR, flavname);
+ consensus_cache_entry_t *most_recent =
+ sort_and_find_most_recent(consensuses);
+ if (most_recent == NULL)
+ continue;
+ const char *most_recent_sha3_uncompressed =
+ consensus_cache_entry_get_value(most_recent,
+ LABEL_SHA3_DIGEST_UNCOMPRESSED);
+ const char *retain_methodname = compression_method_get_name(
+ RETAIN_CONSENSUS_COMPRESSED_WITH_METHOD);
+
+ if (BUG(most_recent_sha3_uncompressed == NULL))
+ continue;
+ SMARTLIST_FOREACH_BEGIN(consensuses, consensus_cache_entry_t *, ent) {
+ const char *lv_sha3_uncompressed =
+ consensus_cache_entry_get_value(ent, LABEL_SHA3_DIGEST_UNCOMPRESSED);
+ if (BUG(! lv_sha3_uncompressed))
+ continue;
+ if (!strcmp(lv_sha3_uncompressed, most_recent_sha3_uncompressed))
+ continue; // This _is_ the most recent.
+ const char *lv_methodname =
+ consensus_cache_entry_get_value(ent, LABEL_COMPRESSION_TYPE);
+ if (! lv_methodname || strcmp(lv_methodname, retain_methodname)) {
+ consensus_cache_entry_mark_for_removal(ent);
+ ++n_to_delete;
+ }
+ } SMARTLIST_FOREACH_END(ent);
+ }
+
smartlist_free(objects);
smartlist_free(consensuses);
smartlist_free(diffs);
@@ -787,10 +845,14 @@ consdiffmgr_rescan_flavor_(consensus_flavor_t flavor)
// 1. find the most recent consensus, and the ones that we might want
// to diff to it.
+ const char *methodname = compression_method_get_name(
+ RETAIN_CONSENSUS_COMPRESSED_WITH_METHOD);
+
matches = smartlist_new();
consensus_cache_find_all(matches, cdm_cache_get(),
LABEL_FLAVOR, flavname);
consensus_cache_filter_list(matches, LABEL_DOCTYPE, DOCTYPE_CONSENSUS);
+ consensus_cache_filter_list(matches, LABEL_COMPRESSION_TYPE, methodname);
consensus_cache_entry_t *most_recent = sort_and_find_most_recent(matches);
if (!most_recent) {
log_info(LD_DIRSERV, "No 'most recent' %s consensus found; "
@@ -835,6 +897,10 @@ consdiffmgr_rescan_flavor_(consensus_flavor_t flavor)
if (strmap_get(have_diff_from, va) != NULL)
continue; /* we already have this one. */
smartlist_add(compute_diffs_from, ent);
+ /* Since we are not going to serve this as the most recent consensus
+ * any more, we should stop keeping it mmap'd when it's not in use.
+ */
+ consensus_cache_entry_mark_for_aggressive_release(ent);
} SMARTLIST_FOREACH_END(ent);
log_info(LD_DIRSERV,
@@ -880,6 +946,48 @@ consdiffmgr_rescan_flavor_(consensus_flavor_t flavor)
}
/**
+ * Scan the cache for the latest consensuses and add their handles to
+ * latest_consensus
+ */
+static void
+consdiffmgr_consensus_load(void)
+{
+ smartlist_t *matches = smartlist_new();
+ for (int flav = 0; flav < N_CONSENSUS_FLAVORS; ++flav) {
+ const char *flavname = networkstatus_get_flavor_name(flav);
+ smartlist_clear(matches);
+ consensus_cache_find_all(matches, cdm_cache_get(),
+ LABEL_FLAVOR, flavname);
+ consensus_cache_filter_list(matches, LABEL_DOCTYPE, DOCTYPE_CONSENSUS);
+ consensus_cache_entry_t *most_recent = sort_and_find_most_recent(matches);
+ if (! most_recent)
+ continue; // no consensuses.
+ const char *most_recent_sha3 =
+ consensus_cache_entry_get_value(most_recent,
+ LABEL_SHA3_DIGEST_UNCOMPRESSED);
+ if (BUG(most_recent_sha3 == NULL))
+ continue; // LCOV_EXCL_LINE
+ consensus_cache_filter_list(matches, LABEL_SHA3_DIGEST_UNCOMPRESSED,
+ most_recent_sha3);
+
+ // Everything that remains matches the most recent consensus of this
+ // flavor.
+ SMARTLIST_FOREACH_BEGIN(matches, consensus_cache_entry_t *, ent) {
+ const char *lv_compression =
+ consensus_cache_entry_get_value(ent, LABEL_COMPRESSION_TYPE);
+ compress_method_t method =
+ compression_method_get_by_name(lv_compression);
+ int pos = consensus_compression_method_pos(method);
+ if (pos < 0)
+ continue;
+ consensus_cache_entry_handle_free(latest_consensus[flav][pos]);
+ latest_consensus[flav][pos] = consensus_cache_entry_handle_new(ent);
+ } SMARTLIST_FOREACH_END(ent);
+ }
+ smartlist_free(matches);
+}
+
+/**
* Scan the cache for diffs, and add them to the hashtable.
*/
static void
@@ -936,6 +1044,7 @@ consdiffmgr_rescan(void)
if (cdm_cache_loaded == 0) {
consdiffmgr_diffs_load();
+ consdiffmgr_consensus_load();
cdm_cache_loaded = 1;
}
@@ -1051,6 +1160,14 @@ consdiffmgr_free_all(void)
next = HT_NEXT_RMV(cdm_diff_ht, &cdm_diff_ht, diff);
cdm_diff_free(this);
}
+ int i;
+ unsigned j;
+ for (i = 0; i < N_CONSENSUS_FLAVORS; ++i) {
+ for (j = 0; j < n_consensus_compression_methods(); ++j) {
+ consensus_cache_entry_handle_free(latest_consensus[i][j]);
+ }
+ }
+ memset(latest_consensus, 0, sizeof(latest_consensus));
consensus_cache_free(cons_diff_cache);
cons_diff_cache = NULL;
}
@@ -1072,6 +1189,93 @@ typedef struct compressed_result_t {
} compressed_result_t;
/**
+ * Compress the bytestring <b>input</b> of length <b>len</b> using the
+ * <n>n_methods</b> compression methods listed in the array <b>methods</b>.
+ *
+ * For each successful compression, set the fields in the <b>results_out</b>
+ * array in the position corresponding to the compression method. Use
+ * <b>labels_in</b> as a basis for the labels of the result.
+ *
+ * Return 0 if all compression succeeded; -1 if any failed.
+ */
+static int
+compress_multiple(compressed_result_t *results_out, int n_methods,
+ const compress_method_t *methods,
+ const uint8_t *input, size_t len,
+ const config_line_t *labels_in)
+{
+ int rv = 0;
+ int i;
+ for (i = 0; i < n_methods; ++i) {
+ compress_method_t method = methods[i];
+ const char *methodname = compression_method_get_name(method);
+ char *result;
+ size_t sz;
+ if (0 == tor_compress(&result, &sz, (const char*)input, len, method)) {
+ results_out[i].body = (uint8_t*)result;
+ results_out[i].bodylen = sz;
+ results_out[i].labels = config_lines_dup(labels_in);
+ cdm_labels_prepend_sha3(&results_out[i].labels, LABEL_SHA3_DIGEST,
+ results_out[i].body,
+ results_out[i].bodylen);
+ config_line_prepend(&results_out[i].labels,
+ LABEL_COMPRESSION_TYPE,
+ methodname);
+ } else {
+ rv = -1;
+ }
+ }
+ return rv;
+}
+
+/**
+ * Given an array of <b>n</b> compressed_result_t in <b>results</b>,
+ * as produced by compress_multiple, store them all into the
+ * consdiffmgr, and store handles to them in the <b>handles_out</b>
+ * array.
+ *
+ * Return CDM_DIFF_PRESENT if any was stored, and CDM_DIFF_ERROR if none
+ * was stored.
+ */
+static cdm_diff_status_t
+store_multiple(consensus_cache_entry_handle_t **handles_out,
+ int n,
+ const compress_method_t *methods,
+ const compressed_result_t *results,
+ const char *description)
+{
+ cdm_diff_status_t status = CDM_DIFF_ERROR;
+ consdiffmgr_ensure_space_for_files(n);
+
+ int i;
+ for (i = 0; i < n; ++i) {
+ compress_method_t method = methods[i];
+ uint8_t *body_out = results[i].body;
+ size_t bodylen_out = results[i].bodylen;
+ config_line_t *labels = results[i].labels;
+ const char *methodname = compression_method_get_name(method);
+ if (body_out && bodylen_out && labels) {
+ /* Success! Store the results */
+ log_info(LD_DIRSERV, "Adding %s, compressed with %s",
+ description, methodname);
+
+ consensus_cache_entry_t *ent =
+ consensus_cache_add(cdm_cache_get(),
+ labels,
+ body_out,
+ bodylen_out);
+ if (BUG(ent == NULL))
+ continue;
+
+ status = CDM_DIFF_PRESENT;
+ handles_out[i] = consensus_cache_entry_handle_new(ent);
+ consensus_cache_entry_decref(ent);
+ }
+ }
+ return status;
+}
+
+/**
* An object passed to a worker thread that will try to produce a consensus
* diff.
*/
@@ -1144,6 +1348,12 @@ consensus_diff_worker_threadfn(void *state_, void *work_)
const char *lv_to_valid_after =
consensus_cache_entry_get_value(job->diff_to, LABEL_VALID_AFTER);
+ const char *lv_to_fresh_until =
+ consensus_cache_entry_get_value(job->diff_to, LABEL_FRESH_UNTIL);
+ const char *lv_to_valid_until =
+ consensus_cache_entry_get_value(job->diff_to, LABEL_VALID_UNTIL);
+ const char *lv_to_signatories =
+ consensus_cache_entry_get_value(job->diff_to, LABEL_SIGNATORIES);
const char *lv_from_valid_after =
consensus_cache_entry_get_value(job->diff_from, LABEL_VALID_AFTER);
const char *lv_from_digest =
@@ -1213,6 +1423,12 @@ consensus_diff_worker_threadfn(void *state_, void *work_)
job->out[0].bodylen = difflen;
config_line_t *common_labels = NULL;
+ if (lv_to_valid_until)
+ config_line_prepend(&common_labels, LABEL_VALID_UNTIL, lv_to_valid_until);
+ if (lv_to_fresh_until)
+ config_line_prepend(&common_labels, LABEL_FRESH_UNTIL, lv_to_fresh_until);
+ if (lv_to_signatories)
+ config_line_prepend(&common_labels, LABEL_SIGNATORIES, lv_to_signatories);
cdm_labels_prepend_sha3(&common_labels,
LABEL_SHA3_DIGEST_UNCOMPRESSED,
job->out[0].body,
@@ -1235,24 +1451,10 @@ consensus_diff_worker_threadfn(void *state_, void *work_)
job->out[0].body,
job->out[0].bodylen);
- unsigned u;
- for (u = 1; u < n_diff_compression_methods(); ++u) {
- compress_method_t method = compress_diffs_with[u];
- const char *methodname = compression_method_get_name(method);
- char *result;
- size_t sz;
- if (0 == tor_compress(&result, &sz, consensus_diff, difflen, method)) {
- job->out[u].body = (uint8_t*)result;
- job->out[u].bodylen = sz;
- job->out[u].labels = config_lines_dup(common_labels);
- cdm_labels_prepend_sha3(&job->out[u].labels, LABEL_SHA3_DIGEST,
- job->out[u].body,
- job->out[u].bodylen);
- config_line_prepend(&job->out[u].labels,
- LABEL_COMPRESSION_TYPE,
- methodname);
- }
- }
+ compress_multiple(job->out+1,
+ n_diff_compression_methods()-1,
+ compress_diffs_with+1,
+ (const uint8_t*)consensus_diff, difflen, common_labels);
config_free_lines(common_labels);
return WQ_RPL_REPLY;
@@ -1318,36 +1520,20 @@ consensus_diff_worker_replyfn(void *work_)
cache = 0;
}
- int status = CDM_DIFF_ERROR;
consensus_cache_entry_handle_t *handles[ARRAY_LENGTH(compress_diffs_with)];
memset(handles, 0, sizeof(handles));
- consdiffmgr_ensure_space_for_files(n_diff_compression_methods());
-
- unsigned u;
- for (u = 0; u < n_diff_compression_methods(); ++u) {
- compress_method_t method = compress_diffs_with[u];
- uint8_t *body_out = job->out[u].body;
- size_t bodylen_out = job->out[u].bodylen;
- config_line_t *labels = job->out[u].labels;
- const char *methodname = compression_method_get_name(method);
- if (body_out && bodylen_out && labels) {
- /* Success! Store the results */
- log_info(LD_DIRSERV, "Adding consensus diff from %s to %s, "
- "compressed with %s",
- lv_from_digest, lv_to_digest, methodname);
+ char description[128];
+ tor_snprintf(description, sizeof(description),
+ "consensus diff from %s to %s",
+ lv_from_digest, lv_to_digest);
- consensus_cache_entry_t *ent =
- consensus_cache_add(cdm_cache_get(),
- labels,
- body_out,
- bodylen_out);
+ int status = store_multiple(handles,
+ n_diff_compression_methods(),
+ compress_diffs_with,
+ job->out,
+ description);
- status = CDM_DIFF_PRESENT;
- handles[u] = consensus_cache_entry_handle_new(ent);
- consensus_cache_entry_decref(ent);
- }
- }
if (status != CDM_DIFF_PRESENT) {
/* Failure! Nothing to do but complain */
log_warn(LD_DIRSERV,
@@ -1357,6 +1543,7 @@ consensus_diff_worker_replyfn(void *work_)
status = CDM_DIFF_ERROR;
}
+ unsigned u;
for (u = 0; u < ARRAY_LENGTH(handles); ++u) {
compress_method_t method = compress_diffs_with[u];
if (cache) {
@@ -1408,3 +1595,229 @@ consensus_diff_queue_diff_work(consensus_cache_entry_t *diff_from,
return -1;
}
+/**
+ * Holds requests and replies for consensus_compress_workers.
+ */
+typedef struct consensus_compress_worker_job_t {
+ char *consensus;
+ size_t consensus_len;
+ consensus_flavor_t flavor;
+ config_line_t *labels_in;
+ compressed_result_t out[ARRAY_LENGTH(compress_consensus_with)];
+} consensus_compress_worker_job_t;
+
+/**
+ * Free all resources held in <b>job</b>
+ */
+static void
+consensus_compress_worker_job_free(consensus_compress_worker_job_t *job)
+{
+ if (!job)
+ return;
+ tor_free(job->consensus);
+ config_free_lines(job->labels_in);
+ unsigned u;
+ for (u = 0; u < n_consensus_compression_methods(); ++u) {
+ config_free_lines(job->out[u].labels);
+ tor_free(job->out[u].body);
+ }
+ tor_free(job);
+}
+/**
+ * Worker function. This function runs inside a worker thread and receives
+ * a consensus_compress_worker_job_t as its input.
+ */
+static workqueue_reply_t
+consensus_compress_worker_threadfn(void *state_, void *work_)
+{
+ (void)state_;
+ consensus_compress_worker_job_t *job = work_;
+ consensus_flavor_t flavor = job->flavor;
+ const char *consensus = job->consensus;
+ size_t bodylen = job->consensus_len;
+
+ config_line_t *labels = config_lines_dup(job->labels_in);
+ const char *flavname = networkstatus_get_flavor_name(flavor);
+
+ cdm_labels_prepend_sha3(&labels, LABEL_SHA3_DIGEST_UNCOMPRESSED,
+ (const uint8_t *)consensus, bodylen);
+ {
+ const char *start, *end;
+ if (router_get_networkstatus_v3_signed_boundaries(consensus,
+ &start, &end) < 0) {
+ start = consensus;
+ end = consensus+bodylen;
+ }
+ cdm_labels_prepend_sha3(&labels, LABEL_SHA3_DIGEST_AS_SIGNED,
+ (const uint8_t *)start,
+ end - start);
+ }
+ config_line_prepend(&labels, LABEL_FLAVOR, flavname);
+ config_line_prepend(&labels, LABEL_DOCTYPE, DOCTYPE_CONSENSUS);
+
+ compress_multiple(job->out,
+ n_consensus_compression_methods(),
+ compress_consensus_with,
+ (const uint8_t*)consensus, bodylen, labels);
+ config_free_lines(labels);
+ return WQ_RPL_REPLY;
+}
+
+/**
+ * Worker function: This function runs in the main thread, and receives
+ * a consensus_diff_compress_job_t that the worker thread has already
+ * processed.
+ */
+static void
+consensus_compress_worker_replyfn(void *work_)
+{
+ consensus_compress_worker_job_t *job = work_;
+
+ consensus_cache_entry_handle_t *handles[
+ ARRAY_LENGTH(compress_consensus_with)];
+ memset(handles, 0, sizeof(handles));
+
+ store_multiple(handles,
+ n_consensus_compression_methods(),
+ compress_consensus_with,
+ job->out,
+ "consensus");
+ cdm_cache_dirty = 1;
+
+ unsigned u;
+ consensus_flavor_t f = job->flavor;
+ tor_assert((int)f < N_CONSENSUS_FLAVORS);
+ for (u = 0; u < ARRAY_LENGTH(handles); ++u) {
+ if (handles[u] == NULL)
+ continue;
+ consensus_cache_entry_handle_free(latest_consensus[f][u]);
+ latest_consensus[f][u] = handles[u];
+ }
+
+ consensus_compress_worker_job_free(job);
+}
+
+/**
+ * If true, we compress in worker threads.
+ */
+static int background_compression = 0;
+
+/**
+ * Queue a job to compress <b>consensus</b> and store its compressed
+ * text in the cache.
+ */
+static int
+consensus_queue_compression_work(const char *consensus,
+ const networkstatus_t *as_parsed)
+{
+ tor_assert(consensus);
+ tor_assert(as_parsed);
+
+ consensus_compress_worker_job_t *job = tor_malloc_zero(sizeof(*job));
+ job->consensus = tor_strdup(consensus);
+ job->consensus_len = strlen(consensus);
+ job->flavor = as_parsed->flavor;
+
+ char va_str[ISO_TIME_LEN+1];
+ char vu_str[ISO_TIME_LEN+1];
+ char fu_str[ISO_TIME_LEN+1];
+ format_iso_time_nospace(va_str, as_parsed->valid_after);
+ format_iso_time_nospace(fu_str, as_parsed->fresh_until);
+ format_iso_time_nospace(vu_str, as_parsed->valid_until);
+ config_line_append(&job->labels_in, LABEL_VALID_AFTER, va_str);
+ config_line_append(&job->labels_in, LABEL_FRESH_UNTIL, fu_str);
+ config_line_append(&job->labels_in, LABEL_VALID_UNTIL, vu_str);
+ if (as_parsed->voters) {
+ smartlist_t *hexvoters = smartlist_new();
+ SMARTLIST_FOREACH_BEGIN(as_parsed->voters,
+ networkstatus_voter_info_t *, vi) {
+ if (smartlist_len(vi->sigs) == 0)
+ continue; // didn't sign.
+ char d[HEX_DIGEST_LEN+1];
+ base16_encode(d, sizeof(d), vi->identity_digest, DIGEST_LEN);
+ smartlist_add_strdup(hexvoters, d);
+ } SMARTLIST_FOREACH_END(vi);
+ char *signers = smartlist_join_strings(hexvoters, ",", 0, NULL);
+ config_line_prepend(&job->labels_in, LABEL_SIGNATORIES, signers);
+ tor_free(signers);
+ SMARTLIST_FOREACH(hexvoters, char *, cp, tor_free(cp));
+ }
+
+ if (background_compression) {
+ workqueue_entry_t *work;
+ work = cpuworker_queue_work(consensus_compress_worker_threadfn,
+ consensus_compress_worker_replyfn,
+ job);
+ if (!work) {
+ consensus_compress_worker_job_free(job);
+ return -1;
+ }
+
+ return 0;
+ } else {
+ consensus_compress_worker_threadfn(NULL, job);
+ consensus_compress_worker_replyfn(job);
+ return 0;
+ }
+}
+
+/**
+ * Tell the consdiffmgr backend to compress consensuses in worker threads.
+ */
+void
+consdiffmgr_enable_background_compression(void)
+{
+ // This isn't the default behavior because it would break unit tests.
+ background_compression = 1;
+}
+
+/** Read the set of voters from the cached object <b>ent</b> into
+ * <b>out</b>, as a list of hex-encoded digests. Return 0 on success,
+ * -1 if no signatories were recorded. */
+int
+consensus_cache_entry_get_voter_id_digests(const consensus_cache_entry_t *ent,
+ smartlist_t *out)
+{
+ tor_assert(ent);
+ tor_assert(out);
+ const char *s;
+ s = consensus_cache_entry_get_value(ent, LABEL_SIGNATORIES);
+ if (s == NULL)
+ return -1;
+ smartlist_split_string(out, s, ",", SPLIT_SKIP_SPACE|SPLIT_STRIP_SPACE, 0);
+ return 0;
+}
+
+/** Read the fresh-until time of cached object <b>ent</b> into *<b>out</b>
+ * and return 0, or return -1 if no such time was recorded. */
+int
+consensus_cache_entry_get_fresh_until(const consensus_cache_entry_t *ent,
+ time_t *out)
+{
+ tor_assert(ent);
+ tor_assert(out);
+ const char *s;
+ s = consensus_cache_entry_get_value(ent, LABEL_FRESH_UNTIL);
+ if (s == NULL || parse_iso_time_nospace(s, out) < 0)
+ return -1;
+ else
+ return 0;
+}
+
+/** Read the valid until timestamp from the cached object <b>ent</b> into
+ * *<b>out</b> and return 0, or return -1 if no such time was recorded. */
+int
+consensus_cache_entry_get_valid_until(const consensus_cache_entry_t *ent,
+ time_t *out)
+{
+ tor_assert(ent);
+ tor_assert(out);
+
+ const char *s;
+ s = consensus_cache_entry_get_value(ent, LABEL_VALID_UNTIL);
+ if (s == NULL || parse_iso_time_nospace(s, out) < 0)
+ return -1;
+ else
+ return 0;
+}
+