aboutsummaryrefslogtreecommitdiff
path: root/src/feature/stats/bwhist.c
blob: 06ad48e5c3c9f0e26af640840a70e6333f5814a0 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
/* Copyright (c) 2001 Matej Pfajfar.
 * Copyright (c) 2001-2004, Roger Dingledine.
 * Copyright (c) 2004-2006, Roger Dingledine, Nick Mathewson.
 * Copyright (c) 2007-2020, The Tor Project, Inc. */
/* See LICENSE for licensing information */

/**
 * @file bwhist.c
 * @brief Tracking for relay bandwidth history
 *
 * This module handles bandwidth usage history, used by relays to
 * self-report how much bandwidth they've used for different
 * purposes over last day or so, in order to generate the
 * {dirreq-,}{read,write}-history lines in that they publish.
 **/

#define BWHIST_PRIVATE
#include "orconfig.h"
#include "core/or/or.h"
#include "feature/stats/bwhist.h"

#include "app/config/config.h"
#include "app/config/statefile.h"
#include "feature/relay/routermode.h"

#include "feature/stats/bw_array_st.h"
#include "app/config/or_state_st.h"
#include "app/config/or_options_st.h"

/** Shift the current period of b forward by one. */
STATIC void
commit_max(bw_array_t *b)
{
  /* Store total from current period. */
  b->totals[b->next_max_idx] = b->total_in_period;
  /* Store maximum from current period. */
  b->maxima[b->next_max_idx++] = b->max_total;
  /* Advance next_period and next_max_idx */
  b->next_period += NUM_SECS_BW_SUM_INTERVAL;
  if (b->next_max_idx == NUM_TOTALS)
    b->next_max_idx = 0;
  if (b->num_maxes_set < NUM_TOTALS)
    ++b->num_maxes_set;
  /* Reset max_total. */
  b->max_total = 0;
  /* Reset total_in_period. */
  b->total_in_period = 0;
}

/** Shift the current observation time of <b>b</b> forward by one second. */
STATIC void
advance_obs(bw_array_t *b)
{
  int nextidx;
  uint64_t total;

  /* Calculate the total bandwidth for the last NUM_SECS_ROLLING_MEASURE
   * seconds; adjust max_total as needed.*/
  total = b->total_obs + b->obs[b->cur_obs_idx];
  if (total > b->max_total)
    b->max_total = total;

  nextidx = b->cur_obs_idx+1;
  if (nextidx == NUM_SECS_ROLLING_MEASURE)
    nextidx = 0;

  b->total_obs = total - b->obs[nextidx];
  b->obs[nextidx]=0;
  b->cur_obs_idx = nextidx;

  if (++b->cur_obs_time >= b->next_period)
    commit_max(b);
}

/** Add <b>n</b> bytes to the number of bytes in <b>b</b> for second
 * <b>when</b>. */
STATIC void
add_obs(bw_array_t *b, time_t when, uint64_t n)
{
  if (when < b->cur_obs_time)
    return; /* Don't record data in the past. */

  /* If we're currently adding observations for an earlier second than
   * 'when', advance b->cur_obs_time and b->cur_obs_idx by an
   * appropriate number of seconds, and do all the other housekeeping. */
  while (when > b->cur_obs_time) {
    /* Doing this one second at a time is potentially inefficient, if we start
       with a state file that is very old.  Fortunately, it doesn't seem to
       show up in profiles, so we can just ignore it for now. */
    advance_obs(b);
  }

  b->obs[b->cur_obs_idx] += n;
  b->total_in_period += n;
}

/** Allocate, initialize, and return a new bw_array. */
STATIC bw_array_t *
bw_array_new(void)
{
  bw_array_t *b;
  time_t start;
  b = tor_malloc_zero(sizeof(bw_array_t));
  start = time(NULL);
  b->cur_obs_time = start;
  b->next_period = start + NUM_SECS_BW_SUM_INTERVAL;
  return b;
}

/** Free storage held by bandwidth array <b>b</b>. */
STATIC void
bw_array_free_(bw_array_t *b)
{
  if (!b) {
    return;
  }

  tor_free(b);
}

