diff options
author | Nick Mathewson <nickm@torproject.org> | 2018-07-05 15:13:44 -0400 |
---|---|---|
committer | Nick Mathewson <nickm@torproject.org> | 2018-07-05 15:22:17 -0400 |
commit | 1e417b7275028a50227f57fb71d04c1837ec4b2c (patch) | |
tree | 3cbf379caf8367391980abf3a562b20eb4d63052 /src/common | |
parent | 947de40d198d83e561320afe5d0146f43dc9192a (diff) | |
download | tor-1e417b7275028a50227f57fb71d04c1837ec4b2c.tar.gz tor-1e417b7275028a50227f57fb71d04c1837ec4b2c.zip |
All remaining files in src/common belong to the event loop.
Diffstat (limited to 'src/common')
-rw-r--r-- | src/common/Makefile.nmake | 28 | ||||
-rw-r--r-- | src/common/compat_libevent.c | 535 | ||||
-rw-r--r-- | src/common/compat_libevent.h | 99 | ||||
-rw-r--r-- | src/common/include.am | 51 | ||||
-rw-r--r-- | src/common/procmon.c | 336 | ||||
-rw-r--r-- | src/common/procmon.h | 34 | ||||
-rw-r--r-- | src/common/timers.c | 324 | ||||
-rw-r--r-- | src/common/timers.h | 31 | ||||
-rw-r--r-- | src/common/token_bucket.c | 258 | ||||
-rw-r--r-- | src/common/token_bucket.h | 118 | ||||
-rw-r--r-- | src/common/workqueue.c | 682 | ||||
-rw-r--r-- | src/common/workqueue.h | 65 |
12 files changed, 0 insertions, 2561 deletions
diff --git a/src/common/Makefile.nmake b/src/common/Makefile.nmake deleted file mode 100644 index a1c819fffa..0000000000 --- a/src/common/Makefile.nmake +++ /dev/null @@ -1,28 +0,0 @@ -all: libor.lib libor-crypto.lib libor-event.lib - -CFLAGS = /O2 /MT /I ..\win32 /I ..\..\..\build-alpha\include /I ..\common \ - /I ..\ext - -LIBOR_OBJECTS = address.obj backtrace.obj compat.obj container.obj di_ops.obj \ - log.obj memarea.obj mempool.obj procmon.obj sandbox.obj util.obj \ - util_codedigest.obj - -LIBOR_CRYPTO_OBJECTS = aes.obj crypto.obj crypto_format.obj compress.obj compress_zlib.obj \ - tortls.obj crypto_curve25519.obj curve25519-donna.obj - -LIBOR_EVENT_OBJECTS = compat_libevent.obj - -curve25519-donna.obj: ..\ext\curve25519_donna\curve25519-donna.c - $(CC) $(CFLAGS) /D inline=_inline /c ..\ext\curve25519_donna\curve25519-donna.c - -libor.lib: $(LIBOR_OBJECTS) - lib $(LIBOR_OBJECTS) /out:libor.lib - -libor-crypto.lib: $(LIBOR_CRYPTO_OBJECTS) - lib $(LIBOR_CRYPTO_OBJECTS) /out:libor-crypto.lib - -libor-event.lib: $(LIBOR_EVENT_OBJECTS) - lib $(LIBOR_EVENT_OBJECTS) /out:libor-event.lib - -clean: - del *.obj *.lib libor*.lib diff --git a/src/common/compat_libevent.c b/src/common/compat_libevent.c deleted file mode 100644 index 9d21cf20bd..0000000000 --- a/src/common/compat_libevent.c +++ /dev/null @@ -1,535 +0,0 @@ -/* Copyright (c) 2009-2018, The Tor Project, Inc. */ -/* See LICENSE for licensing information */ - -/** - * \file compat_libevent.c - * \brief Wrappers and utility functions for Libevent. - */ - -#include "orconfig.h" -#define COMPAT_LIBEVENT_PRIVATE -#include "common/compat_libevent.h" - -#include "lib/crypt_ops/crypto_rand.h" -#include "lib/log/torlog.h" -#include "lib/log/util_bug.h" -#include "lib/string/compat_string.h" - -#include <event2/event.h> -#include <event2/thread.h> -#include <string.h> - -/** A string which, if it appears in a libevent log, should be ignored. */ -static const char *suppress_msg = NULL; -/** Callback function passed to event_set_log() so we can intercept - * log messages from libevent. */ -STATIC void -libevent_logging_callback(int severity, const char *msg) -{ - char buf[1024]; - size_t n; - if (suppress_msg && strstr(msg, suppress_msg)) - return; - n = strlcpy(buf, msg, sizeof(buf)); - if (n && n < sizeof(buf) && buf[n-1] == '\n') { - buf[n-1] = '\0'; - } - switch (severity) { - case _EVENT_LOG_DEBUG: - log_debug(LD_NOCB|LD_NET, "Message from libevent: %s", buf); - break; - case _EVENT_LOG_MSG: - log_info(LD_NOCB|LD_NET, "Message from libevent: %s", buf); - break; - case _EVENT_LOG_WARN: - log_warn(LD_NOCB|LD_GENERAL, "Warning from libevent: %s", buf); - break; - case _EVENT_LOG_ERR: - log_err(LD_NOCB|LD_GENERAL, "Error from libevent: %s", buf); - break; - default: - log_warn(LD_NOCB|LD_GENERAL, "Message [%d] from libevent: %s", - severity, buf); - break; - } -} -/** Set hook to intercept log messages from libevent. */ -void -configure_libevent_logging(void) -{ - event_set_log_callback(libevent_logging_callback); -} - -/** Ignore any libevent log message that contains <b>msg</b>. */ -void -suppress_libevent_log_msg(const char *msg) -{ - suppress_msg = msg; -} - -/* Wrapper for event_free() that tolerates tor_event_free(NULL) */ -void -tor_event_free_(struct event *ev) -{ - if (ev == NULL) - return; - event_free(ev); -} - -/** Global event base for use by the main thread. */ -static struct event_base *the_event_base = NULL; - -/** - * @defgroup postloop post-loop event helpers - * - * If we're not careful, Libevent can susceptible to infinite event chains: - * one event can activate another, whose callback activates another, whose - * callback activates another, ad infinitum. While this is happening, - * Libevent won't be checking timeouts, socket-based events, signals, and so - * on. - * - * We solve this problem by marking some events as "post-loop". A post-loop - * event behaves like any ordinary event, but any events that _it_ activates - * cannot run until Libevent has checked for other events at least once. - * - * @{ */ - -/** - * An event that stops Libevent from running any more events on the current - * iteration of its loop, until it has re-checked for socket events, signal - * events, timeouts, etc. - */ -static struct event *rescan_mainloop_ev = NULL; - -/** - * Callback to implement rescan_mainloop_ev: it simply exits the mainloop, - * and relies on Tor to re-enter the mainloop since no error has occurred. - */ -static void -rescan_mainloop_cb(evutil_socket_t fd, short events, void *arg) -{ - (void)fd; - (void)events; - struct event_base *the_base = arg; - event_base_loopbreak(the_base); -} - -/** @} */ - -/* This is what passes for version detection on OSX. We set - * MACOSX_KQUEUE_IS_BROKEN to true iff we're on a version of OSX before - * 10.4.0 (aka 1040). */ -#ifdef __APPLE__ -#ifdef __ENVIRONMENT_MAC_OS_X_VERSION_MIN_REQUIRED__ -#define MACOSX_KQUEUE_IS_BROKEN \ - (__ENVIRONMENT_MAC_OS_X_VERSION_MIN_REQUIRED__ < 1040) -#else -#define MACOSX_KQUEUE_IS_BROKEN 0 -#endif /* defined(__ENVIRONMENT_MAC_OS_X_VERSION_MIN_REQUIRED__) */ -#endif /* defined(__APPLE__) */ - -/** Initialize the Libevent library and set up the event base. */ -void -tor_libevent_initialize(tor_libevent_cfg *torcfg) -{ - tor_assert(the_event_base == NULL); - /* some paths below don't use torcfg, so avoid unused variable warnings */ - (void)torcfg; - - { - int attempts = 0; - struct event_config *cfg; - - ++attempts; - cfg = event_config_new(); - tor_assert(cfg); - - /* Telling Libevent not to try to turn locking on can avoid a needless - * socketpair() attempt. */ - event_config_set_flag(cfg, EVENT_BASE_FLAG_NOLOCK); - - if (torcfg->num_cpus > 0) - event_config_set_num_cpus_hint(cfg, torcfg->num_cpus); - - /* We can enable changelist support with epoll, since we don't give - * Libevent any dup'd fds. This lets us avoid some syscalls. */ - event_config_set_flag(cfg, EVENT_BASE_FLAG_EPOLL_USE_CHANGELIST); - - the_event_base = event_base_new_with_config(cfg); - - event_config_free(cfg); - } - - if (!the_event_base) { - /* LCOV_EXCL_START */ - log_err(LD_GENERAL, "Unable to initialize Libevent: cannot continue."); - exit(1); // exit ok: libevent is broken. - /* LCOV_EXCL_STOP */ - } - - rescan_mainloop_ev = event_new(the_event_base, -1, 0, - rescan_mainloop_cb, the_event_base); - if (!rescan_mainloop_ev) { - /* LCOV_EXCL_START */ - log_err(LD_GENERAL, "Unable to create rescan event: cannot continue."); - exit(1); // exit ok: libevent is broken. - /* LCOV_EXCL_STOP */ - } - - log_info(LD_GENERAL, - "Initialized libevent version %s using method %s. Good.", - event_get_version(), tor_libevent_get_method()); -} - -/** Return the current Libevent event base that we're set up to use. */ -MOCK_IMPL(struct event_base *, -tor_libevent_get_base, (void)) -{ - tor_assert(the_event_base != NULL); - return the_event_base; -} - -/** Return the name of the Libevent backend we're using. */ -const char * -tor_libevent_get_method(void) -{ - return event_base_get_method(the_event_base); -} - -/** Return a string representation of the version of the currently running - * version of Libevent. */ -const char * -tor_libevent_get_version_str(void) -{ - return event_get_version(); -} - -/** Return a string representation of the version of Libevent that was used -* at compilation time. */ -const char * -tor_libevent_get_header_version_str(void) -{ - return LIBEVENT_VERSION; -} - -/** Represents a timer that's run every N microseconds by Libevent. */ -struct periodic_timer_t { - /** Underlying event used to implement this periodic event. */ - struct event *ev; - /** The callback we'll be invoking whenever the event triggers */ - void (*cb)(struct periodic_timer_t *, void *); - /** User-supplied data for the callback */ - void *data; -}; - -/** Libevent callback to implement a periodic event. */ -static void -periodic_timer_cb(evutil_socket_t fd, short what, void *arg) -{ - periodic_timer_t *timer = arg; - (void) what; - (void) fd; - timer->cb(timer, timer->data); -} - -/** Create and schedule a new timer that will run every <b>tv</b> in - * the event loop of <b>base</b>. When the timer fires, it will - * run the timer in <b>cb</b> with the user-supplied data in <b>data</b>. */ -periodic_timer_t * -periodic_timer_new(struct event_base *base, - const struct timeval *tv, - void (*cb)(periodic_timer_t *timer, void *data), - void *data) -{ - periodic_timer_t *timer; - tor_assert(base); - tor_assert(tv); - tor_assert(cb); - timer = tor_malloc_zero(sizeof(periodic_timer_t)); - if (!(timer->ev = tor_event_new(base, -1, EV_PERSIST, - periodic_timer_cb, timer))) { - tor_free(timer); - return NULL; - } - timer->cb = cb; - timer->data = data; - periodic_timer_launch(timer, tv); - return timer; -} - -/** - * Launch the timer <b>timer</b> to run at <b>tv</b> from now, and every - * <b>tv</b> thereafter. - * - * If the timer is already enabled, this function does nothing. - */ -void -periodic_timer_launch(periodic_timer_t *timer, const struct timeval *tv) -{ - tor_assert(timer); - if (event_pending(timer->ev, EV_TIMEOUT, NULL)) - return; - event_add(timer->ev, tv); -} - -/** - * Disable the provided <b>timer</b>, but do not free it. - * - * You can reenable the same timer later with periodic_timer_launch. - * - * If the timer is already disabled, this function does nothing. - */ -void -periodic_timer_disable(periodic_timer_t *timer) -{ - tor_assert(timer); - (void) event_del(timer->ev); -} - -/** Stop and free a periodic timer */ -void -periodic_timer_free_(periodic_timer_t *timer) -{ - if (!timer) - return; - tor_event_free(timer->ev); - tor_free(timer); -} - -/** - * Type used to represent events that run directly from the main loop, - * either because they are activated from elsewhere in the code, or - * because they have a simple timeout. - * - * We use this type to avoid exposing Libevent's API throughout the rest - * of the codebase. - * - * This type can't be used for all events: it doesn't handle events that - * are triggered by signals or by sockets. - */ -struct mainloop_event_t { - struct event *ev; - void (*cb)(mainloop_event_t *, void *); - void *userdata; -}; - -/** - * Internal: Implements mainloop event using a libevent event. - */ -static void -mainloop_event_cb(evutil_socket_t fd, short what, void *arg) -{ - (void)fd; - (void)what; - mainloop_event_t *mev = arg; - mev->cb(mev, mev->userdata); -} - -/** - * As mainloop_event_cb, but implements a post-loop event. - */ -static void -mainloop_event_postloop_cb(evutil_socket_t fd, short what, void *arg) -{ - (void)fd; - (void)what; - - /* Note that if rescan_mainloop_ev is already activated, - * event_active() will do nothing: only the first post-loop event that - * happens each time through the event loop will cause it to be - * activated. - * - * Because event_active() puts events on a FIFO queue, every event - * that is made active _after_ rescan_mainloop_ev will get its - * callback run after rescan_mainloop_cb is called -- that is, on the - * next iteration of the loop. - */ - event_active(rescan_mainloop_ev, EV_READ, 1); - - mainloop_event_t *mev = arg; - mev->cb(mev, mev->userdata); -} - -/** - * Helper for mainloop_event_new() and mainloop_event_postloop_new(). - */ -static mainloop_event_t * -mainloop_event_new_impl(int postloop, - void (*cb)(mainloop_event_t *, void *), - void *userdata) -{ - tor_assert(cb); - - struct event_base *base = tor_libevent_get_base(); - mainloop_event_t *mev = tor_malloc_zero(sizeof(mainloop_event_t)); - mev->ev = tor_event_new(base, -1, 0, - postloop ? mainloop_event_postloop_cb : mainloop_event_cb, - mev); - tor_assert(mev->ev); - mev->cb = cb; - mev->userdata = userdata; - return mev; -} - -/** - * Create and return a new mainloop_event_t to run the function <b>cb</b>. - * - * When run, the callback function will be passed the mainloop_event_t - * and <b>userdata</b> as its arguments. The <b>userdata</b> pointer - * must remain valid for as long as the mainloop_event_t event exists: - * it is your responsibility to free it. - * - * The event is not scheduled by default: Use mainloop_event_activate() - * or mainloop_event_schedule() to make it run. - */ -mainloop_event_t * -mainloop_event_new(void (*cb)(mainloop_event_t *, void *), - void *userdata) -{ - return mainloop_event_new_impl(0, cb, userdata); -} - -/** - * As mainloop_event_new(), but create a post-loop event. - * - * A post-loop event behaves like any ordinary event, but any events - * that _it_ activates cannot run until Libevent has checked for other - * events at least once. - */ -mainloop_event_t * -mainloop_event_postloop_new(void (*cb)(mainloop_event_t *, void *), - void *userdata) -{ - return mainloop_event_new_impl(1, cb, userdata); -} - -/** - * Schedule <b>event</b> to run in the main loop, immediately. If it is - * not scheduled, it will run anyway. If it is already scheduled to run - * later, it will run now instead. This function will have no effect if - * the event is already scheduled to run. - * - * This function may only be called from the main thread. - */ -void -mainloop_event_activate(mainloop_event_t *event) -{ - tor_assert(event); - event_active(event->ev, EV_READ, 1); -} - -/** Schedule <b>event</b> to run in the main loop, after a delay of <b>tv</b>. - * - * If the event is scheduled for a different time, cancel it and run - * after this delay instead. If the event is currently pending to run - * <em>now</b>, has no effect. - * - * Do not call this function with <b>tv</b> == NULL -- use - * mainloop_event_activate() instead. - * - * This function may only be called from the main thread. - */ -int -mainloop_event_schedule(mainloop_event_t *event, const struct timeval *tv) -{ - tor_assert(event); - if (BUG(tv == NULL)) { - // LCOV_EXCL_START - mainloop_event_activate(event); - return 0; - // LCOV_EXCL_STOP - } - return event_add(event->ev, tv); -} - -/** Cancel <b>event</b> if it is currently active or pending. (Do nothing if - * the event is not currently active or pending.) */ -void -mainloop_event_cancel(mainloop_event_t *event) -{ - if (!event) - return; - (void) event_del(event->ev); -} - -/** Cancel <b>event</b> and release all storage associated with it. */ -void -mainloop_event_free_(mainloop_event_t *event) -{ - if (!event) - return; - tor_event_free(event->ev); - memset(event, 0xb8, sizeof(*event)); - tor_free(event); -} - -int -tor_init_libevent_rng(void) -{ - int rv = 0; - char buf[256]; - if (evutil_secure_rng_init() < 0) { - rv = -1; - } - crypto_rand(buf, 32); -#ifdef HAVE_EVUTIL_SECURE_RNG_ADD_BYTES - evutil_secure_rng_add_bytes(buf, 32); -#endif - evutil_secure_rng_get_bytes(buf, sizeof(buf)); - return rv; -} - -/** - * Un-initialize libevent in preparation for an exit - */ -void -tor_libevent_free_all(void) -{ - tor_event_free(rescan_mainloop_ev); - if (the_event_base) - event_base_free(the_event_base); - the_event_base = NULL; -} - -/** - * Run the event loop for the provided event_base, handling events until - * something stops it. If <b>once</b> is set, then just poll-and-run - * once, then exit. Return 0 on success, -1 if an error occurred, or 1 - * if we exited because no events were pending or active. - * - * This isn't reentrant or multithreaded. - */ -int -tor_libevent_run_event_loop(struct event_base *base, int once) -{ - const int flags = once ? EVLOOP_ONCE : 0; - return event_base_loop(base, flags); -} - -/** Tell the event loop to exit after <b>delay</b>. If <b>delay</b> is NULL, - * instead exit after we're done running the currently active events. */ -void -tor_libevent_exit_loop_after_delay(struct event_base *base, - const struct timeval *delay) -{ - event_base_loopexit(base, delay); -} - -/** Tell the event loop to exit after running whichever callback is currently - * active. */ -void -tor_libevent_exit_loop_after_callback(struct event_base *base) -{ - event_base_loopbreak(base); -} - -#if defined(TOR_UNIT_TESTS) -/** For testing: called post-fork to make libevent reinitialize - * kernel structures. */ -void -tor_libevent_postfork(void) -{ - int r = event_reinit(tor_libevent_get_base()); - tor_assert(r == 0); -} -#endif /* defined(TOR_UNIT_TESTS) */ diff --git a/src/common/compat_libevent.h b/src/common/compat_libevent.h deleted file mode 100644 index 0a50cfa667..0000000000 --- a/src/common/compat_libevent.h +++ /dev/null @@ -1,99 +0,0 @@ -/* Copyright (c) 2009-2018, The Tor Project, Inc. */ -/* See LICENSE for licensing information */ - -#ifndef TOR_COMPAT_LIBEVENT_H -#define TOR_COMPAT_LIBEVENT_H - -#include "orconfig.h" -#include "lib/testsupport/testsupport.h" -#include "lib/malloc/util_malloc.h" - -void configure_libevent_logging(void); -void suppress_libevent_log_msg(const char *msg); - -#define tor_event_new event_new -#define tor_evtimer_new evtimer_new -#define tor_evsignal_new evsignal_new -#define tor_evdns_add_server_port(sock, tcp, cb, data) \ - evdns_add_server_port_with_base(tor_libevent_get_base(), \ - (sock),(tcp),(cb),(data)); - -struct event; -struct event_base; -struct timeval; - -void tor_event_free_(struct event *ev); -#define tor_event_free(ev) \ - FREE_AND_NULL(struct event, tor_event_free_, (ev)) - -typedef struct periodic_timer_t periodic_timer_t; - -periodic_timer_t *periodic_timer_new(struct event_base *base, - const struct timeval *tv, - void (*cb)(periodic_timer_t *timer, void *data), - void *data); -void periodic_timer_free_(periodic_timer_t *); -void periodic_timer_launch(periodic_timer_t *, const struct timeval *tv); -void periodic_timer_disable(periodic_timer_t *); -#define periodic_timer_free(t) \ - FREE_AND_NULL(periodic_timer_t, periodic_timer_free_, (t)) - -typedef struct mainloop_event_t mainloop_event_t; -mainloop_event_t *mainloop_event_new(void (*cb)(mainloop_event_t *, void *), - void *userdata); -mainloop_event_t * mainloop_event_postloop_new( - void (*cb)(mainloop_event_t *, void *), - void *userdata); -void mainloop_event_activate(mainloop_event_t *event); -int mainloop_event_schedule(mainloop_event_t *event, - const struct timeval *delay); -void mainloop_event_cancel(mainloop_event_t *event); -void mainloop_event_free_(mainloop_event_t *event); -#define mainloop_event_free(event) \ - FREE_AND_NULL(mainloop_event_t, mainloop_event_free_, (event)) - -/** Defines a configuration for using libevent with Tor: passed as an argument - * to tor_libevent_initialize() to describe how we want to set up. */ -typedef struct tor_libevent_cfg { - /** How many CPUs should we use (not currently useful). */ - int num_cpus; - /** How many milliseconds should we allow between updating bandwidth limits? - * (Not currently useful). */ - int msec_per_tick; -} tor_libevent_cfg; - -void tor_libevent_initialize(tor_libevent_cfg *cfg); -MOCK_DECL(struct event_base *, tor_libevent_get_base, (void)); -const char *tor_libevent_get_method(void); -void tor_check_libevent_header_compatibility(void); -const char *tor_libevent_get_version_str(void); -const char *tor_libevent_get_header_version_str(void); -void tor_libevent_free_all(void); - -int tor_init_libevent_rng(void); - -#ifdef TOR_UNIT_TESTS -void tor_libevent_postfork(void); -#endif - -int tor_libevent_run_event_loop(struct event_base *base, int once); -void tor_libevent_exit_loop_after_delay(struct event_base *base, - const struct timeval *delay); -void tor_libevent_exit_loop_after_callback(struct event_base *base); - -#ifdef COMPAT_LIBEVENT_PRIVATE - -/** Macro: returns the number of a Libevent version as a 4-byte number, - with the first three bytes representing the major, minor, and patchlevel - respectively of the library. The fourth byte is unused. - - This is equivalent to the format of LIBEVENT_VERSION_NUMBER on Libevent - 2.0.1 or later. */ -#define V(major, minor, patch) \ - (((major) << 24) | ((minor) << 16) | ((patch) << 8)) - -STATIC void -libevent_logging_callback(int severity, const char *msg); -#endif /* defined(COMPAT_LIBEVENT_PRIVATE) */ - -#endif /* !defined(TOR_COMPAT_LIBEVENT_H) */ diff --git a/src/common/include.am b/src/common/include.am deleted file mode 100644 index 8a2d2e1148..0000000000 --- a/src/common/include.am +++ /dev/null @@ -1,51 +0,0 @@ - -noinst_LIBRARIES += \ - src/common/libor.a \ - src/common/libor-event.a - -if UNITTESTS_ENABLED -noinst_LIBRARIES += \ - src/common/libor-testing.a \ - src/common/libor-event-testing.a -endif - -EXTRA_DIST += src/common/Makefile.nmake - -LIBOR_A_SRC = \ - src/common/token_bucket.c \ - src/common/workqueue.c \ - $(libor_extra_source) - -src/common/src_common_libor_testing_a-log.$(OBJEXT) \ - src/common/log.$(OBJEXT): micro-revision.i - -LIBOR_EVENT_A_SRC = \ - src/common/compat_libevent.c \ - src/common/procmon.c \ - src/common/timers.c \ - src/ext/timeouts/timeout.c - -src_common_libor_a_SOURCES = $(LIBOR_A_SRC) -src_common_libor_event_a_SOURCES = $(LIBOR_EVENT_A_SRC) - -if UNITTESTS_ENABLED -src_common_libor_testing_a_SOURCES = $(LIBOR_A_SRC) -src_common_libor_event_testing_a_SOURCES = $(LIBOR_EVENT_A_SRC) -else -src_common_libor_testing_a_SOURCES = -src_common_libor_event_testing_a_SOURCES = -endif - -src_common_libor_testing_a_CPPFLAGS = $(AM_CPPFLAGS) $(TEST_CPPFLAGS) -src_common_libor_event_testing_a_CPPFLAGS = $(AM_CPPFLAGS) $(TEST_CPPFLAGS) -src_common_libor_testing_a_CFLAGS = $(AM_CFLAGS) $(TEST_CFLAGS) -src_common_libor_event_testing_a_CFLAGS = $(AM_CFLAGS) $(TEST_CFLAGS) - -COMMONHEADERS = \ - src/common/compat_libevent.h \ - src/common/procmon.h \ - src/common/timers.h \ - src/common/token_bucket.h \ - src/common/workqueue.h - -noinst_HEADERS+= $(COMMONHEADERS) diff --git a/src/common/procmon.c b/src/common/procmon.c deleted file mode 100644 index 6c2b3e71e5..0000000000 --- a/src/common/procmon.c +++ /dev/null @@ -1,336 +0,0 @@ -/* Copyright (c) 2011-2018, The Tor Project, Inc. */ -/* See LICENSE for licensing information */ - -/** - * \file procmon.c - * \brief Process-termination monitor functions - **/ - -#include "common/procmon.h" - -#include "lib/log/torlog.h" -#include "lib/log/util_bug.h" -#include "lib/log/win32err.h" -#include "lib/malloc/util_malloc.h" -#include "lib/string/parse_int.h" - -#ifdef HAVE_SIGNAL_H -#include <signal.h> -#endif -#ifdef HAVE_ERRNO_H -#include <errno.h> -#endif - -#ifdef _WIN32 -#include <winsock2.h> -#include <windows.h> -#endif - -#if (0 == SIZEOF_PID_T) && defined(_WIN32) -/* Windows does not define pid_t sometimes, but _getpid() returns an int. - * Everybody else needs to have a pid_t. */ -typedef int pid_t; -#define PID_T_FORMAT "%d" -#elif (SIZEOF_PID_T == SIZEOF_INT) || (SIZEOF_PID_T == SIZEOF_SHORT) -#define PID_T_FORMAT "%d" -#elif (SIZEOF_PID_T == SIZEOF_LONG) -#define PID_T_FORMAT "%ld" -#elif (SIZEOF_PID_T == 8) -#define PID_T_FORMAT "%"PRId64 -#else -#error Unknown: SIZEOF_PID_T -#endif /* (0 == SIZEOF_PID_T) && defined(_WIN32) || ... */ - -/* Define to 1 if process-termination monitors on this OS and Libevent - version must poll for process termination themselves. */ -#define PROCMON_POLLS 1 -/* Currently we need to poll in some way on all systems. */ - -#ifdef PROCMON_POLLS -static void tor_process_monitor_poll_cb(periodic_timer_t *ev, - void *procmon_); -#endif - -/* This struct may contain pointers into the original process - * specifier string, but it should *never* contain anything which - * needs to be freed. */ -/* DOCDOC parsed_process_specifier_t */ -struct parsed_process_specifier_t { - pid_t pid; -}; - -/** Parse the process specifier given in <b>process_spec</b> into - * *<b>ppspec</b>. Return 0 on success; return -1 and store an error - * message into *<b>msg</b> on failure. The caller must not free the - * returned error message. */ -static int -parse_process_specifier(const char *process_spec, - struct parsed_process_specifier_t *ppspec, - const char **msg) -{ - long pid_l; - int pid_ok = 0; - char *pspec_next; - - /* If we're lucky, long will turn out to be large enough to hold a - * PID everywhere that Tor runs. */ - pid_l = tor_parse_long(process_spec, 10, 1, LONG_MAX, &pid_ok, &pspec_next); - - /* Reserve room in the ‘process specifier’ for additional - * (platform-specific) identifying information beyond the PID, to - * make our process-existence checks a bit less racy in a future - * version. */ - if ((*pspec_next != 0) && (*pspec_next != ' ') && (*pspec_next != ':')) { - pid_ok = 0; - } - - ppspec->pid = (pid_t)(pid_l); - if (!pid_ok || (pid_l != (long)(ppspec->pid))) { - *msg = "invalid PID"; - goto err; - } - - return 0; - err: - return -1; -} - -/* DOCDOC tor_process_monitor_t */ -struct tor_process_monitor_t { - /** Log domain for warning messages. */ - log_domain_mask_t log_domain; - - /** All systems: The best we can do in general is poll for the - * process's existence by PID periodically, and hope that the kernel - * doesn't reassign the same PID to another process between our - * polls. */ - pid_t pid; - -#ifdef _WIN32 - /** Windows-only: Should we poll hproc? If false, poll pid - * instead. */ - int poll_hproc; - - /** Windows-only: Get a handle to the process (if possible) and - * periodically check whether the process we have a handle to has - * ended. */ - HANDLE hproc; - /* XXXX We should have Libevent watch hproc for us, - * if/when some version of Libevent can be told to do so. */ -#endif /* defined(_WIN32) */ - - /* XXXX On Linux, we can and should receive the 22nd - * (space-delimited) field (‘starttime’) of /proc/$PID/stat from the - * owning controller and store it, and poll once in a while to see - * whether it has changed -- if so, the kernel has *definitely* - * reassigned the owning controller's PID and we should exit. On - * FreeBSD, we can do the same trick using either the 8th - * space-delimited field of /proc/$PID/status on the seven FBSD - * systems whose admins have mounted procfs, or the start-time field - * of the process-information structure returned by kvmgetprocs() on - * any system. The latter is ickier. */ - - /* XXXX On FreeBSD (and possibly other kqueue systems), we can and - * should arrange to receive EVFILT_PROC NOTE_EXIT notifications for - * pid, so we don't have to do such a heavyweight poll operation in - * order to avoid the PID-reassignment race condition. (We would - * still need to poll our own kqueue periodically until some version - * of Libevent 2.x learns to receive these events for us.) */ - - /** A Libevent event structure, to either poll for the process's - * existence or receive a notification when the process ends. */ - periodic_timer_t *e; - - /** A callback to be called when the process ends. */ - tor_procmon_callback_t cb; - void *cb_arg; /**< A user-specified pointer to be passed to cb. */ -}; - -/** Verify that the process specifier given in <b>process_spec</b> is - * syntactically valid. Return 0 on success; return -1 and store an - * error message into *<b>msg</b> on failure. The caller must not - * free the returned error message. */ -int -tor_validate_process_specifier(const char *process_spec, - const char **msg) -{ - struct parsed_process_specifier_t ppspec; - - tor_assert(msg != NULL); - *msg = NULL; - - return parse_process_specifier(process_spec, &ppspec, msg); -} - -/* DOCDOC poll_interval_tv */ -static const struct timeval poll_interval_tv = {15, 0}; - -/** Create a process-termination monitor for the process specifier - * given in <b>process_spec</b>. Return a newly allocated - * tor_process_monitor_t on success; return NULL and store an error - * message into *<b>msg</b> on failure. The caller must not free - * the returned error message. - * - * When the monitored process terminates, call - * <b>cb</b>(<b>cb_arg</b>). - */ -tor_process_monitor_t * -tor_process_monitor_new(struct event_base *base, - const char *process_spec, - log_domain_mask_t log_domain, - tor_procmon_callback_t cb, void *cb_arg, - const char **msg) -{ - tor_process_monitor_t *procmon = tor_malloc_zero( - sizeof(tor_process_monitor_t)); - struct parsed_process_specifier_t ppspec; - - tor_assert(msg != NULL); - *msg = NULL; - - if (procmon == NULL) { - *msg = "out of memory"; - goto err; - } - - procmon->log_domain = log_domain; - - if (parse_process_specifier(process_spec, &ppspec, msg)) - goto err; - - procmon->pid = ppspec.pid; - -#ifdef _WIN32 - procmon->hproc = OpenProcess(PROCESS_QUERY_INFORMATION | SYNCHRONIZE, - FALSE, - procmon->pid); - - if (procmon->hproc != NULL) { - procmon->poll_hproc = 1; - log_info(procmon->log_domain, "Successfully opened handle to process " - PID_T_FORMAT"; " - "monitoring it.", - procmon->pid); - } else { - /* If we couldn't get a handle to the process, we'll try again the - * first time we poll. */ - log_info(procmon->log_domain, "Failed to open handle to process " - PID_T_FORMAT"; will " - "try again later.", - procmon->pid); - } -#endif /* defined(_WIN32) */ - - procmon->cb = cb; - procmon->cb_arg = cb_arg; - -#ifdef PROCMON_POLLS - procmon->e = periodic_timer_new(base, - &poll_interval_tv, - tor_process_monitor_poll_cb, procmon); -#else /* !(defined(PROCMON_POLLS)) */ -#error OOPS? -#endif /* defined(PROCMON_POLLS) */ - - return procmon; - err: - tor_process_monitor_free(procmon); - return NULL; -} - -#ifdef PROCMON_POLLS -/** Libevent callback to poll for the existence of the process - * monitored by <b>procmon_</b>. */ -static void -tor_process_monitor_poll_cb(periodic_timer_t *event, void *procmon_) -{ - (void)event; - tor_process_monitor_t *procmon = (tor_process_monitor_t *)(procmon_); - int its_dead_jim; - - tor_assert(procmon != NULL); - -#ifdef _WIN32 - if (procmon->poll_hproc) { - DWORD exit_code; - if (!GetExitCodeProcess(procmon->hproc, &exit_code)) { - char *errmsg = format_win32_error(GetLastError()); - log_warn(procmon->log_domain, "Error \"%s\" occurred while polling " - "handle for monitored process "PID_T_FORMAT"; assuming " - "it's dead.", - errmsg, procmon->pid); - tor_free(errmsg); - its_dead_jim = 1; - } else { - its_dead_jim = (exit_code != STILL_ACTIVE); - } - } else { - /* All we can do is try to open the process, and look at the error - * code if it fails again. */ - procmon->hproc = OpenProcess(PROCESS_QUERY_INFORMATION | SYNCHRONIZE, - FALSE, - procmon->pid); - - if (procmon->hproc != NULL) { - log_info(procmon->log_domain, "Successfully opened handle to monitored " - "process "PID_T_FORMAT".", - procmon->pid); - its_dead_jim = 0; - procmon->poll_hproc = 1; - } else { - DWORD err_code = GetLastError(); - char *errmsg = format_win32_error(err_code); - - /* When I tested OpenProcess's error codes on Windows 7, I - * received error code 5 (ERROR_ACCESS_DENIED) for PIDs of - * existing processes that I could not open and error code 87 - * (ERROR_INVALID_PARAMETER) for PIDs that were not in use. - * Since the nonexistent-process error code is sane, I'm going - * to assume that all errors other than ERROR_INVALID_PARAMETER - * mean that the process we are monitoring is still alive. */ - its_dead_jim = (err_code == ERROR_INVALID_PARAMETER); - - if (!its_dead_jim) - log_info(procmon->log_domain, "Failed to open handle to monitored " - "process "PID_T_FORMAT", and error code %lu (%s) is not " - "'invalid parameter' -- assuming the process is still alive.", - procmon->pid, - err_code, errmsg); - - tor_free(errmsg); - } - } -#else /* !(defined(_WIN32)) */ - /* Unix makes this part easy, if a bit racy. */ - its_dead_jim = kill(procmon->pid, 0); - its_dead_jim = its_dead_jim && (errno == ESRCH); -#endif /* defined(_WIN32) */ - - tor_log(its_dead_jim ? LOG_NOTICE : LOG_INFO, - procmon->log_domain, "Monitored process "PID_T_FORMAT" is %s.", - procmon->pid, - its_dead_jim ? "dead" : "still alive"); - - if (its_dead_jim) { - procmon->cb(procmon->cb_arg); - } -} -#endif /* defined(PROCMON_POLLS) */ - -/** Free the process-termination monitor <b>procmon</b>. */ -void -tor_process_monitor_free_(tor_process_monitor_t *procmon) -{ - if (procmon == NULL) - return; - -#ifdef _WIN32 - if (procmon->hproc != NULL) - CloseHandle(procmon->hproc); -#endif - - if (procmon->e != NULL) - periodic_timer_free(procmon->e); - - tor_free(procmon); -} diff --git a/src/common/procmon.h b/src/common/procmon.h deleted file mode 100644 index b8daeed0db..0000000000 --- a/src/common/procmon.h +++ /dev/null @@ -1,34 +0,0 @@ -/* Copyright (c) 2011-2018, The Tor Project, Inc. */ -/* See LICENSE for licensing information */ - -/** - * \file procmon.h - * \brief Headers for procmon.c - **/ - -#ifndef TOR_PROCMON_H -#define TOR_PROCMON_H - -#include "common/compat_libevent.h" - -#include "lib/log/torlog.h" - -typedef struct tor_process_monitor_t tor_process_monitor_t; - -/* DOCDOC tor_procmon_callback_t */ -typedef void (*tor_procmon_callback_t)(void *); - -int tor_validate_process_specifier(const char *process_spec, - const char **msg); -tor_process_monitor_t *tor_process_monitor_new(struct event_base *base, - const char *process_spec, - log_domain_mask_t log_domain, - tor_procmon_callback_t cb, - void *cb_arg, - const char **msg); -void tor_process_monitor_free_(tor_process_monitor_t *procmon); -#define tor_process_monitor_free(procmon) \ - FREE_AND_NULL(tor_process_monitor_t, tor_process_monitor_free_, (procmon)) - -#endif /* !defined(TOR_PROCMON_H) */ - diff --git a/src/common/timers.c b/src/common/timers.c deleted file mode 100644 index ff92a2e447..0000000000 --- a/src/common/timers.c +++ /dev/null @@ -1,324 +0,0 @@ -/* Copyright (c) 2016-2018, The Tor Project, Inc. */ -/* See LICENSE for licensing information */ - -/** - * \file timers.c - * \brief Wrapper around William Ahern's fast hierarchical timer wheel - * implementation, to tie it in with a libevent backend. - * - * Only use these functions from the main thread. - * - * The main advantage of tor_timer_t over using libevent's timers is that - * they're way more efficient if we need to have thousands or millions of - * them. For more information, see - * http://www.25thandclement.com/~william/projects/timeout.c.html - * - * Periodic timers are available in the backend, but I've turned them off. - * We can turn them back on if needed. - */ - -/* Notes: - * - * Having a way to free all timers on shutdown would free people from the - * need to track them. Not sure if that's clever though. - * - * In an ideal world, Libevent would just switch to use this backend, and we - * could throw this file away. But even if Libevent does switch, we'll be - * stuck with legacy libevents for some time. - */ - -#include "orconfig.h" - -#define TOR_TIMERS_PRIVATE - -#include "common/compat_libevent.h" -#include "common/timers.h" -#include "lib/intmath/muldiv.h" -#include "lib/log/torlog.h" -#include "lib/log/util_bug.h" -#include "lib/malloc/util_malloc.h" -#include "lib/time/compat_time.h" - -#ifdef _WIN32 -// For struct timeval. -#include <winsock2.h> -#endif - -struct timeout_cb { - timer_cb_fn_t cb; - void *arg; -}; - -/* - * These definitions are for timeouts.c and timeouts.h. - */ -#ifdef __GNUC__ -/* We're not exposing any of the functions outside this file. */ -#define TIMEOUT_PUBLIC __attribute__((__unused__)) static -#else -/* We're not exposing any of the functions outside this file. */ -#define TIMEOUT_PUBLIC static -#endif /* defined(__GNUC__) */ -/* We're not using periodic events. */ -#define TIMEOUT_DISABLE_INTERVALS -/* We always know the global_timeouts object, so we don't need each timeout - * to keep a pointer to it. */ -#define TIMEOUT_DISABLE_RELATIVE_ACCESS -/* We're providing our own struct timeout_cb. */ -#define TIMEOUT_CB_OVERRIDE -/* We're going to support timers that are pretty far out in advance. Making - * this big can be inefficient, but having a significant number of timers - * above TIMEOUT_MAX can also be super-inefficient. Choosing 5 here sets - * timeout_max to 2^30 ticks, or 29 hours with our value for USEC_PER_TICK */ -#define WHEEL_NUM 5 -#if SIZEOF_VOID_P == 4 -/* On 32-bit platforms, we want to override wheel_bit, so that timeout.c will - * use 32-bit math. */ -#define WHEEL_BIT 5 -#endif -#include "src/ext/timeouts/timeout.c" - -static struct timeouts *global_timeouts = NULL; -static struct mainloop_event_t *global_timer_event = NULL; - -static monotime_t start_of_time; - -/** We need to choose this value carefully. Because we're using timer wheels, - * it actually costs us to have extra resolution we don't use. So for now, - * I'm going to define our resolution as .1 msec, and hope that's good enough. - * - * Note that two of the most popular libevent backends (epoll without timerfd, - * and windows select), simply can't support sub-millisecond resolution, - * do this is optimistic for a lot of users. - */ -#define USEC_PER_TICK 100 - -/** One million microseconds in a second */ -#define USEC_PER_SEC 1000000 - -/** Check at least once every N seconds. */ -#define MIN_CHECK_SECONDS 3600 - -/** Check at least once every N ticks. */ -#define MIN_CHECK_TICKS \ - (((timeout_t)MIN_CHECK_SECONDS) * (1000000 / USEC_PER_TICK)) - -/** - * Convert the timeval in <b>tv</b> to a timeout_t, and return it. - * - * The output resolution is set by USEC_PER_TICK. Only use this to convert - * delays to number of ticks; the time represented by 0 is undefined. - */ -static timeout_t -tv_to_timeout(const struct timeval *tv) -{ - uint64_t usec = tv->tv_usec; - usec += ((uint64_t)USEC_PER_SEC) * tv->tv_sec; - return usec / USEC_PER_TICK; -} - -/** - * Convert the timeout in <b>t</b> to a timeval in <b>tv_out</b>. Only - * use this for delays, not absolute times. - */ -static void -timeout_to_tv(timeout_t t, struct timeval *tv_out) -{ - t *= USEC_PER_TICK; - tv_out->tv_usec = (int)(t % USEC_PER_SEC); - tv_out->tv_sec = (time_t)(t / USEC_PER_SEC); -} - -/** - * Update the timer <b>tv</b> to the current time in <b>tv</b>. - */ -static void -timer_advance_to_cur_time(const monotime_t *now) -{ - timeout_t cur_tick = CEIL_DIV(monotime_diff_usec(&start_of_time, now), - USEC_PER_TICK); - timeouts_update(global_timeouts, cur_tick); -} - -/** - * Adjust the time at which the libevent timer should fire based on - * the next-expiring time in <b>global_timeouts</b> - */ -static void -libevent_timer_reschedule(void) -{ - monotime_t now; - monotime_get(&now); - timer_advance_to_cur_time(&now); - - timeout_t delay = timeouts_timeout(global_timeouts); - - struct timeval d; - if (delay > MIN_CHECK_TICKS) - delay = MIN_CHECK_TICKS; - timeout_to_tv(delay, &d); - mainloop_event_schedule(global_timer_event, &d); -} - -/** Run the callback of every timer that has expired, based on the current - * output of monotime_get(). */ -STATIC void -timers_run_pending(void) -{ - monotime_t now; - monotime_get(&now); - timer_advance_to_cur_time(&now); - - tor_timer_t *t; - while ((t = timeouts_get(global_timeouts))) { - t->callback.cb(t, t->callback.arg, &now); - } -} - -/** - * Invoked when the libevent timer has expired: see which tor_timer_t events - * have fired, activate their callbacks, and reschedule the libevent timer. - */ -static void -libevent_timer_callback(mainloop_event_t *ev, void *arg) -{ - (void)ev; - (void)arg; - - timers_run_pending(); - - libevent_timer_reschedule(); -} - -/** - * Initialize the timers subsystem. Requires that libevent has already been - * initialized. - */ -void -timers_initialize(void) -{ - if (BUG(global_timeouts)) - return; // LCOV_EXCL_LINE - - timeout_error_t err = 0; - global_timeouts = timeouts_open(0, &err); - if (!global_timeouts) { - // LCOV_EXCL_START -- this can only fail on malloc failure. - log_err(LD_BUG, "Unable to open timer backend: %s", strerror(err)); - tor_assert(0); - // LCOV_EXCL_STOP - } - - monotime_init(); - monotime_get(&start_of_time); - - mainloop_event_t *timer_event; - timer_event = mainloop_event_new(libevent_timer_callback, NULL); - tor_assert(timer_event); - global_timer_event = timer_event; - - libevent_timer_reschedule(); -} - -/** - * Release all storage held in the timers subsystem. Does not fire timers. - */ -void -timers_shutdown(void) -{ - if (global_timer_event) { - mainloop_event_free(global_timer_event); - global_timer_event = NULL; - } - if (global_timeouts) { - timeouts_close(global_timeouts); - global_timeouts = NULL; - } -} - -/** - * Allocate and return a new timer, with given callback and argument. - */ -tor_timer_t * -timer_new(timer_cb_fn_t cb, void *arg) -{ - tor_timer_t *t = tor_malloc(sizeof(tor_timer_t)); - timeout_init(t, 0); - timer_set_cb(t, cb, arg); - return t; -} - -/** - * Release all storage held by <b>t</b>, and unschedule it if was already - * scheduled. - */ -void -timer_free_(tor_timer_t *t) -{ - if (! t) - return; - - timeouts_del(global_timeouts, t); - tor_free(t); -} - -/** - * Change the callback and argument associated with a timer <b>t</b>. - */ -void -timer_set_cb(tor_timer_t *t, timer_cb_fn_t cb, void *arg) -{ - t->callback.cb = cb; - t->callback.arg = arg; -} - -/** - * Set *<b>cb_out</b> (if provided) to this timer's callback function, - * and *<b>arg_out</b> (if provided) to this timer's callback argument. - */ -void -timer_get_cb(const tor_timer_t *t, - timer_cb_fn_t *cb_out, void **arg_out) -{ - if (cb_out) - *cb_out = t->callback.cb; - if (arg_out) - *arg_out = t->callback.arg; -} - -/** - * Schedule the timer t to fire at the current time plus a delay of - * <b>delay</b> microseconds. All times are relative to monotime_get(). - */ -void -timer_schedule(tor_timer_t *t, const struct timeval *tv) -{ - const timeout_t delay = tv_to_timeout(tv); - - monotime_t now; - monotime_get(&now); - timer_advance_to_cur_time(&now); - - /* Take the old timeout value. */ - timeout_t to = timeouts_timeout(global_timeouts); - - timeouts_add(global_timeouts, t, delay); - - /* Should we update the libevent timer? */ - if (to <= delay) { - return; /* we're already going to fire before this timer would trigger. */ - } - libevent_timer_reschedule(); -} - -/** - * Cancel the timer <b>t</b> if it is currently scheduled. (It's okay to call - * this on an unscheduled timer. - */ -void -timer_disable(tor_timer_t *t) -{ - timeouts_del(global_timeouts, t); - /* We don't reschedule the libevent timer here, since it's okay if it fires - * early. */ -} diff --git a/src/common/timers.h b/src/common/timers.h deleted file mode 100644 index 2348c7b7c1..0000000000 --- a/src/common/timers.h +++ /dev/null @@ -1,31 +0,0 @@ -/* Copyright (c) 2016-2018, The Tor Project, Inc. */ -/* See LICENSE for licensing information */ - -#ifndef TOR_TIMERS_H -#define TOR_TIMERS_H - -#include "orconfig.h" -#include "lib/testsupport/testsupport.h" - -struct monotime_t; -typedef struct timeout tor_timer_t; -typedef void (*timer_cb_fn_t)(tor_timer_t *, void *, - const struct monotime_t *); -tor_timer_t *timer_new(timer_cb_fn_t cb, void *arg); -void timer_set_cb(tor_timer_t *t, timer_cb_fn_t cb, void *arg); -void timer_get_cb(const tor_timer_t *t, - timer_cb_fn_t *cb_out, void **arg_out); -void timer_schedule(tor_timer_t *t, const struct timeval *delay); -void timer_disable(tor_timer_t *t); -void timer_free_(tor_timer_t *t); -#define timer_free(t) FREE_AND_NULL(tor_timer_t, timer_free_, (t)) - -void timers_initialize(void); -void timers_shutdown(void); - -#ifdef TOR_TIMERS_PRIVATE -STATIC void timers_run_pending(void); -#endif - -#endif /* !defined(TOR_TIMERS_H) */ - diff --git a/src/common/token_bucket.c b/src/common/token_bucket.c deleted file mode 100644 index f7b092f612..0000000000 --- a/src/common/token_bucket.c +++ /dev/null @@ -1,258 +0,0 @@ -/* Copyright (c) 2018-2018, The Tor Project, Inc. */ -/* See LICENSE for licensing information */ - -/** - * \file token_bucket.c - * \brief Functions to use and manipulate token buckets, used for - * rate-limiting on connections and globally. - * - * Tor uses these token buckets to keep track of bandwidth usage, and - * sometimes other things too. - * - * There are two layers of abstraction here: "raw" token buckets, in which all - * the pieces are decoupled, and "read-write" token buckets, which combine all - * the moving parts into one. - * - * Token buckets may become negative. - **/ - -#define TOKEN_BUCKET_PRIVATE - -#include "common/token_bucket.h" -#include "lib/log/util_bug.h" -#include "lib/intmath/cmp.h" -#include "lib/time/compat_time.h" - -#include <string.h> - -/** - * Set the <b>rate</b> and <b>burst</b> value in a token_bucket_cfg. - * - * Note that the <b>rate</b> value is in arbitrary units, but those units will - * determine the units of token_bucket_raw_dec(), token_bucket_raw_refill, and - * so on. - */ -void -token_bucket_cfg_init(token_bucket_cfg_t *cfg, - uint32_t rate, - uint32_t burst) -{ - tor_assert_nonfatal(rate > 0); - tor_assert_nonfatal(burst > 0); - if (burst > TOKEN_BUCKET_MAX_BURST) - burst = TOKEN_BUCKET_MAX_BURST; - - cfg->rate = rate; - cfg->burst = burst; -} - -/** - * Initialize a raw token bucket and its associated timestamp to the "full" - * state, according to <b>cfg</b>. - */ -void -token_bucket_raw_reset(token_bucket_raw_t *bucket, - const token_bucket_cfg_t *cfg) -{ - bucket->bucket = cfg->burst; -} - -/** - * Adust a preexisting token bucket to respect the new configuration - * <b>cfg</b>, by decreasing its current level if needed. */ -void -token_bucket_raw_adjust(token_bucket_raw_t *bucket, - const token_bucket_cfg_t *cfg) -{ - bucket->bucket = MIN(bucket->bucket, cfg->burst); -} - -/** - * Given an amount of <b>elapsed</b> time units, and a bucket configuration - * <b>cfg</b>, refill the level of <b>bucket</b> accordingly. Note that the - * units of time in <b>elapsed</b> must correspond to those used to set the - * rate in <b>cfg</b>, or the result will be illogical. - */ -int -token_bucket_raw_refill_steps(token_bucket_raw_t *bucket, - const token_bucket_cfg_t *cfg, - const uint32_t elapsed) -{ - const int was_empty = (bucket->bucket <= 0); - /* The casts here prevent an underflow. - * - * Note that even if the bucket value is negative, subtracting it from - * "burst" will still produce a correct result. If this result is - * ridiculously high, then the "elapsed > gap / rate" check below - * should catch it. */ - const size_t gap = ((size_t)cfg->burst) - ((size_t)bucket->bucket); - - if (elapsed > gap / cfg->rate) { - bucket->bucket = cfg->burst; - } else { - bucket->bucket += cfg->rate * elapsed; - } - - return was_empty && bucket->bucket > 0; -} - -/** - * Decrement a provided bucket by <b>n</b> units. Note that <b>n</b> - * must be nonnegative. - */ -int -token_bucket_raw_dec(token_bucket_raw_t *bucket, - ssize_t n) -{ - if (BUG(n < 0)) - return 0; - const int becomes_empty = bucket->bucket > 0 && n >= bucket->bucket; - bucket->bucket -= n; - return becomes_empty; -} - -/** Convert a rate in bytes per second to a rate in bytes per step */ -STATIC uint32_t -rate_per_sec_to_rate_per_step(uint32_t rate) -{ - /* - The precise calculation we'd want to do is - - (rate / 1000) * to_approximate_msec(TICKS_PER_STEP). But to minimize - rounding error, we do it this way instead, and divide last. - */ - uint64_t units = (uint64_t) rate * TICKS_PER_STEP; - uint32_t val = (uint32_t) - (monotime_coarse_stamp_units_to_approx_msec(units) / 1000); - return val ? val : 1; -} - -/** - * Initialize a token bucket in *<b>bucket</b>, set up to allow <b>rate</b> - * bytes per second, with a maximum burst of <b>burst</b> bytes. The bucket - * is created such that <b>now_ts</b> is the current timestamp. The bucket - * starts out full. - */ -void -token_bucket_rw_init(token_bucket_rw_t *bucket, - uint32_t rate, - uint32_t burst, - uint32_t now_ts) -{ - memset(bucket, 0, sizeof(token_bucket_rw_t)); - token_bucket_rw_adjust(bucket, rate, burst); - token_bucket_rw_reset(bucket, now_ts); -} - -/** - * Change the configured rate (in bytes per second) and burst (in bytes) - * for the token bucket in *<b>bucket</b>. - */ -void -token_bucket_rw_adjust(token_bucket_rw_t *bucket, - uint32_t rate, - uint32_t burst) -{ - token_bucket_cfg_init(&bucket->cfg, - rate_per_sec_to_rate_per_step(rate), - burst); - token_bucket_raw_adjust(&bucket->read_bucket, &bucket->cfg); - token_bucket_raw_adjust(&bucket->write_bucket, &bucket->cfg); -} - -/** - * Reset <b>bucket</b> to be full, as of timestamp <b>now_ts</b>. - */ -void -token_bucket_rw_reset(token_bucket_rw_t *bucket, - uint32_t now_ts) -{ - token_bucket_raw_reset(&bucket->read_bucket, &bucket->cfg); - token_bucket_raw_reset(&bucket->write_bucket, &bucket->cfg); - bucket->last_refilled_at_timestamp = now_ts; -} - -/** - * Refill <b>bucket</b> as appropriate, given that the current timestamp - * is <b>now_ts</b>. - * - * Return a bitmask containing TB_READ iff read bucket was empty and became - * nonempty, and TB_WRITE iff the write bucket was empty and became nonempty. - */ -int -token_bucket_rw_refill(token_bucket_rw_t *bucket, - uint32_t now_ts) -{ - const uint32_t elapsed_ticks = - (now_ts - bucket->last_refilled_at_timestamp); - if (elapsed_ticks > UINT32_MAX-(300*1000)) { - /* Either about 48 days have passed since the last refill, or the - * monotonic clock has somehow moved backwards. (We're looking at you, - * Windows.). We accept up to a 5 minute jump backwards as - * "unremarkable". - */ - return 0; - } - const uint32_t elapsed_steps = elapsed_ticks / TICKS_PER_STEP; - - if (!elapsed_steps) { - /* Note that if less than one whole step elapsed, we don't advance the - * time in last_refilled_at. That's intentional: we want to make sure - * that we add some bytes to it eventually. */ - return 0; - } - - int flags = 0; - if (token_bucket_raw_refill_steps(&bucket->read_bucket, - &bucket->cfg, elapsed_steps)) - flags |= TB_READ; - if (token_bucket_raw_refill_steps(&bucket->write_bucket, - &bucket->cfg, elapsed_steps)) - flags |= TB_WRITE; - - bucket->last_refilled_at_timestamp = now_ts; - return flags; -} - -/** - * Decrement the read token bucket in <b>bucket</b> by <b>n</b> bytes. - * - * Return true if the bucket was nonempty and became empty; return false - * otherwise. - */ -int -token_bucket_rw_dec_read(token_bucket_rw_t *bucket, - ssize_t n) -{ - return token_bucket_raw_dec(&bucket->read_bucket, n); -} - -/** - * Decrement the write token bucket in <b>bucket</b> by <b>n</b> bytes. - * - * Return true if the bucket was nonempty and became empty; return false - * otherwise. - */ -int -token_bucket_rw_dec_write(token_bucket_rw_t *bucket, - ssize_t n) -{ - return token_bucket_raw_dec(&bucket->write_bucket, n); -} - -/** - * As token_bucket_rw_dec_read and token_bucket_rw_dec_write, in a single - * operation. Return a bitmask of TB_READ and TB_WRITE to indicate - * which buckets became empty. - */ -int -token_bucket_rw_dec(token_bucket_rw_t *bucket, - ssize_t n_read, ssize_t n_written) -{ - int flags = 0; - if (token_bucket_rw_dec_read(bucket, n_read)) - flags |= TB_READ; - if (token_bucket_rw_dec_write(bucket, n_written)) - flags |= TB_WRITE; - return flags; -} diff --git a/src/common/token_bucket.h b/src/common/token_bucket.h deleted file mode 100644 index 787317fa1f..0000000000 --- a/src/common/token_bucket.h +++ /dev/null @@ -1,118 +0,0 @@ -/* Copyright (c) 2018-2018, The Tor Project, Inc. */ -/* See LICENSE for licensing information */ - -/** - * \file token_bucket_rw.h - * \brief Headers for token_bucket_rw.c - **/ - -#ifndef TOR_TOKEN_BUCKET_H -#define TOR_TOKEN_BUCKET_H - -#include "lib/cc/torint.h" -#include "lib/testsupport/testsupport.h" - -/** Largest allowable burst value for a token buffer. */ -#define TOKEN_BUCKET_MAX_BURST INT32_MAX - -/** A generic token buffer configuration: determines the number of tokens - * added to the bucket in each time unit (the "rate"), and the maximum number - * of tokens in the bucket (the "burst") */ -typedef struct token_bucket_cfg_t { - uint32_t rate; - int32_t burst; -} token_bucket_cfg_t; - -/** A raw token bucket, decoupled from its configuration and timestamp. */ -typedef struct token_bucket_raw_t { - int32_t bucket; -} token_bucket_raw_t; - -void token_bucket_cfg_init(token_bucket_cfg_t *cfg, - uint32_t rate, - uint32_t burst); - -void token_bucket_raw_adjust(token_bucket_raw_t *bucket, - const token_bucket_cfg_t *cfg); - -void token_bucket_raw_reset(token_bucket_raw_t *bucket, - const token_bucket_cfg_t *cfg); - -int token_bucket_raw_dec(token_bucket_raw_t *bucket, - ssize_t n); - -int token_bucket_raw_refill_steps(token_bucket_raw_t *bucket, - const token_bucket_cfg_t *cfg, - const uint32_t elapsed_steps); - -static inline size_t token_bucket_raw_get(const token_bucket_raw_t *bucket); -/** Return the current number of bytes set in a token bucket. */ -static inline size_t -token_bucket_raw_get(const token_bucket_raw_t *bucket) -{ - return bucket->bucket >= 0 ? bucket->bucket : 0; -} - -/** A convenience type containing all the pieces needed for a coupled - * read-bucket and write-bucket that have the same rate limit, and which use - * "timestamp units" (see compat_time.h) for their time. */ -typedef struct token_bucket_rw_t { - token_bucket_cfg_t cfg; - token_bucket_raw_t read_bucket; - token_bucket_raw_t write_bucket; - uint32_t last_refilled_at_timestamp; -} token_bucket_rw_t; - -void token_bucket_rw_init(token_bucket_rw_t *bucket, - uint32_t rate, - uint32_t burst, - uint32_t now_ts); - -void token_bucket_rw_adjust(token_bucket_rw_t *bucket, - uint32_t rate, uint32_t burst); - -void token_bucket_rw_reset(token_bucket_rw_t *bucket, - uint32_t now_ts); - -#define TB_READ 1 -#define TB_WRITE 2 - -int token_bucket_rw_refill(token_bucket_rw_t *bucket, - uint32_t now_ts); - -int token_bucket_rw_dec_read(token_bucket_rw_t *bucket, - ssize_t n); -int token_bucket_rw_dec_write(token_bucket_rw_t *bucket, - ssize_t n); - -int token_bucket_rw_dec(token_bucket_rw_t *bucket, - ssize_t n_read, ssize_t n_written); - -static inline size_t token_bucket_rw_get_read(const token_bucket_rw_t *bucket); -static inline size_t -token_bucket_rw_get_read(const token_bucket_rw_t *bucket) -{ - return token_bucket_raw_get(&bucket->read_bucket); -} - -static inline size_t token_bucket_rw_get_write( - const token_bucket_rw_t *bucket); -static inline size_t -token_bucket_rw_get_write(const token_bucket_rw_t *bucket) -{ - return token_bucket_raw_get(&bucket->write_bucket); -} - -#ifdef TOKEN_BUCKET_PRIVATE - -/* To avoid making the rates too small, we consider units of "steps", - * where a "step" is defined as this many timestamp ticks. Keep this - * a power of two if you can. */ -#define TICKS_PER_STEP 16 - -STATIC uint32_t rate_per_sec_to_rate_per_step(uint32_t rate); - -#endif - -#endif /* TOR_TOKEN_BUCKET_H */ - diff --git a/src/common/workqueue.c b/src/common/workqueue.c deleted file mode 100644 index e5254396f9..0000000000 --- a/src/common/workqueue.c +++ /dev/null @@ -1,682 +0,0 @@ - -/* copyright (c) 2013-2015, The Tor Project, Inc. */ -/* See LICENSE for licensing information */ - -/** - * \file workqueue.c - * - * \brief Implements worker threads, queues of work for them, and mechanisms - * for them to send answers back to the main thread. - * - * The main structure here is a threadpool_t : it manages a set of worker - * threads, a queue of pending work, and a reply queue. Every piece of work - * is a workqueue_entry_t, containing data to process and a function to - * process it with. - * - * The main thread informs the worker threads of pending work by using a - * condition variable. The workers inform the main process of completed work - * by using an alert_sockets_t object, as implemented in compat_threads.c. - * - * The main thread can also queue an "update" that will be handled by all the - * workers. This is useful for updating state that all the workers share. - * - * In Tor today, there is currently only one thread pool, used in cpuworker.c. - */ - -#include "orconfig.h" -#include "common/compat_libevent.h" -#include "common/workqueue.h" - -#include "lib/crypt_ops/crypto_rand.h" -#include "lib/intmath/weakrng.h" -#include "lib/log/ratelim.h" -#include "lib/log/torlog.h" -#include "lib/log/util_bug.h" -#include "lib/net/alertsock.h" -#include "lib/net/socket.h" -#include "lib/thread/threads.h" - -#include "tor_queue.h" -#include <event2/event.h> -#include <string.h> - -#define WORKQUEUE_PRIORITY_FIRST WQ_PRI_HIGH -#define WORKQUEUE_PRIORITY_LAST WQ_PRI_LOW -#define WORKQUEUE_N_PRIORITIES (((int) WORKQUEUE_PRIORITY_LAST)+1) - -TOR_TAILQ_HEAD(work_tailq_t, workqueue_entry_s); -typedef struct work_tailq_t work_tailq_t; - -struct threadpool_s { - /** An array of pointers to workerthread_t: one for each running worker - * thread. */ - struct workerthread_s **threads; - - /** Condition variable that we wait on when we have no work, and which - * gets signaled when our queue becomes nonempty. */ - tor_cond_t condition; - /** Queues of pending work that we have to do. The queue with priority - * <b>p</b> is work[p]. */ - work_tailq_t work[WORKQUEUE_N_PRIORITIES]; - - /** Weak RNG, used to decide when to ignore priority. */ - tor_weak_rng_t weak_rng; - - /** The current 'update generation' of the threadpool. Any thread that is - * at an earlier generation needs to run the update function. */ - unsigned generation; - - /** Function that should be run for updates on each thread. */ - workqueue_reply_t (*update_fn)(void *, void *); - /** Function to free update arguments if they can't be run. */ - void (*free_update_arg_fn)(void *); - /** Array of n_threads update arguments. */ - void **update_args; - /** Event to notice when another thread has sent a reply. */ - struct event *reply_event; - void (*reply_cb)(threadpool_t *); - - /** Number of elements in threads. */ - int n_threads; - /** Mutex to protect all the above fields. */ - tor_mutex_t lock; - - /** A reply queue to use when constructing new threads. */ - replyqueue_t *reply_queue; - - /** Functions used to allocate and free thread state. */ - void *(*new_thread_state_fn)(void*); - void (*free_thread_state_fn)(void*); - void *new_thread_state_arg; -}; - -/** Used to put a workqueue_priority_t value into a bitfield. */ -#define workqueue_priority_bitfield_t ENUM_BF(workqueue_priority_t) -/** Number of bits needed to hold all legal values of workqueue_priority_t */ -#define WORKQUEUE_PRIORITY_BITS 2 - -struct workqueue_entry_s { - /** The next workqueue_entry_t that's pending on the same thread or - * reply queue. */ - TOR_TAILQ_ENTRY(workqueue_entry_s) next_work; - /** The threadpool to which this workqueue_entry_t was assigned. This field - * is set when the workqueue_entry_t is created, and won't be cleared until - * after it's handled in the main thread. */ - struct threadpool_s *on_pool; - /** True iff this entry is waiting for a worker to start processing it. */ - uint8_t pending; - /** Priority of this entry. */ - workqueue_priority_bitfield_t priority : WORKQUEUE_PRIORITY_BITS; - /** Function to run in the worker thread. */ - workqueue_reply_t (*fn)(void *state, void *arg); - /** Function to run while processing the reply queue. */ - void (*reply_fn)(void *arg); - /** Argument for the above functions. */ - void *arg; -}; - -struct replyqueue_s { - /** Mutex to protect the answers field */ - tor_mutex_t lock; - /** Doubly-linked list of answers that the reply queue needs to handle. */ - TOR_TAILQ_HEAD(, workqueue_entry_s) answers; - - /** Mechanism to wake up the main thread when it is receiving answers. */ - alert_sockets_t alert; -}; - -/** A worker thread represents a single thread in a thread pool. */ -typedef struct workerthread_s { - /** Which thread it this? In range 0..in_pool->n_threads-1 */ - int index; - /** The pool this thread is a part of. */ - struct threadpool_s *in_pool; - /** User-supplied state field that we pass to the worker functions of each - * work item. */ - void *state; - /** Reply queue to which we pass our results. */ - replyqueue_t *reply_queue; - /** The current update generation of this thread */ - unsigned generation; - /** One over the probability of taking work from a lower-priority queue. */ - int32_t lower_priority_chance; -} workerthread_t; - -static void queue_reply(replyqueue_t *queue, workqueue_entry_t *work); - -/** Allocate and return a new workqueue_entry_t, set up to run the function - * <b>fn</b> in the worker thread, and <b>reply_fn</b> in the main - * thread. See threadpool_queue_work() for full documentation. */ -static workqueue_entry_t * -workqueue_entry_new(workqueue_reply_t (*fn)(void*, void*), - void (*reply_fn)(void*), - void *arg) -{ - workqueue_entry_t *ent = tor_malloc_zero(sizeof(workqueue_entry_t)); - ent->fn = fn; - ent->reply_fn = reply_fn; - ent->arg = arg; - ent->priority = WQ_PRI_HIGH; - return ent; -} - -#define workqueue_entry_free(ent) \ - FREE_AND_NULL(workqueue_entry_t, workqueue_entry_free_, (ent)) - -/** - * Release all storage held in <b>ent</b>. Call only when <b>ent</b> is not on - * any queue. - */ -static void -workqueue_entry_free_(workqueue_entry_t *ent) -{ - if (!ent) - return; - memset(ent, 0xf0, sizeof(*ent)); - tor_free(ent); -} - -/** - * Cancel a workqueue_entry_t that has been returned from - * threadpool_queue_work. - * - * You must not call this function on any work whose reply function has been - * executed in the main thread; that will cause undefined behavior (probably, - * a crash). - * - * If the work is cancelled, this function return the argument passed to the - * work function. It is the caller's responsibility to free this storage. - * - * This function will have no effect if the worker thread has already executed - * or begun to execute the work item. In that case, it will return NULL. - */ -void * -workqueue_entry_cancel(workqueue_entry_t *ent) -{ - int cancelled = 0; - void *result = NULL; - tor_mutex_acquire(&ent->on_pool->lock); - workqueue_priority_t prio = ent->priority; - if (ent->pending) { - TOR_TAILQ_REMOVE(&ent->on_pool->work[prio], ent, next_work); - cancelled = 1; - result = ent->arg; - } - tor_mutex_release(&ent->on_pool->lock); - - if (cancelled) { - workqueue_entry_free(ent); - } - return result; -} - -/**DOCDOC - - must hold lock */ -static int -worker_thread_has_work(workerthread_t *thread) -{ - unsigned i; - for (i = WORKQUEUE_PRIORITY_FIRST; i <= WORKQUEUE_PRIORITY_LAST; ++i) { - if (!TOR_TAILQ_EMPTY(&thread->in_pool->work[i])) - return 1; - } - return thread->generation != thread->in_pool->generation; -} - -/** Extract the next workqueue_entry_t from the the thread's pool, removing - * it from the relevant queues and marking it as non-pending. - * - * The caller must hold the lock. */ -static workqueue_entry_t * -worker_thread_extract_next_work(workerthread_t *thread) -{ - threadpool_t *pool = thread->in_pool; - work_tailq_t *queue = NULL, *this_queue; - unsigned i; - for (i = WORKQUEUE_PRIORITY_FIRST; i <= WORKQUEUE_PRIORITY_LAST; ++i) { - this_queue = &pool->work[i]; - if (!TOR_TAILQ_EMPTY(this_queue)) { - queue = this_queue; - if (! tor_weak_random_one_in_n(&pool->weak_rng, - thread->lower_priority_chance)) { - /* Usually we'll just break now, so that we can get out of the loop - * and use the queue where we found work. But with a small - * probability, we'll keep looking for lower priority work, so that - * we don't ignore our low-priority queues entirely. */ - break; - } - } - } - - if (queue == NULL) - return NULL; - - workqueue_entry_t *work = TOR_TAILQ_FIRST(queue); - TOR_TAILQ_REMOVE(queue, work, next_work); - work->pending = 0; - return work; -} - -/** - * Main function for the worker thread. - */ -static void -worker_thread_main(void *thread_) -{ - workerthread_t *thread = thread_; - threadpool_t *pool = thread->in_pool; - workqueue_entry_t *work; - workqueue_reply_t result; - - tor_mutex_acquire(&pool->lock); - while (1) { - /* lock must be held at this point. */ - while (worker_thread_has_work(thread)) { - /* lock must be held at this point. */ - if (thread->in_pool->generation != thread->generation) { - void *arg = thread->in_pool->update_args[thread->index]; - thread->in_pool->update_args[thread->index] = NULL; - workqueue_reply_t (*update_fn)(void*,void*) = - thread->in_pool->update_fn; - thread->generation = thread->in_pool->generation; - tor_mutex_release(&pool->lock); - - workqueue_reply_t r = update_fn(thread->state, arg); - - if (r != WQ_RPL_REPLY) { - return; - } - - tor_mutex_acquire(&pool->lock); - continue; - } - work = worker_thread_extract_next_work(thread); - if (BUG(work == NULL)) - break; - tor_mutex_release(&pool->lock); - - /* We run the work function without holding the thread lock. This - * is the main thread's first opportunity to give us more work. */ - result = work->fn(thread->state, work->arg); - - /* Queue the reply for the main thread. */ - queue_reply(thread->reply_queue, work); - - /* We may need to exit the thread. */ - if (result != WQ_RPL_REPLY) { - return; - } - tor_mutex_acquire(&pool->lock); - } - /* At this point the lock is held, and there is no work in this thread's - * queue. */ - - /* TODO: support an idle-function */ - - /* Okay. Now, wait till somebody has work for us. */ - if (tor_cond_wait(&pool->condition, &pool->lock, NULL) < 0) { - log_warn(LD_GENERAL, "Fail tor_cond_wait."); - } - } -} - -/** Put a reply on the reply queue. The reply must not currently be on - * any thread's work queue. */ -static void -queue_reply(replyqueue_t *queue, workqueue_entry_t *work) -{ - int was_empty; - tor_mutex_acquire(&queue->lock); - was_empty = TOR_TAILQ_EMPTY(&queue->answers); - TOR_TAILQ_INSERT_TAIL(&queue->answers, work, next_work); - tor_mutex_release(&queue->lock); - - if (was_empty) { - if (queue->alert.alert_fn(queue->alert.write_fd) < 0) { - /* XXXX complain! */ - } - } -} - -/** Allocate and start a new worker thread to use state object <b>state</b>, - * and send responses to <b>replyqueue</b>. */ -static workerthread_t * -workerthread_new(int32_t lower_priority_chance, - void *state, threadpool_t *pool, replyqueue_t *replyqueue) -{ - workerthread_t *thr = tor_malloc_zero(sizeof(workerthread_t)); - thr->state = state; - thr->reply_queue = replyqueue; - thr->in_pool = pool; - thr->lower_priority_chance = lower_priority_chance; - - if (spawn_func(worker_thread_main, thr) < 0) { - //LCOV_EXCL_START - tor_assert_nonfatal_unreached(); - log_err(LD_GENERAL, "Can't launch worker thread."); - tor_free(thr); - return NULL; - //LCOV_EXCL_STOP - } - - return thr; -} - -/** - * Queue an item of work for a thread in a thread pool. The function - * <b>fn</b> will be run in a worker thread, and will receive as arguments the - * thread's state object, and the provided object <b>arg</b>. It must return - * one of WQ_RPL_REPLY, WQ_RPL_ERROR, or WQ_RPL_SHUTDOWN. - * - * Regardless of its return value, the function <b>reply_fn</b> will later be - * run in the main thread when it invokes replyqueue_process(), and will - * receive as its argument the same <b>arg</b> object. It's the reply - * function's responsibility to free the work object. - * - * On success, return a workqueue_entry_t object that can be passed to - * workqueue_entry_cancel(). On failure, return NULL. (Failure is not - * currently possible, but callers should check anyway.) - * - * Items are executed in a loose priority order -- each thread will usually - * take from the queued work with the highest prioirity, but will occasionally - * visit lower-priority queues to keep them from starving completely. - * - * Note that because of priorities and thread behavior, work items may not - * be executed strictly in order. - */ -workqueue_entry_t * -threadpool_queue_work_priority(threadpool_t *pool, - workqueue_priority_t prio, - workqueue_reply_t (*fn)(void *, void *), - void (*reply_fn)(void *), - void *arg) -{ - tor_assert(((int)prio) >= WORKQUEUE_PRIORITY_FIRST && - ((int)prio) <= WORKQUEUE_PRIORITY_LAST); - - workqueue_entry_t *ent = workqueue_entry_new(fn, reply_fn, arg); - ent->on_pool = pool; - ent->pending = 1; - ent->priority = prio; - - tor_mutex_acquire(&pool->lock); - - TOR_TAILQ_INSERT_TAIL(&pool->work[prio], ent, next_work); - - tor_cond_signal_one(&pool->condition); - - tor_mutex_release(&pool->lock); - - return ent; -} - -/** As threadpool_queue_work_priority(), but assumes WQ_PRI_HIGH */ -workqueue_entry_t * -threadpool_queue_work(threadpool_t *pool, - workqueue_reply_t (*fn)(void *, void *), - void (*reply_fn)(void *), - void *arg) -{ - return threadpool_queue_work_priority(pool, WQ_PRI_HIGH, fn, reply_fn, arg); -} - -/** - * Queue a copy of a work item for every thread in a pool. This can be used, - * for example, to tell the threads to update some parameter in their states. - * - * Arguments are as for <b>threadpool_queue_work</b>, except that the - * <b>arg</b> value is passed to <b>dup_fn</b> once per each thread to - * make a copy of it. - * - * UPDATE FUNCTIONS MUST BE IDEMPOTENT. We do not guarantee that every update - * will be run. If a new update is scheduled before the old update finishes - * running, then the new will replace the old in any threads that haven't run - * it yet. - * - * Return 0 on success, -1 on failure. - */ -int -threadpool_queue_update(threadpool_t *pool, - void *(*dup_fn)(void *), - workqueue_reply_t (*fn)(void *, void *), - void (*free_fn)(void *), - void *arg) -{ - int i, n_threads; - void (*old_args_free_fn)(void *arg); - void **old_args; - void **new_args; - - tor_mutex_acquire(&pool->lock); - n_threads = pool->n_threads; - old_args = pool->update_args; - old_args_free_fn = pool->free_update_arg_fn; - - new_args = tor_calloc(n_threads, sizeof(void*)); - for (i = 0; i < n_threads; ++i) { - if (dup_fn) - new_args[i] = dup_fn(arg); - else - new_args[i] = arg; - } - - pool->update_args = new_args; - pool->free_update_arg_fn = free_fn; - pool->update_fn = fn; - ++pool->generation; - - tor_cond_signal_all(&pool->condition); - - tor_mutex_release(&pool->lock); - - if (old_args) { - for (i = 0; i < n_threads; ++i) { - if (old_args[i] && old_args_free_fn) - old_args_free_fn(old_args[i]); - } - tor_free(old_args); - } - - return 0; -} - -/** Don't have more than this many threads per pool. */ -#define MAX_THREADS 1024 - -/** For half of our threads, choose lower priority queues with probability - * 1/N for each of these values. Both are chosen somewhat arbitrarily. If - * CHANCE_PERMISSIVE is too low, then we have a risk of low-priority tasks - * stalling forever. If it's too high, we have a risk of low-priority tasks - * grabbing half of the threads. */ -#define CHANCE_PERMISSIVE 37 -#define CHANCE_STRICT INT32_MAX - -/** Launch threads until we have <b>n</b>. */ -static int -threadpool_start_threads(threadpool_t *pool, int n) -{ - if (BUG(n < 0)) - return -1; // LCOV_EXCL_LINE - if (n > MAX_THREADS) - n = MAX_THREADS; - - tor_mutex_acquire(&pool->lock); - - if (pool->n_threads < n) - pool->threads = tor_reallocarray(pool->threads, - sizeof(workerthread_t*), n); - - while (pool->n_threads < n) { - /* For half of our threads, we'll choose lower priorities permissively; - * for the other half, we'll stick more strictly to higher priorities. - * This keeps slow low-priority tasks from taking over completely. */ - int32_t chance = (pool->n_threads & 1) ? CHANCE_STRICT : CHANCE_PERMISSIVE; - - void *state = pool->new_thread_state_fn(pool->new_thread_state_arg); - workerthread_t *thr = workerthread_new(chance, - state, pool, pool->reply_queue); - - if (!thr) { - //LCOV_EXCL_START - tor_assert_nonfatal_unreached(); - pool->free_thread_state_fn(state); - tor_mutex_release(&pool->lock); - return -1; - //LCOV_EXCL_STOP - } - thr->index = pool->n_threads; - pool->threads[pool->n_threads++] = thr; - } - tor_mutex_release(&pool->lock); - - return 0; -} - -/** - * Construct a new thread pool with <b>n</b> worker threads, configured to - * send their output to <b>replyqueue</b>. The threads' states will be - * constructed with the <b>new_thread_state_fn</b> call, receiving <b>arg</b> - * as its argument. When the threads close, they will call - * <b>free_thread_state_fn</b> on their states. - */ -threadpool_t * -threadpool_new(int n_threads, - replyqueue_t *replyqueue, - void *(*new_thread_state_fn)(void*), - void (*free_thread_state_fn)(void*), - void *arg) -{ - threadpool_t *pool; - pool = tor_malloc_zero(sizeof(threadpool_t)); - tor_mutex_init_nonrecursive(&pool->lock); - tor_cond_init(&pool->condition); - unsigned i; - for (i = WORKQUEUE_PRIORITY_FIRST; i <= WORKQUEUE_PRIORITY_LAST; ++i) { - TOR_TAILQ_INIT(&pool->work[i]); - } - { - unsigned seed; - crypto_rand((void*)&seed, sizeof(seed)); - tor_init_weak_random(&pool->weak_rng, seed); - } - - pool->new_thread_state_fn = new_thread_state_fn; - pool->new_thread_state_arg = arg; - pool->free_thread_state_fn = free_thread_state_fn; - pool->reply_queue = replyqueue; - - if (threadpool_start_threads(pool, n_threads) < 0) { - //LCOV_EXCL_START - tor_assert_nonfatal_unreached(); - tor_cond_uninit(&pool->condition); - tor_mutex_uninit(&pool->lock); - tor_free(pool); - return NULL; - //LCOV_EXCL_STOP - } - - return pool; -} - -/** Return the reply queue associated with a given thread pool. */ -replyqueue_t * -threadpool_get_replyqueue(threadpool_t *tp) -{ - return tp->reply_queue; -} - -/** Allocate a new reply queue. Reply queues are used to pass results from - * worker threads to the main thread. Since the main thread is running an - * IO-centric event loop, it needs to get woken up with means other than a - * condition variable. */ -replyqueue_t * -replyqueue_new(uint32_t alertsocks_flags) -{ - replyqueue_t *rq; - - rq = tor_malloc_zero(sizeof(replyqueue_t)); - if (alert_sockets_create(&rq->alert, alertsocks_flags) < 0) { - //LCOV_EXCL_START - tor_free(rq); - return NULL; - //LCOV_EXCL_STOP - } - - tor_mutex_init(&rq->lock); - TOR_TAILQ_INIT(&rq->answers); - - return rq; -} - -/** Internal: Run from the libevent mainloop when there is work to handle in - * the reply queue handler. */ -static void -reply_event_cb(evutil_socket_t sock, short events, void *arg) -{ - threadpool_t *tp = arg; - (void) sock; - (void) events; - replyqueue_process(tp->reply_queue); - if (tp->reply_cb) - tp->reply_cb(tp); -} - -/** Register the threadpool <b>tp</b>'s reply queue with the libevent - * mainloop of <b>base</b>. If <b>tp</b> is provided, it is run after - * each time there is work to process from the reply queue. Return 0 on - * success, -1 on failure. - */ -int -threadpool_register_reply_event(threadpool_t *tp, - void (*cb)(threadpool_t *tp)) -{ - struct event_base *base = tor_libevent_get_base(); - - if (tp->reply_event) { - tor_event_free(tp->reply_event); - } - tp->reply_event = tor_event_new(base, - tp->reply_queue->alert.read_fd, - EV_READ|EV_PERSIST, - reply_event_cb, - tp); - tor_assert(tp->reply_event); - tp->reply_cb = cb; - return event_add(tp->reply_event, NULL); -} - -/** - * Process all pending replies on a reply queue. The main thread should call - * this function every time the socket returned by replyqueue_get_socket() is - * readable. - */ -void -replyqueue_process(replyqueue_t *queue) -{ - int r = queue->alert.drain_fn(queue->alert.read_fd); - if (r < 0) { - //LCOV_EXCL_START - static ratelim_t warn_limit = RATELIM_INIT(7200); - log_fn_ratelim(&warn_limit, LOG_WARN, LD_GENERAL, - "Failure from drain_fd: %s", - tor_socket_strerror(-r)); - //LCOV_EXCL_STOP - } - - tor_mutex_acquire(&queue->lock); - while (!TOR_TAILQ_EMPTY(&queue->answers)) { - /* lock must be held at this point.*/ - workqueue_entry_t *work = TOR_TAILQ_FIRST(&queue->answers); - TOR_TAILQ_REMOVE(&queue->answers, work, next_work); - tor_mutex_release(&queue->lock); - work->on_pool = NULL; - - work->reply_fn(work->arg); - workqueue_entry_free(work); - - tor_mutex_acquire(&queue->lock); - } - - tor_mutex_release(&queue->lock); -} diff --git a/src/common/workqueue.h b/src/common/workqueue.h deleted file mode 100644 index 4e5c424be6..0000000000 --- a/src/common/workqueue.h +++ /dev/null @@ -1,65 +0,0 @@ -/* Copyright (c) 2013-2018, The Tor Project, Inc. */ -/* See LICENSE for licensing information */ - -#ifndef TOR_WORKQUEUE_H -#define TOR_WORKQUEUE_H - -#include "lib/cc/torint.h" - -/** A replyqueue is used to tell the main thread about the outcome of - * work that we queued for the workers. */ -typedef struct replyqueue_s replyqueue_t; -/** A thread-pool manages starting threads and passing work to them. */ -typedef struct threadpool_s threadpool_t; -/** A workqueue entry represents a request that has been passed to a thread - * pool. */ -typedef struct workqueue_entry_s workqueue_entry_t; - -/** Possible return value from a work function: */ -typedef enum workqueue_reply_t { - WQ_RPL_REPLY = 0, /** indicates success */ - WQ_RPL_ERROR = 1, /** indicates fatal error */ - WQ_RPL_SHUTDOWN = 2, /** indicates thread is shutting down */ -} workqueue_reply_t; - -/** Possible priorities for work. Lower numeric values are more important. */ -typedef enum workqueue_priority_t { - WQ_PRI_HIGH = 0, - WQ_PRI_MED = 1, - WQ_PRI_LOW = 2, -} workqueue_priority_t; - -workqueue_entry_t *threadpool_queue_work_priority(threadpool_t *pool, - workqueue_priority_t prio, - workqueue_reply_t (*fn)(void *, - void *), - void (*reply_fn)(void *), - void *arg); - -workqueue_entry_t *threadpool_queue_work(threadpool_t *pool, - workqueue_reply_t (*fn)(void *, - void *), - void (*reply_fn)(void *), - void *arg); - -int threadpool_queue_update(threadpool_t *pool, - void *(*dup_fn)(void *), - workqueue_reply_t (*fn)(void *, void *), - void (*free_fn)(void *), - void *arg); -void *workqueue_entry_cancel(workqueue_entry_t *pending_work); -threadpool_t *threadpool_new(int n_threads, - replyqueue_t *replyqueue, - void *(*new_thread_state_fn)(void*), - void (*free_thread_state_fn)(void*), - void *arg); -replyqueue_t *threadpool_get_replyqueue(threadpool_t *tp); - -replyqueue_t *replyqueue_new(uint32_t alertsocks_flags); -void replyqueue_process(replyqueue_t *queue); - -struct event_base; -int threadpool_register_reply_event(threadpool_t *tp, - void (*cb)(threadpool_t *tp)); - -#endif /* !defined(TOR_WORKQUEUE_H) */ |