diff options
author | Roger Dingledine <arma@torproject.org> | 2003-09-27 21:09:56 +0000 |
---|---|---|
committer | Roger Dingledine <arma@torproject.org> | 2003-09-27 21:09:56 +0000 |
commit | cb8212bfcb56980893993db5bd4098944735b38a (patch) | |
tree | c1f382a4d4d861498956384804e2a55f338237e4 | |
parent | 798bb6ab3b089005fddfded0513edeb2da231354 (diff) | |
download | tor-cb8212bfcb56980893993db5bd4098944735b38a.tar.gz tor-cb8212bfcb56980893993db5bd4098944735b38a.zip |
clean up receiver buckets; prepare for payloads in relay_end; note a few bugs
svn:r502
-rw-r--r-- | src/or/circuit.c | 3 | ||||
-rw-r--r-- | src/or/command.c | 4 | ||||
-rw-r--r-- | src/or/connection.c | 56 | ||||
-rw-r--r-- | src/or/connection_edge.c | 29 | ||||
-rw-r--r-- | src/or/connection_or.c | 2 | ||||
-rw-r--r-- | src/or/cpuworker.c | 2 | ||||
-rw-r--r-- | src/or/directory.c | 2 | ||||
-rw-r--r-- | src/or/dns.c | 7 | ||||
-rw-r--r-- | src/or/main.c | 3 | ||||
-rw-r--r-- | src/or/or.h | 12 |
10 files changed, 54 insertions, 66 deletions
diff --git a/src/or/circuit.c b/src/or/circuit.c index c7dbcce856..650c531304 100644 --- a/src/or/circuit.c +++ b/src/or/circuit.c @@ -125,6 +125,7 @@ static aci_t get_unique_aci_by_addr_port(uint32_t addr, uint16_t port, int aci_t high_bit = (aci_type == ACI_TYPE_HIGHER) ? 1<<15 : 0; conn = connection_exact_get_by_addr_port(addr,port); + /* XXX race condition: if conn is marked_for_close it won't be noticed */ if (!conn) return (1|high_bit); /* No connection exists; conflict is impossible. */ @@ -910,7 +911,7 @@ int circuit_truncated(circuit_t *circ, crypt_path_t *layer) { for(stream = circ->p_streams; stream; stream=stream->next_stream) { if(stream->cpath_layer == victim) { log_fn(LOG_INFO, "Marking stream %d for close.", *(int*)stream->stream_id); - stream->marked_for_close = 1; +/*ENDCLOSE*/ stream->marked_for_close = 1; } } diff --git a/src/or/command.c b/src/or/command.c index 3b1fe1f644..e3943cc793 100644 --- a/src/or/command.c +++ b/src/or/command.c @@ -88,7 +88,7 @@ static void command_process_create_cell(cell_t *cell, connection_t *conn) { circ = circuit_get_by_aci_conn(cell->aci, conn); if(circ) { - log_fn(LOG_WARNING,"received CREATE cell for known circ. Dropping."); + log_fn(LOG_WARNING,"received CREATE cell (aci %d) for known circ. Dropping.", cell->aci); return; } @@ -118,7 +118,7 @@ static void command_process_created_cell(cell_t *cell, connection_t *conn) { circ = circuit_get_by_aci_conn(cell->aci, conn); if(!circ) { - log_fn(LOG_WARNING,"received CREATED cell for unknown circ. Dropping."); + log_fn(LOG_WARNING,"received CREATED cell (aci %d) for unknown circ. Dropping.", cell->aci); return; } diff --git a/src/or/connection.c b/src/or/connection.c index 4ca535e129..0604cd8f85 100644 --- a/src/or/connection.c +++ b/src/or/connection.c @@ -83,9 +83,6 @@ connection_t *connection_new(int type) { conn->inbuf = buf_new(); conn->outbuf = buf_new(); - conn->receiver_bucket = 50000; /* should be enough to do the handshake */ - conn->bandwidth = conn->receiver_bucket / 10; /* give it a default */ - conn->timestamp_created = now.tv_sec; conn->timestamp_lastread = now.tv_sec; conn->timestamp_lastwritten = now.tv_sec; @@ -149,8 +146,6 @@ int connection_create_listener(struct sockaddr_in *bindaddr, int type) { conn = connection_new(type); conn->s = s; - conn->receiver_bucket = -1; /* non-cell connections don't do receiver buckets */ - conn->bandwidth = -1; if(connection_add(conn) < 0) { /* no space, forget it */ log_fn(LOG_WARNING,"connection_add failed. Giving up."); @@ -197,11 +192,6 @@ int connection_handle_listener_read(connection_t *conn, int new_type) { newconn = connection_new(new_type); newconn->s = news; - if(!connection_speaks_cells(newconn)) { - newconn->receiver_bucket = -1; - newconn->bandwidth = -1; - } - newconn->address = strdup(inet_ntoa(remote.sin_addr)); /* remember the remote address */ newconn->addr = ntohl(remote.sin_addr.s_addr); newconn->port = ntohs(remote.sin_port); @@ -305,7 +295,7 @@ static int connection_tls_finish_handshake(connection_t *conn) { } crypto_free_pk_env(pk); } else { /* it's an OP */ - conn->bandwidth = DEFAULT_BANDWIDTH_OP; + conn->receiver_bucket = conn->bandwidth = DEFAULT_BANDWIDTH_OP; } } else { /* I'm a client */ if(!tor_tls_peer_has_cert(conn->tls)) { /* it's a client too?! */ @@ -330,7 +320,7 @@ static int connection_tls_finish_handshake(connection_t *conn) { } log_fn(LOG_DEBUG,"The router's pk matches the one we meant to connect to. Good."); crypto_free_pk_env(pk); - conn->bandwidth = DEFAULT_BANDWIDTH_OP; + conn->receiver_bucket = conn->bandwidth = DEFAULT_BANDWIDTH_OP; circuit_n_conn_open(conn); /* send the pending create */ } return 0; @@ -446,10 +436,6 @@ int connection_handle_read(connection_t *conn) { //log_fn(LOG_DEBUG,"connection_process_inbuf returned %d.",retval); return -1; } - if(!connection_state_is_open(conn) && conn->receiver_bucket == 0) { - log_fn(LOG_WARNING,"receiver bucket reached 0 before handshake finished. Closing."); - return -1; - } return 0; } @@ -458,9 +444,6 @@ int connection_read_to_buf(connection_t *conn) { int result; int at_most; - assert((connection_speaks_cells(conn) && conn->receiver_bucket >= 0) || - (!connection_speaks_cells(conn) && conn->receiver_bucket < 0)); - if(options.LinkPadding) { at_most = global_read_bucket; } else { @@ -477,14 +460,13 @@ int connection_read_to_buf(connection_t *conn) { at_most = global_read_bucket; } - if(conn->receiver_bucket >= 0 && at_most > conn->receiver_bucket) - at_most = conn->receiver_bucket; - if(connection_speaks_cells(conn) && conn->state != OR_CONN_STATE_CONNECTING) { if(conn->state == OR_CONN_STATE_HANDSHAKING) return connection_tls_continue_handshake(conn); /* else open, or closing */ + if(at_most > conn->receiver_bucket) + at_most = conn->receiver_bucket; result = read_to_buf_tls(conn->tls, at_most, conn->inbuf); switch(result) { @@ -510,14 +492,21 @@ int connection_read_to_buf(connection_t *conn) { } global_read_bucket -= result; assert(global_read_bucket >= 0); - if(connection_speaks_cells(conn)) - conn->receiver_bucket -= result; - if(conn->receiver_bucket == 0 || global_read_bucket == 0) { - log_fn(LOG_DEBUG,"buckets (%d, %d) exhausted. Pausing.", global_read_bucket, conn->receiver_bucket); + if(global_read_bucket == 0) { + log_fn(LOG_DEBUG,"global bucket exhausted. Pausing."); conn->wants_to_read = 1; connection_stop_reading(conn); return 0; } + if(connection_speaks_cells(conn) && conn->state == OR_CONN_STATE_OPEN) { + conn->receiver_bucket -= result; assert(conn->receiver_bucket >= 0); + if(conn->receiver_bucket == 0) { + log_fn(LOG_DEBUG,"receiver bucket exhausted. Pausing."); + conn->wants_to_read = 1; + connection_stop_reading(conn); + return 0; + } + } if(connection_speaks_cells(conn) && conn->state != OR_CONN_STATE_CONNECTING) if(result == at_most) return connection_read_to_buf(conn); @@ -627,7 +616,10 @@ int connection_receiver_bucket_should_increase(connection_t *conn) { if(!connection_speaks_cells(conn)) return 0; /* edge connections don't use receiver_buckets */ + if(conn->state != OR_CONN_STATE_OPEN) + return 0; /* only open connections play the rate limiting game */ + assert(conn->bandwidth > 0); if(conn->receiver_bucket > 9*conn->bandwidth) return 0; @@ -660,7 +652,7 @@ int connection_send_destroy(aci_t aci, connection_t *conn) { if(!connection_speaks_cells(conn)) { log_fn(LOG_INFO,"Aci %d: At an edge. Marking connection for close.", aci); - conn->marked_for_close = 1; +/*ENDCLOSE*/ conn->marked_for_close = 1; return 0; } @@ -746,13 +738,13 @@ void assert_connection_ok(connection_t *conn, time_t now) #endif if (conn->type != CONN_TYPE_OR) { - assert(conn->bandwidth == -1); - assert(conn->receiver_bucket == -1); assert(!conn->tls); } else { - assert(conn->bandwidth); - assert(conn->receiver_bucket >= 0); - assert(conn->receiver_bucket <= 10*conn->bandwidth); + if(conn->state == OR_CONN_STATE_OPEN) { + assert(conn->bandwidth > 0); + assert(conn->receiver_bucket >= 0); + assert(conn->receiver_bucket <= 10*conn->bandwidth); + } assert(conn->addr && conn->port); assert(conn->address); if (conn->state != OR_CONN_STATE_CONNECTING) diff --git a/src/or/connection_edge.c b/src/or/connection_edge.c index 1e4b91a50f..845d4223ee 100644 --- a/src/or/connection_edge.c +++ b/src/or/connection_edge.c @@ -29,7 +29,7 @@ int connection_edge_process_inbuf(connection_t *conn) { conn->done_receiving = 1; shutdown(conn->s, 0); /* XXX check return, refactor NM */ if (conn->done_sending) - conn->marked_for_close = 1; +/*ENDCLOSE*/ conn->marked_for_close = 1; /* XXX Factor out common logic here and in circuit_about_to_close NM */ circ = circuit_get_by_conn(conn); @@ -51,17 +51,17 @@ int connection_edge_process_inbuf(connection_t *conn) { #else /* eof reached, kill it. */ log_fn(LOG_INFO,"conn (fd %d) reached eof. Closing.", conn->s); - return -1; +/*ENDCLOSE*/ return -1; #endif } switch(conn->state) { case AP_CONN_STATE_SOCKS_WAIT: - return connection_ap_handshake_process_socks(conn); +/*ENDCLOSE*/ return connection_ap_handshake_process_socks(conn); case AP_CONN_STATE_OPEN: case EXIT_CONN_STATE_OPEN: if(connection_package_raw_inbuf(conn) < 0) - return -1; +/*ENDCLOSE*/ return -1; return 0; case EXIT_CONN_STATE_CONNECTING: log_fn(LOG_INFO,"text from server while in 'connecting' state at exit. Leaving it on buffer."); @@ -133,10 +133,11 @@ int connection_edge_process_relay_cell(cell_t *cell, circuit_t *circ, connection log_fn(LOG_INFO,"...and informing resolver we don't want the answer anymore."); dns_cancel_pending_resolve(conn->address, conn); } + return 0; } else { - log_fn(LOG_WARNING,"Got an unexpected relay cell, not in 'open' state. Dropping."); + log_fn(LOG_WARNING,"Got an unexpected relay cell, not in 'open' state. Closing."); + return -1; } - return 0; } switch(relay_command) { @@ -174,11 +175,11 @@ int connection_edge_process_relay_cell(cell_t *cell, circuit_t *circ, connection // printf("New text for buf (%d bytes): '%s'", cell->length - RELAY_HEADER_SIZE, cell->payload + RELAY_HEADER_SIZE); if(connection_write_to_buf(cell->payload + RELAY_HEADER_SIZE, cell->length - RELAY_HEADER_SIZE, conn) < 0) { - conn->marked_for_close = 1; +/*ENDCLOSE*/ conn->marked_for_close = 1; return 0; } if(connection_consider_sending_sendme(conn, edge_type) < 0) - conn->marked_for_close = 1; +/*ENDCLOSE*/ conn->marked_for_close = 1; return 0; case RELAY_COMMAND_END: if(!conn) { @@ -191,9 +192,9 @@ int connection_edge_process_relay_cell(cell_t *cell, circuit_t *circ, connection conn->done_sending = 1; shutdown(conn->s, 1); /* XXX check return; refactor NM */ if (conn->done_receiving) - conn->marked_for_close = 1; +/*ENDCLOSE*/ conn->marked_for_close = 1; #endif - conn->marked_for_close = 1; +/*ENDCLOSE*/ conn->marked_for_close = 1; break; case RELAY_COMMAND_EXTEND: if(conn) { @@ -240,7 +241,7 @@ int connection_edge_process_relay_cell(cell_t *cell, circuit_t *circ, connection } log_fn(LOG_INFO,"Connected! Notifying application."); if(connection_ap_handshake_socks_reply(conn, SOCKS4_REQUEST_GRANTED) < 0) { - conn->marked_for_close = 1; +/*ENDCLOSE*/ conn->marked_for_close = 1; } break; case RELAY_COMMAND_SENDME: @@ -331,7 +332,7 @@ repeat_connection_package_raw_inbuf: return 0; if(conn->package_window <= 0) { - log_fn(LOG_WARNING,"called with package_window 0. Tell Roger."); + log_fn(LOG_WARNING,"called with package_window %d. Tell Roger.", conn->package_window); connection_stop_reading(conn); return 0; } @@ -526,7 +527,7 @@ static int connection_ap_handshake_socks_reply(connection_t *conn, char result) return connection_flush_buf(conn); /* try to flush it, in case we're about to close the conn */ } -static int connection_exit_begin_conn(cell_t *cell, circuit_t *circ) { +/*ENDCLOSE*/ static int connection_exit_begin_conn(cell_t *cell, circuit_t *circ) { connection_t *n_stream; char *colon; @@ -553,8 +554,6 @@ static int connection_exit_begin_conn(cell_t *cell, circuit_t *circ) { n_stream->address = strdup(cell->payload + RELAY_HEADER_SIZE + STREAM_ID_SIZE); n_stream->port = atoi(colon+1); n_stream->state = EXIT_CONN_STATE_RESOLVING; - n_stream->receiver_bucket = -1; /* edge connections don't do receiver buckets */ - n_stream->bandwidth = -1; n_stream->s = -1; /* not yet valid */ n_stream->package_window = STREAMWINDOW_START; n_stream->deliver_window = STREAMWINDOW_START; diff --git a/src/or/connection_or.c b/src/or/connection_or.c index 6e31966dce..cddbc93c26 100644 --- a/src/or/connection_or.c +++ b/src/or/connection_or.c @@ -77,7 +77,7 @@ int connection_or_finished_flushing(connection_t *conn) { void connection_or_init_conn_from_router(connection_t *conn, routerinfo_t *router) { conn->addr = router->addr; conn->port = router->or_port; - conn->bandwidth = router->bandwidth; + conn->receiver_bucket = conn->bandwidth = router->bandwidth; conn->onion_pkey = crypto_pk_dup_key(router->onion_pkey); conn->link_pkey = crypto_pk_dup_key(router->link_pkey); conn->identity_pkey = crypto_pk_dup_key(router->identity_pkey); diff --git a/src/or/cpuworker.c b/src/or/cpuworker.c index 4d3764798b..b47df87a75 100644 --- a/src/or/cpuworker.c +++ b/src/or/cpuworker.c @@ -183,8 +183,6 @@ static int spawn_cpuworker(void) { set_socket_nonblocking(fd[0]); /* set up conn so it's got all the data we need to remember */ - conn->receiver_bucket = -1; /* non-cell connections don't do receiver buckets */ - conn->bandwidth = -1; conn->s = fd[0]; conn->address = strdup("localhost"); diff --git a/src/or/directory.c b/src/or/directory.c index c410821e05..2ed210cde4 100644 --- a/src/or/directory.c +++ b/src/or/directory.c @@ -48,8 +48,6 @@ void directory_initiate_command(routerinfo_t *router, int command) { conn->addr = router->addr; conn->port = router->dir_port; conn->address = strdup(router->address); - conn->receiver_bucket = -1; /* edge connections don't do receiver buckets */ - conn->bandwidth = -1; if (router->identity_pkey) conn->identity_pkey = crypto_pk_dup_key(router->identity_pkey); else { diff --git a/src/or/dns.c b/src/or/dns.c index 1b955cc004..e12f522c65 100644 --- a/src/or/dns.c +++ b/src/or/dns.c @@ -225,7 +225,7 @@ void dns_cancel_pending_resolve(char *question, connection_t *onlyconn) { /* mark all pending connections to fail */ while(resolve->pending_connections) { pend = resolve->pending_connections; - pend->conn->marked_for_close = 1; +/*ENDCLOSE*/ pend->conn->marked_for_close = 1; resolve->pending_connections = pend->next; free(pend); } @@ -278,7 +278,7 @@ static void dns_found_answer(char *question, uint32_t answer) { pend = resolve->pending_connections; pend->conn->addr = resolve->answer; if(resolve->state == CACHE_STATE_FAILED || connection_exit_connect(pend->conn) < 0) { - pend->conn->marked_for_close = 1; +/*ENDCLOSE*/ pend->conn->marked_for_close = 1; } resolve->pending_connections = pend->next; free(pend); @@ -386,8 +386,6 @@ static int spawn_dnsworker(void) { set_socket_nonblocking(fd[0]); /* set up conn so it's got all the data we need to remember */ - conn->receiver_bucket = -1; /* non-cell connections don't do receiver buckets */ - conn->bandwidth = -1; conn->s = fd[0]; conn->address = strdup("localhost"); @@ -420,6 +418,7 @@ static void spawn_enough_dnsworkers(void) { dnsconn->marked_for_close = 1; num_dnsworkers_busy--; + num_dnsworkers--; } if(num_dnsworkers_busy >= MIN_DNSWORKERS) diff --git a/src/or/main.c b/src/or/main.c index 3ce121c9e6..eb0ef7a14a 100644 --- a/src/or/main.c +++ b/src/or/main.c @@ -391,7 +391,8 @@ static int prepare_for_poll(void) { if(conn->wants_to_read == 1 /* it's marked to turn reading back on now */ && global_read_bucket > 0 /* and we're allowed to read */ - && conn->receiver_bucket != 0) { /* and either an edge conn or non-empty bucket */ + && (!connection_speaks_cells(conn) || conn->receiver_bucket > 0)) { + /* and either a non-cell conn or a cell conn with non-empty bucket */ conn->wants_to_read = 0; connection_start_reading(conn); if(conn->wants_to_write == 1) { diff --git a/src/or/or.h b/src/or/or.h index 948518a5dd..be41e2b441 100644 --- a/src/or/or.h +++ b/src/or/or.h @@ -273,12 +273,6 @@ struct connection_t { long timestamp_created; /* when was this connection_t created? */ - uint32_t bandwidth; /* connection bandwidth. Set to -1 for non-OR conns. */ - int receiver_bucket; /* when this hits 0, stop receiving. Every second we - * add 'bandwidth' to this, capping it at 10*bandwidth. - * Set to -1 for non-OR conns. - */ - uint32_t addr; /* these two uniquely identify a router. Both in host order. */ uint16_t port; /* if non-zero, they identify the guy on the other end * of the connection. */ @@ -294,6 +288,12 @@ struct connection_t { uint16_t next_aci; /* Which ACI do we try to use next on this connection? * This is always in the range 0..1<<15-1.*/ + /* bandwidth and receiver_bucket only used by ORs in OPEN state: */ + uint32_t bandwidth; /* connection bandwidth. */ + int receiver_bucket; /* when this hits 0, stop receiving. Every second we + * add 'bandwidth' to this, capping it at 10*bandwidth. + */ + /* Used only by edge connections: */ char stream_id[STREAM_ID_SIZE]; struct connection_t *next_stream; /* points to the next stream at this edge, if any */ |