/** Recent history of bandwidth observations for (all) read operations. */
static bw_array_t *read_array = NULL;
/** Recent history of bandwidth observations for IPv6 read operations. */
static bw_array_t *read_array_ipv6 = NULL;
/** Recent history of bandwidth observations for (all) write operations. */
STATIC bw_array_t *write_array = NULL;
/** Recent history of bandwidth observations for IPv6 write operations. */
static bw_array_t *write_array_ipv6 = NULL;
/** Recent history of bandwidth observations for read operations for the
    directory protocol. */
static bw_array_t *dir_read_array = NULL;
/** Recent history of bandwidth observations for write operations for the
    directory protocol. */
static bw_array_t *dir_write_array = NULL;

/** Set up structures for bandwidth history, clearing them if they already
 * exist. */
void
bwhist_init(void)
{
  bw_array_free(read_array);
  bw_array_free(read_array_ipv6);
  bw_array_free(write_array);
  bw_array_free(write_array_ipv6);
  bw_array_free(dir_read_array);
  bw_array_free(dir_write_array);

  read_array = bw_array_new();
  read_array_ipv6 = bw_array_new();
  write_array = bw_array_new();
  write_array_ipv6 = bw_array_new();
  dir_read_array = bw_array_new();
  dir_write_array = bw_array_new();
}

/** Remember that we read <b>num_bytes</b> bytes in second <b>when</b>.
 *
 * Add num_bytes to the current running total for <b>when</b>.
 *
 * <b>when</b> can go back to time, but it's safe to ignore calls
 * earlier than the latest <b>when</b> you've heard of.
 */
void
bwhist_note_bytes_written(uint64_t num_bytes, time_t when, bool ipv6)
{
/* Maybe a circular array for recent seconds, and step to a new point
 * every time a new second shows up. Or simpler is to just to have
 * a normal array and push down each item every second; it's short.
 */
/* When a new second has rolled over, compute the sum of the bytes we've
 * seen over when-1 to when-1-NUM_SECS_ROLLING_MEASURE, and stick it
 * somewhere. See bwhist_bandwidth_assess() below.
 */
  add_obs(write_array, when, num_bytes);
  if (ipv6)
    add_obs(write_array_ipv6, when, num_bytes);
}

/** Remember that we wrote <b>num_bytes</b> bytes in second <b>when</b>.
 * (like bwhist_note_bytes_written() above)
 */
void
bwhist_note_bytes_read(uint64_t num_bytes, time_t when, bool ipv6)
{
/* if we're smart, we can make this func and the one above share code */
  add_obs(read_array, when, num_bytes);
  if (ipv6)
    add_obs(read_array_ipv6, when, num_bytes);
}

/** Remember that we wrote <b>num_bytes</b> directory bytes in second
 * <b>when</b>. (like bwhist_note_bytes_written() above)
 */
void
bwhist_note_dir_bytes_written(uint64_t num_bytes, time_t when)
{
  add_obs(dir_write_array, when, num_bytes);
}

/** Remember that we read <b>num_bytes</b> directory bytes in second
 * <b>when</b>. (like bwhist_note_bytes_written() above)
 */
void
bwhist_note_dir_bytes_read(uint64_t num_bytes, time_t when)
{
  add_obs(dir_read_array, when, num_bytes);
}

/**
 * Helper: Return the largest value in b->maxima.  (This is equal to the
 * most bandwidth used in any NUM_SECS_ROLLING_MEASURE period for the last
 * NUM_SECS_BW_SUM_IS_VALID seconds.)
 *
 * Also include the current period if we have been observing it for
 * at least min_observation_time seconds.
 */
STATIC uint64_t
find_largest_max(bw_array_t *b, int min_observation_time)
{
  int i;
  uint64_t max;
  time_t period_start = b->next_period - NUM_SECS_BW_SUM_INTERVAL;
  if (b->cur_obs_time > period_start + min_observation_time)
    max = b->max_total;
  else
    max = 0;
  for (i=0; i<NUM_TOTALS; ++i) {
    if (b->maxima[i]>max)
      max = b->maxima[i];
  }
  return max;
}

