summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorRoger Dingledine <arma@torproject.org>2003-09-27 21:09:56 +0000
committerRoger Dingledine <arma@torproject.org>2003-09-27 21:09:56 +0000
commitcb8212bfcb56980893993db5bd4098944735b38a (patch)
treec1f382a4d4d861498956384804e2a55f338237e4
parent798bb6ab3b089005fddfded0513edeb2da231354 (diff)
downloadtor-cb8212bfcb56980893993db5bd4098944735b38a.tar.gz
tor-cb8212bfcb56980893993db5bd4098944735b38a.zip
clean up receiver buckets; prepare for payloads in relay_end; note a few bugs
svn:r502
-rw-r--r--src/or/circuit.c3
-rw-r--r--src/or/command.c4
-rw-r--r--src/or/connection.c56
-rw-r--r--src/or/connection_edge.c29
-rw-r--r--src/or/connection_or.c2
-rw-r--r--src/or/cpuworker.c2
-rw-r--r--src/or/directory.c2
-rw-r--r--src/or/dns.c7
-rw-r--r--src/or/main.c3
-rw-r--r--src/or/or.h12
10 files changed, 54 insertions, 66 deletions
diff --git a/src/or/circuit.c b/src/or/circuit.c
index c7dbcce856..650c531304 100644
--- a/src/or/circuit.c
+++ b/src/or/circuit.c
@@ -125,6 +125,7 @@ static aci_t get_unique_aci_by_addr_port(uint32_t addr, uint16_t port, int aci_t
high_bit = (aci_type == ACI_TYPE_HIGHER) ? 1<<15 : 0;
conn = connection_exact_get_by_addr_port(addr,port);
+ /* XXX race condition: if conn is marked_for_close it won't be noticed */
if (!conn)
return (1|high_bit); /* No connection exists; conflict is impossible. */
@@ -910,7 +911,7 @@ int circuit_truncated(circuit_t *circ, crypt_path_t *layer) {
for(stream = circ->p_streams; stream; stream=stream->next_stream) {
if(stream->cpath_layer == victim) {
log_fn(LOG_INFO, "Marking stream %d for close.", *(int*)stream->stream_id);
- stream->marked_for_close = 1;
+/*ENDCLOSE*/ stream->marked_for_close = 1;
}
}
diff --git a/src/or/command.c b/src/or/command.c
index 3b1fe1f644..e3943cc793 100644
--- a/src/or/command.c
+++ b/src/or/command.c
@@ -88,7 +88,7 @@ static void command_process_create_cell(cell_t *cell, connection_t *conn) {
circ = circuit_get_by_aci_conn(cell->aci, conn);
if(circ) {
- log_fn(LOG_WARNING,"received CREATE cell for known circ. Dropping.");
+ log_fn(LOG_WARNING,"received CREATE cell (aci %d) for known circ. Dropping.", cell->aci);
return;
}
@@ -118,7 +118,7 @@ static void command_process_created_cell(cell_t *cell, connection_t *conn) {
circ = circuit_get_by_aci_conn(cell->aci, conn);
if(!circ) {
- log_fn(LOG_WARNING,"received CREATED cell for unknown circ. Dropping.");
+ log_fn(LOG_WARNING,"received CREATED cell (aci %d) for unknown circ. Dropping.", cell->aci);
return;
}
diff --git a/src/or/connection.c b/src/or/connection.c
index 4ca535e129..0604cd8f85 100644
--- a/src/or/connection.c
+++ b/src/or/connection.c
@@ -83,9 +83,6 @@ connection_t *connection_new(int type) {
conn->inbuf = buf_new();
conn->outbuf = buf_new();
- conn->receiver_bucket = 50000; /* should be enough to do the handshake */
- conn->bandwidth = conn->receiver_bucket / 10; /* give it a default */
-
conn->timestamp_created = now.tv_sec;
conn->timestamp_lastread = now.tv_sec;
conn->timestamp_lastwritten = now.tv_sec;
@@ -149,8 +146,6 @@ int connection_create_listener(struct sockaddr_in *bindaddr, int type) {
conn = connection_new(type);
conn->s = s;
- conn->receiver_bucket = -1; /* non-cell connections don't do receiver buckets */
- conn->bandwidth = -1;
if(connection_add(conn) < 0) { /* no space, forget it */
log_fn(LOG_WARNING,"connection_add failed. Giving up.");
@@ -197,11 +192,6 @@ int connection_handle_listener_read(connection_t *conn, int new_type) {
newconn = connection_new(new_type);
newconn->s = news;
- if(!connection_speaks_cells(newconn)) {
- newconn->receiver_bucket = -1;
- newconn->bandwidth = -1;
- }
-
newconn->address = strdup(inet_ntoa(remote.sin_addr)); /* remember the remote address */
newconn->addr = ntohl(remote.sin_addr.s_addr);
newconn->port = ntohs(remote.sin_port);
@@ -305,7 +295,7 @@ static int connection_tls_finish_handshake(connection_t *conn) {
}
crypto_free_pk_env(pk);
} else { /* it's an OP */
- conn->bandwidth = DEFAULT_BANDWIDTH_OP;
+ conn->receiver_bucket = conn->bandwidth = DEFAULT_BANDWIDTH_OP;
}
} else { /* I'm a client */
if(!tor_tls_peer_has_cert(conn->tls)) { /* it's a client too?! */
@@ -330,7 +320,7 @@ static int connection_tls_finish_handshake(connection_t *conn) {
}
log_fn(LOG_DEBUG,"The router's pk matches the one we meant to connect to. Good.");
crypto_free_pk_env(pk);
- conn->bandwidth = DEFAULT_BANDWIDTH_OP;
+ conn->receiver_bucket = conn->bandwidth = DEFAULT_BANDWIDTH_OP;
circuit_n_conn_open(conn); /* send the pending create */
}
return 0;
@@ -446,10 +436,6 @@ int connection_handle_read(connection_t *conn) {
//log_fn(LOG_DEBUG,"connection_process_inbuf returned %d.",retval);
return -1;
}
- if(!connection_state_is_open(conn) && conn->receiver_bucket == 0) {
- log_fn(LOG_WARNING,"receiver bucket reached 0 before handshake finished. Closing.");
- return -1;
- }
return 0;
}
@@ -458,9 +444,6 @@ int connection_read_to_buf(connection_t *conn) {
int result;
int at_most;
- assert((connection_speaks_cells(conn) && conn->receiver_bucket >= 0) ||
- (!connection_speaks_cells(conn) && conn->receiver_bucket < 0));
-
if(options.LinkPadding) {
at_most = global_read_bucket;
} else {
@@ -477,14 +460,13 @@ int connection_read_to_buf(connection_t *conn) {
at_most = global_read_bucket;
}
- if(conn->receiver_bucket >= 0 && at_most > conn->receiver_bucket)
- at_most = conn->receiver_bucket;
-
if(connection_speaks_cells(conn) && conn->state != OR_CONN_STATE_CONNECTING) {
if(conn->state == OR_CONN_STATE_HANDSHAKING)
return connection_tls_continue_handshake(conn);
/* else open, or closing */
+ if(at_most > conn->receiver_bucket)
+ at_most = conn->receiver_bucket;
result = read_to_buf_tls(conn->tls, at_most, conn->inbuf);
switch(result) {
@@ -510,14 +492,21 @@ int connection_read_to_buf(connection_t *conn) {
}
global_read_bucket -= result; assert(global_read_bucket >= 0);
- if(connection_speaks_cells(conn))
- conn->receiver_bucket -= result;
- if(conn->receiver_bucket == 0 || global_read_bucket == 0) {
- log_fn(LOG_DEBUG,"buckets (%d, %d) exhausted. Pausing.", global_read_bucket, conn->receiver_bucket);
+ if(global_read_bucket == 0) {
+ log_fn(LOG_DEBUG,"global bucket exhausted. Pausing.");
conn->wants_to_read = 1;
connection_stop_reading(conn);
return 0;
}
+ if(connection_speaks_cells(conn) && conn->state == OR_CONN_STATE_OPEN) {
+ conn->receiver_bucket -= result; assert(conn->receiver_bucket >= 0);
+ if(conn->receiver_bucket == 0) {
+ log_fn(LOG_DEBUG,"receiver bucket exhausted. Pausing.");
+ conn->wants_to_read = 1;
+ connection_stop_reading(conn);
+ return 0;
+ }
+ }
if(connection_speaks_cells(conn) && conn->state != OR_CONN_STATE_CONNECTING)
if(result == at_most)
return connection_read_to_buf(conn);
@@ -627,7 +616,10 @@ int connection_receiver_bucket_should_increase(connection_t *conn) {
if(!connection_speaks_cells(conn))
return 0; /* edge connections don't use receiver_buckets */
+ if(conn->state != OR_CONN_STATE_OPEN)
+ return 0; /* only open connections play the rate limiting game */
+ assert(conn->bandwidth > 0);
if(conn->receiver_bucket > 9*conn->bandwidth)
return 0;
@@ -660,7 +652,7 @@ int connection_send_destroy(aci_t aci, connection_t *conn) {
if(!connection_speaks_cells(conn)) {
log_fn(LOG_INFO,"Aci %d: At an edge. Marking connection for close.", aci);
- conn->marked_for_close = 1;
+/*ENDCLOSE*/ conn->marked_for_close = 1;
return 0;
}
@@ -746,13 +738,13 @@ void assert_connection_ok(connection_t *conn, time_t now)
#endif
if (conn->type != CONN_TYPE_OR) {
- assert(conn->bandwidth == -1);
- assert(conn->receiver_bucket == -1);
assert(!conn->tls);
} else {
- assert(conn->bandwidth);
- assert(conn->receiver_bucket >= 0);
- assert(conn->receiver_bucket <= 10*conn->bandwidth);
+ if(conn->state == OR_CONN_STATE_OPEN) {
+ assert(conn->bandwidth > 0);
+ assert(conn->receiver_bucket >= 0);
+ assert(conn->receiver_bucket <= 10*conn->bandwidth);
+ }
assert(conn->addr && conn->port);
assert(conn->address);
if (conn->state != OR_CONN_STATE_CONNECTING)
diff --git a/src/or/connection_edge.c b/src/or/connection_edge.c
index 1e4b91a50f..845d4223ee 100644
--- a/src/or/connection_edge.c
+++ b/src/or/connection_edge.c
@@ -29,7 +29,7 @@ int connection_edge_process_inbuf(connection_t *conn) {
conn->done_receiving = 1;
shutdown(conn->s, 0); /* XXX check return, refactor NM */
if (conn->done_sending)
- conn->marked_for_close = 1;
+/*ENDCLOSE*/ conn->marked_for_close = 1;
/* XXX Factor out common logic here and in circuit_about_to_close NM */
circ = circuit_get_by_conn(conn);
@@ -51,17 +51,17 @@ int connection_edge_process_inbuf(connection_t *conn) {
#else
/* eof reached, kill it. */
log_fn(LOG_INFO,"conn (fd %d) reached eof. Closing.", conn->s);
- return -1;
+/*ENDCLOSE*/ return -1;
#endif
}
switch(conn->state) {
case AP_CONN_STATE_SOCKS_WAIT:
- return connection_ap_handshake_process_socks(conn);
+/*ENDCLOSE*/ return connection_ap_handshake_process_socks(conn);
case AP_CONN_STATE_OPEN:
case EXIT_CONN_STATE_OPEN:
if(connection_package_raw_inbuf(conn) < 0)
- return -1;
+/*ENDCLOSE*/ return -1;
return 0;
case EXIT_CONN_STATE_CONNECTING:
log_fn(LOG_INFO,"text from server while in 'connecting' state at exit. Leaving it on buffer.");
@@ -133,10 +133,11 @@ int connection_edge_process_relay_cell(cell_t *cell, circuit_t *circ, connection
log_fn(LOG_INFO,"...and informing resolver we don't want the answer anymore.");
dns_cancel_pending_resolve(conn->address, conn);
}
+ return 0;
} else {
- log_fn(LOG_WARNING,"Got an unexpected relay cell, not in 'open' state. Dropping.");
+ log_fn(LOG_WARNING,"Got an unexpected relay cell, not in 'open' state. Closing.");
+ return -1;
}
- return 0;
}
switch(relay_command) {
@@ -174,11 +175,11 @@ int connection_edge_process_relay_cell(cell_t *cell, circuit_t *circ, connection
// printf("New text for buf (%d bytes): '%s'", cell->length - RELAY_HEADER_SIZE, cell->payload + RELAY_HEADER_SIZE);
if(connection_write_to_buf(cell->payload + RELAY_HEADER_SIZE,
cell->length - RELAY_HEADER_SIZE, conn) < 0) {
- conn->marked_for_close = 1;
+/*ENDCLOSE*/ conn->marked_for_close = 1;
return 0;
}
if(connection_consider_sending_sendme(conn, edge_type) < 0)
- conn->marked_for_close = 1;
+/*ENDCLOSE*/ conn->marked_for_close = 1;
return 0;
case RELAY_COMMAND_END:
if(!conn) {
@@ -191,9 +192,9 @@ int connection_edge_process_relay_cell(cell_t *cell, circuit_t *circ, connection
conn->done_sending = 1;
shutdown(conn->s, 1); /* XXX check return; refactor NM */
if (conn->done_receiving)
- conn->marked_for_close = 1;
+/*ENDCLOSE*/ conn->marked_for_close = 1;
#endif
- conn->marked_for_close = 1;
+/*ENDCLOSE*/ conn->marked_for_close = 1;
break;
case RELAY_COMMAND_EXTEND:
if(conn) {
@@ -240,7 +241,7 @@ int connection_edge_process_relay_cell(cell_t *cell, circuit_t *circ, connection
}
log_fn(LOG_INFO,"Connected! Notifying application.");
if(connection_ap_handshake_socks_reply(conn, SOCKS4_REQUEST_GRANTED) < 0) {
- conn->marked_for_close = 1;
+/*ENDCLOSE*/ conn->marked_for_close = 1;
}
break;
case RELAY_COMMAND_SENDME:
@@ -331,7 +332,7 @@ repeat_connection_package_raw_inbuf:
return 0;
if(conn->package_window <= 0) {
- log_fn(LOG_WARNING,"called with package_window 0. Tell Roger.");
+ log_fn(LOG_WARNING,"called with package_window %d. Tell Roger.", conn->package_window);
connection_stop_reading(conn);
return 0;
}
@@ -526,7 +527,7 @@ static int connection_ap_handshake_socks_reply(connection_t *conn, char result)
return connection_flush_buf(conn); /* try to flush it, in case we're about to close the conn */
}
-static int connection_exit_begin_conn(cell_t *cell, circuit_t *circ) {
+/*ENDCLOSE*/ static int connection_exit_begin_conn(cell_t *cell, circuit_t *circ) {
connection_t *n_stream;
char *colon;
@@ -553,8 +554,6 @@ static int connection_exit_begin_conn(cell_t *cell, circuit_t *circ) {
n_stream->address = strdup(cell->payload + RELAY_HEADER_SIZE + STREAM_ID_SIZE);
n_stream->port = atoi(colon+1);
n_stream->state = EXIT_CONN_STATE_RESOLVING;
- n_stream->receiver_bucket = -1; /* edge connections don't do receiver buckets */
- n_stream->bandwidth = -1;
n_stream->s = -1; /* not yet valid */
n_stream->package_window = STREAMWINDOW_START;
n_stream->deliver_window = STREAMWINDOW_START;
diff --git a/src/or/connection_or.c b/src/or/connection_or.c
index 6e31966dce..cddbc93c26 100644
--- a/src/or/connection_or.c
+++ b/src/or/connection_or.c
@@ -77,7 +77,7 @@ int connection_or_finished_flushing(connection_t *conn) {
void connection_or_init_conn_from_router(connection_t *conn, routerinfo_t *router) {
conn->addr = router->addr;
conn->port = router->or_port;
- conn->bandwidth = router->bandwidth;
+ conn->receiver_bucket = conn->bandwidth = router->bandwidth;
conn->onion_pkey = crypto_pk_dup_key(router->onion_pkey);
conn->link_pkey = crypto_pk_dup_key(router->link_pkey);
conn->identity_pkey = crypto_pk_dup_key(router->identity_pkey);
diff --git a/src/or/cpuworker.c b/src/or/cpuworker.c
index 4d3764798b..b47df87a75 100644
--- a/src/or/cpuworker.c
+++ b/src/or/cpuworker.c
@@ -183,8 +183,6 @@ static int spawn_cpuworker(void) {
set_socket_nonblocking(fd[0]);
/* set up conn so it's got all the data we need to remember */
- conn->receiver_bucket = -1; /* non-cell connections don't do receiver buckets */
- conn->bandwidth = -1;
conn->s = fd[0];
conn->address = strdup("localhost");
diff --git a/src/or/directory.c b/src/or/directory.c
index c410821e05..2ed210cde4 100644
--- a/src/or/directory.c
+++ b/src/or/directory.c
@@ -48,8 +48,6 @@ void directory_initiate_command(routerinfo_t *router, int command) {
conn->addr = router->addr;
conn->port = router->dir_port;
conn->address = strdup(router->address);
- conn->receiver_bucket = -1; /* edge connections don't do receiver buckets */
- conn->bandwidth = -1;
if (router->identity_pkey)
conn->identity_pkey = crypto_pk_dup_key(router->identity_pkey);
else {
diff --git a/src/or/dns.c b/src/or/dns.c
index 1b955cc004..e12f522c65 100644
--- a/src/or/dns.c
+++ b/src/or/dns.c
@@ -225,7 +225,7 @@ void dns_cancel_pending_resolve(char *question, connection_t *onlyconn) {
/* mark all pending connections to fail */
while(resolve->pending_connections) {
pend = resolve->pending_connections;
- pend->conn->marked_for_close = 1;
+/*ENDCLOSE*/ pend->conn->marked_for_close = 1;
resolve->pending_connections = pend->next;
free(pend);
}
@@ -278,7 +278,7 @@ static void dns_found_answer(char *question, uint32_t answer) {
pend = resolve->pending_connections;
pend->conn->addr = resolve->answer;
if(resolve->state == CACHE_STATE_FAILED || connection_exit_connect(pend->conn) < 0) {
- pend->conn->marked_for_close = 1;
+/*ENDCLOSE*/ pend->conn->marked_for_close = 1;
}
resolve->pending_connections = pend->next;
free(pend);
@@ -386,8 +386,6 @@ static int spawn_dnsworker(void) {
set_socket_nonblocking(fd[0]);
/* set up conn so it's got all the data we need to remember */
- conn->receiver_bucket = -1; /* non-cell connections don't do receiver buckets */
- conn->bandwidth = -1;
conn->s = fd[0];
conn->address = strdup("localhost");
@@ -420,6 +418,7 @@ static void spawn_enough_dnsworkers(void) {
dnsconn->marked_for_close = 1;
num_dnsworkers_busy--;
+ num_dnsworkers--;
}
if(num_dnsworkers_busy >= MIN_DNSWORKERS)
diff --git a/src/or/main.c b/src/or/main.c
index 3ce121c9e6..eb0ef7a14a 100644
--- a/src/or/main.c
+++ b/src/or/main.c
@@ -391,7 +391,8 @@ static int prepare_for_poll(void) {
if(conn->wants_to_read == 1 /* it's marked to turn reading back on now */
&& global_read_bucket > 0 /* and we're allowed to read */
- && conn->receiver_bucket != 0) { /* and either an edge conn or non-empty bucket */
+ && (!connection_speaks_cells(conn) || conn->receiver_bucket > 0)) {
+ /* and either a non-cell conn or a cell conn with non-empty bucket */
conn->wants_to_read = 0;
connection_start_reading(conn);
if(conn->wants_to_write == 1) {
diff --git a/src/or/or.h b/src/or/or.h
index 948518a5dd..be41e2b441 100644
--- a/src/or/or.h
+++ b/src/or/or.h
@@ -273,12 +273,6 @@ struct connection_t {
long timestamp_created; /* when was this connection_t created? */
- uint32_t bandwidth; /* connection bandwidth. Set to -1 for non-OR conns. */
- int receiver_bucket; /* when this hits 0, stop receiving. Every second we
- * add 'bandwidth' to this, capping it at 10*bandwidth.
- * Set to -1 for non-OR conns.
- */
-
uint32_t addr; /* these two uniquely identify a router. Both in host order. */
uint16_t port; /* if non-zero, they identify the guy on the other end
* of the connection. */
@@ -294,6 +288,12 @@ struct connection_t {
uint16_t next_aci; /* Which ACI do we try to use next on this connection?
* This is always in the range 0..1<<15-1.*/
+ /* bandwidth and receiver_bucket only used by ORs in OPEN state: */
+ uint32_t bandwidth; /* connection bandwidth. */
+ int receiver_bucket; /* when this hits 0, stop receiving. Every second we
+ * add 'bandwidth' to this, capping it at 10*bandwidth.
+ */
+
/* Used only by edge connections: */
char stream_id[STREAM_ID_SIZE];
struct connection_t *next_stream; /* points to the next stream at this edge, if any */