diff options
Diffstat (limited to 'src/lib')
-rw-r--r-- | src/lib/evloop/.may_include | 15 | ||||
-rw-r--r-- | src/lib/evloop/compat_libevent.c | 535 | ||||
-rw-r--r-- | src/lib/evloop/compat_libevent.h | 99 | ||||
-rw-r--r-- | src/lib/evloop/include.am | 26 | ||||
-rw-r--r-- | src/lib/evloop/procmon.c | 336 | ||||
-rw-r--r-- | src/lib/evloop/procmon.h | 34 | ||||
-rw-r--r-- | src/lib/evloop/timers.c | 324 | ||||
-rw-r--r-- | src/lib/evloop/timers.h | 31 | ||||
-rw-r--r-- | src/lib/evloop/token_bucket.c | 258 | ||||
-rw-r--r-- | src/lib/evloop/token_bucket.h | 118 | ||||
-rw-r--r-- | src/lib/evloop/workqueue.c | 682 | ||||
-rw-r--r-- | src/lib/evloop/workqueue.h | 65 |
12 files changed, 2523 insertions, 0 deletions
diff --git a/src/lib/evloop/.may_include b/src/lib/evloop/.may_include new file mode 100644 index 0000000000..205f41b38f --- /dev/null +++ b/src/lib/evloop/.may_include @@ -0,0 +1,15 @@ +orconfig.h + +lib/cc/*.h +lib/crypt_ops/*.h +lib/evloop/*.h +lib/intmath/*.h +lib/log/*.h +lib/malloc/*.h +lib/net/*.h +lib/string/*.h +lib/testsupport/*.h +lib/thread/*.h +lib/time/*.h + +src/ext/timeouts/timeout.c diff --git a/src/lib/evloop/compat_libevent.c b/src/lib/evloop/compat_libevent.c new file mode 100644 index 0000000000..9d21cf20bd --- /dev/null +++ b/src/lib/evloop/compat_libevent.c @@ -0,0 +1,535 @@ +/* Copyright (c) 2009-2018, The Tor Project, Inc. */ +/* See LICENSE for licensing information */ + +/** + * \file compat_libevent.c + * \brief Wrappers and utility functions for Libevent. + */ + +#include "orconfig.h" +#define COMPAT_LIBEVENT_PRIVATE +#include "common/compat_libevent.h" + +#include "lib/crypt_ops/crypto_rand.h" +#include "lib/log/torlog.h" +#include "lib/log/util_bug.h" +#include "lib/string/compat_string.h" + +#include <event2/event.h> +#include <event2/thread.h> +#include <string.h> + +/** A string which, if it appears in a libevent log, should be ignored. */ +static const char *suppress_msg = NULL; +/** Callback function passed to event_set_log() so we can intercept + * log messages from libevent. */ +STATIC void +libevent_logging_callback(int severity, const char *msg) +{ + char buf[1024]; + size_t n; + if (suppress_msg && strstr(msg, suppress_msg)) + return; + n = strlcpy(buf, msg, sizeof(buf)); + if (n && n < sizeof(buf) && buf[n-1] == '\n') { + buf[n-1] = '\0'; + } + switch (severity) { + case _EVENT_LOG_DEBUG: + log_debug(LD_NOCB|LD_NET, "Message from libevent: %s", buf); + break; + case _EVENT_LOG_MSG: + log_info(LD_NOCB|LD_NET, "Message from libevent: %s", buf); + break; + case _EVENT_LOG_WARN: + log_warn(LD_NOCB|LD_GENERAL, "Warning from libevent: %s", buf); + break; + case _EVENT_LOG_ERR: + log_err(LD_NOCB|LD_GENERAL, "Error from libevent: %s", buf); + break; + default: + log_warn(LD_NOCB|LD_GENERAL, "Message [%d] from libevent: %s", + severity, buf); + break; + } +} +/** Set hook to intercept log messages from libevent. */ +void +configure_libevent_logging(void) +{ + event_set_log_callback(libevent_logging_callback); +} + +/** Ignore any libevent log message that contains <b>msg</b>. */ +void +suppress_libevent_log_msg(const char *msg) +{ + suppress_msg = msg; +} + +/* Wrapper for event_free() that tolerates tor_event_free(NULL) */ +void +tor_event_free_(struct event *ev) +{ + if (ev == NULL) + return; + event_free(ev); +} + +/** Global event base for use by the main thread. */ +static struct event_base *the_event_base = NULL; + +/** + * @defgroup postloop post-loop event helpers + * + * If we're not careful, Libevent can susceptible to infinite event chains: + * one event can activate another, whose callback activates another, whose + * callback activates another, ad infinitum. While this is happening, + * Libevent won't be checking timeouts, socket-based events, signals, and so + * on. + * + * We solve this problem by marking some events as "post-loop". A post-loop + * event behaves like any ordinary event, but any events that _it_ activates + * cannot run until Libevent has checked for other events at least once. + * + * @{ */ + +/** + * An event that stops Libevent from running any more events on the current + * iteration of its loop, until it has re-checked for socket events, signal + * events, timeouts, etc. + */ +static struct event *rescan_mainloop_ev = NULL; + +/** + * Callback to implement rescan_mainloop_ev: it simply exits the mainloop, + * and relies on Tor to re-enter the mainloop since no error has occurred. + */ +static void +rescan_mainloop_cb(evutil_socket_t fd, short events, void *arg) +{ + (void)fd; + (void)events; + struct event_base *the_base = arg; + event_base_loopbreak(the_base); +} + +/** @} */ + +/* This is what passes for version detection on OSX. We set + * MACOSX_KQUEUE_IS_BROKEN to true iff we're on a version of OSX before + * 10.4.0 (aka 1040). */ +#ifdef __APPLE__ +#ifdef __ENVIRONMENT_MAC_OS_X_VERSION_MIN_REQUIRED__ +#define MACOSX_KQUEUE_IS_BROKEN \ + (__ENVIRONMENT_MAC_OS_X_VERSION_MIN_REQUIRED__ < 1040) +#else +#define MACOSX_KQUEUE_IS_BROKEN 0 +#endif /* defined(__ENVIRONMENT_MAC_OS_X_VERSION_MIN_REQUIRED__) */ +#endif /* defined(__APPLE__) */ + +/** Initialize the Libevent library and set up the event base. */ +void +tor_libevent_initialize(tor_libevent_cfg *torcfg) +{ + tor_assert(the_event_base == NULL); + /* some paths below don't use torcfg, so avoid unused variable warnings */ + (void)torcfg; + + { + int attempts = 0; + struct event_config *cfg; + + ++attempts; + cfg = event_config_new(); + tor_assert(cfg); + + /* Telling Libevent not to try to turn locking on can avoid a needless + * socketpair() attempt. */ + event_config_set_flag(cfg, EVENT_BASE_FLAG_NOLOCK); + + if (torcfg->num_cpus > 0) + event_config_set_num_cpus_hint(cfg, torcfg->num_cpus); + + /* We can enable changelist support with epoll, since we don't give + * Libevent any dup'd fds. This lets us avoid some syscalls. */ + event_config_set_flag(cfg, EVENT_BASE_FLAG_EPOLL_USE_CHANGELIST); + + the_event_base = event_base_new_with_config(cfg); + + event_config_free(cfg); + } + + if (!the_event_base) { + /* LCOV_EXCL_START */ + log_err(LD_GENERAL, "Unable to initialize Libevent: cannot continue."); + exit(1); // exit ok: libevent is broken. + /* LCOV_EXCL_STOP */ + } + + rescan_mainloop_ev = event_new(the_event_base, -1, 0, + rescan_mainloop_cb, the_event_base); + if (!rescan_mainloop_ev) { + /* LCOV_EXCL_START */ + log_err(LD_GENERAL, "Unable to create rescan event: cannot continue."); + exit(1); // exit ok: libevent is broken. + /* LCOV_EXCL_STOP */ + } + + log_info(LD_GENERAL, + "Initialized libevent version %s using method %s. Good.", + event_get_version(), tor_libevent_get_method()); +} + +/** Return the current Libevent event base that we're set up to use. */ +MOCK_IMPL(struct event_base *, +tor_libevent_get_base, (void)) +{ + tor_assert(the_event_base != NULL); + return the_event_base; +} + +/** Return the name of the Libevent backend we're using. */ +const char * +tor_libevent_get_method(void) +{ + return event_base_get_method(the_event_base); +} + +/** Return a string representation of the version of the currently running + * version of Libevent. */ +const char * +tor_libevent_get_version_str(void) +{ + return event_get_version(); +} + +/** Return a string representation of the version of Libevent that was used +* at compilation time. */ +const char * +tor_libevent_get_header_version_str(void) +{ + return LIBEVENT_VERSION; +} + +/** Represents a timer that's run every N microseconds by Libevent. */ +struct periodic_timer_t { + /** Underlying event used to implement this periodic event. */ + struct event *ev; + /** The callback we'll be invoking whenever the event triggers */ + void (*cb)(struct periodic_timer_t *, void *); + /** User-supplied data for the callback */ + void *data; +}; + +/** Libevent callback to implement a periodic event. */ +static void +periodic_timer_cb(evutil_socket_t fd, short what, void *arg) +{ + periodic_timer_t *timer = arg; + (void) what; + (void) fd; + timer->cb(timer, timer->data); +} + +/** Create and schedule a new timer that will run every <b>tv</b> in + * the event loop of <b>base</b>. When the timer fires, it will + * run the timer in <b>cb</b> with the user-supplied data in <b>data</b>. */ +periodic_timer_t * +periodic_timer_new(struct event_base *base, + const struct timeval *tv, + void (*cb)(periodic_timer_t *timer, void *data), + void *data) +{ + periodic_timer_t *timer; + tor_assert(base); + tor_assert(tv); + tor_assert(cb); + timer = tor_malloc_zero(sizeof(periodic_timer_t)); + if (!(timer->ev = tor_event_new(base, -1, EV_PERSIST, + periodic_timer_cb, timer))) { + tor_free(timer); + return NULL; + } + timer->cb = cb; + timer->data = data; + periodic_timer_launch(timer, tv); + return timer; +} + +/** + * Launch the timer <b>timer</b> to run at <b>tv</b> from now, and every + * <b>tv</b> thereafter. + * + * If the timer is already enabled, this function does nothing. + */ +void +periodic_timer_launch(periodic_timer_t *timer, const struct timeval *tv) +{ + tor_assert(timer); + if (event_pending(timer->ev, EV_TIMEOUT, NULL)) + return; + event_add(timer->ev, tv); +} + +/** + * Disable the provided <b>timer</b>, but do not free it. + * + * You can reenable the same timer later with periodic_timer_launch. + * + * If the timer is already disabled, this function does nothing. + */ +void +periodic_timer_disable(periodic_timer_t *timer) +{ + tor_assert(timer); + (void) event_del(timer->ev); +} + +/** Stop and free a periodic timer */ +void +periodic_timer_free_(periodic_timer_t *timer) +{ + if (!timer) + return; + tor_event_free(timer->ev); + tor_free(timer); +} + +/** + * Type used to represent events that run directly from the main loop, + * either because they are activated from elsewhere in the code, or + * because they have a simple timeout. + * + * We use this type to avoid exposing Libevent's API throughout the rest + * of the codebase. + * + * This type can't be used for all events: it doesn't handle events that + * are triggered by signals or by sockets. + */ +struct mainloop_event_t { + struct event *ev; + void (*cb)(mainloop_event_t *, void *); + void *userdata; +}; + +/** + * Internal: Implements mainloop event using a libevent event. + */ +static void +mainloop_event_cb(evutil_socket_t fd, short what, void *arg) +{ + (void)fd; + (void)what; + mainloop_event_t *mev = arg; + mev->cb(mev, mev->userdata); +} + +/** + * As mainloop_event_cb, but implements a post-loop event. + */ +static void +mainloop_event_postloop_cb(evutil_socket_t fd, short what, void *arg) +{ + (void)fd; + (void)what; + + /* Note that if rescan_mainloop_ev is already activated, + * event_active() will do nothing: only the first post-loop event that + * happens each time through the event loop will cause it to be + * activated. + * + * Because event_active() puts events on a FIFO queue, every event + * that is made active _after_ rescan_mainloop_ev will get its + * callback run after rescan_mainloop_cb is called -- that is, on the + * next iteration of the loop. + */ + event_active(rescan_mainloop_ev, EV_READ, 1); + + mainloop_event_t *mev = arg; + mev->cb(mev, mev->userdata); +} + +/** + * Helper for mainloop_event_new() and mainloop_event_postloop_new(). + */ +static mainloop_event_t * +mainloop_event_new_impl(int postloop, + void (*cb)(mainloop_event_t *, void *), + void *userdata) +{ + tor_assert(cb); + + struct event_base *base = tor_libevent_get_base(); + mainloop_event_t *mev = tor_malloc_zero(sizeof(mainloop_event_t)); + mev->ev = tor_event_new(base, -1, 0, + postloop ? mainloop_event_postloop_cb : mainloop_event_cb, + mev); + tor_assert(mev->ev); + mev->cb = cb; + mev->userdata = userdata; + return mev; +} + +/** + * Create and return a new mainloop_event_t to run the function <b>cb</b>. + * + * When run, the callback function will be passed the mainloop_event_t + * and <b>userdata</b> as its arguments. The <b>userdata</b> pointer + * must remain valid for as long as the mainloop_event_t event exists: + * it is your responsibility to free it. + * + * The event is not scheduled by default: Use mainloop_event_activate() + * or mainloop_event_schedule() to make it run. + */ +mainloop_event_t * +mainloop_event_new(void (*cb)(mainloop_event_t *, void *), + void *userdata) +{ + return mainloop_event_new_impl(0, cb, userdata); +} + +/** + * As mainloop_event_new(), but create a post-loop event. + * + * A post-loop event behaves like any ordinary event, but any events + * that _it_ activates cannot run until Libevent has checked for other + * events at least once. + */ +mainloop_event_t * +mainloop_event_postloop_new(void (*cb)(mainloop_event_t *, void *), + void *userdata) +{ + return mainloop_event_new_impl(1, cb, userdata); +} + +/** + * Schedule <b>event</b> to run in the main loop, immediately. If it is + * not scheduled, it will run anyway. If it is already scheduled to run + * later, it will run now instead. This function will have no effect if + * the event is already scheduled to run. + * + * This function may only be called from the main thread. + */ +void +mainloop_event_activate(mainloop_event_t *event) +{ + tor_assert(event); + event_active(event->ev, EV_READ, 1); +} + +/** Schedule <b>event</b> to run in the main loop, after a delay of <b>tv</b>. + * + * If the event is scheduled for a different time, cancel it and run + * after this delay instead. If the event is currently pending to run + * <em>now</b>, has no effect. + * + * Do not call this function with <b>tv</b> == NULL -- use + * mainloop_event_activate() instead. + * + * This function may only be called from the main thread. + */ +int +mainloop_event_schedule(mainloop_event_t *event, const struct timeval *tv) +{ + tor_assert(event); + if (BUG(tv == NULL)) { + // LCOV_EXCL_START + mainloop_event_activate(event); + return 0; + // LCOV_EXCL_STOP + } + return event_add(event->ev, tv); +} + +/** Cancel <b>event</b> if it is currently active or pending. (Do nothing if + * the event is not currently active or pending.) */ +void +mainloop_event_cancel(mainloop_event_t *event) +{ + if (!event) + return; + (void) event_del(event->ev); +} + +/** Cancel <b>event</b> and release all storage associated with it. */ +void +mainloop_event_free_(mainloop_event_t *event) +{ + if (!event) + return; + tor_event_free(event->ev); + memset(event, 0xb8, sizeof(*event)); + tor_free(event); +} + +int +tor_init_libevent_rng(void) +{ + int rv = 0; + char buf[256]; + if (evutil_secure_rng_init() < 0) { + rv = -1; + } + crypto_rand(buf, 32); +#ifdef HAVE_EVUTIL_SECURE_RNG_ADD_BYTES + evutil_secure_rng_add_bytes(buf, 32); +#endif + evutil_secure_rng_get_bytes(buf, sizeof(buf)); + return rv; +} + +/** + * Un-initialize libevent in preparation for an exit + */ +void +tor_libevent_free_all(void) +{ + tor_event_free(rescan_mainloop_ev); + if (the_event_base) + event_base_free(the_event_base); + the_event_base = NULL; +} + +/** + * Run the event loop for the provided event_base, handling events until + * something stops it. If <b>once</b> is set, then just poll-and-run + * once, then exit. Return 0 on success, -1 if an error occurred, or 1 + * if we exited because no events were pending or active. + * + * This isn't reentrant or multithreaded. + */ +int +tor_libevent_run_event_loop(struct event_base *base, int once) +{ + const int flags = once ? EVLOOP_ONCE : 0; + return event_base_loop(base, flags); +} + +/** Tell the event loop to exit after <b>delay</b>. If <b>delay</b> is NULL, + * instead exit after we're done running the currently active events. */ +void +tor_libevent_exit_loop_after_delay(struct event_base *base, + const struct timeval *delay) +{ + event_base_loopexit(base, delay); +} + +/** Tell the event loop to exit after running whichever callback is currently + * active. */ +void +tor_libevent_exit_loop_after_callback(struct event_base *base) +{ + event_base_loopbreak(base); +} + +#if defined(TOR_UNIT_TESTS) +/** For testing: called post-fork to make libevent reinitialize + * kernel structures. */ +void +tor_libevent_postfork(void) +{ + int r = event_reinit(tor_libevent_get_base()); + tor_assert(r == 0); +} +#endif /* defined(TOR_UNIT_TESTS) */ diff --git a/src/lib/evloop/compat_libevent.h b/src/lib/evloop/compat_libevent.h new file mode 100644 index 0000000000..0a50cfa667 --- /dev/null +++ b/src/lib/evloop/compat_libevent.h @@ -0,0 +1,99 @@ +/* Copyright (c) 2009-2018, The Tor Project, Inc. */ +/* See LICENSE for licensing information */ + +#ifndef TOR_COMPAT_LIBEVENT_H +#define TOR_COMPAT_LIBEVENT_H + +#include "orconfig.h" +#include "lib/testsupport/testsupport.h" +#include "lib/malloc/util_malloc.h" + +void configure_libevent_logging(void); +void suppress_libevent_log_msg(const char *msg); + +#define tor_event_new event_new +#define tor_evtimer_new evtimer_new +#define tor_evsignal_new evsignal_new +#define tor_evdns_add_server_port(sock, tcp, cb, data) \ + evdns_add_server_port_with_base(tor_libevent_get_base(), \ + (sock),(tcp),(cb),(data)); + +struct event; +struct event_base; +struct timeval; + +void tor_event_free_(struct event *ev); +#define tor_event_free(ev) \ + FREE_AND_NULL(struct event, tor_event_free_, (ev)) + +typedef struct periodic_timer_t periodic_timer_t; + +periodic_timer_t *periodic_timer_new(struct event_base *base, + const struct timeval *tv, + void (*cb)(periodic_timer_t *timer, void *data), + void *data); +void periodic_timer_free_(periodic_timer_t *); +void periodic_timer_launch(periodic_timer_t *, const struct timeval *tv); +void periodic_timer_disable(periodic_timer_t *); +#define periodic_timer_free(t) \ + FREE_AND_NULL(periodic_timer_t, periodic_timer_free_, (t)) + +typedef struct mainloop_event_t mainloop_event_t; +mainloop_event_t *mainloop_event_new(void (*cb)(mainloop_event_t *, void *), + void *userdata); +mainloop_event_t * mainloop_event_postloop_new( + void (*cb)(mainloop_event_t *, void *), + void *userdata); +void mainloop_event_activate(mainloop_event_t *event); +int mainloop_event_schedule(mainloop_event_t *event, + const struct timeval *delay); +void mainloop_event_cancel(mainloop_event_t *event); +void mainloop_event_free_(mainloop_event_t *event); +#define mainloop_event_free(event) \ + FREE_AND_NULL(mainloop_event_t, mainloop_event_free_, (event)) + +/** Defines a configuration for using libevent with Tor: passed as an argument + * to tor_libevent_initialize() to describe how we want to set up. */ +typedef struct tor_libevent_cfg { + /** How many CPUs should we use (not currently useful). */ + int num_cpus; + /** How many milliseconds should we allow between updating bandwidth limits? + * (Not currently useful). */ + int msec_per_tick; +} tor_libevent_cfg; + +void tor_libevent_initialize(tor_libevent_cfg *cfg); +MOCK_DECL(struct event_base *, tor_libevent_get_base, (void)); +const char *tor_libevent_get_method(void); +void tor_check_libevent_header_compatibility(void); +const char *tor_libevent_get_version_str(void); +const char *tor_libevent_get_header_version_str(void); +void tor_libevent_free_all(void); + +int tor_init_libevent_rng(void); + +#ifdef TOR_UNIT_TESTS +void tor_libevent_postfork(void); +#endif + +int tor_libevent_run_event_loop(struct event_base *base, int once); +void tor_libevent_exit_loop_after_delay(struct event_base *base, + const struct timeval *delay); +void tor_libevent_exit_loop_after_callback(struct event_base *base); + +#ifdef COMPAT_LIBEVENT_PRIVATE + +/** Macro: returns the number of a Libevent version as a 4-byte number, + with the first three bytes representing the major, minor, and patchlevel + respectively of the library. The fourth byte is unused. + + This is equivalent to the format of LIBEVENT_VERSION_NUMBER on Libevent + 2.0.1 or later. */ +#define V(major, minor, patch) \ + (((major) << 24) | ((minor) << 16) | ((patch) << 8)) + +STATIC void +libevent_logging_callback(int severity, const char *msg); +#endif /* defined(COMPAT_LIBEVENT_PRIVATE) */ + +#endif /* !defined(TOR_COMPAT_LIBEVENT_H) */ diff --git a/src/lib/evloop/include.am b/src/lib/evloop/include.am new file mode 100644 index 0000000000..6b0076272a --- /dev/null +++ b/src/lib/evloop/include.am @@ -0,0 +1,26 @@ + +noinst_LIBRARIES += src/lib/libtor-evloop.a + +if UNITTESTS_ENABLED +noinst_LIBRARIES += src/lib/libtor-evloop-testing.a +endif + +src_lib_libtor_evloop_a_SOURCES = \ + src/lib/evloop/compat_libevent.c \ + src/lib/evloop/procmon.c \ + src/lib/evloop/timers.c \ + src/lib/evloop/token_bucket.c \ + src/lib/evloop/workqueue.c + + +src_lib_libtor_evloop_testing_a_SOURCES = \ + $(src_lib_libtor_evloop_a_SOURCES) +src_lib_libtor_evloop_testing_a_CPPFLAGS = $(AM_CPPFLAGS) $(TEST_CPPFLAGS) +src_lib_libtor_evloop_testing_a_CFLAGS = $(AM_CFLAGS) $(TEST_CFLAGS) + +noinst_HEADERS += \ + src/lib/evloop/compat_libevent.h \ + src/lib/evloop/procmon.h \ + src/lib/evloop/timers.h \ + src/lib/evloop/token_bucket.h \ + src/lib/evloop/workqueue.h diff --git a/src/lib/evloop/procmon.c b/src/lib/evloop/procmon.c new file mode 100644 index 0000000000..6c2b3e71e5 --- /dev/null +++ b/src/lib/evloop/procmon.c @@ -0,0 +1,336 @@ +/* Copyright (c) 2011-2018, The Tor Project, Inc. */ +/* See LICENSE for licensing information */ + +/** + * \file procmon.c + * \brief Process-termination monitor functions + **/ + +#include "common/procmon.h" + +#include "lib/log/torlog.h" +#include "lib/log/util_bug.h" +#include "lib/log/win32err.h" +#include "lib/malloc/util_malloc.h" +#include "lib/string/parse_int.h" + +#ifdef HAVE_SIGNAL_H +#include <signal.h> +#endif +#ifdef HAVE_ERRNO_H +#include <errno.h> +#endif + +#ifdef _WIN32 +#include <winsock2.h> +#include <windows.h> +#endif + +#if (0 == SIZEOF_PID_T) && defined(_WIN32) +/* Windows does not define pid_t sometimes, but _getpid() returns an int. + * Everybody else needs to have a pid_t. */ +typedef int pid_t; +#define PID_T_FORMAT "%d" +#elif (SIZEOF_PID_T == SIZEOF_INT) || (SIZEOF_PID_T == SIZEOF_SHORT) +#define PID_T_FORMAT "%d" +#elif (SIZEOF_PID_T == SIZEOF_LONG) +#define PID_T_FORMAT "%ld" +#elif (SIZEOF_PID_T == 8) +#define PID_T_FORMAT "%"PRId64 +#else +#error Unknown: SIZEOF_PID_T +#endif /* (0 == SIZEOF_PID_T) && defined(_WIN32) || ... */ + +/* Define to 1 if process-termination monitors on this OS and Libevent + version must poll for process termination themselves. */ +#define PROCMON_POLLS 1 +/* Currently we need to poll in some way on all systems. */ + +#ifdef PROCMON_POLLS +static void tor_process_monitor_poll_cb(periodic_timer_t *ev, + void *procmon_); +#endif + +/* This struct may contain pointers into the original process + * specifier string, but it should *never* contain anything which + * needs to be freed. */ +/* DOCDOC parsed_process_specifier_t */ +struct parsed_process_specifier_t { + pid_t pid; +}; + +/** Parse the process specifier given in <b>process_spec</b> into + * *<b>ppspec</b>. Return 0 on success; return -1 and store an error + * message into *<b>msg</b> on failure. The caller must not free the + * returned error message. */ +static int +parse_process_specifier(const char *process_spec, + struct parsed_process_specifier_t *ppspec, + const char **msg) +{ + long pid_l; + int pid_ok = 0; + char *pspec_next; + + /* If we're lucky, long will turn out to be large enough to hold a + * PID everywhere that Tor runs. */ + pid_l = tor_parse_long(process_spec, 10, 1, LONG_MAX, &pid_ok, &pspec_next); + + /* Reserve room in the ‘process specifier’ for additional + * (platform-specific) identifying information beyond the PID, to + * make our process-existence checks a bit less racy in a future + * version. */ + if ((*pspec_next != 0) && (*pspec_next != ' ') && (*pspec_next != ':')) { + pid_ok = 0; + } + + ppspec->pid = (pid_t)(pid_l); + if (!pid_ok || (pid_l != (long)(ppspec->pid))) { + *msg = "invalid PID"; + goto err; + } + + return 0; + err: + return -1; +} + +/* DOCDOC tor_process_monitor_t */ +struct tor_process_monitor_t { + /** Log domain for warning messages. */ + log_domain_mask_t log_domain; + + /** All systems: The best we can do in general is poll for the + * process's existence by PID periodically, and hope that the kernel + * doesn't reassign the same PID to another process between our + * polls. */ + pid_t pid; + +#ifdef _WIN32 + /** Windows-only: Should we poll hproc? If false, poll pid + * instead. */ + int poll_hproc; + + /** Windows-only: Get a handle to the process (if possible) and + * periodically check whether the process we have a handle to has + * ended. */ + HANDLE hproc; + /* XXXX We should have Libevent watch hproc for us, + * if/when some version of Libevent can be told to do so. */ +#endif /* defined(_WIN32) */ + + /* XXXX On Linux, we can and should receive the 22nd + * (space-delimited) field (‘starttime’) of /proc/$PID/stat from the + * owning controller and store it, and poll once in a while to see + * whether it has changed -- if so, the kernel has *definitely* + * reassigned the owning controller's PID and we should exit. On + * FreeBSD, we can do the same trick using either the 8th + * space-delimited field of /proc/$PID/status on the seven FBSD + * systems whose admins have mounted procfs, or the start-time field + * of the process-information structure returned by kvmgetprocs() on + * any system. The latter is ickier. */ + + /* XXXX On FreeBSD (and possibly other kqueue systems), we can and + * should arrange to receive EVFILT_PROC NOTE_EXIT notifications for + * pid, so we don't have to do such a heavyweight poll operation in + * order to avoid the PID-reassignment race condition. (We would + * still need to poll our own kqueue periodically until some version + * of Libevent 2.x learns to receive these events for us.) */ + + /** A Libevent event structure, to either poll for the process's + * existence or receive a notification when the process ends. */ + periodic_timer_t *e; + + /** A callback to be called when the process ends. */ + tor_procmon_callback_t cb; + void *cb_arg; /**< A user-specified pointer to be passed to cb. */ +}; + +/** Verify that the process specifier given in <b>process_spec</b> is + * syntactically valid. Return 0 on success; return -1 and store an + * error message into *<b>msg</b> on failure. The caller must not + * free the returned error message. */ +int +tor_validate_process_specifier(const char *process_spec, + const char **msg) +{ + struct parsed_process_specifier_t ppspec; + + tor_assert(msg != NULL); + *msg = NULL; + + return parse_process_specifier(process_spec, &ppspec, msg); +} + +/* DOCDOC poll_interval_tv */ +static const struct timeval poll_interval_tv = {15, 0}; + +/** Create a process-termination monitor for the process specifier + * given in <b>process_spec</b>. Return a newly allocated + * tor_process_monitor_t on success; return NULL and store an error + * message into *<b>msg</b> on failure. The caller must not free + * the returned error message. + * + * When the monitored process terminates, call + * <b>cb</b>(<b>cb_arg</b>). + */ +tor_process_monitor_t * +tor_process_monitor_new(struct event_base *base, + const char *process_spec, + log_domain_mask_t log_domain, + tor_procmon_callback_t cb, void *cb_arg, + const char **msg) +{ + tor_process_monitor_t *procmon = tor_malloc_zero( + sizeof(tor_process_monitor_t)); + struct parsed_process_specifier_t ppspec; + + tor_assert(msg != NULL); + *msg = NULL; + + if (procmon == NULL) { + *msg = "out of memory"; + goto err; + } + + procmon->log_domain = log_domain; + + if (parse_process_specifier(process_spec, &ppspec, msg)) + goto err; + + procmon->pid = ppspec.pid; + +#ifdef _WIN32 + procmon->hproc = OpenProcess(PROCESS_QUERY_INFORMATION | SYNCHRONIZE, + FALSE, + procmon->pid); + + if (procmon->hproc != NULL) { + procmon->poll_hproc = 1; + log_info(procmon->log_domain, "Successfully opened handle to process " + PID_T_FORMAT"; " + "monitoring it.", + procmon->pid); + } else { + /* If we couldn't get a handle to the process, we'll try again the + * first time we poll. */ + log_info(procmon->log_domain, "Failed to open handle to process " + PID_T_FORMAT"; will " + "try again later.", + procmon->pid); + } +#endif /* defined(_WIN32) */ + + procmon->cb = cb; + procmon->cb_arg = cb_arg; + +#ifdef PROCMON_POLLS + procmon->e = periodic_timer_new(base, + &poll_interval_tv, + tor_process_monitor_poll_cb, procmon); +#else /* !(defined(PROCMON_POLLS)) */ +#error OOPS? +#endif /* defined(PROCMON_POLLS) */ + + return procmon; + err: + tor_process_monitor_free(procmon); + return NULL; +} + +#ifdef PROCMON_POLLS +/** Libevent callback to poll for the existence of the process + * monitored by <b>procmon_</b>. */ +static void +tor_process_monitor_poll_cb(periodic_timer_t *event, void *procmon_) +{ + (void)event; + tor_process_monitor_t *procmon = (tor_process_monitor_t *)(procmon_); + int its_dead_jim; + + tor_assert(procmon != NULL); + +#ifdef _WIN32 + if (procmon->poll_hproc) { + DWORD exit_code; + if (!GetExitCodeProcess(procmon->hproc, &exit_code)) { + char *errmsg = format_win32_error(GetLastError()); + log_warn(procmon->log_domain, "Error \"%s\" occurred while polling " + "handle for monitored process "PID_T_FORMAT"; assuming " + "it's dead.", + errmsg, procmon->pid); + tor_free(errmsg); + its_dead_jim = 1; + } else { + its_dead_jim = (exit_code != STILL_ACTIVE); + } + } else { + /* All we can do is try to open the process, and look at the error + * code if it fails again. */ + procmon->hproc = OpenProcess(PROCESS_QUERY_INFORMATION | SYNCHRONIZE, + FALSE, + procmon->pid); + + if (procmon->hproc != NULL) { + log_info(procmon->log_domain, "Successfully opened handle to monitored " + "process "PID_T_FORMAT".", + procmon->pid); + its_dead_jim = 0; + procmon->poll_hproc = 1; + } else { + DWORD err_code = GetLastError(); + char *errmsg = format_win32_error(err_code); + + /* When I tested OpenProcess's error codes on Windows 7, I + * received error code 5 (ERROR_ACCESS_DENIED) for PIDs of + * existing processes that I could not open and error code 87 + * (ERROR_INVALID_PARAMETER) for PIDs that were not in use. + * Since the nonexistent-process error code is sane, I'm going + * to assume that all errors other than ERROR_INVALID_PARAMETER + * mean that the process we are monitoring is still alive. */ + its_dead_jim = (err_code == ERROR_INVALID_PARAMETER); + + if (!its_dead_jim) + log_info(procmon->log_domain, "Failed to open handle to monitored " + "process "PID_T_FORMAT", and error code %lu (%s) is not " + "'invalid parameter' -- assuming the process is still alive.", + procmon->pid, + err_code, errmsg); + + tor_free(errmsg); + } + } +#else /* !(defined(_WIN32)) */ + /* Unix makes this part easy, if a bit racy. */ + its_dead_jim = kill(procmon->pid, 0); + its_dead_jim = its_dead_jim && (errno == ESRCH); +#endif /* defined(_WIN32) */ + + tor_log(its_dead_jim ? LOG_NOTICE : LOG_INFO, + procmon->log_domain, "Monitored process "PID_T_FORMAT" is %s.", + procmon->pid, + its_dead_jim ? "dead" : "still alive"); + + if (its_dead_jim) { + procmon->cb(procmon->cb_arg); + } +} +#endif /* defined(PROCMON_POLLS) */ + +/** Free the process-termination monitor <b>procmon</b>. */ +void +tor_process_monitor_free_(tor_process_monitor_t *procmon) +{ + if (procmon == NULL) + return; + +#ifdef _WIN32 + if (procmon->hproc != NULL) + CloseHandle(procmon->hproc); +#endif + + if (procmon->e != NULL) + periodic_timer_free(procmon->e); + + tor_free(procmon); +} diff --git a/src/lib/evloop/procmon.h b/src/lib/evloop/procmon.h new file mode 100644 index 0000000000..b8daeed0db --- /dev/null +++ b/src/lib/evloop/procmon.h @@ -0,0 +1,34 @@ +/* Copyright (c) 2011-2018, The Tor Project, Inc. */ +/* See LICENSE for licensing information */ + +/** + * \file procmon.h + * \brief Headers for procmon.c + **/ + +#ifndef TOR_PROCMON_H +#define TOR_PROCMON_H + +#include "common/compat_libevent.h" + +#include "lib/log/torlog.h" + +typedef struct tor_process_monitor_t tor_process_monitor_t; + +/* DOCDOC tor_procmon_callback_t */ +typedef void (*tor_procmon_callback_t)(void *); + +int tor_validate_process_specifier(const char *process_spec, + const char **msg); +tor_process_monitor_t *tor_process_monitor_new(struct event_base *base, + const char *process_spec, + log_domain_mask_t log_domain, + tor_procmon_callback_t cb, + void *cb_arg, + const char **msg); +void tor_process_monitor_free_(tor_process_monitor_t *procmon); +#define tor_process_monitor_free(procmon) \ + FREE_AND_NULL(tor_process_monitor_t, tor_process_monitor_free_, (procmon)) + +#endif /* !defined(TOR_PROCMON_H) */ + diff --git a/src/lib/evloop/timers.c b/src/lib/evloop/timers.c new file mode 100644 index 0000000000..ff92a2e447 --- /dev/null +++ b/src/lib/evloop/timers.c @@ -0,0 +1,324 @@ +/* Copyright (c) 2016-2018, The Tor Project, Inc. */ +/* See LICENSE for licensing information */ + +/** + * \file timers.c + * \brief Wrapper around William Ahern's fast hierarchical timer wheel + * implementation, to tie it in with a libevent backend. + * + * Only use these functions from the main thread. + * + * The main advantage of tor_timer_t over using libevent's timers is that + * they're way more efficient if we need to have thousands or millions of + * them. For more information, see + * http://www.25thandclement.com/~william/projects/timeout.c.html + * + * Periodic timers are available in the backend, but I've turned them off. + * We can turn them back on if needed. + */ + +/* Notes: + * + * Having a way to free all timers on shutdown would free people from the + * need to track them. Not sure if that's clever though. + * + * In an ideal world, Libevent would just switch to use this backend, and we + * could throw this file away. But even if Libevent does switch, we'll be + * stuck with legacy libevents for some time. + */ + +#include "orconfig.h" + +#define TOR_TIMERS_PRIVATE + +#include "common/compat_libevent.h" +#include "common/timers.h" +#include "lib/intmath/muldiv.h" +#include "lib/log/torlog.h" +#include "lib/log/util_bug.h" +#include "lib/malloc/util_malloc.h" +#include "lib/time/compat_time.h" + +#ifdef _WIN32 +// For struct timeval. +#include <winsock2.h> +#endif + +struct timeout_cb { + timer_cb_fn_t cb; + void *arg; +}; + +/* + * These definitions are for timeouts.c and timeouts.h. + */ +#ifdef __GNUC__ +/* We're not exposing any of the functions outside this file. */ +#define TIMEOUT_PUBLIC __attribute__((__unused__)) static +#else +/* We're not exposing any of the functions outside this file. */ +#define TIMEOUT_PUBLIC static +#endif /* defined(__GNUC__) */ +/* We're not using periodic events. */ +#define TIMEOUT_DISABLE_INTERVALS +/* We always know the global_timeouts object, so we don't need each timeout + * to keep a pointer to it. */ +#define TIMEOUT_DISABLE_RELATIVE_ACCESS +/* We're providing our own struct timeout_cb. */ +#define TIMEOUT_CB_OVERRIDE +/* We're going to support timers that are pretty far out in advance. Making + * this big can be inefficient, but having a significant number of timers + * above TIMEOUT_MAX can also be super-inefficient. Choosing 5 here sets + * timeout_max to 2^30 ticks, or 29 hours with our value for USEC_PER_TICK */ +#define WHEEL_NUM 5 +#if SIZEOF_VOID_P == 4 +/* On 32-bit platforms, we want to override wheel_bit, so that timeout.c will + * use 32-bit math. */ +#define WHEEL_BIT 5 +#endif +#include "src/ext/timeouts/timeout.c" + +static struct timeouts *global_timeouts = NULL; +static struct mainloop_event_t *global_timer_event = NULL; + +static monotime_t start_of_time; + +/** We need to choose this value carefully. Because we're using timer wheels, + * it actually costs us to have extra resolution we don't use. So for now, + * I'm going to define our resolution as .1 msec, and hope that's good enough. + * + * Note that two of the most popular libevent backends (epoll without timerfd, + * and windows select), simply can't support sub-millisecond resolution, + * do this is optimistic for a lot of users. + */ +#define USEC_PER_TICK 100 + +/** One million microseconds in a second */ +#define USEC_PER_SEC 1000000 + +/** Check at least once every N seconds. */ +#define MIN_CHECK_SECONDS 3600 + +/** Check at least once every N ticks. */ +#define MIN_CHECK_TICKS \ + (((timeout_t)MIN_CHECK_SECONDS) * (1000000 / USEC_PER_TICK)) + +/** + * Convert the timeval in <b>tv</b> to a timeout_t, and return it. + * + * The output resolution is set by USEC_PER_TICK. Only use this to convert + * delays to number of ticks; the time represented by 0 is undefined. + */ +static timeout_t +tv_to_timeout(const struct timeval *tv) +{ + uint64_t usec = tv->tv_usec; + usec += ((uint64_t)USEC_PER_SEC) * tv->tv_sec; + return usec / USEC_PER_TICK; +} + +/** + * Convert the timeout in <b>t</b> to a timeval in <b>tv_out</b>. Only + * use this for delays, not absolute times. + */ +static void +timeout_to_tv(timeout_t t, struct timeval *tv_out) +{ + t *= USEC_PER_TICK; + tv_out->tv_usec = (int)(t % USEC_PER_SEC); + tv_out->tv_sec = (time_t)(t / USEC_PER_SEC); +} + +/** + * Update the timer <b>tv</b> to the current time in <b>tv</b>. + */ +static void +timer_advance_to_cur_time(const monotime_t *now) +{ + timeout_t cur_tick = CEIL_DIV(monotime_diff_usec(&start_of_time, now), + USEC_PER_TICK); + timeouts_update(global_timeouts, cur_tick); +} + +/** + * Adjust the time at which the libevent timer should fire based on + * the next-expiring time in <b>global_timeouts</b> + */ +static void +libevent_timer_reschedule(void) +{ + monotime_t now; + monotime_get(&now); + timer_advance_to_cur_time(&now); + + timeout_t delay = timeouts_timeout(global_timeouts); + + struct timeval d; + if (delay > MIN_CHECK_TICKS) + delay = MIN_CHECK_TICKS; + timeout_to_tv(delay, &d); + mainloop_event_schedule(global_timer_event, &d); +} + +/** Run the callback of every timer that has expired, based on the current + * output of monotime_get(). */ +STATIC void +timers_run_pending(void) +{ + monotime_t now; + monotime_get(&now); + timer_advance_to_cur_time(&now); + + tor_timer_t *t; + while ((t = timeouts_get(global_timeouts))) { + t->callback.cb(t, t->callback.arg, &now); + } +} + +/** + * Invoked when the libevent timer has expired: see which tor_timer_t events + * have fired, activate their callbacks, and reschedule the libevent timer. + */ +static void +libevent_timer_callback(mainloop_event_t *ev, void *arg) +{ + (void)ev; + (void)arg; + + timers_run_pending(); + + libevent_timer_reschedule(); +} + +/** + * Initialize the timers subsystem. Requires that libevent has already been + * initialized. + */ +void +timers_initialize(void) +{ + if (BUG(global_timeouts)) + return; // LCOV_EXCL_LINE + + timeout_error_t err = 0; + global_timeouts = timeouts_open(0, &err); + if (!global_timeouts) { + // LCOV_EXCL_START -- this can only fail on malloc failure. + log_err(LD_BUG, "Unable to open timer backend: %s", strerror(err)); + tor_assert(0); + // LCOV_EXCL_STOP + } + + monotime_init(); + monotime_get(&start_of_time); + + mainloop_event_t *timer_event; + timer_event = mainloop_event_new(libevent_timer_callback, NULL); + tor_assert(timer_event); + global_timer_event = timer_event; + + libevent_timer_reschedule(); +} + +/** + * Release all storage held in the timers subsystem. Does not fire timers. + */ +void +timers_shutdown(void) +{ + if (global_timer_event) { + mainloop_event_free(global_timer_event); + global_timer_event = NULL; + } + if (global_timeouts) { + timeouts_close(global_timeouts); + global_timeouts = NULL; + } +} + +/** + * Allocate and return a new timer, with given callback and argument. + */ +tor_timer_t * +timer_new(timer_cb_fn_t cb, void *arg) +{ + tor_timer_t *t = tor_malloc(sizeof(tor_timer_t)); + timeout_init(t, 0); + timer_set_cb(t, cb, arg); + return t; +} + +/** + * Release all storage held by <b>t</b>, and unschedule it if was already + * scheduled. + */ +void +timer_free_(tor_timer_t *t) +{ + if (! t) + return; + + timeouts_del(global_timeouts, t); + tor_free(t); +} + +/** + * Change the callback and argument associated with a timer <b>t</b>. + */ +void +timer_set_cb(tor_timer_t *t, timer_cb_fn_t cb, void *arg) +{ + t->callback.cb = cb; + t->callback.arg = arg; +} + +/** + * Set *<b>cb_out</b> (if provided) to this timer's callback function, + * and *<b>arg_out</b> (if provided) to this timer's callback argument. + */ +void +timer_get_cb(const tor_timer_t *t, + timer_cb_fn_t *cb_out, void **arg_out) +{ + if (cb_out) + *cb_out = t->callback.cb; + if (arg_out) + *arg_out = t->callback.arg; +} + +/** + * Schedule the timer t to fire at the current time plus a delay of + * <b>delay</b> microseconds. All times are relative to monotime_get(). + */ +void +timer_schedule(tor_timer_t *t, const struct timeval *tv) +{ + const timeout_t delay = tv_to_timeout(tv); + + monotime_t now; + monotime_get(&now); + timer_advance_to_cur_time(&now); + + /* Take the old timeout value. */ + timeout_t to = timeouts_timeout(global_timeouts); + + timeouts_add(global_timeouts, t, delay); + + /* Should we update the libevent timer? */ + if (to <= delay) { + return; /* we're already going to fire before this timer would trigger. */ + } + libevent_timer_reschedule(); +} + +/** + * Cancel the timer <b>t</b> if it is currently scheduled. (It's okay to call + * this on an unscheduled timer. + */ +void +timer_disable(tor_timer_t *t) +{ + timeouts_del(global_timeouts, t); + /* We don't reschedule the libevent timer here, since it's okay if it fires + * early. */ +} diff --git a/src/lib/evloop/timers.h b/src/lib/evloop/timers.h new file mode 100644 index 0000000000..2348c7b7c1 --- /dev/null +++ b/src/lib/evloop/timers.h @@ -0,0 +1,31 @@ +/* Copyright (c) 2016-2018, The Tor Project, Inc. */ +/* See LICENSE for licensing information */ + +#ifndef TOR_TIMERS_H +#define TOR_TIMERS_H + +#include "orconfig.h" +#include "lib/testsupport/testsupport.h" + +struct monotime_t; +typedef struct timeout tor_timer_t; +typedef void (*timer_cb_fn_t)(tor_timer_t *, void *, + const struct monotime_t *); +tor_timer_t *timer_new(timer_cb_fn_t cb, void *arg); +void timer_set_cb(tor_timer_t *t, timer_cb_fn_t cb, void *arg); +void timer_get_cb(const tor_timer_t *t, + timer_cb_fn_t *cb_out, void **arg_out); +void timer_schedule(tor_timer_t *t, const struct timeval *delay); +void timer_disable(tor_timer_t *t); +void timer_free_(tor_timer_t *t); +#define timer_free(t) FREE_AND_NULL(tor_timer_t, timer_free_, (t)) + +void timers_initialize(void); +void timers_shutdown(void); + +#ifdef TOR_TIMERS_PRIVATE +STATIC void timers_run_pending(void); +#endif + +#endif /* !defined(TOR_TIMERS_H) */ + diff --git a/src/lib/evloop/token_bucket.c b/src/lib/evloop/token_bucket.c new file mode 100644 index 0000000000..f7b092f612 --- /dev/null +++ b/src/lib/evloop/token_bucket.c @@ -0,0 +1,258 @@ +/* Copyright (c) 2018-2018, The Tor Project, Inc. */ +/* See LICENSE for licensing information */ + +/** + * \file token_bucket.c + * \brief Functions to use and manipulate token buckets, used for + * rate-limiting on connections and globally. + * + * Tor uses these token buckets to keep track of bandwidth usage, and + * sometimes other things too. + * + * There are two layers of abstraction here: "raw" token buckets, in which all + * the pieces are decoupled, and "read-write" token buckets, which combine all + * the moving parts into one. + * + * Token buckets may become negative. + **/ + +#define TOKEN_BUCKET_PRIVATE + +#include "common/token_bucket.h" +#include "lib/log/util_bug.h" +#include "lib/intmath/cmp.h" +#include "lib/time/compat_time.h" + +#include <string.h> + +/** + * Set the <b>rate</b> and <b>burst</b> value in a token_bucket_cfg. + * + * Note that the <b>rate</b> value is in arbitrary units, but those units will + * determine the units of token_bucket_raw_dec(), token_bucket_raw_refill, and + * so on. + */ +void +token_bucket_cfg_init(token_bucket_cfg_t *cfg, + uint32_t rate, + uint32_t burst) +{ + tor_assert_nonfatal(rate > 0); + tor_assert_nonfatal(burst > 0); + if (burst > TOKEN_BUCKET_MAX_BURST) + burst = TOKEN_BUCKET_MAX_BURST; + + cfg->rate = rate; + cfg->burst = burst; +} + +/** + * Initialize a raw token bucket and its associated timestamp to the "full" + * state, according to <b>cfg</b>. + */ +void +token_bucket_raw_reset(token_bucket_raw_t *bucket, + const token_bucket_cfg_t *cfg) +{ + bucket->bucket = cfg->burst; +} + +/** + * Adust a preexisting token bucket to respect the new configuration + * <b>cfg</b>, by decreasing its current level if needed. */ +void +token_bucket_raw_adjust(token_bucket_raw_t *bucket, + const token_bucket_cfg_t *cfg) +{ + bucket->bucket = MIN(bucket->bucket, cfg->burst); +} + +/** + * Given an amount of <b>elapsed</b> time units, and a bucket configuration + * <b>cfg</b>, refill the level of <b>bucket</b> accordingly. Note that the + * units of time in <b>elapsed</b> must correspond to those used to set the + * rate in <b>cfg</b>, or the result will be illogical. + */ +int +token_bucket_raw_refill_steps(token_bucket_raw_t *bucket, + const token_bucket_cfg_t *cfg, + const uint32_t elapsed) +{ + const int was_empty = (bucket->bucket <= 0); + /* The casts here prevent an underflow. + * + * Note that even if the bucket value is negative, subtracting it from + * "burst" will still produce a correct result. If this result is + * ridiculously high, then the "elapsed > gap / rate" check below + * should catch it. */ + const size_t gap = ((size_t)cfg->burst) - ((size_t)bucket->bucket); + + if (elapsed > gap / cfg->rate) { + bucket->bucket = cfg->burst; + } else { + bucket->bucket += cfg->rate * elapsed; + } + + return was_empty && bucket->bucket > 0; +} + +/** + * Decrement a provided bucket by <b>n</b> units. Note that <b>n</b> + * must be nonnegative. + */ +int +token_bucket_raw_dec(token_bucket_raw_t *bucket, + ssize_t n) +{ + if (BUG(n < 0)) + return 0; + const int becomes_empty = bucket->bucket > 0 && n >= bucket->bucket; + bucket->bucket -= n; + return becomes_empty; +} + +/** Convert a rate in bytes per second to a rate in bytes per step */ +STATIC uint32_t +rate_per_sec_to_rate_per_step(uint32_t rate) +{ + /* + The precise calculation we'd want to do is + + (rate / 1000) * to_approximate_msec(TICKS_PER_STEP). But to minimize + rounding error, we do it this way instead, and divide last. + */ + uint64_t units = (uint64_t) rate * TICKS_PER_STEP; + uint32_t val = (uint32_t) + (monotime_coarse_stamp_units_to_approx_msec(units) / 1000); + return val ? val : 1; +} + +/** + * Initialize a token bucket in *<b>bucket</b>, set up to allow <b>rate</b> + * bytes per second, with a maximum burst of <b>burst</b> bytes. The bucket + * is created such that <b>now_ts</b> is the current timestamp. The bucket + * starts out full. + */ +void +token_bucket_rw_init(token_bucket_rw_t *bucket, + uint32_t rate, + uint32_t burst, + uint32_t now_ts) +{ + memset(bucket, 0, sizeof(token_bucket_rw_t)); + token_bucket_rw_adjust(bucket, rate, burst); + token_bucket_rw_reset(bucket, now_ts); +} + +/** + * Change the configured rate (in bytes per second) and burst (in bytes) + * for the token bucket in *<b>bucket</b>. + */ +void +token_bucket_rw_adjust(token_bucket_rw_t *bucket, + uint32_t rate, + uint32_t burst) +{ + token_bucket_cfg_init(&bucket->cfg, + rate_per_sec_to_rate_per_step(rate), + burst); + token_bucket_raw_adjust(&bucket->read_bucket, &bucket->cfg); + token_bucket_raw_adjust(&bucket->write_bucket, &bucket->cfg); +} + +/** + * Reset <b>bucket</b> to be full, as of timestamp <b>now_ts</b>. + */ +void +token_bucket_rw_reset(token_bucket_rw_t *bucket, + uint32_t now_ts) +{ + token_bucket_raw_reset(&bucket->read_bucket, &bucket->cfg); + token_bucket_raw_reset(&bucket->write_bucket, &bucket->cfg); + bucket->last_refilled_at_timestamp = now_ts; +} + +/** + * Refill <b>bucket</b> as appropriate, given that the current timestamp + * is <b>now_ts</b>. + * + * Return a bitmask containing TB_READ iff read bucket was empty and became + * nonempty, and TB_WRITE iff the write bucket was empty and became nonempty. + */ +int +token_bucket_rw_refill(token_bucket_rw_t *bucket, + uint32_t now_ts) +{ + const uint32_t elapsed_ticks = + (now_ts - bucket->last_refilled_at_timestamp); + if (elapsed_ticks > UINT32_MAX-(300*1000)) { + /* Either about 48 days have passed since the last refill, or the + * monotonic clock has somehow moved backwards. (We're looking at you, + * Windows.). We accept up to a 5 minute jump backwards as + * "unremarkable". + */ + return 0; + } + const uint32_t elapsed_steps = elapsed_ticks / TICKS_PER_STEP; + + if (!elapsed_steps) { + /* Note that if less than one whole step elapsed, we don't advance the + * time in last_refilled_at. That's intentional: we want to make sure + * that we add some bytes to it eventually. */ + return 0; + } + + int flags = 0; + if (token_bucket_raw_refill_steps(&bucket->read_bucket, + &bucket->cfg, elapsed_steps)) + flags |= TB_READ; + if (token_bucket_raw_refill_steps(&bucket->write_bucket, + &bucket->cfg, elapsed_steps)) + flags |= TB_WRITE; + + bucket->last_refilled_at_timestamp = now_ts; + return flags; +} + +/** + * Decrement the read token bucket in <b>bucket</b> by <b>n</b> bytes. + * + * Return true if the bucket was nonempty and became empty; return false + * otherwise. + */ +int +token_bucket_rw_dec_read(token_bucket_rw_t *bucket, + ssize_t n) +{ + return token_bucket_raw_dec(&bucket->read_bucket, n); +} + +/** + * Decrement the write token bucket in <b>bucket</b> by <b>n</b> bytes. + * + * Return true if the bucket was nonempty and became empty; return false + * otherwise. + */ +int +token_bucket_rw_dec_write(token_bucket_rw_t *bucket, + ssize_t n) +{ + return token_bucket_raw_dec(&bucket->write_bucket, n); +} + +/** + * As token_bucket_rw_dec_read and token_bucket_rw_dec_write, in a single + * operation. Return a bitmask of TB_READ and TB_WRITE to indicate + * which buckets became empty. + */ +int +token_bucket_rw_dec(token_bucket_rw_t *bucket, + ssize_t n_read, ssize_t n_written) +{ + int flags = 0; + if (token_bucket_rw_dec_read(bucket, n_read)) + flags |= TB_READ; + if (token_bucket_rw_dec_write(bucket, n_written)) + flags |= TB_WRITE; + return flags; +} diff --git a/src/lib/evloop/token_bucket.h b/src/lib/evloop/token_bucket.h new file mode 100644 index 0000000000..787317fa1f --- /dev/null +++ b/src/lib/evloop/token_bucket.h @@ -0,0 +1,118 @@ +/* Copyright (c) 2018-2018, The Tor Project, Inc. */ +/* See LICENSE for licensing information */ + +/** + * \file token_bucket_rw.h + * \brief Headers for token_bucket_rw.c + **/ + +#ifndef TOR_TOKEN_BUCKET_H +#define TOR_TOKEN_BUCKET_H + +#include "lib/cc/torint.h" +#include "lib/testsupport/testsupport.h" + +/** Largest allowable burst value for a token buffer. */ +#define TOKEN_BUCKET_MAX_BURST INT32_MAX + +/** A generic token buffer configuration: determines the number of tokens + * added to the bucket in each time unit (the "rate"), and the maximum number + * of tokens in the bucket (the "burst") */ +typedef struct token_bucket_cfg_t { + uint32_t rate; + int32_t burst; +} token_bucket_cfg_t; + +/** A raw token bucket, decoupled from its configuration and timestamp. */ +typedef struct token_bucket_raw_t { + int32_t bucket; +} token_bucket_raw_t; + +void token_bucket_cfg_init(token_bucket_cfg_t *cfg, + uint32_t rate, + uint32_t burst); + +void token_bucket_raw_adjust(token_bucket_raw_t *bucket, + const token_bucket_cfg_t *cfg); + +void token_bucket_raw_reset(token_bucket_raw_t *bucket, + const token_bucket_cfg_t *cfg); + +int token_bucket_raw_dec(token_bucket_raw_t *bucket, + ssize_t n); + +int token_bucket_raw_refill_steps(token_bucket_raw_t *bucket, + const token_bucket_cfg_t *cfg, + const uint32_t elapsed_steps); + +static inline size_t token_bucket_raw_get(const token_bucket_raw_t *bucket); +/** Return the current number of bytes set in a token bucket. */ +static inline size_t +token_bucket_raw_get(const token_bucket_raw_t *bucket) +{ + return bucket->bucket >= 0 ? bucket->bucket : 0; +} + +/** A convenience type containing all the pieces needed for a coupled + * read-bucket and write-bucket that have the same rate limit, and which use + * "timestamp units" (see compat_time.h) for their time. */ +typedef struct token_bucket_rw_t { + token_bucket_cfg_t cfg; + token_bucket_raw_t read_bucket; + token_bucket_raw_t write_bucket; + uint32_t last_refilled_at_timestamp; +} token_bucket_rw_t; + +void token_bucket_rw_init(token_bucket_rw_t *bucket, + uint32_t rate, + uint32_t burst, + uint32_t now_ts); + +void token_bucket_rw_adjust(token_bucket_rw_t *bucket, + uint32_t rate, uint32_t burst); + +void token_bucket_rw_reset(token_bucket_rw_t *bucket, + uint32_t now_ts); + +#define TB_READ 1 +#define TB_WRITE 2 + +int token_bucket_rw_refill(token_bucket_rw_t *bucket, + uint32_t now_ts); + +int token_bucket_rw_dec_read(token_bucket_rw_t *bucket, + ssize_t n); +int token_bucket_rw_dec_write(token_bucket_rw_t *bucket, + ssize_t n); + +int token_bucket_rw_dec(token_bucket_rw_t *bucket, + ssize_t n_read, ssize_t n_written); + +static inline size_t token_bucket_rw_get_read(const token_bucket_rw_t *bucket); +static inline size_t +token_bucket_rw_get_read(const token_bucket_rw_t *bucket) +{ + return token_bucket_raw_get(&bucket->read_bucket); +} + +static inline size_t token_bucket_rw_get_write( + const token_bucket_rw_t *bucket); +static inline size_t +token_bucket_rw_get_write(const token_bucket_rw_t *bucket) +{ + return token_bucket_raw_get(&bucket->write_bucket); +} + +#ifdef TOKEN_BUCKET_PRIVATE + +/* To avoid making the rates too small, we consider units of "steps", + * where a "step" is defined as this many timestamp ticks. Keep this + * a power of two if you can. */ +#define TICKS_PER_STEP 16 + +STATIC uint32_t rate_per_sec_to_rate_per_step(uint32_t rate); + +#endif + +#endif /* TOR_TOKEN_BUCKET_H */ + diff --git a/src/lib/evloop/workqueue.c b/src/lib/evloop/workqueue.c new file mode 100644 index 0000000000..e5254396f9 --- /dev/null +++ b/src/lib/evloop/workqueue.c @@ -0,0 +1,682 @@ + +/* copyright (c) 2013-2015, The Tor Project, Inc. */ +/* See LICENSE for licensing information */ + +/** + * \file workqueue.c + * + * \brief Implements worker threads, queues of work for them, and mechanisms + * for them to send answers back to the main thread. + * + * The main structure here is a threadpool_t : it manages a set of worker + * threads, a queue of pending work, and a reply queue. Every piece of work + * is a workqueue_entry_t, containing data to process and a function to + * process it with. + * + * The main thread informs the worker threads of pending work by using a + * condition variable. The workers inform the main process of completed work + * by using an alert_sockets_t object, as implemented in compat_threads.c. + * + * The main thread can also queue an "update" that will be handled by all the + * workers. This is useful for updating state that all the workers share. + * + * In Tor today, there is currently only one thread pool, used in cpuworker.c. + */ + +#include "orconfig.h" +#include "common/compat_libevent.h" +#include "common/workqueue.h" + +#include "lib/crypt_ops/crypto_rand.h" +#include "lib/intmath/weakrng.h" +#include "lib/log/ratelim.h" +#include "lib/log/torlog.h" +#include "lib/log/util_bug.h" +#include "lib/net/alertsock.h" +#include "lib/net/socket.h" +#include "lib/thread/threads.h" + +#include "tor_queue.h" +#include <event2/event.h> +#include <string.h> + +#define WORKQUEUE_PRIORITY_FIRST WQ_PRI_HIGH +#define WORKQUEUE_PRIORITY_LAST WQ_PRI_LOW +#define WORKQUEUE_N_PRIORITIES (((int) WORKQUEUE_PRIORITY_LAST)+1) + +TOR_TAILQ_HEAD(work_tailq_t, workqueue_entry_s); +typedef struct work_tailq_t work_tailq_t; + +struct threadpool_s { + /** An array of pointers to workerthread_t: one for each running worker + * thread. */ + struct workerthread_s **threads; + + /** Condition variable that we wait on when we have no work, and which + * gets signaled when our queue becomes nonempty. */ + tor_cond_t condition; + /** Queues of pending work that we have to do. The queue with priority + * <b>p</b> is work[p]. */ + work_tailq_t work[WORKQUEUE_N_PRIORITIES]; + + /** Weak RNG, used to decide when to ignore priority. */ + tor_weak_rng_t weak_rng; + + /** The current 'update generation' of the threadpool. Any thread that is + * at an earlier generation needs to run the update function. */ + unsigned generation; + + /** Function that should be run for updates on each thread. */ + workqueue_reply_t (*update_fn)(void *, void *); + /** Function to free update arguments if they can't be run. */ + void (*free_update_arg_fn)(void *); + /** Array of n_threads update arguments. */ + void **update_args; + /** Event to notice when another thread has sent a reply. */ + struct event *reply_event; + void (*reply_cb)(threadpool_t *); + + /** Number of elements in threads. */ + int n_threads; + /** Mutex to protect all the above fields. */ + tor_mutex_t lock; + + /** A reply queue to use when constructing new threads. */ + replyqueue_t *reply_queue; + + /** Functions used to allocate and free thread state. */ + void *(*new_thread_state_fn)(void*); + void (*free_thread_state_fn)(void*); + void *new_thread_state_arg; +}; + +/** Used to put a workqueue_priority_t value into a bitfield. */ +#define workqueue_priority_bitfield_t ENUM_BF(workqueue_priority_t) +/** Number of bits needed to hold all legal values of workqueue_priority_t */ +#define WORKQUEUE_PRIORITY_BITS 2 + +struct workqueue_entry_s { + /** The next workqueue_entry_t that's pending on the same thread or + * reply queue. */ + TOR_TAILQ_ENTRY(workqueue_entry_s) next_work; + /** The threadpool to which this workqueue_entry_t was assigned. This field + * is set when the workqueue_entry_t is created, and won't be cleared until + * after it's handled in the main thread. */ + struct threadpool_s *on_pool; + /** True iff this entry is waiting for a worker to start processing it. */ + uint8_t pending; + /** Priority of this entry. */ + workqueue_priority_bitfield_t priority : WORKQUEUE_PRIORITY_BITS; + /** Function to run in the worker thread. */ + workqueue_reply_t (*fn)(void *state, void *arg); + /** Function to run while processing the reply queue. */ + void (*reply_fn)(void *arg); + /** Argument for the above functions. */ + void *arg; +}; + +struct replyqueue_s { + /** Mutex to protect the answers field */ + tor_mutex_t lock; + /** Doubly-linked list of answers that the reply queue needs to handle. */ + TOR_TAILQ_HEAD(, workqueue_entry_s) answers; + + /** Mechanism to wake up the main thread when it is receiving answers. */ + alert_sockets_t alert; +}; + +/** A worker thread represents a single thread in a thread pool. */ +typedef struct workerthread_s { + /** Which thread it this? In range 0..in_pool->n_threads-1 */ + int index; + /** The pool this thread is a part of. */ + struct threadpool_s *in_pool; + /** User-supplied state field that we pass to the worker functions of each + * work item. */ + void *state; + /** Reply queue to which we pass our results. */ + replyqueue_t *reply_queue; + /** The current update generation of this thread */ + unsigned generation; + /** One over the probability of taking work from a lower-priority queue. */ + int32_t lower_priority_chance; +} workerthread_t; + +static void queue_reply(replyqueue_t *queue, workqueue_entry_t *work); + +/** Allocate and return a new workqueue_entry_t, set up to run the function + * <b>fn</b> in the worker thread, and <b>reply_fn</b> in the main + * thread. See threadpool_queue_work() for full documentation. */ +static workqueue_entry_t * +workqueue_entry_new(workqueue_reply_t (*fn)(void*, void*), + void (*reply_fn)(void*), + void *arg) +{ + workqueue_entry_t *ent = tor_malloc_zero(sizeof(workqueue_entry_t)); + ent->fn = fn; + ent->reply_fn = reply_fn; + ent->arg = arg; + ent->priority = WQ_PRI_HIGH; + return ent; +} + +#define workqueue_entry_free(ent) \ + FREE_AND_NULL(workqueue_entry_t, workqueue_entry_free_, (ent)) + +/** + * Release all storage held in <b>ent</b>. Call only when <b>ent</b> is not on + * any queue. + */ +static void +workqueue_entry_free_(workqueue_entry_t *ent) +{ + if (!ent) + return; + memset(ent, 0xf0, sizeof(*ent)); + tor_free(ent); +} + +/** + * Cancel a workqueue_entry_t that has been returned from + * threadpool_queue_work. + * + * You must not call this function on any work whose reply function has been + * executed in the main thread; that will cause undefined behavior (probably, + * a crash). + * + * If the work is cancelled, this function return the argument passed to the + * work function. It is the caller's responsibility to free this storage. + * + * This function will have no effect if the worker thread has already executed + * or begun to execute the work item. In that case, it will return NULL. + */ +void * +workqueue_entry_cancel(workqueue_entry_t *ent) +{ + int cancelled = 0; + void *result = NULL; + tor_mutex_acquire(&ent->on_pool->lock); + workqueue_priority_t prio = ent->priority; + if (ent->pending) { + TOR_TAILQ_REMOVE(&ent->on_pool->work[prio], ent, next_work); + cancelled = 1; + result = ent->arg; + } + tor_mutex_release(&ent->on_pool->lock); + + if (cancelled) { + workqueue_entry_free(ent); + } + return result; +} + +/**DOCDOC + + must hold lock */ +static int +worker_thread_has_work(workerthread_t *thread) +{ + unsigned i; + for (i = WORKQUEUE_PRIORITY_FIRST; i <= WORKQUEUE_PRIORITY_LAST; ++i) { + if (!TOR_TAILQ_EMPTY(&thread->in_pool->work[i])) + return 1; + } + return thread->generation != thread->in_pool->generation; +} + +/** Extract the next workqueue_entry_t from the the thread's pool, removing + * it from the relevant queues and marking it as non-pending. + * + * The caller must hold the lock. */ +static workqueue_entry_t * +worker_thread_extract_next_work(workerthread_t *thread) +{ + threadpool_t *pool = thread->in_pool; + work_tailq_t *queue = NULL, *this_queue; + unsigned i; + for (i = WORKQUEUE_PRIORITY_FIRST; i <= WORKQUEUE_PRIORITY_LAST; ++i) { + this_queue = &pool->work[i]; + if (!TOR_TAILQ_EMPTY(this_queue)) { + queue = this_queue; + if (! tor_weak_random_one_in_n(&pool->weak_rng, + thread->lower_priority_chance)) { + /* Usually we'll just break now, so that we can get out of the loop + * and use the queue where we found work. But with a small + * probability, we'll keep looking for lower priority work, so that + * we don't ignore our low-priority queues entirely. */ + break; + } + } + } + + if (queue == NULL) + return NULL; + + workqueue_entry_t *work = TOR_TAILQ_FIRST(queue); + TOR_TAILQ_REMOVE(queue, work, next_work); + work->pending = 0; + return work; +} + +/** + * Main function for the worker thread. + */ +static void +worker_thread_main(void *thread_) +{ + workerthread_t *thread = thread_; + threadpool_t *pool = thread->in_pool; + workqueue_entry_t *work; + workqueue_reply_t result; + + tor_mutex_acquire(&pool->lock); + while (1) { + /* lock must be held at this point. */ + while (worker_thread_has_work(thread)) { + /* lock must be held at this point. */ + if (thread->in_pool->generation != thread->generation) { + void *arg = thread->in_pool->update_args[thread->index]; + thread->in_pool->update_args[thread->index] = NULL; + workqueue_reply_t (*update_fn)(void*,void*) = + thread->in_pool->update_fn; + thread->generation = thread->in_pool->generation; + tor_mutex_release(&pool->lock); + + workqueue_reply_t r = update_fn(thread->state, arg); + + if (r != WQ_RPL_REPLY) { + return; + } + + tor_mutex_acquire(&pool->lock); + continue; + } + work = worker_thread_extract_next_work(thread); + if (BUG(work == NULL)) + break; + tor_mutex_release(&pool->lock); + + /* We run the work function without holding the thread lock. This + * is the main thread's first opportunity to give us more work. */ + result = work->fn(thread->state, work->arg); + + /* Queue the reply for the main thread. */ + queue_reply(thread->reply_queue, work); + + /* We may need to exit the thread. */ + if (result != WQ_RPL_REPLY) { + return; + } + tor_mutex_acquire(&pool->lock); + } + /* At this point the lock is held, and there is no work in this thread's + * queue. */ + + /* TODO: support an idle-function */ + + /* Okay. Now, wait till somebody has work for us. */ + if (tor_cond_wait(&pool->condition, &pool->lock, NULL) < 0) { + log_warn(LD_GENERAL, "Fail tor_cond_wait."); + } + } +} + +/** Put a reply on the reply queue. The reply must not currently be on + * any thread's work queue. */ +static void +queue_reply(replyqueue_t *queue, workqueue_entry_t *work) +{ + int was_empty; + tor_mutex_acquire(&queue->lock); + was_empty = TOR_TAILQ_EMPTY(&queue->answers); + TOR_TAILQ_INSERT_TAIL(&queue->answers, work, next_work); + tor_mutex_release(&queue->lock); + + if (was_empty) { + if (queue->alert.alert_fn(queue->alert.write_fd) < 0) { + /* XXXX complain! */ + } + } +} + +/** Allocate and start a new worker thread to use state object <b>state</b>, + * and send responses to <b>replyqueue</b>. */ +static workerthread_t * +workerthread_new(int32_t lower_priority_chance, + void *state, threadpool_t *pool, replyqueue_t *replyqueue) +{ + workerthread_t *thr = tor_malloc_zero(sizeof(workerthread_t)); + thr->state = state; + thr->reply_queue = replyqueue; + thr->in_pool = pool; + thr->lower_priority_chance = lower_priority_chance; + + if (spawn_func(worker_thread_main, thr) < 0) { + //LCOV_EXCL_START + tor_assert_nonfatal_unreached(); + log_err(LD_GENERAL, "Can't launch worker thread."); + tor_free(thr); + return NULL; + //LCOV_EXCL_STOP + } + + return thr; +} + +/** + * Queue an item of work for a thread in a thread pool. The function + * <b>fn</b> will be run in a worker thread, and will receive as arguments the + * thread's state object, and the provided object <b>arg</b>. It must return + * one of WQ_RPL_REPLY, WQ_RPL_ERROR, or WQ_RPL_SHUTDOWN. + * + * Regardless of its return value, the function <b>reply_fn</b> will later be + * run in the main thread when it invokes replyqueue_process(), and will + * receive as its argument the same <b>arg</b> object. It's the reply + * function's responsibility to free the work object. + * + * On success, return a workqueue_entry_t object that can be passed to + * workqueue_entry_cancel(). On failure, return NULL. (Failure is not + * currently possible, but callers should check anyway.) + * + * Items are executed in a loose priority order -- each thread will usually + * take from the queued work with the highest prioirity, but will occasionally + * visit lower-priority queues to keep them from starving completely. + * + * Note that because of priorities and thread behavior, work items may not + * be executed strictly in order. + */ +workqueue_entry_t * +threadpool_queue_work_priority(threadpool_t *pool, + workqueue_priority_t prio, + workqueue_reply_t (*fn)(void *, void *), + void (*reply_fn)(void *), + void *arg) +{ + tor_assert(((int)prio) >= WORKQUEUE_PRIORITY_FIRST && + ((int)prio) <= WORKQUEUE_PRIORITY_LAST); + + workqueue_entry_t *ent = workqueue_entry_new(fn, reply_fn, arg); + ent->on_pool = pool; + ent->pending = 1; + ent->priority = prio; + + tor_mutex_acquire(&pool->lock); + + TOR_TAILQ_INSERT_TAIL(&pool->work[prio], ent, next_work); + + tor_cond_signal_one(&pool->condition); + + tor_mutex_release(&pool->lock); + + return ent; +} + +/** As threadpool_queue_work_priority(), but assumes WQ_PRI_HIGH */ +workqueue_entry_t * +threadpool_queue_work(threadpool_t *pool, + workqueue_reply_t (*fn)(void *, void *), + void (*reply_fn)(void *), + void *arg) +{ + return threadpool_queue_work_priority(pool, WQ_PRI_HIGH, fn, reply_fn, arg); +} + +/** + * Queue a copy of a work item for every thread in a pool. This can be used, + * for example, to tell the threads to update some parameter in their states. + * + * Arguments are as for <b>threadpool_queue_work</b>, except that the + * <b>arg</b> value is passed to <b>dup_fn</b> once per each thread to + * make a copy of it. + * + * UPDATE FUNCTIONS MUST BE IDEMPOTENT. We do not guarantee that every update + * will be run. If a new update is scheduled before the old update finishes + * running, then the new will replace the old in any threads that haven't run + * it yet. + * + * Return 0 on success, -1 on failure. + */ +int +threadpool_queue_update(threadpool_t *pool, + void *(*dup_fn)(void *), + workqueue_reply_t (*fn)(void *, void *), + void (*free_fn)(void *), + void *arg) +{ + int i, n_threads; + void (*old_args_free_fn)(void *arg); + void **old_args; + void **new_args; + + tor_mutex_acquire(&pool->lock); + n_threads = pool->n_threads; + old_args = pool->update_args; + old_args_free_fn = pool->free_update_arg_fn; + + new_args = tor_calloc(n_threads, sizeof(void*)); + for (i = 0; i < n_threads; ++i) { + if (dup_fn) + new_args[i] = dup_fn(arg); + else + new_args[i] = arg; + } + + pool->update_args = new_args; + pool->free_update_arg_fn = free_fn; + pool->update_fn = fn; + ++pool->generation; + + tor_cond_signal_all(&pool->condition); + + tor_mutex_release(&pool->lock); + + if (old_args) { + for (i = 0; i < n_threads; ++i) { + if (old_args[i] && old_args_free_fn) + old_args_free_fn(old_args[i]); + } + tor_free(old_args); + } + + return 0; +} + +/** Don't have more than this many threads per pool. */ +#define MAX_THREADS 1024 + +/** For half of our threads, choose lower priority queues with probability + * 1/N for each of these values. Both are chosen somewhat arbitrarily. If + * CHANCE_PERMISSIVE is too low, then we have a risk of low-priority tasks + * stalling forever. If it's too high, we have a risk of low-priority tasks + * grabbing half of the threads. */ +#define CHANCE_PERMISSIVE 37 +#define CHANCE_STRICT INT32_MAX + +/** Launch threads until we have <b>n</b>. */ +static int +threadpool_start_threads(threadpool_t *pool, int n) +{ + if (BUG(n < 0)) + return -1; // LCOV_EXCL_LINE + if (n > MAX_THREADS) + n = MAX_THREADS; + + tor_mutex_acquire(&pool->lock); + + if (pool->n_threads < n) + pool->threads = tor_reallocarray(pool->threads, + sizeof(workerthread_t*), n); + + while (pool->n_threads < n) { + /* For half of our threads, we'll choose lower priorities permissively; + * for the other half, we'll stick more strictly to higher priorities. + * This keeps slow low-priority tasks from taking over completely. */ + int32_t chance = (pool->n_threads & 1) ? CHANCE_STRICT : CHANCE_PERMISSIVE; + + void *state = pool->new_thread_state_fn(pool->new_thread_state_arg); + workerthread_t *thr = workerthread_new(chance, + state, pool, pool->reply_queue); + + if (!thr) { + //LCOV_EXCL_START + tor_assert_nonfatal_unreached(); + pool->free_thread_state_fn(state); + tor_mutex_release(&pool->lock); + return -1; + //LCOV_EXCL_STOP + } + thr->index = pool->n_threads; + pool->threads[pool->n_threads++] = thr; + } + tor_mutex_release(&pool->lock); + + return 0; +} + +/** + * Construct a new thread pool with <b>n</b> worker threads, configured to + * send their output to <b>replyqueue</b>. The threads' states will be + * constructed with the <b>new_thread_state_fn</b> call, receiving <b>arg</b> + * as its argument. When the threads close, they will call + * <b>free_thread_state_fn</b> on their states. + */ +threadpool_t * +threadpool_new(int n_threads, + replyqueue_t *replyqueue, + void *(*new_thread_state_fn)(void*), + void (*free_thread_state_fn)(void*), + void *arg) +{ + threadpool_t *pool; + pool = tor_malloc_zero(sizeof(threadpool_t)); + tor_mutex_init_nonrecursive(&pool->lock); + tor_cond_init(&pool->condition); + unsigned i; + for (i = WORKQUEUE_PRIORITY_FIRST; i <= WORKQUEUE_PRIORITY_LAST; ++i) { + TOR_TAILQ_INIT(&pool->work[i]); + } + { + unsigned seed; + crypto_rand((void*)&seed, sizeof(seed)); + tor_init_weak_random(&pool->weak_rng, seed); + } + + pool->new_thread_state_fn = new_thread_state_fn; + pool->new_thread_state_arg = arg; + pool->free_thread_state_fn = free_thread_state_fn; + pool->reply_queue = replyqueue; + + if (threadpool_start_threads(pool, n_threads) < 0) { + //LCOV_EXCL_START + tor_assert_nonfatal_unreached(); + tor_cond_uninit(&pool->condition); + tor_mutex_uninit(&pool->lock); + tor_free(pool); + return NULL; + //LCOV_EXCL_STOP + } + + return pool; +} + +/** Return the reply queue associated with a given thread pool. */ +replyqueue_t * +threadpool_get_replyqueue(threadpool_t *tp) +{ + return tp->reply_queue; +} + +/** Allocate a new reply queue. Reply queues are used to pass results from + * worker threads to the main thread. Since the main thread is running an + * IO-centric event loop, it needs to get woken up with means other than a + * condition variable. */ +replyqueue_t * +replyqueue_new(uint32_t alertsocks_flags) +{ + replyqueue_t *rq; + + rq = tor_malloc_zero(sizeof(replyqueue_t)); + if (alert_sockets_create(&rq->alert, alertsocks_flags) < 0) { + //LCOV_EXCL_START + tor_free(rq); + return NULL; + //LCOV_EXCL_STOP + } + + tor_mutex_init(&rq->lock); + TOR_TAILQ_INIT(&rq->answers); + + return rq; +} + +/** Internal: Run from the libevent mainloop when there is work to handle in + * the reply queue handler. */ +static void +reply_event_cb(evutil_socket_t sock, short events, void *arg) +{ + threadpool_t *tp = arg; + (void) sock; + (void) events; + replyqueue_process(tp->reply_queue); + if (tp->reply_cb) + tp->reply_cb(tp); +} + +/** Register the threadpool <b>tp</b>'s reply queue with the libevent + * mainloop of <b>base</b>. If <b>tp</b> is provided, it is run after + * each time there is work to process from the reply queue. Return 0 on + * success, -1 on failure. + */ +int +threadpool_register_reply_event(threadpool_t *tp, + void (*cb)(threadpool_t *tp)) +{ + struct event_base *base = tor_libevent_get_base(); + + if (tp->reply_event) { + tor_event_free(tp->reply_event); + } + tp->reply_event = tor_event_new(base, + tp->reply_queue->alert.read_fd, + EV_READ|EV_PERSIST, + reply_event_cb, + tp); + tor_assert(tp->reply_event); + tp->reply_cb = cb; + return event_add(tp->reply_event, NULL); +} + +/** + * Process all pending replies on a reply queue. The main thread should call + * this function every time the socket returned by replyqueue_get_socket() is + * readable. + */ +void +replyqueue_process(replyqueue_t *queue) +{ + int r = queue->alert.drain_fn(queue->alert.read_fd); + if (r < 0) { + //LCOV_EXCL_START + static ratelim_t warn_limit = RATELIM_INIT(7200); + log_fn_ratelim(&warn_limit, LOG_WARN, LD_GENERAL, + "Failure from drain_fd: %s", + tor_socket_strerror(-r)); + //LCOV_EXCL_STOP + } + + tor_mutex_acquire(&queue->lock); + while (!TOR_TAILQ_EMPTY(&queue->answers)) { + /* lock must be held at this point.*/ + workqueue_entry_t *work = TOR_TAILQ_FIRST(&queue->answers); + TOR_TAILQ_REMOVE(&queue->answers, work, next_work); + tor_mutex_release(&queue->lock); + work->on_pool = NULL; + + work->reply_fn(work->arg); + workqueue_entry_free(work); + + tor_mutex_acquire(&queue->lock); + } + + tor_mutex_release(&queue->lock); +} diff --git a/src/lib/evloop/workqueue.h b/src/lib/evloop/workqueue.h new file mode 100644 index 0000000000..4e5c424be6 --- /dev/null +++ b/src/lib/evloop/workqueue.h @@ -0,0 +1,65 @@ +/* Copyright (c) 2013-2018, The Tor Project, Inc. */ +/* See LICENSE for licensing information */ + +#ifndef TOR_WORKQUEUE_H +#define TOR_WORKQUEUE_H + +#include "lib/cc/torint.h" + +/** A replyqueue is used to tell the main thread about the outcome of + * work that we queued for the workers. */ +typedef struct replyqueue_s replyqueue_t; +/** A thread-pool manages starting threads and passing work to them. */ +typedef struct threadpool_s threadpool_t; +/** A workqueue entry represents a request that has been passed to a thread + * pool. */ +typedef struct workqueue_entry_s workqueue_entry_t; + +/** Possible return value from a work function: */ +typedef enum workqueue_reply_t { + WQ_RPL_REPLY = 0, /** indicates success */ + WQ_RPL_ERROR = 1, /** indicates fatal error */ + WQ_RPL_SHUTDOWN = 2, /** indicates thread is shutting down */ +} workqueue_reply_t; + +/** Possible priorities for work. Lower numeric values are more important. */ +typedef enum workqueue_priority_t { + WQ_PRI_HIGH = 0, + WQ_PRI_MED = 1, + WQ_PRI_LOW = 2, +} workqueue_priority_t; + +workqueue_entry_t *threadpool_queue_work_priority(threadpool_t *pool, + workqueue_priority_t prio, + workqueue_reply_t (*fn)(void *, + void *), + void (*reply_fn)(void *), + void *arg); + +workqueue_entry_t *threadpool_queue_work(threadpool_t *pool, + workqueue_reply_t (*fn)(void *, + void *), + void (*reply_fn)(void *), + void *arg); + +int threadpool_queue_update(threadpool_t *pool, + void *(*dup_fn)(void *), + workqueue_reply_t (*fn)(void *, void *), + void (*free_fn)(void *), + void *arg); +void *workqueue_entry_cancel(workqueue_entry_t *pending_work); +threadpool_t *threadpool_new(int n_threads, + replyqueue_t *replyqueue, + void *(*new_thread_state_fn)(void*), + void (*free_thread_state_fn)(void*), + void *arg); +replyqueue_t *threadpool_get_replyqueue(threadpool_t *tp); + +replyqueue_t *replyqueue_new(uint32_t alertsocks_flags); +void replyqueue_process(replyqueue_t *queue); + +struct event_base; +int threadpool_register_reply_event(threadpool_t *tp, + void (*cb)(threadpool_t *tp)); + +#endif /* !defined(TOR_WORKQUEUE_H) */ |