/** Find the largest sums in the past NUM_SECS_BW_SUM_IS_VALID (roughly)
 * seconds. Find one sum for reading and one for writing. They don't have
 * to be at the same time.
 *
 * Return the smaller of these sums, divided by NUM_SECS_ROLLING_MEASURE.
 */
MOCK_IMPL(int,
bwhist_bandwidth_assess,(void))
{
  uint64_t w,r;
  int min_obs_time = get_options()->TestingMinTimeToReportBandwidth;
  r = find_largest_max(read_array, min_obs_time);
  w = find_largest_max(write_array, min_obs_time);
  if (r>w)
    return (int)(((double)w)/NUM_SECS_ROLLING_MEASURE);
  else
    return (int)(((double)r)/NUM_SECS_ROLLING_MEASURE);
}

/** Print the bandwidth history of b (either [dir-]read_array or
 * [dir-]write_array) into the buffer pointed to by buf.  The format is
 * simply comma separated numbers, from oldest to newest.
 *
 * It returns the number of bytes written.
 */
STATIC size_t
bwhist_fill_bandwidth_history(char *buf, size_t len, const bw_array_t *b)
{
  char *cp = buf;
  int i, n;
  const or_options_t *options = get_options();
  uint64_t cutoff;

  if (b->num_maxes_set <= b->next_max_idx) {
    /* We haven't been through the circular array yet; time starts at i=0.*/
    i = 0;
  } else {
    /* We've been around the array at least once.  The next i to be
       overwritten is the oldest. */
    i = b->next_max_idx;
  }

  if (options->RelayBandwidthRate) {
    /* We don't want to report that we used more bandwidth than the max we're
     * willing to relay; otherwise everybody will know how much traffic
     * we used ourself. */
    cutoff = options->RelayBandwidthRate * NUM_SECS_BW_SUM_INTERVAL;
  } else {
    cutoff = UINT64_MAX;
  }

  for (n=0; n<b->num_maxes_set; ++n,++i) {
    uint64_t total;
    if (i >= NUM_TOTALS)
      i -= NUM_TOTALS;
    tor_assert(i < NUM_TOTALS);
    /* Round the bandwidth used down to the nearest 1k. */
    total = b->totals[i] & ~0x3ff;
    if (total > cutoff)
      total = cutoff;

    if (n==(b->num_maxes_set-1))
      tor_snprintf(cp, len-(cp-buf), "%"PRIu64, (total));
    else
      tor_snprintf(cp, len-(cp-buf), "%"PRIu64",", (total));
    cp += strlen(cp);
  }
  return cp-buf;
}

/** Encode a single bandwidth history line into <b>buf</b>. */
static void
bwhist_get_one_bandwidth_line(buf_t *buf, const char *desc,
                              const bw_array_t *b)
{
  /* [dirreq-](read|write)-history yyyy-mm-dd HH:MM:SS (n s) n,n,n... */
  /* The n,n,n part above. Largest representation of a uint64_t is 20 chars
   * long, plus the comma. */
#define MAX_HIST_VALUE_LEN (21*NUM_TOTALS)

  char tmp[MAX_HIST_VALUE_LEN];
  char end[ISO_TIME_LEN+1];

  size_t slen = bwhist_fill_bandwidth_history(tmp, MAX_HIST_VALUE_LEN, b);
  /* If we don't have anything to write, skip to the next entry. */
  if (slen == 0)
    return;

  format_iso_time(end, b->next_period-NUM_SECS_BW_SUM_INTERVAL);
  buf_add_printf(buf, "%s %s (%d s) %s\n",
                 desc, end, NUM_SECS_BW_SUM_INTERVAL, tmp);
}

/** Allocate and return lines for representing this server's bandwidth
 * history in its descriptor. We publish these lines in our extra-info
 * descriptor.
 */
char *
bwhist_get_bandwidth_lines(void)
{
  buf_t *buf = buf_new();

  bwhist_get_one_bandwidth_line(buf, "write-history", write_array);
  bwhist_get_one_bandwidth_line(buf, "read-history", read_array);
  bwhist_get_one_bandwidth_line(buf, "ipv6-write-history", write_array_ipv6);
  bwhist_get_one_bandwidth_line(buf, "ipv6-read-history", read_array_ipv6);
  bwhist_get_one_bandwidth_line(buf, "dirreq-write-history", dir_write_array);
  bwhist_get_one_bandwidth_line(buf, "dirreq-read-history", dir_read_array);

  char *result = buf_extract(buf, NULL);
  buf_free(buf);
  return result;
}

/** Write a single bw_array_t into the Values, Ends, Interval, and Maximum
 * entries of an or_state_t. Done before writing out a new state file. */
static void
bwhist_update_bwhist_state_section(or_state_t *state,
                                     const bw_array_t *b,
                                     smartlist_t **s_values,
                                     smartlist_t **s_maxima,
                                     time_t *s_begins,
                                     int *s_interval)
{
  int i,j;
  uint64_t maxval;

  if (*s_values) {
    SMARTLIST_FOREACH(*s_values, char *, val, tor_free(val));
    smartlist_free(*s_values);
  }
  if (*s_maxima) {
    SMARTLIST_FOREACH(*s_maxima, char *, val, tor_free(val));
    smartlist_free(*s_maxima);
  }
  if (! server_mode(get_options())) {
    /* Clients don't need to store bandwidth history persistently;
     * force these values to the defaults. */
    /* FFFF we should pull the default out of config.c's state table,
     * so we don't have two defaults. */
    if (*s_begins != 0 || *s_interval != 900) {
      time_t now = time(NULL);
      time_t save_at = get_options()->AvoidDiskWrites ? now+3600 : now+600;
      or_state_mark_dirty(state, save_at);
    }
    *s_begins = 0;
    *s_interval = 900;
    *s_values = smartlist_new();
    *s_maxima = smartlist_new();
    return;
  }
  *s_begins = b->next_period;
  *s_interval = NUM_SECS_BW_SUM_INTERVAL;

  *s_values = smartlist_new();
  *s_maxima = smartlist_new();
  /* Set i to first position in circular array */
  i = (b->num_maxes_set <= b->next_max_idx) ? 0 : b->next_max_idx;
  for (j=0; j < b->num_maxes_set; ++j,++i) {
    if (i >= NUM_TOTALS)
      i = 0;
    smartlist_add_asprintf(*s_values, "%"PRIu64,
                           (b->totals[i] & ~0x3ff));
    maxval = b->maxima[i] / NUM_SECS_ROLLING_MEASURE;
    smartlist_add_asprintf(*s_maxima, "%"PRIu64,
                           (maxval & ~0x3ff));
  }
  smartlist_add_asprintf(*s_values, "%"PRIu64,
                         (b->total_in_period & ~0x3ff));
  maxval = b->max_total / NUM_SECS_ROLLING_MEASURE;
  smartlist_add_asprintf(*s_maxima, "%"PRIu64,
                         (maxval & ~0x3ff));
}

/** Update <b>state</b> with the newest bandwidth history. Done before
 * writing out a new state file. */
void
bwhist_update_state(or_state_t *state)
{
#define UPDATE(arrname,st) \
  bwhist_update_bwhist_state_section(state,\
                                       (arrname),\
                                       &state->BWHistory ## st ## Values, \
                                       &state->BWHistory ## st ## Maxima, \
                                       &state->BWHistory ## st ## Ends, \
                                       &state->BWHistory ## st ## Interval)

  UPDATE(write_array, Write);
  UPDATE(read_array, Read);
  UPDATE(write_array_ipv6, IPv6Write);
  UPDATE(read_array_ipv6, IPv6Read);
  UPDATE(dir_write_array, DirWrite);
  UPDATE(dir_read_array, DirRead);

  if (server_mode(get_options())) {
    or_state_mark_dirty(state, time(NULL)+(2*3600));
  }
#undef UPDATE
}

/** Load a single bw_array_t from its Values, Ends, Maxima, and Interval
 * entries in an or_state_t. Done while reading the state file. */
static int
bwhist_load_bwhist_state_section(bw_array_t *b,
                                   const smartlist_t *s_values,
                                   const smartlist_t *s_maxima,
                                   const time_t s_begins,
                                   const int s_interval)
{
  time_t now = time(NULL);
  int retval = 0;
  time_t start;

  uint64_t v, mv;
  int i,ok,ok_m = 0;
  int have_maxima = s_maxima && s_values &&
    (smartlist_len(s_values) == smartlist_len(s_maxima));

  if (s_values && s_begins >= now - NUM_SECS_BW_SUM_INTERVAL*NUM_TOTALS) {
    start = s_begins - s_interval*(smartlist_len(s_values));
    if (start > now)
      return 0;
    b->cur_obs_time = start;
    b->next_period = start + NUM_SECS_BW_SUM_INTERVAL;
    SMARTLIST_FOREACH_BEGIN(s_values, const char *, cp) {
        const char *maxstr = NULL;
        v = tor_parse_uint64(cp, 10, 0, UINT64_MAX, &ok, NULL);
        if (have_maxima) {
          maxstr = smartlist_get(s_maxima, cp_sl_idx);
          mv = tor_parse_uint64(maxstr, 10, 0, UINT64_MAX, &ok_m, NULL);
          mv *= NUM_SECS_ROLLING_MEASURE;
        } else {
          /* No maxima known; guess average rate to be conservative. */
          mv = (v / s_interval) * NUM_SECS_ROLLING_MEASURE;
        }
        if (!ok) {
          retval = -1;
          log_notice(LD_HIST, "Could not parse value '%s' into a number.'",cp);
        }
        if (maxstr && !ok_m) {
          retval = -1;
          log_notice(LD_HIST, "Could not parse maximum '%s' into a number.'",
                     maxstr);
        }

        if (start < now) {
          time_t cur_start = start;
          time_t actual_interval_len = s_interval;
          uint64_t cur_val = 0;
          /* Calculate the average per second. This is the best we can do
           * because our state file doesn't have per-second resolution. */
          if (start + s_interval > now)
            actual_interval_len = now - start;
          cur_val = v / actual_interval_len;
          /* This is potentially inefficient, but since we don't do it very
           * often it should be ok. */
          while (cur_start < start + actual_interval_len) {
            add_obs(b, cur_start, cur_val);
            ++cur_start;
          }
          b->max_total = mv;
          /* This will result in some fairly choppy history if s_interval
           * is not the same as NUM_SECS_BW_SUM_INTERVAL. XXXX */
          start += actual_interval_len;
        }
    } SMARTLIST_FOREACH_END(cp);
  }

  /* Clean up maxima and observed */
  for (i=0; i<NUM_SECS_ROLLING_MEASURE; ++i) {
    b->obs[i] = 0;
  }
  b->total_obs = 0;

  return retval;
}

/** Set bandwidth history from the state file we just loaded. */
int
bwhist_load_state(or_state_t *state, char **err)
{
  int all_ok = 1;

  /* Assert they already have been malloced */
  tor_assert(read_array && write_array);
  tor_assert(read_array_ipv6 && write_array_ipv6);
  tor_assert(dir_read_array && dir_write_array);

#define LOAD(arrname,st)                                                \
  if (bwhist_load_bwhist_state_section(                               \
                                (arrname),                              \
                                state->BWHistory ## st ## Values,       \
                                state->BWHistory ## st ## Maxima,       \
                                state->BWHistory ## st ## Ends,         \
                                state->BWHistory ## st ## Interval)<0)  \
    all_ok = 0

  LOAD(write_array, Write);
  LOAD(read_array, Read);
  LOAD(write_array_ipv6, IPv6Write);
  LOAD(read_array_ipv6, IPv6Read);
  LOAD(dir_write_array, DirWrite);
  LOAD(dir_read_array, DirRead);

#undef LOAD
  if (!all_ok) {
    *err = tor_strdup("Parsing of bandwidth history values failed");
    /* and create fresh arrays */
    bwhist_init();
    return -1;
  }
  return 0;
}

void
bwhist_free_all(void)
{
  bw_array_free(read_array);
  bw_array_free(read_array_ipv6);
  bw_array_free(write_array);
  bw_array_free(write_array_ipv6);
  bw_array_free(dir_read_array);
  bw_array_free(dir_write_array);
}